View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.util.Set;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.client5.http.DnsResolver;
39  import org.apache.hc.client5.http.EndpointInfo;
40  import org.apache.hc.client5.http.HttpRoute;
41  import org.apache.hc.client5.http.SchemePortResolver;
42  import org.apache.hc.client5.http.config.ConnectionConfig;
43  import org.apache.hc.client5.http.config.TlsConfig;
44  import org.apache.hc.client5.http.impl.ConnPoolSupport;
45  import org.apache.hc.client5.http.impl.ConnectionHolder;
46  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
47  import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
48  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
49  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
50  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
51  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
52  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
53  import org.apache.hc.core5.annotation.Contract;
54  import org.apache.hc.core5.annotation.Internal;
55  import org.apache.hc.core5.annotation.ThreadingBehavior;
56  import org.apache.hc.core5.concurrent.BasicFuture;
57  import org.apache.hc.core5.concurrent.CallbackContribution;
58  import org.apache.hc.core5.concurrent.ComplexFuture;
59  import org.apache.hc.core5.concurrent.FutureCallback;
60  import org.apache.hc.core5.function.Resolver;
61  import org.apache.hc.core5.http.HttpConnection;
62  import org.apache.hc.core5.http.HttpHost;
63  import org.apache.hc.core5.http.HttpVersion;
64  import org.apache.hc.core5.http.ProtocolVersion;
65  import org.apache.hc.core5.http.URIScheme;
66  import org.apache.hc.core5.http.config.Lookup;
67  import org.apache.hc.core5.http.config.RegistryBuilder;
68  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70  import org.apache.hc.core5.http.nio.HandlerFactory;
71  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
72  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
73  import org.apache.hc.core5.http.protocol.HttpContext;
74  import org.apache.hc.core5.http2.HttpVersionPolicy;
75  import org.apache.hc.core5.http2.nio.command.PingCommand;
76  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
77  import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
78  import org.apache.hc.core5.io.CloseMode;
79  import org.apache.hc.core5.pool.ConnPoolControl;
80  import org.apache.hc.core5.pool.DefaultDisposalCallback;
81  import org.apache.hc.core5.pool.LaxConnPool;
82  import org.apache.hc.core5.pool.ManagedConnPool;
83  import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
84  import org.apache.hc.core5.pool.PoolEntry;
85  import org.apache.hc.core5.pool.PoolReusePolicy;
86  import org.apache.hc.core5.pool.PoolStats;
87  import org.apache.hc.core5.pool.StrictConnPool;
88  import org.apache.hc.core5.reactor.Command;
89  import org.apache.hc.core5.reactor.ConnectionInitiator;
90  import org.apache.hc.core5.reactor.ProtocolIOSession;
91  import org.apache.hc.core5.reactor.ssl.TlsDetails;
92  import org.apache.hc.core5.util.Args;
93  import org.apache.hc.core5.util.Deadline;
94  import org.apache.hc.core5.util.Identifiable;
95  import org.apache.hc.core5.util.TimeValue;
96  import org.apache.hc.core5.util.Timeout;
97  import org.slf4j.Logger;
98  import org.slf4j.LoggerFactory;
99  
100 /**
101  * {@code PoolingAsyncClientConnectionManager} maintains a pool of non-blocking
102  * {@link org.apache.hc.core5.http.HttpConnection}s and is able to service
103  * connection requests from multiple execution threads. Connections are pooled
104  * on a per route basis. A request for a route for which the manager already
105  * has persistent connections available in the pool will be serviced by leasing
106  * a connection from the pool rather than creating a new connection.
107  * <p>
108  * {@code PoolingAsyncClientConnectionManager} maintains a maximum limit
109  * of connections on a per route basis and in total. Connection limits
110  * can be adjusted using {@link ConnPoolControl} methods.
111  * <p>
112  * Total time to live (TTL) set at construction time defines maximum life span
113  * of persistent connections regardless of their expiration setting. No persistent
114  * connection will be re-used past its TTL value.
115  *
116  * @since 5.0
117  */
118 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
119 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
120 
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);

    /** Total connection limit passed to the strict pool built by the constructors of this class. */
    public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
    /** Per-route connection limit passed to the pools built by the constructors of this class. */
    public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;

    /** Connection pool keyed by route; all lease / release / limit operations delegate to it. */
    private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
    /** Operator used to connect and upgrade connections. */
    private final AsyncClientConnectionOperator connectionOperator;
    /** Set to {@code true} by the first call to {@code close}. */
    private final AtomicBoolean closed;

    /** Optional per-route connection config resolver; {@code ConnectionConfig.DEFAULT} is used when it yields nothing. */
    private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
    /** Optional per-host TLS config resolver; {@code TlsConfig.DEFAULT} is used when it yields nothing. */
    private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
