View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.conn;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.Map;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.http.HttpClientConnection;
43  import org.apache.http.HttpHost;
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.config.ConnectionConfig;
46  import org.apache.http.config.Lookup;
47  import org.apache.http.config.Registry;
48  import org.apache.http.config.RegistryBuilder;
49  import org.apache.http.config.SocketConfig;
50  import org.apache.http.conn.ConnectionPoolTimeoutException;
51  import org.apache.http.conn.ConnectionRequest;
52  import org.apache.http.conn.DnsResolver;
53  import org.apache.http.conn.HttpClientConnectionManager;
54  import org.apache.http.conn.HttpClientConnectionOperator;
55  import org.apache.http.conn.HttpConnectionFactory;
56  import org.apache.http.conn.SchemePortResolver;
57  import org.apache.http.conn.ManagedHttpClientConnection;
58  import org.apache.http.conn.routing.HttpRoute;
59  import org.apache.http.conn.socket.ConnectionSocketFactory;
60  import org.apache.http.conn.socket.PlainConnectionSocketFactory;
61  import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
62  import org.apache.http.pool.ConnFactory;
63  import org.apache.http.pool.ConnPoolControl;
64  import org.apache.http.pool.PoolStats;
65  import org.apache.http.protocol.HttpContext;
66  import org.apache.http.util.Args;
67  import org.apache.http.util.Asserts;
68  
69  /**
70   * {@code ClientConnectionPoolManager} maintains a pool of
71   * {@link HttpClientConnection}s and is able to service connection requests
72   * from multiple execution threads. Connections are pooled on a per route
73   * basis. A request for a route which already the manager has persistent
74   * connections for available in the pool will be services by leasing
75   * a connection from the pool rather than creating a brand new connection.
76   * <p/>
77   * {@code ClientConnectionPoolManager} maintains a maximum limit of connection
78   * on a per route basis and in total. Per default this implementation will
79   * create no more than than 2 concurrent connections per given route
80   * and no more 20 connections in total. For many real-world applications
81   * these limits may prove too constraining, especially if they use HTTP
82   * as a transport protocol for their services. Connection limits, however,
83   * can be adjusted using {@link ConnPoolControl} methods.
84   *
85   * The handling of stale connections was changed in version 4.4.
86   * Previously, the code would check every connection by default before re-using it.
87   * The code now only checks the connection if the elapsed time since
88   * the last use of the connection exceeds the timeout that has been set.
89   * The default timeout is set to 5000ms - see
90   * {@link #PoolingHttpClientConnectionManager(HttpClientConnectionOperator, HttpConnectionFactory, long, TimeUnit))}
91   *
92   * @since 4.3
93   */
94  @ThreadSafe
95  public class PoolingHttpClientConnectionManager
96      implements HttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable {
97  
98      private final Log log = LogFactory.getLog(getClass());
99  
100     private final ConfigData configData;
101     private final CPool pool;
102     private final HttpClientConnectionOperator connectionOperator;
103     private final AtomicBoolean isShutDown;
104 
105     private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
106         return RegistryBuilder.<ConnectionSocketFactory>create()
107                 .register("http", PlainConnectionSocketFactory.getSocketFactory())
108                 .register("https", SSLConnectionSocketFactory.getSocketFactory())
109                 .build();
110     }
111 
112     public PoolingHttpClientConnectionManager() {
113         this(getDefaultRegistry());
114     }
115 
116     public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) {
117         this(getDefaultRegistry(), null, null ,null, timeToLive, tunit);
118     }
119 
120     public PoolingHttpClientConnectionManager(
121             final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
122         this(socketFactoryRegistry, null, null);
123     }
124 
125     public PoolingHttpClientConnectionManager(
126             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
127             final DnsResolver dnsResolver) {
128         this(socketFactoryRegistry, null, dnsResolver);
129     }
130 
131     public PoolingHttpClientConnectionManager(
132             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
133             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
134         this(socketFactoryRegistry, connFactory, null);
135     }
136 
137     public PoolingHttpClientConnectionManager(
138             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
139         this(getDefaultRegistry(), connFactory, null);
140     }
141 
142     public PoolingHttpClientConnectionManager(
143             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
144             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
145             final DnsResolver dnsResolver) {
146         this(socketFactoryRegistry, connFactory, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
147     }
148 
149     public PoolingHttpClientConnectionManager(
150             final Registry<ConnectionSocketFactory> socketFactoryRegistry,
151             final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
152             final SchemePortResolver schemePortResolver,
153             final DnsResolver dnsResolver,
154             final long timeToLive, final TimeUnit tunit) {
155         this(
156             new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
157             connFactory,
158             timeToLive, tunit
159         );
160     }
161 
162     /**
163      * @since 4.4
164      */
165     public PoolingHttpClientConnectionManager(
166         final HttpClientConnectionOperator httpClientConnectionOperator,
167         final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
168         final long timeToLive, final TimeUnit tunit) {
169         super();
170         this.configData = new ConfigData();
171         this.pool = new CPool(new InternalConnectionFactory(
172                 this.configData, connFactory), 2, 20, timeToLive, tunit);
173         this.pool.setValidateAfterInactivity(5000);
174         this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
175         this.isShutDown = new AtomicBoolean(false);
176     }
177 
178     /**
179      * Visible for test.
180      */
181     PoolingHttpClientConnectionManager(
182             final CPool pool,
183             final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
184             final SchemePortResolver schemePortResolver,
185             final DnsResolver dnsResolver) {
186         super();
187         this.configData = new ConfigData();
188         this.pool = pool;
189         this.connectionOperator = new DefaultHttpClientConnectionOperator(
190                 socketFactoryRegistry, schemePortResolver, dnsResolver);
191         this.isShutDown = new AtomicBoolean(false);
192     }
193 
194     @Override
195     protected void finalize() throws Throwable {
196         try {
197             shutdown();
198         } finally {
199             super.finalize();
200         }
201     }
202 
203     @Override
204     public void close() {
205         shutdown();
206     }
207 
208     private String format(final HttpRoute route, final Object state) {
209         final StringBuilder buf = new StringBuilder();
210         buf.append("[route: ").append(route).append("]");
211         if (state != null) {
212             buf.append("[state: ").append(state).append("]");
213         }
214         return buf.toString();
215     }
216 
217     private String formatStats(final HttpRoute route) {
218         final StringBuilder buf = new StringBuilder();
219         final PoolStats totals = this.pool.getTotalStats();
220         final PoolStats stats = this.pool.getStats(route);
221         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
222         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
223         buf.append(" of ").append(stats.getMax()).append("; ");
224         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
225         buf.append(" of ").append(totals.getMax()).append("]");
226         return buf.toString();
227     }
228 
229     private String format(final CPoolEntry entry) {
230         final StringBuilder buf = new StringBuilder();
231         buf.append("[id: ").append(entry.getId()).append("]");
232         buf.append("[route: ").append(entry.getRoute()).append("]");
233         final Object state = entry.getState();
234         if (state != null) {
235             buf.append("[state: ").append(state).append("]");
236         }
237         return buf.toString();
238     }
239 
240     @Override
241     public ConnectionRequest requestConnection(
242             final HttpRoute route,
243             final Object state) {
244         Args.notNull(route, "HTTP route");
245         if (this.log.isDebugEnabled()) {
246             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
247         }
248         final Future<CPoolEntry> future = this.pool.lease(route, state, null);
249         return new ConnectionRequest() {
250 
251             @Override
252             public boolean cancel() {
253                 return future.cancel(true);
254             }
255 
256             @Override
257             public HttpClientConnection get(
258                     final long timeout,
259                     final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
260                 return leaseConnection(future, timeout, tunit);
261             }
262 
263         };
264 
265     }
266 
267     protected HttpClientConnection leaseConnection(
268             final Future<CPoolEntry> future,
269             final long timeout,
270             final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
271         final CPoolEntry entry;
272         try {
273             entry = future.get(timeout, tunit);
274             if (entry == null || future.isCancelled()) {
275                 throw new InterruptedException();
276             }
277             Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
278             if (this.log.isDebugEnabled()) {
279                 this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
280             }
281             return CPoolProxy.newProxy(entry);
282         } catch (final TimeoutException ex) {
283             throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
284         }
285     }
286 
287     @Override
288     public void releaseConnection(
289             final HttpClientConnection managedConn,
290             final Object state,
291             final long keepalive, final TimeUnit tunit) {
292         Args.notNull(managedConn, "Managed connection");
293         synchronized (managedConn) {
294             final CPoolEntry entry = CPoolProxy.detach(managedConn);
295             if (entry == null) {
296                 return;
297             }
298             final ManagedHttpClientConnection conn = entry.getConnection();
299             try {
300                 if (conn.isOpen()) {
301                     entry.setState(state);
302                     entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
303                     if (this.log.isDebugEnabled()) {
304                         final String s;
305                         if (keepalive > 0) {
306                             s = "for " + (double) keepalive / 1000 + " seconds";
307                         } else {
308                             s = "indefinitely";
309                         }
310                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
311                     }
312                 }
313             } finally {
314                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
315                 if (this.log.isDebugEnabled()) {
316                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
317                 }
318             }
319         }
320     }
321 
322     @Override
323     public void connect(
324             final HttpClientConnection managedConn,
325             final HttpRoute route,
326             final int connectTimeout,
327             final HttpContext context) throws IOException {
328         Args.notNull(managedConn, "Managed Connection");
329         Args.notNull(route, "HTTP route");
330         final ManagedHttpClientConnection conn;
331         synchronized (managedConn) {
332             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
333             conn = entry.getConnection();
334         }
335         final HttpHost host;
336         if (route.getProxyHost() != null) {
337             host = route.getProxyHost();
338         } else {
339             host = route.getTargetHost();
340         }
341         final InetSocketAddress localAddress = route.getLocalSocketAddress();
342         SocketConfig socketConfig = this.configData.getSocketConfig(host);
343         if (socketConfig == null) {
344             socketConfig = this.configData.getDefaultSocketConfig();
345         }
346         if (socketConfig == null) {
347             socketConfig = SocketConfig.DEFAULT;
348         }
349         this.connectionOperator.connect(
350                 conn, host, localAddress, connectTimeout, socketConfig, context);
351     }
352 
353     @Override
354     public void upgrade(
355             final HttpClientConnection managedConn,
356             final HttpRoute route,
357             final HttpContext context) throws IOException {
358         Args.notNull(managedConn, "Managed Connection");
359         Args.notNull(route, "HTTP route");
360         final ManagedHttpClientConnection conn;
361         synchronized (managedConn) {
362             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
363             conn = entry.getConnection();
364         }
365         this.connectionOperator.upgrade(conn, route.getTargetHost(), context);
366     }
367 
368     @Override
369     public void routeComplete(
370             final HttpClientConnection managedConn,
371             final HttpRoute route,
372             final HttpContext context) throws IOException {
373         Args.notNull(managedConn, "Managed Connection");
374         Args.notNull(route, "HTTP route");
375         synchronized (managedConn) {
376             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
377             entry.markRouteComplete();
378         }
379     }
380 
381     @Override
382     public void shutdown() {
383         if (this.isShutDown.compareAndSet(false, true)) {
384             this.log.debug("Connection manager is shutting down");
385             try {
386                 this.pool.shutdown();
387             } catch (final IOException ex) {
388                 this.log.debug("I/O exception shutting down connection manager", ex);
389             }
390             this.log.debug("Connection manager shut down");
391         }
392     }
393 
394     @Override
395     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
396         if (this.log.isDebugEnabled()) {
397             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
398         }
399         this.pool.closeIdle(idleTimeout, tunit);
400     }
401 
402     @Override
403     public void closeExpiredConnections() {
404         this.log.debug("Closing expired connections");
405         this.pool.closeExpired();
406     }
407 
408     @Override
409     public int getMaxTotal() {
410         return this.pool.getMaxTotal();
411     }
412 
413     @Override
414     public void setMaxTotal(final int max) {
415         this.pool.setMaxTotal(max);
416     }
417 
418     @Override
419     public int getDefaultMaxPerRoute() {
420         return this.pool.getDefaultMaxPerRoute();
421     }
422 
423     @Override
424     public void setDefaultMaxPerRoute(final int max) {
425         this.pool.setDefaultMaxPerRoute(max);
426     }
427 
428     @Override
429     public int getMaxPerRoute(final HttpRoute route) {
430         return this.pool.getMaxPerRoute(route);
431     }
432 
433     @Override
434     public void setMaxPerRoute(final HttpRoute route, final int max) {
435         this.pool.setMaxPerRoute(route, max);
436     }
437 
438     @Override
439     public PoolStats getTotalStats() {
440         return this.pool.getTotalStats();
441     }
442 
443     @Override
444     public PoolStats getStats(final HttpRoute route) {
445         return this.pool.getStats(route);
446     }
447 
448     public SocketConfig getDefaultSocketConfig() {
449         return this.configData.getDefaultSocketConfig();
450     }
451 
452     public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
453         this.configData.setDefaultSocketConfig(defaultSocketConfig);
454     }
455 
456     public ConnectionConfig getDefaultConnectionConfig() {
457         return this.configData.getDefaultConnectionConfig();
458     }
459 
460     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
461         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
462     }
463 
464     public SocketConfig getSocketConfig(final HttpHost host) {
465         return this.configData.getSocketConfig(host);
466     }
467 
468     public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
469         this.configData.setSocketConfig(host, socketConfig);
470     }
471 
472     public ConnectionConfig getConnectionConfig(final HttpHost host) {
473         return this.configData.getConnectionConfig(host);
474     }
475 
476     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
477         this.configData.setConnectionConfig(host, connectionConfig);
478     }
479 
480     /**
481      * @see #setValidateAfterInactivity(int)
482      *
483      * @since 4.4
484      */
485     public int getValidateAfterInactivity() {
486         return pool.getValidateAfterInactivity();
487     }
488 
489     /**
490      * Defines period of inactivity in milliseconds after which persistent connections must
491      * be re-validated prior to being {@link #leaseConnection(java.util.concurrent.Future,
492      *   long, java.util.concurrent.TimeUnit) leased} to the consumer. Non-positive value passed
493      * to this method disables connection validation. This check helps detect connections
494      * that have become stale (half-closed) while kept inactive in the pool.
495      *
496      * @see #leaseConnection(java.util.concurrent.Future, long, java.util.concurrent.TimeUnit)
497      *
498      * @since 4.4
499      */
500     public void setValidateAfterInactivity(final int ms) {
501         pool.setValidateAfterInactivity(ms);
502     }
503 
504     static class ConfigData {
505 
506         private final Map<HttpHost, SocketConfig> socketConfigMap;
507         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
508         private volatile SocketConfig defaultSocketConfig;
509         private volatile ConnectionConfig defaultConnectionConfig;
510 
511         ConfigData() {
512             super();
513             this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
514             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
515         }
516 
517         public SocketConfig getDefaultSocketConfig() {
518             return this.defaultSocketConfig;
519         }
520 
521         public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
522             this.defaultSocketConfig = defaultSocketConfig;
523         }
524 
525         public ConnectionConfig getDefaultConnectionConfig() {
526             return this.defaultConnectionConfig;
527         }
528 
529         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
530             this.defaultConnectionConfig = defaultConnectionConfig;
531         }
532 
533         public SocketConfig getSocketConfig(final HttpHost host) {
534             return this.socketConfigMap.get(host);
535         }
536 
537         public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
538             this.socketConfigMap.put(host, socketConfig);
539         }
540 
541         public ConnectionConfig getConnectionConfig(final HttpHost host) {
542             return this.connectionConfigMap.get(host);
543         }
544 
545         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
546             this.connectionConfigMap.put(host, connectionConfig);
547         }
548 
549     }
550 
551     static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
552 
553         private final ConfigData configData;
554         private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
555 
556         InternalConnectionFactory(
557                 final ConfigData configData,
558                 final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
559             super();
560             this.configData = configData != null ? configData : new ConfigData();
561             this.connFactory = connFactory != null ? connFactory :
562                 ManagedHttpClientConnectionFactory.INSTANCE;
563         }
564 
565         @Override
566         public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
567             ConnectionConfig config = null;
568             if (route.getProxyHost() != null) {
569                 config = this.configData.getConnectionConfig(route.getProxyHost());
570             }
571             if (config == null) {
572                 config = this.configData.getConnectionConfig(route.getTargetHost());
573             }
574             if (config == null) {
575                 config = this.configData.getDefaultConnectionConfig();
576             }
577             if (config == null) {
578                 config = ConnectionConfig.DEFAULT;
579             }
580             return this.connFactory.create(route, config);
581         }
582 
583     }
584 
585 }