1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.conn;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.TimeoutException;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.HttpClientConnection;
44 import org.apache.http.HttpHost;
45 import org.apache.http.annotation.Contract;
46 import org.apache.http.annotation.ThreadingBehavior;
47 import org.apache.http.config.ConnectionConfig;
48 import org.apache.http.config.Lookup;
49 import org.apache.http.config.Registry;
50 import org.apache.http.config.RegistryBuilder;
51 import org.apache.http.config.SocketConfig;
52 import org.apache.http.conn.ConnectionPoolTimeoutException;
53 import org.apache.http.conn.ConnectionRequest;
54 import org.apache.http.conn.DnsResolver;
55 import org.apache.http.conn.HttpClientConnectionManager;
56 import org.apache.http.conn.HttpClientConnectionOperator;
57 import org.apache.http.conn.HttpConnectionFactory;
58 import org.apache.http.conn.ManagedHttpClientConnection;
59 import org.apache.http.conn.SchemePortResolver;
60 import org.apache.http.conn.routing.HttpRoute;
61 import org.apache.http.conn.socket.ConnectionSocketFactory;
62 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
63 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
64 import org.apache.http.pool.ConnFactory;
65 import org.apache.http.pool.ConnPoolControl;
66 import org.apache.http.pool.PoolEntry;
67 import org.apache.http.pool.PoolEntryCallback;
68 import org.apache.http.pool.PoolStats;
69 import org.apache.http.protocol.HttpContext;
70 import org.apache.http.util.Args;
71 import org.apache.http.util.Asserts;
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
105 public class PoolingHttpClientConnectionManager
106 implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
107
108 private final Log log = LogFactory.getLog(getClass());
109
110 private final ConfigData configData;
111 private final CPool pool;
112 private final HttpClientConnectionOperator connectionOperator;
113 private final AtomicBoolean isShutDown;
114
115 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
116 return RegistryBuilder.<ConnectionSocketFactory>create()
117 .register("http", PlainConnectionSocketFactory.getSocketFactory())
118 .register("https", SSLConnectionSocketFactory.getSocketFactory())
119 .build();
120 }
121
122 public PoolingHttpClientConnectionManager() {
123 this(getDefaultRegistry());
124 }
125
126 public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit timeUnit) {
127 this(getDefaultRegistry(), null, null ,null, timeToLive, timeUnit);
128 }
129
130 public PoolingHttpClientConnectionManager(
131 final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
132 this(socketFactoryRegistry, null, null);
133 }
134
135 public PoolingHttpClientConnectionManager(
136 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
137 final DnsResolver dnsResolver) {
138 this(socketFactoryRegistry, null, dnsResolver);
139 }
140
141 public PoolingHttpClientConnectionManager(
142 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
143 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
144 this(socketFactoryRegistry, connFactory, null);
145 }
146
147 public PoolingHttpClientConnectionManager(
148 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
149 this(getDefaultRegistry(), connFactory, null);
150 }
151
152 public PoolingHttpClientConnectionManager(
153 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
154 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
155 final DnsResolver dnsResolver) {
156 this(socketFactoryRegistry, connFactory, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
157 }
158
159 public PoolingHttpClientConnectionManager(
160 final Registry<ConnectionSocketFactory> socketFactoryRegistry,
161 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
162 final SchemePortResolver schemePortResolver,
163 final DnsResolver dnsResolver,
164 final long timeToLive, final TimeUnit timeUnit) {
165 this(
166 new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
167 connFactory,
168 timeToLive, timeUnit
169 );
170 }
171
172
173
174
175 public PoolingHttpClientConnectionManager(
176 final HttpClientConnectionOperator httpClientConnectionOperator,
177 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
178 final long timeToLive, final TimeUnit timeUnit) {
179 super();
180 this.configData = new ConfigData();
181 this.pool = new CPool(new InternalConnectionFactory(
182 this.configData, connFactory), 2, 20, timeToLive, timeUnit);
183 this.pool.setValidateAfterInactivity(2000);
184 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
185 this.isShutDown = new AtomicBoolean(false);
186 }
187
188
189
190
191 PoolingHttpClientConnectionManager(
192 final CPool pool,
193 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
194 final SchemePortResolver schemePortResolver,
195 final DnsResolver dnsResolver) {
196 super();
197 this.configData = new ConfigData();
198 this.pool = pool;
199 this.connectionOperator = new DefaultHttpClientConnectionOperator(
200 socketFactoryRegistry, schemePortResolver, dnsResolver);
201 this.isShutDown = new AtomicBoolean(false);
202 }
203
204 @Override
205 protected void finalize() throws Throwable {
206 try {
207 shutdown();
208 } finally {
209 super.finalize();
210 }
211 }
212
213 @Override
214 public void close() {
215 shutdown();
216 }
217
218 private String format(final HttpRoute route, final Object state) {
219 final StringBuilder buf = new StringBuilder();
220 buf.append("[route: ").append(route).append("]");
221 if (state != null) {
222 buf.append("[state: ").append(state).append("]");
223 }
224 return buf.toString();
225 }
226
227 private String formatStats(final HttpRoute route) {
228 final StringBuilder buf = new StringBuilder();
229 final PoolStats totals = this.pool.getTotalStats();
230 final PoolStats stats = this.pool.getStats(route);
231 buf.append("[total available: ").append(totals.getAvailable()).append("; ");
232 buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
233 buf.append(" of ").append(stats.getMax()).append("; ");
234 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
235 buf.append(" of ").append(totals.getMax()).append("]");
236 return buf.toString();
237 }
238
239 private String format(final CPoolEntry entry) {
240 final StringBuilder buf = new StringBuilder();
241 buf.append("[id: ").append(entry.getId()).append("]");
242 buf.append("[route: ").append(entry.getRoute()).append("]");
243 final Object state = entry.getState();
244 if (state != null) {
245 buf.append("[state: ").append(state).append("]");
246 }
247 return buf.toString();
248 }
249
250 private SocketConfig resolveSocketConfig(final HttpHost host) {
251 SocketConfig socketConfig = this.configData.getSocketConfig(host);
252 if (socketConfig == null) {
253 socketConfig = this.configData.getDefaultSocketConfig();
254 }
255 if (socketConfig == null) {
256 socketConfig = SocketConfig.DEFAULT;
257 }
258 return socketConfig;
259 }
260
261 @Override
262 public ConnectionRequest requestConnection(
263 final HttpRoute route,
264 final Object state) {
265 Args.notNull(route, "HTTP route");
266 if (this.log.isDebugEnabled()) {
267 this.log.debug("Connection request: " + format(route, state) + formatStats(route));
268 }
269 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
270 final Future<CPoolEntry> future = this.pool.lease(route, state, null);
271 return new ConnectionRequest() {
272
273 @Override
274 public boolean cancel() {
275 return future.cancel(true);
276 }
277
278 @Override
279 public HttpClientConnection get(
280 final long timeout,
281 final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
282 final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
283 if (conn.isOpen()) {
284 final HttpHost host;
285 if (route.getProxyHost() != null) {
286 host = route.getProxyHost();
287 } else {
288 host = route.getTargetHost();
289 }
290 final SocketConfig socketConfig = resolveSocketConfig(host);
291 conn.setSocketTimeout(socketConfig.getSoTimeout());
292 }
293 return conn;
294 }
295
296 };
297
298 }
299
300 protected HttpClientConnection leaseConnection(
301 final Future<CPoolEntry> future,
302 final long timeout,
303 final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
304 final CPoolEntry entry;
305 try {
306 entry = future.get(timeout, timeUnit);
307 if (entry == null || future.isCancelled()) {
308 throw new ExecutionException(new CancellationException("Operation cancelled"));
309 }
310 Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
311 if (this.log.isDebugEnabled()) {
312 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
313 }
314 return CPoolProxy.newProxy(entry);
315 } catch (final TimeoutException ex) {
316 throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
317 }
318 }
319
320 @Override
321 public void releaseConnection(
322 final HttpClientConnection managedConn,
323 final Object state,
324 final long keepalive, final TimeUnit timeUnit) {
325 Args.notNull(managedConn, "Managed connection");
326 synchronized (managedConn) {
327 final CPoolEntry entry = CPoolProxy.detach(managedConn);
328 if (entry == null) {
329 return;
330 }
331 final ManagedHttpClientConnection conn = entry.getConnection();
332 try {
333 if (conn.isOpen()) {
334 final TimeUnit effectiveUnit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
335 entry.setState(state);
336 entry.updateExpiry(keepalive, effectiveUnit);
337 if (this.log.isDebugEnabled()) {
338 final String s;
339 if (keepalive > 0) {
340 s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
341 } else {
342 s = "indefinitely";
343 }
344 this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
345 }
346 conn.setSocketTimeout(0);
347 }
348 } finally {
349 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
350 if (this.log.isDebugEnabled()) {
351 this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
352 }
353 }
354 }
355 }
356
357 @Override
358 public void connect(
359 final HttpClientConnection managedConn,
360 final HttpRoute route,
361 final int connectTimeout,
362 final HttpContext context) throws IOException {
363 Args.notNull(managedConn, "Managed Connection");
364 Args.notNull(route, "HTTP route");
365 final ManagedHttpClientConnection conn;
366 synchronized (managedConn) {
367 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
368 conn = entry.getConnection();
369 }
370 final HttpHost host;
371 if (route.getProxyHost() != null) {
372 host = route.getProxyHost();
373 } else {
374 host = route.getTargetHost();
375 }
376 this.connectionOperator.connect(
377 conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
378 }
379
380 @Override
381 public void upgrade(
382 final HttpClientConnection managedConn,
383 final HttpRoute route,
384 final HttpContext context) throws IOException {
385 Args.notNull(managedConn, "Managed Connection");
386 Args.notNull(route, "HTTP route");
387 final ManagedHttpClientConnection conn;
388 synchronized (managedConn) {
389 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
390 conn = entry.getConnection();
391 }
392 this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
393 }
394
395 @Override
396 public void routeComplete(
397 final HttpClientConnection managedConn,
398 final HttpRoute route,
399 final HttpContext context) throws IOException {
400 Args.notNull(managedConn, "Managed Connection");
401 Args.notNull(route, "HTTP route");
402 synchronized (managedConn) {
403 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
404 entry.markRouteComplete();
405 }
406 }
407
408 @Override
409 public void shutdown() {
410 if (this.isShutDown.compareAndSet(false, true)) {
411 this.log.debug("Connection manager is shutting down");
412 try {
413 this.pool.enumLeased(new PoolEntryCallback<HttpRoute, ManagedHttpClientConnection>() {
414
415 @Override
416 public void process(final PoolEntry<HttpRoute, ManagedHttpClientConnection> entry) {
417 final ManagedHttpClientConnection connection = entry.getConnection();
418 if (connection != null) {
419 try {
420 connection.shutdown();
421 } catch (final IOException iox) {
422 if (log.isDebugEnabled()) {
423 log.debug("I/O exception shutting down connection", iox);
424 }
425 }
426 }
427 }
428
429 });
430 this.pool.shutdown();
431 } catch (final IOException ex) {
432 this.log.debug("I/O exception shutting down connection manager", ex);
433 }
434 this.log.debug("Connection manager shut down");
435 }
436 }
437
438 @Override
439 public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
440 if (this.log.isDebugEnabled()) {
441 this.log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
442 }
443 this.pool.closeIdle(idleTimeout, timeUnit);
444 }
445
446 @Override
447 public void closeExpiredConnections() {
448 this.log.debug("Closing expired connections");
449 this.pool.closeExpired();
450 }
451
452 protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
453 this.pool.enumAvailable(callback);
454 }
455
456 protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
457 this.pool.enumLeased(callback);
458 }
459
460 @Override
461 public int getMaxTotal() {
462 return this.pool.getMaxTotal();
463 }
464
465 @Override
466 public void setMaxTotal(final int max) {
467 this.pool.setMaxTotal(max);
468 }
469
470 @Override
471 public int getDefaultMaxPerRoute() {
472 return this.pool.getDefaultMaxPerRoute();
473 }
474
475 @Override
476 public void setDefaultMaxPerRoute(final int max) {
477 this.pool.setDefaultMaxPerRoute(max);
478 }
479
480 @Override
481 public int getMaxPerRoute(final HttpRoute route) {
482 return this.pool.getMaxPerRoute(route);
483 }
484
485 @Override
486 public void setMaxPerRoute(final HttpRoute route, final int max) {
487 this.pool.setMaxPerRoute(route, max);
488 }
489
490 @Override
491 public PoolStats getTotalStats() {
492 return this.pool.getTotalStats();
493 }
494
495 @Override
496 public PoolStats getStats(final HttpRoute route) {
497 return this.pool.getStats(route);
498 }
499
500
501
502
503 public Set<HttpRoute> getRoutes() {
504 return this.pool.getRoutes();
505 }
506
507 public SocketConfig getDefaultSocketConfig() {
508 return this.configData.getDefaultSocketConfig();
509 }
510
511 public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
512 this.configData.setDefaultSocketConfig(defaultSocketConfig);
513 }
514
515 public ConnectionConfig getDefaultConnectionConfig() {
516 return this.configData.getDefaultConnectionConfig();
517 }
518
519 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
520 this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
521 }
522
523 public SocketConfig getSocketConfig(final HttpHost host) {
524 return this.configData.getSocketConfig(host);
525 }
526
527 public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
528 this.configData.setSocketConfig(host, socketConfig);
529 }
530
531 public ConnectionConfig getConnectionConfig(final HttpHost host) {
532 return this.configData.getConnectionConfig(host);
533 }
534
535 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
536 this.configData.setConnectionConfig(host, connectionConfig);
537 }
538
539
540
541
542
543
544 public int getValidateAfterInactivity() {
545 return pool.getValidateAfterInactivity();
546 }
547
548
549
550
551
552
553
554
555
556
557
558
559 public void setValidateAfterInactivity(final int ms) {
560 pool.setValidateAfterInactivity(ms);
561 }
562
563 static class ConfigData {
564
565 private final Map<HttpHost, SocketConfig> socketConfigMap;
566 private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
567 private volatile SocketConfig defaultSocketConfig;
568 private volatile ConnectionConfig defaultConnectionConfig;
569
570 ConfigData() {
571 super();
572 this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
573 this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
574 }
575
576 public SocketConfig getDefaultSocketConfig() {
577 return this.defaultSocketConfig;
578 }
579
580 public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
581 this.defaultSocketConfig = defaultSocketConfig;
582 }
583
584 public ConnectionConfig getDefaultConnectionConfig() {
585 return this.defaultConnectionConfig;
586 }
587
588 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
589 this.defaultConnectionConfig = defaultConnectionConfig;
590 }
591
592 public SocketConfig getSocketConfig(final HttpHost host) {
593 return this.socketConfigMap.get(host);
594 }
595
596 public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
597 this.socketConfigMap.put(host, socketConfig);
598 }
599
600 public ConnectionConfig getConnectionConfig(final HttpHost host) {
601 return this.connectionConfigMap.get(host);
602 }
603
604 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
605 this.connectionConfigMap.put(host, connectionConfig);
606 }
607
608 }
609
610 static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
611
612 private final ConfigData configData;
613 private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
614
615 InternalConnectionFactory(
616 final ConfigData configData,
617 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
618 super();
619 this.configData = configData != null ? configData : new ConfigData();
620 this.connFactory = connFactory != null ? connFactory :
621 ManagedHttpClientConnectionFactory.INSTANCE;
622 }
623
624 @Override
625 public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
626 ConnectionConfig config = null;
627 if (route.getProxyHost() != null) {
628 config = this.configData.getConnectionConfig(route.getProxyHost());
629 }
630 if (config == null) {
631 config = this.configData.getConnectionConfig(route.getTargetHost());
632 }
633 if (config == null) {
634 config = this.configData.getDefaultConnectionConfig();
635 }
636 if (config == null) {
637 config = ConnectionConfig.DEFAULT;
638 }
639 return this.connFactory.create(route, config);
640 }
641
642 }
643
644 }