View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.concurrent.BasicFuture;
49  import org.apache.http.concurrent.FutureCallback;
50  import org.apache.http.nio.reactor.ConnectingIOReactor;
51  import org.apache.http.nio.reactor.IOReactorStatus;
52  import org.apache.http.nio.reactor.IOSession;
53  import org.apache.http.nio.reactor.SessionRequest;
54  import org.apache.http.nio.reactor.SessionRequestCallback;
55  import org.apache.http.pool.ConnPool;
56  import org.apache.http.pool.ConnPoolControl;
57  import org.apache.http.pool.PoolEntry;
58  import org.apache.http.pool.PoolEntryCallback;
59  import org.apache.http.pool.PoolStats;
60  import org.apache.http.util.Args;
61  import org.apache.http.util.Asserts;
62  
63  /**
64   * Abstract non-blocking connection pool.
65   *
66   * @param <T> route
67   * @param <C> connection object
68   * @param <E> pool entry
69   *
70   * @since 4.2
71   */
72  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
73  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
74                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
75  
76      private final ConnectingIOReactor ioreactor;
77      private final NIOConnFactory<T, C> connFactory;
78      private final SocketAddressResolver<T> addressResolver;
79      private final SessionRequestCallback sessionRequestCallback;
80      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
81      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
82      private final Set<SessionRequest> pending;
83      private final Set<E> leased;
84      private final LinkedList<E> available;
85      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
86      private final Map<T, Integer> maxPerRoute;
87      private final Lock lock;
88      private final AtomicBoolean isShutDown;
89  
90      private volatile int defaultMaxPerRoute;
91      private volatile int maxTotal;
92  
93      /**
94       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
95       *   NIOConnFactory, SocketAddressResolver, int, int)}
96       */
97      @Deprecated
98      public AbstractNIOConnPool(
99              final ConnectingIOReactor ioreactor,
100             final NIOConnFactory<T, C> connFactory,
101             final int defaultMaxPerRoute,
102             final int maxTotal) {
103         super();
104         Args.notNull(ioreactor, "I/O reactor");
105         Args.notNull(connFactory, "Connection factory");
106         Args.positive(defaultMaxPerRoute, "Max per route value");
107         Args.positive(maxTotal, "Max total value");
108         this.ioreactor = ioreactor;
109         this.connFactory = connFactory;
110         this.addressResolver = new SocketAddressResolver<T>() {
111 
112             @Override
113             public SocketAddress resolveLocalAddress(final T route) throws IOException {
114                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
115             }
116 
117             @Override
118             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
119                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
120             }
121 
122         };
123         this.sessionRequestCallback = new InternalSessionRequestCallback();
124         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
125         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
126         this.pending = new HashSet<SessionRequest>();
127         this.leased = new HashSet<E>();
128         this.available = new LinkedList<E>();
129         this.maxPerRoute = new HashMap<T, Integer>();
130         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
131         this.lock = new ReentrantLock();
132         this.isShutDown = new AtomicBoolean(false);
133         this.defaultMaxPerRoute = defaultMaxPerRoute;
134         this.maxTotal = maxTotal;
135     }
136 
137     /**
138      * @since 4.3
139      */
140     public AbstractNIOConnPool(
141             final ConnectingIOReactor ioreactor,
142             final NIOConnFactory<T, C> connFactory,
143             final SocketAddressResolver<T> addressResolver,
144             final int defaultMaxPerRoute,
145             final int maxTotal) {
146         super();
147         Args.notNull(ioreactor, "I/O reactor");
148         Args.notNull(connFactory, "Connection factory");
149         Args.notNull(addressResolver, "Address resolver");
150         Args.positive(defaultMaxPerRoute, "Max per route value");
151         Args.positive(maxTotal, "Max total value");
152         this.ioreactor = ioreactor;
153         this.connFactory = connFactory;
154         this.addressResolver = addressResolver;
155         this.sessionRequestCallback = new InternalSessionRequestCallback();
156         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
157         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
158         this.pending = new HashSet<SessionRequest>();
159         this.leased = new HashSet<E>();
160         this.available = new LinkedList<E>();
161         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
162         this.maxPerRoute = new HashMap<T, Integer>();
163         this.lock = new ReentrantLock();
164         this.isShutDown = new AtomicBoolean(false);
165         this.defaultMaxPerRoute = defaultMaxPerRoute;
166         this.maxTotal = maxTotal;
167     }
168 
169     /**
170      * @deprecated (4.3) use {@link SocketAddressResolver}
171      */
172     @Deprecated
173     protected SocketAddress resolveRemoteAddress(final T route) {
174         return null;
175     }
176 
177     /**
178      * @deprecated (4.3) use {@link SocketAddressResolver}
179      */
180     @Deprecated
181     protected SocketAddress resolveLocalAddress(final T route) {
182         return null;
183     }
184 
185     protected abstract E createEntry(T route, C conn);
186 
187     /**
188      * @since 4.3
189      */
190     protected void onLease(final E entry) {
191     }
192 
193     /**
194      * @since 4.3
195      */
196     protected void onRelease(final E entry) {
197     }
198 
199     /**
200      * @since 4.4
201      */
202     protected void onReuse(final E entry) {
203     }
204 
205     public boolean isShutdown() {
206         return this.isShutDown.get();
207     }
208 
209     public void shutdown(final long waitMs) throws IOException {
210         if (this.isShutDown.compareAndSet(false, true)) {
211             fireCallbacks();
212             this.lock.lock();
213             try {
214                 for (final SessionRequest sessionRequest: this.pending) {
215                     sessionRequest.cancel();
216                 }
217                 for (final E entry: this.available) {
218                     entry.close();
219                 }
220                 for (final E entry: this.leased) {
221                     entry.close();
222                 }
223                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
224                     pool.shutdown();
225                 }
226                 this.routeToPool.clear();
227                 this.leased.clear();
228                 this.pending.clear();
229                 this.available.clear();
230                 this.leasingRequests.clear();
231                 this.ioreactor.shutdown(waitMs);
232             } finally {
233                 this.lock.unlock();
234             }
235         }
236     }
237 
238     private RouteSpecificPool<T, C, E> getPool(final T route) {
239         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
240         if (pool == null) {
241             pool = new RouteSpecificPool<T, C, E>(route) {
242 
243                 @Override
244                 protected E createEntry(final T route, final C conn) {
245                     return AbstractNIOConnPool.this.createEntry(route, conn);
246                 }
247 
248             };
249             this.routeToPool.put(route, pool);
250         }
251         return pool;
252     }
253 
254     public Future<E> lease(
255             final T route, final Object state,
256             final long connectTimeout, final TimeUnit tunit,
257             final FutureCallback<E> callback) {
258         return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
259     }
260 
261     /**
262      * @since 4.3
263      */
264     public Future<E> lease(
265             final T route, final Object state,
266             final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
267             final FutureCallback<E> callback) {
268         Args.notNull(route, "Route");
269         Args.notNull(tunit, "Time unit");
270         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
271         final BasicFuture<E> future = new BasicFuture<E>(callback);
272         this.lock.lock();
273         try {
274             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
275             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
276             final boolean completed = processPendingRequest(request);
277             if (!request.isDone() && !completed) {
278                 this.leasingRequests.add(request);
279             }
280             if (request.isDone()) {
281                 this.completedRequests.add(request);
282             }
283         } finally {
284             this.lock.unlock();
285         }
286         fireCallbacks();
287         return future;
288     }
289 
290     @Override
291     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
292         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
293     }
294 
295     public Future<E> lease(final T route, final Object state) {
296         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
297     }
298 
299     @Override
300     public void release(final E entry, final boolean reusable) {
301         if (entry == null) {
302             return;
303         }
304         if (this.isShutDown.get()) {
305             return;
306         }
307         this.lock.lock();
308         try {
309             if (this.leased.remove(entry)) {
310                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
311                 pool.free(entry, reusable);
312                 if (reusable) {
313                     this.available.addFirst(entry);
314                     onRelease(entry);
315                 } else {
316                     entry.close();
317                 }
318                 processNextPendingRequest();
319             }
320         } finally {
321             this.lock.unlock();
322         }
323         fireCallbacks();
324     }
325 
326     private void processPendingRequests() {
327         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
328         while (it.hasNext()) {
329             final LeaseRequest<T, C, E> request = it.next();
330             final boolean completed = processPendingRequest(request);
331             if (request.isDone() || completed) {
332                 it.remove();
333             }
334             if (request.isDone()) {
335                 this.completedRequests.add(request);
336             }
337         }
338     }
339 
340     private void processNextPendingRequest() {
341         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
342         while (it.hasNext()) {
343             final LeaseRequest<T, C, E> request = it.next();
344             final boolean completed = processPendingRequest(request);
345             if (request.isDone() || completed) {
346                 it.remove();
347             }
348             if (request.isDone()) {
349                 this.completedRequests.add(request);
350             }
351             if (completed) {
352                 return;
353             }
354         }
355     }
356 
357     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
358         final T route = request.getRoute();
359         final Object state = request.getState();
360         final long deadline = request.getDeadline();
361 
362         final long now = System.currentTimeMillis();
363         if (now > deadline) {
364             request.failed(new TimeoutException());
365             return false;
366         }
367 
368         final RouteSpecificPool<T, C, E> pool = getPool(route);
369         E entry;
370         for (;;) {
371             entry = pool.getFree(state);
372             if (entry == null) {
373                 break;
374             }
375             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
376                 entry.close();
377                 this.available.remove(entry);
378                 pool.free(entry, false);
379             } else {
380                 break;
381             }
382         }
383         if (entry != null) {
384             this.available.remove(entry);
385             this.leased.add(entry);
386             request.completed(entry);
387             onReuse(entry);
388             onLease(entry);
389             return true;
390         }
391 
392         // New connection is needed
393         final int maxPerRoute = getMax(route);
394         // Shrink the pool prior to allocating a new connection
395         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
396         if (excess > 0) {
397             for (int i = 0; i < excess; i++) {
398                 final E lastUsed = pool.getLastUsed();
399                 if (lastUsed == null) {
400                     break;
401                 }
402                 lastUsed.close();
403                 this.available.remove(lastUsed);
404                 pool.remove(lastUsed);
405             }
406         }
407 
408         if (pool.getAllocatedCount() < maxPerRoute) {
409             final int totalUsed = this.pending.size() + this.leased.size();
410             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
411             if (freeCapacity == 0) {
412                 return false;
413             }
414             final int totalAvailable = this.available.size();
415             if (totalAvailable > freeCapacity - 1) {
416                 if (!this.available.isEmpty()) {
417                     final E lastUsed = this.available.removeLast();
418                     lastUsed.close();
419                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
420                     otherpool.remove(lastUsed);
421                 }
422             }
423 
424             final SocketAddress localAddress;
425             final SocketAddress remoteAddress;
426             try {
427                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
428                 localAddress = this.addressResolver.resolveLocalAddress(route);
429             } catch (final IOException ex) {
430                 request.failed(ex);
431                 return false;
432             }
433 
434             final SessionRequest sessionRequest = this.ioreactor.connect(
435                     remoteAddress, localAddress, route, this.sessionRequestCallback);
436             final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
437                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
438             sessionRequest.setConnectTimeout(timout);
439             this.pending.add(sessionRequest);
440             pool.addPending(sessionRequest, request.getFuture());
441             return true;
442         } else {
443             return false;
444         }
445     }
446 
447     private void fireCallbacks() {
448         LeaseRequest<T, C, E> request;
449         while ((request = this.completedRequests.poll()) != null) {
450             final BasicFuture<E> future = request.getFuture();
451             final Exception ex = request.getException();
452             final E result = request.getResult();
453             if (ex != null) {
454                 future.failed(ex);
455             } else if (result != null) {
456                 future.completed(result);
457             } else {
458                 future.cancel();
459             }
460         }
461     }
462 
463     public void validatePendingRequests() {
464         this.lock.lock();
465         try {
466             final long now = System.currentTimeMillis();
467             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
468             while (it.hasNext()) {
469                 final LeaseRequest<T, C, E> request = it.next();
470                 final long deadline = request.getDeadline();
471                 if (now > deadline) {
472                     it.remove();
473                     request.failed(new TimeoutException());
474                     this.completedRequests.add(request);
475                 }
476             }
477         } finally {
478             this.lock.unlock();
479         }
480         fireCallbacks();
481     }
482 
483     protected void requestCompleted(final SessionRequest request) {
484         if (this.isShutDown.get()) {
485             return;
486         }
487         @SuppressWarnings("unchecked")
488         final
489         T route = (T) request.getAttachment();
490         this.lock.lock();
491         try {
492             this.pending.remove(request);
493             final RouteSpecificPool<T, C, E> pool = getPool(route);
494             final IOSession session = request.getSession();
495             try {
496                 final C conn = this.connFactory.create(route, session);
497                 final E entry = pool.createEntry(request, conn);
498                 this.leased.add(entry);
499                 pool.completed(request, entry);
500                 onLease(entry);
501             } catch (final IOException ex) {
502                 pool.failed(request, ex);
503             }
504         } finally {
505             this.lock.unlock();
506         }
507         fireCallbacks();
508     }
509 
510     protected void requestCancelled(final SessionRequest request) {
511         if (this.isShutDown.get()) {
512             return;
513         }
514         @SuppressWarnings("unchecked")
515         final
516         T route = (T) request.getAttachment();
517         this.lock.lock();
518         try {
519             this.pending.remove(request);
520             final RouteSpecificPool<T, C, E> pool = getPool(route);
521             pool.cancelled(request);
522             if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
523                 processNextPendingRequest();
524             }
525         } finally {
526             this.lock.unlock();
527         }
528         fireCallbacks();
529     }
530 
531     protected void requestFailed(final SessionRequest request) {
532         if (this.isShutDown.get()) {
533             return;
534         }
535         @SuppressWarnings("unchecked")
536         final
537         T route = (T) request.getAttachment();
538         this.lock.lock();
539         try {
540             this.pending.remove(request);
541             final RouteSpecificPool<T, C, E> pool = getPool(route);
542             pool.failed(request, request.getException());
543             processNextPendingRequest();
544         } finally {
545             this.lock.unlock();
546         }
547         fireCallbacks();
548     }
549 
550     protected void requestTimeout(final SessionRequest request) {
551         if (this.isShutDown.get()) {
552             return;
553         }
554         @SuppressWarnings("unchecked")
555         final
556         T route = (T) request.getAttachment();
557         this.lock.lock();
558         try {
559             this.pending.remove(request);
560             final RouteSpecificPool<T, C, E> pool = getPool(route);
561             pool.timeout(request);
562             processNextPendingRequest();
563         } finally {
564             this.lock.unlock();
565         }
566         fireCallbacks();
567     }
568 
569     private int getMax(final T route) {
570         final Integer v = this.maxPerRoute.get(route);
571         if (v != null) {
572             return v.intValue();
573         } else {
574             return this.defaultMaxPerRoute;
575         }
576     }
577 
578     @Override
579     public void setMaxTotal(final int max) {
580         Args.positive(max, "Max value");
581         this.lock.lock();
582         try {
583             this.maxTotal = max;
584         } finally {
585             this.lock.unlock();
586         }
587     }
588 
589     @Override
590     public int getMaxTotal() {
591         this.lock.lock();
592         try {
593             return this.maxTotal;
594         } finally {
595             this.lock.unlock();
596         }
597     }
598 
599     @Override
600     public void setDefaultMaxPerRoute(final int max) {
601         Args.positive(max, "Max value");
602         this.lock.lock();
603         try {
604             this.defaultMaxPerRoute = max;
605         } finally {
606             this.lock.unlock();
607         }
608     }
609 
610     @Override
611     public int getDefaultMaxPerRoute() {
612         this.lock.lock();
613         try {
614             return this.defaultMaxPerRoute;
615         } finally {
616             this.lock.unlock();
617         }
618     }
619 
620     @Override
621     public void setMaxPerRoute(final T route, final int max) {
622         Args.notNull(route, "Route");
623         Args.positive(max, "Max value");
624         this.lock.lock();
625         try {
626             this.maxPerRoute.put(route, Integer.valueOf(max));
627         } finally {
628             this.lock.unlock();
629         }
630     }
631 
632     @Override
633     public int getMaxPerRoute(final T route) {
634         Args.notNull(route, "Route");
635         this.lock.lock();
636         try {
637             return getMax(route);
638         } finally {
639             this.lock.unlock();
640         }
641     }
642 
643     @Override
644     public PoolStats getTotalStats() {
645         this.lock.lock();
646         try {
647             return new PoolStats(
648                     this.leased.size(),
649                     this.pending.size(),
650                     this.available.size(),
651                     this.maxTotal);
652         } finally {
653             this.lock.unlock();
654         }
655     }
656 
657     @Override
658     public PoolStats getStats(final T route) {
659         Args.notNull(route, "Route");
660         this.lock.lock();
661         try {
662             final RouteSpecificPool<T, C, E> pool = getPool(route);
663             return new PoolStats(
664                     pool.getLeasedCount(),
665                     pool.getPendingCount(),
666                     pool.getAvailableCount(),
667                     getMax(route));
668         } finally {
669             this.lock.unlock();
670         }
671     }
672 
673     /**
674      * Returns snapshot of all knows routes
675      *
676      * @since 4.4
677      */
678     public Set<T> getRoutes() {
679         this.lock.lock();
680         try {
681             return new HashSet<T>(routeToPool.keySet());
682         } finally {
683             this.lock.unlock();
684         }
685     }
686 
687     /**
688      * Enumerates all available connections.
689      *
690      * @since 4.3
691      */
692     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
693         this.lock.lock();
694         try {
695             final Iterator<E> it = this.available.iterator();
696             while (it.hasNext()) {
697                 final E entry = it.next();
698                 callback.process(entry);
699                 if (entry.isClosed()) {
700                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
701                     pool.remove(entry);
702                     it.remove();
703                 }
704             }
705             processPendingRequests();
706             purgePoolMap();
707         } finally {
708             this.lock.unlock();
709         }
710     }
711 
712     /**
713      * Enumerates all leased connections.
714      *
715      * @since 4.3
716      */
717     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
718         this.lock.lock();
719         try {
720             final Iterator<E> it = this.leased.iterator();
721             while (it.hasNext()) {
722                 final E entry = it.next();
723                 callback.process(entry);
724             }
725             processPendingRequests();
726         } finally {
727             this.lock.unlock();
728         }
729     }
730 
731     /**
732      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
733      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
734      *
735      * @deprecated (4.3.2)
736      */
737     @Deprecated
738     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
739         while (it.hasNext()) {
740             final E entry = it.next();
741             callback.process(entry);
742         }
743         processPendingRequests();
744     }
745 
746     private void purgePoolMap() {
747         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
748         while (it.hasNext()) {
749             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
750             final RouteSpecificPool<T, C, E> pool = entry.getValue();
751             if (pool.getAllocatedCount() == 0) {
752                 it.remove();
753             }
754         }
755     }
756 
757     public void closeIdle(final long idletime, final TimeUnit tunit) {
758         Args.notNull(tunit, "Time unit");
759         long time = tunit.toMillis(idletime);
760         if (time < 0) {
761             time = 0;
762         }
763         final long deadline = System.currentTimeMillis() - time;
764         enumAvailable(new PoolEntryCallback<T, C>() {
765 
766             @Override
767             public void process(final PoolEntry<T, C> entry) {
768                 if (entry.getUpdated() <= deadline) {
769                     entry.close();
770                 }
771             }
772 
773         });
774     }
775 
776     public void closeExpired() {
777         final long now = System.currentTimeMillis();
778         enumAvailable(new PoolEntryCallback<T, C>() {
779 
780             @Override
781             public void process(final PoolEntry<T, C> entry) {
782                 if (entry.isExpired(now)) {
783                     entry.close();
784                 }
785             }
786 
787         });
788     }
789 
790     @Override
791     public String toString() {
792         final StringBuilder buffer = new StringBuilder();
793         buffer.append("[leased: ");
794         buffer.append(this.leased);
795         buffer.append("][available: ");
796         buffer.append(this.available);
797         buffer.append("][pending: ");
798         buffer.append(this.pending);
799         buffer.append("]");
800         return buffer.toString();
801     }
802 
803     class InternalSessionRequestCallback implements SessionRequestCallback {
804 
805         @Override
806         public void completed(final SessionRequest request) {
807             requestCompleted(request);
808         }
809 
810         @Override
811         public void cancelled(final SessionRequest request) {
812             requestCancelled(request);
813         }
814 
815         @Override
816         public void failed(final SessionRequest request) {
817             requestFailed(request);
818         }
819 
820         @Override
821         public void timeout(final SessionRequest request) {
822             requestTimeout(request);
823         }
824 
825     }
826 
827 }