View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.concurrent.BasicFuture;
46  import org.apache.http.concurrent.FutureCallback;
47  import org.apache.http.nio.reactor.ConnectingIOReactor;
48  import org.apache.http.nio.reactor.IOSession;
49  import org.apache.http.nio.reactor.SessionRequest;
50  import org.apache.http.nio.reactor.SessionRequestCallback;
51  import org.apache.http.pool.ConnPool;
52  import org.apache.http.pool.ConnPoolControl;
53  import org.apache.http.pool.PoolEntry;
54  import org.apache.http.pool.PoolStats;
55  import org.apache.http.util.Args;
56  import org.apache.http.util.Asserts;
57  
58  /**
59   * Abstract non-blocking connection pool.
60   *
61   * @param <T> route
62   * @param <C> connection object
63   * @param <E> pool entry
64   *
65   * @since 4.2
66   */
67  @ThreadSafe
68  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
69                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
70  
71      private final ConnectingIOReactor ioreactor;
72      private final NIOConnFactory<T, C> connFactory;
73      private final SocketAddressResolver<T> addressResolver;
74      private final SessionRequestCallback sessionRequestCallback;
75      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
76      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
77      private final Set<SessionRequest> pending;
78      private final Set<E> leased;
79      private final LinkedList<E> available;
80      private final Map<T, Integer> maxPerRoute;
81      private final Lock lock;
82  
83      private volatile boolean isShutDown;
84      private volatile int defaultMaxPerRoute;
85      private volatile int maxTotal;
86  
87      /**
88       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
89       *   NIOConnFactory, SocketAddressResolver, int, int)}
90       */
91      @Deprecated
92      public AbstractNIOConnPool(
93              final ConnectingIOReactor ioreactor,
94              final NIOConnFactory<T, C> connFactory,
95              final int defaultMaxPerRoute,
96              final int maxTotal) {
97          super();
98          Args.notNull(ioreactor, "I/O reactor");
99          Args.notNull(connFactory, "Connection factory");
100         Args.positive(defaultMaxPerRoute, "Max per route value");
101         Args.positive(maxTotal, "Max total value");
102         this.ioreactor = ioreactor;
103         this.connFactory = connFactory;
104         this.addressResolver = new SocketAddressResolver<T>() {
105 
106             public SocketAddress resolveLocalAddress(final T route) throws IOException {
107                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
108             }
109 
110             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
111                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
112             }
113 
114         };
115         this.sessionRequestCallback = new InternalSessionRequestCallback();
116         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
117         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
118         this.pending = new HashSet<SessionRequest>();
119         this.leased = new HashSet<E>();
120         this.available = new LinkedList<E>();
121         this.maxPerRoute = new HashMap<T, Integer>();
122         this.lock = new ReentrantLock();
123         this.defaultMaxPerRoute = defaultMaxPerRoute;
124         this.maxTotal = maxTotal;
125     }
126 
127     /**
128      * @since 4.3
129      */
130     public AbstractNIOConnPool(
131             final ConnectingIOReactor ioreactor,
132             final NIOConnFactory<T, C> connFactory,
133             final SocketAddressResolver<T> addressResolver,
134             final int defaultMaxPerRoute,
135             final int maxTotal) {
136         super();
137         Args.notNull(ioreactor, "I/O reactor");
138         Args.notNull(connFactory, "Connection factory");
139         Args.notNull(addressResolver, "Address resolver");
140         Args.positive(defaultMaxPerRoute, "Max per route value");
141         Args.positive(maxTotal, "Max total value");
142         this.ioreactor = ioreactor;
143         this.connFactory = connFactory;
144         this.addressResolver = addressResolver;
145         this.sessionRequestCallback = new InternalSessionRequestCallback();
146         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
147         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
148         this.pending = new HashSet<SessionRequest>();
149         this.leased = new HashSet<E>();
150         this.available = new LinkedList<E>();
151         this.maxPerRoute = new HashMap<T, Integer>();
152         this.lock = new ReentrantLock();
153         this.defaultMaxPerRoute = defaultMaxPerRoute;
154         this.maxTotal = maxTotal;
155     }
156 
157     /**
158      * @deprecated (4.3) use {@link SocketAddressResolver}
159      */
160     @Deprecated
161     protected SocketAddress resolveRemoteAddress(final T route) {
162         return null;
163     }
164 
165     /**
166      * @deprecated (4.3) use {@link SocketAddressResolver}
167      */
168     @Deprecated
169     protected SocketAddress resolveLocalAddress(final T route) {
170         return null;
171     }
172 
173     protected abstract E createEntry(T route, C conn);
174 
175     public boolean isShutdown() {
176         return this.isShutDown;
177     }
178 
179     public void shutdown(final long waitMs) throws IOException {
180         if (this.isShutDown) {
181             return ;
182         }
183         this.isShutDown = true;
184         this.lock.lock();
185         try {
186             for (final SessionRequest sessionRequest: this.pending) {
187                 sessionRequest.cancel();
188             }
189             for (final E entry: this.available) {
190                 entry.close();
191             }
192             for (final E entry: this.leased) {
193                 entry.close();
194             }
195             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
196                 pool.shutdown();
197             }
198             this.routeToPool.clear();
199             this.leased.clear();
200             this.pending.clear();
201             this.available.clear();
202             this.leasingRequests.clear();
203             this.ioreactor.shutdown(waitMs);
204         } finally {
205             this.lock.unlock();
206         }
207     }
208 
209     private RouteSpecificPool<T, C, E> getPool(final T route) {
210         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
211         if (pool == null) {
212             pool = new RouteSpecificPool<T, C, E>(route) {
213 
214                 @Override
215                 protected E createEntry(final T route, final C conn) {
216                     return AbstractNIOConnPool.this.createEntry(route, conn);
217                 }
218 
219             };
220             this.routeToPool.put(route, pool);
221         }
222         return pool;
223     }
224 
225     public Future<E> lease(
226             final T route, final Object state,
227             final long connectTimeout, final TimeUnit tunit,
228             final FutureCallback<E> callback) {
229         Args.notNull(route, "Route");
230         Args.notNull(tunit, "Time unit");
231         Asserts.check(!this.isShutDown, "Connection pool shut down");
232         this.lock.lock();
233         try {
234             final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
235             final BasicFuture<E> future = new BasicFuture<E>(callback);
236             final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
237             if (!processPendingRequest(request)) {
238                 this.leasingRequests.add(request);
239             }
240             return future;
241         } finally {
242             this.lock.unlock();
243         }
244     }
245 
246     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
247         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
248     }
249 
250     public Future<E> lease(final T route, final Object state) {
251         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
252     }
253 
254     public void release(final E entry, final boolean reusable) {
255         if (entry == null) {
256             return;
257         }
258         if (this.isShutDown) {
259             return;
260         }
261         this.lock.lock();
262         try {
263             if (this.leased.remove(entry)) {
264                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
265                 pool.free(entry, reusable);
266                 if (reusable) {
267                     this.available.addFirst(entry);
268                 } else {
269                     entry.close();
270                 }
271                 processNextPendingRequest();
272             }
273         } finally {
274             this.lock.unlock();
275         }
276     }
277 
278     private void processPendingRequests() {
279         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
280         while (it.hasNext()) {
281             final LeaseRequest<T, C, E> request = it.next();
282             if (processPendingRequest(request)) {
283                 it.remove();
284             }
285         }
286     }
287 
288     private void processNextPendingRequest() {
289         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
290         while (it.hasNext()) {
291             final LeaseRequest<T, C, E> request = it.next();
292             if (processPendingRequest(request)) {
293                 it.remove();
294                 return;
295             }
296         }
297     }
298 
299     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
300         final T route = request.getRoute();
301         final Object state = request.getState();
302         final long deadline = request.getDeadline();
303         final BasicFuture<E> future = request.getFuture();
304 
305         final long now = System.currentTimeMillis();
306         if (now > deadline) {
307             future.failed(new TimeoutException());
308             return true;
309         }
310 
311         final RouteSpecificPool<T, C, E> pool = getPool(route);
312         E entry = null;
313         for (;;) {
314             entry = pool.getFree(state);
315             if (entry == null) {
316                 break;
317             }
318             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
319                 entry.close();
320                 this.available.remove(entry);
321                 pool.free(entry, false);
322             } else {
323                 break;
324             }
325         }
326         if (entry != null) {
327             this.available.remove(entry);
328             this.leased.add(entry);
329             future.completed(entry);
330             return true;
331         }
332 
333         // New connection is needed
334         final int maxPerRoute = getMax(route);
335         // Shrink the pool prior to allocating a new connection
336         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
337         if (excess > 0) {
338             for (int i = 0; i < excess; i++) {
339                 final E lastUsed = pool.getLastUsed();
340                 if (lastUsed == null) {
341                     break;
342                 }
343                 lastUsed.close();
344                 this.available.remove(lastUsed);
345                 pool.remove(lastUsed);
346             }
347         }
348 
349         if (pool.getAllocatedCount() < maxPerRoute) {
350             final int totalUsed = this.pending.size() + this.leased.size();
351             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
352             if (freeCapacity == 0) {
353                 return false;
354             }
355             final int totalAvailable = this.available.size();
356             if (totalAvailable > freeCapacity - 1) {
357                 if (!this.available.isEmpty()) {
358                     final E lastUsed = this.available.removeLast();
359                     lastUsed.close();
360                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
361                     otherpool.remove(lastUsed);
362                 }
363             }
364 
365             final SocketAddress localAddress;
366             final SocketAddress remoteAddress;
367             try {
368                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
369                 localAddress = this.addressResolver.resolveLocalAddress(route);
370             } catch (final IOException ex) {
371                 future.failed(ex);
372                 return true;
373             }
374 
375             final SessionRequest sessionRequest = this.ioreactor.connect(
376                     remoteAddress, localAddress, route, this.sessionRequestCallback);
377             final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
378                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
379             sessionRequest.setConnectTimeout(timout);
380             this.pending.add(sessionRequest);
381             pool.addPending(sessionRequest, future);
382             return true;
383         } else {
384             return false;
385         }
386     }
387 
388     public void validatePendingRequests() {
389         this.lock.lock();
390         try {
391             final long now = System.currentTimeMillis();
392             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
393             while (it.hasNext()) {
394                 final LeaseRequest<T, C, E> request = it.next();
395                 final long deadline = request.getDeadline();
396                 if (now > deadline) {
397                     it.remove();
398                     final BasicFuture<E> future = request.getFuture();
399                     future.failed(new TimeoutException());
400                 }
401             }
402         } finally {
403             this.lock.unlock();
404         }
405     }
406 
407     protected void requestCompleted(final SessionRequest request) {
408         if (this.isShutDown) {
409             return;
410         }
411         @SuppressWarnings("unchecked")
412         final
413         T route = (T) request.getAttachment();
414         this.lock.lock();
415         try {
416             this.pending.remove(request);
417             final RouteSpecificPool<T, C, E> pool = getPool(route);
418             final IOSession session = request.getSession();
419             try {
420                 final C conn = this.connFactory.create(route, session);
421                 final E entry = pool.createEntry(request, conn);
422                 this.leased.add(entry);
423                 pool.completed(request, entry);
424 
425             } catch (final IOException ex) {
426                 pool.failed(request, ex);
427             }
428         } finally {
429             this.lock.unlock();
430         }
431     }
432 
433     protected void requestCancelled(final SessionRequest request) {
434         if (this.isShutDown) {
435             return;
436         }
437         @SuppressWarnings("unchecked")
438         final
439         T route = (T) request.getAttachment();
440         this.lock.lock();
441         try {
442             this.pending.remove(request);
443             final RouteSpecificPool<T, C, E> pool = getPool(route);
444             pool.cancelled(request);
445             processNextPendingRequest();
446         } finally {
447             this.lock.unlock();
448         }
449     }
450 
451     protected void requestFailed(final SessionRequest request) {
452         if (this.isShutDown) {
453             return;
454         }
455         @SuppressWarnings("unchecked")
456         final
457         T route = (T) request.getAttachment();
458         this.lock.lock();
459         try {
460             this.pending.remove(request);
461             final RouteSpecificPool<T, C, E> pool = getPool(route);
462             pool.failed(request, request.getException());
463             processNextPendingRequest();
464         } finally {
465             this.lock.unlock();
466         }
467     }
468 
469     protected void requestTimeout(final SessionRequest request) {
470         if (this.isShutDown) {
471             return;
472         }
473         @SuppressWarnings("unchecked")
474         final
475         T route = (T) request.getAttachment();
476         this.lock.lock();
477         try {
478             this.pending.remove(request);
479             final RouteSpecificPool<T, C, E> pool = getPool(route);
480             pool.timeout(request);
481             processNextPendingRequest();
482         } finally {
483             this.lock.unlock();
484         }
485     }
486 
487     private int getMax(final T route) {
488         final Integer v = this.maxPerRoute.get(route);
489         if (v != null) {
490             return v.intValue();
491         } else {
492             return this.defaultMaxPerRoute;
493         }
494     }
495 
496     public void setMaxTotal(final int max) {
497         Args.positive(max, "Max value");
498         this.lock.lock();
499         try {
500             this.maxTotal = max;
501         } finally {
502             this.lock.unlock();
503         }
504     }
505 
506     public int getMaxTotal() {
507         this.lock.lock();
508         try {
509             return this.maxTotal;
510         } finally {
511             this.lock.unlock();
512         }
513     }
514 
515     public void setDefaultMaxPerRoute(final int max) {
516         Args.positive(max, "Max value");
517         this.lock.lock();
518         try {
519             this.defaultMaxPerRoute = max;
520         } finally {
521             this.lock.unlock();
522         }
523     }
524 
525     public int getDefaultMaxPerRoute() {
526         this.lock.lock();
527         try {
528             return this.defaultMaxPerRoute;
529         } finally {
530             this.lock.unlock();
531         }
532     }
533 
534     public void setMaxPerRoute(final T route, final int max) {
535         Args.notNull(route, "Route");
536         Args.positive(max, "Max value");
537         this.lock.lock();
538         try {
539             this.maxPerRoute.put(route, max);
540         } finally {
541             this.lock.unlock();
542         }
543     }
544 
545     public int getMaxPerRoute(final T route) {
546         Args.notNull(route, "Route");
547         this.lock.lock();
548         try {
549             return getMax(route);
550         } finally {
551             this.lock.unlock();
552         }
553     }
554 
555     public PoolStats getTotalStats() {
556         this.lock.lock();
557         try {
558             return new PoolStats(
559                     this.leased.size(),
560                     this.pending.size(),
561                     this.available.size(),
562                     this.maxTotal);
563         } finally {
564             this.lock.unlock();
565         }
566     }
567 
568     public PoolStats getStats(final T route) {
569         Args.notNull(route, "Route");
570         this.lock.lock();
571         try {
572             final RouteSpecificPool<T, C, E> pool = getPool(route);
573             return new PoolStats(
574                     pool.getLeasedCount(),
575                     pool.getPendingCount(),
576                     pool.getAvailableCount(),
577                     getMax(route));
578         } finally {
579             this.lock.unlock();
580         }
581     }
582 
583     public void closeIdle(final long idletime, final TimeUnit tunit) {
584         Args.notNull(tunit, "Time unit");
585         long time = tunit.toMillis(idletime);
586         if (time < 0) {
587             time = 0;
588         }
589         final long deadline = System.currentTimeMillis() - time;
590         this.lock.lock();
591         try {
592             final Iterator<E> it = this.available.iterator();
593             while (it.hasNext()) {
594                 final E entry = it.next();
595                 if (entry.getUpdated() <= deadline) {
596                     entry.close();
597                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
598                     pool.remove(entry);
599                     it.remove();
600                 }
601             }
602             processPendingRequests();
603         } finally {
604             this.lock.unlock();
605         }
606     }
607 
608     public void closeExpired() {
609         final long now = System.currentTimeMillis();
610         this.lock.lock();
611         try {
612             final Iterator<E> it = this.available.iterator();
613             while (it.hasNext()) {
614                 final E entry = it.next();
615                 if (entry.isExpired(now)) {
616                     entry.close();
617                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
618                     pool.remove(entry);
619                     it.remove();
620                 }
621             }
622             processPendingRequests();
623         } finally {
624             this.lock.unlock();
625         }
626     }
627 
628     @Override
629     public String toString() {
630         final StringBuilder buffer = new StringBuilder();
631         buffer.append("[leased: ");
632         buffer.append(this.leased);
633         buffer.append("][available: ");
634         buffer.append(this.available);
635         buffer.append("][pending: ");
636         buffer.append(this.pending);
637         buffer.append("]");
638         return buffer.toString();
639     }
640 
641     class InternalSessionRequestCallback implements SessionRequestCallback {
642 
643         public void completed(final SessionRequest request) {
644             requestCompleted(request);
645         }
646 
647         public void cancelled(final SessionRequest request) {
648             requestCancelled(request);
649         }
650 
651         public void failed(final SessionRequest request) {
652             requestFailed(request);
653         }
654 
655         public void timeout(final SessionRequest request) {
656             requestTimeout(request);
657         }
658 
659     }
660 
661 }