1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.util.Set;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.client5.http.DnsResolver;
39 import org.apache.hc.client5.http.EndpointInfo;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.config.ConnectionConfig;
43 import org.apache.hc.client5.http.config.TlsConfig;
44 import org.apache.hc.client5.http.impl.ConnPoolSupport;
45 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46 import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
47 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
48 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
49 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
50 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
51 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
52 import org.apache.hc.core5.annotation.Contract;
53 import org.apache.hc.core5.annotation.Internal;
54 import org.apache.hc.core5.annotation.ThreadingBehavior;
55 import org.apache.hc.core5.concurrent.BasicFuture;
56 import org.apache.hc.core5.concurrent.CallbackContribution;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.FutureCallback;
59 import org.apache.hc.core5.function.Resolver;
60 import org.apache.hc.core5.http.HttpHost;
61 import org.apache.hc.core5.http.HttpVersion;
62 import org.apache.hc.core5.http.ProtocolVersion;
63 import org.apache.hc.core5.http.URIScheme;
64 import org.apache.hc.core5.http.config.Lookup;
65 import org.apache.hc.core5.http.config.RegistryBuilder;
66 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
67 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
68 import org.apache.hc.core5.http.nio.HandlerFactory;
69 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
70 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
71 import org.apache.hc.core5.http.protocol.HttpContext;
72 import org.apache.hc.core5.http2.HttpVersionPolicy;
73 import org.apache.hc.core5.http2.nio.command.PingCommand;
74 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
75 import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.pool.ConnPoolControl;
78 import org.apache.hc.core5.pool.LaxConnPool;
79 import org.apache.hc.core5.pool.ManagedConnPool;
80 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
81 import org.apache.hc.core5.pool.PoolEntry;
82 import org.apache.hc.core5.pool.PoolReusePolicy;
83 import org.apache.hc.core5.pool.PoolStats;
84 import org.apache.hc.core5.pool.StrictConnPool;
85 import org.apache.hc.core5.reactor.Command;
86 import org.apache.hc.core5.reactor.ConnectionInitiator;
87 import org.apache.hc.core5.reactor.ProtocolIOSession;
88 import org.apache.hc.core5.reactor.ssl.TlsDetails;
89 import org.apache.hc.core5.util.Args;
90 import org.apache.hc.core5.util.Deadline;
91 import org.apache.hc.core5.util.Identifiable;
92 import org.apache.hc.core5.util.TimeValue;
93 import org.apache.hc.core5.util.Timeout;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
116 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
117
118 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
119
120 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
121 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
122
123 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
124 private final AsyncClientConnectionOperator connectionOperator;
125 private final AtomicBoolean closed;
126
127 private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
128 private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
129
130 public PoolingAsyncClientConnectionManager() {
131 this(RegistryBuilder.<TlsStrategy>create()
132 .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
133 .build());
134 }
135
/**
 * Creates a connection manager with the given TLS strategies, the {@code STRICT}
 * pool concurrency policy and a time-to-live of {@link TimeValue#NEG_ONE_MILLISECOND}.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme id.
 */
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
    this(
            tlsStrategyLookup,
            PoolConcurrencyPolicy.STRICT,
            TimeValue.NEG_ONE_MILLISECOND);
}
139
/**
 * Creates a connection manager with the given TLS strategies, concurrency policy and
 * time-to-live, using the {@code LIFO} pool reuse policy.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme id.
 * @param poolConcurrencyPolicy pool concurrency policy; forwarded unchanged.
 * @param timeToLive            time-to-live handed to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            PoolReusePolicy.LIFO,
            timeToLive);
}
146
/**
 * Creates a connection manager with the given TLS strategies and pool settings,
 * passing {@code null} for both the scheme port resolver and the DNS resolver.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme id.
 * @param poolConcurrencyPolicy pool concurrency policy; forwarded unchanged.
 * @param poolReusePolicy       pool reuse policy; forwarded unchanged.
 * @param timeToLive            time-to-live handed to the connection pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null,
            null);
}
154
/**
 * Creates a connection manager backed by a {@code DefaultAsyncClientConnectionOperator}
 * built from the given TLS strategies and resolvers. Message multiplexing is switched off.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme id.
 * @param poolConcurrencyPolicy pool concurrency policy; forwarded unchanged.
 * @param poolReusePolicy       pool reuse policy; forwarded unchanged.
 * @param timeToLive            time-to-live handed to the connection pool.
 * @param schemePortResolver    scheme port resolver handed to the connection operator.
 * @param dnsResolver           DNS resolver handed to the connection operator.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            false);
}
165
/**
 * Creates a connection manager around the given connection operator and a freshly built pool.
 * <p>
 * A {@code null} concurrency policy is treated as {@code STRICT}. Both pool flavours override
 * {@code closeExpired()} so that every available entry is passed to
 * {@link #closeIfExpired(PoolEntry)}. When {@code messageMultiplexing} is {@code true} the pool
 * is additionally wrapped in an {@code H2SharingConnPool}.
 *
 * @param connectionOperator    connection operator; must not be {@code null}.
 * @param poolConcurrencyPolicy pool concurrency policy, or {@code null} for {@code STRICT}.
 * @param poolReusePolicy       pool reuse policy handed to the pool.
 * @param timeToLive            time-to-live handed to the pool.
 * @param messageMultiplexing   whether to wrap the pool in an {@code H2SharingConnPool}.
 * @throws IllegalArgumentException if the concurrency policy is neither {@code STRICT} nor {@code LAX}.
 */
