View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.http.annotation.Contract;
47  import org.apache.http.annotation.ThreadingBehavior;
48  import org.apache.http.concurrent.BasicFuture;
49  import org.apache.http.concurrent.FutureCallback;
50  import org.apache.http.nio.reactor.ConnectingIOReactor;
51  import org.apache.http.nio.reactor.IOReactorStatus;
52  import org.apache.http.nio.reactor.IOSession;
53  import org.apache.http.nio.reactor.SessionRequest;
54  import org.apache.http.nio.reactor.SessionRequestCallback;
55  import org.apache.http.pool.ConnPool;
56  import org.apache.http.pool.ConnPoolControl;
57  import org.apache.http.pool.PoolEntry;
58  import org.apache.http.pool.PoolEntryCallback;
59  import org.apache.http.pool.PoolStats;
60  import org.apache.http.util.Args;
61  import org.apache.http.util.Asserts;
62  import org.apache.http.util.LangUtils;
63  
64  /**
65   * Abstract non-blocking connection pool.
66   *
67   * @param <T> route
68   * @param <C> connection object
69   * @param <E> pool entry
70   *
71   * @since 4.2
72   */
73  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
74  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
75                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
76  
    /** Reactor used to open new sessions; shut down together with this pool. */
    private final ConnectingIOReactor ioreactor;
    /** Creates connection objects of type {@code C} from established I/O sessions. */
    private final NIOConnFactory<T, C> connFactory;
    /** Resolves the local and remote socket addresses of a route. */
    private final SocketAddressResolver<T> addressResolver;
    /** Callback handed to the reactor with every connect request issued by this pool. */
    private final SessionRequestCallback sessionRequestCallback;
    /** Per-route pools, created on demand by {@code getPool}. */
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    /** Lease requests that could not be served yet and are waiting in the pool. */
    private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
    /** Connect requests issued to the reactor that have not finished yet. */
    private final Set<SessionRequest> pending;
    /** Entries currently handed out to callers. */
    private final Set<E> leased;
    /** Idle entries; released entries are added at the head, evictions take the tail. */
    private final LinkedList<E> available;
    /** Lease requests that are done; drained by {@code fireCallbacks}. */
    private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
    /** Per-route limits overriding {@code defaultMaxPerRoute}. */
    private final Map<T, Integer> maxPerRoute;
    /** Lock taken by the methods of this class before they touch the collections above. */
    private final Lock lock;
    /** Set to {@code true} exactly once by {@code shutdown}. */
    private final AtomicBoolean isShutDown;

    /** Limit applied to routes that have no entry in {@code maxPerRoute}. */
    private volatile int defaultMaxPerRoute;
    /** Limit on the total number of connections. */
    private volatile int maxTotal;