132 
    /**
     * Creates a manager whose TLS strategy registry contains
     * {@link DefaultClientTlsStrategy#createDefault()} registered for the {@code https} scheme.
     * All remaining settings come from the constructor this one delegates to.
     */
    public PoolingAsyncClientConnectionManager() {
        this(RegistryBuilder.<TlsStrategy>create()
                .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
                .build());
    }
138 
    /**
     * Creates a manager with the given TLS strategy lookup, the
     * {@link PoolConcurrencyPolicy#STRICT} pool policy and
     * {@link TimeValue#NEG_ONE_MILLISECOND} as the time to live.
     *
     * @param tlsStrategyLookup TLS strategies by scheme.
     */
    public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
        this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
    }
142 
    /**
     * Creates a manager with the given TLS strategy lookup, pool concurrency policy and
     * time to live, using {@link PoolReusePolicy#LIFO} as the pool reuse policy.
     *
     * @param tlsStrategyLookup TLS strategies by scheme.
     * @param poolConcurrencyPolicy pool concurrency policy.
     * @param timeToLive time to live handed to the connection pool.
     */
    public PoolingAsyncClientConnectionManager(
            final Lookup<TlsStrategy> tlsStrategyLookup,
            final PoolConcurrencyPolicy poolConcurrencyPolicy,
            final TimeValue timeToLive) {
        this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
    }
149 
    /**
     * Creates a manager with the given TLS strategy lookup and pool settings, passing
     * {@code null} for both the scheme port resolver and the DNS resolver.
     *
     * @param tlsStrategyLookup TLS strategies by scheme.
     * @param poolConcurrencyPolicy pool concurrency policy.
     * @param poolReusePolicy pool reuse policy.
     * @param timeToLive time to live handed to the connection pool.
     */
    public PoolingAsyncClientConnectionManager(
            final Lookup<TlsStrategy> tlsStrategyLookup,
            final PoolConcurrencyPolicy poolConcurrencyPolicy,
            final PoolReusePolicy poolReusePolicy,
            final TimeValue timeToLive) {
        this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
    }
157 
    /**
     * Creates a manager that connects through a {@link DefaultAsyncClientConnectionOperator}
     * built from the given TLS strategy lookup, scheme port resolver and DNS resolver.
     * Message multiplexing is switched off.
     *
     * @param tlsStrategyLookup TLS strategies by scheme.
     * @param poolConcurrencyPolicy pool concurrency policy.
     * @param poolReusePolicy pool reuse policy.
     * @param timeToLive time to live handed to the connection pool.
     * @param schemePortResolver scheme port resolver passed to the connection operator.
     * @param dnsResolver DNS resolver passed to the connection operator.
     */
    public PoolingAsyncClientConnectionManager(
            final Lookup<TlsStrategy> tlsStrategyLookup,
            final PoolConcurrencyPolicy poolConcurrencyPolicy,
            final PoolReusePolicy poolReusePolicy,
            final TimeValue timeToLive,
            final SchemePortResolver schemePortResolver,
            final DnsResolver dnsResolver) {
        this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
                poolConcurrencyPolicy, poolReusePolicy, timeToLive, false);
    }
