1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.util.Set;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.client5.http.DnsResolver;
39 import org.apache.hc.client5.http.EndpointInfo;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.config.ConnectionConfig;
43 import org.apache.hc.client5.http.config.TlsConfig;
44 import org.apache.hc.client5.http.impl.ConnPoolSupport;
45 import org.apache.hc.client5.http.impl.ConnectionHolder;
46 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
47 import org.apache.hc.client5.http.impl.PrefixedIncrementingId;
48 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
49 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
50 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
51 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
52 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
53 import org.apache.hc.core5.annotation.Contract;
54 import org.apache.hc.core5.annotation.Internal;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.concurrent.BasicFuture;
57 import org.apache.hc.core5.concurrent.CallbackContribution;
58 import org.apache.hc.core5.concurrent.ComplexFuture;
59 import org.apache.hc.core5.concurrent.FutureCallback;
60 import org.apache.hc.core5.function.Resolver;
61 import org.apache.hc.core5.http.HttpConnection;
62 import org.apache.hc.core5.http.HttpHost;
63 import org.apache.hc.core5.http.HttpVersion;
64 import org.apache.hc.core5.http.ProtocolVersion;
65 import org.apache.hc.core5.http.URIScheme;
66 import org.apache.hc.core5.http.config.Lookup;
67 import org.apache.hc.core5.http.config.RegistryBuilder;
68 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70 import org.apache.hc.core5.http.nio.HandlerFactory;
71 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
72 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
73 import org.apache.hc.core5.http.protocol.HttpContext;
74 import org.apache.hc.core5.http2.HttpVersionPolicy;
75 import org.apache.hc.core5.http2.nio.command.PingCommand;
76 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
77 import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
78 import org.apache.hc.core5.io.CloseMode;
79 import org.apache.hc.core5.pool.ConnPoolControl;
80 import org.apache.hc.core5.pool.DefaultDisposalCallback;
81 import org.apache.hc.core5.pool.LaxConnPool;
82 import org.apache.hc.core5.pool.ManagedConnPool;
83 import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
84 import org.apache.hc.core5.pool.PoolEntry;
85 import org.apache.hc.core5.pool.PoolReusePolicy;
86 import org.apache.hc.core5.pool.PoolStats;
87 import org.apache.hc.core5.pool.StrictConnPool;
88 import org.apache.hc.core5.reactor.Command;
89 import org.apache.hc.core5.reactor.ConnectionInitiator;
90 import org.apache.hc.core5.reactor.ProtocolIOSession;
91 import org.apache.hc.core5.reactor.ssl.TlsDetails;
92 import org.apache.hc.core5.util.Args;
93 import org.apache.hc.core5.util.Deadline;
94 import org.apache.hc.core5.util.Identifiable;
95 import org.apache.hc.core5.util.TimeValue;
96 import org.apache.hc.core5.util.Timeout;
97 import org.slf4j.Logger;
98 import org.slf4j.LoggerFactory;
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
119 public class PoolingAsyncClientConnectionManager implements AsyncClientConnectionManager, ConnPoolControl<HttpRoute> {
120
121 private static final Logger LOG = LoggerFactory.getLogger(PoolingAsyncClientConnectionManager.class);
122
123 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
124 public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
125
126 private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
127 private final AsyncClientConnectionOperator connectionOperator;
128 private final AtomicBoolean closed;
129
130 private volatile Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
131 private volatile Resolver<HttpHost, TlsConfig> tlsConfigResolver;
132
133 public PoolingAsyncClientConnectionManager() {
134 this(RegistryBuilder.<TlsStrategy>create()
135 .register(URIScheme.HTTPS.getId(), DefaultClientTlsStrategy.createDefault())
136 .build());
137 }
138
/**
 * Creates a manager with the given TLS strategies, the STRICT pool
 * concurrency policy and a time-to-live of {@link TimeValue#NEG_ONE_MILLISECOND}.
 *
 * @param tlsStrategyLookup TLS strategies looked up by scheme.
 */
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
    this(
            tlsStrategyLookup,
            PoolConcurrencyPolicy.STRICT,
            TimeValue.NEG_ONE_MILLISECOND);
}
142
/**
 * Creates a manager with the given TLS strategies, concurrency policy and
 * time-to-live, using the LIFO pool reuse policy.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param timeToLive            time-to-live handed to the pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            PoolReusePolicy.LIFO,
            timeToLive);
}
149
/**
 * Creates a manager with the given TLS strategies and pool policies,
 * passing {@code null} for both the scheme port resolver and the DNS resolver.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy       pool reuse policy.
 * @param timeToLive            time-to-live handed to the pool.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive) {
    this(
            tlsStrategyLookup,
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            null,
            null);
}
157
/**
 * Creates a manager backed by a {@link DefaultAsyncClientConnectionOperator}
 * built from the given TLS strategies and resolvers. Message multiplexing
 * is switched off.
 *
 * @param tlsStrategyLookup     TLS strategies looked up by scheme.
 * @param poolConcurrencyPolicy pool concurrency policy.
 * @param poolReusePolicy       pool reuse policy.
 * @param timeToLive            time-to-live handed to the pool.
 * @param schemePortResolver    scheme port resolver passed to the operator.
 * @param dnsResolver           DNS resolver passed to the operator.
 */
