View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.InputStream;
32  
33  import org.apache.hc.core5.http.ConnectionClosedException;
34  import org.apache.hc.core5.http.StreamClosedException;
35  import org.apache.hc.core5.http.io.SessionInputBuffer;
36  import org.apache.hc.core5.util.Args;
37  
38  /**
39   * Input stream that cuts off after a defined number of bytes. This class
40   * is used to receive content of HTTP messages where the end of the content
41   * entity is determined by the value of the {@code Content-Length header}.
42   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
43   * long.
44   * <p>
45   * Note that this class NEVER closes the underlying stream, even when
46   * {@link #close()} gets called.  Instead, it will read until the "end" of
47   * its limit on close, which allows for the seamless execution of subsequent
48   * HTTP 1.1 requests, while not requiring the client to remember to read the
49   * entire contents of the response.
50   *
51   *
52   * @since 4.0
53   */
54  public class ContentLengthInputStream extends InputStream {
55  
56      private static final int BUFFER_SIZE = 2048;
57  
58      private final SessionInputBuffer buffer;
59      private final InputStream inputStream;
60  
61      /**
62       * The maximum number of bytes that can be read from the stream. Subsequent
63       * read operations will return -1.
64       */
65      private final long contentLength;
66  
67      /** The current position */
68      private long pos;
69  
70      /** True if the stream is closed. */
71      private boolean closed;
72  
73      /**
74       * Default constructor.
75       *
76       * @param buffer Session input buffer
77       * @param inputStream Input stream
78       * @param contentLength The maximum number of bytes that can be read from
79       * the stream. Subsequent read operations will return -1.
80       */
81      public ContentLengthInputStream(final SessionInputBuffer buffer, final InputStream inputStream, final long contentLength) {
82          super();
83          this.buffer = Args.notNull(buffer, "Session input buffer");
84          this.inputStream = Args.notNull(inputStream, "Input stream");
85          this.contentLength = Args.notNegative(contentLength, "Content length");
86      }
87  
88      /**
89       * <p>Reads until the end of the known length of content.</p>
90       *
91       * <p>Does NOT close the underlying stream, but instead leaves it
92       * primed to parse the next response.</p>
93       * @throws IOException If an IO problem occurs.
94       */
95      @Override
96      public void close() throws IOException {
97          if (!closed) {
98              try {
99                  if (pos < contentLength) {
100                     final byte[] buffer = new byte[BUFFER_SIZE];
101                     while (read(buffer) >= 0) {
102                         // keep reading
103                     }
104                 }
105             } finally {
106                 // close after above so that we don't throw an exception trying
107                 // to read after closed!
108                 closed = true;
109             }
110         }
111     }
112 
113     @Override
114     public int available() throws IOException {
115         final int len = this.buffer.length();
116         return Math.min(len, (int) (this.contentLength - this.pos));
117     }
118 
119     /**
120      * Read the next byte from the stream
121      * @return The next byte or -1 if the end of stream has been reached.
122      * @throws IOException If an IO problem occurs
123      * @see java.io.InputStream#read()
124      */
125     @Override
126     public int read() throws IOException {
127         if (closed) {
128             throw new StreamClosedException();
129         }
130 
131         if (pos >= contentLength) {
132             return -1;
133         }
134         final int b = this.buffer.read(this.inputStream);
135         if (b == -1) {
136             if (pos < contentLength) {
137                 throw new ConnectionClosedException(
138                                 "Premature end of Content-Length delimited message body (expected: %d; received: %d)",
139                                 contentLength, pos);
140             }
141         } else {
142             pos++;
143         }
144         return b;
145     }
146 
147     /**
148      * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
149      * also notifies the watcher when the contents have been consumed.
150      *
151      * @param b     The byte array to fill.
152      * @param off   Start filling at this position.
153      * @param len   The number of bytes to attempt to read.
154      * @return The number of bytes read, or -1 if the end of content has been
155      *  reached.
156      *
157      * @throws java.io.IOException Should an error occur on the wrapped stream.
158      */
159     @Override
160     public int read(final byte[] b, final int off, final int len) throws java.io.IOException {
161         if (closed) {
162             throw new StreamClosedException();
163         }
164 
165         if (pos >= contentLength) {
166             return -1;
167         }
168 
169         int chunk = len;
170         if (pos + len > contentLength) {
171             chunk = (int) (contentLength - pos);
172         }
173         final int count = this.buffer.read(b, off, chunk, this.inputStream);
174         if (count == -1 && pos < contentLength) {
175             throw new ConnectionClosedException(
176                             "Premature end of Content-Length delimited message body (expected: %d; received: %d)",
177                             contentLength, pos);
178         }
179         if (count > 0) {
180             pos += count;
181         }
182         return count;
183     }
184 
185 
186     /**
187      * Read more bytes from the stream.
188      * @param b The byte array to put the new data in.
189      * @return The number of bytes read into the buffer.
190      * @throws IOException If an IO problem occurs
191      * @see java.io.InputStream#read(byte[])
192      */
193     @Override
194     public int read(final byte[] b) throws IOException {
195         return read(b, 0, b.length);
196     }
197 
198     /**
199      * Skips and discards a number of bytes from the input stream.
200      * @param n The number of bytes to skip.
201      * @return The actual number of bytes skipped. &le; 0 if no bytes
202      * are skipped.
203      * @throws IOException If an error occurs while skipping bytes.
204      * @see InputStream#skip(long)
205      */
206     @Override
207     public long skip(final long n) throws IOException {
208         if (n <= 0) {
209             return 0;
210         }
211         final byte[] buffer = new byte[BUFFER_SIZE];
212         // make sure we don't skip more bytes than are
213         // still available
214         long remaining = Math.min(n, this.contentLength - this.pos);
215         // skip and keep track of the bytes actually skipped
216         long count = 0;
217         while (remaining > 0) {
218             final int readLen = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
219             if (readLen == -1) {
220                 break;
221             }
222             count += readLen;
223             remaining -= readLen;
224         }
225         return count;
226     }
227 }