168 
169     @Internal
170     public PoolingAsyncClientConnectionManager(
171             final AsyncClientConnectionOperator connectionOperator,
172             final PoolConcurrencyPolicy poolConcurrencyPolicy,
173             final PoolReusePolicy poolReusePolicy,
174             final TimeValue timeToLive,
175             final boolean messageMultiplexing) {
176         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
177         final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> managedConnPool;
178         switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
179             case STRICT:
180                 managedConnPool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
181                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
182                         DEFAULT_MAX_TOTAL_CONNECTIONS,
183                         timeToLive,
184                         poolReusePolicy,
185                         new DefaultDisposalCallback<>(),
186                         null) {
187 
188                     @Override
189                     public void closeExpired() {
190                         enumAvailable(e -> closeIfExpired(e));
191                     }
192 
193                 };
194                 break;
195             case LAX:
196                 managedConnPool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
197                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
198                         timeToLive,
199                         poolReusePolicy,
200                         null) {
201 
202                     @Override
203                     public void closeExpired() {
204                         enumAvailable(e -> closeIfExpired(e));
205                     }
206 
207                 };
208                 break;
209             default:
210                 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
211         }
212         this.pool = messageMultiplexing ? new H2SharingConnPool<>(managedConnPool) : managedConnPool;
213         this.closed = new AtomicBoolean(false);
214     }
215 
216     @Internal
217     protected PoolingAsyncClientConnectionManager(
218             final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
219             final AsyncClientConnectionOperator connectionOperator) {
220         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
221         this.pool = Args.notNull(pool, "Connection pool");
222         this.closed = new AtomicBoolean(false);
223     }
224 
225     @Override
226     public void close() {
227         close(CloseMode.GRACEFUL);
228     }
229 
230     @Override
231     public void close(final CloseMode closeMode) {
232         if (this.closed.compareAndSet(false, true)) {
233             if (LOG.isDebugEnabled()) {
234                 LOG.debug("Shutdown connection pool {}", closeMode);
235             }
236             this.pool.close(closeMode);
237             LOG.debug("Connection pool shut down");
238         }
239     }
240 
241     private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
242         if (endpoint instanceof InternalConnectionEndpoint) {
243             return (InternalConnectionEndpoint) endpoint;
244         }
245         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
246     }
247 
248     private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
249         final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
250         final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
251         return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
252     }
253 
254     private TlsConfig resolveTlsConfig(final HttpHost host) {
255         final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
256         TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
257         if (tlsConfig == null) {
258             tlsConfig = TlsConfig.DEFAULT;
259         }
260         if (URIScheme.HTTP.same(host.getSchemeName())
261                 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
262             // Plain HTTP does not support protocol negotiation.
263             // Fall back to HTTP/1.1
264             tlsConfig = TlsConfig.copy(tlsConfig)
265                     .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
266                     .build();
267         }
268         return tlsConfig;
269     }
270 
    /**
     * Requests a connection endpoint for the given route from the underlying pool.
     * <p>
     * When the pool hands out an entry, its connection (if any) is first checked against
     * the time to live of the route's {@link ConnectionConfig} and discarded when that has
     * elapsed. A connection that is still open and has passed the validate-after-inactivity
     * period is pinged if it speaks HTTP/2 or newer; the lease completes only once the ping
     * result is known. In every other case the lease completes right away.
     *
     * @param id identifier used in log messages.
     * @param route route to lease a connection for.
     * @param state state passed to the pool lease request.
     * @param requestTimeout timeout passed to the pool lease request.
     * @param callback callback notified about the outcome of the lease.
     * @return future of the leased endpoint; cancelling it cancels the pool lease request.
     */
    @Override
    public Future<AsyncConnectionEndpoint> lease(
            final String id,
            final HttpRoute route,
            final Object state,
            final Timeout requestTimeout,
            final FutureCallback<AsyncConnectionEndpoint> callback) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
        }
        return new Future<AsyncConnectionEndpoint>() {

            // These two fields are declared before leaseFuture because the pool callback below uses them.
            final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
            final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);

            final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
                    route,
                    state,
                    requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {

                        @Override
                        public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
                            // Step 1: drop the connection when the configured time to live has elapsed.
                            // A zero TTL discards the connection unconditionally.
                            if (poolEntry.hasConnection()) {
                                final TimeValue timeToLive = connectionConfig.getTimeToLive();
                                if (TimeValue.isNonNegative(timeToLive)) {
                                    if (timeToLive.getDuration() == 0
                                            || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
                                        poolEntry.discardConnection(CloseMode.GRACEFUL);
                                    }
                                }
                            }
                            // Step 2: validate a connection that has been inactive for too long.
                            if (poolEntry.hasConnection()) {
                                final ManagedAsyncClientConnection connection = poolEntry.getConnection();
                                final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
                                if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
                                    if (timeValue.getDuration() == 0
                                            || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
                                        final ProtocolVersion protocolVersion = connection.getProtocolVersion();
                                        // Only HTTP/2 (or newer) connections are pinged.
                                        if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
                                            connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
                                                if (result == null || !result) {
                                                    if (LOG.isDebugEnabled()) {
                                                        LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
                                                    }
                                                    poolEntry.discardConnection(CloseMode.GRACEFUL);
                                                }
                                                leaseCompleted(poolEntry);
                                            })), Command.Priority.IMMEDIATE);
                                            // The ping handler completes the lease; do not complete it twice.
                                            return;
                                        }
                                    }
                                }
                            }
                            leaseCompleted(poolEntry);
                        }

                        // Activates the connection (if any), applies the configured socket timeout
                        // and completes the result future with a new endpoint wrapping the entry.
                        void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
                            final ManagedAsyncClientConnection connection = poolEntry.getConnection();
                            if (connection != null) {
                                connection.activate();
                                if (connectionConfig.getSocketTimeout() != null) {
                                    connection.setSocketTimeout(connectionConfig.getSocketTimeout());
                                }
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
                            }
                            final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
                            }
                            resultFuture.completed(endpoint);
                        }

                        @Override
                        public void failed(final Exception ex) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} endpoint lease failed", id);
                            }
                            resultFuture.failed(ex);
                        }

                        @Override
                        public void cancelled() {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} endpoint lease cancelled", id);
                            }
                            resultFuture.cancel();
                        }

                    });

            @Override
            public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
                return resultFuture.get();
            }

            @Override
            public AsyncConnectionEndpoint get(
                    final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return resultFuture.get(timeout, unit);
            }

            // Cancellation is directed at the pool lease request, not at the result future.
            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                return leaseFuture.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isDone() {
                return resultFuture.isDone();
            }

            @Override
            public boolean isCancelled() {
                return resultFuture.isCancelled();
            }

        };
    }
