1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27
28 package org.apache.http.impl.io;
29
30 import java.io.IOException;
31 import java.io.InputStream;
32
33 import org.apache.http.ConnectionClosedException;
34 import org.apache.http.annotation.NotThreadSafe;
35 import org.apache.http.io.BufferInfo;
36 import org.apache.http.io.SessionInputBuffer;
37
38 /**
39 * Input stream that cuts off after a defined number of bytes. This class
40 * is used to receive content of HTTP messages where the end of the content
41 * entity is determined by the value of the <code>Content-Length header</code>.
42 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
43 * long.
44 * <p>
45 * Note that this class NEVER closes the underlying stream, even when close
46 * gets called. Instead, it will read until the "end" of its limit on
47 * close, which allows for the seamless execution of subsequent HTTP 1.1
48 * requests, while not requiring the client to remember to read the entire
49 * contents of the response.
50 *
51 *
52 * @since 4.0
53 */
54 @NotThreadSafe
55 public class ContentLengthInputStream extends InputStream {
56
57 private static final int BUFFER_SIZE = 2048;
58 /**
59 * The maximum number of bytes that can be read from the stream. Subsequent
60 * read operations will return -1.
61 */
62 private long contentLength;
63
64 /** The current position */
65 private long pos = 0;
66
67 /** True if the stream is closed. */
68 private boolean closed = false;
69
70 /**
71 * Wrapped input stream that all calls are delegated to.
72 */
73 private SessionInputBuffer in = null;
74
75 /**
76 * Wraps a session input buffer and cuts off output after a defined number
77 * of bytes.
78 *
79 * @param in The session input buffer
80 * @param contentLength The maximum number of bytes that can be read from
81 * the stream. Subsequent read operations will return -1.
82 */
83 public ContentLengthInputStream(final SessionInputBuffer in, long contentLength) {
84 super();
85 if (in == null) {
86 throw new IllegalArgumentException("Input stream may not be null");
87 }
88 if (contentLength < 0) {
89 throw new IllegalArgumentException("Content length may not be negative");
90 }
91 this.in = in;
92 this.contentLength = contentLength;
93 }
94
95 /**
96 * <p>Reads until the end of the known length of content.</p>
97 *
98 * <p>Does not close the underlying socket input, but instead leaves it
99 * primed to parse the next response.</p>
100 * @throws IOException If an IO problem occurs.
101 */
102 @Override
103 public void close() throws IOException {
104 if (!closed) {
105 try {
106 if (pos < contentLength) {
107 byte buffer[] = new byte[BUFFER_SIZE];
108 while (read(buffer) >= 0) {
109 }
110 }
111 } finally {
112 // close after above so that we don't throw an exception trying
113 // to read after closed!
114 closed = true;
115 }
116 }
117 }
118
119 @Override
120 public int available() throws IOException {
121 if (this.in instanceof BufferInfo) {
122 int len = ((BufferInfo) this.in).length();
123 return Math.min(len, (int) (this.contentLength - this.pos));
124 } else {
125 return 0;
126 }
127 }
128
129 /**
130 * Read the next byte from the stream
131 * @return The next byte or -1 if the end of stream has been reached.
132 * @throws IOException If an IO problem occurs
133 * @see java.io.InputStream#read()
134 */
135 @Override
136 public int read() throws IOException {
137 if (closed) {
138 throw new IOException("Attempted read from closed stream.");
139 }
140
141 if (pos >= contentLength) {
142 return -1;
143 }
144 int b = this.in.read();
145 if (b == -1) {
146 if (pos < contentLength) {
147 throw new ConnectionClosedException(
148 "Premature end of Content-Length delimited message body (expected: "
149 + contentLength + "; received: " + pos);
150 }
151 } else {
152 pos++;
153 }
154 return b;
155 }
156
157 /**
158 * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
159 * also notifies the watcher when the contents have been consumed.
160 *
161 * @param b The byte array to fill.
162 * @param off Start filling at this position.
163 * @param len The number of bytes to attempt to read.
164 * @return The number of bytes read, or -1 if the end of content has been
165 * reached.
166 *
167 * @throws java.io.IOException Should an error occur on the wrapped stream.
168 */
169 @Override
170 public int read (byte[] b, int off, int len) throws java.io.IOException {
171 if (closed) {
172 throw new IOException("Attempted read from closed stream.");
173 }
174
175 if (pos >= contentLength) {
176 return -1;
177 }
178
179 if (pos + len > contentLength) {
180 len = (int) (contentLength - pos);
181 }
182 int count = this.in.read(b, off, len);
183 if (count == -1 && pos < contentLength) {
184 throw new ConnectionClosedException(
185 "Premature end of Content-Length delimited message body (expected: "
186 + contentLength + "; received: " + pos);
187 }
188 if (count > 0) {
189 pos += count;
190 }
191 return count;
192 }
193
194
195 /**
196 * Read more bytes from the stream.
197 * @param b The byte array to put the new data in.
198 * @return The number of bytes read into the buffer.
199 * @throws IOException If an IO problem occurs
200 * @see java.io.InputStream#read(byte[])
201 */
202 @Override
203 public int read(byte[] b) throws IOException {
204 return read(b, 0, b.length);
205 }
206
207 /**
208 * Skips and discards a number of bytes from the input stream.
209 * @param n The number of bytes to skip.
210 * @return The actual number of bytes skipped. <= 0 if no bytes
211 * are skipped.
212 * @throws IOException If an error occurs while skipping bytes.
213 * @see InputStream#skip(long)
214 */
215 @Override
216 public long skip(long n) throws IOException {
217 if (n <= 0) {
218 return 0;
219 }
220 byte[] buffer = new byte[BUFFER_SIZE];
221 // make sure we don't skip more bytes than are
222 // still available
223 long remaining = Math.min(n, this.contentLength - this.pos);
224 // skip and keep track of the bytes actually skipped
225 long count = 0;
226 while (remaining > 0) {
227 int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
228 if (l == -1) {
229 break;
230 }
231 count += l;
232 remaining -= l;
233 }
234 return count;
235 }
236 }