1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27
28 package org.apache.http.impl.io;
29
30 import java.io.IOException;
31 import java.io.InputStream;
32
33 import org.apache.http.ConnectionClosedException;
34 import org.apache.http.annotation.NotThreadSafe;
35 import org.apache.http.io.BufferInfo;
36 import org.apache.http.io.SessionInputBuffer;
37 import org.apache.http.util.Args;
38
39 /**
40 * Input stream that cuts off after a defined number of bytes. This class
41 * is used to receive content of HTTP messages where the end of the content
42 * entity is determined by the value of the <code>Content-Length header</code>.
43 * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
44 * long.
45 * <p>
46 * Note that this class NEVER closes the underlying stream, even when close
47 * gets called. Instead, it will read until the "end" of its limit on
48 * close, which allows for the seamless execution of subsequent HTTP 1.1
49 * requests, while not requiring the client to remember to read the entire
50 * contents of the response.
51 *
52 *
53 * @since 4.0
54 */
55 @NotThreadSafe
56 public class ContentLengthInputStream extends InputStream {
57
58 private static final int BUFFER_SIZE = 2048;
59 /**
60 * The maximum number of bytes that can be read from the stream. Subsequent
61 * read operations will return -1.
62 */
63 private final long contentLength;
64
65 /** The current position */
66 private long pos = 0;
67
68 /** True if the stream is closed. */
69 private boolean closed = false;
70
71 /**
72 * Wrapped input stream that all calls are delegated to.
73 */
74 private SessionInputBuffer in = null;
75
76 /**
77 * Wraps a session input buffer and cuts off output after a defined number
78 * of bytes.
79 *
80 * @param in The session input buffer
81 * @param contentLength The maximum number of bytes that can be read from
82 * the stream. Subsequent read operations will return -1.
83 */
84 public ContentLengthInputStream(final SessionInputBuffer in, final long contentLength) {
85 super();
86 this.in = Args.notNull(in, "Session input buffer");
87 this.contentLength = Args.notNegative(contentLength, "Content length");
88 }
89
90 /**
91 * <p>Reads until the end of the known length of content.</p>
92 *
93 * <p>Does not close the underlying socket input, but instead leaves it
94 * primed to parse the next response.</p>
95 * @throws IOException If an IO problem occurs.
96 */
97 @Override
98 public void close() throws IOException {
99 if (!closed) {
100 try {
101 if (pos < contentLength) {
102 final byte buffer[] = new byte[BUFFER_SIZE];
103 while (read(buffer) >= 0) {
104 }
105 }
106 } finally {
107 // close after above so that we don't throw an exception trying
108 // to read after closed!
109 closed = true;
110 }
111 }
112 }
113
114 @Override
115 public int available() throws IOException {
116 if (this.in instanceof BufferInfo) {
117 final int len = ((BufferInfo) this.in).length();
118 return Math.min(len, (int) (this.contentLength - this.pos));
119 } else {
120 return 0;
121 }
122 }
123
124 /**
125 * Read the next byte from the stream
126 * @return The next byte or -1 if the end of stream has been reached.
127 * @throws IOException If an IO problem occurs
128 * @see java.io.InputStream#read()
129 */
130 @Override
131 public int read() throws IOException {
132 if (closed) {
133 throw new IOException("Attempted read from closed stream.");
134 }
135
136 if (pos >= contentLength) {
137 return -1;
138 }
139 final int b = this.in.read();
140 if (b == -1) {
141 if (pos < contentLength) {
142 throw new ConnectionClosedException(
143 "Premature end of Content-Length delimited message body (expected: "
144 + contentLength + "; received: " + pos);
145 }
146 } else {
147 pos++;
148 }
149 return b;
150 }
151
152 /**
153 * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
154 * also notifies the watcher when the contents have been consumed.
155 *
156 * @param b The byte array to fill.
157 * @param off Start filling at this position.
158 * @param len The number of bytes to attempt to read.
159 * @return The number of bytes read, or -1 if the end of content has been
160 * reached.
161 *
162 * @throws java.io.IOException Should an error occur on the wrapped stream.
163 */
164 @Override
165 public int read (final byte[] b, final int off, int len) throws java.io.IOException {
166 if (closed) {
167 throw new IOException("Attempted read from closed stream.");
168 }
169
170 if (pos >= contentLength) {
171 return -1;
172 }
173
174 if (pos + len > contentLength) {
175 len = (int) (contentLength - pos);
176 }
177 final int count = this.in.read(b, off, len);
178 if (count == -1 && pos < contentLength) {
179 throw new ConnectionClosedException(
180 "Premature end of Content-Length delimited message body (expected: "
181 + contentLength + "; received: " + pos);
182 }
183 if (count > 0) {
184 pos += count;
185 }
186 return count;
187 }
188
189
190 /**
191 * Read more bytes from the stream.
192 * @param b The byte array to put the new data in.
193 * @return The number of bytes read into the buffer.
194 * @throws IOException If an IO problem occurs
195 * @see java.io.InputStream#read(byte[])
196 */
197 @Override
198 public int read(final byte[] b) throws IOException {
199 return read(b, 0, b.length);
200 }
201
202 /**
203 * Skips and discards a number of bytes from the input stream.
204 * @param n The number of bytes to skip.
205 * @return The actual number of bytes skipped. <= 0 if no bytes
206 * are skipped.
207 * @throws IOException If an error occurs while skipping bytes.
208 * @see InputStream#skip(long)
209 */
210 @Override
211 public long skip(final long n) throws IOException {
212 if (n <= 0) {
213 return 0;
214 }
215 final byte[] buffer = new byte[BUFFER_SIZE];
216 // make sure we don't skip more bytes than are
217 // still available
218 long remaining = Math.min(n, this.contentLength - this.pos);
219 // skip and keep track of the bytes actually skipped
220 long count = 0;
221 while (remaining > 0) {
222 final int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
223 if (l == -1) {
224 break;
225 }
226 count += l;
227 remaining -= l;
228 }
229 return count;
230 }
231 }