View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.LengthRequiredException;
48  import org.apache.hc.core5.http.config.CharCodingConfig;
49  import org.apache.hc.core5.http.config.H1Config;
50  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.hc.core5.http.impl.Http1StreamListener;
54  import org.apache.hc.core5.http.message.MessageSupport;
55  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
56  import org.apache.hc.core5.http.nio.CapacityChannel;
57  import org.apache.hc.core5.http.nio.ContentDecoder;
58  import org.apache.hc.core5.http.nio.ContentEncoder;
59  import org.apache.hc.core5.http.nio.NHttpMessageParser;
60  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61  import org.apache.hc.core5.http.nio.SessionInputBuffer;
62  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  import org.apache.hc.core5.http.protocol.HttpProcessor;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.reactor.ProtocolIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Timeout;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * client side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
81  
82      private final HttpProcessor httpProcessor;
83      private final ConnectionReuseStrategy connectionReuseStrategy;
84      private final H1Config h1Config;
85      private final Http1StreamListener streamListener;
86      private final Queue<ClientHttp1StreamHandler> pipeline;
87      private final Http1StreamChannel<HttpRequest> outputChannel;
88  
89      private volatile ClientHttp1StreamHandler outgoing;
90      private volatile ClientHttp1StreamHandler incoming;
91  
92      public ClientHttp1StreamDuplexer(
93              final ProtocolIOSession ioSession,
94              final HttpProcessor httpProcessor,
95              final H1Config h1Config,
96              final CharCodingConfig charCodingConfig,
97              final ConnectionReuseStrategy connectionReuseStrategy,
98              final NHttpMessageParser<HttpResponse> incomingMessageParser,
99              final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
100             final ContentLengthStrategy incomingContentStrategy,
101             final ContentLengthStrategy outgoingContentStrategy,
102             final Http1StreamListener streamListener) {
103         super(ioSession, h1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
104         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
105         this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
106         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
107                 DefaultConnectionReuseStrategy.INSTANCE;
108         this.streamListener = streamListener;
109         this.pipeline = new ConcurrentLinkedQueue<>();
110         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
111 
112             @Override
113             public void close() {
114                 shutdownSession(CloseMode.IMMEDIATE);
115             }
116 
117             @Override
118             public void submit(
119                     final HttpRequest request,
120                     final boolean endStream,
121                     final FlushMode flushMode) throws HttpException, IOException {
122                 if (streamListener != null) {
123                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
124                 }
125                 commitMessageHead(request, endStream, flushMode);
126             }
127 
128             @Override
129             public void suspendOutput() throws IOException {
130                 suspendSessionOutput();
131             }
132 
133             @Override
134             public void requestOutput() {
135                 requestSessionOutput();
136             }
137 
138             @Override
139             public Timeout getSocketTimeout() {
140                 return getSessionTimeout();
141             }
142 
143             @Override
144             public void setSocketTimeout(final Timeout timeout) {
145                 setSessionTimeout(timeout);
146             }
147 
148             @Override
149             public int write(final ByteBuffer src) throws IOException {
150                 return streamOutput(src);
151             }
152 
153             @Override
154             public void complete(final List<? extends Header> trailers) throws IOException {
155                 endOutputStream(trailers);
156             }
157 
158             @Override
159             public boolean isCompleted() {
160                 return isOutputCompleted();
161             }
162 
163             @Override
164             public boolean abortGracefully() throws IOException {
165                 final MessageDelineation messageDelineation = endOutputStream(null);
166                 if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
167                     requestShutdown(CloseMode.GRACEFUL);
168                     return false;
169                 }
170                 return true;
171             }
172 
173             @Override
174             public void activate() throws HttpException, IOException {
175             }
176 
177         };
178     }
179 
180     @Override
181     void terminate(final Exception exception) {
182         if (incoming != null) {
183             incoming.failed(exception);
184             incoming.releaseResources();
185             incoming = null;
186         }
187         if (outgoing != null) {
188             outgoing.failed(exception);
189             outgoing.releaseResources();
190             outgoing = null;
191         }
192         for (;;) {
193             final ClientHttp1StreamHandler handler = pipeline.poll();
194             if (handler != null) {
195                 handler.failed(exception);
196                 handler.releaseResources();
197             } else {
198                 break;
199             }
200         }
201     }
202 
203     @Override
204     void disconnected() {
205         if (incoming != null) {
206             if (!incoming.isCompleted()) {
207                 incoming.failed(new ConnectionClosedException());
208             }
209             incoming.releaseResources();
210             incoming = null;
211         }
212         if (outgoing != null) {
213             if (!outgoing.isCompleted()) {
214                 outgoing.failed(new ConnectionClosedException());
215             }
216             outgoing.releaseResources();
217             outgoing = null;
218         }
219         for (;;) {
220             final ClientHttp1StreamHandler handler = pipeline.poll();
221             if (handler != null) {
222                 handler.failed(new ConnectionClosedException());
223                 handler.releaseResources();
224             } else {
225                 break;
226             }
227         }
228     }
229 
230     @Override
231     void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
232         if (response.getCode() >= 200) {
233             connMetrics.incrementRequestCount();
234         }
235     }
236 
237     @Override
238     void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
239         connMetrics.incrementRequestCount();
240     }
241 
242     @Override
243     protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
244 
245         if (incoming == null) {
246             incoming = pipeline.poll();
247         }
248         if (incoming == null) {
249             throw new HttpException("Unexpected response");
250         }
251         return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
252     }
253 
254     @Override
255     protected ContentDecoder createContentDecoder(
256             final long len,
257             final ReadableByteChannel channel,
258             final SessionInputBuffer buffer,
259             final BasicHttpTransportMetrics metrics) throws HttpException {
260 
261         if (len >= 0) {
262             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
263         } else if (len == ContentLengthStrategy.CHUNKED) {
264             return new ChunkDecoder(channel, buffer, h1Config, metrics);
265         } else {
266             return new IdentityDecoder(channel, buffer, metrics);
267         }
268     }
269 
270     @Override
271     protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
272         return true;
273     }
274 
275     @Override
276     protected ContentEncoder createContentEncoder(
277             final long len,
278             final WritableByteChannel channel,
279             final SessionOutputBuffer buffer,
280             final BasicHttpTransportMetrics metrics) throws HttpException {
281         final int chunkSizeHint = h1Config.getChunkSizeHint() >= 0 ? h1Config.getChunkSizeHint() : 2048;
282         if (len >= 0) {
283             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
284         } else if (len == ContentLengthStrategy.CHUNKED) {
285             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
286         } else {
287             throw new LengthRequiredException();
288         }
289     }
290 
291     @Override
292     boolean inputIdle() {
293         return incoming == null;
294     }
295 
296     @Override
297     boolean outputIdle() {
298         return outgoing == null && pipeline.isEmpty();
299     }
300 
301     @Override
302     void outputEnd() throws HttpException, IOException {
303         if (outgoing != null) {
304             if (outgoing.isCompleted()) {
305                 outgoing.releaseResources();
306             }
307             outgoing = null;
308         }
309     }
310 
311     @Override
312     void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
313         final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
314         final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
315         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
316         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
317         final ClientHttp1StreamHandlerntHttp1StreamHandler.html#ClientHttp1StreamHandler">ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
318                 outputChannel,
319                 httpProcessor,
320                 h1Config,
321                 connectionReuseStrategy,
322                 exchangeHandler,
323                 context);
324         pipeline.add(handler);
325         outgoing = handler;
326 
327         if (handler.isOutputReady()) {
328             handler.produceOutput();
329         }
330     }
331 
332     @Override
333     boolean isOutputReady() {
334         return outgoing != null && outgoing.isOutputReady();
335     }
336 
337     @Override
338     void produceOutput() throws HttpException, IOException {
339         if (outgoing != null) {
340             outgoing.produceOutput();
341         }
342     }
343 
344     @Override
345     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
346         if (streamListener != null) {
347             streamListener.onResponseHead(this, response);
348         }
349         Asserts.notNull(incoming, "Response stream handler");
350         incoming.consumeHeader(response, entityDetails);
351     }
352 
353     @Override
354     void consumeData(final ByteBuffer src) throws HttpException, IOException {
355         Asserts.notNull(incoming, "Response stream handler");
356         incoming.consumeData(src);
357     }
358 
359     @Override
360     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
361         Asserts.notNull(incoming, "Response stream handler");
362         incoming.updateCapacity(capacityChannel);
363     }
364 
365     @Override
366     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
367         Asserts.notNull(incoming, "Response stream handler");
368         incoming.dataEnd(trailers);
369     }
370 
371     @Override
372     void inputEnd() throws HttpException, IOException {
373         if (incoming != null && incoming.isResponseFinal()) {
374             if (streamListener != null) {
375                 streamListener.onExchangeComplete(this, isOpen());
376             }
377             if (incoming.isCompleted()) {
378                 incoming.releaseResources();
379             }
380             incoming = null;
381         }
382     }
383 
384     @Override
385     boolean handleTimeout() {
386         return outgoing != null && outgoing.handleTimeout();
387     }
388 
389     @Override
390     void appendState(final StringBuilder buf) {
391         super.appendState(buf);
392         super.appendState(buf);
393         buf.append(", incoming=[");
394         if (incoming != null) {
395             incoming.appendState(buf);
396         }
397         buf.append("], outgoing=[");
398         if (outgoing != null) {
399             outgoing.appendState(buf);
400         }
401         buf.append("], pipeline=");
402         buf.append(pipeline.size());
403     }
404 
405     @Override
406     public String toString() {
407         final StringBuilder buf = new StringBuilder();
408         buf.append("[");
409         appendState(buf);
410         buf.append("]");
411         return buf.toString();
412     }
413 
414 }