1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.conn;
28
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.SocketAddress;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.HttpHost;
44 import org.apache.http.annotation.Contract;
45 import org.apache.http.annotation.ThreadingBehavior;
46 import org.apache.http.concurrent.BasicFuture;
47 import org.apache.http.concurrent.FutureCallback;
48 import org.apache.http.config.ConnectionConfig;
49 import org.apache.http.config.Lookup;
50 import org.apache.http.config.Registry;
51 import org.apache.http.config.RegistryBuilder;
52 import org.apache.http.conn.DnsResolver;
53 import org.apache.http.conn.SchemePortResolver;
54 import org.apache.http.conn.UnsupportedSchemeException;
55 import org.apache.http.conn.routing.HttpRoute;
56 import org.apache.http.impl.conn.DefaultSchemePortResolver;
57 import org.apache.http.impl.conn.SystemDefaultDnsResolver;
58 import org.apache.http.nio.NHttpClientConnection;
59 import org.apache.http.nio.conn.ManagedNHttpClientConnection;
60 import org.apache.http.nio.conn.NHttpClientConnectionManager;
61 import org.apache.http.nio.conn.NHttpConnectionFactory;
62 import org.apache.http.nio.conn.NoopIOSessionStrategy;
63 import org.apache.http.nio.conn.SchemeIOSessionStrategy;
64 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
65 import org.apache.http.nio.pool.NIOConnFactory;
66 import org.apache.http.nio.pool.SocketAddressResolver;
67 import org.apache.http.nio.reactor.ConnectingIOReactor;
68 import org.apache.http.nio.reactor.IOEventDispatch;
69 import org.apache.http.nio.reactor.IOSession;
70 import org.apache.http.pool.ConnPoolControl;
71 import org.apache.http.pool.PoolStats;
72 import org.apache.http.protocol.HttpContext;
73 import org.apache.http.util.Args;
74 import org.apache.http.util.Asserts;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @Contract(threading = ThreadingBehavior.SAFE)
95 public class PoolingNHttpClientConnectionManager
96 implements NHttpClientConnectionManager, ConnPoolControl<HttpRoute> {
97
98 private final Log log = LogFactory.getLog(getClass());
99
100 static final String IOSESSION_FACTORY_REGISTRY = "http.ioSession-factory-registry";
101
102 private final ConnectingIOReactor ioReactor;
103 private final ConfigData configData;
104 private final CPool pool;
105 private final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry;
106
107 private static Registry<SchemeIOSessionStrategy> getDefaultRegistry() {
108 return RegistryBuilder.<SchemeIOSessionStrategy>create()
109 .register("http", NoopIOSessionStrategy.INSTANCE)
110 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
111 .build();
112 }
113
114 public PoolingNHttpClientConnectionManager(final ConnectingIOReactor ioReactor) {
115 this(ioReactor, getDefaultRegistry());
116 }
117
118 public PoolingNHttpClientConnectionManager(
119 final ConnectingIOReactor ioReactor,
120 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
121 this(ioReactor, null, ioSessionFactoryRegistry, (DnsResolver) null);
122 }
123
124 public PoolingNHttpClientConnectionManager(
125 final ConnectingIOReactor ioReactor,
126 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
127 final DnsResolver dnsResolver) {
128 this(ioReactor, connFactory, getDefaultRegistry(), dnsResolver);
129 }
130
131 public PoolingNHttpClientConnectionManager(
132 final ConnectingIOReactor ioReactor,
133 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
134 final SocketAddressResolver<HttpRoute> socketAddressResolver) {
135 this(ioReactor, connFactory, getDefaultRegistry(), socketAddressResolver);
136 }
137
138 public PoolingNHttpClientConnectionManager(
139 final ConnectingIOReactor ioReactor,
140 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
141 this(ioReactor, connFactory, getDefaultRegistry(), (DnsResolver) null);
142 }
143
144 public PoolingNHttpClientConnectionManager(
145 final ConnectingIOReactor ioReactor,
146 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
147 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
148 this(ioReactor, connFactory, ioSessionFactoryRegistry, (DnsResolver) null);
149 }
150
151 public PoolingNHttpClientConnectionManager(
152 final ConnectingIOReactor ioReactor,
153 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
154 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
155 final DnsResolver dnsResolver) {
156 this(ioReactor, connFactory, ioSessionFactoryRegistry, null, dnsResolver,
157 -1, TimeUnit.MILLISECONDS);
158 }
159
160 public PoolingNHttpClientConnectionManager(
161 final ConnectingIOReactor ioReactor,
162 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
163 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
164 final SocketAddressResolver<HttpRoute> socketAddressResolver) {
165 this(ioReactor, connFactory, ioSessionFactoryRegistry, socketAddressResolver,
166 -1, TimeUnit.MILLISECONDS);
167 }
168
169 public PoolingNHttpClientConnectionManager(
170 final ConnectingIOReactor ioReactor,
171 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
172 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
173 final SchemePortResolver schemePortResolver,
174 final DnsResolver dnsResolver,
175 final long timeToLive, final TimeUnit timeUnit) {
176 this(ioReactor, connFactory, ioSessionFactoryRegistry,
177 new InternalAddressResolver(schemePortResolver, dnsResolver), timeToLive, timeUnit);
178 }
179
180 public PoolingNHttpClientConnectionManager(
181 final ConnectingIOReactor ioReactor,
182 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
183 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
184 final SocketAddressResolver<HttpRoute> socketAddressResolver,
185 final long timeToLive, final TimeUnit timeUnit) {
186 super();
187 Args.notNull(ioReactor, "I/O reactor");
188 Args.notNull(ioSessionFactoryRegistry, "I/O session factory registry");
189 Args.notNull(socketAddressResolver, "Socket address resolver");
190 this.ioReactor = ioReactor;
191 this.configData = new ConfigData();
192 this.pool = new CPool(ioReactor,
193 new InternalConnectionFactory(this.configData, connFactory),
194 socketAddressResolver,
195 2, 20, timeToLive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
196 this.ioSessionFactoryRegistry = ioSessionFactoryRegistry;
197 }
198
199 PoolingNHttpClientConnectionManager(
200 final ConnectingIOReactor ioReactor,
201 final CPool pool,
202 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
203 super();
204 this.ioReactor = ioReactor;
205 this.configData = new ConfigData();
206 this.pool = pool;
207 this.ioSessionFactoryRegistry = ioSessionFactoryRegistry;
208 }
209
210 @Override
211 protected void finalize() throws Throwable {
212 try {
213 shutdown();
214 } finally {
215 super.finalize();
216 }
217 }
218
219 @Override
220 public void execute(final IOEventDispatch eventDispatch) throws IOException {
221 this.ioReactor.execute(eventDispatch);
222 }
223
224 public void shutdown(final long waitMs) throws IOException {
225 this.log.debug("Connection manager is shutting down");
226 this.pool.shutdown(waitMs);
227 this.log.debug("Connection manager shut down");
228 }
229
230 @Override
231 public void shutdown() throws IOException {
232 this.log.debug("Connection manager is shutting down");
233 this.pool.shutdown(2000);
234 this.log.debug("Connection manager shut down");
235 }
236
237 private String format(final HttpRoute route, final Object state) {
238 final StringBuilder buf = new StringBuilder();
239 buf.append("[route: ").append(route).append("]");
240 if (state != null) {
241 buf.append("[state: ").append(state).append("]");
242 }
243 return buf.toString();
244 }
245
246 private String formatStats(final HttpRoute route) {
247 final StringBuilder buf = new StringBuilder();
248 final PoolStats totals = this.pool.getTotalStats();
249 final PoolStats stats = this.pool.getStats(route);
250 buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
251 buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
252 buf.append(" of ").append(stats.getMax()).append("; ");
253 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
254 buf.append(" of ").append(totals.getMax()).append("]");
255 return buf.toString();
256 }
257
258 private String format(final CPoolEntry entry) {
259 final StringBuilder buf = new StringBuilder();
260 buf.append("[id: ").append(entry.getId()).append("]");
261 buf.append("[route: ").append(entry.getRoute()).append("]");
262 final Object state = entry.getState();
263 if (state != null) {
264 buf.append("[state: ").append(state).append("]");
265 }
266 return buf.toString();
267 }
268
269 @Override
270 public Future<NHttpClientConnection> requestConnection(
271 final HttpRoute route,
272 final Object state,
273 final long connectTimeout,
274 final long leaseTimeout,
275 final TimeUnit timeUnit,
276 final FutureCallback<NHttpClientConnection> callback) {
277 Args.notNull(route, "HTTP route");
278 if (this.log.isDebugEnabled()) {
279 this.log.debug("Connection request: " + format(route, state) + formatStats(route));
280 }
281 final BasicFuture<NHttpClientConnection> resultFuture = new BasicFuture<NHttpClientConnection>(callback);
282 final HttpHost host;
283 if (route.getProxyHost() != null) {
284 host = route.getProxyHost();
285 } else {
286 host = route.getTargetHost();
287 }
288 final SchemeIOSessionStrategy sf = this.ioSessionFactoryRegistry.lookup(
289 host.getSchemeName());
290 if (sf == null) {
291 resultFuture.failed(new UnsupportedSchemeException(host.getSchemeName() +
292 " protocol is not supported"));
293 return resultFuture;
294 }
295 final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
296 connectTimeout, leaseTimeout, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS,
297 new FutureCallback<CPoolEntry>() {
298
299 @Override
300 public void completed(final CPoolEntry entry) {
301 Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
302 if (log.isDebugEnabled()) {
303 log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
304 }
305 final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
306 synchronized (managedConn) {
307 if (!resultFuture.completed(managedConn)) {
308 pool.release(entry, true);
309 }
310 }
311 }
312
313 @Override
314 public void failed(final Exception ex) {
315 if (log.isDebugEnabled()) {
316 log.debug("Connection request failed", ex);
317 }
318 resultFuture.failed(ex);
319 }
320
321 @Override
322 public void cancelled() {
323 log.debug("Connection request cancelled");
324 resultFuture.cancel(true);
325 }
326
327 });
328 return new Future<NHttpClientConnection>() {
329
330 @Override
331 public boolean cancel(final boolean mayInterruptIfRunning) {
332 try {
333 leaseFuture.cancel(mayInterruptIfRunning);
334 } finally {
335 return resultFuture.cancel(mayInterruptIfRunning);
336 }
337 }
338
339 @Override
340 public boolean isCancelled() {
341 return resultFuture.isCancelled();
342 }
343
344 @Override
345 public boolean isDone() {
346 return resultFuture.isDone();
347 }
348
349 @Override
350 public NHttpClientConnection get() throws InterruptedException, ExecutionException {
351 return resultFuture.get();
352 }
353
354 @Override
355 public NHttpClientConnection get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
356 return resultFuture.get(timeout, unit);
357 }
358
359 };
360 }
361
362 @Override
363 public void releaseConnection(
364 final NHttpClientConnection managedConn,
365 final Object state,
366 final long keepalive,
367 final TimeUnit timeUnit) {
368 Args.notNull(managedConn, "Managed connection");
369 synchronized (managedConn) {
370 final CPoolEntry entry = CPoolProxy.detach(managedConn);
371 if (entry == null) {
372 return;
373 }
374 if (this.log.isDebugEnabled()) {
375 this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
376 }
377 final NHttpClientConnection conn = entry.getConnection();
378 try {
379 if (conn.isOpen()) {
380 entry.setState(state);
381 entry.updateExpiry(keepalive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
382 if (this.log.isDebugEnabled()) {
383 final String s;
384 if (keepalive > 0) {
385 s = "for " + (double) keepalive / 1000 + " seconds";
386 } else {
387 s = "indefinitely";
388 }
389 this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
390 }
391 }
392 } finally {
393 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
394 if (this.log.isDebugEnabled()) {
395 this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
396 }
397 }
398 }
399 }
400
401 private Lookup<SchemeIOSessionStrategy> getIOSessionFactoryRegistry(final HttpContext context) {
402 @SuppressWarnings("unchecked")
403 Lookup<SchemeIOSessionStrategy> reg = (Lookup<SchemeIOSessionStrategy>) context.getAttribute(
404 IOSESSION_FACTORY_REGISTRY);
405 if (reg == null) {
406 reg = this.ioSessionFactoryRegistry;
407 }
408 return reg;
409 }
410
411 @Override
412 public void startRoute(
413 final NHttpClientConnection managedConn,
414 final HttpRoute route,
415 final HttpContext context) throws IOException {
416 Args.notNull(managedConn, "Managed connection");
417 Args.notNull(route, "HTTP route");
418 final HttpHost host;
419 if (route.getProxyHost() != null) {
420 host = route.getProxyHost();
421 } else {
422 host = route.getTargetHost();
423 }
424 final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
425 final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
426 if (sf == null) {
427 throw new UnsupportedSchemeException(host.getSchemeName() +
428 " protocol is not supported");
429 }
430 if (sf.isLayeringRequired()) {
431 synchronized (managedConn) {
432 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
433 final ManagedNHttpClientConnection conn = entry.getConnection();
434 final IOSession ioSession = conn.getIOSession();
435 final IOSession currentSession = sf.upgrade(host, ioSession);
436 conn.bind(currentSession);
437 }
438 }
439 }
440
441 @Override
442 public void upgrade(
443 final NHttpClientConnection managedConn,
444 final HttpRoute route,
445 final HttpContext context) throws IOException {
446 Args.notNull(managedConn, "Managed connection");
447 Args.notNull(route, "HTTP route");
448 final HttpHost host = route.getTargetHost();
449 final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
450 final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
451 if (sf == null) {
452 throw new UnsupportedSchemeException(host.getSchemeName() +
453 " protocol is not supported");
454 }
455 if (!sf.isLayeringRequired()) {
456 throw new UnsupportedSchemeException(host.getSchemeName() +
457 " protocol does not support connection upgrade");
458 }
459 synchronized (managedConn) {
460 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
461 final ManagedNHttpClientConnection conn = entry.getConnection();
462 final IOSession currentSession = sf.upgrade(host, conn.getIOSession());
463 conn.bind(currentSession);
464 }
465 }
466
467 @Override
468 public void routeComplete(
469 final NHttpClientConnection managedConn,
470 final HttpRoute route,
471 final HttpContext context) {
472 Args.notNull(managedConn, "Managed connection");
473 Args.notNull(route, "HTTP route");
474 synchronized (managedConn) {
475 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
476 entry.markRouteComplete();
477 }
478 }
479
480 @Override
481 public boolean isRouteComplete(
482 final NHttpClientConnection managedConn) {
483 Args.notNull(managedConn, "Managed connection");
484 synchronized (managedConn) {
485 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
486 return entry.isRouteComplete();
487 }
488 }
489
490 @Override
491 public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
492 if (this.log.isDebugEnabled()) {
493 this.log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
494 }
495 this.pool.closeIdle(idleTimeout, timeUnit);
496 }
497
498 @Override
499 public void closeExpiredConnections() {
500 log.debug("Closing expired connections");
501 this.pool.closeExpired();
502 }
503
504 public void validatePendingRequests() {
505 log.debug("Validating pending requests");
506 this.pool.validatePendingRequests();
507 }
508
509 @Override
510 public int getMaxTotal() {
511 return this.pool.getMaxTotal();
512 }
513
514 @Override
515 public void setMaxTotal(final int max) {
516 this.pool.setMaxTotal(max);
517 }
518
519 @Override
520 public int getDefaultMaxPerRoute() {
521 return this.pool.getDefaultMaxPerRoute();
522 }
523
524 @Override
525 public void setDefaultMaxPerRoute(final int max) {
526 this.pool.setDefaultMaxPerRoute(max);
527 }
528
529 @Override
530 public int getMaxPerRoute(final HttpRoute route) {
531 return this.pool.getMaxPerRoute(route);
532 }
533
534 @Override
535 public void setMaxPerRoute(final HttpRoute route, final int max) {
536 this.pool.setMaxPerRoute(route, max);
537 }
538
539 @Override
540 public PoolStats getTotalStats() {
541 return this.pool.getTotalStats();
542 }
543
544 @Override
545 public PoolStats getStats(final HttpRoute route) {
546 return this.pool.getStats(route);
547 }
548
549
550
551
552 public Set<HttpRoute> getRoutes() {
553 return this.pool.getRoutes();
554 }
555
556 public ConnectionConfig getDefaultConnectionConfig() {
557 return this.configData.getDefaultConnectionConfig();
558 }
559
560 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
561 this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
562 }
563
564 public ConnectionConfig getConnectionConfig(final HttpHost host) {
565 return this.configData.getConnectionConfig(host);
566 }
567
568 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
569 this.configData.setConnectionConfig(host, connectionConfig);
570 }
571
572 static class ConfigData {
573
574 private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
575 private volatile ConnectionConfig defaultConnectionConfig;
576
577 ConfigData() {
578 super();
579 this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
580 }
581
582 public ConnectionConfig getDefaultConnectionConfig() {
583 return this.defaultConnectionConfig;
584 }
585
586 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
587 this.defaultConnectionConfig = defaultConnectionConfig;
588 }
589
590 public ConnectionConfig getConnectionConfig(final HttpHost host) {
591 return this.connectionConfigMap.get(host);
592 }
593
594 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
595 this.connectionConfigMap.put(host, connectionConfig);
596 }
597
598 }
599
600 static class InternalConnectionFactory implements NIOConnFactory<HttpRoute, ManagedNHttpClientConnection> {
601
602 private final ConfigData configData;
603 private final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory;
604
605 InternalConnectionFactory(
606 final ConfigData configData,
607 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
608 super();
609 this.configData = configData != null ? configData : new ConfigData();
610 this.connFactory = connFactory != null ? connFactory :
611 ManagedNHttpClientConnectionFactory.INSTANCE;
612 }
613
614 @Override
615 public ManagedNHttpClientConnection create(
616 final HttpRoute route, final IOSession ioSession) throws IOException {
617 ConnectionConfig config = null;
618 if (route.getProxyHost() != null) {
619 config = this.configData.getConnectionConfig(route.getProxyHost());
620 }
621 if (config == null) {
622 config = this.configData.getConnectionConfig(route.getTargetHost());
623 }
624 if (config == null) {
625 config = this.configData.getDefaultConnectionConfig();
626 }
627 if (config == null) {
628 config = ConnectionConfig.DEFAULT;
629 }
630 final ManagedNHttpClientConnection conn = this.connFactory.create(ioSession, config);
631 ioSession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
632 return conn;
633 }
634
635 }
636
637 static class InternalAddressResolver implements SocketAddressResolver<HttpRoute> {
638
639 private final SchemePortResolver schemePortResolver;
640 private final DnsResolver dnsResolver;
641
642 public InternalAddressResolver(
643 final SchemePortResolver schemePortResolver,
644 final DnsResolver dnsResolver) {
645 super();
646 this.schemePortResolver = schemePortResolver != null ? schemePortResolver :
647 DefaultSchemePortResolver.INSTANCE;
648 this.dnsResolver = dnsResolver != null ? dnsResolver :
649 SystemDefaultDnsResolver.INSTANCE;
650 }
651
652 @Override
653 public SocketAddress resolveLocalAddress(final HttpRoute route) throws IOException {
654 return route.getLocalAddress() != null ? new InetSocketAddress(route.getLocalAddress(), 0) : null;
655 }
656
657 @Override
658 public SocketAddress resolveRemoteAddress(final HttpRoute route) throws IOException {
659 final HttpHost host;
660 if (route.getProxyHost() != null) {
661 host = route.getProxyHost();
662 } else {
663 host = route.getTargetHost();
664 }
665 final int port = this.schemePortResolver.resolve(host);
666 final InetAddress[] addresses = this.dnsResolver.resolve(host.getHostName());
667 return new InetSocketAddress(addresses[0], port);
668 }
669
670 }
671 }