View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.InputStream;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.annotation.NotThreadSafe;
35  import org.apache.http.io.BufferInfo;
36  import org.apache.http.io.SessionInputBuffer;
37  import org.apache.http.util.Args;
38  
39  /**
40   * Input stream that cuts off after a defined number of bytes. This class
41   * is used to receive content of HTTP messages where the end of the content
42   * entity is determined by the value of the {@code Content-Length header}.
43   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
44   * long.
45   * <p>
46   * Note that this class NEVER closes the underlying stream, even when close
47   * gets called.  Instead, it will read until the "end" of its limit on
48   * close, which allows for the seamless execution of subsequent HTTP 1.1
49   * requests, while not requiring the client to remember to read the entire
50   * contents of the response.
51   *
52   *
53   * @since 4.0
54   */
55  @NotThreadSafe
56  public class ContentLengthInputStream extends InputStream {
57  
58      private static final int BUFFER_SIZE = 2048;
59      /**
60       * The maximum number of bytes that can be read from the stream. Subsequent
61       * read operations will return -1.
62       */
63      private final long contentLength;
64  
65      /** The current position */
66      private long pos = 0;
67  
68      /** True if the stream is closed. */
69      private boolean closed = false;
70  
71      /**
72       * Wrapped input stream that all calls are delegated to.
73       */
74      private SessionInputBuffer in = null;
75  
76      /**
77       * Wraps a session input buffer and cuts off output after a defined number
78       * of bytes.
79       *
80       * @param in The session input buffer
81       * @param contentLength The maximum number of bytes that can be read from
82       * the stream. Subsequent read operations will return -1.
83       */
84      public ContentLengthInputStream(final SessionInputBuffer in, final long contentLength) {
85          super();
86          this.in = Args.notNull(in, "Session input buffer");
87          this.contentLength = Args.notNegative(contentLength, "Content length");
88      }
89  
90      /**
91       * <p>Reads until the end of the known length of content.</p>
92       *
93       * <p>Does not close the underlying socket input, but instead leaves it
94       * primed to parse the next response.</p>
95       * @throws IOException If an IO problem occurs.
96       */
97      @Override
98      public void close() throws IOException {
99          if (!closed) {
100             try {
101                 if (pos < contentLength) {
102                     final byte buffer[] = new byte[BUFFER_SIZE];
103                     while (read(buffer) >= 0) {
104                     }
105                 }
106             } finally {
107                 // close after above so that we don't throw an exception trying
108                 // to read after closed!
109                 closed = true;
110             }
111         }
112     }
113 
114     @Override
115     public int available() throws IOException {
116         if (this.in instanceof BufferInfo) {
117             final int len = ((BufferInfo) this.in).length();
118             return Math.min(len, (int) (this.contentLength - this.pos));
119         } else {
120             return 0;
121         }
122     }
123 
124     /**
125      * Read the next byte from the stream
126      * @return The next byte or -1 if the end of stream has been reached.
127      * @throws IOException If an IO problem occurs
128      * @see java.io.InputStream#read()
129      */
130     @Override
131     public int read() throws IOException {
132         if (closed) {
133             throw new IOException("Attempted read from closed stream.");
134         }
135 
136         if (pos >= contentLength) {
137             return -1;
138         }
139         final int b = this.in.read();
140         if (b == -1) {
141             if (pos < contentLength) {
142                 throw new ConnectionClosedException(
143                         "Premature end of Content-Length delimited message body (expected: "
144                         + contentLength + "; received: " + pos);
145             }
146         } else {
147             pos++;
148         }
149         return b;
150     }
151 
152     /**
153      * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
154      * also notifies the watcher when the contents have been consumed.
155      *
156      * @param b     The byte array to fill.
157      * @param off   Start filling at this position.
158      * @param len   The number of bytes to attempt to read.
159      * @return The number of bytes read, or -1 if the end of content has been
160      *  reached.
161      *
162      * @throws java.io.IOException Should an error occur on the wrapped stream.
163      */
164     @Override
165     public int read (final byte[] b, final int off, final int len) throws java.io.IOException {
166         if (closed) {
167             throw new IOException("Attempted read from closed stream.");
168         }
169 
170         if (pos >= contentLength) {
171             return -1;
172         }
173 
174         int chunk = len;
175         if (pos + len > contentLength) {
176             chunk = (int) (contentLength - pos);
177         }
178         final int count = this.in.read(b, off, chunk);
179         if (count == -1 && pos < contentLength) {
180             throw new ConnectionClosedException(
181                     "Premature end of Content-Length delimited message body (expected: "
182                     + contentLength + "; received: " + pos);
183         }
184         if (count > 0) {
185             pos += count;
186         }
187         return count;
188     }
189 
190 
191     /**
192      * Read more bytes from the stream.
193      * @param b The byte array to put the new data in.
194      * @return The number of bytes read into the buffer.
195      * @throws IOException If an IO problem occurs
196      * @see java.io.InputStream#read(byte[])
197      */
198     @Override
199     public int read(final byte[] b) throws IOException {
200         return read(b, 0, b.length);
201     }
202 
203     /**
204      * Skips and discards a number of bytes from the input stream.
205      * @param n The number of bytes to skip.
206      * @return The actual number of bytes skipped. &le; 0 if no bytes
207      * are skipped.
208      * @throws IOException If an error occurs while skipping bytes.
209      * @see InputStream#skip(long)
210      */
211     @Override
212     public long skip(final long n) throws IOException {
213         if (n <= 0) {
214             return 0;
215         }
216         final byte[] buffer = new byte[BUFFER_SIZE];
217         // make sure we don't skip more bytes than are
218         // still available
219         long remaining = Math.min(n, this.contentLength - this.pos);
220         // skip and keep track of the bytes actually skipped
221         long count = 0;
222         while (remaining > 0) {
223             final int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
224             if (l == -1) {
225                 break;
226             }
227             count += l;
228             remaining -= l;
229         }
230         return count;
231     }
232 }