1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.time.Instant;
32 import java.util.Objects;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.concurrent.locks.ReentrantLock;
39
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.EndpointInfo;
42 import org.apache.hc.client5.http.HttpRoute;
43 import org.apache.hc.client5.http.SchemePortResolver;
44 import org.apache.hc.client5.http.config.ConnectionConfig;
45 import org.apache.hc.client5.http.config.TlsConfig;
46 import org.apache.hc.client5.http.impl.ConnPoolSupport;
47 import org.apache.hc.client5.http.impl.ConnectionHolder;
48 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
49 import org.apache.hc.client5.http.io.ConnectionEndpoint;
50 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
51 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
52 import org.apache.hc.client5.http.io.LeaseRequest;
53 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
54 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
55 import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
56 import org.apache.hc.core5.annotation.Contract;
57 import org.apache.hc.core5.annotation.ThreadingBehavior;
58 import org.apache.hc.core5.http.ClassicHttpRequest;
59 import org.apache.hc.core5.http.ClassicHttpResponse;
60 import org.apache.hc.core5.http.HttpConnection;
61 import org.apache.hc.core5.http.HttpException;
62 import org.apache.hc.core5.http.HttpHost;
63 import org.apache.hc.core5.http.URIScheme;
64 import org.apache.hc.core5.http.config.Lookup;
65 import org.apache.hc.core5.http.config.RegistryBuilder;
66 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
67 import org.apache.hc.core5.http.io.HttpConnectionFactory;
68 import org.apache.hc.core5.http.io.SocketConfig;
69 import org.apache.hc.core5.http.protocol.HttpContext;
70 import org.apache.hc.core5.io.CloseMode;
71 import org.apache.hc.core5.util.Args;
72 import org.apache.hc.core5.util.Asserts;
73 import org.apache.hc.core5.util.Deadline;
74 import org.apache.hc.core5.util.Identifiable;
75 import org.apache.hc.core5.util.TimeValue;
76 import org.apache.hc.core5.util.Timeout;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @Contract(threading = ThreadingBehavior.SAFE)
99 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
100
101 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
102
103 private static final AtomicLong COUNT = new AtomicLong(0);
104
105 private final HttpClientConnectionOperator connectionOperator;
106 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
107 private final String id;
108
109 private final ReentrantLock lock;
110
111 private ManagedHttpClientConnection conn;
112 private HttpRoute route;
113 private Object state;
114 private long created;
115 private long updated;
116 private long expiry;
117 private boolean leased;
118 private SocketConfig socketConfig;
119 private ConnectionConfig connectionConfig;
120 private TlsConfig tlsConfig;
121
122 private final AtomicBoolean closed;
123
124
125
126
127 public BasicHttpClientConnectionManager(
128 final HttpClientConnectionOperator httpClientConnectionOperator,
129 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
130 super();
131 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
132 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
133 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
134 this.expiry = Long.MAX_VALUE;
135 this.socketConfig = SocketConfig.DEFAULT;
136 this.connectionConfig = ConnectionConfig.DEFAULT;
137 this.tlsConfig = TlsConfig.DEFAULT;
138 this.closed = new AtomicBoolean(false);
139 this.lock = new ReentrantLock();
140 }
141
142
143
144
145 public static BasicHttpClientConnectionManager create(
146 final SchemePortResolver schemePortResolver,
147 final DnsResolver dnsResolver,
148 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry,
149 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
150 return new BasicHttpClientConnectionManager(
151 new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyRegistry),
152 connFactory);
153 }
154
155
156
157
158 public static BasicHttpClientConnectionManager create(
159 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry,
160 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
161 return new BasicHttpClientConnectionManager(
162 new DefaultHttpClientConnectionOperator(null, null, tlsSocketStrategyRegistry), connFactory);
163 }
164
165
166
167
168 public static BasicHttpClientConnectionManager create(
169 final Lookup<TlsSocketStrategy> tlsSocketStrategyRegistry) {
170 return new BasicHttpClientConnectionManager(
171 new DefaultHttpClientConnectionOperator(null, null, tlsSocketStrategyRegistry), null);
172 }
173
174
175
176
177 @Deprecated
178 public BasicHttpClientConnectionManager(
179 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry,
180 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
181 final SchemePortResolver schemePortResolver,
182 final DnsResolver dnsResolver) {
183 this(new DefaultHttpClientConnectionOperator(
184 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
185 }
186
187
188
189
190 @Deprecated
191 public BasicHttpClientConnectionManager(
192 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry,
193 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
194 this(socketFactoryRegistry, connFactory, null, null);
195 }
196
197
198
199
200 @Deprecated
201 public BasicHttpClientConnectionManager(
202 final Lookup<org.apache.hc.client5.http.socket.ConnectionSocketFactory> socketFactoryRegistry) {
203 this(socketFactoryRegistry, null, null, null);
204 }
205
206 public BasicHttpClientConnectionManager() {
207 this(new DefaultHttpClientConnectionOperator(null, null,
208 RegistryBuilder.<TlsSocketStrategy>create()
209 .register(URIScheme.HTTPS.id, DefaultClientTlsStrategy.createDefault())
210 .build()),
211 null);
212 }
213
214 @Override
215 public void close() {
216 close(CloseMode.GRACEFUL);
217 }
218
219 @Override
220 public void close(final CloseMode closeMode) {
221 if (this.closed.compareAndSet(false, true)) {
222 closeConnection(closeMode);
223 }
224 }
225
226 HttpRoute getRoute() {
227 return route;
228 }
229
230 Object getState() {
231 return state;
232 }
233
234 public SocketConfig getSocketConfig() {
235 lock.lock();
236 try {
237 return socketConfig;
238 } finally {
239 lock.unlock();
240 }
241 }
242
243 public void setSocketConfig(final SocketConfig socketConfig) {
244 lock.lock();
245 try {
246 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
247 } finally {
248 lock.unlock();
249 }
250 }
251
252
253
254
255 public ConnectionConfig getConnectionConfig() {
256 lock.lock();
257 try {
258 return connectionConfig;
259 } finally {
260 lock.unlock();
261 }
262
263 }
264
265
266
267
268 public void setConnectionConfig(final ConnectionConfig connectionConfig) {
269 lock.lock();
270 try {
271 this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
272 } finally {
273 lock.unlock();
274 }
275 }
276
277
278
279
280 public TlsConfig getTlsConfig() {
281 lock.lock();
282 try {
283 return tlsConfig;
284 } finally {
285 lock.unlock();
286 }
287 }
288
289
290
291
292 public void setTlsConfig(final TlsConfig tlsConfig) {
293 lock.lock();
294 try {
295 this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
296 } finally {
297 lock.unlock();
298 }
299 }
300
301 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
302 return lease(id, route, Timeout.DISABLED, state);
303 }
304
305 @Override
306 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
307 return new LeaseRequest() {
308
309 @Override
310 public ConnectionEndpoint get(
311 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
312 try {
313 return new InternalConnectionEndpoint(route, getConnection(route, state));
314 } catch (final IOException ex) {
315 throw new ExecutionException(ex.getMessage(), ex);
316 }
317 }
318
319 @Override
320 public boolean cancel() {
321 return false;
322 }
323
324 };
325 }
326
327 private void closeConnection(final CloseMode closeMode) {
328 lock.lock();
329 try {
330 if (this.conn != null) {
331 if (LOG.isDebugEnabled()) {
332 LOG.debug("{} Closing connection {}", id, closeMode);
333 }
334 this.conn.close(closeMode);
335 this.conn = null;
336 }
337 } finally {
338 lock.unlock();
339 }
340 }
341
342 private void checkExpiry() {
343 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
344 if (LOG.isDebugEnabled()) {
345 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
346 }
347 closeConnection(CloseMode.GRACEFUL);
348 }
349 }
350
351 private void validate() {
352 if (this.conn != null) {
353 final TimeValue timeToLive = connectionConfig.getTimeToLive();
354 if (TimeValue.isNonNegative(timeToLive)) {
355 final Deadline deadline = Deadline.calculate(created, timeToLive);
356 if (deadline.isExpired()) {
357 closeConnection(CloseMode.GRACEFUL);
358 }
359 }
360 }
361 if (this.conn != null) {
362 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
363 connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
364 if (TimeValue.isNonNegative(timeValue)) {
365 final Deadline deadline = Deadline.calculate(updated, timeValue);
366 if (deadline.isExpired()) {
367 boolean stale;
368 try {
369 stale = conn.isStale();
370 } catch (final IOException ignore) {
371 stale = true;
372 }
373 if (stale) {
374 if (LOG.isDebugEnabled()) {
375 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
376 }
377 closeConnection(CloseMode.GRACEFUL);
378 }
379 }
380 }
381 }
382 }
383
384 ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
385 lock.lock();
386 try {
387 Asserts.check(!isClosed(), "Connection manager has been shut down");
388 if (LOG.isDebugEnabled()) {
389 LOG.debug("{} Get connection for route {}", id, route);
390 }
391 Asserts.check(!this.leased, "Connection %s is still allocated", conn);
392 if (!Objects.equals(this.route, route) || !Objects.equals(this.state, state)) {
393 closeConnection(CloseMode.GRACEFUL);
394 }
395 this.route = route;
396 this.state = state;
397 checkExpiry();
398 validate();
399 if (this.conn == null) {
400 this.conn = this.connFactory.createConnection(null);
401 this.created = System.currentTimeMillis();
402 } else {
403 this.conn.activate();
404 if (connectionConfig.getSocketTimeout() != null) {
405 conn.setSocketTimeout(connectionConfig.getSocketTimeout());
406 }
407 }
408 this.leased = true;
409 if (LOG.isDebugEnabled()) {
410 LOG.debug("{} Using connection {}", id, conn);
411 }
412 return this.conn;
413 } finally {
414 lock.unlock();
415 }
416 }
417
418
419 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
420 if (endpoint instanceof InternalConnectionEndpoint) {
421 return (InternalConnectionEndpoint) endpoint;
422 }
423 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
424 }
425
426 @Override
427 public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
428 lock.lock();
429 try {
430 Args.notNull(endpoint, "Managed endpoint");
431 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
432 final ManagedHttpClientConnection conn = internalEndpoint.detach();
433 if (LOG.isDebugEnabled()) {
434 LOG.debug("{} Releasing connection {}", id, conn);
435 }
436 if (isClosed()) {
437 return;
438 }
439 try {
440 if (keepAlive == null && conn != null) {
441 conn.close(CloseMode.GRACEFUL);
442 }
443 this.updated = System.currentTimeMillis();
444 if (conn != null && conn.isOpen() && conn.isConsistent()) {
445 this.state = state;
446 conn.passivate();
447 if (TimeValue.isPositive(keepAlive)) {
448 if (LOG.isDebugEnabled()) {
449 LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
450 }
451 this.expiry = this.updated + keepAlive.toMilliseconds();
452 } else {
453 if (LOG.isDebugEnabled()) {
454 LOG.debug("{} Connection can be kept alive indefinitely", id);
455 }
456 this.expiry = Long.MAX_VALUE;
457 }
458 } else {
459 this.route = null;
460 this.conn = null;
461 this.expiry = Long.MAX_VALUE;
462 if (LOG.isDebugEnabled()) {
463 LOG.debug("{} Connection is not kept alive", id);
464 }
465 }
466 } finally {
467 this.leased = false;
468 }
469 } finally {
470 lock.unlock();
471 }
472 }
473
474 @Override
475 public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
476 lock.lock();
477 try {
478 Args.notNull(endpoint, "Endpoint");
479
480 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
481 if (internalEndpoint.isConnected()) {
482 return;
483 }
484 final HttpRoute route = internalEndpoint.getRoute();
485 final HttpHost firstHop = route.getProxyHost() != null ? route.getProxyHost() : route.getTargetHost();
486 final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
487 final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
488 if (LOG.isDebugEnabled()) {
489 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), firstHop, connectTimeout);
490 }
491 this.connectionOperator.connect(
492 connection,
493 firstHop,
494 route.getTargetName(),
495 route.getLocalSocketAddress(),
496 connectTimeout,
497 socketConfig,
498 route.isTunnelled() ? null : tlsConfig,
499 context);
500 if (LOG.isDebugEnabled()) {
501 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
502 }
503 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
504 if (socketTimeout != null) {
505 connection.setSocketTimeout(socketTimeout);
506 }
507 } finally {
508 lock.unlock();
509 }
510 }
511
512 @Override
513 public void upgrade(
514 final ConnectionEndpoint endpoint,
515 final HttpContext context) throws IOException {
516 lock.lock();
517 try {
518 Args.notNull(endpoint, "Endpoint");
519 Args.notNull(route, "HTTP route");
520 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
521 final HttpRoute route = internalEndpoint.getRoute();
522 this.connectionOperator.upgrade(
523 internalEndpoint.getConnection(),
524 route.getTargetHost(),
525 route.getTargetName(),
526 tlsConfig,
527 context);
528 } finally {
529 lock.unlock();
530 }
531 }
532
533 public void closeExpired() {
534 lock.lock();
535 try {
536 if (isClosed()) {
537 return;
538 }
539 if (!this.leased) {
540 checkExpiry();
541 }
542 } finally {
543 lock.unlock();
544 }
545 }
546
547 public void closeIdle(final TimeValue idleTime) {
548 lock.lock();
549 try {
550 Args.notNull(idleTime, "Idle time");
551 if (isClosed()) {
552 return;
553 }
554 if (!this.leased) {
555 long time = idleTime.toMilliseconds();
556 if (time < 0) {
557 time = 0;
558 }
559 final long deadline = System.currentTimeMillis() - time;
560 if (this.updated <= deadline) {
561 closeConnection(CloseMode.GRACEFUL);
562 }
563 }
564 } finally {
565 lock.unlock();
566 }
567 }
568
569
570
571
572
573
574
575
576 @Deprecated
577 public TimeValue getValidateAfterInactivity() {
578 return connectionConfig.getValidateAfterInactivity();
579 }
580
581
582
583
584
585
586
587
588
589
590
591 @Deprecated
592 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
593 this.connectionConfig = ConnectionConfig.custom()
594 .setValidateAfterInactivity(validateAfterInactivity)
595 .build();
596 }
597
598 class InternalConnectionEndpoint extends ConnectionEndpoint implements ConnectionHolder, Identifiable {
599
600 private final HttpRoute route;
601 private final AtomicReference<ManagedHttpClientConnection> connRef;
602
603 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
604 this.route = route;
605 this.connRef = new AtomicReference<>(conn);
606 }
607
608 @Override
609 public String getId() {
610 return id;
611 }
612
613 HttpRoute getRoute() {
614 return route;
615 }
616
617 ManagedHttpClientConnection getConnection() {
618 final ManagedHttpClientConnection conn = this.connRef.get();
619 if (conn == null) {
620 throw new ConnectionShutdownException();
621 }
622 return conn;
623 }
624
625 ManagedHttpClientConnection getValidatedConnection() {
626 final ManagedHttpClientConnection conn = this.connRef.get();
627 if (conn == null || !conn.isOpen()) {
628 throw new ConnectionShutdownException();
629 }
630 return conn;
631 }
632
633 ManagedHttpClientConnection detach() {
634 return this.connRef.getAndSet(null);
635 }
636
637 @Override
638 public boolean isConnected() {
639 final ManagedHttpClientConnection conn = this.connRef.get();
640 return conn != null && conn.isOpen();
641 }
642
643 @Override
644 public void close(final CloseMode closeMode) {
645 final ManagedHttpClientConnection conn = this.connRef.get();
646 if (conn != null) {
647 conn.close(closeMode);
648 }
649 }
650
651 @Override
652 public void close() throws IOException {
653 final ManagedHttpClientConnection conn = this.connRef.get();
654 if (conn != null) {
655 conn.close();
656 }
657 }
658
659 @Override
660 public void setSocketTimeout(final Timeout timeout) {
661 getValidatedConnection().setSocketTimeout(timeout);
662 }
663
664
665
666
667 @Deprecated
668 @Override
669 public ClassicHttpResponse execute(
670 final String exchangeId,
671 final ClassicHttpRequest request,
672 final HttpRequestExecutor requestExecutor,
673 final HttpContext context) throws IOException, HttpException {
674 Args.notNull(request, "HTTP request");
675 Args.notNull(requestExecutor, "Request executor");
676 if (LOG.isDebugEnabled()) {
677 LOG.debug("{} Executing exchange {}", id, exchangeId);
678 }
679 return requestExecutor.execute(request, getValidatedConnection(), context);
680 }
681
682
683
684
685 @Override
686 public ClassicHttpResponse execute(
687 final String exchangeId,
688 final ClassicHttpRequest request,
689 final RequestExecutor requestExecutor,
690 final HttpContext context) throws IOException, HttpException {
691 Args.notNull(request, "HTTP request");
692 Args.notNull(requestExecutor, "Request executor");
693 if (LOG.isDebugEnabled()) {
694 LOG.debug("{} Executing exchange {}", id, exchangeId);
695 }
696 return requestExecutor.execute(request, getValidatedConnection(), context);
697 }
698
699
700
701
702 @Override
703 public EndpointInfo getInfo() {
704 final ManagedHttpClientConnection connection = this.connRef.get();
705 if (connection != null && connection.isOpen()) {
706 return new EndpointInfo(connection.getProtocolVersion(), connection.getSSLSession());
707 }
708 return null;
709 }
710
711 @Override
712 public HttpConnection get() {
713 return this.connRef.get();
714 }
715
716 }
717
718
719
720
721
722
723
724
725
726 public boolean isClosed() {
727 return this.closed.get();
728 }
729
730 }