93  
94      /**
95       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
96       *   NIOConnFactory, SocketAddressResolver, int, int)}
97       */
98      @Deprecated
99      public AbstractNIOConnPool(
100             final ConnectingIOReactor ioreactor,
101             final NIOConnFactory<T, C> connFactory,
102             final int defaultMaxPerRoute,
103             final int maxTotal) {
104         super();
105         Args.notNull(ioreactor, "I/O reactor");
106         Args.notNull(connFactory, "Connection factory");
107         Args.positive(defaultMaxPerRoute, "Max per route value");
108         Args.positive(maxTotal, "Max total value");
109         this.ioreactor = ioreactor;
110         this.connFactory = connFactory;
111         this.addressResolver = new SocketAddressResolver<T>() {
112 
113             @Override
114             public SocketAddress resolveLocalAddress(final T route) throws IOException {
115                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
116             }
117 
118             @Override
119             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
120                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
121             }
122 
123         };
124         this.sessionRequestCallback = new InternalSessionRequestCallback();
125         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
126         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
127         this.pending = new HashSet<SessionRequest>();
128         this.leased = new HashSet<E>();
129         this.available = new LinkedList<E>();
130         this.maxPerRoute = new HashMap<T, Integer>();
131         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
132         this.lock = new ReentrantLock();
133         this.isShutDown = new AtomicBoolean(false);
134         this.defaultMaxPerRoute = defaultMaxPerRoute;
135         this.maxTotal = maxTotal;
136     }
137 
138     /**
139      * @since 4.3
140      */
141     public AbstractNIOConnPool(
142             final ConnectingIOReactor ioreactor,
143             final NIOConnFactory<T, C> connFactory,
144             final SocketAddressResolver<T> addressResolver,
145             final int defaultMaxPerRoute,
146             final int maxTotal) {
147         super();
148         Args.notNull(ioreactor, "I/O reactor");
149         Args.notNull(connFactory, "Connection factory");
150         Args.notNull(addressResolver, "Address resolver");
151         Args.positive(defaultMaxPerRoute, "Max per route value");
152         Args.positive(maxTotal, "Max total value");
153         this.ioreactor = ioreactor;
154         this.connFactory = connFactory;
155         this.addressResolver = addressResolver;
156         this.sessionRequestCallback = new InternalSessionRequestCallback();
157         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
158         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
159         this.pending = new HashSet<SessionRequest>();
160         this.leased = new HashSet<E>();
161         this.available = new LinkedList<E>();
162         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
163         this.maxPerRoute = new HashMap<T, Integer>();
164         this.lock = new ReentrantLock();
165         this.isShutDown = new AtomicBoolean(false);
166         this.defaultMaxPerRoute = defaultMaxPerRoute;
167         this.maxTotal = maxTotal;
168     }
169 
    /**
     * Legacy hook for resolving the remote address of a route. It is consulted only
     * by the resolver installed by the deprecated four-argument constructor.
     *
     * @param route the route to resolve.
     * @return always {@code null} in this base implementation.
     *
     * @deprecated (4.3) use {@link SocketAddressResolver}
     */
    @Deprecated
    protected SocketAddress resolveRemoteAddress(final T route) {
        return null;
    }
177 
    /**
     * Legacy hook for resolving the local address of a route. It is consulted only
     * by the resolver installed by the deprecated four-argument constructor.
     *
     * @param route the route to resolve.
     * @return always {@code null} in this base implementation.
     *
     * @deprecated (4.3) use {@link SocketAddressResolver}
     */
    @Deprecated
    protected SocketAddress resolveLocalAddress(final T route) {
        return null;
    }
185 
    /**
     * Creates the pool entry that wraps a connection for the given route. The
     * route-specific pools created by this class delegate their entry creation
     * to this method.
     *
     * @param route the route of the connection.
     * @param conn the connection object.
     * @return the new pool entry.
     */
    protected abstract E createEntry(T route, C conn);
187 
    /**
     * Hook invoked when an entry is leased, both for reused entries and for
     * entries of freshly established connections. Does nothing by default.
     *
     * @param entry the leased entry.
     *
     * @since 4.3
     */
    protected void onLease(final E entry) {
    }
193 
    /**
     * Hook invoked by {@code release} after a reusable entry has been put back
     * into the list of available entries. Does nothing by default.
     *
     * @param entry the released entry.
     *
     * @since 4.3
     */
    protected void onRelease(final E entry) {
    }
199 
    /**
     * Hook invoked when an available entry is handed out again, right before
     * {@code onLease} is called for it. Does nothing by default.
     *
     * @param entry the reused entry.
     *
     * @since 4.4
     */
    protected void onReuse(final E entry) {
    }
