View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.LengthRequiredException;
48  import org.apache.hc.core5.http.config.CharCodingConfig;
49  import org.apache.hc.core5.http.config.H1Config;
50  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.hc.core5.http.impl.Http1StreamListener;
54  import org.apache.hc.core5.http.message.MessageSupport;
55  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56  import org.apache.hc.core5.http.nio.CapacityChannel;
57  import org.apache.hc.core5.http.nio.ContentDecoder;
58  import org.apache.hc.core5.http.nio.ContentEncoder;
59  import org.apache.hc.core5.http.nio.NHttpMessageParser;
60  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61  import org.apache.hc.core5.http.nio.SessionInputBuffer;
62  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  import org.apache.hc.core5.http.protocol.HttpProcessor;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.net.InetAddressUtils;
68  import org.apache.hc.core5.reactor.ProtocolIOSession;
69  import org.apache.hc.core5.util.Args;
70  import org.apache.hc.core5.util.Asserts;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * client side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
81  
82      private final HttpProcessor httpProcessor;
83      private final ConnectionReuseStrategy connectionReuseStrategy;
84      private final H1Config h1Config;
85      private final Http1StreamListener streamListener;
86      private final Queue<ClientHttp1StreamHandler> pipeline;
87      private final Http1StreamChannel<HttpRequest> outputChannel;
88  
89      private volatile ClientHttp1StreamHandler outgoing;
90      private volatile ClientHttp1StreamHandler incoming;
91  
92      public ClientHttp1StreamDuplexer(
93              final ProtocolIOSession ioSession,
94              final HttpProcessor httpProcessor,
95              final H1Config h1Config,
96              final CharCodingConfig charCodingConfig,
97              final ConnectionReuseStrategy connectionReuseStrategy,
98              final NHttpMessageParser<HttpResponse> incomingMessageParser,
99              final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
100             final ContentLengthStrategy incomingContentStrategy,
101             final ContentLengthStrategy outgoingContentStrategy,
102             final Http1StreamListener streamListener) {
103         super(ioSession, h1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
104         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
105         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
106         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
107                 DefaultConnectionReuseStrategy.INSTANCE;
108         this.streamListener = streamListener;
109         this.pipeline = new ConcurrentLinkedQueue<>();
110         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
111 
112             @Override
113             public void close() {
114                 shutdownSession(CloseMode.IMMEDIATE);
115             }
116 
117             @Override
118             public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
119                 if (streamListener != null) {
120                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
121                 }
122                 commitMessageHead(request, endStream);
123             }
124 
125             @Override
126             public void suspendOutput() {
127                 suspendSessionOutput();
128             }
129 
130             @Override
131             public void requestOutput() {
132                 requestSessionOutput();
133             }
134 
135             @Override
136             public int getSocketTimeoutMillis() {
137                 return getSessionTimeoutMillis();
138             }
139 
140             @Override
141             public void setSocketTimeoutMillis(final int timeout) {
142                 setSessionTimeoutMillis(timeout);
143             }
144 
145             @Override
146             public int write(final ByteBuffer src) throws IOException {
147                 return streamOutput(src);
148             }
149 
150             @Override
151             public void complete(final List<? extends Header> trailers) throws IOException {
152                 endOutputStream(trailers);
153             }
154 
155             @Override
156             public boolean isCompleted() {
157                 return isOutputCompleted();
158             }
159 
160             @Override
161             public boolean abortGracefully() throws IOException {
162                 final MessageDelineation messageDelineation = endOutputStream(null);
163                 if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
164                     requestShutdown(CloseMode.GRACEFUL);
165                     return false;
166                 }
167                 return true;
168             }
169 
170             @Override
171             public void activate() throws HttpException, IOException {
172             }
173 
174         };
175     }
176 
177     @Override
178     void terminate(final Exception exception) {
179         if (incoming != null) {
180             incoming.failed(exception);
181             incoming.releaseResources();
182             incoming = null;
183         }
184         if (outgoing != null) {
185             outgoing.failed(exception);
186             outgoing.releaseResources();
187             outgoing = null;
188         }
189         for (;;) {
190             final ClientHttp1StreamHandler handler = pipeline.poll();
191             if (handler != null) {
192                 handler.failed(exception);
193                 handler.releaseResources();
194             } else {
195                 break;
196             }
197         }
198     }
199 
200     @Override
201     void disconnected() {
202         if (incoming != null) {
203             if (!incoming.isCompleted()) {
204                 incoming.failed(new ConnectionClosedException());
205             }
206             incoming.releaseResources();
207             incoming = null;
208         }
209         if (outgoing != null) {
210             if (!outgoing.isCompleted()) {
211                 outgoing.failed(new ConnectionClosedException());
212             }
213             outgoing.releaseResources();
214             outgoing = null;
215         }
216         for (;;) {
217             final ClientHttp1StreamHandler handler = pipeline.poll();
218             if (handler != null) {
219                 handler.failed(new ConnectionClosedException());
220                 handler.releaseResources();
221             } else {
222                 break;
223             }
224         }
225     }
226 
227     @Override
228     void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
229         if (response.getCode() >= 200) {
230             connMetrics.incrementRequestCount();
231         }
232     }
233 
234     @Override
235     void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
236         connMetrics.incrementRequestCount();
237     }
238 
239     @Override
240     protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
241 
242         if (incoming == null) {
243             incoming = pipeline.poll();
244         }
245         if (incoming == null) {
246             throw new HttpException("Unexpected response");
247         }
248         return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
249     }
250 
251     @Override
252     protected ContentDecoder createContentDecoder(
253             final long len,
254             final ReadableByteChannel channel,
255             final SessionInputBuffer buffer,
256             final BasicHttpTransportMetrics metrics) throws HttpException {
257 
258         if (len >= 0) {
259             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
260         } else if (len == ContentLengthStrategy.CHUNKED) {
261             return new ChunkDecoder(channel, buffer, h1Config, metrics);
262         } else {
263             return new IdentityDecoder(channel, buffer, metrics);
264         }
265     }
266 
267     @Override
268     protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
269         return true;
270     }
271 
272     @Override
273     protected ContentEncoder createContentEncoder(
274             final long len,
275             final WritableByteChannel channel,
276             final SessionOutputBuffer buffer,
277             final BasicHttpTransportMetrics metrics) throws HttpException {
278         if (len >= 0) {
279             return new LengthDelimitedEncoder(channel, buffer, metrics, len, h1Config.getChunkSizeHint());
280         } else if (len == ContentLengthStrategy.CHUNKED) {
281             final int chunkSizeHint = h1Config.getChunkSizeHint() >= 0 ? h1Config.getChunkSizeHint() : 2048;
282             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
283         } else {
284             throw new LengthRequiredException();
285         }
286     }
287 
288     @Override
289     boolean inputIdle() {
290         return incoming == null;
291     }
292 
293     @Override
294     boolean outputIdle() {
295         return outgoing == null && pipeline.isEmpty();
296     }
297 
298     @Override
299     void outputEnd() throws HttpException, IOException {
300         if (outgoing != null) {
301             if (outgoing.isCompleted()) {
302                 outgoing.releaseResources();
303             }
304             outgoing = null;
305         }
306     }
307 
308     @Override
309     void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
310         final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
311         final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
312         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
313         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
314         final ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
315                 outputChannel,
316                 httpProcessor,
317                 h1Config,
318                 connectionReuseStrategy,
319                 exchangeHandler,
320                 context);
321         pipeline.add(handler);
322         outgoing = handler;
323 
324         if (handler.isOutputReady()) {
325             handler.produceOutput();
326         }
327     }
328 
329     @Override
330     boolean isOutputReady() {
331         return outgoing != null && outgoing.isOutputReady();
332     }
333 
334     @Override
335     void produceOutput() throws HttpException, IOException {
336         if (outgoing != null) {
337             outgoing.produceOutput();
338         }
339     }
340 
341     @Override
342     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
343         if (streamListener != null) {
344             streamListener.onResponseHead(this, response);
345         }
346         Asserts.notNull(incoming, "Response stream handler");
347         incoming.consumeHeader(response, entityDetails);
348     }
349 
350     @Override
351     int consumeData(final ByteBuffer src) throws HttpException, IOException {
352         Asserts.notNull(incoming, "Response stream handler");
353         return incoming.consumeData(src);
354     }
355 
356     @Override
357     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
358         Asserts.notNull(incoming, "Response stream handler");
359         incoming.updateCapacity(capacityChannel);
360     }
361 
362     @Override
363     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
364         Asserts.notNull(incoming, "Response stream handler");
365         incoming.dataEnd(trailers);
366     }
367 
368     @Override
369     void inputEnd() throws HttpException, IOException {
370         if (incoming != null && incoming.isResponseFinal()) {
371             if (streamListener != null) {
372                 streamListener.onExchangeComplete(this, isOpen());
373             }
374             if (incoming.isCompleted()) {
375                 incoming.releaseResources();
376             }
377             incoming = null;
378         }
379     }
380 
381     @Override
382     boolean handleTimeout() {
383         return outgoing != null && outgoing.handleTimeout();
384     }
385 
386     @Override
387     public String toString() {
388         final StringBuilder buffer = new StringBuilder();
389         InetAddressUtils.formatAddress(buffer, getLocalAddress());
390         buffer.append("->");
391         InetAddressUtils.formatAddress(buffer, getRemoteAddress());
392         return buffer.toString();
393     }
394 
395 }