View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.InputStream;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.annotation.NotThreadSafe;
35  import org.apache.http.io.BufferInfo;
36  import org.apache.http.io.SessionInputBuffer;
37  
38  /**
39   * Input stream that cuts off after a defined number of bytes. This class
40   * is used to receive content of HTTP messages where the end of the content
41   * entity is determined by the value of the <code>Content-Length header</code>.
42   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
43   * long.
44   * <p>
45   * Note that this class NEVER closes the underlying stream, even when close
46   * gets called.  Instead, it will read until the "end" of its limit on
47   * close, which allows for the seamless execution of subsequent HTTP 1.1
48   * requests, while not requiring the client to remember to read the entire
49   * contents of the response.
50   *
51   *
52   * @since 4.0
53   */
54  @NotThreadSafe
55  public class ContentLengthInputStream extends InputStream {
56  
57      private static final int BUFFER_SIZE = 2048;
58      /**
59       * The maximum number of bytes that can be read from the stream. Subsequent
60       * read operations will return -1.
61       */
62      private long contentLength;
63  
64      /** The current position */
65      private long pos = 0;
66  
67      /** True if the stream is closed. */
68      private boolean closed = false;
69  
70      /**
71       * Wrapped input stream that all calls are delegated to.
72       */
73      private SessionInputBuffer in = null;
74  
75      /**
76       * Wraps a session input buffer and cuts off output after a defined number
77       * of bytes.
78       *
79       * @param in The session input buffer
80       * @param contentLength The maximum number of bytes that can be read from
81       * the stream. Subsequent read operations will return -1.
82       */
83      public ContentLengthInputStream(final SessionInputBuffer in, long contentLength) {
84          super();
85          if (in == null) {
86              throw new IllegalArgumentException("Input stream may not be null");
87          }
88          if (contentLength < 0) {
89              throw new IllegalArgumentException("Content length may not be negative");
90          }
91          this.in = in;
92          this.contentLength = contentLength;
93      }
94  
95      /**
96       * <p>Reads until the end of the known length of content.</p>
97       *
98       * <p>Does not close the underlying socket input, but instead leaves it
99       * primed to parse the next response.</p>
100      * @throws IOException If an IO problem occurs.
101      */
102     @Override
103     public void close() throws IOException {
104         if (!closed) {
105             try {
106                 if (pos < contentLength) {
107                     byte buffer[] = new byte[BUFFER_SIZE];
108                     while (read(buffer) >= 0) {
109                     }
110                 }
111             } finally {
112                 // close after above so that we don't throw an exception trying
113                 // to read after closed!
114                 closed = true;
115             }
116         }
117     }
118 
119     @Override
120     public int available() throws IOException {
121         if (this.in instanceof BufferInfo) {
122             int len = ((BufferInfo) this.in).length();
123             return Math.min(len, (int) (this.contentLength - this.pos));
124         } else {
125             return 0;
126         }
127     }
128 
129     /**
130      * Read the next byte from the stream
131      * @return The next byte or -1 if the end of stream has been reached.
132      * @throws IOException If an IO problem occurs
133      * @see java.io.InputStream#read()
134      */
135     @Override
136     public int read() throws IOException {
137         if (closed) {
138             throw new IOException("Attempted read from closed stream.");
139         }
140 
141         if (pos >= contentLength) {
142             return -1;
143         }
144         int b = this.in.read();
145         if (b == -1) {
146             if (pos < contentLength) {
147                 throw new ConnectionClosedException(
148                         "Premature end of Content-Length delimited message body (expected: "
149                         + contentLength + "; received: " + pos);
150             }
151         } else {
152             pos++;
153         }
154         return b;
155     }
156 
157     /**
158      * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
159      * also notifies the watcher when the contents have been consumed.
160      *
161      * @param b     The byte array to fill.
162      * @param off   Start filling at this position.
163      * @param len   The number of bytes to attempt to read.
164      * @return The number of bytes read, or -1 if the end of content has been
165      *  reached.
166      *
167      * @throws java.io.IOException Should an error occur on the wrapped stream.
168      */
169     @Override
170     public int read (byte[] b, int off, int len) throws java.io.IOException {
171         if (closed) {
172             throw new IOException("Attempted read from closed stream.");
173         }
174 
175         if (pos >= contentLength) {
176             return -1;
177         }
178 
179         if (pos + len > contentLength) {
180             len = (int) (contentLength - pos);
181         }
182         int count = this.in.read(b, off, len);
183         if (count == -1 && pos < contentLength) {
184             throw new ConnectionClosedException(
185                     "Premature end of Content-Length delimited message body (expected: "
186                     + contentLength + "; received: " + pos);
187         }
188         if (count > 0) {
189             pos += count;
190         }
191         return count;
192     }
193 
194 
195     /**
196      * Read more bytes from the stream.
197      * @param b The byte array to put the new data in.
198      * @return The number of bytes read into the buffer.
199      * @throws IOException If an IO problem occurs
200      * @see java.io.InputStream#read(byte[])
201      */
202     @Override
203     public int read(byte[] b) throws IOException {
204         return read(b, 0, b.length);
205     }
206 
207     /**
208      * Skips and discards a number of bytes from the input stream.
209      * @param n The number of bytes to skip.
210      * @return The actual number of bytes skipped. <= 0 if no bytes
211      * are skipped.
212      * @throws IOException If an error occurs while skipping bytes.
213      * @see InputStream#skip(long)
214      */
215     @Override
216     public long skip(long n) throws IOException {
217         if (n <= 0) {
218             return 0;
219         }
220         byte[] buffer = new byte[BUFFER_SIZE];
221         // make sure we don't skip more bytes than are
222         // still available
223         long remaining = Math.min(n, this.contentLength - this.pos);
224         // skip and keep track of the bytes actually skipped
225         long count = 0;
226         while (remaining > 0) {
227             int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
228             if (l == -1) {
229                 break;
230             }
231             count += l;
232             remaining -= l;
233         }
234         return count;
235     }
236 }