View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.conn;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.http.HttpClientConnection;
44  import org.apache.http.HttpHost;
45  import org.apache.http.annotation.Contract;
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.config.ConnectionConfig;
48  import org.apache.http.config.Lookup;
49  import org.apache.http.config.Registry;
50  import org.apache.http.config.RegistryBuilder;
51  import org.apache.http.config.SocketConfig;
52  import org.apache.http.conn.ConnectionPoolTimeoutException;
53  import org.apache.http.conn.ConnectionRequest;
54  import org.apache.http.conn.DnsResolver;
55  import org.apache.http.conn.HttpClientConnectionManager;
56  import org.apache.http.conn.HttpClientConnectionOperator;
57  import org.apache.http.conn.HttpConnectionFactory;
58  import org.apache.http.conn.ManagedHttpClientConnection;
59  import org.apache.http.conn.SchemePortResolver;
60  import org.apache.http.conn.routing.HttpRoute;
61  import org.apache.http.conn.socket.ConnectionSocketFactory;
62  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
63  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
64  import org.apache.http.pool.ConnFactory;
65  import org.apache.http.pool.ConnPoolControl;
66  import org.apache.http.pool.PoolEntryCallback;
67  import org.apache.http.pool.PoolStats;
68  import org.apache.http.protocol.HttpContext;
69  import org.apache.http.util.Args;
70  import org.apache.http.util.Asserts;
71  
72  /**
73   * {@code PoolingHttpClientConnectionManager} maintains a pool of
74   * {@link HttpClientConnection}s and is able to service connection requests
75   * from multiple execution threads. Connections are pooled on a per route
76   * basis. A request for a route for which the manager already has persistent
77   * connections available in the pool will be serviced by leasing
78   * a connection from the pool rather than creating a brand new connection.
79   * <p>
80   * {@code PoolingHttpClientConnectionManager} maintains a maximum limit of connections
81   * on a per route basis and in total. By default this implementation will
82   * create no more than 2 concurrent connections per given route
83   * and no more than 20 connections in total. For many real-world applications
84   * these limits may prove too constraining, especially if they use HTTP
85   * as a transport protocol for their services. Connection limits, however,
86   * can be adjusted using {@link ConnPoolControl} methods.
87   * </p>
88   * <p>
89   * Total time to live (TTL) set at construction time defines maximum life span
90   * of persistent connections regardless of their expiration setting. No persistent
91   * connection will be re-used past its TTL value.
92   * </p>
93   * <p>
94   * The handling of stale connections was changed in version 4.4.
95   * Previously, the code would check every connection by default before re-using it.
96   * The code now only checks the connection if the elapsed time since
97   * the last use of the connection exceeds the timeout that has been set.
98   * The default timeout is set to 2000 ms.
99   * </p>
100  *
101  * @since 4.3
102  */
103 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
104 public class PoolingHttpClientConnectionManager
105     implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
106 
107     private final Log log = LogFactory.getLog(getClass());
108 
109     private final ConfigData configData;
110     private final CPool pool;
111     private final HttpClientConnectionOperator connectionOperator;
112     private final AtomicBoolean isShutDown;
113 
114     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
115         return RegistryBuilder.<ConnectionSocketFactory>create()
116                 .register("http", PlainConnectionSocketFactory.getSocketFactory())
117                 .register("https", SSLConnectionSocketFactory.getSocketFactory())
118                 .build();
119     }
120 
/**
 * Creates a connection manager that uses the default socket factory
 * registry (see {@code getDefaultRegistry()}) and no custom collaborators.
 */
public PoolingHttpClientConnectionManager() {
    this(
        getDefaultRegistry());
}
124 
/**
 * Creates a connection manager that uses the default socket factory
 * registry and the given connection time-to-live. No connection factory,
 * scheme port resolver or DNS resolver is supplied ({@code null} is passed
 * for each of them).
 *
 * @param timeToLive time-to-live value forwarded to the connection pool.
 * @param tunit      unit of {@code timeToLive}.
 */
public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) {
    this(
        getDefaultRegistry(),
        null,
        null,
        null,
        timeToLive, tunit);
}
128 
/**
 * Creates a connection manager that uses the given socket factory registry;
 * no connection factory and no DNS resolver are supplied.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 */
public PoolingHttpClientConnectionManager(final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
    this(
        socketFactoryRegistry,
        null,
        null);
}
133 
/**
 * Creates a connection manager that uses the given socket factory registry
 * and DNS resolver; no connection factory is supplied.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param dnsResolver           DNS resolver forwarded to the connection operator.
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final DnsResolver dnsResolver) {
    this(
        socketFactoryRegistry,
        null,
        dnsResolver);
}
139 
/**
 * Creates a connection manager that uses the given socket factory registry
 * and connection factory; no DNS resolver is supplied.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param connFactory           factory used to create managed connections.
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
    this(
        socketFactoryRegistry,
        connFactory,
        null);
}
145 
/**
 * Creates a connection manager that uses the default socket factory
 * registry together with the given connection factory; no DNS resolver is
 * supplied.
 *
 * @param connFactory factory used to create managed connections.
 */
