/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  package org.apache.http.impl.conn;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.http.HttpClientConnection;
44  import org.apache.http.HttpHost;
45  import org.apache.http.annotation.Contract;
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.config.ConnectionConfig;
48  import org.apache.http.config.Lookup;
49  import org.apache.http.config.Registry;
50  import org.apache.http.config.RegistryBuilder;
51  import org.apache.http.config.SocketConfig;
52  import org.apache.http.conn.ConnectionPoolTimeoutException;
53  import org.apache.http.conn.ConnectionRequest;
54  import org.apache.http.conn.DnsResolver;
55  import org.apache.http.conn.HttpClientConnectionManager;
56  import org.apache.http.conn.HttpClientConnectionOperator;
57  import org.apache.http.conn.HttpConnectionFactory;
58  import org.apache.http.conn.ManagedHttpClientConnection;
59  import org.apache.http.conn.SchemePortResolver;
60  import org.apache.http.conn.routing.HttpRoute;
61  import org.apache.http.conn.socket.ConnectionSocketFactory;
62  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
63  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
64  import org.apache.http.pool.ConnFactory;
65  import org.apache.http.pool.ConnPoolControl;
66  import org.apache.http.pool.PoolEntryCallback;
67  import org.apache.http.pool.PoolStats;
68  import org.apache.http.protocol.HttpContext;
69  import org.apache.http.util.Args;
70  import org.apache.http.util.Asserts;
71  
72  /**
73   * {@code ClientConnectionPoolManager} maintains a pool of
74   * {@link HttpClientConnection}s and is able to service connection requests
75   * from multiple execution threads. Connections are pooled on a per route
76   * basis. A request for a route which already the manager has persistent
77   * connections for available in the pool will be services by leasing
78   * a connection from the pool rather than creating a brand new connection.
79   * <p>
80   * {@code ClientConnectionPoolManager} maintains a maximum limit of connection
81   * on a per route basis and in total. Per default this implementation will
82   * create no more than than 2 concurrent connections per given route
83   * and no more 20 connections in total. For many real-world applications
84   * these limits may prove too constraining, especially if they use HTTP
85   * as a transport protocol for their services. Connection limits, however,
86   * can be adjusted using {@link ConnPoolControl} methods.
87   * </p>
88   * <p>
89   * Total time to live (TTL) set at construction time defines maximum life span
90   * of persistent connections regardless of their expiration setting. No persistent
91   * connection will be re-used past its TTL value.
92   * </p>
93   * <p>
94   * The handling of stale connections was changed in version 4.4.
95   * Previously, the code would check every connection by default before re-using it.
96   * The code now only checks the connection if the elapsed time since
97   * the last use of the connection exceeds the timeout that has been set.
98   * The default timeout is set to 2000ms
99   * </p>
100  *
101  * @since 4.3
102  */
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingHttpClientConnectionManager
105     implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
106 
107     private final Log log = LogFactory.getLog(getClass());
108 
109     private final ConfigData configData;
110     private final CPool pool;
111     private final HttpClientConnectionOperator connectionOperator;
112     private final AtomicBoolean isShutDown;
113 
114     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
115         return RegistryBuilder.<ConnectionSocketFactory>create()
116                 .register("http", PlainConnectionSocketFactory.getSocketFactory())
117                 .register("https", SSLConnectionSocketFactory.getSocketFactory())
118                 .build();
119     }
120 
121     public PoolingHttpClientConnectionManager() {
122         this(getDefaultRegistry());
123     }
124 
125     public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit timeUnit) {
126         this(getDefaultRegistry(), null, null ,null, timeToLive, timeUnit);
127     }
128 
129     public PoolingHttpClientConnectionManager(
130             final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
131         this(socketFactoryRegistry, null, null);
132     }
133 
134     public PoolingHttpClientConnectionManager(
135             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
136             final DnsResolver dnsResolver) {
137         this(socketFactoryRegistry, null, dnsResolver);
138     }
139 
140     public PoolingHttpClientConnectionManager(
141             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
142             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
143         this(socketFactoryRegistry, connFactory, null);
144     }
145 
146     public PoolingHttpClientConnectionManager(
147             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
148         this(getDefaultRegistry(), connFactory, null);
149     }
150 
151     public PoolingHttpClientConnectionManager(
152             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
153             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
154             final DnsResolver dnsResolver) {
155         this(socketFactoryRegistry, connFactory, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
156     }
157 
158     public PoolingHttpClientConnectionManager(
159             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
160             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
161             final SchemePortResolver schemePortResolver,
162             final DnsResolver dnsResolver,
163             final long timeToLive, final TimeUnit timeUnit) {
164         this(
165             new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
166             connFactory,
167             timeToLive, timeUnit
168         );
169     }
170 
171     /**
172      * @since 4.4
173      */
174     public PoolingHttpClientConnectionManager(
175         final HttpClientConnectionOperator httpClientConnectionOperator,
176         final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
177         final long timeToLive, final TimeUnit timeUnit) {
178         super();
179         this.configData = new ConfigData();
180         this.pool = new CPool(new InternalConnectionFactory(
181                 this.configData, connFactory), 2, 20, timeToLive, timeUnit);
182         this.pool.setValidateAfterInactivity(2000);
183         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
184         this.isShutDown = new AtomicBoolean(false);
185     }
186 
187     /**
188      * Visible for test.
189      */
190     PoolingHttpClientConnectionManager(
191             final CPool pool,
192             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
193             final SchemePortResolver schemePortResolver,
194             final DnsResolver dnsResolver) {
195         super();
196         this.configData = new ConfigData();
197         this.pool = pool;
198         this.connectionOperator = new DefaultHttpClientConnectionOperator(
199                 socketFactoryRegistry, schemePortResolver, dnsResolver);
200         this.isShutDown = new AtomicBoolean(false);
201     }
202 
203     @Override
204     protected void finalize() throws Throwable {
205         try {
206             shutdown();
207         } finally {
208             super.finalize();
209         }
210     }
211 
212     @Override
213     public void close() {
214         shutdown();
215     }
216 
217     private String format(final HttpRoute route, final Object state) {
218         final StringBuilder buf = new StringBuilder();
219         buf.append("[route: ").append(route).append("]");
220         if (state != null) {
221             buf.append("[state: ").append(state).append("]");
222         }
223         return buf.toString();
224     }
225 
226     private String formatStats(final HttpRoute route) {
227         final StringBuilder buf = new StringBuilder();
228         final PoolStats totals = this.pool.getTotalStats();
229         final PoolStats stats = this.pool.getStats(route);
230         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
231         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
232         buf.append(" of ").append(stats.getMax()).append("; ");
233         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
234         buf.append(" of ").append(totals.getMax()).append("]");
235         return buf.toString();
236     }
237 
238     private String format(final CPoolEntry entry) {
239         final StringBuilder buf = new StringBuilder();
240         buf.append("[id: ").append(entry.getId()).append("]");
241         buf.append("[route: ").append(entry.getRoute()).append("]");
242         final Object state = entry.getState();
243         if (state != null) {
244             buf.append("[state: ").append(state).append("]");
245         }
246         return buf.toString();
247     }
248 
249     private SocketConfig resolveSocketConfig(final HttpHost host) {
250         SocketConfig socketConfig = this.configData.getSocketConfig(host);
251         if (socketConfig == null) {
252             socketConfig = this.configData.getDefaultSocketConfig();
253         }
254         if (socketConfig == null) {
255             socketConfig = SocketConfig.DEFAULT;
256         }
257         return socketConfig;
258     }
259 
260     @Override
261     public ConnectionRequest requestConnection(
262             final HttpRoute route,
263             final Object state) {
264         Args.notNull(route, "HTTP route");
265         if (this.log.isDebugEnabled()) {
266             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
267         }
268         final Future<CPoolEntry> future = this.pool.lease(route, state, null);
269         return new ConnectionRequest() {
270 
271             @Override
272             public boolean cancel() {
273                 return future.cancel(true);
274             }
275 
276             @Override
277             public HttpClientConnection get(
278                     final long timeout,
279                     final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
280                 final HttpClientConnection conn = leaseConnection(future, timeout, timeUnit);
281                 if (conn.isOpen()) {
282                     final HttpHost host;
283                     if (route.getProxyHost() != null) {
284                         host = route.getProxyHost();
285                     } else {
286                         host = route.getTargetHost();
287                     }
288                     final SocketConfig socketConfig = resolveSocketConfig(host);
289                     conn.setSocketTimeout(socketConfig.getSoTimeout());
290                 }
291                 return conn;
292             }
293 
294         };
295 
296     }
297 
298     protected HttpClientConnection leaseConnection(
299             final Future<CPoolEntry> future,
300             final long timeout,
301             final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
302         final CPoolEntry entry;
303         try {
304             entry = future.get(timeout, timeUnit);
305             if (entry == null || future.isCancelled()) {
306                 throw new ExecutionException(new CancellationException("Operation cancelled"));
307             }
308             Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
309             if (this.log.isDebugEnabled()) {
310                 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
311             }
312             return CPoolProxy.newProxy(entry);
313         } catch (final TimeoutException ex) {
314             throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
315         }
316     }
317 
318     @Override
319     public void releaseConnection(
320             final HttpClientConnection managedConn,
321             final Object state,
322             final long keepalive, final TimeUnit timeUnit) {
323         Args.notNull(managedConn, "Managed connection");
324         synchronized (managedConn) {
325             final CPoolEntry entry = CPoolProxy.detach(managedConn);
326             if (entry == null) {
327                 return;
328             }
329             final ManagedHttpClientConnection conn = entry.getConnection();
330             try {
331                 if (conn.isOpen()) {
332                     final TimeUnit effectiveUnit = timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS;
333                     entry.setState(state);
334                     entry.updateExpiry(keepalive, effectiveUnit);
335                     if (this.log.isDebugEnabled()) {
336                         final String s;
337                         if (keepalive > 0) {
338                             s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
339                         } else {
340                             s = "indefinitely";
341                         }
342                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
343                     }
344                     conn.setSocketTimeout(0);
345                 }
346             } finally {
347                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
348                 if (this.log.isDebugEnabled()) {
349                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
350                 }
351             }
352         }
353     }
354 
355     @Override
356     public void connect(
357             final HttpClientConnection managedConn,
358             final HttpRoute route,
359             final int connectTimeout,
360             final HttpContext context) throws IOException {
361         Args.notNull(managedConn, "Managed Connection");
362         Args.notNull(route, "HTTP route");
363         final ManagedHttpClientConnection conn;
364         synchronized (managedConn) {
365             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
366             conn = entry.getConnection();
367         }
368         final HttpHost host;
369         if (route.getProxyHost() != null) {
370             host = route.getProxyHost();
371         } else {
372             host = route.getTargetHost();
373         }
374         this.connectionOperator.connect(
375                 conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
376     }
377 
378     @Override
379     public void upgrade(
380             final HttpClientConnection managedConn,
381             final HttpRoute route,
382             final HttpContext context) throws IOException {
383         Args.notNull(managedConn, "Managed Connection");
384         Args.notNull(route, "HTTP route");
385         final ManagedHttpClientConnection conn;
386         synchronized (managedConn) {
387             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
388             conn = entry.getConnection();
389         }
390         this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
391     }
392 
393     @Override
394     public void routeComplete(
395             final HttpClientConnection managedConn,
396             final HttpRoute route,
397             final HttpContext context) throws IOException {
398         Args.notNull(managedConn, "Managed Connection");
399         Args.notNull(route, "HTTP route");
400         synchronized (managedConn) {
401             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
402             entry.markRouteComplete();
403         }
404     }
405 
406     @Override
407     public void shutdown() {
408         if (this.isShutDown.compareAndSet(false, true)) {
409             this.log.debug("Connection manager is shutting down");
410             try {
411                 this.pool.shutdown();
412             } catch (final IOException ex) {
413                 this.log.debug("I/O exception shutting down connection manager", ex);
414             }
415             this.log.debug("Connection manager shut down");
416         }
417     }
418 
419     @Override
420     public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
421         if (this.log.isDebugEnabled()) {
422             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
423         }
424         this.pool.closeIdle(idleTimeout, timeUnit);
425     }
426 
427     @Override
428     public void closeExpiredConnections() {
429         this.log.debug("Closing expired connections");
430         this.pool.closeExpired();
431     }
432 
433     protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
434         this.pool.enumAvailable(callback);
435     }
436 
437     protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
438         this.pool.enumLeased(callback);
439     }
440 
441     @Override
442     public int getMaxTotal() {
443         return this.pool.getMaxTotal();
444     }
445 
446     @Override
447     public void setMaxTotal(final int max) {
448         this.pool.setMaxTotal(max);
449     }
450 
451     @Override
452     public int getDefaultMaxPerRoute() {
453         return this.pool.getDefaultMaxPerRoute();
454     }
455 
456     @Override
457     public void setDefaultMaxPerRoute(final int max) {
458         this.pool.setDefaultMaxPerRoute(max);
459     }
460 
461     @Override
462     public int getMaxPerRoute(final HttpRoute route) {
463         return this.pool.getMaxPerRoute(route);
464     }
465 
466     @Override
467     public void setMaxPerRoute(final HttpRoute route, final int max) {
468         this.pool.setMaxPerRoute(route, max);
469     }
470 
471     @Override
472     public PoolStats getTotalStats() {
473         return this.pool.getTotalStats();
474     }
475 
476     @Override
477     public PoolStats getStats(final HttpRoute route) {
478         return this.pool.getStats(route);
479     }
480 
481     /**
482      * @since 4.4
483      */
484     public Set<HttpRoute> getRoutes() {
485         return this.pool.getRoutes();
486     }
487 
488     public SocketConfig getDefaultSocketConfig() {
489         return this.configData.getDefaultSocketConfig();
490     }
491 
492     public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
493         this.configData.setDefaultSocketConfig(defaultSocketConfig);
494     }
495 
496     public ConnectionConfig getDefaultConnectionConfig() {
497         return this.configData.getDefaultConnectionConfig();
498     }
499 
500     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
501         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
502     }
503 
504     public SocketConfig getSocketConfig(final HttpHost host) {
505         return this.configData.getSocketConfig(host);
506     }
507 
508     public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
509         this.configData.setSocketConfig(host, socketConfig);
510     }
511 
512     public ConnectionConfig getConnectionConfig(final HttpHost host) {
513         return this.configData.getConnectionConfig(host);
514     }
515 
516     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
517         this.configData.setConnectionConfig(host, connectionConfig);
518     }
519 
520     /**
521      * @see #setValidateAfterInactivity(int)
522      *
523      * @since 4.4
524      */
525     public int getValidateAfterInactivity() {
526         return pool.getValidateAfterInactivity();
527     }
528 
529     /**
530      * Defines period of inactivity in milliseconds after which persistent connections must
531      * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
532      *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
533      * to this method disables connection validation. This check helps detect connections
534      * that have become stale (half-closed) while kept inactive in the pool.
535      *
536      * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
537      *
538      * @since 4.4
539      */
540     public void setValidateAfterInactivity(final int ms) {
541         pool.setValidateAfterInactivity(ms);
542     }
543 
544     static class ConfigData {
545 
546         private final Map<HttpHost, SocketConfig> socketConfigMap;
547         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
548         private volatile SocketConfig defaultSocketConfig;
549         private volatile ConnectionConfig defaultConnectionConfig;
550 
551         ConfigData() {
552             super();
553             this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
554             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
555         }
556 
557         public SocketConfig getDefaultSocketConfig() {
558             return this.defaultSocketConfig;
559         }
560 
561         public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
562             this.defaultSocketConfig = defaultSocketConfig;
563         }
564 
565         public ConnectionConfig getDefaultConnectionConfig() {
566             return this.defaultConnectionConfig;
567         }
568 
569         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
570             this.defaultConnectionConfig = defaultConnectionConfig;
571         }
572 
573         public SocketConfig getSocketConfig(final HttpHost host) {
574             return this.socketConfigMap.get(host);
575         }
576 
577         public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
578             this.socketConfigMap.put(host, socketConfig);
579         }
580 
581         public ConnectionConfig getConnectionConfig(final HttpHost host) {
582             return this.connectionConfigMap.get(host);
583         }
584 
585         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
586             this.connectionConfigMap.put(host, connectionConfig);
587         }
588 
589     }
590 
591     static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
592 
593         private final ConfigData configData;
594         private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
595 
596         InternalConnectionFactory(
597                 final ConfigData configData,
598                 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
599             super();
600             this.configData = configData != null ? configData : new ConfigData();
601             this.connFactory = connFactory != null ? connFactory :
602                 ManagedHttpClientConnectionFactory.INSTANCE;
603         }
604 
605         @Override
606         public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
607             ConnectionConfig config = null;
608             if (route.getProxyHost() != null) {
609                 config = this.configData.getConnectionConfig(route.getProxyHost());
610             }
611             if (config == null) {
612                 config = this.configData.getConnectionConfig(route.getTargetHost());
613             }
614             if (config == null) {
615                 config = this.configData.getDefaultConnectionConfig();
616             }
617             if (config == null) {
618                 config = ConnectionConfig.DEFAULT;
619             }
620             return this.connFactory.create(route, config);
621         }
622 
623     }
624 
625 }