View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.conn;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.http.HttpClientConnection;
43  import org.apache.http.HttpHost;
44  import org.apache.http.annotation.Contract;
45  import org.apache.http.annotation.ThreadingBehavior;
46  import org.apache.http.config.ConnectionConfig;
47  import org.apache.http.config.Lookup;
48  import org.apache.http.config.Registry;
49  import org.apache.http.config.RegistryBuilder;
50  import org.apache.http.config.SocketConfig;
51  import org.apache.http.conn.ConnectionPoolTimeoutException;
52  import org.apache.http.conn.ConnectionRequest;
53  import org.apache.http.conn.DnsResolver;
54  import org.apache.http.conn.HttpClientConnectionManager;
55  import org.apache.http.conn.HttpClientConnectionOperator;
56  import org.apache.http.conn.HttpConnectionFactory;
57  import org.apache.http.conn.ManagedHttpClientConnection;
58  import org.apache.http.conn.SchemePortResolver;
59  import org.apache.http.conn.routing.HttpRoute;
60  import org.apache.http.conn.socket.ConnectionSocketFactory;
61  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
62  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
63  import org.apache.http.pool.ConnFactory;
64  import org.apache.http.pool.ConnPoolControl;
65  import org.apache.http.pool.PoolEntryCallback;
66  import org.apache.http.pool.PoolStats;
67  import org.apache.http.protocol.HttpContext;
68  import org.apache.http.util.Args;
69  import org.apache.http.util.Asserts;
70  
71  /**
72   * {@code ClientConnectionPoolManager} maintains a pool of
73   * {@link HttpClientConnection}s and is able to service connection requests
74   * from multiple execution threads. Connections are pooled on a per route
75   * basis. A request for a route which already the manager has persistent
76   * connections for available in the pool will be services by leasing
77   * a connection from the pool rather than creating a brand new connection.
78   * <p>
79   * {@code ClientConnectionPoolManager} maintains a maximum limit of connection
80   * on a per route basis and in total. Per default this implementation will
81   * create no more than than 2 concurrent connections per given route
82   * and no more 20 connections in total. For many real-world applications
83   * these limits may prove too constraining, especially if they use HTTP
84   * as a transport protocol for their services. Connection limits, however,
85   * can be adjusted using {@link ConnPoolControl} methods.
86   * </p>
87   * <p>
88   * Total time to live (TTL) set at construction time defines maximum life span
89   * of persistent connections regardless of their expiration setting. No persistent
90   * connection will be re-used past its TTL value.
91   * </p>
92   * <p>
93   * The handling of stale connections was changed in version 4.4.
94   * Previously, the code would check every connection by default before re-using it.
95   * The code now only checks the connection if the elapsed time since
96   * the last use of the connection exceeds the timeout that has been set.
97   * The default timeout is set to 2000ms
98   * </p>
99   *
100  * @since 4.3
101  */
102 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
103 public class PoolingHttpClientConnectionManager
104     implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
105 
106     private final Log log = LogFactory.getLog(getClass());
107 
108     private final ConfigData configData;
109     private final CPool pool;
110     private final HttpClientConnectionOperator connectionOperator;
111     private final AtomicBoolean isShutDown;
112 
113     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
114         return RegistryBuilder.<ConnectionSocketFactory>create()
115                 .register("http", PlainConnectionSocketFactory.getSocketFactory())
116                 .register("https", SSLConnectionSocketFactory.getSocketFactory())
117                 .build();
118     }
119 
120     public PoolingHttpClientConnectionManager() {
121         this(getDefaultRegistry());
122     }
123 
124     public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) {
125         this(getDefaultRegistry(), null, null ,null, timeToLive, tunit);
126     }
127 
128     public PoolingHttpClientConnectionManager(
129             final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
130         this(socketFactoryRegistry, null, null);
131     }
132 
133     public PoolingHttpClientConnectionManager(
134             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
135             final DnsResolver dnsResolver) {
136         this(socketFactoryRegistry, null, dnsResolver);
137     }
138 
139     public PoolingHttpClientConnectionManager(
140             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
141             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
142         this(socketFactoryRegistry, connFactory, null);
143     }
144 
145     public PoolingHttpClientConnectionManager(
146             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
147         this(getDefaultRegistry(), connFactory, null);
148     }
149 
150     public PoolingHttpClientConnectionManager(
151             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
152             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
153             final DnsResolver dnsResolver) {
154         this(socketFactoryRegistry, connFactory, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
155     }
156 
157     public PoolingHttpClientConnectionManager(
158             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
159             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
160             final SchemePortResolver schemePortResolver,
161             final DnsResolver dnsResolver,
162             final long timeToLive, final TimeUnit tunit) {
163         this(
164             new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
165             connFactory,
166             timeToLive, tunit
167         );
168     }
169 
170     /**
171      * @since 4.4
172      */
173     public PoolingHttpClientConnectionManager(
174         final HttpClientConnectionOperator httpClientConnectionOperator,
175         final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
176         final long timeToLive, final TimeUnit tunit) {
177         super();
178         this.configData = new ConfigData();
179         this.pool = new CPool(new InternalConnectionFactory(
180                 this.configData, connFactory), 2, 20, timeToLive, tunit);
181         this.pool.setValidateAfterInactivity(2000);
182         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
183         this.isShutDown = new AtomicBoolean(false);
184     }
185 
186     /**
187      * Visible for test.
188      */
189     PoolingHttpClientConnectionManager(
190             final CPool pool,
191             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
192             final SchemePortResolver schemePortResolver,
193             final DnsResolver dnsResolver) {
194         super();
195         this.configData = new ConfigData();
196         this.pool = pool;
197         this.connectionOperator = new DefaultHttpClientConnectionOperator(
198                 socketFactoryRegistry, schemePortResolver, dnsResolver);
199         this.isShutDown = new AtomicBoolean(false);
200     }
201 
202     @Override
203     protected void finalize() throws Throwable {
204         try {
205             shutdown();
206         } finally {
207             super.finalize();
208         }
209     }
210 
211     @Override
212     public void close() {
213         shutdown();
214     }
215 
216     private String format(final HttpRoute route, final Object state) {
217         final StringBuilder buf = new StringBuilder();
218         buf.append("[route: ").append(route).append("]");
219         if (state != null) {
220             buf.append("[state: ").append(state).append("]");
221         }
222         return buf.toString();
223     }
224 
225     private String formatStats(final HttpRoute route) {
226         final StringBuilder buf = new StringBuilder();
227         final PoolStats totals = this.pool.getTotalStats();
228         final PoolStats stats = this.pool.getStats(route);
229         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
230         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
231         buf.append(" of ").append(stats.getMax()).append("; ");
232         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
233         buf.append(" of ").append(totals.getMax()).append("]");
234         return buf.toString();
235     }
236 
237     private String format(final CPoolEntry entry) {
238         final StringBuilder buf = new StringBuilder();
239         buf.append("[id: ").append(entry.getId()).append("]");
240         buf.append("[route: ").append(entry.getRoute()).append("]");
241         final Object state = entry.getState();
242         if (state != null) {
243             buf.append("[state: ").append(state).append("]");
244         }
245         return buf.toString();
246     }
247 
248     private SocketConfig resolveSocketConfig(final HttpHost host) {
249         SocketConfig socketConfig = this.configData.getSocketConfig(host);
250         if (socketConfig == null) {
251             socketConfig = this.configData.getDefaultSocketConfig();
252         }
253         if (socketConfig == null) {
254             socketConfig = SocketConfig.DEFAULT;
255         }
256         return socketConfig;
257     }
258 
259     @Override
260     public ConnectionRequest requestConnection(
261             final HttpRoute route,
262             final Object state) {
263         Args.notNull(route, "HTTP route");
264         if (this.log.isDebugEnabled()) {
265             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
266         }
267         final Future<CPoolEntry> future = this.pool.lease(route, state, null);
268         return new ConnectionRequest() {
269 
270             @Override
271             public boolean cancel() {
272                 return future.cancel(true);
273             }
274 
275             @Override
276             public HttpClientConnection get(
277                     final long timeout,
278                     final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
279                 final HttpClientConnection conn = leaseConnection(future, timeout, tunit);
280                 if (conn.isOpen()) {
281                     final HttpHost host;
282                     if (route.getProxyHost() != null) {
283                         host = route.getProxyHost();
284                     } else {
285                         host = route.getTargetHost();
286                     }
287                     final SocketConfig socketConfig = resolveSocketConfig(host);
288                     conn.setSocketTimeout(socketConfig.getSoTimeout());
289                 }
290                 return conn;
291             }
292 
293         };
294 
295     }
296 
297     protected HttpClientConnection leaseConnection(
298             final Future<CPoolEntry> future,
299             final long timeout,
300             final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
301         final CPoolEntry entry;
302         try {
303             entry = future.get(timeout, tunit);
304             if (entry == null || future.isCancelled()) {
305                 throw new InterruptedException();
306             }
307             Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
308             if (this.log.isDebugEnabled()) {
309                 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
310             }
311             return CPoolProxy.newProxy(entry);
312         } catch (final TimeoutException ex) {
313             throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
314         }
315     }
316 
317     @Override
318     public void releaseConnection(
319             final HttpClientConnection managedConn,
320             final Object state,
321             final long keepalive, final TimeUnit tunit) {
322         Args.notNull(managedConn, "Managed connection");
323         synchronized (managedConn) {
324             final CPoolEntry entry = CPoolProxy.detach(managedConn);
325             if (entry == null) {
326                 return;
327             }
328             final ManagedHttpClientConnection conn = entry.getConnection();
329             try {
330                 if (conn.isOpen()) {
331                     final TimeUnit effectiveUnit = tunit != null ? tunit : TimeUnit.MILLISECONDS;
332                     entry.setState(state);
333                     entry.updateExpiry(keepalive, effectiveUnit);
334                     if (this.log.isDebugEnabled()) {
335                         final String s;
336                         if (keepalive > 0) {
337                             s = "for " + (double) effectiveUnit.toMillis(keepalive) / 1000 + " seconds";
338                         } else {
339                             s = "indefinitely";
340                         }
341                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
342                     }
343                     conn.setSocketTimeout(0);
344                 }
345             } finally {
346                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
347                 if (this.log.isDebugEnabled()) {
348                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
349                 }
350             }
351         }
352     }
353 
354     @Override
355     public void connect(
356             final HttpClientConnection managedConn,
357             final HttpRoute route,
358             final int connectTimeout,
359             final HttpContext context) throws IOException {
360         Args.notNull(managedConn, "Managed Connection");
361         Args.notNull(route, "HTTP route");
362         final ManagedHttpClientConnection conn;
363         synchronized (managedConn) {
364             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
365             conn = entry.getConnection();
366         }
367         final HttpHost host;
368         if (route.getProxyHost() != null) {
369             host = route.getProxyHost();
370         } else {
371             host = route.getTargetHost();
372         }
373         this.connectionOperator.connect(
374                 conn, host, route.getLocalSocketAddress(), connectTimeout, resolveSocketConfig(host), context);
375     }
376 
377     @Override
378     public void upgrade(
379             final HttpClientConnection managedConn,
380             final HttpRoute route,
381             final HttpContext context) throws IOException {
382         Args.notNull(managedConn, "Managed Connection");
383         Args.notNull(route, "HTTP route");
384         final ManagedHttpClientConnection conn;
385         synchronized (managedConn) {
386             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
387             conn = entry.getConnection();
388         }
389         this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
390     }
391 
392     @Override
393     public void routeComplete(
394             final HttpClientConnection managedConn,
395             final HttpRoute route,
396             final HttpContext context) throws IOException {
397         Args.notNull(managedConn, "Managed Connection");
398         Args.notNull(route, "HTTP route");
399         synchronized (managedConn) {
400             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
401             entry.markRouteComplete();
402         }
403     }
404 
405     @Override
406     public void shutdown() {
407         if (this.isShutDown.compareAndSet(false, true)) {
408             this.log.debug("Connection manager is shutting down");
409             try {
410                 this.pool.shutdown();
411             } catch (final IOException ex) {
412                 this.log.debug("I/O exception shutting down connection manager", ex);
413             }
414             this.log.debug("Connection manager shut down");
415         }
416     }
417 
418     @Override
419     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
420         if (this.log.isDebugEnabled()) {
421             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
422         }
423         this.pool.closeIdle(idleTimeout, tunit);
424     }
425 
426     @Override
427     public void closeExpiredConnections() {
428         this.log.debug("Closing expired connections");
429         this.pool.closeExpired();
430     }
431 
432     protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
433         this.pool.enumAvailable(callback);
434     }
435 
436     protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
437         this.pool.enumLeased(callback);
438     }
439 
440     @Override
441     public int getMaxTotal() {
442         return this.pool.getMaxTotal();
443     }
444 
445     @Override
446     public void setMaxTotal(final int max) {
447         this.pool.setMaxTotal(max);
448     }
449 
450     @Override
451     public int getDefaultMaxPerRoute() {
452         return this.pool.getDefaultMaxPerRoute();
453     }
454 
455     @Override
456     public void setDefaultMaxPerRoute(final int max) {
457         this.pool.setDefaultMaxPerRoute(max);
458     }
459 
460     @Override
461     public int getMaxPerRoute(final HttpRoute route) {
462         return this.pool.getMaxPerRoute(route);
463     }
464 
465     @Override
466     public void setMaxPerRoute(final HttpRoute route, final int max) {
467         this.pool.setMaxPerRoute(route, max);
468     }
469 
470     @Override
471     public PoolStats getTotalStats() {
472         return this.pool.getTotalStats();
473     }
474 
475     @Override
476     public PoolStats getStats(final HttpRoute route) {
477         return this.pool.getStats(route);
478     }
479 
480     /**
481      * @since 4.4
482      */
483     public Set<HttpRoute> getRoutes() {
484         return this.pool.getRoutes();
485     }
486 
487     public SocketConfig getDefaultSocketConfig() {
488         return this.configData.getDefaultSocketConfig();
489     }
490 
491     public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
492         this.configData.setDefaultSocketConfig(defaultSocketConfig);
493     }
494 
495     public ConnectionConfig getDefaultConnectionConfig() {
496         return this.configData.getDefaultConnectionConfig();
497     }
498 
499     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
500         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
501     }
502 
503     public SocketConfig getSocketConfig(final HttpHost host) {
504         return this.configData.getSocketConfig(host);
505     }
506 
507     public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
508         this.configData.setSocketConfig(host, socketConfig);
509     }
510 
511     public ConnectionConfig getConnectionConfig(final HttpHost host) {
512         return this.configData.getConnectionConfig(host);
513     }
514 
515     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
516         this.configData.setConnectionConfig(host, connectionConfig);
517     }
518 
519     /**
520      * @see #setValidateAfterInactivity(int)
521      *
522      * @since 4.4
523      */
524     public int getValidateAfterInactivity() {
525         return pool.getValidateAfterInactivity();
526     }
527 
528     /**
529      * Defines period of inactivity in milliseconds after which persistent connections must
530      * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
531      *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
532      * to this method disables connection validation. This check helps detect connections
533      * that have become stale (half-closed) while kept inactive in the pool.
534      *
535      * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
536      *
537      * @since 4.4
538      */
539     public void setValidateAfterInactivity(final int ms) {
540         pool.setValidateAfterInactivity(ms);
541     }
542 
543     static class ConfigData {
544 
545         private final Map<HttpHost, SocketConfig> socketConfigMap;
546         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
547         private volatile SocketConfig defaultSocketConfig;
548         private volatile ConnectionConfig defaultConnectionConfig;
549 
550         ConfigData() {
551             super();
552             this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
553             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
554         }
555 
556         public SocketConfig getDefaultSocketConfig() {
557             return this.defaultSocketConfig;
558         }
559 
560         public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
561             this.defaultSocketConfig = defaultSocketConfig;
562         }
563 
564         public ConnectionConfig getDefaultConnectionConfig() {
565             return this.defaultConnectionConfig;
566         }
567 
568         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
569             this.defaultConnectionConfig = defaultConnectionConfig;
570         }
571 
572         public SocketConfig getSocketConfig(final HttpHost host) {
573             return this.socketConfigMap.get(host);
574         }
575 
576         public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
577             this.socketConfigMap.put(host, socketConfig);
578         }
579 
580         public ConnectionConfig getConnectionConfig(final HttpHost host) {
581             return this.connectionConfigMap.get(host);
582         }
583 
584         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
585             this.connectionConfigMap.put(host, connectionConfig);
586         }
587 
588     }
589 
590     static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
591 
592         private final ConfigData configData;
593         private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
594 
595         InternalConnectionFactory(
596                 final ConfigData configData,
597                 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
598             super();
599             this.configData = configData != null ? configData : new ConfigData();
600             this.connFactory = connFactory != null ? connFactory :
601                 ManagedHttpClientConnectionFactory.INSTANCE;
602         }
603 
604         @Override
605         public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
606             ConnectionConfig config = null;
607             if (route.getProxyHost() != null) {
608                 config = this.configData.getConnectionConfig(route.getProxyHost());
609             }
610             if (config == null) {
611                 config = this.configData.getConnectionConfig(route.getTargetHost());
612             }
613             if (config == null) {
614                 config = this.configData.getDefaultConnectionConfig();
615             }
616             if (config == null) {
617                 config = ConnectionConfig.DEFAULT;
618             }
619             return this.connFactory.create(route, config);
620         }
621 
622     }
623 
624 }