public PoolingHttpClientConnectionManager(
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
    this(
        getDefaultRegistry(),
        connFactory,
        null);
}
150 
/**
 * Creates a connection manager from a socket factory registry, a connection
 * factory and a DNS resolver. No scheme port resolver is supplied and the
 * time-to-live is passed on as {@code -1} milliseconds (presumably meaning
 * "no limit" -- confirm against {@code CPool}).
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param connFactory           factory used to create managed connections.
 * @param dnsResolver           DNS resolver forwarded to the connection operator.
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final DnsResolver dnsResolver) {
    this(
        socketFactoryRegistry,
        connFactory,
        null,
        dnsResolver,
        -1, TimeUnit.MILLISECONDS);
}
157 
/**
 * Creates a connection manager whose connection operator is a
 * {@code DefaultHttpClientConnectionOperator} built from the given socket
 * factory registry, scheme port resolver and DNS resolver.
 *
 * @param socketFactoryRegistry registry of connection socket factories.
 * @param connFactory           factory used to create managed connections.
 * @param schemePortResolver    scheme port resolver handed to the operator.
 * @param dnsResolver           DNS resolver handed to the operator.
 * @param timeToLive            time-to-live value forwarded to the connection pool.
 * @param tunit                 unit of {@code timeToLive}.
 */