205 
206     public boolean isShutdown() {
207         return this.isShutDown.get();
208     }
209 
210     public void shutdown(final long waitMs) throws IOException {
211         if (this.isShutDown.compareAndSet(false, true)) {
212             fireCallbacks();
213             this.lock.lock();
214             try {
215                 for (final SessionRequest sessionRequest: this.pending) {
216                     sessionRequest.cancel();
217                 }
218                 for (final E entry: this.available) {
219                     entry.close();
220                 }
221                 for (final E entry: this.leased) {
222                     entry.close();
223                 }
224                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
225                     pool.shutdown();
226                 }
227                 this.routeToPool.clear();
228                 this.leased.clear();
229                 this.pending.clear();
230                 this.available.clear();
231                 this.leasingRequests.clear();
232                 this.ioreactor.shutdown(waitMs);
233             } finally {
234                 this.lock.unlock();
235             }
236         }
237     }
238 
239     private RouteSpecificPool<T, C, E> getPool(final T route) {
240         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
241         if (pool == null) {
242             pool = new RouteSpecificPool<T, C, E>(route) {
243 
244                 @Override
245                 protected E createEntry(final T route, final C conn) {
246                     return AbstractNIOConnPool.this.createEntry(route, conn);
247                 }
248 
249             };
250             this.routeToPool.put(route, pool);
251         }
252         return pool;
253     }
254 
255     public Future<E> lease(
256             final T route, final Object state,
257             final long connectTimeout, final TimeUnit tunit,
258             final FutureCallback<E> callback) {
259         return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
260     }
261 
262     /**
263      * @since 4.3
264      */
265     public Future<E> lease(
266             final T route, final Object state,
267             final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
268             final FutureCallback<E> callback) {
269         Args.notNull(route, "Route");
270         Args.notNull(tunit, "Time unit");
271         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
272         final BasicFuture<E> future = new BasicFuture<E>(callback);
273         this.lock.lock();
274         try {
275             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
276             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
277             final boolean completed = processPendingRequest(request);
278             if (!request.isDone() && !completed) {
279                 this.leasingRequests.add(request);
280             }
281             if (request.isDone()) {
282                 this.completedRequests.add(request);
283             }
284         } finally {
285             this.lock.unlock();
286         }
287         fireCallbacks();
288         return future;
289     }
290 
291     @Override
292     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
293         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
294     }
295 
296     public Future<E> lease(final T route, final Object state) {
297         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
298     }
299 
300     @Override
301     public void release(final E entry, final boolean reusable) {
302         if (entry == null) {
303             return;
304         }
305         if (this.isShutDown.get()) {
306             return;
307         }
308         this.lock.lock();
309         try {
310             if (this.leased.remove(entry)) {
311                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
312                 pool.free(entry, reusable);
313                 if (reusable) {
314                     this.available.addFirst(entry);
315                     onRelease(entry);
316                 } else {
317                     entry.close();
318                 }
319                 processNextPendingRequest();
320             }
321         } finally {
322             this.lock.unlock();
323         }
324         fireCallbacks();
325     }
326 
327     private void processPendingRequests() {
328         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
329         while (it.hasNext()) {
330             final LeaseRequest<T, C, E> request = it.next();
331             final BasicFuture<E> future = request.getFuture();
332             if (future.isCancelled()) {
333                 it.remove();
334                 continue;
335             }
336             final boolean completed = processPendingRequest(request);
337             if (request.isDone() || completed) {
338                 it.remove();
339             }
340             if (request.isDone()) {
341                 this.completedRequests.add(request);
342             }
343         }
344     }
345 
346     private void processNextPendingRequest() {
347         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
348         while (it.hasNext()) {
349             final LeaseRequest<T, C, E> request = it.next();
350             final BasicFuture<E> future = request.getFuture();
351             if (future.isCancelled()) {
352                 it.remove();
353                 continue;
354             }
355             final boolean completed = processPendingRequest(request);
356             if (request.isDone() || completed) {
357                 it.remove();
358             }
359             if (request.isDone()) {
360                 this.completedRequests.add(request);
361             }
362             if (completed) {
363                 return;
364             }
365         }
366     }
367 
368     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
369         final T route = request.getRoute();
370         final Object state = request.getState();
371         final long deadline = request.getDeadline();
372 
373         final long now = System.currentTimeMillis();
374         if (now > deadline) {
375             request.failed(new TimeoutException());
376             return false;
377         }
378 
379         final RouteSpecificPool<T, C, E> pool = getPool(route);
380         E entry;
381         for (;;) {
382             entry = pool.getFree(state);
383             if (entry == null) {
384                 break;
385             }
386             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
387                 entry.close();
388                 this.available.remove(entry);
389                 pool.free(entry, false);
390             } else {
391                 break;
392             }
393         }
394         if (entry != null) {
395             this.available.remove(entry);
396             this.leased.add(entry);
397             request.completed(entry);
398             onReuse(entry);
399             onLease(entry);
400             return true;
401         }
402 
403         // New connection is needed
404         final int maxPerRoute = getMax(route);
405         // Shrink the pool prior to allocating a new connection
406         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
407         if (excess > 0) {
408             for (int i = 0; i < excess; i++) {
409                 final E lastUsed = pool.getLastUsed();
410                 if (lastUsed == null) {
411                     break;
412                 }
413                 lastUsed.close();
414                 this.available.remove(lastUsed);
415                 pool.remove(lastUsed);
416             }
417         }
418 
419         if (pool.getAllocatedCount() < maxPerRoute) {
420             final int totalUsed = this.pending.size() + this.leased.size();
421             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
422             if (freeCapacity == 0) {
423                 return false;
424             }
425             final int totalAvailable = this.available.size();
426             if (totalAvailable > freeCapacity - 1) {
427                 if (!this.available.isEmpty()) {
428                     final E lastUsed = this.available.removeLast();
429                     lastUsed.close();
430                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
431                     otherpool.remove(lastUsed);
432                 }
433             }
434 
435             final SocketAddress localAddress;
436             final SocketAddress remoteAddress;
437             try {
438                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
439                 localAddress = this.addressResolver.resolveLocalAddress(route);
440             } catch (final IOException ex) {
441                 request.failed(ex);
442                 return false;
443             }
444 
445             final SessionRequest sessionRequest = this.ioreactor.connect(
446                     remoteAddress, localAddress, route, this.sessionRequestCallback);
447             final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
448                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
449             sessionRequest.setConnectTimeout(timout);
450             this.pending.add(sessionRequest);
451             pool.addPending(sessionRequest, request.getFuture());
452             return true;
453         } else {
454             return false;
455         }
456     }
457 
458     private void fireCallbacks() {
459         LeaseRequest<T, C, E> request;
460         while ((request = this.completedRequests.poll()) != null) {
461             final BasicFuture<E> future = request.getFuture();
462             final Exception ex = request.getException();
463             final E result = request.getResult();
464             boolean successfullyCompleted = false;
465             if (ex != null) {
466                 future.failed(ex);
467             } else if (result != null) {
468                 if (future.completed(result)) {
469                     successfullyCompleted = true;
470                 }
471             } else {
472                 future.cancel();
473             }
474             if (!successfullyCompleted) {
475                 release(result, true);
476             }
477         }
478     }
479 
480     public void validatePendingRequests() {
481         this.lock.lock();
482         try {
483             final long now = System.currentTimeMillis();
484             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
485             while (it.hasNext()) {
486                 final LeaseRequest<T, C, E> request = it.next();
487                 final long deadline = request.getDeadline();
488                 if (now > deadline) {
489                     it.remove();
490                     request.failed(new TimeoutException());
491                     this.completedRequests.add(request);
492                 }
493             }
494         } finally {
495             this.lock.unlock();
496         }
497         fireCallbacks();
498     }
499 
500     protected void requestCompleted(final SessionRequest request) {
501         if (this.isShutDown.get()) {
502             return;
503         }
504         @SuppressWarnings("unchecked")
505         final
506         T route = (T) request.getAttachment();
507         this.lock.lock();
508         try {
509             this.pending.remove(request);
510             final RouteSpecificPool<T, C, E> pool = getPool(route);
511             final IOSession session = request.getSession();
512             try {
513                 final C conn = this.connFactory.create(route, session);
514                 final E entry = pool.createEntry(request, conn);
515                 if (pool.completed(request, entry)) {
516                     this.leased.add(entry);
517                     onLease(entry);
518                 } else {
519                     this.available.add(entry);
520                     if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
521                         processNextPendingRequest();
522                     }
523                 }
524             } catch (final IOException ex) {
525                 pool.failed(request, ex);
526             }
527         } finally {
528             this.lock.unlock();
529         }
530         fireCallbacks();
531     }
532 
533     protected void requestCancelled(final SessionRequest request) {
534         if (this.isShutDown.get()) {
535             return;
536         }
537         @SuppressWarnings("unchecked")
538         final
539         T route = (T) request.getAttachment();
540         this.lock.lock();
541         try {
542             this.pending.remove(request);
543             final RouteSpecificPool<T, C, E> pool = getPool(route);
544             pool.cancelled(request);
545             if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
546                 processNextPendingRequest();
547             }
548         } finally {
549             this.lock.unlock();
550         }
551         fireCallbacks();
552     }
553 
554     protected void requestFailed(final SessionRequest request) {
555         if (this.isShutDown.get()) {
556             return;
557         }
558         @SuppressWarnings("unchecked")
559         final
560         T route = (T) request.getAttachment();
561         this.lock.lock();
562         try {
563             this.pending.remove(request);
564             final RouteSpecificPool<T, C, E> pool = getPool(route);
565             pool.failed(request, request.getException());
566             processNextPendingRequest();
567         } finally {
568             this.lock.unlock();
569         }
570         fireCallbacks();
571     }
572 
573     protected void requestTimeout(final SessionRequest request) {
574         if (this.isShutDown.get()) {
575             return;
576         }
577         @SuppressWarnings("unchecked")
578         final
579         T route = (T) request.getAttachment();
580         this.lock.lock();
581         try {
582             this.pending.remove(request);
583             final RouteSpecificPool<T, C, E> pool = getPool(route);
584             pool.timeout(request);
585             processNextPendingRequest();
586         } finally {
587             this.lock.unlock();
588         }
589         fireCallbacks();
590     }
591 
592     private int getMax(final T route) {
593         final Integer v = this.maxPerRoute.get(route);
594         if (v != null) {
595             return v.intValue();
596         } else {
597             return this.defaultMaxPerRoute;
598         }
599     }
600 
601     @Override
602     public void setMaxTotal(final int max) {
603         Args.positive(max, "Max value");
604         this.lock.lock();
605         try {
606             this.maxTotal = max;
607         } finally {
608             this.lock.unlock();
609         }
610     }
611 
612     @Override
613     public int getMaxTotal() {
614         this.lock.lock();
615         try {
616             return this.maxTotal;
617         } finally {
618             this.lock.unlock();
619         }
620     }
621 
622     @Override
623     public void setDefaultMaxPerRoute(final int max) {
624         Args.positive(max, "Max value");
625         this.lock.lock();
626         try {
627             this.defaultMaxPerRoute = max;
628         } finally {
629             this.lock.unlock();
630         }
631     }
632 
633     @Override
634     public int getDefaultMaxPerRoute() {
635         this.lock.lock();
636         try {
637             return this.defaultMaxPerRoute;
638         } finally {
639             this.lock.unlock();
640         }
641     }
642 
643     @Override
644     public void setMaxPerRoute(final T route, final int max) {
645         Args.notNull(route, "Route");
646         Args.positive(max, "Max value");
647         this.lock.lock();
648         try {
649             this.maxPerRoute.put(route, Integer.valueOf(max));
650         } finally {
651             this.lock.unlock();
652         }
653     }
654 
655     @Override
656     public int getMaxPerRoute(final T route) {
657         Args.notNull(route, "Route");
658         this.lock.lock();
659         try {
660             return getMax(route);
661         } finally {
662             this.lock.unlock();
663         }
664     }
665 
666     @Override
667     public PoolStats getTotalStats() {
668         this.lock.lock();
669         try {
670             return new PoolStats(
671                     this.leased.size(),
672                     this.pending.size(),
673                     this.available.size(),
674                     this.maxTotal);
675         } finally {
676             this.lock.unlock();
677         }
678     }
679 
680     @Override
681     public PoolStats getStats(final T route) {
682         Args.notNull(route, "Route");
683         this.lock.lock();
684         try {
685             final RouteSpecificPool<T, C, E> pool = getPool(route);
686             int pendingCount = 0;
687             for (LeaseRequest<T, C, E> request: leasingRequests) {
688                 if (LangUtils.equals(route, request.getRoute())) {
689                     pendingCount++;
690                 }
691             }
692             return new PoolStats(
693                     pool.getLeasedCount(),
694                     pendingCount + pool.getPendingCount(),
695                     pool.getAvailableCount(),
696                     getMax(route));
697         } finally {
698             this.lock.unlock();
699         }
700     }
701 
702     /**
703      * Returns snapshot of all knows routes
704      *
705      * @since 4.4
706      */
707     public Set<T> getRoutes() {
708         this.lock.lock();
709         try {
710             return new HashSet<T>(routeToPool.keySet());
711         } finally {
712             this.lock.unlock();
713         }
714     }
715 
716     /**
717      * Enumerates all available connections.
718      *
719      * @since 4.3
720      */
721     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
722         this.lock.lock();
723         try {
724             final Iterator<E> it = this.available.iterator();
725             while (it.hasNext()) {
726                 final E entry = it.next();
727                 callback.process(entry);
728                 if (entry.isClosed()) {
729                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
730                     pool.remove(entry);
731                     it.remove();
732                 }
733             }
734             processPendingRequests();
735             purgePoolMap();
736         } finally {
737             this.lock.unlock();
738         }
739     }
740 
741     /**
742      * Enumerates all leased connections.
743      *
744      * @since 4.3
745      */
746     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
747         this.lock.lock();
748         try {
749             final Iterator<E> it = this.leased.iterator();
750             while (it.hasNext()) {
751                 final E entry = it.next();
752                 callback.process(entry);
753             }
754             processPendingRequests();
755         } finally {
756             this.lock.unlock();
757         }
758     }
759 
760     /**
761      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
762      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
763      *
764      * @deprecated (4.3.2)
765      */
766     @Deprecated
767     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
768         while (it.hasNext()) {
769             final E entry = it.next();
770             callback.process(entry);
771         }
772         processPendingRequests();
773     }
774 
775     private void purgePoolMap() {
776         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
777         while (it.hasNext()) {
778             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
779             final RouteSpecificPool<T, C, E> pool = entry.getValue();
780             if (pool.getAllocatedCount() == 0) {
781                 it.remove();
782             }
783         }
784     }
785 
786     public void closeIdle(final long idletime, final TimeUnit tunit) {
787         Args.notNull(tunit, "Time unit");
788         long time = tunit.toMillis(idletime);
789         if (time < 0) {
790             time = 0;
791         }
792         final long deadline = System.currentTimeMillis() - time;
793         enumAvailable(new PoolEntryCallback<T, C>() {
794 
795             @Override
796             public void process(final PoolEntry<T, C> entry) {
797                 if (entry.getUpdated() <= deadline) {
798                     entry.close();
799                 }
800             }
801 
802         });
803     }
804 
805     public void closeExpired() {
806         final long now = System.currentTimeMillis();
807         enumAvailable(new PoolEntryCallback<T, C>() {
808 
809             @Override
810             public void process(final PoolEntry<T, C> entry) {
811                 if (entry.isExpired(now)) {
812                     entry.close();
813                 }
814             }
815 
816         });
817     }
818 
819     @Override
820     public String toString() {
821         final StringBuilder buffer = new StringBuilder();
822         buffer.append("[leased: ");
823         buffer.append(this.leased);
824         buffer.append("][available: ");
825         buffer.append(this.available);
826         buffer.append("][pending: ");
827         buffer.append(this.pending);
828         buffer.append("]");
829         return buffer.toString();
830     }
831 
832     class InternalSessionRequestCallback implements SessionRequestCallback {
833 
834         @Override
835         public void completed(final SessionRequest request) {
836             requestCompleted(request);
837         }
838 
839         @Override
840         public void cancelled(final SessionRequest request) {
841             requestCancelled(request);
842         }
843 
844         @Override
845         public void failed(final SessionRequest request) {
846             requestFailed(request);
847         }
848 
849         @Override
850         public void timeout(final SessionRequest request) {
851             requestTimeout(request);
852         }
853 
854     }
855 
856 }