@Internal
public PoolingAsyncClientConnectionManager(
        final AsyncClientConnectionOperator connectionOperator,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final boolean messageMultiplexing) {
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");

    final PoolConcurrencyPolicy effectivePolicy;
    if (poolConcurrencyPolicy == null) {
        effectivePolicy = PoolConcurrencyPolicy.STRICT;
    } else {
        effectivePolicy = poolConcurrencyPolicy;
    }

    final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> basePool;
    if (effectivePolicy == PoolConcurrencyPolicy.STRICT) {
        basePool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                DEFAULT_MAX_TOTAL_CONNECTIONS,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(available -> closeIfExpired(available));
            }

        };
    } else if (effectivePolicy == PoolConcurrencyPolicy.LAX) {
        basePool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(available -> closeIfExpired(available));
            }

        };
    } else {
        throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
    }

    if (messageMultiplexing) {
        this.pool = new H2SharingConnPool<>(basePool);
    } else {
        this.pool = basePool;
    }
    this.closed = new AtomicBoolean(false);
}
211
/**
 * Creates a connection manager around an externally supplied pool and connection operator.
 *
 * @param pool               connection pool; must not be {@code null}.
 * @param connectionOperator connection operator; must not be {@code null}.
 */
@Internal
protected PoolingAsyncClientConnectionManager(
        final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
        final AsyncClientConnectionOperator connectionOperator) {
    // The operator is validated before the pool, as in the original argument checks.
    Args.notNull(connectionOperator, "Connection operator");
    Args.notNull(pool, "Connection pool");
    this.connectionOperator = connectionOperator;
    this.pool = pool;
    this.closed = new AtomicBoolean(false);
}
220
221 @Override
222 public void close() {
223 close(CloseMode.GRACEFUL);
224 }
225
226 @Override
227 public void close(final CloseMode closeMode) {
228 if (this.closed.compareAndSet(false, true)) {
229 if (LOG.isDebugEnabled()) {
230 LOG.debug("Shutdown connection pool {}", closeMode);
231 }
232 this.pool.close(closeMode);
233 LOG.debug("Connection pool shut down");
234 }
235 }
236
237 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
238 if (endpoint instanceof InternalConnectionEndpoint) {
239 return (InternalConnectionEndpoint) endpoint;
240 }
241 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
242 }
243
244 private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
245 final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
246 final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
247 return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
248 }
249
250 private TlsConfig resolveTlsConfig(final HttpHost host) {
251 final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
252 TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
253 if (tlsConfig == null) {
254 tlsConfig = TlsConfig.DEFAULT;
255 }
256 if (URIScheme.HTTP.same(host.getSchemeName())
257 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
258
259
260 tlsConfig = TlsConfig.copy(tlsConfig)
261 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
262 .build();
263 }
264 return tlsConfig;
265 }
266
267 @Override
268 public Future<AsyncConnectionEndpoint> lease(
269 final String id,
270 final HttpRoute route,
271 final Object state,
272 final Timeout requestTimeout,
273 final FutureCallback<AsyncConnectionEndpoint> callback) {
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
276 }
277 return new Future<AsyncConnectionEndpoint>() {
278
279 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
280 final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
281
282 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
283 route,
284 state,
285 requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
286
287 @Override
288 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
289 if (poolEntry.hasConnection()) {
290 final TimeValue timeToLive = connectionConfig.getTimeToLive();
291 if (TimeValue.isNonNegative(timeToLive)) {
292 if (timeToLive.getDuration() == 0
293 || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
294 poolEntry.discardConnection(CloseMode.GRACEFUL);
295 }
296 }
297 }
298 if (poolEntry.hasConnection()) {
299 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
300 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
301 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
302 if (timeValue.getDuration() == 0
303 || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
304 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
305 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
306 connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
307 if (result == null || !result) {
308 if (LOG.isDebugEnabled()) {
309 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
310 }
311 poolEntry.discardConnection(CloseMode.GRACEFUL);
312 }
313 leaseCompleted(poolEntry);
314 })), Command.Priority.IMMEDIATE);
315 return;
316 }
317 if (LOG.isDebugEnabled()) {
318 LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(connection));
319 }
320 poolEntry.discardConnection(CloseMode.IMMEDIATE);
321 }
322 }
323 }
324 leaseCompleted(poolEntry);
325 }
326
327 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
328 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
329 if (connection != null) {
330 connection.activate();
331 }
332 if (LOG.isDebugEnabled()) {
333 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
334 }
335 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
338 }
339 resultFuture.completed(endpoint);
340 }
341
342 @Override
343 public void failed(final Exception ex) {
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("{} endpoint lease failed", id);
346 }
347 resultFuture.failed(ex);
348 }
349
350 @Override
351 public void cancelled() {
352 if (LOG.isDebugEnabled()) {
353 LOG.debug("{} endpoint lease cancelled", id);
354 }
355 resultFuture.cancel();
356 }
357
358 });
359
360 @Override
361 public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
362 return resultFuture.get();
363 }
364
365 @Override
366 public AsyncConnectionEndpoint get(
367 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
368 return resultFuture.get(timeout, unit);
369 }
370
371 @Override
372 public boolean cancel(final boolean mayInterruptIfRunning) {
373 return leaseFuture.cancel(mayInterruptIfRunning);
374 }
375
376 @Override
377 public boolean isDone() {
378 return resultFuture.isDone();
379 }
380
381 @Override
382 public boolean isCancelled() {
383 return resultFuture.isCancelled();
384 }
385
386 };
387 }
388
389 @Override
390 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
391 Args.notNull(endpoint, "Managed endpoint");
392 Args.notNull(keepAlive, "Keep-alive time");
393 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
394 if (entry == null) {
395 return;
396 }
397 if (LOG.isDebugEnabled()) {
398 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
399 }
400 if (this.isClosed()) {
401 return;
402 }
403 final ManagedAsyncClientConnection connection = entry.getConnection();
404 boolean reusable = connection != null && connection.isOpen();
405 try {
406 if (reusable) {
407 entry.updateState(state);
408 entry.updateExpiry(keepAlive);
409 connection.passivate();
410 if (LOG.isDebugEnabled()) {
411 final String s;
412 if (TimeValue.isPositive(keepAlive)) {
413 s = "for " + keepAlive;
414 } else {
415 s = "indefinitely";
416 }
417 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
418 }
419 }
420 } catch (final RuntimeException ex) {
421 reusable = false;
422 throw ex;
423 } finally {
424 pool.release(entry, reusable);
425 if (LOG.isDebugEnabled()) {
426 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
427 }
428 }
429 }
430
431 @Override
432 public Future<AsyncConnectionEndpoint> connect(
433 final AsyncConnectionEndpoint endpoint,
434 final ConnectionInitiator connectionInitiator,
435 final Timeout timeout,
436 final Object attachment,
437 final HttpContext context,
438 final FutureCallback<AsyncConnectionEndpoint> callback) {
439 Args.notNull(endpoint, "Endpoint");
440 Args.notNull(connectionInitiator, "Connection initiator");
441 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
442 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
443 if (internalEndpoint.isConnected()) {
444 resultFuture.completed(endpoint);
445 return resultFuture;
446 }
447 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
448 final HttpRoute route = poolEntry.getRoute();
449 final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
450 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
451 final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
452
453 if (LOG.isDebugEnabled()) {
454 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
455 }
456 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
457 connectionInitiator,
458 firstHop,
459 route.getTargetName(),
460 route.getLocalSocketAddress(),
461 connectTimeout,
462 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
463 context,
464 new FutureCallback<ManagedAsyncClientConnection>() {
465
466 @Override
467 public void completed(final ManagedAsyncClientConnection connection) {
468 try {
469 if (LOG.isDebugEnabled()) {
470 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
471 }
472 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
473 if (socketTimeout != null) {
474 connection.setSocketTimeout(socketTimeout);
475 }
476 poolEntry.assignConnection(connection);
477 resultFuture.completed(internalEndpoint);
478 } catch (final RuntimeException ex) {
479 resultFuture.failed(ex);
480 }
481 }
482
483 @Override
484 public void failed(final Exception ex) {
485 resultFuture.failed(ex);
486 }
487
488 @Override
489 public void cancelled() {
490 resultFuture.cancel();
491 }
492 });
493 resultFuture.setDependency(connectFuture);
494 return resultFuture;
495 }
496
497 @Override
498 public void upgrade(
499 final AsyncConnectionEndpoint endpoint,
500 final Object attachment,
501 final HttpContext context,
502 final FutureCallback<AsyncConnectionEndpoint> callback) {
503 Args.notNull(endpoint, "Managed endpoint");
504 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
505 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
506 final HttpRoute route = poolEntry.getRoute();
507 final HttpHost target = route.getTargetHost();
508 connectionOperator.upgrade(
509 poolEntry.getConnection(),
510 target,
511 route.getTargetName(),
512 attachment != null ? attachment : resolveTlsConfig(target),
513 context,
514 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
515
516 @Override
517 public void completed(final ManagedAsyncClientConnection connection) {
518 if (LOG.isDebugEnabled()) {
519 LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
520 }
521 final TlsDetails tlsDetails = connection.getTlsDetails();
522 if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
523 connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
524
525 @Override
526 public void completed(final ProtocolIOSession protocolIOSession) {
527 if (callback != null) {
528 callback.completed(endpoint);
529 }
530 }
531
532 });
533 } else {
534 if (callback != null) {
535 callback.completed(endpoint);
536 }
537 }
538 }
539 });
540 }
541
542 @Override
543 public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
544 upgrade(endpoint, attachment, context, null);
545 }
546
547 @Override
548 public Set<HttpRoute> getRoutes() {
549 return pool.getRoutes();
550 }
551
552 @Override
553 public void setMaxTotal(final int max) {
554 pool.setMaxTotal(max);
555 }
556
557 @Override
558 public int getMaxTotal() {
559 return pool.getMaxTotal();
560 }
561
562 @Override
563 public void setDefaultMaxPerRoute(final int max) {
564 pool.setDefaultMaxPerRoute(max);
565 }
566
567 @Override
568 public int getDefaultMaxPerRoute() {
569 return pool.getDefaultMaxPerRoute();
570 }
571
572 @Override
573 public void setMaxPerRoute(final HttpRoute route, final int max) {
574 pool.setMaxPerRoute(route, max);
575 }
576
577 @Override
578 public int getMaxPerRoute(final HttpRoute route) {
579 return pool.getMaxPerRoute(route);
580 }
581
582 @Override
583 public void closeIdle(final TimeValue idletime) {
584 if (isClosed()) {
585 return;
586 }
587 pool.closeIdle(idletime);
588 }
589
590 @Override
591 public void closeExpired() {
592 if (isClosed()) {
593 return;
594 }
595 pool.closeExpired();
596 }
597
598 @Override
599 public PoolStats getTotalStats() {
600 return pool.getTotalStats();
601 }
602
603 @Override
604 public PoolStats getStats(final HttpRoute route) {
605 return pool.getStats(route);
606 }
607
608
609
610
611
612
613 public void setDefaultConnectionConfig(final ConnectionConfig config) {
614 this.connectionConfigResolver = route -> config;
615 }
616
617
618
619
620
621
622 public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
623 this.connectionConfigResolver = connectionConfigResolver;
624 }
625
626
627
628
629
630
631 public void setDefaultTlsConfig(final TlsConfig config) {
632 this.tlsConfigResolver = host -> config;
633 }
634
635
636
637
638
639
640 public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
641 this.tlsConfigResolver = tlsConfigResolver;
642 }
643
644 void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
645 final long now = System.currentTimeMillis();
646 if (entry.getExpiryDeadline().isBefore(now)) {
647 entry.discardConnection(CloseMode.GRACEFUL);
648 } else {
649 final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
650 final TimeValue timeToLive = connectionConfig.getTimeToLive();
651 if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
652 entry.discardConnection(CloseMode.GRACEFUL);
653 }
654 }
655 }
656
657
658
659
660 @Deprecated
661 public TimeValue getValidateAfterInactivity() {
662 return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
663 }
664
665
666
667
668
669
670
671
672
673
674 @Deprecated
675 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
676 setDefaultConnectionConfig(ConnectionConfig.custom()
677 .setValidateAfterInactivity(validateAfterInactivity)
678 .build());
679 }
680
681 private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
682
683 static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements Identifiable {
684
685 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
686 private final String id;
687
688 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
689 this.poolEntryRef = new AtomicReference<>(poolEntry);
690 this.id = INCREMENTING_ID.getNextId();
691 }
692
693 @Override
694 public String getId() {
695 return id;
696 }
697
698 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
699 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
700 if (poolEntry == null) {
701 throw new ConnectionShutdownException();
702 }
703 return poolEntry;
704 }
705
706 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
707 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
708 if (poolEntry.getConnection() == null) {
709 throw new ConnectionShutdownException();
710 }
711 return poolEntry;
712 }
713
714 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
715 return poolEntryRef.getAndSet(null);
716 }
717
718 @Override
719 public void close(final CloseMode closeMode) {
720 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
721 if (poolEntry != null) {
722 if (LOG.isDebugEnabled()) {
723 LOG.debug("{} close {}", id, closeMode);
724 }
725 poolEntry.discardConnection(closeMode);
726 }
727 }
728
729 @Override
730 public boolean isConnected() {
731 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
732 if (poolEntry == null) {
733 return false;
734 }
735 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
736 if (connection == null) {
737 return false;
738 }
739 if (!connection.isOpen()) {
740 poolEntry.discardConnection(CloseMode.IMMEDIATE);
741 return false;
742 }
743 return true;
744 }
745
746 @Override
747 public void setSocketTimeout(final Timeout timeout) {
748 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
749 }
750
751 @Override
752 public void execute(
753 final String exchangeId,
754 final AsyncClientExchangeHandler exchangeHandler,
755 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
756 final HttpContext context) {
757 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
758 if (LOG.isDebugEnabled()) {
759 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
760 }
761 context.setProtocolVersion(connection.getProtocolVersion());
762 connection.submitCommand(
763 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
764 Command.Priority.NORMAL);
765 }
766
767 @Override
768 public EndpointInfo getInfo() {
769 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
770 if (poolEntry != null) {
771 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
772 if (connection != null && connection.isOpen()) {
773 final TlsDetails tlsDetails = connection.getTlsDetails();
774 return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
775 }
776 }
777 return null;
778 }
779
780 }
781
782
783
784
785
786
787
788
789
790 public boolean isClosed() {
791 return this.closed.get();
792 }
793
794 }