1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.pool;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.ListIterator;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.http.annotation.ThreadSafe;
45 import org.apache.http.concurrent.BasicFuture;
46 import org.apache.http.concurrent.FutureCallback;
47 import org.apache.http.nio.reactor.ConnectingIOReactor;
48 import org.apache.http.nio.reactor.IOSession;
49 import org.apache.http.nio.reactor.SessionRequest;
50 import org.apache.http.nio.reactor.SessionRequestCallback;
51 import org.apache.http.pool.ConnPool;
52 import org.apache.http.pool.ConnPoolControl;
53 import org.apache.http.pool.PoolEntry;
54 import org.apache.http.pool.PoolStats;
55 import org.apache.http.util.Args;
56 import org.apache.http.util.Asserts;
57
58
59
60
61
62
63
64
65
66
67 @ThreadSafe
68 public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
69 implements ConnPool<T, E>, ConnPoolControl<T> {
70
// I/O reactor asked to open new connections (see processPendingRequest).
71 private final ConnectingIOReactor ioreactor;
// Factory that turns a connected I/O session into a connection object (see requestCompleted).
72 private final NIOConnFactory<T, C> connFactory;
// Supplies the local and remote socket addresses for a route.
73 private final SocketAddressResolver<T> addressResolver;
// Callback passed to the I/O reactor with every connect request.
74 private final SessionRequestCallback sessionRequestCallback;
// Per-route pools, created on demand by getPool().
75 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
// Lease requests that could not be served immediately and are still queued.
76 private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
// Connect requests handed to the I/O reactor that have not been resolved yet.
77 private final Set<SessionRequest> pending;
// Entries currently handed out to callers.
78 private final Set<E> leased;
// Idle entries; release() inserts at the head, capacity eviction removes from the tail.
79 private final LinkedList<E> available;
// Route-specific limits; routes without a mapping fall back to defaultMaxPerRoute (see getMax).
80 private final Map<T, Integer> maxPerRoute;
// Acquired by the pool's methods before they read or modify the collections above.
81 private final Lock lock;
82
// Set to true by shutdown(); checked by most entry points before doing any work.
83 private volatile boolean isShutDown;
// Limit applied to routes that have no entry in maxPerRoute.
84 private volatile int defaultMaxPerRoute;
// Limit compared against pending + leased when deciding whether to open a new connection.
85 private volatile int maxTotal;
86
87
88
89
90
/**
 * Creates a pool that resolves socket addresses through the overridable
 * {@code resolveLocalAddress} and {@code resolveRemoteAddress} methods of this class.
 *
 * @param ioreactor I/O reactor used to open connections (checked with {@code Args.notNull})
 * @param connFactory factory creating connection objects (checked with {@code Args.notNull})
 * @param defaultMaxPerRoute initial per-route limit (checked with {@code Args.positive})
 * @param maxTotal initial total limit (checked with {@code Args.positive})
 * @deprecated use the constructor that accepts a {@code SocketAddressResolver}
 */
@Deprecated
public AbstractNIOConnPool(
        final ConnectingIOReactor ioreactor,
        final NIOConnFactory<T, C> connFactory,
        final int defaultMaxPerRoute,
        final int maxTotal) {
    super();
    Args.notNull(ioreactor, "I/O reactor");
    Args.notNull(connFactory, "Connection factory");
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");

    // Adapter that forwards address resolution to the legacy protected hooks.
    final SocketAddressResolver<T> legacyResolver = new SocketAddressResolver<T>() {

        public SocketAddress resolveLocalAddress(final T route) throws IOException {
            return AbstractNIOConnPool.this.resolveLocalAddress(route);
        }

        public SocketAddress resolveRemoteAddress(final T route) throws IOException {
            return AbstractNIOConnPool.this.resolveRemoteAddress(route);
        }

    };

    // Collaborators.
    this.ioreactor = ioreactor;
    this.connFactory = connFactory;
    this.addressResolver = legacyResolver;
    this.sessionRequestCallback = new InternalSessionRequestCallback();

    // Limits.
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;
    this.maxPerRoute = new HashMap<T, Integer>();

    // Bookkeeping state.
    this.lock = new ReentrantLock();
    this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
    this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
    this.pending = new HashSet<SessionRequest>();
    this.leased = new HashSet<E>();
    this.available = new LinkedList<E>();
}
126
127
128
129
/**
 * Creates a pool that resolves socket addresses through the given resolver.
 *
 * @param ioreactor I/O reactor used to open connections (checked with {@code Args.notNull})
 * @param connFactory factory creating connection objects (checked with {@code Args.notNull})
 * @param addressResolver resolver of local and remote addresses (checked with {@code Args.notNull})
 * @param defaultMaxPerRoute initial per-route limit (checked with {@code Args.positive})
 * @param maxTotal initial total limit (checked with {@code Args.positive})
 */
public AbstractNIOConnPool(
        final ConnectingIOReactor ioreactor,
        final NIOConnFactory<T, C> connFactory,
        final SocketAddressResolver<T> addressResolver,
        final int defaultMaxPerRoute,
        final int maxTotal) {
    super();
    Args.notNull(ioreactor, "I/O reactor");
    Args.notNull(connFactory, "Connection factory");
    Args.notNull(addressResolver, "Address resolver");
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");

    // Collaborators.
    this.ioreactor = ioreactor;
    this.connFactory = connFactory;
    this.addressResolver = addressResolver;
    this.sessionRequestCallback = new InternalSessionRequestCallback();

    // Limits.
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;
    this.maxPerRoute = new HashMap<T, Integer>();

    // Bookkeeping state.
    this.lock = new ReentrantLock();
    this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
    this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
    this.pending = new HashSet<SessionRequest>();
    this.leased = new HashSet<E>();
    this.available = new LinkedList<E>();
}
156
157
158
159
160 @Deprecated
161 protected SocketAddress resolveRemoteAddress(final T route) {
162 return null;
163 }
164
165
166
167
168 @Deprecated
169 protected SocketAddress resolveLocalAddress(final T route) {
170 return null;
171 }
172
173 protected abstract E createEntry(T route, C conn);
174
175 public boolean isShutdown() {
176 return this.isShutDown;
177 }
178
179 public void shutdown(final long waitMs) throws IOException {
180 if (this.isShutDown) {
181 return ;
182 }
183 this.isShutDown = true;
184 this.lock.lock();
185 try {
186 for (final SessionRequest sessionRequest: this.pending) {
187 sessionRequest.cancel();
188 }
189 for (final E entry: this.available) {
190 entry.close();
191 }
192 for (final E entry: this.leased) {
193 entry.close();
194 }
195 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
196 pool.shutdown();
197 }
198 this.routeToPool.clear();
199 this.leased.clear();
200 this.pending.clear();
201 this.available.clear();
202 this.leasingRequests.clear();
203 this.ioreactor.shutdown(waitMs);
204 } finally {
205 this.lock.unlock();
206 }
207 }
208
209 private RouteSpecificPool<T, C, E> getPool(final T route) {
210 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
211 if (pool == null) {
212 pool = new RouteSpecificPool<T, C, E>(route) {
213
214 @Override
215 protected E createEntry(final T route, final C conn) {
216 return AbstractNIOConnPool.this.createEntry(route, conn);
217 }
218
219 };
220 this.routeToPool.put(route, pool);
221 }
222 return pool;
223 }
224
225 public Future<E> lease(
226 final T route, final Object state,
227 final long connectTimeout, final TimeUnit tunit,
228 final FutureCallback<E> callback) {
229 Args.notNull(route, "Route");
230 Args.notNull(tunit, "Time unit");
231 Asserts.check(!this.isShutDown, "Connection pool shut down");
232 this.lock.lock();
233 try {
234 final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
235 final BasicFuture<E> future = new BasicFuture<E>(callback);
236 final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
237 if (!processPendingRequest(request)) {
238 this.leasingRequests.add(request);
239 }
240 return future;
241 } finally {
242 this.lock.unlock();
243 }
244 }
245
246 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
247 return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
248 }
249
250 public Future<E> lease(final T route, final Object state) {
251 return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
252 }
253
254 public void release(final E entry, final boolean reusable) {
255 if (entry == null) {
256 return;
257 }
258 if (this.isShutDown) {
259 return;
260 }
261 this.lock.lock();
262 try {
263 if (this.leased.remove(entry)) {
264 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
265 pool.free(entry, reusable);
266 if (reusable) {
267 this.available.addFirst(entry);
268 } else {
269 entry.close();
270 }
271 processNextPendingRequest();
272 }
273 } finally {
274 this.lock.unlock();
275 }
276 }
277
278 private void processPendingRequests() {
279 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
280 while (it.hasNext()) {
281 final LeaseRequest<T, C, E> request = it.next();
282 if (processPendingRequest(request)) {
283 it.remove();
284 }
285 }
286 }
287
288 private void processNextPendingRequest() {
289 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
290 while (it.hasNext()) {
291 final LeaseRequest<T, C, E> request = it.next();
292 if (processPendingRequest(request)) {
293 it.remove();
294 return;
295 }
296 }
297 }
298
299 private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
300 final T route = request.getRoute();
301 final Object state = request.getState();
302 final long deadline = request.getDeadline();
303 final BasicFuture<E> future = request.getFuture();
304
305 final long now = System.currentTimeMillis();
306 if (now > deadline) {
307 future.failed(new TimeoutException());
308 return true;
309 }
310
311 final RouteSpecificPool<T, C, E> pool = getPool(route);
312 E entry = null;
313 for (;;) {
314 entry = pool.getFree(state);
315 if (entry == null) {
316 break;
317 }
318 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
319 entry.close();
320 this.available.remove(entry);
321 pool.free(entry, false);
322 } else {
323 break;
324 }
325 }
326 if (entry != null) {
327 this.available.remove(entry);
328 this.leased.add(entry);
329 future.completed(entry);
330 return true;
331 }
332
333
334 final int maxPerRoute = getMax(route);
335
336 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
337 if (excess > 0) {
338 for (int i = 0; i < excess; i++) {
339 final E lastUsed = pool.getLastUsed();
340 if (lastUsed == null) {
341 break;
342 }
343 lastUsed.close();
344 this.available.remove(lastUsed);
345 pool.remove(lastUsed);
346 }
347 }
348
349 if (pool.getAllocatedCount() < maxPerRoute) {
350 final int totalUsed = this.pending.size() + this.leased.size();
351 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
352 if (freeCapacity == 0) {
353 return false;
354 }
355 final int totalAvailable = this.available.size();
356 if (totalAvailable > freeCapacity - 1) {
357 if (!this.available.isEmpty()) {
358 final E lastUsed = this.available.removeLast();
359 lastUsed.close();
360 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
361 otherpool.remove(lastUsed);
362 }
363 }
364
365 final SocketAddress localAddress;
366 final SocketAddress remoteAddress;
367 try {
368 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
369 localAddress = this.addressResolver.resolveLocalAddress(route);
370 } catch (final IOException ex) {
371 future.failed(ex);
372 return true;
373 }
374
375 final SessionRequest sessionRequest = this.ioreactor.connect(
376 remoteAddress, localAddress, route, this.sessionRequestCallback);
377 final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
378 (int) request.getConnectTimeout() : Integer.MAX_VALUE;
379 sessionRequest.setConnectTimeout(timout);
380 this.pending.add(sessionRequest);
381 pool.addPending(sessionRequest, future);
382 return true;
383 } else {
384 return false;
385 }
386 }
387
388 public void validatePendingRequests() {
389 this.lock.lock();
390 try {
391 final long now = System.currentTimeMillis();
392 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
393 while (it.hasNext()) {
394 final LeaseRequest<T, C, E> request = it.next();
395 final long deadline = request.getDeadline();
396 if (now > deadline) {
397 it.remove();
398 final BasicFuture<E> future = request.getFuture();
399 future.failed(new TimeoutException());
400 }
401 }
402 } finally {
403 this.lock.unlock();
404 }
405 }
406
407 protected void requestCompleted(final SessionRequest request) {
408 if (this.isShutDown) {
409 return;
410 }
411 @SuppressWarnings("unchecked")
412 final
413 T route = (T) request.getAttachment();
414 this.lock.lock();
415 try {
416 this.pending.remove(request);
417 final RouteSpecificPool<T, C, E> pool = getPool(route);
418 final IOSession session = request.getSession();
419 try {
420 final C conn = this.connFactory.create(route, session);
421 final E entry = pool.createEntry(request, conn);
422 this.leased.add(entry);
423 pool.completed(request, entry);
424
425 } catch (final IOException ex) {
426 pool.failed(request, ex);
427 }
428 } finally {
429 this.lock.unlock();
430 }
431 }
432
433 protected void requestCancelled(final SessionRequest request) {
434 if (this.isShutDown) {
435 return;
436 }
437 @SuppressWarnings("unchecked")
438 final
439 T route = (T) request.getAttachment();
440 this.lock.lock();
441 try {
442 this.pending.remove(request);
443 final RouteSpecificPool<T, C, E> pool = getPool(route);
444 pool.cancelled(request);
445 processNextPendingRequest();
446 } finally {
447 this.lock.unlock();
448 }
449 }
450
451 protected void requestFailed(final SessionRequest request) {
452 if (this.isShutDown) {
453 return;
454 }
455 @SuppressWarnings("unchecked")
456 final
457 T route = (T) request.getAttachment();
458 this.lock.lock();
459 try {
460 this.pending.remove(request);
461 final RouteSpecificPool<T, C, E> pool = getPool(route);
462 pool.failed(request, request.getException());
463 processNextPendingRequest();
464 } finally {
465 this.lock.unlock();
466 }
467 }
468
469 protected void requestTimeout(final SessionRequest request) {
470 if (this.isShutDown) {
471 return;
472 }
473 @SuppressWarnings("unchecked")
474 final
475 T route = (T) request.getAttachment();
476 this.lock.lock();
477 try {
478 this.pending.remove(request);
479 final RouteSpecificPool<T, C, E> pool = getPool(route);
480 pool.timeout(request);
481 processNextPendingRequest();
482 } finally {
483 this.lock.unlock();
484 }
485 }
486
487 private int getMax(final T route) {
488 final Integer v = this.maxPerRoute.get(route);
489 if (v != null) {
490 return v.intValue();
491 } else {
492 return this.defaultMaxPerRoute;
493 }
494 }
495
496 public void setMaxTotal(final int max) {
497 Args.positive(max, "Max value");
498 this.lock.lock();
499 try {
500 this.maxTotal = max;
501 } finally {
502 this.lock.unlock();
503 }
504 }
505
506 public int getMaxTotal() {
507 this.lock.lock();
508 try {
509 return this.maxTotal;
510 } finally {
511 this.lock.unlock();
512 }
513 }
514
515 public void setDefaultMaxPerRoute(final int max) {
516 Args.positive(max, "Max value");
517 this.lock.lock();
518 try {
519 this.defaultMaxPerRoute = max;
520 } finally {
521 this.lock.unlock();
522 }
523 }
524
525 public int getDefaultMaxPerRoute() {
526 this.lock.lock();
527 try {
528 return this.defaultMaxPerRoute;
529 } finally {
530 this.lock.unlock();
531 }
532 }
533
534 public void setMaxPerRoute(final T route, final int max) {
535 Args.notNull(route, "Route");
536 Args.positive(max, "Max value");
537 this.lock.lock();
538 try {
539 this.maxPerRoute.put(route, max);
540 } finally {
541 this.lock.unlock();
542 }
543 }
544
545 public int getMaxPerRoute(final T route) {
546 Args.notNull(route, "Route");
547 this.lock.lock();
548 try {
549 return getMax(route);
550 } finally {
551 this.lock.unlock();
552 }
553 }
554
555 public PoolStats getTotalStats() {
556 this.lock.lock();
557 try {
558 return new PoolStats(
559 this.leased.size(),
560 this.pending.size(),
561 this.available.size(),
562 this.maxTotal);
563 } finally {
564 this.lock.unlock();
565 }
566 }
567
568 public PoolStats getStats(final T route) {
569 Args.notNull(route, "Route");
570 this.lock.lock();
571 try {
572 final RouteSpecificPool<T, C, E> pool = getPool(route);
573 return new PoolStats(
574 pool.getLeasedCount(),
575 pool.getPendingCount(),
576 pool.getAvailableCount(),
577 getMax(route));
578 } finally {
579 this.lock.unlock();
580 }
581 }
582
583 public void closeIdle(final long idletime, final TimeUnit tunit) {
584 Args.notNull(tunit, "Time unit");
585 long time = tunit.toMillis(idletime);
586 if (time < 0) {
587 time = 0;
588 }
589 final long deadline = System.currentTimeMillis() - time;
590 this.lock.lock();
591 try {
592 final Iterator<E> it = this.available.iterator();
593 while (it.hasNext()) {
594 final E entry = it.next();
595 if (entry.getUpdated() <= deadline) {
596 entry.close();
597 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
598 pool.remove(entry);
599 it.remove();
600 }
601 }
602 processPendingRequests();
603 } finally {
604 this.lock.unlock();
605 }
606 }
607
608 public void closeExpired() {
609 final long now = System.currentTimeMillis();
610 this.lock.lock();
611 try {
612 final Iterator<E> it = this.available.iterator();
613 while (it.hasNext()) {
614 final E entry = it.next();
615 if (entry.isExpired(now)) {
616 entry.close();
617 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
618 pool.remove(entry);
619 it.remove();
620 }
621 }
622 processPendingRequests();
623 } finally {
624 this.lock.unlock();
625 }
626 }
627
628 @Override
629 public String toString() {
630 final StringBuilder buffer = new StringBuilder();
631 buffer.append("[leased: ");
632 buffer.append(this.leased);
633 buffer.append("][available: ");
634 buffer.append(this.available);
635 buffer.append("][pending: ");
636 buffer.append(this.pending);
637 buffer.append("]");
638 return buffer.toString();
639 }
640
641 class InternalSessionRequestCallback implements SessionRequestCallback {
642
643 public void completed(final SessionRequest request) {
644 requestCompleted(request);
645 }
646
647 public void cancelled(final SessionRequest request) {
648 requestCancelled(request);
649 }
650
651 public void failed(final SessionRequest request) {
652 requestFailed(request);
653 }
654
655 public void timeout(final SessionRequest request) {
656 requestTimeout(request);
657 }
658
659 }
660
661 }