View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.codecs;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.FileChannel;
33  import java.nio.channels.WritableByteChannel;
34  
35  import org.apache.http.annotation.NotThreadSafe;
36  import org.apache.http.impl.io.HttpTransportMetricsImpl;
37  import org.apache.http.nio.FileContentEncoder;
38  import org.apache.http.nio.reactor.SessionOutputBuffer;
39  import org.apache.http.util.Args;
40  
41  /**
42   * Content encoder that cuts off after a defined number of bytes. This class
43   * is used to send content of HTTP messages where the end of the content entity
44   * is determined by the value of the {@code Content-Length header}.
45   * Entities transferred using this stream can be maximum {@link Long#MAX_VALUE}
46   * long.
47   * <p>
48   * This decoder is optimized to transfer data directly from
49   * a {@link FileChannel} to the underlying I/O session's channel whenever
50   * possible avoiding intermediate buffering in the session buffer.
51   *
52   * @since 4.0
53   */
54  @NotThreadSafe
55  public class LengthDelimitedEncoder extends AbstractContentEncoder
56          implements FileContentEncoder {
57  
58      private final long contentLength;
59      private final int fragHint;
60  
61      private long remaining;
62  
63      /**
64       * @since 4.3
65       *
66       * @param channel underlying channel.
67       * @param buffer  session buffer.
68       * @param metrics transport metrics.
69       * @param contentLength content length.
70       * @param fragementSizeHint fragment size hint defining an minimal size of a fragment
71       *   that should be written out directly to the channel bypassing the session buffer.
72       *   Value {@code 0} disables fragment buffering.
73       */
74      public LengthDelimitedEncoder(
75              final WritableByteChannel channel,
76              final SessionOutputBuffer buffer,
77              final HttpTransportMetricsImpl metrics,
78              final long contentLength,
79              final int fragementSizeHint) {
80          super(channel, buffer, metrics);
81          Args.notNegative(contentLength, "Content length");
82          this.contentLength = contentLength;
83          this.fragHint = fragementSizeHint > 0 ? fragementSizeHint : 0;
84          this.remaining = contentLength;
85      }
86  
87      public LengthDelimitedEncoder(
88              final WritableByteChannel channel,
89              final SessionOutputBuffer buffer,
90              final HttpTransportMetricsImpl metrics,
91              final long contentLength) {
92          this(channel, buffer, metrics, contentLength, 0);
93      }
94  
95      private int nextChunk(final ByteBuffer src) {
96          return (int) Math.min(Math.min(this.remaining, Integer.MAX_VALUE), src.remaining());
97      }
98  
99      @Override
100     public int write(final ByteBuffer src) throws IOException {
101         if (src == null) {
102             return 0;
103         }
104         assertNotCompleted();
105 
106         int total = 0;
107         while (src.hasRemaining() && this.remaining > 0) {
108             if (this.buffer.hasData() || this.fragHint > 0) {
109                 final int chunk = nextChunk(src);
110                 if (chunk <= this.fragHint) {
111                     final int capacity = this.fragHint - this.buffer.length();
112                     if (capacity > 0) {
113                         final int limit = Math.min(capacity, chunk);
114                         final int bytesWritten = writeToBuffer(src, limit);
115                         this.remaining -= bytesWritten;
116                         total += bytesWritten;
117                     }
118                 }
119             }
120             if (this.buffer.hasData()) {
121                 final int chunk = nextChunk(src);
122                 if (this.buffer.length() >= this.fragHint || chunk > 0) {
123                     final int bytesWritten = flushToChannel();
124                     if (bytesWritten == 0) {
125                         break;
126                     }
127                 }
128             }
129             if (!this.buffer.hasData()) {
130                 final int chunk = nextChunk(src);
131                 if (chunk > this.fragHint) {
132                     final int bytesWritten = writeToChannel(src, chunk);
133                     this.remaining -= bytesWritten;
134                     total += bytesWritten;
135                     if (bytesWritten == 0) {
136                         break;
137                     }
138                 }
139             }
140         }
141         if (this.remaining <= 0) {
142             super.complete();
143         }
144         return total;
145     }
146 
147     @Override
148     public long transfer(
149             final FileChannel src,
150             final long position,
151             final long count) throws IOException {
152 
153         if (src == null) {
154             return 0;
155         }
156         assertNotCompleted();
157 
158         flushToChannel();
159         if (this.buffer.hasData()) {
160             return 0;
161         }
162 
163         final long chunk = Math.min(this.remaining, count);
164         final long bytesWritten = src.transferTo(position, chunk, this.channel);
165         if (bytesWritten > 0) {
166             this.metrics.incrementBytesTransferred(bytesWritten);
167         }
168         this.remaining -= bytesWritten;
169         if (this.remaining <= 0) {
170             super.complete();
171         }
172         return bytesWritten;
173     }
174 
175     @Override
176     public String toString() {
177         final StringBuilder sb = new StringBuilder();
178         sb.append("[content length: ");
179         sb.append(this.contentLength);
180         sb.append("; pos: ");
181         sb.append(this.contentLength - this.remaining);
182         sb.append("; completed: ");
183         sb.append(isCompleted());
184         sb.append("]");
185         return sb.toString();
186     }
187 
188 }