View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.impl.nio.conn;
28  
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.apache.http.annotation.Contract;
import org.apache.http.annotation.ThreadingBehavior;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.UnsupportedSchemeException;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.conn.DefaultSchemePortResolver;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.conn.ManagedNHttpClientConnection;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NHttpConnectionFactory;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.pool.NIOConnFactory;
import org.apache.http.nio.pool.SocketAddressResolver;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.pool.ConnPoolControl;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.Args;
import org.apache.http.util.Asserts;
75  
76  /**
77   * {@code PoolingNHttpClientConnectionManager} maintains a pool of
78   * {@link NHttpClientConnection}s and is able to service connection requests
79   * from multiple execution threads. Connections are pooled on a per route
80   * basis. A request for a route which already the manager has persistent
81   * connections for available in the pool will be services by leasing
82   * a connection from the pool rather than creating a brand new connection.
83   * <p>
84   * {@code PoolingNHttpClientConnectionManager} maintains a maximum limit
85   * of connection on a per route basis and in total. Per default this
86   * implementation will create no more than than 2 concurrent connections
87   * per given route and no more 20 connections in total. For many real-world
88   * applications these limits may prove too constraining, especially if they
89   * use HTTP as a transport protocol for their services. Connection limits,
90   * however, can be adjusted using {@link ConnPoolControl} methods.
91   *
92   * @since 4.0
93   */
94  @Contract(threading = ThreadingBehavior.SAFE)
95  public class PoolingNHttpClientConnectionManager
96         implements NHttpClientConnectionManager, ConnPoolControl<HttpRoute> {
97  
98      private final Log log = LogFactory.getLog(getClass());
99  
100     static final String IOSESSION_FACTORY_REGISTRY = "http.iosession-factory-registry";
101 
102     private final ConnectingIOReactor ioreactor;
103     private final ConfigData configData;
104     private final CPool pool;
105     private final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry;
106 
107     private static Registry<SchemeIOSessionStrategy> getDefaultRegistry() {
108         return RegistryBuilder.<SchemeIOSessionStrategy>create()
109                 .register("http", NoopIOSessionStrategy.INSTANCE)
110                 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
111                 .build();
112     }
113 
114     public PoolingNHttpClientConnectionManager(final ConnectingIOReactor ioreactor) {
115         this(ioreactor, getDefaultRegistry());
116     }
117 
118     public PoolingNHttpClientConnectionManager(
119             final ConnectingIOReactor ioreactor,
120             final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
121         this(ioreactor, null, iosessionFactoryRegistry, null);
122     }
123 
124     public PoolingNHttpClientConnectionManager(
125             final ConnectingIOReactor ioreactor,
126             final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
127             final DnsResolver dnsResolver) {
128         this(ioreactor, connFactory, getDefaultRegistry(), dnsResolver);
129     }
130 
131     public PoolingNHttpClientConnectionManager(
132             final ConnectingIOReactor ioreactor,
133             final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
134         this(ioreactor, connFactory, getDefaultRegistry(), null);
135     }
136 
137     public PoolingNHttpClientConnectionManager(
138             final ConnectingIOReactor ioreactor,
139             final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
140             final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
141         this(ioreactor, connFactory, iosessionFactoryRegistry, null);
142     }
143 
144     public PoolingNHttpClientConnectionManager(
145             final ConnectingIOReactor ioreactor,
146             final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
147             final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
148             final DnsResolver dnsResolver) {
149         this(ioreactor, connFactory, iosessionFactoryRegistry, null, dnsResolver,
150             -1, TimeUnit.MILLISECONDS);
151     }
152 
153     public PoolingNHttpClientConnectionManager(
154             final ConnectingIOReactor ioreactor,
155             final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
156             final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
157             final SchemePortResolver schemePortResolver,
158             final DnsResolver dnsResolver,
159             final long timeToLive, final TimeUnit tunit) {
160         super();
161         Args.notNull(ioreactor, "I/O reactor");
162         Args.notNull(iosessionFactoryRegistry, "I/O session factory registry");
163         this.ioreactor = ioreactor;
164         this.configData = new ConfigData();
165         this.pool = new CPool(ioreactor,
166             new InternalConnectionFactory(this.configData, connFactory),
167             new InternalAddressResolver(schemePortResolver, dnsResolver),
168             2, 20, timeToLive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
169         this.iosessionFactoryRegistry = iosessionFactoryRegistry;
170     }
171 
172     PoolingNHttpClientConnectionManager(
173             final ConnectingIOReactor ioreactor,
174             final CPool pool,
175             final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
176         super();
177         this.ioreactor = ioreactor;
178         this.configData = new ConfigData();
179         this.pool = pool;
180         this.iosessionFactoryRegistry = iosessionFactoryRegistry;
181     }
182 
183     @Override
184     protected void finalize() throws Throwable {
185         try {
186             shutdown();
187         } finally {
188             super.finalize();
189         }
190     }
191 
192     @Override
193     public void execute(final IOEventDispatch eventDispatch) throws IOException {
194         this.ioreactor.execute(eventDispatch);
195     }
196 
197     public void shutdown(final long waitMs) throws IOException {
198         this.log.debug("Connection manager is shutting down");
199         this.pool.shutdown(waitMs);
200         this.log.debug("Connection manager shut down");
201     }
202 
203     @Override
204     public void shutdown() throws IOException {
205         this.log.debug("Connection manager is shutting down");
206         this.pool.shutdown(2000);
207         this.log.debug("Connection manager shut down");
208     }
209 
210     private String format(final HttpRoute route, final Object state) {
211         final StringBuilder buf = new StringBuilder();
212         buf.append("[route: ").append(route).append("]");
213         if (state != null) {
214             buf.append("[state: ").append(state).append("]");
215         }
216         return buf.toString();
217     }
218 
219     private String formatStats(final HttpRoute route) {
220         final StringBuilder buf = new StringBuilder();
221         final PoolStats totals = this.pool.getTotalStats();
222         final PoolStats stats = this.pool.getStats(route);
223         buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
224         buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
225         buf.append(" of ").append(stats.getMax()).append("; ");
226         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
227         buf.append(" of ").append(totals.getMax()).append("]");
228         return buf.toString();
229     }
230 
231     private String format(final CPoolEntry entry) {
232         final StringBuilder buf = new StringBuilder();
233         buf.append("[id: ").append(entry.getId()).append("]");
234         buf.append("[route: ").append(entry.getRoute()).append("]");
235         final Object state = entry.getState();
236         if (state != null) {
237             buf.append("[state: ").append(state).append("]");
238         }
239         return buf.toString();
240     }
241 
242     @Override
243     public Future<NHttpClientConnection> requestConnection(
244             final HttpRoute route,
245             final Object state,
246             final long connectTimeout,
247             final long leaseTimeout,
248             final TimeUnit tunit,
249             final FutureCallback<NHttpClientConnection> callback) {
250         Args.notNull(route, "HTTP route");
251         if (this.log.isDebugEnabled()) {
252             this.log.debug("Connection request: " + format(route, state) + formatStats(route));
253         }
254         final BasicFuture<NHttpClientConnection> resultFuture = new BasicFuture<NHttpClientConnection>(callback);
255         final HttpHost host;
256         if (route.getProxyHost() != null) {
257             host = route.getProxyHost();
258         } else {
259             host = route.getTargetHost();
260         }
261         final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
262                 host.getSchemeName());
263         if (sf == null) {
264             resultFuture.failed(new UnsupportedSchemeException(host.getSchemeName() +
265                     " protocol is not supported"));
266             return resultFuture;
267         }
268         final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
269                 connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
270                 new FutureCallback<CPoolEntry>() {
271 
272                     @Override
273                     public void completed(final CPoolEntry entry) {
274                         Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
275                         if (log.isDebugEnabled()) {
276                             log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
277                         }
278                         final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
279                         if (!resultFuture.completed(managedConn)) {
280                             pool.release(entry, true);
281                         }
282                     }
283 
284                     @Override
285                     public void failed(final Exception ex) {
286                         if (log.isDebugEnabled()) {
287                             log.debug("Connection request failed", ex);
288                         }
289                         resultFuture.failed(ex);
290                     }
291 
292                     @Override
293                     public void cancelled() {
294                         log.debug("Connection request cancelled");
295                         resultFuture.cancel(true);
296                     }
297 
298                 });
299         return new Future<NHttpClientConnection>() {
300 
301             @Override
302             public boolean cancel(final boolean mayInterruptIfRunning) {
303                 try {
304                     leaseFuture.cancel(mayInterruptIfRunning);
305                 } finally {
306                     return resultFuture.cancel(mayInterruptIfRunning);
307                 }
308             }
309 
310             @Override
311             public boolean isCancelled() {
312                 return resultFuture.isCancelled();
313             }
314 
315             @Override
316             public boolean isDone() {
317                 return resultFuture.isDone();
318             }
319 
320             @Override
321             public NHttpClientConnection get() throws InterruptedException, ExecutionException {
322                 return resultFuture.get();
323             }
324 
325             @Override
326             public NHttpClientConnection get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
327                 return resultFuture.get(timeout, unit);
328             }
329 
330         };
331     }
332 
333     @Override
334     public void releaseConnection(
335             final NHttpClientConnection managedConn,
336             final Object state,
337             final long keepalive,
338             final TimeUnit tunit) {
339         Args.notNull(managedConn, "Managed connection");
340         synchronized (managedConn) {
341             final CPoolEntry entry = CPoolProxy.detach(managedConn);
342             if (entry == null) {
343                 return;
344             }
345             if (this.log.isDebugEnabled()) {
346                 this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
347             }
348             final NHttpClientConnection conn = entry.getConnection();
349             try {
350                 if (conn.isOpen()) {
351                     entry.setState(state);
352                     entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
353                     if (this.log.isDebugEnabled()) {
354                         final String s;
355                         if (keepalive > 0) {
356                             s = "for " + (double) keepalive / 1000 + " seconds";
357                         } else {
358                             s = "indefinitely";
359                         }
360                         this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
361                     }
362                 }
363             } finally {
364                 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
365                 if (this.log.isDebugEnabled()) {
366                     this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
367                 }
368             }
369         }
370     }
371 
372     private Lookup<SchemeIOSessionStrategy> getIOSessionFactoryRegistry(final HttpContext context) {
373         @SuppressWarnings("unchecked")
374         Lookup<SchemeIOSessionStrategy> reg = (Lookup<SchemeIOSessionStrategy>) context.getAttribute(
375                 IOSESSION_FACTORY_REGISTRY);
376         if (reg == null) {
377             reg = this.iosessionFactoryRegistry;
378         }
379         return reg;
380     }
381 
382     @Override
383     public void startRoute(
384             final NHttpClientConnection managedConn,
385             final HttpRoute route,
386             final HttpContext context) throws IOException {
387         Args.notNull(managedConn, "Managed connection");
388         Args.notNull(route, "HTTP route");
389         final HttpHost host;
390         if (route.getProxyHost() != null) {
391             host = route.getProxyHost();
392         } else {
393             host = route.getTargetHost();
394         }
395         final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
396         final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
397         if (sf == null) {
398             throw new UnsupportedSchemeException(host.getSchemeName() +
399                     " protocol is not supported");
400         }
401         if (sf.isLayeringRequired()) {
402             synchronized (managedConn) {
403                 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
404                 final ManagedNHttpClientConnection conn = entry.getConnection();
405                 final IOSession ioSession = conn.getIOSession();
406                 final IOSession currentSession = sf.upgrade(host, ioSession);
407                 conn.bind(currentSession);
408             }
409         }
410     }
411 
412     @Override
413     public void upgrade(
414             final NHttpClientConnection managedConn,
415             final HttpRoute route,
416             final HttpContext context) throws IOException {
417         Args.notNull(managedConn, "Managed connection");
418         Args.notNull(route, "HTTP route");
419         final HttpHost host  = route.getTargetHost();
420         final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
421         final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
422         if (sf == null) {
423             throw new UnsupportedSchemeException(host.getSchemeName() +
424                     " protocol is not supported");
425         }
426         if (!sf.isLayeringRequired()) {
427             throw new UnsupportedSchemeException(host.getSchemeName() +
428                     " protocol does not support connection upgrade");
429         }
430         synchronized (managedConn) {
431             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
432             final ManagedNHttpClientConnection conn = entry.getConnection();
433             final IOSession currentSession = sf.upgrade(host, conn.getIOSession());
434             conn.bind(currentSession);
435         }
436     }
437 
438     @Override
439     public void routeComplete(
440             final NHttpClientConnection managedConn,
441             final HttpRoute route,
442             final HttpContext context) {
443         Args.notNull(managedConn, "Managed connection");
444         Args.notNull(route, "HTTP route");
445         synchronized (managedConn) {
446             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
447             entry.markRouteComplete();
448         }
449     }
450 
451     @Override
452     public boolean isRouteComplete(
453             final NHttpClientConnection managedConn) {
454         Args.notNull(managedConn, "Managed connection");
455         synchronized (managedConn) {
456             final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
457             return entry.isRouteComplete();
458         }
459     }
460 
461     @Override
462     public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
463         if (this.log.isDebugEnabled()) {
464             this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
465         }
466         this.pool.closeIdle(idleTimeout, tunit);
467     }
468 
469     @Override
470     public void closeExpiredConnections() {
471         log.debug("Closing expired connections");
472         this.pool.closeExpired();
473     }
474 
475     public void validatePendingRequests() {
476         log.debug("Validating pending requests");
477         this.pool.validatePendingRequests();
478     }
479 
480     @Override
481     public int getMaxTotal() {
482         return this.pool.getMaxTotal();
483     }
484 
485     @Override
486     public void setMaxTotal(final int max) {
487         this.pool.setMaxTotal(max);
488     }
489 
490     @Override
491     public int getDefaultMaxPerRoute() {
492         return this.pool.getDefaultMaxPerRoute();
493     }
494 
495     @Override
496     public void setDefaultMaxPerRoute(final int max) {
497         this.pool.setDefaultMaxPerRoute(max);
498     }
499 
500     @Override
501     public int getMaxPerRoute(final HttpRoute route) {
502         return this.pool.getMaxPerRoute(route);
503     }
504 
505     @Override
506     public void setMaxPerRoute(final HttpRoute route, final int max) {
507         this.pool.setMaxPerRoute(route, max);
508     }
509 
510     @Override
511     public PoolStats getTotalStats() {
512         return this.pool.getTotalStats();
513     }
514 
515     @Override
516     public PoolStats getStats(final HttpRoute route) {
517         return this.pool.getStats(route);
518     }
519 
520     /**
521      * @since 4.1
522      */
523     public Set<HttpRoute> getRoutes() {
524         return this.pool.getRoutes();
525     }
526 
527     public ConnectionConfig getDefaultConnectionConfig() {
528         return this.configData.getDefaultConnectionConfig();
529     }
530 
531     public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
532         this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
533     }
534 
535     public ConnectionConfig getConnectionConfig(final HttpHost host) {
536         return this.configData.getConnectionConfig(host);
537     }
538 
539     public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
540         this.configData.setConnectionConfig(host, connectionConfig);
541     }
542 
543     static class ConfigData {
544 
545         private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
546         private volatile ConnectionConfig defaultConnectionConfig;
547 
548         ConfigData() {
549             super();
550             this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
551         }
552 
553         public ConnectionConfig getDefaultConnectionConfig() {
554             return this.defaultConnectionConfig;
555         }
556 
557         public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
558             this.defaultConnectionConfig = defaultConnectionConfig;
559         }
560 
561         public ConnectionConfig getConnectionConfig(final HttpHost host) {
562             return this.connectionConfigMap.get(host);
563         }
564 
565         public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
566             this.connectionConfigMap.put(host, connectionConfig);
567         }
568 
569     }
570 
571     static class InternalConnectionFactory implements NIOConnFactory<HttpRoute, ManagedNHttpClientConnection> {
572 
573         private final ConfigData configData;
574         private final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory;
575 
576         InternalConnectionFactory(
577                 final ConfigData configData,
578                 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
579             super();
580             this.configData = configData != null ? configData : new ConfigData();
581             this.connFactory = connFactory != null ? connFactory :
582                 ManagedNHttpClientConnectionFactory.INSTANCE;
583         }
584 
585         @Override
586         public ManagedNHttpClientConnection create(
587                 final HttpRoute route, final IOSession iosession) throws IOException {
588             ConnectionConfig config = null;
589             if (route.getProxyHost() != null) {
590                 config = this.configData.getConnectionConfig(route.getProxyHost());
591             }
592             if (config == null) {
593                 config = this.configData.getConnectionConfig(route.getTargetHost());
594             }
595             if (config == null) {
596                 config = this.configData.getDefaultConnectionConfig();
597             }
598             if (config == null) {
599                 config = ConnectionConfig.DEFAULT;
600             }
601             final ManagedNHttpClientConnection conn = this.connFactory.create(iosession, config);
602             iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
603             return conn;
604         }
605 
606     }
607 
608     static class InternalAddressResolver implements SocketAddressResolver<HttpRoute> {
609 
610         private final SchemePortResolver schemePortResolver;
611         private final DnsResolver dnsResolver;
612 
613         public InternalAddressResolver(
614                 final SchemePortResolver schemePortResolver,
615                 final DnsResolver dnsResolver) {
616             super();
617             this.schemePortResolver = schemePortResolver != null ? schemePortResolver :
618                 DefaultSchemePortResolver.INSTANCE;
619             this.dnsResolver = dnsResolver != null ? dnsResolver :
620                     SystemDefaultDnsResolver.INSTANCE;
621         }
622 
623         @Override
624         public SocketAddress resolveLocalAddress(final HttpRoute route) throws IOException {
625             return route.getLocalAddress() != null ? new InetSocketAddress(route.getLocalAddress(), 0) : null;
626         }
627 
628         @Override
629         public SocketAddress resolveRemoteAddress(final HttpRoute route) throws IOException {
630             final HttpHost host;
631             if (route.getProxyHost() != null) {
632                 host = route.getProxyHost();
633             } else {
634                 host = route.getTargetHost();
635             }
636             final int port = this.schemePortResolver.resolve(host);
637             final InetAddress[] addresses = this.dnsResolver.resolve(host.getHostName());
638             return new InetSocketAddress(addresses[0], port);
639         }
640 
641     }
642 }