View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.ThreadFactory;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.SchemePortResolver;
40  import org.apache.hc.client5.http.config.Configurable;
41  import org.apache.hc.client5.http.config.RequestConfig;
42  import org.apache.hc.client5.http.impl.ConnPoolSupport;
43  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
44  import org.apache.hc.client5.http.impl.ExecSupport;
45  import org.apache.hc.client5.http.impl.classic.RequestFailedException;
46  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
47  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
48  import org.apache.hc.client5.http.protocol.HttpClientContext;
49  import org.apache.hc.client5.http.routing.RoutingSupport;
50  import org.apache.hc.core5.annotation.Contract;
51  import org.apache.hc.core5.annotation.ThreadingBehavior;
52  import org.apache.hc.core5.concurrent.BasicFuture;
53  import org.apache.hc.core5.concurrent.Cancellable;
54  import org.apache.hc.core5.concurrent.ComplexCancellable;
55  import org.apache.hc.core5.concurrent.ComplexFuture;
56  import org.apache.hc.core5.concurrent.FutureCallback;
57  import org.apache.hc.core5.function.Callback;
58  import org.apache.hc.core5.http.EntityDetails;
59  import org.apache.hc.core5.http.Header;
60  import org.apache.hc.core5.http.HttpException;
61  import org.apache.hc.core5.http.HttpHost;
62  import org.apache.hc.core5.http.HttpRequest;
63  import org.apache.hc.core5.http.HttpResponse;
64  import org.apache.hc.core5.http.HttpStatus;
65  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
66  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68  import org.apache.hc.core5.http.nio.CapacityChannel;
69  import org.apache.hc.core5.http.nio.DataStreamChannel;
70  import org.apache.hc.core5.http.nio.HandlerFactory;
71  import org.apache.hc.core5.http.nio.RequestChannel;
72  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http2.HttpVersionPolicy;
75  import org.apache.hc.core5.io.CloseMode;
76  import org.apache.hc.core5.io.Closer;
77  import org.apache.hc.core5.reactor.Command;
78  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
79  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
80  import org.apache.hc.core5.reactor.IOReactorConfig;
81  import org.apache.hc.core5.reactor.IOSession;
82  import org.apache.hc.core5.util.Args;
83  import org.apache.hc.core5.util.Asserts;
84  import org.apache.hc.core5.util.TimeValue;
85  import org.apache.hc.core5.util.Timeout;
86  import org.slf4j.Logger;
87  import org.slf4j.LoggerFactory;
88  
89  /**
90   * Minimal implementation of {@link CloseableHttpAsyncClient}. This client is
91   * optimized for HTTP/1.1 and HTTP/2 message transport and does not support
92   * advanced HTTP protocol functionality such as request execution via a proxy,
93   * state management, authentication and request redirects.
94   * <p>
95   * Concurrent message exchanges executed by this client will get assigned to
96   * separate connections leased from the connection pool.
97   * </p>
98   *
99   * @since 5.0
100  */
101 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
102 public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
103 
104     private static final Logger LOG = LoggerFactory.getLogger(MinimalHttpAsyncClient.class);
105     private final AsyncClientConnectionManager manager;
106     private final SchemePortResolver schemePortResolver;
107     private final HttpVersionPolicy versionPolicy;
108 
109     MinimalHttpAsyncClient(
110             final IOEventHandlerFactory eventHandlerFactory,
111             final AsyncPushConsumerRegistry pushConsumerRegistry,
112             final HttpVersionPolicy versionPolicy,
113             final IOReactorConfig reactorConfig,
114             final ThreadFactory threadFactory,
115             final ThreadFactory workerThreadFactory,
116             final AsyncClientConnectionManager manager,
117             final SchemePortResolver schemePortResolver) {
118         super(new DefaultConnectingIOReactor(
119                 eventHandlerFactory,
120                 reactorConfig,
121                 workerThreadFactory,
122                 LoggingIOSessionDecorator.INSTANCE,
123                 LoggingExceptionCallback.INSTANCE,
124                 null,
125                 new Callback<IOSession>() {
126 
127                     @Override
128                     public void execute(final IOSession ioSession) {
129                         ioSession.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.NORMAL);
130                     }
131 
132                 }),
133                 pushConsumerRegistry,
134                 threadFactory);
135         this.manager = manager;
136         this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
137         this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
138     }
139 
140     private Future<AsyncConnectionEndpoint> leaseEndpoint(
141             final HttpHost host,
142             final Timeout connectionRequestTimeout,
143             final Timeout connectTimeout,
144             final HttpClientContext clientContext,
145             final FutureCallback<AsyncConnectionEndpoint> callback) {
146         final HttpRoute/http/HttpRoute.html#HttpRoute">HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
147         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
148         final String exchangeId = ExecSupport.getNextExchangeId();
149         clientContext.setExchangeId(exchangeId);
150         final Future<AsyncConnectionEndpoint> leaseFuture = manager.lease(
151                 exchangeId,
152                 route,
153                 null,
154                 connectionRequestTimeout,
155                 new FutureCallback<AsyncConnectionEndpoint>() {
156 
157                     @Override
158                     public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
159                         if (connectionEndpoint.isConnected()) {
160                             resultFuture.completed(connectionEndpoint);
161                         } else {
162                             final Future<AsyncConnectionEndpoint> connectFuture = manager.connect(
163                                     connectionEndpoint,
164                                     getConnectionInitiator(),
165                                     connectTimeout,
166                                     versionPolicy,
167                                     clientContext,
168                                     new FutureCallback<AsyncConnectionEndpoint>() {
169 
170                                         @Override
171                                         public void completed(final AsyncConnectionEndpoint result) {
172                                             resultFuture.completed(result);
173                                         }
174 
175                                         @Override
176                                         public void failed(final Exception ex) {
177                                             try {
178                                                 Closer.closeQuietly(connectionEndpoint);
179                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
180                                             } finally {
181                                                 resultFuture.failed(ex);
182                                             }
183                                         }
184 
185                                         @Override
186                                         public void cancelled() {
187                                             try {
188                                                 Closer.closeQuietly(connectionEndpoint);
189                                                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
190                                             } finally {
191                                                 resultFuture.cancel(true);
192                                             }
193                                         }
194 
195                                     });
196                             resultFuture.setDependency(connectFuture);
197                         }
198                     }
199 
200                     @Override
201                     public void failed(final Exception ex) {
202                         callback.failed(ex);
203                     }
204 
205                     @Override
206                     public void cancelled() {
207                         callback.cancelled();
208                     }
209 
210                 });
211         resultFuture.setDependency(leaseFuture);
212         return resultFuture;
213     }
214 
215     public Future<AsyncClientEndpoint> lease(
216             final HttpHost host,
217             final FutureCallback<AsyncClientEndpoint> callback) {
218         return lease(host, HttpClientContext.create(), callback);
219     }
220 
221     public Future<AsyncClientEndpoint> lease(
222             final HttpHost host,
223             final HttpContext context,
224             final FutureCallback<AsyncClientEndpoint> callback) {
225         Args.notNull(host, "Host");
226         Args.notNull(context, "HTTP context");
227         final BasicFuture<AsyncClientEndpoint> future = new BasicFuture<>(callback);
228         if (!isRunning()) {
229             future.failed(new CancellationException("Connection lease cancelled"));
230             return future;
231         }
232         final HttpClientContext clientContext = HttpClientContext.adapt(context);
233         final RequestConfig requestConfig = clientContext.getRequestConfig();
234         final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
235         final Timeout connectTimeout = requestConfig.getConnectTimeout();
236         leaseEndpoint(
237                 host,
238                 connectionRequestTimeout,
239                 connectTimeout,
240                 clientContext,
241                 new FutureCallback<AsyncConnectionEndpoint>() {
242 
243                     @Override
244                     public void completed(final AsyncConnectionEndpoint result) {
245                         future.completed(new InternalAsyncClientEndpoint(result));
246                     }
247 
248                     @Override
249                     public void failed(final Exception ex) {
250                         future.failed(ex);
251                     }
252 
253                     @Override
254                     public void cancelled() {
255                         future.cancel(true);
256                     }
257 
258                 });
259         return future;
260     }
261 
262     @Override
263     public Cancellable execute(
264             final AsyncClientExchangeHandler exchangeHandler,
265             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
266             final HttpContext context) {
267         final ComplexCancellable cancellable = new ComplexCancellable();
268         try {
269             if (!isRunning()) {
270                 throw new CancellationException("Request execution cancelled");
271             }
272             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
273             exchangeHandler.produceRequest(new RequestChannel() {
274 
275                 @Override
276                 public void sendRequest(
277                         final HttpRequest request,
278                         final EntityDetails entityDetails,
279                         final HttpContext context) throws HttpException, IOException {
280                     RequestConfig requestConfig = null;
281                     if (request instanceof Configurable) {
282                         requestConfig = ((Configurable) request).getConfig();
283                     }
284                     if (requestConfig != null) {
285                         clientContext.setRequestConfig(requestConfig);
286                     } else {
287                         requestConfig = clientContext.getRequestConfig();
288                     }
289                     final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
290                     final Timeout connectTimeout = requestConfig.getConnectTimeout();
291                     final Timeout responseTimeout = requestConfig.getResponseTimeout();
292                     final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());
293 
294                     final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
295                             target,
296                             connectionRequestTimeout,
297                             connectTimeout,
298                             clientContext,
299                             new FutureCallback<AsyncConnectionEndpoint>() {
300 
301                                 @Override
302                                 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
303                                     final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
304                                     final AtomicInteger messageCountDown = new AtomicInteger(2);
305                                     final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
306 
307                                         @Override
308                                         public void releaseResources() {
309                                             try {
310                                                 exchangeHandler.releaseResources();
311                                             } finally {
312                                                 endpoint.releaseAndDiscard();
313                                             }
314                                         }
315 
316                                         @Override
317                                         public void failed(final Exception cause) {
318                                             try {
319                                                 exchangeHandler.failed(cause);
320                                             } finally {
321                                                 endpoint.releaseAndDiscard();
322                                             }
323                                         }
324 
325                                         @Override
326                                         public void cancel() {
327                                             failed(new RequestFailedException("Request aborted"));
328                                         }
329 
330                                         @Override
331                                         public void produceRequest(
332                                                 final RequestChannel channel,
333                                                 final HttpContext context) throws HttpException, IOException {
334                                             channel.sendRequest(request, entityDetails, context);
335                                             if (entityDetails == null) {
336                                                 messageCountDown.decrementAndGet();
337                                             }
338                                         }
339 
340                                         @Override
341                                         public int available() {
342                                             return exchangeHandler.available();
343                                         }
344 
345                                         @Override
346                                         public void produce(final DataStreamChannel channel) throws IOException {
347                                             exchangeHandler.produce(new DataStreamChannel() {
348 
349                                                 @Override
350                                                 public void requestOutput() {
351                                                     channel.requestOutput();
352                                                 }
353 
354                                                 @Override
355                                                 public int write(final ByteBuffer src) throws IOException {
356                                                     return channel.write(src);
357                                                 }
358 
359                                                 @Override
360                                                 public void endStream(final List<? extends Header> trailers) throws IOException {
361                                                     channel.endStream(trailers);
362                                                     if (messageCountDown.decrementAndGet() <= 0) {
363                                                         endpoint.releaseAndReuse();
364                                                     }
365                                                 }
366 
367                                                 @Override
368                                                 public void endStream() throws IOException {
369                                                     channel.endStream();
370                                                     if (messageCountDown.decrementAndGet() <= 0) {
371                                                         endpoint.releaseAndReuse();
372                                                     }
373                                                 }
374 
375                                             });
376                                         }
377 
378                                         @Override
379                                         public void consumeInformation(
380                                                 final HttpResponse response,
381                                                 final HttpContext context) throws HttpException, IOException {
382                                             exchangeHandler.consumeInformation(response, context);
383                                         }
384 
385                                         @Override
386                                         public void consumeResponse(
387                                                 final HttpResponse response,
388                                                 final EntityDetails entityDetails,
389                                                 final HttpContext context) throws HttpException, IOException {
390                                             exchangeHandler.consumeResponse(response, entityDetails, context);
391                                             if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
392                                                 messageCountDown.decrementAndGet();
393                                             }
394                                             if (entityDetails == null) {
395                                                 if (messageCountDown.decrementAndGet() <= 0) {
396                                                     endpoint.releaseAndReuse();
397                                                 }
398                                             }
399                                         }
400 
401                                         @Override
402                                         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
403                                             exchangeHandler.updateCapacity(capacityChannel);
404                                         }
405 
406                                         @Override
407                                         public void consume(final ByteBuffer src) throws IOException {
408                                             exchangeHandler.consume(src);
409                                         }
410 
411                                         @Override
412                                         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
413                                             if (messageCountDown.decrementAndGet() <= 0) {
414                                                 endpoint.releaseAndReuse();
415                                             }
416                                             exchangeHandler.streamEnd(trailers);
417                                         }
418 
419                                     };
420                                     if (responseTimeout != null) {
421                                         endpoint.setSocketTimeout(responseTimeout);
422                                     }
423                                     endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
424                                 }
425 
426                                 @Override
427                                 public void failed(final Exception ex) {
428                                     exchangeHandler.failed(ex);
429                                 }
430 
431                                 @Override
432                                 public void cancelled() {
433                                     exchangeHandler.cancel();
434                                 }
435 
436                             });
437 
438                     cancellable.setDependency(new Cancellable() {
439 
440                         @Override
441                         public boolean cancel() {
442                             return leaseFuture.cancel(true);
443                         }
444 
445                     });
446                 }
447             }, context);
448 
449         } catch (final HttpException | IOException | IllegalStateException ex) {
450             exchangeHandler.failed(ex);
451         }
452         return cancellable;
453     }
454 
455     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
456 
457         private final AsyncConnectionEndpoint connectionEndpoint;
458         private final AtomicBoolean released;
459 
460         InternalAsyncClientEndpoint(final AsyncConnectionEndpoint connectionEndpoint) {
461             this.connectionEndpoint = connectionEndpoint;
462             this.released = new AtomicBoolean(false);
463         }
464 
465         boolean isReleased() {
466             return released.get();
467         }
468 
469         @Override
470         public boolean isConnected() {
471             return !isReleased() && connectionEndpoint.isConnected();
472         }
473 
474         @Override
475         public void execute(
476                 final AsyncClientExchangeHandler exchangeHandler,
477                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
478                 final HttpContext context) {
479             Asserts.check(!released.get(), "Endpoint has already been released");
480 
481             final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
482             final String exchangeId = ExecSupport.getNextExchangeId();
483             clientContext.setExchangeId(exchangeId);
484             if (LOG.isDebugEnabled()) {
485                 LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
486                 connectionEndpoint.execute(
487                         exchangeId,
488                         new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
489                         pushHandlerFactory,
490                         clientContext);
491             } else {
492                 connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
493             }
494         }
495 
496         public void setSocketTimeout(final Timeout timeout) {
497             connectionEndpoint.setSocketTimeout(timeout);
498         }
499 
500         @Override
501         public void releaseAndReuse() {
502             if (released.compareAndSet(false, true)) {
503                 manager.release(connectionEndpoint, null, TimeValue.NEG_ONE_MILLISECOND);
504             }
505         }
506 
507         @Override
508         public void releaseAndDiscard() {
509             if (released.compareAndSet(false, true)) {
510                 Closer.closeQuietly(connectionEndpoint);
511                 manager.release(connectionEndpoint, null, TimeValue.ZERO_MILLISECONDS);
512             }
513         }
514 
515     }
516 
517 }