View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.InputStream;
32  
33  import org.apache.hc.core5.http.ConnectionClosedException;
34  import org.apache.hc.core5.http.StreamClosedException;
35  import org.apache.hc.core5.http.io.SessionInputBuffer;
36  import org.apache.hc.core5.util.Args;
37  
38  /**
39   * Input stream that cuts off after a defined number of bytes. This class
40   * is used to receive content of HTTP messages where the end of the content
41   * entity is determined by the value of the {@code Content-Length header}.
42   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
43   * long.
44   * <p>
45   * Note that this class NEVER closes the underlying stream, even when close
46   * gets called.  Instead, it will read until the "end" of its limit on
47   * close, which allows for the seamless execution of subsequent HTTP 1.1
48   * requests, while not requiring the client to remember to read the entire
49   * contents of the response.
50   *
51   *
52   * @since 4.0
53   */
54  public class ContentLengthInputStream extends InputStream {
55  
56      private static final int BUFFER_SIZE = 2048;
57  
58      private final SessionInputBuffer buffer;
59      private final InputStream inputStream;
60  
61      /**
62       * The maximum number of bytes that can be read from the stream. Subsequent
63       * read operations will return -1.
64       */
65      private final long contentLength;
66  
67      /** The current position */
68      private long pos = 0;
69  
70      /** True if the stream is closed. */
71      private boolean closed = false;
72  
73      /**
74       * Default constructor.
75       *
76       * @param buffer Session input buffer
77       * @param inputStream Input stream
78       * @param contentLength The maximum number of bytes that can be read from
79       * the stream. Subsequent read operations will return -1.
80       */
81      public ContentLengthInputStream(final SessionInputBuffer buffer, final InputStream inputStream, final long contentLength) {
82          super();
83          this.buffer = Args.notNull(buffer, "Session input buffer");
84          this.inputStream = Args.notNull(inputStream, "Input stream");
85          this.contentLength = Args.notNegative(contentLength, "Content length");
86      }
87  
88      /**
89       * <p>Reads until the end of the known length of content.</p>
90       *
91       * <p>Does not close the underlying socket input, but instead leaves it
92       * primed to parse the next response.</p>
93       * @throws IOException If an IO problem occurs.
94       */
95      @Override
96      public void close() throws IOException {
97          if (!closed) {
98              try {
99                  if (pos < contentLength) {
100                     final byte[] buffer = new byte[BUFFER_SIZE];
101                     while (read(buffer) >= 0) {
102                     }
103                 }
104             } finally {
105                 // close after above so that we don't throw an exception trying
106                 // to read after closed!
107                 closed = true;
108             }
109         }
110     }
111 
112     @Override
113     public int available() throws IOException {
114         final int len = this.buffer.length();
115         return Math.min(len, (int) (this.contentLength - this.pos));
116     }
117 
118     /**
119      * Read the next byte from the stream
120      * @return The next byte or -1 if the end of stream has been reached.
121      * @throws IOException If an IO problem occurs
122      * @see java.io.InputStream#read()
123      */
124     @Override
125     public int read() throws IOException {
126         if (closed) {
127             throw new StreamClosedException("Stream already closed");
128         }
129 
130         if (pos >= contentLength) {
131             return -1;
132         }
133         final int b = this.buffer.read(this.inputStream);
134         if (b == -1) {
135             if (pos < contentLength) {
136                 throw new ConnectionClosedException(
137                         "Premature end of Content-Length delimited message body (expected: "
138                         + contentLength + "; received: " + pos);
139             }
140         } else {
141             pos++;
142         }
143         return b;
144     }
145 
146     /**
147      * Does standard {@link InputStream#read(byte[], int, int)} behavior, but
148      * also notifies the watcher when the contents have been consumed.
149      *
150      * @param b     The byte array to fill.
151      * @param off   Start filling at this position.
152      * @param len   The number of bytes to attempt to read.
153      * @return The number of bytes read, or -1 if the end of content has been
154      *  reached.
155      *
156      * @throws java.io.IOException Should an error occur on the wrapped stream.
157      */
158     @Override
159     public int read (final byte[] b, final int off, final int len) throws java.io.IOException {
160         if (closed) {
161             throw new StreamClosedException("Stream already closed");
162         }
163 
164         if (pos >= contentLength) {
165             return -1;
166         }
167 
168         int chunk = len;
169         if (pos + len > contentLength) {
170             chunk = (int) (contentLength - pos);
171         }
172         final int count = this.buffer.read(b, off, chunk, this.inputStream);
173         if (count == -1 && pos < contentLength) {
174             throw new ConnectionClosedException(
175                     "Premature end of Content-Length delimited message body (expected: "
176                     + contentLength + "; received: " + pos);
177         }
178         if (count > 0) {
179             pos += count;
180         }
181         return count;
182     }
183 
184 
185     /**
186      * Read more bytes from the stream.
187      * @param b The byte array to put the new data in.
188      * @return The number of bytes read into the buffer.
189      * @throws IOException If an IO problem occurs
190      * @see java.io.InputStream#read(byte[])
191      */
192     @Override
193     public int read(final byte[] b) throws IOException {
194         return read(b, 0, b.length);
195     }
196 
197     /**
198      * Skips and discards a number of bytes from the input stream.
199      * @param n The number of bytes to skip.
200      * @return The actual number of bytes skipped. &le; 0 if no bytes
201      * are skipped.
202      * @throws IOException If an error occurs while skipping bytes.
203      * @see InputStream#skip(long)
204      */
205     @Override
206     public long skip(final long n) throws IOException {
207         if (n <= 0) {
208             return 0;
209         }
210         final byte[] buffer = new byte[BUFFER_SIZE];
211         // make sure we don't skip more bytes than are
212         // still available
213         long remaining = Math.min(n, this.contentLength - this.pos);
214         // skip and keep track of the bytes actually skipped
215         long count = 0;
216         while (remaining > 0) {
217             final int l = read(buffer, 0, (int)Math.min(BUFFER_SIZE, remaining));
218             if (l == -1) {
219                 break;
220             }
221             count += l;
222             remaining -= l;
223         }
224         return count;
225     }
226 }