/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.http.ConnectionClosedException;
39  import org.apache.hc.core5.http.ConnectionReuseStrategy;
40  import org.apache.hc.core5.http.ContentLengthStrategy;
41  import org.apache.hc.core5.http.EntityDetails;
42  import org.apache.hc.core5.http.Header;
43  import org.apache.hc.core5.http.HttpException;
44  import org.apache.hc.core5.http.HttpRequest;
45  import org.apache.hc.core5.http.HttpResponse;
46  import org.apache.hc.core5.http.LengthRequiredException;
47  import org.apache.hc.core5.http.config.CharCodingConfig;
48  import org.apache.hc.core5.http.config.H1Config;
49  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
50  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
51  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
52  import org.apache.hc.core5.http.impl.Http1StreamListener;
53  import org.apache.hc.core5.http.message.MessageSupport;
54  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.ContentDecoder;
57  import org.apache.hc.core5.http.nio.ContentEncoder;
58  import org.apache.hc.core5.http.nio.NHttpMessageParser;
59  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
60  import org.apache.hc.core5.http.nio.SessionInputBuffer;
61  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
62  import org.apache.hc.core5.http.nio.command.ExecutionCommand;
63  import org.apache.hc.core5.http.protocol.HttpCoreContext;
64  import org.apache.hc.core5.http.protocol.HttpProcessor;
65  import org.apache.hc.core5.io.ShutdownType;
66  import org.apache.hc.core5.net.InetAddressUtils;
67  import org.apache.hc.core5.reactor.TlsCapableIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  
71  public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
72  
73      private final HttpProcessor httpProcessor;
74      private final ConnectionReuseStrategy connectionReuseStrategy;
75      private final H1Config h1Config;
76      private final Http1StreamListener streamListener;
77      private final Queue<ClientHttp1StreamHandler> pipeline;
78      private final Http1StreamChannel<HttpRequest> outputChannel;
79  
80      private volatile ClientHttp1StreamHandler outgoing;
81      private volatile ClientHttp1StreamHandler incoming;
82  
83      public ClientHttp1StreamDuplexer(
84              final TlsCapableIOSession ioSession,
85              final HttpProcessor httpProcessor,
86              final H1Config h1Config,
87              final CharCodingConfig charCodingConfig,
88              final ConnectionReuseStrategy connectionReuseStrategy,
89              final NHttpMessageParser<HttpResponse> incomingMessageParser,
90              final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
91              final ContentLengthStrategy incomingContentStrategy,
92              final ContentLengthStrategy outgoingContentStrategy,
93              final Http1StreamListener streamListener) {
94          super(ioSession, h1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
95          this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
96          this.h1Config = h1Config != null ? h1Config : H1Config.DEFAULT;
97          this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
98                  DefaultConnectionReuseStrategy.INSTANCE;
99          this.streamListener = streamListener;
100         this.pipeline = new ConcurrentLinkedQueue<>();
101         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
102 
103             @Override
104             public void close() {
105                 shutdownSession(ShutdownType.IMMEDIATE);
106             }
107 
108             @Override
109             public void submit(final HttpRequest request, final boolean endStream) throws HttpException, IOException {
110                 if (streamListener != null) {
111                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
112                 }
113                 commitMessageHead(request, endStream);
114             }
115 
116             @Override
117             public void suspendOutput() {
118                 suspendSessionOutput();
119             }
120 
121             @Override
122             public void requestOutput() {
123                 requestSessionOutput();
124             }
125 
126             @Override
127             public int getSocketTimeout() {
128                 return getSessionTimeout();
129             }
130 
131             @Override
132             public void setSocketTimeout(final int timeout) {
133                 setSessionTimeout(timeout);
134             }
135 
136             @Override
137             public int write(final ByteBuffer src) throws IOException {
138                 return streamOutput(src);
139             }
140 
141             @Override
142             public void complete(final List<? extends Header> trailers) throws IOException {
143                 endOutputStream(trailers);
144             }
145 
146             @Override
147             public boolean isCompleted() {
148                 return isOutputCompleted();
149             }
150 
151             @Override
152             public boolean abortGracefully() throws IOException {
153                 final MessageDelineation messageDelineation = endOutputStream(null);
154                 if (messageDelineation == MessageDelineation.MESSAGE_HEAD) {
155                     requestShutdown(ShutdownType.GRACEFUL);
156                     return false;
157                 } else {
158                     return true;
159                 }
160             }
161 
162             @Override
163             public void activate() throws HttpException, IOException {
164             }
165 
166         };
167     }
168 
169     @Override
170     public void releaseResources() {
171         if (incoming != null) {
172             incoming.releaseResources();
173             incoming = null;
174         }
175         if (outgoing != null) {
176             outgoing.releaseResources();
177             outgoing = null;
178         }
179         for (;;) {
180             final ClientHttp1StreamHandler handler = pipeline.poll();
181             if (handler != null) {
182                 handler.releaseResources();
183             } else {
184                 break;
185             }
186         }
187     }
188 
189     @Override
190     void terminate(final Exception exception) {
191         if (incoming != null) {
192             incoming.failed(exception);
193             incoming.releaseResources();
194             incoming = null;
195         }
196         if (outgoing != null) {
197             outgoing.failed(exception);
198             outgoing.releaseResources();
199             outgoing = null;
200         }
201         for (;;) {
202             final ClientHttp1StreamHandler handler = pipeline.poll();
203             if (handler != null) {
204                 handler.failed(exception);
205                 handler.releaseResources();
206             } else {
207                 break;
208             }
209         }
210     }
211 
212     @Override
213     void disconnected() {
214         if (incoming != null) {
215             if (!incoming.isCompleted()) {
216                 incoming.failed(new ConnectionClosedException("Connection closed"));
217             }
218             incoming.releaseResources();
219             incoming = null;
220         }
221         if (outgoing != null) {
222             if (!outgoing.isCompleted()) {
223                 outgoing.failed(new ConnectionClosedException("Connection closed"));
224             }
225             outgoing.releaseResources();
226             outgoing = null;
227         }
228         for (;;) {
229             final ClientHttp1StreamHandler handler = pipeline.poll();
230             if (handler != null) {
231                 handler.failed(new ConnectionClosedException("Connection closed"));
232                 handler.releaseResources();
233             } else {
234                 break;
235             }
236         }
237     }
238 
239     @Override
240     void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
241         if (response.getCode() >= 200) {
242             connMetrics.incrementRequestCount();
243         }
244     }
245 
246     @Override
247     void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
248         connMetrics.incrementRequestCount();
249     }
250 
251     @Override
252     protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
253 
254         if (incoming == null) {
255             incoming = pipeline.poll();
256         }
257         if (incoming == null) {
258             throw new HttpException("Unexpected response");
259         }
260         return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
261     }
262 
263     @Override
264     protected ContentDecoder createContentDecoder(
265             final long len,
266             final ReadableByteChannel channel,
267             final SessionInputBuffer buffer,
268             final BasicHttpTransportMetrics metrics) throws HttpException {
269 
270         if (len >= 0) {
271             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
272         } else if (len == ContentLengthStrategy.CHUNKED) {
273             return new ChunkDecoder(channel, buffer, h1Config, metrics);
274         } else {
275             return new IdentityDecoder(channel, buffer, metrics);
276         }
277     }
278 
279     @Override
280     protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
281         return true;
282     }
283 
284     @Override
285     protected ContentEncoder createContentEncoder(
286             final long len,
287             final WritableByteChannel channel,
288             final SessionOutputBuffer buffer,
289             final BasicHttpTransportMetrics metrics) throws HttpException {
290         if (len >= 0) {
291             return new LengthDelimitedEncoder(channel, buffer, metrics, len, h1Config.getChunkSizeHint());
292         } else if (len == ContentLengthStrategy.CHUNKED) {
293             final int chunkSizeHint = h1Config.getChunkSizeHint() >= 0 ? h1Config.getChunkSizeHint() : 2048;
294             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
295         } else {
296             throw new LengthRequiredException("Length required");
297         }
298     }
299 
300     @Override
301     boolean inputIdle() {
302         return incoming == null;
303     }
304 
305     @Override
306     boolean outputIdle() {
307         return outgoing == null && pipeline.isEmpty();
308     }
309 
310     @Override
311     void outputEnd() throws HttpException, IOException {
312         if (outgoing != null) {
313             if (outgoing.isCompleted()) {
314                 outgoing.releaseResources();
315             }
316             outgoing = null;
317         }
318     }
319 
320     @Override
321     void execute(final ExecutionCommand executionCommand) throws HttpException, IOException {
322         final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
323         final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
324         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
325         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
326         final ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
327                 outputChannel,
328                 httpProcessor,
329                 h1Config,
330                 connectionReuseStrategy,
331                 exchangeHandler,
332                 context);
333         pipeline.add(handler);
334         outgoing = handler;
335 
336         if (handler.isOutputReady()) {
337             handler.produceOutput();
338         }
339     }
340 
341     @Override
342     boolean isOutputReady() {
343         return outgoing != null && outgoing.isOutputReady();
344     }
345 
346     @Override
347     void produceOutput() throws HttpException, IOException {
348         if (outgoing != null) {
349             outgoing.produceOutput();
350         }
351     }
352 
353     @Override
354     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
355         if (streamListener != null) {
356             streamListener.onResponseHead(this, response);
357         }
358         Asserts.notNull(incoming, "Response stream handler");
359         incoming.consumeHeader(response, entityDetails);
360     }
361 
362     @Override
363     int consumeData(final ByteBuffer src) throws HttpException, IOException {
364         Asserts.notNull(incoming, "Response stream handler");
365         return incoming.consumeData(src);
366     }
367 
368     @Override
369     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
370         Asserts.notNull(incoming, "Response stream handler");
371         incoming.updateCapacity(capacityChannel);
372     }
373 
374     @Override
375     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
376         Asserts.notNull(incoming, "Response stream handler");
377         incoming.dataEnd(trailers);
378     }
379 
380     @Override
381     void inputEnd() throws HttpException, IOException {
382         if (incoming != null && incoming.isResponseFinal()) {
383             if (streamListener != null) {
384                 streamListener.onExchangeComplete(this, isOpen());
385             }
386             if (incoming.isCompleted()) {
387                 incoming.releaseResources();
388             }
389             incoming = null;
390         }
391     }
392 
393     @Override
394     boolean handleTimeout() {
395         return outgoing != null && outgoing.handleTimeout();
396     }
397 
398     @Override
399     public String toString() {
400         final StringBuilder buffer = new StringBuilder();
401         InetAddressUtils.formatAddress(buffer, getLocalAddress());
402         buffer.append("->");
403         InetAddressUtils.formatAddress(buffer, getRemoteAddress());
404         return buffer.toString();
405     }
406 
407 }