public PoolingHttpClientConnectionManager(
        final Registry<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver,
        final long timeToLive, final TimeUnit tunit) {
    this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
         connFactory,
         timeToLive,
         tunit);
}
170 
171     /**
172      * @since 4.4
173      */
174     public PoolingHttpClientConnectionManager(
175         final HttpClientConnectionOperator httpClientConnectionOperator,
176         final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
177         final long timeToLive, final TimeUnit tunit) {
178         super();
179         this.configData = new ConfigData();
180         this.pool = new CPool(new InternalConnectionFactory(
181                 this.configData, connFactory), 2, 20, timeToLive, tunit);
182         this.pool.setValidateAfterInactivity(2000);
183         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
184         this.isShutDown = new AtomicBoolean(false);
185     }
186 
187     /**
188      * Visible for test.
189      */
190     PoolingHttpClientConnectionManager(
191             final CPool pool,
192             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
193             final SchemePortResolver schemePortResolver,
194             final DnsResolver dnsResolver) {
195         super();
196         this.configData = new ConfigData();
197         this.pool = pool;
198         this.connectionOperator = new DefaultHttpClientConnectionOperator(
199                 socketFactoryRegistry, schemePortResolver, dnsResolver);
200         this.isShutDown = new AtomicBoolean(false);
201     }
202 
203     @Override
204     protected void finalize() throws Throwable {
205         try {
206             shutdown();
207         } finally {
208             super.finalize();
209         }
210     }
211 
212     @Override
213     public void close() {
214         shutdown();
215     }
216 
217     private String format(final HttpRoute route, final Object state) {
218         final StringBuilder buf = new StringBuilder();
219         buf.append("[route: ").append(route).append("]");
220         if (state != null) {
221             buf.append("[state: ").append(state).append("]");
222         }
223         return buf.toString();
224     }
225 
226     private String formatStats(final HttpRoute route) {
227         final StringBuilder buf = new StringBuilder();
228         final PoolStats totals = this.pool.getTotalStats();
229         final PoolStats stats = this.pool.getStats(route);
230         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
231         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
232         buf.append(" of ").append(stats.getMax()).append("; ");
233         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
234         buf.append(" of ").append(totals.getMax()).append("]");
235         return buf.toString();
236     }
237 
238     private String format(final CPoolEntry entry) {
239         final StringBuilder buf = new StringBuilder();
240         buf.append("[id: ").append(entry.getId()).append("]");
241         buf.append("[route: ").append(entry.getRoute()).append("]");
242         final Object state = entry.getState();
243         if (state != null) {
244             buf.append("[state: ").append(state).append("]");
245         }
246         return buf.toString();
247     }
248 
249     @Override
250     public ConnectionRequest requestConnection(
251             final HttpRoute route,
252             final Object state) {
253         Args.notNull(route, "HTTP route");
254         if (this.log.isDebugEnabled()) {
255             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
256         }
257         final Future<CPoolEntry> future = this.pool.lease(route, state, null);
258         return new ConnectionRequest() {
259 
260             @Override
261             public boolean cancel() {
262                 return future.cancel(true);
263             }
264 
265             @Override
266             public HttpClientConnection get(
267                     final long timeout,
268                     final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
269                 return leaseConnection(future, timeout, tunit);
270             }
271 
272         };
273 
274     }
275 
276     protected HttpClientConnection leaseConnection(
277             final Future<CPoolEntry> future,
278             final long timeout,
279             final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
280         final CPoolEntry entry;
281         try {
282             entry = future.get(timeout, tunit);
283             if (entry == null || future.isCancelled()) {
284                 throw new InterruptedException();
285             }
286             Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
287             if (this.log.isDebugEnabled()) {
288                 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
289             }
290             return CPoolProxy.newProxy(entry);
291         } catch (final TimeoutException ex) {
292             throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
293         }
294     }
295 
296     @Override
297     public void releaseConnection(
298             final HttpClientConnection managedConn,
299             final Object state,
300             final long keepalive, final TimeUnit tunit) {
301         Args.notNull(managedConn, "Managed connection");
302         synchronized (managedConn) {
303             final CPoolEntry entry = CPoolProxy.detach(managedConn);
304             if (entry == null) {
305                 return;
306             }
307             final ManagedHttpClientConnection conn = entry.getConnection();
308             try {
309                 if (conn.isOpen()) {
310                     final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
311                     entry.setState(state);
312                     entry.updateExpiry(keepalive, effectiveUnit);
313                     if (this.log.isDebugEnabled()) {
314                         final String s;
315                         if (keepalive > 0) {
316                             s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
317                         } else {
318                             s = "indefinitely";
319                         }
320                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
321                     }
322                 }
323             } finally {
324                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
325                 if (this.log.isDebugEnabled()) {
326                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
327                 }
328             }
329         }
330     }
331 
332     @Override
333     public void connect(
334             final HttpClientConnection managedConn,
335             final HttpRoute route,
336             final int connectTimeout,
337             final HttpContext context) throws IOException {
338         Args.notNull(managedConn, "Managed Connection");
339         Args.notNull(route, "HTTP route");
340         final ManagedHttpClientConnection conn;
341         synchronized (managedConn) {
342             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
343             conn = entry.getConnection();
344         }
345         final HttpHost host;
346         if (route.getProxyHost() != null) {
347             host = route.getProxyHost();
348         } else {
349             host = route.getTargetHost();
350         }
351         final InetSocketAddress localAddress = route.getLocalSocketAddress();
352         SocketConfig socketConfig = this.configData.getSocketConfig(host);
353         if (socketConfig == null) {
354             socketConfig = this.configData.getDefaultSocketConfig();
355         }
356         if (socketConfig == null) {
357             socketConfig = SocketConfig.DEFAULT;
358         }
359         this.connectionOperator.connect(
360                 conn, host, localAddress, connectTimeout, socketConfig, context);
361     }
362 
363     @Override
364     public void upgrade(
365             final HttpClientConnection managedConn,
366             final HttpRoute route,
367             final HttpContext context) throws IOException {
368         Args.notNull(managedConn, "Managed Connection");
369         Args.notNull(route, "HTTP route");
370         final ManagedHttpClientConnection conn;
371         synchronized (managedConn) {
372             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
373             conn = entry.getConnection();
374         }
375         this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
376     }
377 
378     @Override
379     public void routeComplete(
380             final HttpClientConnection managedConn,
381             final HttpRoute route,
382             final HttpContext context) throws IOException {
383         Args.notNull(managedConn, "Managed Connection");
384         Args.notNull(route, "HTTP route");
385         synchronized (managedConn) {
386             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
387             entry.markRouteComplete();
388         }
389     }
390 
391     @Override
392     public void shutdown() {
393         if (this.isShutDown.compareAndSet(false, true)) {
394             this.log.debug("Connection manager is shutting down");
395             try {
396                 this.pool.shutdown();
397             } catch (final IOException ex) {
398                 this.log.debug("I/O exception shutting down connection manager", ex);
399             }
400             this.log.debug("Connection manager shut down");
401         }
402     }
403 
404     @Override
405     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
406         if (this.log.isDebugEnabled()) {
407             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
408         }
409         this.pool.closeIdle(idleTimeout, tunit);
410     }
411 
412     @Override
413     public void closeExpiredConnections() {
414         this.log.debug("Closing expired connections");
415         this.pool.closeExpired();
416     }
417 
418     protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
419         this.pool.enumAvailable(callback);
420     }
421 
422     protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
423         this.pool.enumLeased(callback);
424     }
425 
426     @Override
427     public int getMaxTotal() {
428         return this.pool.getMaxTotal();
429     }
430 
431     @Override
432     public void setMaxTotal(final int max) {
433         this.pool.setMaxTotal(max);
434     }
435 
436     @Override
437     public int getDefaultMaxPerRoute() {
438         return this.pool.getDefaultMaxPerRoute();
439     }
440 
441     @Override
442     public void setDefaultMaxPerRoute(final int max) {
443         this.pool.setDefaultMaxPerRoute(max);
444     }
445 
446     @Override
447     public int getMaxPerRoute(final HttpRoute route) {
448         return this.pool.getMaxPerRoute(route);
449     }
450 
451     @Override
452     public void setMaxPerRoute(final HttpRoute route, final int max) {
453         this.pool.setMaxPerRoute(route, max);
454     }
455 
456     @Override
457     public PoolStats getTotalStats() {
458         return this.pool.getTotalStats();
459     }
460 
461     @Override
462     public PoolStats getStats(final HttpRoute route) {
463         return this.pool.getStats(route);
464     }
465 
466     /**
467      * @since 4.4
468      */
469     public Set<HttpRoute> getRoutes() {
470         return this.pool.getRoutes();
471     }
472 
473     public SocketConfig getDefaultSocketConfig() {
474         return this.configData.getDefaultSocketConfig();
475     }
476 
477     public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
478         this.configData.setDefaultSocketConfig(defaultSocketConfig);
479     }
480 
481     public ConnectionConfig getDefaultConnectionConfig() {
482         return this.configData.getDefaultConnectionConfig();
483     }
484 
485     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
486         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
487     }
488 
489     public SocketConfig getSocketConfig(final HttpHost host) {
490         return this.configData.getSocketConfig(host);
491     }
492 
493     public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
494         this.configData.setSocketConfig(host, socketConfig);
495     }
496 
497     public ConnectionConfig getConnectionConfig(final HttpHost host) {
498         return this.configData.getConnectionConfig(host);
499     }
500 
501     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
502         this.configData.setConnectionConfig(host, connectionConfig);
503     }
504 
505     /**
506      * @see #setValidateAfterInactivity(int)
507      *
508      * @since 4.4
509      */
510     public int getValidateAfterInactivity() {
511         return pool.getValidateAfterInactivity();
512     }
513 
514     /**
515      * Defines period of inactivity in milliseconds after which persistent connections must
516      * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
517      *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
518      * to this method disables connection validation. This check helps detect connections
519      * that have become stale (half-closed) while kept inactive in the pool.
520      *
521      * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
522      *
523      * @since 4.4
524      */
525     public void setValidateAfterInactivity(final int ms) {
526         pool.setValidateAfterInactivity(ms);
527     }
528 
529     static class ConfigData {
530 
531         private final Map<HttpHost, SocketConfig> socketConfigMap;
532         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
533         private volatile SocketConfig defaultSocketConfig;
534         private volatile ConnectionConfig defaultConnectionConfig;
535 
536         ConfigData() {
537             super();
538             this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
539             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
540         }
541 
542         public SocketConfig getDefaultSocketConfig() {
543             return this.defaultSocketConfig;
544         }
545 
546         public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
547             this.defaultSocketConfig = defaultSocketConfig;
548         }
549 
550         public ConnectionConfig getDefaultConnectionConfig() {
551             return this.defaultConnectionConfig;
552         }
553 
554         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
555             this.defaultConnectionConfig = defaultConnectionConfig;
556         }
557 
558         public SocketConfig getSocketConfig(final HttpHost host) {
559             return this.socketConfigMap.get(host);
560         }
561 
562         public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
563             this.socketConfigMap.put(host, socketConfig);
564         }
565 
566         public ConnectionConfig getConnectionConfig(final HttpHost host) {
567             return this.connectionConfigMap.get(host);
568         }
569 
570         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
571             this.connectionConfigMap.put(host, connectionConfig);
572         }
573 
574     }
575 
576     static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
577 
578         private final ConfigData configData;
579         private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
580 
581         InternalConnectionFactory(
582                 final ConfigData configData,
583                 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
584             super();
585             this.configData = configData != null ? configData : new ConfigData();
586             this.connFactory = connFactory != null ? connFactory :
587                 ManagedHttpClientConnectionFactory.INSTANCE;
588         }
589 
590         @Override
591         public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
592             ConnectionConfig config = null;
593             if (route.getProxyHost() != null) {
594                 config = this.configData.getConnectionConfig(route.getProxyHost());
595             }
596             if (config == null) {
597                 config = this.configData.getConnectionConfig(route.getTargetHost());
598             }
599             if (config == null) {
600                 config = this.configData.getDefaultConnectionConfig();
601             }
602             if (config == null) {
603                 config = ConnectionConfig.DEFAULT;
604             }
605             return this.connFactory.create(route, config);
606         }
607 
608     }
609 
610 }