View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Future;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.BasicFuture;
50  import org.apache.http.concurrent.FutureCallback;
51  import org.apache.http.nio.reactor.ConnectingIOReactor;
52  import org.apache.http.nio.reactor.IOReactorStatus;
53  import org.apache.http.nio.reactor.IOSession;
54  import org.apache.http.nio.reactor.SessionRequest;
55  import org.apache.http.nio.reactor.SessionRequestCallback;
56  import org.apache.http.pool.ConnPool;
57  import org.apache.http.pool.ConnPoolControl;
58  import org.apache.http.pool.PoolEntry;
59  import org.apache.http.pool.PoolEntryCallback;
60  import org.apache.http.pool.PoolStats;
61  import org.apache.http.util.Args;
62  import org.apache.http.util.Asserts;
63  import org.apache.http.util.LangUtils;
64  
65  /**
66   * Abstract non-blocking connection pool.
67   *
68   * @param <T> route
69   * @param <C> connection object
70   * @param <E> pool entry
71   *
72   * @since 4.2
73   */
74  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
75  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
76                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
77  
78      private final ConnectingIOReactor ioreactor;
79      private final NIOConnFactory<T, C> connFactory;
80      private final SocketAddressResolver<T> addressResolver;
81      private final SessionRequestCallback sessionRequestCallback;
82      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
83      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
84      private final Set<SessionRequest> pending;
85      private final Set<E> leased;
86      private final LinkedList<E> available;
87      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
88      private final Map<T, Integer> maxPerRoute;
89      private final Lock lock;
90      private final AtomicBoolean isShutDown;
91  
92      private volatile int defaultMaxPerRoute;
93      private volatile int maxTotal;
94  
95      /**
96       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
97       *   NIOConnFactory, SocketAddressResolver, int, int)}
98       */
99      @Deprecated
100     public AbstractNIOConnPool(
101             final ConnectingIOReactor ioreactor,
102             final NIOConnFactory<T, C> connFactory,
103             final int defaultMaxPerRoute,
104             final int maxTotal) {
105         super();
106         Args.notNull(ioreactor, "I/O reactor");
107         Args.notNull(connFactory, "Connection factory");
108         Args.positive(defaultMaxPerRoute, "Max per route value");
109         Args.positive(maxTotal, "Max total value");
110         this.ioreactor = ioreactor;
111         this.connFactory = connFactory;
112         this.addressResolver = new SocketAddressResolver<T>() {
113 
114             @Override
115             public SocketAddress resolveLocalAddress(final T route) throws IOException {
116                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
117             }
118 
119             @Override
120             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
121                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
122             }
123 
124         };
125         this.sessionRequestCallback = new InternalSessionRequestCallback();
126         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
127         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
128         this.pending = new HashSet<SessionRequest>();
129         this.leased = new HashSet<E>();
130         this.available = new LinkedList<E>();
131         this.maxPerRoute = new HashMap<T, Integer>();
132         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
133         this.lock = new ReentrantLock();
134         this.isShutDown = new AtomicBoolean(false);
135         this.defaultMaxPerRoute = defaultMaxPerRoute;
136         this.maxTotal = maxTotal;
137     }
138 
139     /**
140      * @since 4.3
141      */
142     public AbstractNIOConnPool(
143             final ConnectingIOReactor ioreactor,
144             final NIOConnFactory<T, C> connFactory,
145             final SocketAddressResolver<T> addressResolver,
146             final int defaultMaxPerRoute,
147             final int maxTotal) {
148         super();
149         Args.notNull(ioreactor, "I/O reactor");
150         Args.notNull(connFactory, "Connection factory");
151         Args.notNull(addressResolver, "Address resolver");
152         Args.positive(defaultMaxPerRoute, "Max per route value");
153         Args.positive(maxTotal, "Max total value");
154         this.ioreactor = ioreactor;
155         this.connFactory = connFactory;
156         this.addressResolver = addressResolver;
157         this.sessionRequestCallback = new InternalSessionRequestCallback();
158         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
159         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
160         this.pending = new HashSet<SessionRequest>();
161         this.leased = new HashSet<E>();
162         this.available = new LinkedList<E>();
163         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
164         this.maxPerRoute = new HashMap<T, Integer>();
165         this.lock = new ReentrantLock();
166         this.isShutDown = new AtomicBoolean(false);
167         this.defaultMaxPerRoute = defaultMaxPerRoute;
168         this.maxTotal = maxTotal;
169     }
170 
171     /**
172      * @deprecated (4.3) use {@link SocketAddressResolver}
173      */
174     @Deprecated
175     protected SocketAddress resolveRemoteAddress(final T route) {
176         return null;
177     }
178 
179     /**
180      * @deprecated (4.3) use {@link SocketAddressResolver}
181      */
182     @Deprecated
183     protected SocketAddress resolveLocalAddress(final T route) {
184         return null;
185     }
186 
187     protected abstract E createEntry(T route, C conn);
188 
189     /**
190      * @since 4.3
191      */
192     protected void onLease(final E entry) {
193     }
194 
195     /**
196      * @since 4.3
197      */
198     protected void onRelease(final E entry) {
199     }
200 
201     /**
202      * @since 4.4
203      */
204     protected void onReuse(final E entry) {
205     }
206 
207     public boolean isShutdown() {
208         return this.isShutDown.get();
209     }
210 
211     public void shutdown(final long waitMs) throws IOException {
212         if (this.isShutDown.compareAndSet(false, true)) {
213             fireCallbacks();
214             this.lock.lock();
215             try {
216                 for (final SessionRequest sessionRequest: this.pending) {
217                     sessionRequest.cancel();
218                 }
219                 for (final E entry: this.available) {
220                     entry.close();
221                 }
222                 for (final E entry: this.leased) {
223                     entry.close();
224                 }
225                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
226                     pool.shutdown();
227                 }
228                 this.routeToPool.clear();
229                 this.leased.clear();
230                 this.pending.clear();
231                 this.available.clear();
232                 this.leasingRequests.clear();
233                 this.ioreactor.shutdown(waitMs);
234             } finally {
235                 this.lock.unlock();
236             }
237         }
238     }
239 
240     private RouteSpecificPool<T, C, E> getPool(final T route) {
241         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
242         if (pool == null) {
243             pool = new RouteSpecificPool<T, C, E>(route) {
244 
245                 @Override
246                 protected E createEntry(final T route, final C conn) {
247                     return AbstractNIOConnPool.this.createEntry(route, conn);
248                 }
249 
250             };
251             this.routeToPool.put(route, pool);
252         }
253         return pool;
254     }
255 
256     public Future<E> lease(
257             final T route, final Object state,
258             final long connectTimeout, final TimeUnit tunit,
259             final FutureCallback<E> callback) {
260         return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
261     }
262 
263     /**
264      * @since 4.3
265      */
266     public Future<E> lease(
267             final T route, final Object state,
268             final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
269             final FutureCallback<E> callback) {
270         Args.notNull(route, "Route");
271         Args.notNull(tunit, "Time unit");
272         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
273         final BasicFuture<E> future = new BasicFuture<E>(callback);
274         final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
275                 connectTimeout >= 0 ? tunit.toMillis(connectTimeout) : -1,
276                 leaseTimeout > 0 ? tunit.toMillis(leaseTimeout) : 0,
277                 future);
278         this.lock.lock();
279         try {
280             final boolean completed = processPendingRequest(leaseRequest);
281             if (!leaseRequest.isDone() && !completed) {
282                 this.leasingRequests.add(leaseRequest);
283             }
284             if (leaseRequest.isDone()) {
285                 this.completedRequests.add(leaseRequest);
286             }
287         } finally {
288             this.lock.unlock();
289         }
290         fireCallbacks();
291         return new Future<E>() {
292 
293             @Override
294             public E get() throws InterruptedException, ExecutionException {
295                 return future.get();
296             }
297 
298             @Override
299             public E get(
300                     final long timeout,
301                     final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
302                 return future.get(timeout, unit);
303             }
304 
305             @Override
306             public boolean cancel(final boolean mayInterruptIfRunning) {
307                 try {
308                     leaseRequest.cancel();
309                 } finally {
310                     return future.cancel(mayInterruptIfRunning);
311                 }
312             }
313 
314             @Override
315             public boolean isCancelled() {
316                 return future.isCancelled();
317             }
318 
319             @Override
320             public boolean isDone() {
321                 return future.isDone();
322             }
323 
324         };
325     }
326 
327     @Override
328     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
329         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
330     }
331 
332     public Future<E> lease(final T route, final Object state) {
333         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
334     }
335 
336     @Override
337     public void release(final E entry, final boolean reusable) {
338         if (entry == null) {
339             return;
340         }
341         if (this.isShutDown.get()) {
342             return;
343         }
344         this.lock.lock();
345         try {
346             if (this.leased.remove(entry)) {
347                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
348                 pool.free(entry, reusable);
349                 if (reusable) {
350                     this.available.addFirst(entry);
351                     onRelease(entry);
352                 } else {
353                     entry.close();
354                 }
355                 processNextPendingRequest();
356             }
357         } finally {
358             this.lock.unlock();
359         }
360         fireCallbacks();
361     }
362 
363     private void processPendingRequests() {
364         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
365         while (it.hasNext()) {
366             final LeaseRequest<T, C, E> request = it.next();
367             final BasicFuture<E> future = request.getFuture();
368             if (future.isCancelled()) {
369                 it.remove();
370                 continue;
371             }
372             final boolean completed = processPendingRequest(request);
373             if (request.isDone() || completed) {
374                 it.remove();
375             }
376             if (request.isDone()) {
377                 this.completedRequests.add(request);
378             }
379         }
380     }
381 
382     private void processNextPendingRequest() {
383         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
384         while (it.hasNext()) {
385             final LeaseRequest<T, C, E> request = it.next();
386             final BasicFuture<E> future = request.getFuture();
387             if (future.isCancelled()) {
388                 it.remove();
389                 continue;
390             }
391             final boolean completed = processPendingRequest(request);
392             if (request.isDone() || completed) {
393                 it.remove();
394             }
395             if (request.isDone()) {
396                 this.completedRequests.add(request);
397             }
398             if (completed) {
399                 return;
400             }
401         }
402     }
403 
404     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
405         final T route = request.getRoute();
406         final Object state = request.getState();
407         final long deadline = request.getDeadline();
408 
409         final long now = System.currentTimeMillis();
410         if (now > deadline) {
411             request.failed(new TimeoutException());
412             return false;
413         }
414 
415         final RouteSpecificPool<T, C, E> pool = getPool(route);
416         E entry;
417         for (;;) {
418             entry = pool.getFree(state);
419             if (entry == null) {
420                 break;
421             }
422             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
423                 entry.close();
424                 this.available.remove(entry);
425                 pool.free(entry, false);
426             } else {
427                 break;
428             }
429         }
430         if (entry != null) {
431             this.available.remove(entry);
432             this.leased.add(entry);
433             request.completed(entry);
434             onReuse(entry);
435             onLease(entry);
436             return true;
437         }
438 
439         // New connection is needed
440         final int maxPerRoute = getMax(route);
441         // Shrink the pool prior to allocating a new connection
442         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
443         if (excess > 0) {
444             for (int i = 0; i < excess; i++) {
445                 final E lastUsed = pool.getLastUsed();
446                 if (lastUsed == null) {
447                     break;
448                 }
449                 lastUsed.close();
450                 this.available.remove(lastUsed);
451                 pool.remove(lastUsed);
452             }
453         }
454 
455         if (pool.getAllocatedCount() < maxPerRoute) {
456             final int totalUsed = this.pending.size() + this.leased.size();
457             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
458             if (freeCapacity == 0) {
459                 return false;
460             }
461             final int totalAvailable = this.available.size();
462             if (totalAvailable > freeCapacity - 1) {
463                 if (!this.available.isEmpty()) {
464                     final E lastUsed = this.available.removeLast();
465                     lastUsed.close();
466                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
467                     otherpool.remove(lastUsed);
468                 }
469             }
470 
471             final SocketAddress localAddress;
472             final SocketAddress remoteAddress;
473             try {
474                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
475                 localAddress = this.addressResolver.resolveLocalAddress(route);
476             } catch (final IOException ex) {
477                 request.failed(ex);
478                 return false;
479             }
480 
481             final SessionRequest sessionRequest = this.ioreactor.connect(
482                     remoteAddress, localAddress, route, this.sessionRequestCallback);
483             request.attachSessionRequest(sessionRequest);
484             final long connectTimeout = request.getConnectTimeout();
485             if (connectTimeout >= 0) {
486                 sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
487             }
488             this.pending.add(sessionRequest);
489             pool.addPending(sessionRequest, request.getFuture());
490             return true;
491         } else {
492             return false;
493         }
494     }
495 
496     private void fireCallbacks() {
497         LeaseRequest<T, C, E> request;
498         while ((request = this.completedRequests.poll()) != null) {
499             final BasicFuture<E> future = request.getFuture();
500             final Exception ex = request.getException();
501             final E result = request.getResult();
502             boolean successfullyCompleted = false;
503             if (ex != null) {
504                 future.failed(ex);
505             } else if (result != null) {
506                 if (future.completed(result)) {
507                     successfullyCompleted = true;
508                 }
509             } else {
510                 future.cancel();
511             }
512             if (!successfullyCompleted) {
513                 release(result, true);
514             }
515         }
516     }
517 
518     public void validatePendingRequests() {
519         this.lock.lock();
520         try {
521             final long now = System.currentTimeMillis();
522             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
523             while (it.hasNext()) {
524                 final LeaseRequest<T, C, E> request = it.next();
525                 final BasicFuture<E> future = request.getFuture();
526                 if (future.isCancelled() && !request.isDone()) {
527                     it.remove();
528                 } else {
529                     final long deadline = request.getDeadline();
530                     if (now > deadline) {
531                         request.failed(new TimeoutException());
532                     }
533                     if (request.isDone()) {
534                         it.remove();
535                         this.completedRequests.add(request);
536                     }
537                 }
538             }
539         } finally {
540             this.lock.unlock();
541         }
542         fireCallbacks();
543     }
544 
545     protected void requestCompleted(final SessionRequest request) {
546         if (this.isShutDown.get()) {
547             return;
548         }
549         @SuppressWarnings("unchecked")
550         final
551         T route = (T) request.getAttachment();
552         this.lock.lock();
553         try {
554             this.pending.remove(request);
555             final RouteSpecificPool<T, C, E> pool = getPool(route);
556             final IOSession session = request.getSession();
557             try {
558                 final C conn = this.connFactory.create(route, session);
559                 final E entry = pool.createEntry(request, conn);
560                 if (pool.completed(request, entry)) {
561                     this.leased.add(entry);
562                     onLease(entry);
563                 } else {
564                     this.available.add(entry);
565                     if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
566                         processNextPendingRequest();
567                     }
568                 }
569             } catch (final IOException ex) {
570                 pool.failed(request, ex);
571             }
572         } finally {
573             this.lock.unlock();
574         }
575         fireCallbacks();
576     }
577 
578     protected void requestCancelled(final SessionRequest request) {
579         if (this.isShutDown.get()) {
580             return;
581         }
582         @SuppressWarnings("unchecked")
583         final
584         T route = (T) request.getAttachment();
585         this.lock.lock();
586         try {
587             this.pending.remove(request);
588             final RouteSpecificPool<T, C, E> pool = getPool(route);
589             pool.cancelled(request);
590             if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
591                 processNextPendingRequest();
592             }
593         } finally {
594             this.lock.unlock();
595         }
596         fireCallbacks();
597     }
598 
599     protected void requestFailed(final SessionRequest request) {
600         if (this.isShutDown.get()) {
601             return;
602         }
603         @SuppressWarnings("unchecked")
604         final
605         T route = (T) request.getAttachment();
606         this.lock.lock();
607         try {
608             this.pending.remove(request);
609             final RouteSpecificPool<T, C, E> pool = getPool(route);
610             pool.failed(request, request.getException());
611             processNextPendingRequest();
612         } finally {
613             this.lock.unlock();
614         }
615         fireCallbacks();
616     }
617 
618     protected void requestTimeout(final SessionRequest request) {
619         if (this.isShutDown.get()) {
620             return;
621         }
622         @SuppressWarnings("unchecked")
623         final
624         T route = (T) request.getAttachment();
625         this.lock.lock();
626         try {
627             this.pending.remove(request);
628             final RouteSpecificPool<T, C, E> pool = getPool(route);
629             pool.timeout(request);
630             processNextPendingRequest();
631         } finally {
632             this.lock.unlock();
633         }
634         fireCallbacks();
635     }
636 
637     private int getMax(final T route) {
638         final Integer v = this.maxPerRoute.get(route);
639         if (v != null) {
640             return v.intValue();
641         } else {
642             return this.defaultMaxPerRoute;
643         }
644     }
645 
646     @Override
647     public void setMaxTotal(final int max) {
648         Args.positive(max, "Max value");
649         this.lock.lock();
650         try {
651             this.maxTotal = max;
652         } finally {
653             this.lock.unlock();
654         }
655     }
656 
657     @Override
658     public int getMaxTotal() {
659         this.lock.lock();
660         try {
661             return this.maxTotal;
662         } finally {
663             this.lock.unlock();
664         }
665     }
666 
667     @Override
668     public void setDefaultMaxPerRoute(final int max) {
669         Args.positive(max, "Max value");
670         this.lock.lock();
671         try {
672             this.defaultMaxPerRoute = max;
673         } finally {
674             this.lock.unlock();
675         }
676     }
677 
678     @Override
679     public int getDefaultMaxPerRoute() {
680         this.lock.lock();
681         try {
682             return this.defaultMaxPerRoute;
683         } finally {
684             this.lock.unlock();
685         }
686     }
687 
688     @Override
689     public void setMaxPerRoute(final T route, final int max) {
690         Args.notNull(route, "Route");
691         Args.positive(max, "Max value");
692         this.lock.lock();
693         try {
694             this.maxPerRoute.put(route, Integer.valueOf(max));
695         } finally {
696             this.lock.unlock();
697         }
698     }
699 
700     @Override
701     public int getMaxPerRoute(final T route) {
702         Args.notNull(route, "Route");
703         this.lock.lock();
704         try {
705             return getMax(route);
706         } finally {
707             this.lock.unlock();
708         }
709     }
710 
711     @Override
712     public PoolStats getTotalStats() {
713         this.lock.lock();
714         try {
715             return new PoolStats(
716                     this.leased.size(),
717                     this.pending.size(),
718                     this.available.size(),
719                     this.maxTotal);
720         } finally {
721             this.lock.unlock();
722         }
723     }
724 
725     @Override
726     public PoolStats getStats(final T route) {
727         Args.notNull(route, "Route");
728         this.lock.lock();
729         try {
730             final RouteSpecificPool<T, C, E> pool = getPool(route);
731             int pendingCount = 0;
732             for (final LeaseRequest<T, C, E> request: leasingRequests) {
733                 if (LangUtils.equals(route, request.getRoute())) {
734                     pendingCount++;
735                 }
736             }
737             return new PoolStats(
738                     pool.getLeasedCount(),
739                     pendingCount + pool.getPendingCount(),
740                     pool.getAvailableCount(),
741                     getMax(route));
742         } finally {
743             this.lock.unlock();
744         }
745     }
746 
747     /**
748      * Returns snapshot of all knows routes
749      *
750      * @since 4.4
751      */
752     public Set<T> getRoutes() {
753         this.lock.lock();
754         try {
755             return new HashSet<T>(routeToPool.keySet());
756         } finally {
757             this.lock.unlock();
758         }
759     }
760 
761     /**
762      * Enumerates all available connections.
763      *
764      * @since 4.3
765      */
766     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
767         this.lock.lock();
768         try {
769             final Iterator<E> it = this.available.iterator();
770             while (it.hasNext()) {
771                 final E entry = it.next();
772                 callback.process(entry);
773                 if (entry.isClosed()) {
774                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
775                     pool.remove(entry);
776                     it.remove();
777                 }
778             }
779             processPendingRequests();
780             purgePoolMap();
781         } finally {
782             this.lock.unlock();
783         }
784     }
785 
786     /**
787      * Enumerates all leased connections.
788      *
789      * @since 4.3
790      */
791     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
792         this.lock.lock();
793         try {
794             final Iterator<E> it = this.leased.iterator();
795             while (it.hasNext()) {
796                 final E entry = it.next();
797                 callback.process(entry);
798             }
799             processPendingRequests();
800         } finally {
801             this.lock.unlock();
802         }
803     }
804 
805     /**
806      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
807      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
808      *
809      * @deprecated (4.3.2)
810      */
811     @Deprecated
812     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
813         while (it.hasNext()) {
814             final E entry = it.next();
815             callback.process(entry);
816         }
817         processPendingRequests();
818     }
819 
820     private void purgePoolMap() {
821         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
822         while (it.hasNext()) {
823             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
824             final RouteSpecificPool<T, C, E> pool = entry.getValue();
825             if (pool.getAllocatedCount() == 0) {
826                 it.remove();
827             }
828         }
829     }
830 
831     public void closeIdle(final long idletime, final TimeUnit tunit) {
832         Args.notNull(tunit, "Time unit");
833         long time = tunit.toMillis(idletime);
834         if (time < 0) {
835             time = 0;
836         }
837         final long deadline = System.currentTimeMillis() - time;
838         enumAvailable(new PoolEntryCallback<T, C>() {
839 
840             @Override
841             public void process(final PoolEntry<T, C> entry) {
842                 if (entry.getUpdated() <= deadline) {
843                     entry.close();
844                 }
845             }
846 
847         });
848     }
849 
850     public void closeExpired() {
851         final long now = System.currentTimeMillis();
852         enumAvailable(new PoolEntryCallback<T, C>() {
853 
854             @Override
855             public void process(final PoolEntry<T, C> entry) {
856                 if (entry.isExpired(now)) {
857                     entry.close();
858                 }
859             }
860 
861         });
862     }
863 
864     @Override
865     public String toString() {
866         final StringBuilder buffer = new StringBuilder();
867         buffer.append("[leased: ");
868         buffer.append(this.leased);
869         buffer.append("][available: ");
870         buffer.append(this.available);
871         buffer.append("][pending: ");
872         buffer.append(this.pending);
873         buffer.append("]");
874         return buffer.toString();
875     }
876 
877     class InternalSessionRequestCallback implements SessionRequestCallback {
878 
879         @Override
880         public void completed(final SessionRequest request) {
881             requestCompleted(request);
882         }
883 
884         @Override
885         public void cancelled(final SessionRequest request) {
886             requestCancelled(request);
887         }
888 
889         @Override
890         public void failed(final SessionRequest request) {
891             requestFailed(request);
892         }
893 
894         @Override
895         public void timeout(final SessionRequest request) {
896             requestTimeout(request);
897         }
898 
899     }
900 
901 }