View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.http.annotation.ThreadSafe;
47  import org.apache.http.concurrent.BasicFuture;
48  import org.apache.http.concurrent.FutureCallback;
49  import org.apache.http.nio.reactor.ConnectingIOReactor;
50  import org.apache.http.nio.reactor.IOReactorStatus;
51  import org.apache.http.nio.reactor.IOSession;
52  import org.apache.http.nio.reactor.SessionRequest;
53  import org.apache.http.nio.reactor.SessionRequestCallback;
54  import org.apache.http.pool.ConnPool;
55  import org.apache.http.pool.ConnPoolControl;
56  import org.apache.http.pool.PoolEntry;
57  import org.apache.http.pool.PoolEntryCallback;
58  import org.apache.http.pool.PoolStats;
59  import org.apache.http.util.Args;
60  import org.apache.http.util.Asserts;
61  
62  /**
63   * Abstract non-blocking connection pool.
64   *
65   * @param <T> route
66   * @param <C> connection object
67   * @param <E> pool entry
68   *
69   * @since 4.2
70   */
71  @ThreadSafe
72  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
73                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
74  
75      private final ConnectingIOReactor ioreactor;
76      private final NIOConnFactory<T, C> connFactory;
77      private final SocketAddressResolver<T> addressResolver;
78      private final SessionRequestCallback sessionRequestCallback;
79      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
80      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
81      private final Set<SessionRequest> pending;
82      private final Set<E> leased;
83      private final LinkedList<E> available;
84      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
85      private final Map<T, Integer> maxPerRoute;
86      private final Lock lock;
87      private final AtomicBoolean isShutDown;
88  
89      private volatile int defaultMaxPerRoute;
90      private volatile int maxTotal;
91  
92      /**
93       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
94       *   NIOConnFactory, SocketAddressResolver, int, int)}
95       */
96      @Deprecated
97      public AbstractNIOConnPool(
98              final ConnectingIOReactor ioreactor,
99              final NIOConnFactory<T, C> connFactory,
100             final int defaultMaxPerRoute,
101             final int maxTotal) {
102         super();
103         Args.notNull(ioreactor, "I/O reactor");
104         Args.notNull(connFactory, "Connection factory");
105         Args.positive(defaultMaxPerRoute, "Max per route value");
106         Args.positive(maxTotal, "Max total value");
107         this.ioreactor = ioreactor;
108         this.connFactory = connFactory;
109         this.addressResolver = new SocketAddressResolver<T>() {
110 
111             @Override
112             public SocketAddress resolveLocalAddress(final T route) throws IOException {
113                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
114             }
115 
116             @Override
117             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
118                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
119             }
120 
121         };
122         this.sessionRequestCallback = new InternalSessionRequestCallback();
123         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
124         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
125         this.pending = new HashSet<SessionRequest>();
126         this.leased = new HashSet<E>();
127         this.available = new LinkedList<E>();
128         this.maxPerRoute = new HashMap<T, Integer>();
129         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
130         this.lock = new ReentrantLock();
131         this.isShutDown = new AtomicBoolean(false);
132         this.defaultMaxPerRoute = defaultMaxPerRoute;
133         this.maxTotal = maxTotal;
134     }
135 
136     /**
137      * @since 4.3
138      */
139     public AbstractNIOConnPool(
140             final ConnectingIOReactor ioreactor,
141             final NIOConnFactory<T, C> connFactory,
142             final SocketAddressResolver<T> addressResolver,
143             final int defaultMaxPerRoute,
144             final int maxTotal) {
145         super();
146         Args.notNull(ioreactor, "I/O reactor");
147         Args.notNull(connFactory, "Connection factory");
148         Args.notNull(addressResolver, "Address resolver");
149         Args.positive(defaultMaxPerRoute, "Max per route value");
150         Args.positive(maxTotal, "Max total value");
151         this.ioreactor = ioreactor;
152         this.connFactory = connFactory;
153         this.addressResolver = addressResolver;
154         this.sessionRequestCallback = new InternalSessionRequestCallback();
155         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
156         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
157         this.pending = new HashSet<SessionRequest>();
158         this.leased = new HashSet<E>();
159         this.available = new LinkedList<E>();
160         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
161         this.maxPerRoute = new HashMap<T, Integer>();
162         this.lock = new ReentrantLock();
163         this.isShutDown = new AtomicBoolean(false);
164         this.defaultMaxPerRoute = defaultMaxPerRoute;
165         this.maxTotal = maxTotal;
166     }
167 
168     /**
169      * @deprecated (4.3) use {@link SocketAddressResolver}
170      */
171     @Deprecated
172     protected SocketAddress resolveRemoteAddress(final T route) {
173         return null;
174     }
175 
176     /**
177      * @deprecated (4.3) use {@link SocketAddressResolver}
178      */
179     @Deprecated
180     protected SocketAddress resolveLocalAddress(final T route) {
181         return null;
182     }
183 
184     protected abstract E createEntry(T route, C conn);
185 
186     /**
187      * @since 4.3
188      */
189     protected void onLease(final E entry) {
190     }
191 
192     /**
193      * @since 4.3
194      */
195     protected void onRelease(final E entry) {
196     }
197 
198     /**
199      * @since 4.4
200      */
201     protected void onReuse(final E entry) {
202     }
203 
204     public boolean isShutdown() {
205         return this.isShutDown.get();
206     }
207 
208     public void shutdown(final long waitMs) throws IOException {
209         if (this.isShutDown.compareAndSet(false, true)) {
210             fireCallbacks();
211             this.lock.lock();
212             try {
213                 for (final SessionRequest sessionRequest: this.pending) {
214                     sessionRequest.cancel();
215                 }
216                 for (final E entry: this.available) {
217                     entry.close();
218                 }
219                 for (final E entry: this.leased) {
220                     entry.close();
221                 }
222                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
223                     pool.shutdown();
224                 }
225                 this.routeToPool.clear();
226                 this.leased.clear();
227                 this.pending.clear();
228                 this.available.clear();
229                 this.leasingRequests.clear();
230                 this.ioreactor.shutdown(waitMs);
231             } finally {
232                 this.lock.unlock();
233             }
234         }
235     }
236 
237     private RouteSpecificPool<T, C, E> getPool(final T route) {
238         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
239         if (pool == null) {
240             pool = new RouteSpecificPool<T, C, E>(route) {
241 
242                 @Override
243                 protected E createEntry(final T route, final C conn) {
244                     return AbstractNIOConnPool.this.createEntry(route, conn);
245                 }
246 
247             };
248             this.routeToPool.put(route, pool);
249         }
250         return pool;
251     }
252 
253     public Future<E> lease(
254             final T route, final Object state,
255             final long connectTimeout, final TimeUnit tunit,
256             final FutureCallback<E> callback) {
257         return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
258     }
259 
260     /**
261      * @since 4.3
262      */
263     public Future<E> lease(
264             final T route, final Object state,
265             final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
266             final FutureCallback<E> callback) {
267         Args.notNull(route, "Route");
268         Args.notNull(tunit, "Time unit");
269         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
270         final BasicFuture<E> future = new BasicFuture<E>(callback);
271         this.lock.lock();
272         try {
273             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
274             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
275             final boolean completed = processPendingRequest(request);
276             if (!request.isDone() && !completed) {
277                 this.leasingRequests.add(request);
278             }
279             if (request.isDone()) {
280                 this.completedRequests.add(request);
281             }
282         } finally {
283             this.lock.unlock();
284         }
285         fireCallbacks();
286         return future;
287     }
288 
289     @Override
290     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
291         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
292     }
293 
294     public Future<E> lease(final T route, final Object state) {
295         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
296     }
297 
298     @Override
299     public void release(final E entry, final boolean reusable) {
300         if (entry == null) {
301             return;
302         }
303         if (this.isShutDown.get()) {
304             return;
305         }
306         this.lock.lock();
307         try {
308             if (this.leased.remove(entry)) {
309                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
310                 pool.free(entry, reusable);
311                 if (reusable) {
312                     this.available.addFirst(entry);
313                     onRelease(entry);
314                 } else {
315                     entry.close();
316                 }
317                 processNextPendingRequest();
318             }
319         } finally {
320             this.lock.unlock();
321         }
322         fireCallbacks();
323     }
324 
325     private void processPendingRequests() {
326         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
327         while (it.hasNext()) {
328             final LeaseRequest<T, C, E> request = it.next();
329             final boolean completed = processPendingRequest(request);
330             if (request.isDone() || completed) {
331                 it.remove();
332             }
333             if (request.isDone()) {
334                 this.completedRequests.add(request);
335             }
336         }
337     }
338 
339     private void processNextPendingRequest() {
340         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
341         while (it.hasNext()) {
342             final LeaseRequest<T, C, E> request = it.next();
343             final boolean completed = processPendingRequest(request);
344             if (request.isDone() || completed) {
345                 it.remove();
346             }
347             if (request.isDone()) {
348                 this.completedRequests.add(request);
349             }
350             if (completed) {
351                 return;
352             }
353         }
354     }
355 
356     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
357         final T route = request.getRoute();
358         final Object state = request.getState();
359         final long deadline = request.getDeadline();
360 
361         final long now = System.currentTimeMillis();
362         if (now > deadline) {
363             request.failed(new TimeoutException());
364             return false;
365         }
366 
367         final RouteSpecificPool<T, C, E> pool = getPool(route);
368         E entry;
369         for (;;) {
370             entry = pool.getFree(state);
371             if (entry == null) {
372                 break;
373             }
374             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
375                 entry.close();
376                 this.available.remove(entry);
377                 pool.free(entry, false);
378             } else {
379                 break;
380             }
381         }
382         if (entry != null) {
383             this.available.remove(entry);
384             this.leased.add(entry);
385             request.completed(entry);
386             onReuse(entry);
387             onLease(entry);
388             return true;
389         }
390 
391         // New connection is needed
392         final int maxPerRoute = getMax(route);
393         // Shrink the pool prior to allocating a new connection
394         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
395         if (excess > 0) {
396             for (int i = 0; i < excess; i++) {
397                 final E lastUsed = pool.getLastUsed();
398                 if (lastUsed == null) {
399                     break;
400                 }
401                 lastUsed.close();
402                 this.available.remove(lastUsed);
403                 pool.remove(lastUsed);
404             }
405         }
406 
407         if (pool.getAllocatedCount() < maxPerRoute) {
408             final int totalUsed = this.pending.size() + this.leased.size();
409             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
410             if (freeCapacity == 0) {
411                 return false;
412             }
413             final int totalAvailable = this.available.size();
414             if (totalAvailable > freeCapacity - 1) {
415                 if (!this.available.isEmpty()) {
416                     final E lastUsed = this.available.removeLast();
417                     lastUsed.close();
418                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
419                     otherpool.remove(lastUsed);
420                 }
421             }
422 
423             final SocketAddress localAddress;
424             final SocketAddress remoteAddress;
425             try {
426                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
427                 localAddress = this.addressResolver.resolveLocalAddress(route);
428             } catch (final IOException ex) {
429                 request.failed(ex);
430                 return false;
431             }
432 
433             final SessionRequest sessionRequest = this.ioreactor.connect(
434                     remoteAddress, localAddress, route, this.sessionRequestCallback);
435             final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
436                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
437             sessionRequest.setConnectTimeout(timout);
438             this.pending.add(sessionRequest);
439             pool.addPending(sessionRequest, request.getFuture());
440             return true;
441         } else {
442             return false;
443         }
444     }
445 
446     private void fireCallbacks() {
447         LeaseRequest<T, C, E> request;
448         while ((request = this.completedRequests.poll()) != null) {
449             final BasicFuture<E> future = request.getFuture();
450             final Exception ex = request.getException();
451             final E result = request.getResult();
452             if (ex != null) {
453                 future.failed(ex);
454             } else if (result != null) {
455                 future.completed(result);
456             } else {
457                 future.cancel();
458             }
459         }
460     }
461 
462     public void validatePendingRequests() {
463         this.lock.lock();
464         try {
465             final long now = System.currentTimeMillis();
466             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
467             while (it.hasNext()) {
468                 final LeaseRequest<T, C, E> request = it.next();
469                 final long deadline = request.getDeadline();
470                 if (now > deadline) {
471                     it.remove();
472                     request.failed(new TimeoutException());
473                     this.completedRequests.add(request);
474                 }
475             }
476         } finally {
477             this.lock.unlock();
478         }
479         fireCallbacks();
480     }
481 
482     protected void requestCompleted(final SessionRequest request) {
483         if (this.isShutDown.get()) {
484             return;
485         }
486         @SuppressWarnings("unchecked")
487         final
488         T route = (T) request.getAttachment();
489         this.lock.lock();
490         try {
491             this.pending.remove(request);
492             final RouteSpecificPool<T, C, E> pool = getPool(route);
493             final IOSession session = request.getSession();
494             try {
495                 final C conn = this.connFactory.create(route, session);
496                 final E entry = pool.createEntry(request, conn);
497                 this.leased.add(entry);
498                 pool.completed(request, entry);
499                 onLease(entry);
500             } catch (final IOException ex) {
501                 pool.failed(request, ex);
502             }
503         } finally {
504             this.lock.unlock();
505         }
506         fireCallbacks();
507     }
508 
509     protected void requestCancelled(final SessionRequest request) {
510         if (this.isShutDown.get()) {
511             return;
512         }
513         @SuppressWarnings("unchecked")
514         final
515         T route = (T) request.getAttachment();
516         this.lock.lock();
517         try {
518             this.pending.remove(request);
519             final RouteSpecificPool<T, C, E> pool = getPool(route);
520             pool.cancelled(request);
521             if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
522                 processNextPendingRequest();
523             }
524         } finally {
525             this.lock.unlock();
526         }
527         fireCallbacks();
528     }
529 
530     protected void requestFailed(final SessionRequest request) {
531         if (this.isShutDown.get()) {
532             return;
533         }
534         @SuppressWarnings("unchecked")
535         final
536         T route = (T) request.getAttachment();
537         this.lock.lock();
538         try {
539             this.pending.remove(request);
540             final RouteSpecificPool<T, C, E> pool = getPool(route);
541             pool.failed(request, request.getException());
542             processNextPendingRequest();
543         } finally {
544             this.lock.unlock();
545         }
546         fireCallbacks();
547     }
548 
549     protected void requestTimeout(final SessionRequest request) {
550         if (this.isShutDown.get()) {
551             return;
552         }
553         @SuppressWarnings("unchecked")
554         final
555         T route = (T) request.getAttachment();
556         this.lock.lock();
557         try {
558             this.pending.remove(request);
559             final RouteSpecificPool<T, C, E> pool = getPool(route);
560             pool.timeout(request);
561             processNextPendingRequest();
562         } finally {
563             this.lock.unlock();
564         }
565         fireCallbacks();
566     }
567 
568     private int getMax(final T route) {
569         final Integer v = this.maxPerRoute.get(route);
570         if (v != null) {
571             return v.intValue();
572         } else {
573             return this.defaultMaxPerRoute;
574         }
575     }
576 
577     @Override
578     public void setMaxTotal(final int max) {
579         Args.positive(max, "Max value");
580         this.lock.lock();
581         try {
582             this.maxTotal = max;
583         } finally {
584             this.lock.unlock();
585         }
586     }
587 
588     @Override
589     public int getMaxTotal() {
590         this.lock.lock();
591         try {
592             return this.maxTotal;
593         } finally {
594             this.lock.unlock();
595         }
596     }
597 
598     @Override
599     public void setDefaultMaxPerRoute(final int max) {
600         Args.positive(max, "Max value");
601         this.lock.lock();
602         try {
603             this.defaultMaxPerRoute = max;
604         } finally {
605             this.lock.unlock();
606         }
607     }
608 
609     @Override
610     public int getDefaultMaxPerRoute() {
611         this.lock.lock();
612         try {
613             return this.defaultMaxPerRoute;
614         } finally {
615             this.lock.unlock();
616         }
617     }
618 
619     @Override
620     public void setMaxPerRoute(final T route, final int max) {
621         Args.notNull(route, "Route");
622         Args.positive(max, "Max value");
623         this.lock.lock();
624         try {
625             this.maxPerRoute.put(route, Integer.valueOf(max));
626         } finally {
627             this.lock.unlock();
628         }
629     }
630 
631     @Override
632     public int getMaxPerRoute(final T route) {
633         Args.notNull(route, "Route");
634         this.lock.lock();
635         try {
636             return getMax(route);
637         } finally {
638             this.lock.unlock();
639         }
640     }
641 
642     @Override
643     public PoolStats getTotalStats() {
644         this.lock.lock();
645         try {
646             return new PoolStats(
647                     this.leased.size(),
648                     this.pending.size(),
649                     this.available.size(),
650                     this.maxTotal);
651         } finally {
652             this.lock.unlock();
653         }
654     }
655 
656     @Override
657     public PoolStats getStats(final T route) {
658         Args.notNull(route, "Route");
659         this.lock.lock();
660         try {
661             final RouteSpecificPool<T, C, E> pool = getPool(route);
662             return new PoolStats(
663                     pool.getLeasedCount(),
664                     pool.getPendingCount(),
665                     pool.getAvailableCount(),
666                     getMax(route));
667         } finally {
668             this.lock.unlock();
669         }
670     }
671 
672     /**
673      * Returns snapshot of all knows routes
674      *
675      * @since 4.4
676      */
677     public Set<T> getRoutes() {
678         this.lock.lock();
679         try {
680             return new HashSet<T>(routeToPool.keySet());
681         } finally {
682             this.lock.unlock();
683         }
684     }
685 
686     /**
687      * Enumerates all available connections.
688      *
689      * @since 4.3
690      */
691     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
692         this.lock.lock();
693         try {
694             final Iterator<E> it = this.available.iterator();
695             while (it.hasNext()) {
696                 final E entry = it.next();
697                 callback.process(entry);
698                 if (entry.isClosed()) {
699                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
700                     pool.remove(entry);
701                     it.remove();
702                 }
703             }
704             processPendingRequests();
705             purgePoolMap();
706         } finally {
707             this.lock.unlock();
708         }
709     }
710 
711     /**
712      * Enumerates all leased connections.
713      *
714      * @since 4.3
715      */
716     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
717         this.lock.lock();
718         try {
719             final Iterator<E> it = this.leased.iterator();
720             while (it.hasNext()) {
721                 final E entry = it.next();
722                 callback.process(entry);
723             }
724             processPendingRequests();
725         } finally {
726             this.lock.unlock();
727         }
728     }
729 
730     /**
731      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
732      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
733      *
734      * @deprecated (4.3.2)
735      */
736     @Deprecated
737     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
738         while (it.hasNext()) {
739             final E entry = it.next();
740             callback.process(entry);
741         }
742         processPendingRequests();
743     }
744 
745     private void purgePoolMap() {
746         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
747         while (it.hasNext()) {
748             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
749             final RouteSpecificPool<T, C, E> pool = entry.getValue();
750             if (pool.getAllocatedCount() == 0) {
751                 it.remove();
752             }
753         }
754     }
755 
756     public void closeIdle(final long idletime, final TimeUnit tunit) {
757         Args.notNull(tunit, "Time unit");
758         long time = tunit.toMillis(idletime);
759         if (time < 0) {
760             time = 0;
761         }
762         final long deadline = System.currentTimeMillis() - time;
763         enumAvailable(new PoolEntryCallback<T, C>() {
764 
765             @Override
766             public void process(final PoolEntry<T, C> entry) {
767                 if (entry.getUpdated() <= deadline) {
768                     entry.close();
769                 }
770             }
771 
772         });
773     }
774 
775     public void closeExpired() {
776         final long now = System.currentTimeMillis();
777         enumAvailable(new PoolEntryCallback<T, C>() {
778 
779             @Override
780             public void process(final PoolEntry<T, C> entry) {
781                 if (entry.isExpired(now)) {
782                     entry.close();
783                 }
784             }
785 
786         });
787     }
788 
789     @Override
790     public String toString() {
791         final StringBuilder buffer = new StringBuilder();
792         buffer.append("[leased: ");
793         buffer.append(this.leased);
794         buffer.append("][available: ");
795         buffer.append(this.available);
796         buffer.append("][pending: ");
797         buffer.append(this.pending);
798         buffer.append("]");
799         return buffer.toString();
800     }
801 
802     class InternalSessionRequestCallback implements SessionRequestCallback {
803 
804         @Override
805         public void completed(final SessionRequest request) {
806             requestCompleted(request);
807         }
808 
809         @Override
810         public void cancelled(final SessionRequest request) {
811             requestCancelled(request);
812         }
813 
814         @Override
815         public void failed(final SessionRequest request) {
816             requestFailed(request);
817         }
818 
819         @Override
820         public void timeout(final SessionRequest request) {
821             requestTimeout(request);
822         }
823 
824     }
825 
826 }