View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.http.annotation.ThreadSafe;
47  import org.apache.http.concurrent.BasicFuture;
48  import org.apache.http.concurrent.FutureCallback;
49  import org.apache.http.nio.reactor.ConnectingIOReactor;
50  import org.apache.http.nio.reactor.IOReactorStatus;
51  import org.apache.http.nio.reactor.IOSession;
52  import org.apache.http.nio.reactor.SessionRequest;
53  import org.apache.http.nio.reactor.SessionRequestCallback;
54  import org.apache.http.pool.ConnPool;
55  import org.apache.http.pool.ConnPoolControl;
56  import org.apache.http.pool.PoolEntry;
57  import org.apache.http.pool.PoolEntryCallback;
58  import org.apache.http.pool.PoolStats;
59  import org.apache.http.util.Args;
60  import org.apache.http.util.Asserts;
61  
62  /**
63   * Abstract non-blocking connection pool.
64   *
65   * @param <T> route
66   * @param <C> connection object
67   * @param <E> pool entry
68   *
69   * @since 4.2
70   */
71  @ThreadSafe
72  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
73                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
74  
75      private final ConnectingIOReactor ioreactor;
76      private final NIOConnFactory<T, C> connFactory;
77      private final SocketAddressResolver<T> addressResolver;
78      private final SessionRequestCallback sessionRequestCallback;
79      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
80      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
81      private final Set<SessionRequest> pending;
82      private final Set<E> leased;
83      private final LinkedList<E> available;
84      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
85      private final Map<T, Integer> maxPerRoute;
86      private final Lock lock;
87      private final AtomicBoolean isShutDown;
88  
89      private volatile int defaultMaxPerRoute;
90      private volatile int maxTotal;
91  
92      /**
93       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
94       *   NIOConnFactory, SocketAddressResolver, int, int)}
95       */
96      @Deprecated
97      public AbstractNIOConnPool(
98              final ConnectingIOReactor ioreactor,
99              final NIOConnFactory<T, C> connFactory,
100             final int defaultMaxPerRoute,
101             final int maxTotal) {
102         super();
103         Args.notNull(ioreactor, "I/O reactor");
104         Args.notNull(connFactory, "Connection factory");
105         Args.positive(defaultMaxPerRoute, "Max per route value");
106         Args.positive(maxTotal, "Max total value");
107         this.ioreactor = ioreactor;
108         this.connFactory = connFactory;
109         this.addressResolver = new SocketAddressResolver<T>() {
110 
111             @Override
112             public SocketAddress resolveLocalAddress(final T route) throws IOException {
113                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
114             }
115 
116             @Override
117             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
118                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
119             }
120 
121         };
122         this.sessionRequestCallback = new InternalSessionRequestCallback();
123         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
124         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
125         this.pending = new HashSet<SessionRequest>();
126         this.leased = new HashSet<E>();
127         this.available = new LinkedList<E>();
128         this.maxPerRoute = new HashMap<T, Integer>();
129         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
130         this.lock = new ReentrantLock();
131         this.isShutDown = new AtomicBoolean(false);
132         this.defaultMaxPerRoute = defaultMaxPerRoute;
133         this.maxTotal = maxTotal;
134     }
135 
136     /**
137      * @since 4.3
138      */
139     public AbstractNIOConnPool(
140             final ConnectingIOReactor ioreactor,
141             final NIOConnFactory<T, C> connFactory,
142             final SocketAddressResolver<T> addressResolver,
143             final int defaultMaxPerRoute,
144             final int maxTotal) {
145         super();
146         Args.notNull(ioreactor, "I/O reactor");
147         Args.notNull(connFactory, "Connection factory");
148         Args.notNull(addressResolver, "Address resolver");
149         Args.positive(defaultMaxPerRoute, "Max per route value");
150         Args.positive(maxTotal, "Max total value");
151         this.ioreactor = ioreactor;
152         this.connFactory = connFactory;
153         this.addressResolver = addressResolver;
154         this.sessionRequestCallback = new InternalSessionRequestCallback();
155         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
156         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
157         this.pending = new HashSet<SessionRequest>();
158         this.leased = new HashSet<E>();
159         this.available = new LinkedList<E>();
160         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
161         this.maxPerRoute = new HashMap<T, Integer>();
162         this.lock = new ReentrantLock();
163         this.isShutDown = new AtomicBoolean(false);
164         this.defaultMaxPerRoute = defaultMaxPerRoute;
165         this.maxTotal = maxTotal;
166     }
167 
168     /**
169      * @deprecated (4.3) use {@link SocketAddressResolver}
170      */
171     @Deprecated
172     protected SocketAddress resolveRemoteAddress(final T route) {
173         return null;
174     }
175 
176     /**
177      * @deprecated (4.3) use {@link SocketAddressResolver}
178      */
179     @Deprecated
180     protected SocketAddress resolveLocalAddress(final T route) {
181         return null;
182     }
183 
184     protected abstract E createEntry(T route, C conn);
185 
186     /**
187      * @since 4.3
188      */
189     protected void onLease(final E entry) {
190     }
191 
192     /**
193      * @since 4.3
194      */
195     protected void onRelease(final E entry) {
196     }
197 
198     public boolean isShutdown() {
199         return this.isShutDown.get();
200     }
201 
202     public void shutdown(final long waitMs) throws IOException {
203         if (this.isShutDown.compareAndSet(false, true)) {
204             fireCallbacks();
205             this.lock.lock();
206             try {
207                 for (final SessionRequest sessionRequest: this.pending) {
208                     sessionRequest.cancel();
209                 }
210                 for (final E entry: this.available) {
211                     entry.close();
212                 }
213                 for (final E entry: this.leased) {
214                     entry.close();
215                 }
216                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
217                     pool.shutdown();
218                 }
219                 this.routeToPool.clear();
220                 this.leased.clear();
221                 this.pending.clear();
222                 this.available.clear();
223                 this.leasingRequests.clear();
224                 this.ioreactor.shutdown(waitMs);
225             } finally {
226                 this.lock.unlock();
227             }
228         }
229     }
230 
231     private RouteSpecificPool<T, C, E> getPool(final T route) {
232         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
233         if (pool == null) {
234             pool = new RouteSpecificPool<T, C, E>(route) {
235 
236                 @Override
237                 protected E createEntry(final T route, final C conn) {
238                     return AbstractNIOConnPool.this.createEntry(route, conn);
239                 }
240 
241             };
242             this.routeToPool.put(route, pool);
243         }
244         return pool;
245     }
246 
247     public Future<E> lease(
248             final T route, final Object state,
249             final long connectTimeout, final TimeUnit tunit,
250             final FutureCallback<E> callback) {
251         return this.lease(route, state, connectTimeout, connectTimeout, tunit, callback);
252     }
253 
254     /**
255      * @since 4.3
256      */
257     public Future<E> lease(
258             final T route, final Object state,
259             final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
260             final FutureCallback<E> callback) {
261         Args.notNull(route, "Route");
262         Args.notNull(tunit, "Time unit");
263         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
264         final BasicFuture<E> future = new BasicFuture<E>(callback);
265         this.lock.lock();
266         try {
267             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
268             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
269             final boolean completed = processPendingRequest(request);
270             if (!request.isDone() && !completed) {
271                 this.leasingRequests.add(request);
272             }
273             if (request.isDone()) {
274                 this.completedRequests.add(request);
275             }
276         } finally {
277             this.lock.unlock();
278         }
279         fireCallbacks();
280         return future;
281     }
282 
283     @Override
284     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
285         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
286     }
287 
288     public Future<E> lease(final T route, final Object state) {
289         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
290     }
291 
292     @Override
293     public void release(final E entry, final boolean reusable) {
294         if (entry == null) {
295             return;
296         }
297         if (this.isShutDown.get()) {
298             return;
299         }
300         this.lock.lock();
301         try {
302             if (this.leased.remove(entry)) {
303                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
304                 pool.free(entry, reusable);
305                 if (reusable) {
306                     this.available.addFirst(entry);
307                     onRelease(entry);
308                 } else {
309                     entry.close();
310                 }
311                 processNextPendingRequest();
312             }
313         } finally {
314             this.lock.unlock();
315         }
316         fireCallbacks();
317     }
318 
319     private void processPendingRequests() {
320         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
321         while (it.hasNext()) {
322             final LeaseRequest<T, C, E> request = it.next();
323             final boolean completed = processPendingRequest(request);
324             if (request.isDone() || completed) {
325                 it.remove();
326             }
327             if (request.isDone()) {
328                 this.completedRequests.add(request);
329             }
330         }
331     }
332 
333     private void processNextPendingRequest() {
334         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
335         while (it.hasNext()) {
336             final LeaseRequest<T, C, E> request = it.next();
337             final boolean completed = processPendingRequest(request);
338             if (request.isDone() || completed) {
339                 it.remove();
340             }
341             if (request.isDone()) {
342                 this.completedRequests.add(request);
343             }
344             if (completed) {
345                 return;
346             }
347         }
348     }
349 
350     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
351         final T route = request.getRoute();
352         final Object state = request.getState();
353         final long deadline = request.getDeadline();
354 
355         final long now = System.currentTimeMillis();
356         if (now > deadline) {
357             request.failed(new TimeoutException());
358             return false;
359         }
360 
361         final RouteSpecificPool<T, C, E> pool = getPool(route);
362         E entry;
363         for (;;) {
364             entry = pool.getFree(state);
365             if (entry == null) {
366                 break;
367             }
368             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
369                 entry.close();
370                 this.available.remove(entry);
371                 pool.free(entry, false);
372             } else {
373                 break;
374             }
375         }
376         if (entry != null) {
377             this.available.remove(entry);
378             this.leased.add(entry);
379             request.completed(entry);
380             onLease(entry);
381             return true;
382         }
383 
384         // New connection is needed
385         final int maxPerRoute = getMax(route);
386         // Shrink the pool prior to allocating a new connection
387         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
388         if (excess > 0) {
389             for (int i = 0; i < excess; i++) {
390                 final E lastUsed = pool.getLastUsed();
391                 if (lastUsed == null) {
392                     break;
393                 }
394                 lastUsed.close();
395                 this.available.remove(lastUsed);
396                 pool.remove(lastUsed);
397             }
398         }
399 
400         if (pool.getAllocatedCount() < maxPerRoute) {
401             final int totalUsed = this.pending.size() + this.leased.size();
402             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
403             if (freeCapacity == 0) {
404                 return false;
405             }
406             final int totalAvailable = this.available.size();
407             if (totalAvailable > freeCapacity - 1) {
408                 if (!this.available.isEmpty()) {
409                     final E lastUsed = this.available.removeLast();
410                     lastUsed.close();
411                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
412                     otherpool.remove(lastUsed);
413                 }
414             }
415 
416             final SocketAddress localAddress;
417             final SocketAddress remoteAddress;
418             try {
419                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
420                 localAddress = this.addressResolver.resolveLocalAddress(route);
421             } catch (final IOException ex) {
422                 request.failed(ex);
423                 return false;
424             }
425 
426             final SessionRequest sessionRequest = this.ioreactor.connect(
427                     remoteAddress, localAddress, route, this.sessionRequestCallback);
428             final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
429                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
430             sessionRequest.setConnectTimeout(timout);
431             this.pending.add(sessionRequest);
432             pool.addPending(sessionRequest, request.getFuture());
433             return true;
434         } else {
435             return false;
436         }
437     }
438 
439     private void fireCallbacks() {
440         LeaseRequest<T, C, E> request;
441         while ((request = this.completedRequests.poll()) != null) {
442             final BasicFuture<E> future = request.getFuture();
443             final Exception ex = request.getException();
444             final E result = request.getResult();
445             if (ex != null) {
446                 future.failed(ex);
447             } else if (result != null) {
448                 future.completed(result);
449             } else {
450                 future.cancel();
451             }
452         }
453     }
454 
455     public void validatePendingRequests() {
456         this.lock.lock();
457         try {
458             final long now = System.currentTimeMillis();
459             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
460             while (it.hasNext()) {
461                 final LeaseRequest<T, C, E> request = it.next();
462                 final long deadline = request.getDeadline();
463                 if (now > deadline) {
464                     it.remove();
465                     request.failed(new TimeoutException());
466                     this.completedRequests.add(request);
467                 }
468             }
469         } finally {
470             this.lock.unlock();
471         }
472         fireCallbacks();
473     }
474 
475     protected void requestCompleted(final SessionRequest request) {
476         if (this.isShutDown.get()) {
477             return;
478         }
479         @SuppressWarnings("unchecked")
480         final
481         T route = (T) request.getAttachment();
482         this.lock.lock();
483         try {
484             this.pending.remove(request);
485             final RouteSpecificPool<T, C, E> pool = getPool(route);
486             final IOSession session = request.getSession();
487             try {
488                 final C conn = this.connFactory.create(route, session);
489                 final E entry = pool.createEntry(request, conn);
490                 this.leased.add(entry);
491                 pool.completed(request, entry);
492                 onLease(entry);
493             } catch (final IOException ex) {
494                 pool.failed(request, ex);
495             }
496         } finally {
497             this.lock.unlock();
498         }
499         fireCallbacks();
500     }
501 
502     protected void requestCancelled(final SessionRequest request) {
503         if (this.isShutDown.get()) {
504             return;
505         }
506         @SuppressWarnings("unchecked")
507         final
508         T route = (T) request.getAttachment();
509         this.lock.lock();
510         try {
511             this.pending.remove(request);
512             final RouteSpecificPool<T, C, E> pool = getPool(route);
513             pool.cancelled(request);
514             if (this.ioreactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
515                 processNextPendingRequest();
516             }
517         } finally {
518             this.lock.unlock();
519         }
520         fireCallbacks();
521     }
522 
523     protected void requestFailed(final SessionRequest request) {
524         if (this.isShutDown.get()) {
525             return;
526         }
527         @SuppressWarnings("unchecked")
528         final
529         T route = (T) request.getAttachment();
530         this.lock.lock();
531         try {
532             this.pending.remove(request);
533             final RouteSpecificPool<T, C, E> pool = getPool(route);
534             pool.failed(request, request.getException());
535             processNextPendingRequest();
536         } finally {
537             this.lock.unlock();
538         }
539         fireCallbacks();
540     }
541 
542     protected void requestTimeout(final SessionRequest request) {
543         if (this.isShutDown.get()) {
544             return;
545         }
546         @SuppressWarnings("unchecked")
547         final
548         T route = (T) request.getAttachment();
549         this.lock.lock();
550         try {
551             this.pending.remove(request);
552             final RouteSpecificPool<T, C, E> pool = getPool(route);
553             pool.timeout(request);
554             processNextPendingRequest();
555         } finally {
556             this.lock.unlock();
557         }
558         fireCallbacks();
559     }
560 
561     private int getMax(final T route) {
562         final Integer v = this.maxPerRoute.get(route);
563         if (v != null) {
564             return v.intValue();
565         } else {
566             return this.defaultMaxPerRoute;
567         }
568     }
569 
570     @Override
571     public void setMaxTotal(final int max) {
572         Args.positive(max, "Max value");
573         this.lock.lock();
574         try {
575             this.maxTotal = max;
576         } finally {
577             this.lock.unlock();
578         }
579     }
580 
581     @Override
582     public int getMaxTotal() {
583         this.lock.lock();
584         try {
585             return this.maxTotal;
586         } finally {
587             this.lock.unlock();
588         }
589     }
590 
591     @Override
592     public void setDefaultMaxPerRoute(final int max) {
593         Args.positive(max, "Max value");
594         this.lock.lock();
595         try {
596             this.defaultMaxPerRoute = max;
597         } finally {
598             this.lock.unlock();
599         }
600     }
601 
602     @Override
603     public int getDefaultMaxPerRoute() {
604         this.lock.lock();
605         try {
606             return this.defaultMaxPerRoute;
607         } finally {
608             this.lock.unlock();
609         }
610     }
611 
612     @Override
613     public void setMaxPerRoute(final T route, final int max) {
614         Args.notNull(route, "Route");
615         Args.positive(max, "Max value");
616         this.lock.lock();
617         try {
618             this.maxPerRoute.put(route, Integer.valueOf(max));
619         } finally {
620             this.lock.unlock();
621         }
622     }
623 
624     @Override
625     public int getMaxPerRoute(final T route) {
626         Args.notNull(route, "Route");
627         this.lock.lock();
628         try {
629             return getMax(route);
630         } finally {
631             this.lock.unlock();
632         }
633     }
634 
635     @Override
636     public PoolStats getTotalStats() {
637         this.lock.lock();
638         try {
639             return new PoolStats(
640                     this.leased.size(),
641                     this.pending.size(),
642                     this.available.size(),
643                     this.maxTotal);
644         } finally {
645             this.lock.unlock();
646         }
647     }
648 
649     @Override
650     public PoolStats getStats(final T route) {
651         Args.notNull(route, "Route");
652         this.lock.lock();
653         try {
654             final RouteSpecificPool<T, C, E> pool = getPool(route);
655             return new PoolStats(
656                     pool.getLeasedCount(),
657                     pool.getPendingCount(),
658                     pool.getAvailableCount(),
659                     getMax(route));
660         } finally {
661             this.lock.unlock();
662         }
663     }
664 
665     /**
666      * Returns snapshot of all knows routes
667      *
668      * @since 4.4
669      */
670     public Set<T> getRoutes() {
671         this.lock.lock();
672         try {
673             return new HashSet<T>(routeToPool.keySet());
674         } finally {
675             this.lock.unlock();
676         }
677     }
678 
679     /**
680      * Enumerates all available connections.
681      *
682      * @since 4.3
683      */
684     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
685         this.lock.lock();
686         try {
687             final Iterator<E> it = this.available.iterator();
688             while (it.hasNext()) {
689                 final E entry = it.next();
690                 callback.process(entry);
691                 if (entry.isClosed()) {
692                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
693                     pool.remove(entry);
694                     it.remove();
695                 }
696             }
697             processPendingRequests();
698             purgePoolMap();
699         } finally {
700             this.lock.unlock();
701         }
702     }
703 
704     /**
705      * Enumerates all leased connections.
706      *
707      * @since 4.3
708      */
709     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
710         this.lock.lock();
711         try {
712             final Iterator<E> it = this.leased.iterator();
713             while (it.hasNext()) {
714                 final E entry = it.next();
715                 callback.process(entry);
716             }
717             processPendingRequests();
718         } finally {
719             this.lock.unlock();
720         }
721     }
722 
723     /**
724      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
725      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
726      *
727      * @deprecated (4.3.2)
728      */
729     @Deprecated
730     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
731         while (it.hasNext()) {
732             final E entry = it.next();
733             callback.process(entry);
734         }
735         processPendingRequests();
736     }
737 
738     private void purgePoolMap() {
739         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
740         while (it.hasNext()) {
741             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
742             final RouteSpecificPool<T, C, E> pool = entry.getValue();
743             if (pool.getAllocatedCount() == 0) {
744                 it.remove();
745             }
746         }
747     }
748 
749     public void closeIdle(final long idletime, final TimeUnit tunit) {
750         Args.notNull(tunit, "Time unit");
751         long time = tunit.toMillis(idletime);
752         if (time < 0) {
753             time = 0;
754         }
755         final long deadline = System.currentTimeMillis() - time;
756         enumAvailable(new PoolEntryCallback<T, C>() {
757 
758             @Override
759             public void process(final PoolEntry<T, C> entry) {
760                 if (entry.getUpdated() <= deadline) {
761                     entry.close();
762                 }
763             }
764 
765         });
766     }
767 
768     public void closeExpired() {
769         final long now = System.currentTimeMillis();
770         enumAvailable(new PoolEntryCallback<T, C>() {
771 
772             @Override
773             public void process(final PoolEntry<T, C> entry) {
774                 if (entry.isExpired(now)) {
775                     entry.close();
776                 }
777             }
778 
779         });
780     }
781 
782     @Override
783     public String toString() {
784         final StringBuilder buffer = new StringBuilder();
785         buffer.append("[leased: ");
786         buffer.append(this.leased);
787         buffer.append("][available: ");
788         buffer.append(this.available);
789         buffer.append("][pending: ");
790         buffer.append(this.pending);
791         buffer.append("]");
792         return buffer.toString();
793     }
794 
795     class InternalSessionRequestCallback implements SessionRequestCallback {
796 
797         @Override
798         public void completed(final SessionRequest request) {
799             requestCompleted(request);
800         }
801 
802         @Override
803         public void cancelled(final SessionRequest request) {
804             requestCancelled(request);
805         }
806 
807         @Override
808         public void failed(final SessionRequest request) {
809             requestFailed(request);
810         }
811 
812         @Override
813         public void timeout(final SessionRequest request) {
814             requestTimeout(request);
815         }
816 
817     }
818 
819 }