391 
392     @Override
393     public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
394         Args.notNull(endpoint, "Managed endpoint");
395         Args.notNull(keepAlive, "Keep-alive time");
396         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
397         if (entry == null) {
398             return;
399         }
400         if (LOG.isDebugEnabled()) {
401             LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
402         }
403         if (this.isClosed()) {
404             return;
405         }
406         final ManagedAsyncClientConnection connection = entry.getConnection();
407         boolean reusable = connection != null && connection.isOpen();
408         try {
409             if (reusable) {
410                 entry.updateState(state);
411                 entry.updateExpiry(keepAlive);
412                 connection.passivate();
413                 if (LOG.isDebugEnabled()) {
414                     final String s;
415                     if (TimeValue.isPositive(keepAlive)) {
416                         s = "for " + keepAlive;
417                     } else {
418                         s = "indefinitely";
419                     }
420                     LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
421                 }
422             }
423         } catch (final RuntimeException ex) {
424             reusable = false;
425             throw ex;
426         } finally {
427             pool.release(entry, reusable);
428             if (LOG.isDebugEnabled()) {
429                 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
430             }
431         }
432     }
433 
434     @Override
435     public Future<AsyncConnectionEndpoint> connect(
436             final AsyncConnectionEndpoint endpoint,
437             final ConnectionInitiator connectionInitiator,
438             final Timeout timeout,
439             final Object attachment,
440             final HttpContext context,
441             final FutureCallback<AsyncConnectionEndpoint> callback) {
442         Args.notNull(endpoint, "Endpoint");
443         Args.notNull(connectionInitiator, "Connection initiator");
444         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
445         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
446         if (internalEndpoint.isConnected()) {
447             resultFuture.completed(endpoint);
448             return resultFuture;
449         }
450         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
451         final HttpRoute route = poolEntry.getRoute();
452         final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
453         final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
454         final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
455 
456         if (LOG.isDebugEnabled()) {
457             LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
458         }
459         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
460                 connectionInitiator,
461                 firstHop,
462                 route.getTargetName(),
463                 route.getLocalSocketAddress(),
464                 connectTimeout,
465                 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
466                 context,
467                 new FutureCallback<ManagedAsyncClientConnection>() {
468 
469                     @Override
470                     public void completed(final ManagedAsyncClientConnection connection) {
471                         try {
472                             if (LOG.isDebugEnabled()) {
473                                 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
474                             }
475                             final Timeout socketTimeout = connectionConfig.getSocketTimeout();
476                             if (socketTimeout != null) {
477                                 connection.setSocketTimeout(socketTimeout);
478                             }
479                             poolEntry.assignConnection(connection);
480                             resultFuture.completed(internalEndpoint);
481                         } catch (final RuntimeException ex) {
482                             resultFuture.failed(ex);
483                         }
484                     }
485 
486                     @Override
487                     public void failed(final Exception ex) {
488                         resultFuture.failed(ex);
489                     }
490 
491                     @Override
492                     public void cancelled() {
493                         resultFuture.cancel();
494                     }
495                 });
496         resultFuture.setDependency(connectFuture);
497         return resultFuture;
498     }
499 
500     @Override
501     public void upgrade(
502             final AsyncConnectionEndpoint endpoint,
503             final Object attachment,
504             final HttpContext context,
505             final FutureCallback<AsyncConnectionEndpoint> callback) {
506         Args.notNull(endpoint, "Managed endpoint");
507         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
508         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
509         final HttpRoute route = poolEntry.getRoute();
510         final HttpHost target = route.getTargetHost();
511         connectionOperator.upgrade(
512                 poolEntry.getConnection(),
513                 target,
514                 route.getTargetName(),
515                 attachment != null ? attachment : resolveTlsConfig(target),
516                 context,
517                 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
518 
519                     @Override
520                     public void completed(final ManagedAsyncClientConnection connection) {
521                         if (LOG.isDebugEnabled()) {
522                             LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
523                         }
524                         final TlsDetails tlsDetails = connection.getTlsDetails();
525                         if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
526                             connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
527 
528                                 @Override
529                                 public void completed(final ProtocolIOSession protocolIOSession) {
530                                     if (callback != null) {
531                                         callback.completed(endpoint);
532                                     }
533                                 }
534 
535                             });
536                         } else {
537                             if (callback != null) {
538                                 callback.completed(endpoint);
539                             }
540                         }
541                     }
542                 });
543     }
544 
545     @Override
546     public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
547         upgrade(endpoint, attachment, context, null);
548     }
549 
550     @Override
551     public Set<HttpRoute> getRoutes() {
552         return pool.getRoutes();
553     }
554 
555     @Override
556     public void setMaxTotal(final int max) {
557         pool.setMaxTotal(max);
558     }
559 
560     @Override
561     public int getMaxTotal() {
562         return pool.getMaxTotal();
563     }
564 
565     @Override
566     public void setDefaultMaxPerRoute(final int max) {
567         pool.setDefaultMaxPerRoute(max);
568     }
569 
570     @Override
571     public int getDefaultMaxPerRoute() {
572         return pool.getDefaultMaxPerRoute();
573     }
574 
575     @Override
576     public void setMaxPerRoute(final HttpRoute route, final int max) {
577         pool.setMaxPerRoute(route, max);
578     }
579 
580     @Override
581     public int getMaxPerRoute(final HttpRoute route) {
582         return pool.getMaxPerRoute(route);
583     }
584 
585     @Override
586     public void closeIdle(final TimeValue idletime) {
587         if (isClosed()) {
588             return;
589         }
590         pool.closeIdle(idletime);
591     }
592 
593     @Override
594     public void closeExpired() {
595         if (isClosed()) {
596             return;
597         }
598         pool.closeExpired();
599     }
600 
601     @Override
602     public PoolStats getTotalStats() {
603         return pool.getTotalStats();
604     }
605 
606     @Override
607     public PoolStats getStats(final HttpRoute route) {
608         return pool.getStats(route);
609     }
610 
611     /**
612      * Sets the same {@link ConnectionConfig} for all routes
613      *
614      * @since 5.2
615      */
616     public void setDefaultConnectionConfig(final ConnectionConfig config) {
617         this.connectionConfigResolver = route -> config;
618     }
619 
620     /**
621      * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis.
622      *
623      * @since 5.2
624      */
625     public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
626         this.connectionConfigResolver = connectionConfigResolver;
627     }
628 
629     /**
630      * Sets the same {@link ConnectionConfig} for all hosts
631      *
632      * @since 5.2
633      */
634     public void setDefaultTlsConfig(final TlsConfig config) {
635         this.tlsConfigResolver = host -> config;
636     }
637 
638     /**
639      * Sets {@link Resolver} of {@link TlsConfig} on a per host basis.
640      *
641      * @since 5.2
642      */
643     public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
644         this.tlsConfigResolver = tlsConfigResolver;
645     }
646 
647     void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
648         final long now = System.currentTimeMillis();
649         if (entry.getExpiryDeadline().isBefore(now)) {
650             entry.discardConnection(CloseMode.GRACEFUL);
651         } else {
652             final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
653             final TimeValue timeToLive = connectionConfig.getTimeToLive();
654             if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
655                 entry.discardConnection(CloseMode.GRACEFUL);
656             }
657         }
658     }
659 
660     /**
661      * @deprecated Use custom {@link #setConnectionConfigResolver(Resolver)}
662      */
663     @Deprecated
664     public TimeValue getValidateAfterInactivity() {
665         return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
666     }
667 
668     /**
669      * Defines period of inactivity after which persistent connections must
670      * be re-validated prior to being {@link #lease(String, HttpRoute, Object, Timeout,
671      * FutureCallback)} leased} to the consumer. Negative values passed
672      * to this method disable connection validation. This check helps detect connections
673      * that have become stale (half-closed) while kept inactive in the pool.
674      *
675      * @deprecated Use {@link #setConnectionConfigResolver(Resolver)}.
676      */
677     @Deprecated
678     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
679         setDefaultConnectionConfig(ConnectionConfig.custom()
680                 .setValidateAfterInactivity(validateAfterInactivity)
681                 .build());
682     }
683 
684     private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
685 
686     static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements ConnectionHolder, Identifiable {
687 
688         private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
689         private final String id;
690 
691         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
692             this.poolEntryRef = new AtomicReference<>(poolEntry);
693             this.id = INCREMENTING_ID.getNextId();
694         }
695 
696         @Override
697         public String getId() {
698             return id;
699         }
700 
701         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
702             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
703             if (poolEntry == null) {
704                 throw new ConnectionShutdownException();
705             }
706             return poolEntry;
707         }
708 
709         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
710             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
711             if (poolEntry.getConnection() == null) {
712                 throw new ConnectionShutdownException();
713             }
714             return poolEntry;
715         }
716 
717         PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
718             return poolEntryRef.getAndSet(null);
719         }
720 
721         @Override
722         public void close(final CloseMode closeMode) {
723             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
724             if (poolEntry != null) {
725                 if (LOG.isDebugEnabled()) {
726                     LOG.debug("{} close {}", id, closeMode);
727                 }
728                 poolEntry.discardConnection(closeMode);
729             }
730         }
731 
732         @Override
733         public boolean isConnected() {
734             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
735             if (poolEntry == null) {
736                 return false;
737             }
738             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
739             if (connection == null) {
740                 return false;
741             }
742             if (!connection.isOpen()) {
743                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
744                 return false;
745             }
746             return true;
747         }
748 
749         @Override
750         public void setSocketTimeout(final Timeout timeout) {
751             getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
752         }
753 
754         @Override
755         public void execute(
756                 final String exchangeId,
757                 final AsyncClientExchangeHandler exchangeHandler,
758                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
759                 final HttpContext context) {
760             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
761             if (LOG.isDebugEnabled()) {
762                 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
763             }
764             context.setProtocolVersion(connection.getProtocolVersion());
765             connection.submitCommand(
766                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
767                     Command.Priority.NORMAL);
768         }
769 
770         @Override
771         public EndpointInfo getInfo() {
772             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
773             if (poolEntry != null) {
774                 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
775                 if (connection != null && connection.isOpen()) {
776                     final TlsDetails tlsDetails = connection.getTlsDetails();
777                     return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
778                 }
779             }
780             return null;
781         }
782 
783         @Override
784         public HttpConnection get() {
785             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
786             return poolEntry != null ? poolEntry.getConnection() : null;
787         }
788 
789     }
790 
791     /**
792      * Method that can be called to determine whether the connection manager has been shut down and
793      * is closed or not.
794      *
795      * @return {@code true} if the connection manager has been shut down and is closed, otherwise
796      * return {@code false}.
797      * @since 5.4
798      */
799     public boolean isClosed() {
800         return this.closed.get();
801     }
802 
803 }