public PoolingAsyncClientConnectionManager(
        final Lookup<TlsStrategy> tlsStrategyLookup,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
            poolConcurrencyPolicy,
            poolReusePolicy,
            timeToLive,
            false);
}
168
/**
 * Creates a manager around the given connection operator and builds the
 * underlying pool according to the concurrency policy.
 * <p>
 * A {@code null} concurrency policy is treated as STRICT. Both pool flavours
 * override {@code closeExpired()} so that every available entry is checked
 * with {@link #closeIfExpired(PoolEntry)}. When {@code messageMultiplexing}
 * is set, the pool is wrapped in an {@code H2SharingConnPool}.
 *
 * @param connectionOperator    operator used to connect and upgrade; must not be {@code null}.
 * @param poolConcurrencyPolicy pool concurrency policy, may be {@code null}.
 * @param poolReusePolicy       pool reuse policy handed to the pool.
 * @param timeToLive            time-to-live handed to the pool.
 * @param messageMultiplexing   whether to wrap the pool in an {@code H2SharingConnPool}.
 * @throws IllegalArgumentException if the concurrency policy is neither STRICT nor LAX.
 */
@Internal
public PoolingAsyncClientConnectionManager(
        final AsyncClientConnectionOperator connectionOperator,
        final PoolConcurrencyPolicy poolConcurrencyPolicy,
        final PoolReusePolicy poolReusePolicy,
        final TimeValue timeToLive,
        final boolean messageMultiplexing) {
    this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");

    final PoolConcurrencyPolicy effectivePolicy =
            poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT;

    final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> basePool;
    if (effectivePolicy == PoolConcurrencyPolicy.STRICT) {
        basePool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                DEFAULT_MAX_TOTAL_CONNECTIONS,
                timeToLive,
                poolReusePolicy,
                new DefaultDisposalCallback<>(),
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(PoolingAsyncClientConnectionManager.this::closeIfExpired);
            }

        };
    } else if (effectivePolicy == PoolConcurrencyPolicy.LAX) {
        basePool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
                DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                timeToLive,
                poolReusePolicy,
                null) {

            @Override
            public void closeExpired() {
                enumAvailable(PoolingAsyncClientConnectionManager.this::closeIfExpired);
            }

        };
    } else {
        throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
    }

    if (messageMultiplexing) {
        this.pool = new H2SharingConnPool<>(basePool);
    } else {
        this.pool = basePool;
    }
    this.closed = new AtomicBoolean(false);
}
215
216 @Internal
217 protected PoolingAsyncClientConnectionManager(
218 final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
219 final AsyncClientConnectionOperator connectionOperator) {
220 this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
221 this.pool = Args.notNull(pool, "Connection pool");
222 this.closed = new AtomicBoolean(false);
223 }
224
225 @Override
226 public void close() {
227 close(CloseMode.GRACEFUL);
228 }
229
230 @Override
231 public void close(final CloseMode closeMode) {
232 if (this.closed.compareAndSet(false, true)) {
233 if (LOG.isDebugEnabled()) {
234 LOG.debug("Shutdown connection pool {}", closeMode);
235 }
236 this.pool.close(closeMode);
237 LOG.debug("Connection pool shut down");
238 }
239 }
240
241 private InternalConnectionEndpoint cast(final AsyncConnectionEndpoint endpoint) {
242 if (endpoint instanceof InternalConnectionEndpoint) {
243 return (InternalConnectionEndpoint) endpoint;
244 }
245 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
246 }
247
248 private ConnectionConfig resolveConnectionConfig(final HttpRoute route) {
249 final Resolver<HttpRoute, ConnectionConfig> resolver = this.connectionConfigResolver;
250 final ConnectionConfig connectionConfig = resolver != null ? resolver.resolve(route) : null;
251 return connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
252 }
253
254 private TlsConfig resolveTlsConfig(final HttpHost host) {
255 final Resolver<HttpHost, TlsConfig> resolver = this.tlsConfigResolver;
256 TlsConfig tlsConfig = resolver != null ? resolver.resolve(host) : null;
257 if (tlsConfig == null) {
258 tlsConfig = TlsConfig.DEFAULT;
259 }
260 if (URIScheme.HTTP.same(host.getSchemeName())
261 && tlsConfig.getHttpVersionPolicy() == HttpVersionPolicy.NEGOTIATE) {
262
263
264 tlsConfig = TlsConfig.copy(tlsConfig)
265 .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
266 .build();
267 }
268 return tlsConfig;
269 }
270
271 @Override
272 public Future<AsyncConnectionEndpoint> lease(
273 final String id,
274 final HttpRoute route,
275 final Object state,
276 final Timeout requestTimeout,
277 final FutureCallback<AsyncConnectionEndpoint> callback) {
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
280 }
281 return new Future<AsyncConnectionEndpoint>() {
282
283 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
284 final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);
285
286 final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
287 route,
288 state,
289 requestTimeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
290
291 @Override
292 public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
293 if (poolEntry.hasConnection()) {
294 final TimeValue timeToLive = connectionConfig.getTimeToLive();
295 if (TimeValue.isNonNegative(timeToLive)) {
296 if (timeToLive.getDuration() == 0
297 || Deadline.calculate(poolEntry.getCreated(), timeToLive).isExpired()) {
298 poolEntry.discardConnection(CloseMode.GRACEFUL);
299 }
300 }
301 }
302 if (poolEntry.hasConnection()) {
303 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
304 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity();
305 if (connection.isOpen() && TimeValue.isNonNegative(timeValue)) {
306 if (timeValue.getDuration() == 0
307 || Deadline.calculate(poolEntry.getUpdated(), timeValue).isExpired()) {
308 final ProtocolVersion protocolVersion = connection.getProtocolVersion();
309 if (protocolVersion != null && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
310 connection.submitCommand(new PingCommand(new BasicPingHandler(result -> {
311 if (result == null || !result) {
312 if (LOG.isDebugEnabled()) {
313 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(connection));
314 }
315 poolEntry.discardConnection(CloseMode.GRACEFUL);
316 }
317 leaseCompleted(poolEntry);
318 })), Command.Priority.IMMEDIATE);
319 return;
320 }
321 }
322 }
323 }
324 leaseCompleted(poolEntry);
325 }
326
327 void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
328 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
329 if (connection != null) {
330 connection.activate();
331 if (connectionConfig.getSocketTimeout() != null) {
332 connection.setSocketTimeout(connectionConfig.getSocketTimeout());
333 }
334 }
335 if (LOG.isDebugEnabled()) {
336 LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
337 }
338 final AsyncConnectionEndpoint endpoint = new InternalConnectionEndpoint(poolEntry);
339 if (LOG.isDebugEnabled()) {
340 LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(endpoint));
341 }
342 resultFuture.completed(endpoint);
343 }
344
345 @Override
346 public void failed(final Exception ex) {
347 if (LOG.isDebugEnabled()) {
348 LOG.debug("{} endpoint lease failed", id);
349 }
350 resultFuture.failed(ex);
351 }
352
353 @Override
354 public void cancelled() {
355 if (LOG.isDebugEnabled()) {
356 LOG.debug("{} endpoint lease cancelled", id);
357 }
358 resultFuture.cancel();
359 }
360
361 });
362
363 @Override
364 public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
365 return resultFuture.get();
366 }
367
368 @Override
369 public AsyncConnectionEndpoint get(
370 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
371 return resultFuture.get(timeout, unit);
372 }
373
374 @Override
375 public boolean cancel(final boolean mayInterruptIfRunning) {
376 return leaseFuture.cancel(mayInterruptIfRunning);
377 }
378
379 @Override
380 public boolean isDone() {
381 return resultFuture.isDone();
382 }
383
384 @Override
385 public boolean isCancelled() {
386 return resultFuture.isCancelled();
387 }
388
389 };
390 }
391
392 @Override
393 public void release(final AsyncConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
394 Args.notNull(endpoint, "Managed endpoint");
395 Args.notNull(keepAlive, "Keep-alive time");
396 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry = cast(endpoint).detach();
397 if (entry == null) {
398 return;
399 }
400 if (LOG.isDebugEnabled()) {
401 LOG.debug("{} releasing endpoint", ConnPoolSupport.getId(endpoint));
402 }
403 if (this.isClosed()) {
404 return;
405 }
406 final ManagedAsyncClientConnection connection = entry.getConnection();
407 boolean reusable = connection != null && connection.isOpen();
408 try {
409 if (reusable) {
410 entry.updateState(state);
411 entry.updateExpiry(keepAlive);
412 connection.passivate();
413 if (LOG.isDebugEnabled()) {
414 final String s;
415 if (TimeValue.isPositive(keepAlive)) {
416 s = "for " + keepAlive;
417 } else {
418 s = "indefinitely";
419 }
420 LOG.debug("{} connection {} can be kept alive {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection), s);
421 }
422 }
423 } catch (final RuntimeException ex) {
424 reusable = false;
425 throw ex;
426 } finally {
427 pool.release(entry, reusable);
428 if (LOG.isDebugEnabled()) {
429 LOG.debug("{} connection released {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.formatStats(entry.getRoute(), entry.getState(), pool));
430 }
431 }
432 }
433
434 @Override
435 public Future<AsyncConnectionEndpoint> connect(
436 final AsyncConnectionEndpoint endpoint,
437 final ConnectionInitiator connectionInitiator,
438 final Timeout timeout,
439 final Object attachment,
440 final HttpContext context,
441 final FutureCallback<AsyncConnectionEndpoint> callback) {
442 Args.notNull(endpoint, "Endpoint");
443 Args.notNull(connectionInitiator, "Connection initiator");
444 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
445 final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
446 if (internalEndpoint.isConnected()) {
447 resultFuture.completed(endpoint);
448 return resultFuture;
449 }
450 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getPoolEntry();
451 final HttpRoute route = poolEntry.getRoute();
452 final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
453 final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
454 final Timeout connectTimeout = timeout != null ? timeout : connectionConfig.getConnectTimeout();
455
456 if (LOG.isDebugEnabled()) {
457 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
458 }
459 final Future<ManagedAsyncClientConnection> connectFuture = connectionOperator.connect(
460 connectionInitiator,
461 firstHop,
462 route.getTargetName(),
463 route.getLocalSocketAddress(),
464 connectTimeout,
465 route.isTunnelled() ? null : resolveTlsConfig(route.getTargetHost()),
466 context,
467 new FutureCallback<ManagedAsyncClientConnection>() {
468
469 @Override
470 public void completed(final ManagedAsyncClientConnection connection) {
471 try {
472 if (LOG.isDebugEnabled()) {
473 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
474 }
475 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
476 if (socketTimeout != null) {
477 connection.setSocketTimeout(socketTimeout);
478 }
479 poolEntry.assignConnection(connection);
480 resultFuture.completed(internalEndpoint);
481 } catch (final RuntimeException ex) {
482 resultFuture.failed(ex);
483 }
484 }
485
486 @Override
487 public void failed(final Exception ex) {
488 resultFuture.failed(ex);
489 }
490
491 @Override
492 public void cancelled() {
493 resultFuture.cancel();
494 }
495 });
496 resultFuture.setDependency(connectFuture);
497 return resultFuture;
498 }
499
500 @Override
501 public void upgrade(
502 final AsyncConnectionEndpoint endpoint,
503 final Object attachment,
504 final HttpContext context,
505 final FutureCallback<AsyncConnectionEndpoint> callback) {
506 Args.notNull(endpoint, "Managed endpoint");
507 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
508 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
509 final HttpRoute route = poolEntry.getRoute();
510 final HttpHost target = route.getTargetHost();
511 connectionOperator.upgrade(
512 poolEntry.getConnection(),
513 target,
514 route.getTargetName(),
515 attachment != null ? attachment : resolveTlsConfig(target),
516 context,
517 new CallbackContribution<ManagedAsyncClientConnection>(callback) {
518
519 @Override
520 public void completed(final ManagedAsyncClientConnection connection) {
521 if (LOG.isDebugEnabled()) {
522 LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
523 }
524 final TlsDetails tlsDetails = connection.getTlsDetails();
525 if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
526 connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
527
528 @Override
529 public void completed(final ProtocolIOSession protocolIOSession) {
530 if (callback != null) {
531 callback.completed(endpoint);
532 }
533 }
534
535 });
536 } else {
537 if (callback != null) {
538 callback.completed(endpoint);
539 }
540 }
541 }
542 });
543 }
544
545 @Override
546 public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
547 upgrade(endpoint, attachment, context, null);
548 }
549
550 @Override
551 public Set<HttpRoute> getRoutes() {
552 return pool.getRoutes();
553 }
554
555 @Override
556 public void setMaxTotal(final int max) {
557 pool.setMaxTotal(max);
558 }
559
560 @Override
561 public int getMaxTotal() {
562 return pool.getMaxTotal();
563 }
564
565 @Override
566 public void setDefaultMaxPerRoute(final int max) {
567 pool.setDefaultMaxPerRoute(max);
568 }
569
570 @Override
571 public int getDefaultMaxPerRoute() {
572 return pool.getDefaultMaxPerRoute();
573 }
574
575 @Override
576 public void setMaxPerRoute(final HttpRoute route, final int max) {
577 pool.setMaxPerRoute(route, max);
578 }
579
580 @Override
581 public int getMaxPerRoute(final HttpRoute route) {
582 return pool.getMaxPerRoute(route);
583 }
584
585 @Override
586 public void closeIdle(final TimeValue idletime) {
587 if (isClosed()) {
588 return;
589 }
590 pool.closeIdle(idletime);
591 }
592
593 @Override
594 public void closeExpired() {
595 if (isClosed()) {
596 return;
597 }
598 pool.closeExpired();
599 }
600
601 @Override
602 public PoolStats getTotalStats() {
603 return pool.getTotalStats();
604 }
605
606 @Override
607 public PoolStats getStats(final HttpRoute route) {
608 return pool.getStats(route);
609 }
610
611
612
613
614
615
616 public void setDefaultConnectionConfig(final ConnectionConfig config) {
617 this.connectionConfigResolver = route -> config;
618 }
619
620
621
622
623
624
625 public void setConnectionConfigResolver(final Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver) {
626 this.connectionConfigResolver = connectionConfigResolver;
627 }
628
629
630
631
632
633
634 public void setDefaultTlsConfig(final TlsConfig config) {
635 this.tlsConfigResolver = host -> config;
636 }
637
638
639
640
641
642
643 public void setTlsConfigResolver(final Resolver<HttpHost, TlsConfig> tlsConfigResolver) {
644 this.tlsConfigResolver = tlsConfigResolver;
645 }
646
647 void closeIfExpired(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
648 final long now = System.currentTimeMillis();
649 if (entry.getExpiryDeadline().isBefore(now)) {
650 entry.discardConnection(CloseMode.GRACEFUL);
651 } else {
652 final ConnectionConfig connectionConfig = resolveConnectionConfig(entry.getRoute());
653 final TimeValue timeToLive = connectionConfig.getTimeToLive();
654 if (timeToLive != null && Deadline.calculate(entry.getCreated(), timeToLive).isBefore(now)) {
655 entry.discardConnection(CloseMode.GRACEFUL);
656 }
657 }
658 }
659
660
661
662
663 @Deprecated
664 public TimeValue getValidateAfterInactivity() {
665 return ConnectionConfig.DEFAULT.getValidateAfterInactivity();
666 }
667
668
669
670
671
672
673
674
675
676
677 @Deprecated
678 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
679 setDefaultConnectionConfig(ConnectionConfig.custom()
680 .setValidateAfterInactivity(validateAfterInactivity)
681 .build());
682 }
683
684 private static final PrefixedIncrementingId INCREMENTING_ID = new PrefixedIncrementingId("ep-");
685
686 static class InternalConnectionEndpoint extends AsyncConnectionEndpoint implements ConnectionHolder, Identifiable {
687
688 private final AtomicReference<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> poolEntryRef;
689 private final String id;
690
691 InternalConnectionEndpoint(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
692 this.poolEntryRef = new AtomicReference<>(poolEntry);
693 this.id = INCREMENTING_ID.getNextId();
694 }
695
696 @Override
697 public String getId() {
698 return id;
699 }
700
701 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getPoolEntry() {
702 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
703 if (poolEntry == null) {
704 throw new ConnectionShutdownException();
705 }
706 return poolEntry;
707 }
708
709 PoolEntry<HttpRoute, ManagedAsyncClientConnection> getValidatedPoolEntry() {
710 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = getPoolEntry();
711 if (poolEntry.getConnection() == null) {
712 throw new ConnectionShutdownException();
713 }
714 return poolEntry;
715 }
716
717 PoolEntry<HttpRoute, ManagedAsyncClientConnection> detach() {
718 return poolEntryRef.getAndSet(null);
719 }
720
721 @Override
722 public void close(final CloseMode closeMode) {
723 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
724 if (poolEntry != null) {
725 if (LOG.isDebugEnabled()) {
726 LOG.debug("{} close {}", id, closeMode);
727 }
728 poolEntry.discardConnection(closeMode);
729 }
730 }
731
732 @Override
733 public boolean isConnected() {
734 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
735 if (poolEntry == null) {
736 return false;
737 }
738 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
739 if (connection == null) {
740 return false;
741 }
742 if (!connection.isOpen()) {
743 poolEntry.discardConnection(CloseMode.IMMEDIATE);
744 return false;
745 }
746 return true;
747 }
748
749 @Override
750 public void setSocketTimeout(final Timeout timeout) {
751 getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
752 }
753
754 @Override
755 public void execute(
756 final String exchangeId,
757 final AsyncClientExchangeHandler exchangeHandler,
758 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
759 final HttpContext context) {
760 final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
761 if (LOG.isDebugEnabled()) {
762 LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
763 }
764 context.setProtocolVersion(connection.getProtocolVersion());
765 connection.submitCommand(
766 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
767 Command.Priority.NORMAL);
768 }
769
770 @Override
771 public EndpointInfo getInfo() {
772 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
773 if (poolEntry != null) {
774 final ManagedAsyncClientConnection connection = poolEntry.getConnection();
775 if (connection != null && connection.isOpen()) {
776 final TlsDetails tlsDetails = connection.getTlsDetails();
777 return new EndpointInfo(connection.getProtocolVersion(), tlsDetails != null ? tlsDetails.getSSLSession() : null);
778 }
779 }
780 return null;
781 }
782
783 @Override
784 public HttpConnection get() {
785 final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();
786 return poolEntry != null ? poolEntry.getConnection() : null;
787 }
788
789 }
790
791
792
793
794
795
796
797
798
799 public boolean isClosed() {
800 return this.closed.get();
801 }
802
803 }