View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.BasicFuture;
39  import org.apache.hc.core5.concurrent.ComplexFuture;
40  import org.apache.hc.core5.concurrent.FutureCallback;
41  import org.apache.hc.core5.function.Callback;
42  import org.apache.hc.core5.function.Decorator;
43  import org.apache.hc.core5.http.EntityDetails;
44  import org.apache.hc.core5.http.Header;
45  import org.apache.hc.core5.http.HttpException;
46  import org.apache.hc.core5.http.HttpHost;
47  import org.apache.hc.core5.http.HttpRequest;
48  import org.apache.hc.core5.http.HttpResponse;
49  import org.apache.hc.core5.http.ProtocolException;
50  import org.apache.hc.core5.http.URIScheme;
51  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
52  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
53  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
54  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
55  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
56  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
57  import org.apache.hc.core5.http.nio.CapacityChannel;
58  import org.apache.hc.core5.http.nio.DataStreamChannel;
59  import org.apache.hc.core5.http.nio.HandlerFactory;
60  import org.apache.hc.core5.http.nio.RequestChannel;
61  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
62  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
63  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
64  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
65  import org.apache.hc.core5.http.protocol.HttpContext;
66  import org.apache.hc.core5.http.protocol.HttpCoreContext;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.net.URIAuthority;
69  import org.apache.hc.core5.pool.ConnPoolControl;
70  import org.apache.hc.core5.pool.ManagedConnPool;
71  import org.apache.hc.core5.pool.PoolEntry;
72  import org.apache.hc.core5.pool.PoolStats;
73  import org.apache.hc.core5.reactor.Command;
74  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
75  import org.apache.hc.core5.reactor.IOReactorConfig;
76  import org.apache.hc.core5.reactor.IOSession;
77  import org.apache.hc.core5.reactor.IOSessionListener;
78  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
79  import org.apache.hc.core5.util.Args;
80  import org.apache.hc.core5.util.TimeValue;
81  import org.apache.hc.core5.util.Timeout;
82  
83  /**
84   * HTTP/1.1 client side message exchange initiator.
85   *
86   * @since 5.0
87   */
88  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
89  
90      private final ManagedConnPool<HttpHost, IOSession> connPool;
91      private final TlsStrategy tlsStrategy;
92  
93      /**
94       * Use {@link AsyncRequesterBootstrap} to create instances of this class.
95       */
96      @Internal
97      public HttpAsyncRequester(
98              final IOReactorConfig ioReactorConfig,
99              final IOEventHandlerFactory eventHandlerFactory,
100             final Decorator<IOSession> ioSessionDecorator,
101             final IOSessionListener sessionListener,
102             final ManagedConnPool<HttpHost, IOSession> connPool,
103             final TlsStrategy tlsStrategy) {
104         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, sessionListener, new Callback<IOSession>() {
105 
106             @Override
107             public void execute(final IOSession session) {
108                 session.enqueue(new ShutdownCommand(CloseMode.GRACEFUL), Command.Priority.IMMEDIATE);
109             }
110 
111         }, DefaultAddressResolver.INSTANCE);
112         this.connPool = Args.notNull(connPool, "Connection pool");
113         this.tlsStrategy = tlsStrategy;
114     }
115 
116     @Override
117     public PoolStats getTotalStats() {
118         return connPool.getTotalStats();
119     }
120 
121     @Override
122     public PoolStats getStats(final HttpHost route) {
123         return connPool.getStats(route);
124     }
125 
126     @Override
127     public void setMaxTotal(final int max) {
128         connPool.setMaxTotal(max);
129     }
130 
131     @Override
132     public int getMaxTotal() {
133         return connPool.getMaxTotal();
134     }
135 
136     @Override
137     public void setDefaultMaxPerRoute(final int max) {
138         connPool.setDefaultMaxPerRoute(max);
139     }
140 
141     @Override
142     public int getDefaultMaxPerRoute() {
143         return connPool.getDefaultMaxPerRoute();
144     }
145 
146     @Override
147     public void setMaxPerRoute(final HttpHost route, final int max) {
148         connPool.setMaxPerRoute(route, max);
149     }
150 
151     @Override
152     public int getMaxPerRoute(final HttpHost route) {
153         return connPool.getMaxPerRoute(route);
154     }
155 
156     @Override
157     public void closeIdle(final TimeValue idleTime) {
158         connPool.closeIdle(idleTime);
159     }
160 
161     @Override
162     public void closeExpired() {
163         connPool.closeExpired();
164     }
165 
166     @Override
167     public Set<HttpHost> getRoutes() {
168         return connPool.getRoutes();
169     }
170 
171     public Future<AsyncClientEndpoint> connect(
172             final HttpHost host,
173             final Timeout timeout,
174             final Object attachment,
175             final FutureCallback<AsyncClientEndpoint> callback) {
176         return doConnect(host, timeout, attachment, callback);
177     }
178 
179     protected Future<AsyncClientEndpoint> doConnect(
180             final HttpHost host,
181             final Timeout timeout,
182             final Object attachment,
183             final FutureCallback<AsyncClientEndpoint> callback) {
184         Args.notNull(host, "Host");
185         Args.notNull(timeout, "Timeout");
186         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
187         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
188                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
189 
190             @Override
191             public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
192                 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
193                 final IOSession ioSession = poolEntry.getConnection();
194                 if (ioSession != null && ioSession.isClosed()) {
195                     poolEntry.discardConnection(CloseMode.IMMEDIATE);
196                 }
197                 if (poolEntry.hasConnection()) {
198                     resultFuture.completed(endpoint);
199                 } else {
200                     final Future<IOSession> futute = requestSession(host, timeout, attachment, new FutureCallback<IOSession>() {
201 
202                         @Override
203                         public void completed(final IOSession session) {
204                             if (tlsStrategy != null
205                                     && URIScheme.HTTPS.same(host.getSchemeName())
206                                     && session instanceof TransportSecurityLayer) {
207                                 tlsStrategy.upgrade(
208                                         (TransportSecurityLayer) session,
209                                         host,
210                                         session.getLocalAddress(),
211                                         session.getRemoteAddress(),
212                                         attachment);
213                             }
214                             session.setSocketTimeoutMillis(timeout.toMillisIntBound());
215                             poolEntry.assignConnection(session);
216                             resultFuture.completed(endpoint);
217                         }
218 
219                         @Override
220                         public void failed(final Exception cause) {
221                             try {
222                                 resultFuture.failed(cause);
223                             } finally {
224                                 endpoint.releaseAndDiscard();
225                             }
226                         }
227 
228                         @Override
229                         public void cancelled() {
230                             try {
231                                 resultFuture.cancel();
232                             } finally {
233                                 endpoint.releaseAndDiscard();
234                             }
235                         }
236 
237                     });
238                     resultFuture.setDependency(futute);
239                 }
240             }
241 
242             @Override
243             public void failed(final Exception ex) {
244                 resultFuture.failed(ex);
245             }
246 
247             @Override
248             public void cancelled() {
249                 resultFuture.cancel();
250             }
251 
252         });
253         resultFuture.setDependency(leaseFuture);
254         return resultFuture;
255     }
256 
257     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) throws InterruptedException {
258         return connect(host, timeout, null, null);
259     }
260 
261     public void execute(
262             final AsyncClientExchangeHandler exchangeHandler,
263             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
264             final Timeout timeout,
265             final HttpContext executeContext) {
266         Args.notNull(exchangeHandler, "Exchange handler");
267         Args.notNull(timeout, "Timeout");
268         Args.notNull(executeContext, "Context");
269         try {
270             exchangeHandler.produceRequest(new RequestChannel() {
271 
272                 @Override
273                 public void sendRequest(
274                         final HttpRequest request,
275                         final EntityDetails entityDetails, final HttpContext requestContext) throws HttpException, IOException {
276                     final String scheme = request.getScheme();
277                     final URIAuthority authority = request.getAuthority();
278                     if (authority == null) {
279                         throw new ProtocolException("Request authority not specified");
280                     }
281                     final HttpHost target = new HttpHost(authority, scheme);
282                     connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
283 
284                         @Override
285                         public void completed(final AsyncClientEndpoint endpoint) {
286                             endpoint.execute(new AsyncClientExchangeHandler() {
287 
288                                 @Override
289                                 public void releaseResources() {
290                                     endpoint.releaseAndDiscard();
291                                     exchangeHandler.releaseResources();
292                                 }
293 
294                                 @Override
295                                 public void failed(final Exception cause) {
296                                     endpoint.releaseAndDiscard();
297                                     exchangeHandler.failed(cause);
298                                 }
299 
300                                 @Override
301                                 public void cancel() {
302                                     endpoint.releaseAndDiscard();
303                                     exchangeHandler.cancel();
304                                 }
305 
306                                 @Override
307                                 public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
308                                     channel.sendRequest(request, entityDetails, httpContext);
309                                 }
310 
311                                 @Override
312                                 public int available() {
313                                     return exchangeHandler.available();
314                                 }
315 
316                                 @Override
317                                 public void produce(final DataStreamChannel channel) throws IOException {
318                                     exchangeHandler.produce(channel);
319                                 }
320 
321                                 @Override
322                                 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
323                                     exchangeHandler.consumeInformation(response, httpContext);
324                                 }
325 
326                                 @Override
327                                 public void consumeResponse(
328                                         final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
329                                     if (entityDetails == null) {
330                                         endpoint.releaseAndReuse();
331                                     }
332                                     exchangeHandler.consumeResponse(response, entityDetails, httpContext);
333                                 }
334 
335                                 @Override
336                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
337                                     exchangeHandler.updateCapacity(capacityChannel);
338                                 }
339 
340                                 @Override
341                                 public int consume(final ByteBuffer src) throws IOException {
342                                     return exchangeHandler.consume(src);
343                                 }
344 
345                                 @Override
346                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
347                                     endpoint.releaseAndReuse();
348                                     exchangeHandler.streamEnd(trailers);
349                                 }
350 
351                             }, pushHandlerFactory, executeContext);
352 
353                         }
354 
355                         @Override
356                         public void failed(final Exception ex) {
357                             exchangeHandler.failed(ex);
358                         }
359 
360                         @Override
361                         public void cancelled() {
362                             exchangeHandler.cancel();
363                         }
364 
365                     });
366 
367                 }
368 
369             }, executeContext);
370 
371         } catch (final IOException | HttpException ex) {
372             exchangeHandler.failed(ex);
373         }
374     }
375 
376     public void execute(
377             final AsyncClientExchangeHandler exchangeHandler,
378             final Timeout timeout,
379             final HttpContext executeContext) {
380         execute(exchangeHandler, null, timeout, executeContext);
381     }
382 
383     public final <T> Future<T> execute(
384             final AsyncRequestProducer requestProducer,
385             final AsyncResponseConsumer<T> responseConsumer,
386             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
387             final Timeout timeout,
388             final HttpContext context,
389             final FutureCallback<T> callback) {
390         Args.notNull(requestProducer, "Request producer");
391         Args.notNull(responseConsumer, "Response consumer");
392         Args.notNull(timeout, "Timeout");
393         final BasicFuture<T> future = new BasicFuture<>(callback);
394         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
395 
396             @Override
397             public void completed(final T result) {
398                 future.completed(result);
399             }
400 
401             @Override
402             public void failed(final Exception ex) {
403                 future.failed(ex);
404             }
405 
406             @Override
407             public void cancelled() {
408                 future.cancel();
409             }
410 
411         });
412         execute(exchangeHandler, pushHandlerFactory, timeout, context != null ? context : HttpCoreContext.create());
413         return future;
414     }
415 
416     public final <T> Future<T> execute(
417             final AsyncRequestProducer requestProducer,
418             final AsyncResponseConsumer<T> responseConsumer,
419             final Timeout timeout,
420             final HttpContext context,
421             final FutureCallback<T> callback) {
422         return execute(requestProducer, responseConsumer, null, timeout, context, callback);
423     }
424 
425     public final <T> Future<T> execute(
426             final AsyncRequestProducer requestProducer,
427             final AsyncResponseConsumer<T> responseConsumer,
428             final Timeout timeout,
429             final FutureCallback<T> callback) {
430         return execute(requestProducer, responseConsumer, null, timeout, null, callback);
431     }
432 
433     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
434 
435         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
436 
437         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
438             this.poolEntryRef = new AtomicReference<>(poolEntry);
439         }
440 
441         @Override
442         public void execute(
443                 final AsyncClientExchangeHandler exchangeHandler,
444                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
445                 final HttpContext context) {
446             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
447             if (poolEntry == null) {
448                 throw new IllegalStateException("Endpoint has already been released");
449             }
450             final IOSession ioSession = poolEntry.getConnection();
451             if (ioSession == null) {
452                 throw new IllegalStateException("I/O session is invalid");
453             }
454             ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
455         }
456 
457         @Override
458         public void releaseAndReuse() {
459             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
460             if (poolEntry != null) {
461                 final IOSession ioSession = poolEntry.getConnection();
462                 connPool.release(poolEntry, ioSession != null && !ioSession.isClosed());
463             }
464         }
465 
466         @Override
467         public void releaseAndDiscard() {
468             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
469             if (poolEntry != null) {
470                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
471                 connPool.release(poolEntry, false);
472             }
473         }
474 
475     }
476 
477 }