View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.codecs;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.WritableByteChannel;
33  
34  import org.apache.http.annotation.NotThreadSafe;
35  import org.apache.http.impl.io.HttpTransportMetricsImpl;
36  import org.apache.http.nio.ContentEncoder;
37  import org.apache.http.nio.reactor.SessionOutputBuffer;
38  import org.apache.http.util.Args;
39  import org.apache.http.util.Asserts;
40  
41  /**
42   * Abstract {@link ContentEncoder} that serves as a base for all content
43   * encoder implementations.
44   *
45   * @since 4.0
46   */
47  @NotThreadSafe
48  public abstract class AbstractContentEncoder implements ContentEncoder {
49  
50      protected final WritableByteChannel channel;
51      protected final SessionOutputBuffer buffer;
52      protected final HttpTransportMetricsImpl metrics;
53  
54      /**
55       * TODO: make private
56       */
57      protected boolean completed;
58  
59      /**
60       * Creates an instance of this class.
61       *
62       * @param channel the destination channel.
63       * @param buffer the session output buffer that can be used to store
64       *    session data for intermediate processing.
65       * @param metrics Transport metrics of the underlying HTTP transport.
66       */
67      public AbstractContentEncoder(
68              final WritableByteChannel channel,
69              final SessionOutputBuffer buffer,
70              final HttpTransportMetricsImpl metrics) {
71          super();
72          Args.notNull(channel, "Channel");
73          Args.notNull(buffer, "Session input buffer");
74          Args.notNull(metrics, "Transport metrics");
75          this.buffer = buffer;
76          this.channel = channel;
77          this.metrics = metrics;
78      }
79  
80      @Override
81      public boolean isCompleted() {
82          return this.completed;
83      }
84  
85      @Override
86      public void complete() throws IOException {
87          this.completed = true;
88      }
89  
90      protected void assertNotCompleted() {
91          Asserts.check(!this.completed, "Encoding process already completed");
92      }
93  
94      /**
95       * Flushes content of the session buffer to the channel and updates transport metrics.
96       *
97       * @return number of bytes written to the channel.
98       *
99       * @since 4.3
100      */
101     protected int flushToChannel() throws IOException {
102         if (!this.buffer.hasData()) {
103             return 0;
104         }
105         final int bytesWritten = this.buffer.flush(this.channel);
106         if (bytesWritten > 0) {
107             this.metrics.incrementBytesTransferred(bytesWritten);
108         }
109         return bytesWritten;
110     }
111 
112     /**
113      * Flushes content of the given buffer to the channel and updates transport metrics.
114      *
115      * @return number of bytes written to the channel.
116      *
117      * @since 4.3
118      */
119     protected int writeToChannel(final ByteBuffer src) throws IOException {
120         if (!src.hasRemaining()) {
121             return 0;
122         }
123         final int bytesWritten = this.channel.write(src);
124         if (bytesWritten > 0) {
125             this.metrics.incrementBytesTransferred(bytesWritten);
126         }
127         return bytesWritten;
128     }
129 
130     /**
131      * Transfers content of the source to the channel and updates transport metrics.
132      *
133      * @param src source.
134      * @param limit max number of bytes to transfer.
135      * @return number of bytes transferred.
136      *
137      * @since 4.3
138      */
139     protected int writeToChannel(final ByteBuffer src, final int limit) throws IOException {
140         return doWriteChunk(src, limit, true);
141     }
142 
143     /**
144      * Transfers content of the source to the buffer and updates transport metrics.
145      *
146      * @param src source.
147      * @param limit max number of bytes to transfer.
148      * @return number of bytes transferred.
149      *
150      * @since 4.3
151      */
152     protected int writeToBuffer(final ByteBuffer src, final int limit) throws IOException {
153         return doWriteChunk(src, limit, false);
154     }
155 
156     private int doWriteChunk(
157         final ByteBuffer src, final int chunk, final boolean direct) throws IOException {
158         final int bytesWritten;
159         if (src.remaining() > chunk) {
160             final int oldLimit = src.limit();
161             final int newLimit = oldLimit - (src.remaining() - chunk);
162             src.limit(newLimit);
163             bytesWritten = doWriteChunk(src, direct);
164             src.limit(oldLimit);
165         } else {
166             bytesWritten = doWriteChunk(src, direct);
167         }
168         return bytesWritten;
169     }
170 
171     private int doWriteChunk(final ByteBuffer src, final boolean direct) throws IOException {
172         if (direct) {
173             final int bytesWritten = this.channel.write(src);
174             if (bytesWritten > 0) {
175                 this.metrics.incrementBytesTransferred(bytesWritten);
176             }
177             return bytesWritten;
178         } else {
179             final int chunk = src.remaining();
180             this.buffer.write(src);
181             return chunk;
182         }
183     }
184 
185 }