View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.util.Set;
31  import java.util.concurrent.ExecutionException;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.TimeoutException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicReference;
37  
38  import org.apache.hc.client5.http.DnsResolver;
39  import org.apache.hc.client5.http.EndpointInfo;
40  import org.apache.hc.client5.http.HttpRoute;
41  import org.apache.hc.client5.http.SchemePortResolver;
42  import org.apache.hc.client5.http.config.ConnectionConfig;
43  import org.apache.hc.client5.http.config.TlsConfig;
44  import org.apache.hc.client5.http.impl.ConnPoolSupport;
45  import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46  import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
47  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
49  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
50  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
51  import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
52  import org.apache.hc.core5.annotation.Contract;
53  import org.apache.hc.core5.annotation.Internal;
54  import org.apache.hc.core5.annotation.ThreadingBehavior;
55  import org.apache.hc.core5.concurrent.BasicFuture;
56  import org.apache.hc.core5.concurrent.CallbackContribution;
57  import org.apache.hc.core5.concurrent.ComplexFuture;
58  import org.apache.hc.core5.concurrent.FutureCallback;
59  import org.apache.hc.core5.function.Resolver;
60  import org.apache.hc.core5.http.HttpHost;
61  import org.apache.hc.core5.http.HttpVersion;
62  import org.apache.hc.core5.http.ProtocolVersion;
63  import org.apache.hc.core5.http.URIScheme;
64  import org.apache.hc.core5.http.config.Lookup;
65  import org.apache.hc.core5.http.config.RegistryBuilder;
66  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68  import org.apache.hc.core5.http.nio.HandlerFactory;
69  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
71  import org.apache.hc.core5.http.protocol.HttpContext;
72  import org.apache.hc.core5.http2.HttpVersionPolicy;
73  import org.apache.hc.core5.http2.nio.command.PingCommand;
74  import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
75  import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.pool.ConnPoolControl;
78  import org.apache.hc.core5.pool.LaxConnPool;
79  import org.apache.hc.core5.pool.ManagedConnPool;
80  import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
81  import org.apache.hc.core5.pool.PoolEntry;
82  import org.apache.hc.core5.pool.PoolReusePolicy;
83  import org.apache.hc.core5.pool.PoolStats;
84  import org.apache.hc.core5.pool.StrictConnPool;
85  import org.apache.hc.core5.reactor.Command;
86  import org.apache.hc.core5.reactor.ConnectionInitiator;
87  import org.apache.hc.core5.reactor.ProtocolIOSession;
88  import org.apache.hc.core5.reactor.ssl.TlsDetails;
89  import org.apache.hc.core5.util.Args;
90  import org.apache.hc.core5.util.Deadline;
91  import org.apache.hc.core5.util.Identifiable;
92  import org.apache.hc.core5.util.TimeValue;
93  import org.apache.hc.core5.util.Timeout;
94  import org.slf4j.Logger;
95  import org.slf4j.LoggerFactory;
96  
97  /**
98   * {@code PoolingAsyncClientConnectionManager} maintains a pool of non-blocking
99   * {@link org.apache.hc.core5.http.HttpConnection}s and is able to service
100  * connection requests from multiple execution threads. Connections are pooled
101  * on a per route basis. A request for a route which already the manager has
102  * persistent connections for available in the pool will be services by leasing
103  * a connection from the pool rather than creating a new connection.
104  * <p>
105  * {@code PoolingAsyncClientConnectionManager} maintains a maximum limit
106  * of connection on a per route basis and in total. Connection limits
107  * can be adjusted using {@link ConnPoolControl} methods.
108  * <p>
109  * Total time to live (TTL) set at construction time defines maximum life span
110  * of persistent connections regardless of their expiration setting. No persistent
111  * connection will be re-used past its TTL value.
112  *
113  * @since 5.0
114  */
115 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
116 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
117 
118     private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
119 
120     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
121     public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
122 
123     private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
124     private final AsyncClientConnectionOperator connectionOperator;
125     private final AtomicBoolean closed;
126 
127     private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
128     private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
129 
130     public PoolingAsyncClientConnectionManager() {
131         this(RegistryBuilder.<TlsStrategy>create()
132                 .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
133                 .build());
134     }
135 
136     public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
137         this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECOND);
138     }
139 
140     public PoolingAsyncClientConnectionManager(
141             final Lookup<TlsStrategy> tlsStrategyLookup,
142             final PoolConcurrencyPolicy poolConcurrencyPolicy,
143             final TimeValue timeToLive) {
144         this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
145     }
146 
147     public PoolingAsyncClientConnectionManager(
148             final Lookup<TlsStrategy> tlsStrategyLookup,
149             final PoolConcurrencyPolicy poolConcurrencyPolicy,
150             final PoolReusePolicy poolReusePolicy,
151             final TimeValue timeToLive) {
152         this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
153     }
154 
155     public PoolingAsyncClientConnectionManager(
156             final Lookup<TlsStrategy> tlsStrategyLookup,
157             final PoolConcurrencyPolicy poolConcurrencyPolicy,
158             final PoolReusePolicy poolReusePolicy,
159             final TimeValue timeToLive,
160             final SchemePortResolver schemePortResolver,
161             final DnsResolver dnsResolver) {
162         this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
163                 poolConcurrencyPolicy, poolReusePolicy, timeToLive, false);
164     }
165 
166     @Internal
167     public PoolingAsyncClientConnectionManager(
168             final AsyncClientConnectionOperator connectionOperator,
169             final PoolConcurrencyPolicy poolConcurrencyPolicy,
170             final PoolReusePolicy poolReusePolicy,
171             final TimeValue timeToLive,
172             final boolean messageMultiplexing) {
173         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
174         final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> managedConnPool;
175         switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
176             case STRICT:
177                 managedConnPool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
178                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
179                         DEFAULT_MAX_TOTAL_CONNECTIONS,
180                         timeToLive,
181                         poolReusePolicy,
182                         null) {
183 
184                     @Override
185                     public void closeExpired() {
186                         enumAvailable(e -> closeIfExpired(e));
187                     }
188 
189                 };
190                 break;
191             case LAX:
192                 managedConnPool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
193                         DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
194                         timeToLive,
195                         poolReusePolicy,
196                         null) {
197 
198                     @Override
199                     public void closeExpired() {
200                         enumAvailable(e -> closeIfExpired(e));
201                     }
202 
203                 };
204                 break;
205             default:
206                 throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
207         }
208         this.pool = messageMultiplexing ? new H2SharingConnPool<>(managedConnPool) : managedConnPool;
209         this.closed = new AtomicBoolean(false);
210     }
211 
212     @Internal
213     protected PoolingAsyncClientConnectionManager(
214             final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
215             final AsyncClientConnectionOperator connectionOperator) {
216         this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
217         this.pool = Args.notNull(pool, "Connection pool");
218         this.closed = new AtomicBoolean(false);
219     }
220 
221     @Override
222     public void close() {
223         close(CloseMode.GRACEFUL);
224     }
225 
226     @Override
227     public void close(final CloseMode closeMode) {
228         if (this.closed.compareAndSet(false, true)) {
229             if (LOG.isDebugEnabled()) {
230                 LOG.debug("Shutdown connection pool {}", closeMode);
231             }
232             this.pool.close(closeMode);
233             LOG.debug("Connection pool shut down");
234         }
235     }
236 
237     private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
238         if (endpoint instanceof InternalConnectionEndpoint) {
239             return (InternalConnectionEndpoint) endpoint;
240         }
241         throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
242     }
243 
244     private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
245         final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
246         final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
247         return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
248     }
249 
250     private TlsConfig resolveTlsConfig(final HttpHost host) {
251         final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
252         TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
253         if (tlsConfig == null) {
254             tlsConfig = TlsConfig.DEFAULT;
255         }
256         if (URIScheme.HTTP.same(host.getSchemeName())
257                 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
258             // Plain HTTP does not support protocol negotiation.
259             // Fall back to HTTP/1.1
260             tlsConfig = TlsConfig.copy(tlsConfig)
261                     .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
262                     .build();
263         }
264         return tlsConfig;
265     }
266 
267     @Override
268     public Future<AsyncConnectionEndpoint> lease(
269             final String id,
270             final HttpRoute route,
271             final Object state,
272             final Timeout requestTimeout,
273             final FutureCallback<AsyncConnectionEndpoint> callback) {
274         if (LOG.isDebugEnabled()) {
275             LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
276         }
277         return new Future<AsyncConnectionEndpoint>() {
278 
279             final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
280             final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
281 
282             final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
283                     route,
284                     state,
285                     requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
286 
287                         @Override
288                         public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
289                             if (poolEntry.hasConnection()) {
290                                 final TimeValue timeToLive = connectionConfig.getTimeToLive();
291                                 if (TimeValue.isNonNegative(timeToLive)) {
292                                     if (timeToLive.getDuration() == 0
293                                             || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
294                                         poolEntry.discardConnection(CloseMode.GRACEFUL);
295                                     }
296                                 }
297                             }
298                             if (poolEntry.hasConnection()) {
299                                 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
300                                 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
301                                 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
302                                     if (timeValue.getDuration() == 0
303                                             || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
304                                         final ProtocolVersion protocolVersion = connection.getProtocolVersion();
305                                         if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
306                                             connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
307                                                 if (result == null || !result) {
308                                                     if (LOG.isDebugEnabled()) {
309                                                         LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
310                                                     }
311                                                     poolEntry.discardConnection(CloseMode.GRACEFUL);
312                                                 }
313                                                 leaseCompleted(poolEntry);
314                                             })), Command.Priority.IMMEDIATE);
315                                             return;
316                                         }
317                                         if (LOG.isDebugEnabled()) {
318                                             LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
319                                         }
320                                         poolEntry.discardConnection(CloseMode.IMMEDIATE);
321                                     }
322                                 }
323                             }
324                             leaseCompleted(poolEntry);
325                         }
326 
327                         void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
328                             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
329                             if (connection != null) {
330                                 connection.activate();
331                             }
332                             if (LOG.isDebugEnabled()) {
333                                 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
334                             }
335                             final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
336                             if (LOG.isDebugEnabled()) {
337                                 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
338                             }
339                             resultFuture.completed(endpoint);
340                         }
341 
342                         @Override
343                         public void failed(final Exception ex) {
344                             if (LOG.isDebugEnabled()) {
345                                 LOG.debug("{} endpoint lease failed", id);
346                             }
347                             resultFuture.failed(ex);
348                         }
349 
350                         @Override
351                         public void cancelled() {
352                             if (LOG.isDebugEnabled()) {
353                                 LOG.debug("{} endpoint lease cancelled", id);
354                             }
355                             resultFuture.cancel();
356                         }
357 
358                     });
359 
360             @Override
361             public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
362                 return resultFuture.get();
363             }
364 
365             @Override
366             public AsyncConnectionEndpoint get(
367                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
368                 return resultFuture.get(timeout, unit);
369             }
370 
371             @Override
372             public boolean cancel(final boolean mayInterruptIfRunning) {
373                 return leaseFuture.cancel(mayInterruptIfRunning);
374             }
375 
376             @Override
377             public boolean isDone() {
378                 return resultFuture.isDone();
379             }
380 
381             @Override
382             public boolean isCancelled() {
383                 return resultFuture.isCancelled();
384             }
385 
386         };
387     }
388 
389     @Override
390     public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
391         Args.notNull(endpoint, "Managed endpoint");
392         Args.notNull(keepAlive, "Keep-alive time");
393         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
394         if (entry == null) {
395             return;
396         }
397         if (LOG.isDebugEnabled()) {
398             LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
399         }
400         if (this.isClosed()) {
401             return;
402         }
403         final ManagedAsyncClientConnection connection = entry.getConnection();
404         boolean reusable = connection != null && connection.isOpen();
405         try {
406             if (reusable) {
407                 entry.updateState(state);
408                 entry.updateExpiry(keepAlive);
409                 connection.passivate();
410                 if (LOG.isDebugEnabled()) {
411                     final String s;
412                     if (TimeValue.isPositive(keepAlive)) {
413                         s = "for " + keepAlive;
414                     } else {
415                         s = "indefinitely";
416                     }
417                     LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
418                 }
419             }
420         } catch (final RuntimeException ex) {
421             reusable = false;
422             throw ex;
423         } finally {
424             pool.release(entry, reusable);
425             if (LOG.isDebugEnabled()) {
426                 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
427             }
428         }
429     }
430 
431     @Override
432     public Future<AsyncConnectionEndpoint> connect(
433             final AsyncConnectionEndpoint endpoint,
434             final ConnectionInitiator connectionInitiator,
435             final Timeout timeout,
436             final Object attachment,
437             final HttpContext context,
438             final FutureCallback<AsyncConnectionEndpoint> callback) {
439         Args.notNull(endpoint, "Endpoint");
440         Args.notNull(connectionInitiator, "Connection initiator");
441         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
442         final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
443         if (internalEndpoint.isConnected()) {
444             resultFuture.completed(endpoint);
445             return resultFuture;
446         }
447         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
448         final HttpRoute route = poolEntry.getRoute();
449         final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
450         final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
451         final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
452 
453         if (LOG.isDebugEnabled()) {
454             LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
455         }
456         final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
457                 connectionInitiator,
458                 firstHop,
459                 route.getTargetName(),
460                 route.getLocalSocketAddress(),
461                 connectTimeout,
462                 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
463                 context,
464                 new FutureCallback<ManagedAsyncClientConnection>() {
465 
466                     @Override
467                     public void completed(final ManagedAsyncClientConnection connection) {
468                         try {
469                             if (LOG.isDebugEnabled()) {
470                                 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
471                             }
472                             final Timeout socketTimeout = connectionConfig.getSocketTimeout();
473                             if (socketTimeout != null) {
474                                 connection.setSocketTimeout(socketTimeout);
475                             }
476                             poolEntry.assignConnection(connection);
477                             resultFuture.completed(internalEndpoint);
478                         } catch (final RuntimeException ex) {
479                             resultFuture.failed(ex);
480                         }
481                     }
482 
483                     @Override
484                     public void failed(final Exception ex) {
485                         resultFuture.failed(ex);
486                     }
487 
488                     @Override
489                     public void cancelled() {
490                         resultFuture.cancel();
491                     }
492                 });
493         resultFuture.setDependency(connectFuture);
494         return resultFuture;
495     }
496 
497     @Override
498     public void upgrade(
499             final AsyncConnectionEndpoint endpoint,
500             final Object attachment,
501             final HttpContext context,
502             final FutureCallback<AsyncConnectionEndpoint> callback) {
503         Args.notNull(endpoint, "Managed endpoint");
504         final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
505         final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
506         final HttpRoute route = poolEntry.getRoute();
507         final HttpHost target = route.getTargetHost();
508         connectionOperator.upgrade(
509                 poolEntry.getConnection(),
510                 target,
511                 route.getTargetName(),
512                 attachment != null ? attachment : resolveTlsConfig(target),
513                 context,
514                 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
515 
516                     @Override
517                     public void completed(final ManagedAsyncClientConnection connection) {
518                         if (LOG.isDebugEnabled()) {
519                             LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
520                         }
521                         final TlsDetails tlsDetails = connection.getTlsDetails();
522                         if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
523                             connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
524 
525                                 @Override
526                                 public void completed(final ProtocolIOSession protocolIOSession) {
527                                     if (callback != null) {
528                                         callback.completed(endpoint);
529                                     }
530                                 }
531 
532                             });
533                         } else {
534                             if (callback != null) {
535                                 callback.completed(endpoint);
536                             }
537                         }
538                     }
539                 });
540     }
541 
542     @Override
543     public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
544         upgrade(endpoint, attachment, context, null);
545     }
546 
547     @Override
548     public Set<HttpRoute> getRoutes() {
549         return pool.getRoutes();
550     }
551 
552     @Override
553     public void setMaxTotal(final int max) {
554         pool.setMaxTotal(max);
555     }
556 
557     @Override
558     public int getMaxTotal() {
559         return pool.getMaxTotal();
560     }
561 
562     @Override
563     public void setDefaultMaxPerRoute(final int max) {
564         pool.setDefaultMaxPerRoute(max);
565     }
566 
567     @Override
568     public int getDefaultMaxPerRoute() {
569         return pool.getDefaultMaxPerRoute();
570     }
571 
572     @Override
573     public void setMaxPerRoute(final HttpRoute route, final int max) {
574         pool.setMaxPerRoute(route, max);
575     }
576 
577     @Override
578     public int getMaxPerRoute(final HttpRoute route) {
579         return pool.getMaxPerRoute(route);
580     }
581 
582     @Override
583     public void closeIdle(final TimeValue idletime) {
584         if (isClosed()) {
585             return;
586         }
587         pool.closeIdle(idletime);
588     }
589 
590     @Override
591     public void closeExpired() {
592         if (isClosed()) {
593             return;
594         }
595         pool.closeExpired();
596     }
597 
598     @Override
599     public PoolStats getTotalStats() {
600         return pool.getTotalStats();
601     }
602 
603     @Override
604     public PoolStats getStats(final HttpRoute route) {
605         return pool.getStats(route);
606     }
607 
608     /**
609      * Sets the same {@link ConnectionConfig} for all routes
610      *
611      * @since 5.2
612      */
613     public void setDefaultConnectionConfig(final ConnectionConfig config) {
614         this.connectionConfigResolver = route -> config;
615     }
616 
617     /**
618      * Sets {@link Resolver} of {@link ConnectionConfig} on a per route basis.
619      *
620      * @since 5.2
621      */
622     public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
623         this.connectionConfigResolver = connectionConfigResolver;
624     }
625 
626     /**
627      * Sets the same {@link ConnectionConfig} for all hosts
628      *
629      * @since 5.2
630      */
631     public void setDefaultTlsConfig(final TlsConfig config) {
632         this.tlsConfigResolver = host -> config;
633     }
634 
635     /**
636      * Sets {@link Resolver} of {@link TlsConfig} on a per host basis.
637      *
638      * @since 5.2
639      */
640     public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
641         this.tlsConfigResolver = tlsConfigResolver;
642     }
643 
644     void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
645         final long now = System.currentTimeMillis();
646         if (entry.getExpiryDeadline().isBefore(now)) {
647             entry.discardConnection(CloseMode.GRACEFUL);
648         } else {
649             final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
650             final TimeValue timeToLive = connectionConfig.getTimeToLive();
651             if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
652                 entry.discardConnection(CloseMode.GRACEFUL);
653             }
654         }
655     }
656 
657     /**
658      * @deprecated Use custom {@link #setConnectionConfigResolver(Resolver)}
659      */
660     @Deprecated
661     public TimeValue getValidateAfterInactivity() {
662         return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
663     }
664 
665     /**
666      * Defines period of inactivity after which persistent connections must
667      * be re-validated prior to being {@link #lease(String, HttpRoute, Object, Timeout,
668      * FutureCallback)} leased} to the consumer. Negative values passed
669      * to this method disable connection validation. This check helps detect connections
670      * that have become stale (half-closed) while kept inactive in the pool.
671      *
672      * @deprecated Use {@link #setConnectionConfigResolver(Resolver)}.
673      */
674     @Deprecated
675     public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
676         setDefaultConnectionConfig(ConnectionConfig.custom()
677                 .setValidateAfterInactivity(validateAfterInactivity)
678                 .build());
679     }
680 
681     private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
682 
683     static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
684 
685         private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
686         private final String id;
687 
688         InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
689             this.poolEntryRef = new AtomicReference<>(poolEntry);
690             this.id = INCREMENTING_ID.getNextId();
691         }
692 
693         @Override
694         public String getId() {
695             return id;
696         }
697 
698         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
699             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
700             if (poolEntry == null) {
701                 throw new ConnectionShutdownException();
702             }
703             return poolEntry;
704         }
705 
706         PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
707             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
708             if (poolEntry.getConnection() == null) {
709                 throw new ConnectionShutdownException();
710             }
711             return poolEntry;
712         }
713 
714         PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
715             return poolEntryRef.getAndSet(null);
716         }
717 
718         @Override
719         public void close(final CloseMode closeMode) {
720             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
721             if (poolEntry != null) {
722                 if (LOG.isDebugEnabled()) {
723                     LOG.debug("{} close {}", id, closeMode);
724                 }
725                 poolEntry.discardConnection(closeMode);
726             }
727         }
728 
729         @Override
730         public boolean isConnected() {
731             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
732             if (poolEntry == null) {
733                 return false;
734             }
735             final ManagedAsyncClientConnection connection = poolEntry.getConnection();
736             if (connection == null) {
737                 return false;
738             }
739             if (!connection.isOpen()) {
740                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
741                 return false;
742             }
743             return true;
744         }
745 
746         @Override
747         public void setSocketTimeout(final Timeout timeout) {
748             getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
749         }
750 
751         @Override
752         public void execute(
753                 final String exchangeId,
754                 final AsyncClientExchangeHandler exchangeHandler,
755                 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
756                 final HttpContext context) {
757             final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
758             if (LOG.isDebugEnabled()) {
759                 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
760             }
761             context.setProtocolVersion(connection.getProtocolVersion());
762             connection.submitCommand(
763                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
764                     Command.Priority.NORMAL);
765         }
766 
767         @Override
768         public EndpointInfo getInfo() {
769             final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
770             if (poolEntry != null) {
771                 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
772                 if (connection != null && connection.isOpen()) {
773                     final TlsDetails tlsDetails = connection.getTlsDetails();
774                     return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
775                 }
776             }
777             return null;
778         }
779 
780     }
781 
782     /**
783      * Method that can be called to determine whether the connection manager has been shut down and
784      * is closed or not.
785      *
786      * @return {@code true} if the connection manager has been shut down and is closed, otherwise
787      * return {@code false}.
788      * @since 5.4
789      */
790     public boolean isClosed() {
791         return this.closed.get();
792     }
793 
794 }