View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.conn;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  import org.apache.commons.logging.Log;
42  import org.apache.commons.logging.LogFactory;
43  import org.apache.http.HttpClientConnection;
44  import org.apache.http.HttpHost;
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.config.ConnectionConfig;
47  import org.apache.http.config.Lookup;
48  import org.apache.http.config.Registry;
49  import org.apache.http.config.RegistryBuilder;
50  import org.apache.http.config.SocketConfig;
51  import org.apache.http.conn.ConnectionPoolTimeoutException;
52  import org.apache.http.conn.ConnectionRequest;
53  import org.apache.http.conn.DnsResolver;
54  import org.apache.http.conn.HttpClientConnectionManager;
55  import org.apache.http.conn.HttpClientConnectionOperator;
56  import org.apache.http.conn.HttpConnectionFactory;
57  import org.apache.http.conn.SchemePortResolver;
58  import org.apache.http.conn.ManagedHttpClientConnection;
59  import org.apache.http.conn.routing.HttpRoute;
60  import org.apache.http.conn.socket.ConnectionSocketFactory;
61  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
62  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
63  import org.apache.http.pool.ConnFactory;
64  import org.apache.http.pool.ConnPoolControl;
65  import org.apache.http.pool.PoolStats;
66  import org.apache.http.protocol.HttpContext;
67  import org.apache.http.util.Args;
68  import org.apache.http.util.Asserts;
69  
70  /**
71   * {@code ClientConnectionPoolManager} maintains a pool of
72   * {@link HttpClientConnection}s and is able to service connection requests
73   * from multiple execution threads. Connections are pooled on a per route
74   * basis. A request for a route which already the manager has persistent
75   * connections for available in the pool will be services by leasing
76   * a connection from the pool rather than creating a brand new connection.
77   * <p>
78   * {@code ClientConnectionPoolManager} maintains a maximum limit of connection
79   * on a per route basis and in total. Per default this implementation will
80   * create no more than than 2 concurrent connections per given route
81   * and no more 20 connections in total. For many real-world applications
82   * these limits may prove too constraining, especially if they use HTTP
83   * as a transport protocol for their services. Connection limits, however,
84   * can be adjusted using {@link ConnPoolControl} methods.
85   * </p>
86   * <p>
87   * The handling of stale connections was changed in version 4.4.
88   * Previously, the code would check every connection by default before re-using it.
89   * The code now only checks the connection if the elapsed time since
90   * the last use of the connection exceeds the timeout that has been set.
91   * The default timeout is set to 5000ms - see
92   * {@link #PoolingHttpClientConnectionManager(HttpClientConnectionOperator, HttpConnectionFactory, long, TimeUnit)}
93   * </p>
94   *
95   * @since 4.3
96   */
97  @ThreadSafe
98  public class PoolingHttpClientConnectionManager
99      implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
100 
101     private final Log log = LogFactory.getLog(getClass());
102 
103     private final ConfigData configData;
104     private final CPool pool;
105     private final HttpClientConnectionOperator connectionOperator;
106     private final AtomicBoolean isShutDown;
107 
108     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
109         return RegistryBuilder.<ConnectionSocketFactory>create()
110                 .register("http", PlainConnectionSocketFactory.getSocketFactory())
111                 .register("https", SSLConnectionSocketFactory.getSocketFactory())
112                 .build();
113     }
114 
115     public PoolingHttpClientConnectionManager() {
116         this(getDefaultRegistry());
117     }
118 
119     public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) {
120         this(getDefaultRegistry(), null, null ,null, timeToLive, tunit);
121     }
122 
123     public PoolingHttpClientConnectionManager(
124             final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
125         this(socketFactoryRegistry, null, null);
126     }
127 
128     public PoolingHttpClientConnectionManager(
129             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
130             final DnsResolver dnsResolver) {
131         this(socketFactoryRegistry, null, dnsResolver);
132     }
133 
134     public PoolingHttpClientConnectionManager(
135             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
136             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
137         this(socketFactoryRegistry, connFactory, null);
138     }
139 
140     public PoolingHttpClientConnectionManager(
141             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
142         this(getDefaultRegistry(), connFactory, null);
143     }
144 
145     public PoolingHttpClientConnectionManager(
146             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
147             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
148             final DnsResolver dnsResolver) {
149         this(socketFactoryRegistry, connFactory, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
150     }
151 
152     public PoolingHttpClientConnectionManager(
153             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
154             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
155             final SchemePortResolver schemePortResolver,
156             final DnsResolver dnsResolver,
157             final long timeToLive, final TimeUnit tunit) {
158         this(
159             new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
160             connFactory,
161             timeToLive, tunit
162         );
163     }
164 
165     /**
166      * @since 4.4
167      */
168     public PoolingHttpClientConnectionManager(
169         final HttpClientConnectionOperator httpClientConnectionOperator,
170         final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
171         final long timeToLive, final TimeUnit tunit) {
172         super();
173         this.configData = new ConfigData();
174         this.pool = new CPool(new InternalConnectionFactory(
175                 this.configData, connFactory), 2, 20, timeToLive, tunit);
176         this.pool.setValidateAfterInactivity(5000);
177         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
178         this.isShutDown = new AtomicBoolean(false);
179     }
180 
181     /**
182      * Visible for test.
183      */
184     PoolingHttpClientConnectionManager(
185             final CPool pool,
186             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
187             final SchemePortResolver schemePortResolver,
188             final DnsResolver dnsResolver) {
189         super();
190         this.configData = new ConfigData();
191         this.pool = pool;
192         this.connectionOperator = new DefaultHttpClientConnectionOperator(
193                 socketFactoryRegistry, schemePortResolver, dnsResolver);
194         this.isShutDown = new AtomicBoolean(false);
195     }
196 
197     @Override
198     protected void finalize() throws Throwable {
199         try {
200             shutdown();
201         } finally {
202             super.finalize();
203         }
204     }
205 
206     @Override
207     public void close() {
208         shutdown();
209     }
210 
211     private String format(final HttpRoute route, final Object state) {
212         final StringBuilder buf = new StringBuilder();
213         buf.append("[route: ").append(route).append("]");
214         if (state != null) {
215             buf.append("[state: ").append(state).append("]");
216         }
217         return buf.toString();
218     }
219 
220     private String formatStats(final HttpRoute route) {
221         final StringBuilder buf = new StringBuilder();
222         final PoolStats totals = this.pool.getTotalStats();
223         final PoolStats stats = this.pool.getStats(route);
224         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
225         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
226         buf.append(" of ").append(stats.getMax()).append("; ");
227         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
228         buf.append(" of ").append(totals.getMax()).append("]");
229         return buf.toString();
230     }
231 
232     private String format(final CPoolEntry entry) {
233         final StringBuilder buf = new StringBuilder();
234         buf.append("[id: ").append(entry.getId()).append("]");
235         buf.append("[route: ").append(entry.getRoute()).append("]");
236         final Object state = entry.getState();
237         if (state != null) {
238             buf.append("[state: ").append(state).append("]");
239         }
240         return buf.toString();
241     }
242 
243     @Override
244     public ConnectionRequest requestConnection(
245             final HttpRoute route,
246             final Object state) {
247         Args.notNull(route, "HTTP route");
248         if (this.log.isDebugEnabled()) {
249             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
250         }
251         final Future<CPoolEntry> future = this.pool.lease(route, state, null);
252         return new ConnectionRequest() {
253 
254             @Override
255             public boolean cancel() {
256                 return future.cancel(true);
257             }
258 
259             @Override
260             public HttpClientConnection get(
261                     final long timeout,
262                     final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
263                 return leaseConnection(future, timeout, tunit);
264             }
265 
266         };
267 
268     }
269 
270     protected HttpClientConnection leaseConnection(
271             final Future<CPoolEntry> future,
272             final long timeout,
273             final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
274         final CPoolEntry entry;
275         try {
276             entry = future.get(timeout, tunit);
277             if (entry == null || future.isCancelled()) {
278                 throw new InterruptedException();
279             }
280             Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
281             if (this.log.isDebugEnabled()) {
282                 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
283             }
284             return CPoolProxy.newProxy(entry);
285         } catch (final TimeoutException ex) {
286             throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
287         }
288     }
289 
290     @Override
291     public void releaseConnection(
292             final HttpClientConnection managedConn,
293             final Object state,
294             final long keepalive, final TimeUnit tunit) {
295         Args.notNull(managedConn, "Managed connection");
296         synchronized (managedConn) {
297             final CPoolEntry entry = CPoolProxy.detach(managedConn);
298             if (entry == null) {
299                 return;
300             }
301             final ManagedHttpClientConnection conn = entry.getConnection();
302             try {
303                 if (conn.isOpen()) {
304                     final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
305                     entry.setState(state);
306                     entry.updateExpiry(keepalive, effectiveUnit);
307                     if (this.log.isDebugEnabled()) {
308                         final String s;
309                         if (keepalive > 0) {
310                             s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
311                         } else {
312                             s = "indefinitely";
313                         }
314                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
315                     }
316                 }
317             } finally {
318                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
319                 if (this.log.isDebugEnabled()) {
320                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
321                 }
322             }
323         }
324     }
325 
326     @Override
327     public void connect(
328             final HttpClientConnection managedConn,
329             final HttpRoute route,
330             final int connectTimeout,
331             final HttpContext context) throws IOException {
332         Args.notNull(managedConn, "Managed Connection");
333         Args.notNull(route, "HTTP route");
334         final ManagedHttpClientConnection conn;
335         synchronized (managedConn) {
336             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
337             conn = entry.getConnection();
338         }
339         final HttpHost host;
340         if (route.getProxyHost() != null) {
341             host = route.getProxyHost();
342         } else {
343             host = route.getTargetHost();
344         }
345         final InetSocketAddress localAddress = route.getLocalSocketAddress();
346         SocketConfig socketConfig = this.configData.getSocketConfig(host);
347         if (socketConfig == null) {
348             socketConfig = this.configData.getDefaultSocketConfig();
349         }
350         if (socketConfig == null) {
351             socketConfig = SocketConfig.DEFAULT;
352         }
353         this.connectionOperator.connect(
354                 conn, host, localAddress, connectTimeout, socketConfig, context);
355     }
356 
357     @Override
358     public void upgrade(
359             final HttpClientConnection managedConn,
360             final HttpRoute route,
361             final HttpContext context) throws IOException {
362         Args.notNull(managedConn, "Managed Connection");
363         Args.notNull(route, "HTTP route");
364         final ManagedHttpClientConnection conn;
365         synchronized (managedConn) {
366             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
367             conn = entry.getConnection();
368         }
369         this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
370     }
371 
372     @Override
373     public void routeComplete(
374             final HttpClientConnection managedConn,
375             final HttpRoute route,
376             final HttpContext context) throws IOException {
377         Args.notNull(managedConn, "Managed Connection");
378         Args.notNull(route, "HTTP route");
379         synchronized (managedConn) {
380             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
381             entry.markRouteComplete();
382         }
383     }
384 
385     @Override
386     public void shutdown() {
387         if (this.isShutDown.compareAndSet(false, true)) {
388             this.log.debug("Connection manager is shutting down");
389             try {
390                 this.pool.shutdown();
391             } catch (final IOException ex) {
392                 this.log.debug("I/O exception shutting down connection manager", ex);
393             }
394             this.log.debug("Connection manager shut down");
395         }
396     }
397 
398     @Override
399     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
400         if (this.log.isDebugEnabled()) {
401             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
402         }
403         this.pool.closeIdle(idleTimeout, tunit);
404     }
405 
406     @Override
407     public void closeExpiredConnections() {
408         this.log.debug("Closing expired connections");
409         this.pool.closeExpired();
410     }
411 
412     @Override
413     public int getMaxTotal() {
414         return this.pool.getMaxTotal();
415     }
416 
417     @Override
418     public void setMaxTotal(final int max) {
419         this.pool.setMaxTotal(max);
420     }
421 
422     @Override
423     public int getDefaultMaxPerRoute() {
424         return this.pool.getDefaultMaxPerRoute();
425     }
426 
427     @Override
428     public void setDefaultMaxPerRoute(final int max) {
429         this.pool.setDefaultMaxPerRoute(max);
430     }
431 
432     @Override
433     public int getMaxPerRoute(final HttpRoute route) {
434         return this.pool.getMaxPerRoute(route);
435     }
436 
437     @Override
438     public void setMaxPerRoute(final HttpRoute route, final int max) {
439         this.pool.setMaxPerRoute(route, max);
440     }
441 
442     @Override
443     public PoolStats getTotalStats() {
444         return this.pool.getTotalStats();
445     }
446 
447     @Override
448     public PoolStats getStats(final HttpRoute route) {
449         return this.pool.getStats(route);
450     }
451 
452     /**
453      * @since 4.4
454      */
455     public Set<HttpRoute> getRoutes() {
456         return this.pool.getRoutes();
457     }
458 
459     public SocketConfig getDefaultSocketConfig() {
460         return this.configData.getDefaultSocketConfig();
461     }
462 
463     public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
464         this.configData.setDefaultSocketConfig(defaultSocketConfig);
465     }
466 
467     public ConnectionConfig getDefaultConnectionConfig() {
468         return this.configData.getDefaultConnectionConfig();
469     }
470 
471     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
472         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
473     }
474 
475     public SocketConfig getSocketConfig(final HttpHost host) {
476         return this.configData.getSocketConfig(host);
477     }
478 
479     public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
480         this.configData.setSocketConfig(host, socketConfig);
481     }
482 
483     public ConnectionConfig getConnectionConfig(final HttpHost host) {
484         return this.configData.getConnectionConfig(host);
485     }
486 
487     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
488         this.configData.setConnectionConfig(host, connectionConfig);
489     }
490 
491     /**
492      * @see #setValidateAfterInactivity(int)
493      *
494      * @since 4.4
495      */
496     public int getValidateAfterInactivity() {
497         return pool.getValidateAfterInactivity();
498     }
499 
500     /**
501      * Defines period of inactivity in milliseconds after which persistent connections must
502      * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
503      *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
504      * to this method disables connection validation. This check helps detect connections
505      * that have become stale (half-closed) while kept inactive in the pool.
506      *
507      * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
508      *
509      * @since 4.4
510      */
511     public void setValidateAfterInactivity(final int ms) {
512         pool.setValidateAfterInactivity(ms);
513     }
514 
515     static class ConfigData {
516 
517         private final Map<HttpHost, SocketConfig> socketConfigMap;
518         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
519         private volatile SocketConfig defaultSocketConfig;
520         private volatile ConnectionConfig defaultConnectionConfig;
521 
522         ConfigData() {
523             super();
524             this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
525             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
526         }
527 
528         public SocketConfig getDefaultSocketConfig() {
529             return this.defaultSocketConfig;
530         }
531 
532         public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
533             this.defaultSocketConfig = defaultSocketConfig;
534         }
535 
536         public ConnectionConfig getDefaultConnectionConfig() {
537             return this.defaultConnectionConfig;
538         }
539 
540         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
541             this.defaultConnectionConfig = defaultConnectionConfig;
542         }
543 
544         public SocketConfig getSocketConfig(final HttpHost host) {
545             return this.socketConfigMap.get(host);
546         }
547 
548         public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
549             this.socketConfigMap.put(host, socketConfig);
550         }
551 
552         public ConnectionConfig getConnectionConfig(final HttpHost host) {
553             return this.connectionConfigMap.get(host);
554         }
555 
556         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
557             this.connectionConfigMap.put(host, connectionConfig);
558         }
559 
560     }
561 
562     static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
563 
564         private final ConfigData configData;
565         private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
566 
567         InternalConnectionFactory(
568                 final ConfigData configData,
569                 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
570             super();
571             this.configData = configData != null ? configData : new ConfigData();
572             this.connFactory = connFactory != null ? connFactory :
573                 ManagedHttpClientConnectionFactory.INSTANCE;
574         }
575 
576         @Override
577         public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
578             ConnectionConfig config = null;
579             if (route.getProxyHost() != null) {
580                 config = this.configData.getConnectionConfig(route.getProxyHost());
581             }
582             if (config == null) {
583                 config = this.configData.getConnectionConfig(route.getTargetHost());
584             }
585             if (config == null) {
586                 config = this.configData.getDefaultConnectionConfig();
587             }
588             if (config == null) {
589                 config = ConnectionConfig.DEFAULT;
590             }
591             return this.connFactory.create(route, config);
592         }
593 
594     }
595 
596 }