View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.io;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.List;
33  
34  import org.apache.hc.core5.function.Supplier;
35  import org.apache.hc.core5.http.FormattedHeader;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.StreamClosedException;
38  import org.apache.hc.core5.http.io.SessionOutputBuffer;
39  import org.apache.hc.core5.http.message.BasicLineFormatter;
40  import org.apache.hc.core5.util.Args;
41  import org.apache.hc.core5.util.CharArrayBuffer;
42  
43  /**
44   * Implements chunked transfer coding. The content is sent in small chunks.
45   * Entities transferred using this output stream can be of unlimited length.
46   * Writes are buffered to an internal buffer (8192 bytes when no positive chunk size hint is supplied).
47   * <p>
48   * Note that this class NEVER closes the underlying stream, even when
49   * {@link #close()} gets called.  Instead, the stream will be marked as
50   * closed and no further output will be permitted.
51   *
52   *
53   * @since 4.0
54   */
55  public class ChunkedOutputStream extends OutputStream {
56  
57      private final SessionOutputBuffer buffer;
58      private final OutputStream outputStream;
59  
60      private final byte[] cache;
61      private int cachePosition;
62      private boolean wroteLastChunk;
63      private boolean closed;
64      private final CharArrayBuffer lineBuffer;
65      private final Supplier<List<? extends Header>> trailerSupplier;
66  
67      /**
68       * Default constructor.
69       *
70       * @param buffer Session output buffer
71       * @param outputStream Output stream
72       * @param chunkCache Buffer used to aggregate smaller writes into chunks.
73       * @param trailerSupplier Trailer supplier. May be {@code null}
74       *
75       * @since 5.1
76       */
77      public ChunkedOutputStream(
78              final SessionOutputBuffer buffer,
79              final OutputStream outputStream,
80              final byte[] chunkCache,
81              final Supplier<List<? extends Header>> trailerSupplier) {
82          super();
83          this.buffer = Args.notNull(buffer, "Session output buffer");
84          this.outputStream = Args.notNull(outputStream, "Output stream");
85          this.cache = Args.notNull(chunkCache, "Chunk cache");
86          this.lineBuffer = new CharArrayBuffer(32);
87          this.trailerSupplier = trailerSupplier;
88      }
89  
90      /**
91       * Constructor taking an integer chunk size hint.
92       *
93       * @param buffer Session output buffer
94       * @param outputStream Output stream
95       * @param chunkSizeHint minimal chunk size hint
96       * @param trailerSupplier Trailer supplier. May be {@code null}
97       *
98       * @since 5.0
99       */
100     public ChunkedOutputStream(
101             final SessionOutputBuffer buffer,
102             final OutputStream outputStream,
103             final int chunkSizeHint,
104             final Supplier<List<? extends Header>> trailerSupplier) {
105         this(buffer, outputStream, new byte[chunkSizeHint > 0 ? chunkSizeHint : 8192], trailerSupplier);
106     }
107 
108     /**
109      * Constructor with no trailers.
110      *
111      * @param buffer Session output buffer
112      * @param outputStream Output stream
113      * @param chunkSizeHint minimal chunk size hint
114      */
115     public ChunkedOutputStream(final SessionOutputBuffer buffer, final OutputStream outputStream, final int chunkSizeHint) {
116         this(buffer, outputStream, chunkSizeHint, null);
117     }
118 
119     /**
120      * Writes the cache out onto the underlying stream
121      */
122     private void flushCache() throws IOException {
123         if (this.cachePosition > 0) {
124             this.lineBuffer.clear();
125             this.lineBuffer.append(Integer.toHexString(this.cachePosition));
126             this.buffer.writeLine(this.lineBuffer, this.outputStream);
127             this.buffer.write(this.cache, 0, this.cachePosition, this.outputStream);
128             this.lineBuffer.clear();
129             this.buffer.writeLine(this.lineBuffer, this.outputStream);
130             this.cachePosition = 0;
131         }
132     }
133 
134     /**
135      * Writes the cache and bufferToAppend to the underlying stream
136      * as one large chunk
137      */
138     private void flushCacheWithAppend(final byte[] bufferToAppend, final int off, final int len) throws IOException {
139         this.lineBuffer.clear();
140         this.lineBuffer.append(Integer.toHexString(this.cachePosition + len));
141         this.buffer.writeLine(this.lineBuffer, this.outputStream);
142         this.buffer.write(this.cache, 0, this.cachePosition, this.outputStream);
143         this.buffer.write(bufferToAppend, off, len, this.outputStream);
144         this.lineBuffer.clear();
145         this.buffer.writeLine(this.lineBuffer, this.outputStream);
146         this.cachePosition = 0;
147     }
148 
149     private void writeClosingChunk() throws IOException {
150         // Write the final chunk.
151         this.lineBuffer.clear();
152         this.lineBuffer.append('0');
153         this.buffer.writeLine(this.lineBuffer, this.outputStream);
154         writeTrailers();
155         this.lineBuffer.clear();
156         this.buffer.writeLine(this.lineBuffer, this.outputStream);
157     }
158 
159     private void writeTrailers() throws IOException {
160         final List<? extends Header> trailers = this.trailerSupplier != null ? this.trailerSupplier.get() : null;
161         if (trailers != null) {
162             for (int i = 0; i < trailers.size(); i++) {
163                 final Header header = trailers.get(i);
164                 if (header instanceof FormattedHeader) {
165                     final CharArrayBuffer chbuffer = ((FormattedHeader) header).getBuffer();
166                     this.buffer.writeLine(chbuffer, this.outputStream);
167                 } else {
168                     this.lineBuffer.clear();
169                     BasicLineFormatter.INSTANCE.formatHeader(this.lineBuffer, header);
170                     this.buffer.writeLine(this.lineBuffer, this.outputStream);
171                 }
172             }
173         }
174     }
175 
176     // ----------------------------------------------------------- Public Methods
177     /**
178      * Must be called to ensure the internal cache is flushed and the closing
179      * chunk is written.
180      * @throws IOException in case of an I/O error
181      */
182     public void finish() throws IOException {
183         if (!this.wroteLastChunk) {
184             flushCache();
185             writeClosingChunk();
186             this.wroteLastChunk = true;
187         }
188     }
189 
190     // -------------------------------------------- OutputStream Methods
191     @Override
192     public void write(final int b) throws IOException {
193         if (this.closed) {
194             throw new StreamClosedException();
195         }
196         this.cache[this.cachePosition] = (byte) b;
197         this.cachePosition++;
198         if (this.cachePosition == this.cache.length) {
199             flushCache();
200         }
201     }
202 
203     /**
204      * Writes the array. If the array does not fit within the buffer, it is
205      * not split, but rather written out as one large chunk.
206      */
207     @Override
208     public void write(final byte[] b) throws IOException {
209         write(b, 0, b.length);
210     }
211 
212     /**
213      * Writes the array. If the array does not fit within the buffer, it is
214      * not split, but rather written out as one large chunk.
215      */
216     @Override
217     public void write(final byte[] src, final int off, final int len) throws IOException {
218         if (this.closed) {
219             throw new StreamClosedException();
220         }
221         if (len >= this.cache.length - this.cachePosition) {
222             flushCacheWithAppend(src, off, len);
223         } else {
224             System.arraycopy(src, off, cache, this.cachePosition, len);
225             this.cachePosition += len;
226         }
227     }
228 
229     /**
230      * Flushes the content buffer and the underlying stream.
231      */
232     @Override
233     public void flush() throws IOException {
234         flushCache();
235         this.buffer.flush(this.outputStream);
236     }
237 
238     /**
239      * Finishes writing to the underlying stream, but does NOT close the underlying stream.
240      */
241     @Override
242     public void close() throws IOException {
243         if (!this.closed) {
244             this.closed = true;
245             finish();
246             this.buffer.flush(this.outputStream);
247         }
248     }
249 }