View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.annotation.ThreadSafe;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.pool.ConnPool;
import org.apache.http.pool.ConnPoolControl;
import org.apache.http.pool.PoolEntry;
import org.apache.http.pool.PoolStats;
55  
56  /**
57   * Abstract non-blocking connection pool.
58   *
59   * @param <T> route
60   * @param <C> connection object
61   * @param <E> pool entry
62   *
63   * @since 4.2
64   */
65  @ThreadSafe
66  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
67                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
68  
69      private final ConnectingIOReactor ioreactor;
70      private final NIOConnFactory<T, C> connFactory;
71      private final SessionRequestCallback sessionRequestCallback;
72      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
73      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
74      private final Set<SessionRequest> pending;
75      private final Set<E> leased;
76      private final LinkedList<E> available;
77      private final Map<T, Integer> maxPerRoute;
78      private final Lock lock;
79  
80      private volatile boolean isShutDown;
81      private volatile int defaultMaxPerRoute;
82      private volatile int maxTotal;
83  
84      public AbstractNIOConnPool(
85              final ConnectingIOReactor ioreactor,
86              final NIOConnFactory<T, C> connFactory,
87              int defaultMaxPerRoute,
88              int maxTotal) {
89          super();
90          if (ioreactor == null) {
91              throw new IllegalArgumentException("I/O reactor may not be null");
92          }
93          if (connFactory == null) {
94              throw new IllegalArgumentException("Connection factory may not null");
95          }
96          if (defaultMaxPerRoute <= 0) {
97              throw new IllegalArgumentException("Max per route value may not be negative or zero");
98          }
99          if (maxTotal <= 0) {
100             throw new IllegalArgumentException("Max total value may not be negative or zero");
101         }
102         this.ioreactor = ioreactor;
103         this.connFactory = connFactory;
104         this.sessionRequestCallback = new InternalSessionRequestCallback();
105         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
106         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
107         this.pending = new HashSet<SessionRequest>();
108         this.leased = new HashSet<E>();
109         this.available = new LinkedList<E>();
110         this.maxPerRoute = new HashMap<T, Integer>();
111         this.lock = new ReentrantLock();
112         this.defaultMaxPerRoute = defaultMaxPerRoute;
113         this.maxTotal = maxTotal;
114     }
115 
116     protected abstract SocketAddress resolveRemoteAddress(T route);
117 
118     protected abstract SocketAddress resolveLocalAddress(T route);
119 
120     protected abstract E createEntry(T route, C conn);
121 
122     public boolean isShutdown() {
123         return this.isShutDown;
124     }
125 
126     public void shutdown(long waitMs) throws IOException {
127         if (this.isShutDown) {
128             return ;
129         }
130         this.isShutDown = true;
131         this.lock.lock();
132         try {
133             for (SessionRequest sessionRequest: this.pending) {
134                 sessionRequest.cancel();
135             }
136             for (E entry: this.available) {
137                 entry.close();
138             }
139             for (E entry: this.leased) {
140                 entry.close();
141             }
142             for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
143                 pool.shutdown();
144             }
145             this.routeToPool.clear();
146             this.leased.clear();
147             this.pending.clear();
148             this.available.clear();
149             this.leasingRequests.clear();
150             this.ioreactor.shutdown(waitMs);
151         } finally {
152             this.lock.unlock();
153         }
154     }
155 
156     private RouteSpecificPool<T, C, E> getPool(final T route) {
157         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
158         if (pool == null) {
159             pool = new RouteSpecificPool<T, C, E>(route) {
160 
161                 @Override
162                 protected E createEntry(final T route, final C conn) {
163                     return AbstractNIOConnPool.this.createEntry(route, conn);
164                 }
165 
166             };
167             this.routeToPool.put(route, pool);
168         }
169         return pool;
170     }
171 
172     public Future<E> lease(
173             final T route, final Object state,
174             final long connectTimeout, final TimeUnit tunit,
175             final FutureCallback<E> callback) {
176         if (route == null) {
177             throw new IllegalArgumentException("Route may not be null");
178         }
179         if (tunit == null) {
180             throw new IllegalArgumentException("Time unit may not be null.");
181         }
182         if (this.isShutDown) {
183             throw new IllegalStateException("Session pool has been shut down");
184         }
185         this.lock.lock();
186         try {
187             long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
188             BasicFuture<E> future = new BasicFuture<E>(callback);
189             LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
190             boolean completed = processPendingRequest(request);
191             if (!request.isDone() && !completed) {
192                 this.leasingRequests.add(request);
193             }
194             return future;
195         } finally {
196             this.lock.unlock();
197         }
198     }
199 
200     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
201         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
202     }
203 
204     public Future<E> lease(final T route, final Object state) {
205         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
206     }
207 
208     public void release(final E entry, boolean reusable) {
209         if (entry == null) {
210             return;
211         }
212         if (this.isShutDown) {
213             return;
214         }
215         this.lock.lock();
216         try {
217             if (this.leased.remove(entry)) {
218                 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
219                 pool.free(entry, reusable);
220                 if (reusable) {
221                     this.available.addFirst(entry);
222                 } else {
223                     entry.close();
224                 }
225                 processNextPendingRequest();
226             }
227         } finally {
228             this.lock.unlock();
229         }
230     }
231 
232     private void processPendingRequests() {
233         ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
234         while (it.hasNext()) {
235             LeaseRequest<T, C, E> request = it.next();
236             boolean completed = processPendingRequest(request);
237             if (request.isDone() || completed) {
238                 it.remove();
239             }
240         }
241     }
242 
243     private void processNextPendingRequest() {
244         ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
245         while (it.hasNext()) {
246             LeaseRequest<T, C, E> request = it.next();
247             boolean completed = processPendingRequest(request);
248             if (request.isDone() || completed) {
249                 it.remove();
250             }
251             if (completed) {
252                 return;
253             }
254         }
255     }
256 
257     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
258         T route = request.getRoute();
259         Object state = request.getState();
260         long deadline = request.getDeadline();
261         BasicFuture<E> future = request.getFuture();
262 
263         long now = System.currentTimeMillis();
264         if (now > deadline) {
265             future.failed(new TimeoutException());
266             return false;
267         }
268 
269         RouteSpecificPool<T, C, E> pool = getPool(route);
270         E entry = null;
271         for (;;) {
272             entry = pool.getFree(state);
273             if (entry == null) {
274                 break;
275             }
276             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
277                 entry.close();
278                 this.available.remove(entry);
279                 pool.free(entry, false);
280             } else {
281                 break;
282             }
283         }
284         if (entry != null) {
285             this.available.remove(entry);
286             this.leased.add(entry);
287             future.completed(entry);
288             return true;
289         }
290 
291         // New connection is needed
292         int maxPerRoute = getMax(route);
293         // Shrink the pool prior to allocating a new connection
294         int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
295         if (excess > 0) {
296             for (int i = 0; i < excess; i++) {
297                 E lastUsed = pool.getLastUsed();
298                 if (lastUsed == null) {
299                     break;
300                 }
301                 lastUsed.close();
302                 this.available.remove(lastUsed);
303                 pool.remove(lastUsed);
304             }
305         }
306 
307         if (pool.getAllocatedCount() < maxPerRoute) {
308             int totalUsed = this.pending.size() + this.leased.size();
309             int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
310             if (freeCapacity == 0) {
311                 return false;
312             }
313             int totalAvailable = this.available.size();
314             if (totalAvailable > freeCapacity - 1) {
315                 if (!this.available.isEmpty()) {
316                     E lastUsed = this.available.removeLast();
317                     lastUsed.close();
318                     RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
319                     otherpool.remove(lastUsed);
320                 }
321             }
322             SessionRequest sessionRequest = this.ioreactor.connect(
323                     resolveRemoteAddress(route),
324                     resolveLocalAddress(route),
325                     route,
326                     this.sessionRequestCallback);
327             int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
328                     (int) request.getConnectTimeout() : Integer.MAX_VALUE;
329             sessionRequest.setConnectTimeout(timout);
330             this.pending.add(sessionRequest);
331             pool.addPending(sessionRequest, future);
332             return true;
333         } else {
334             return false;
335         }
336     }
337 
338     public void validatePendingRequests() {
339         this.lock.lock();
340         try {
341             long now = System.currentTimeMillis();
342             ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
343             while (it.hasNext()) {
344                 LeaseRequest<T, C, E> request = it.next();
345                 long deadline = request.getDeadline();
346                 if (now > deadline) {
347                     it.remove();
348                     BasicFuture<E> future = request.getFuture();
349                     future.failed(new TimeoutException());
350                 }
351             }
352         } finally {
353             this.lock.unlock();
354         }
355     }
356 
357     protected void requestCompleted(final SessionRequest request) {
358         if (this.isShutDown) {
359             return;
360         }
361         @SuppressWarnings("unchecked")
362         T route = (T) request.getAttachment();
363         this.lock.lock();
364         try {
365             this.pending.remove(request);
366             RouteSpecificPool<T, C, E> pool = getPool(route);
367             IOSession session = request.getSession();
368             try {
369                 C conn = this.connFactory.create(route, session);
370                 E entry = pool.createEntry(request, conn);
371                 this.leased.add(entry);
372                 pool.completed(request, entry);
373 
374             } catch (IOException ex) {
375                 pool.failed(request, ex);
376             }
377         } finally {
378             this.lock.unlock();
379         }
380     }
381 
382     protected void requestCancelled(final SessionRequest request) {
383         if (this.isShutDown) {
384             return;
385         }
386         @SuppressWarnings("unchecked")
387         T route = (T) request.getAttachment();
388         this.lock.lock();
389         try {
390             this.pending.remove(request);
391             RouteSpecificPool<T, C, E> pool = getPool(route);
392             pool.cancelled(request);
393             processNextPendingRequest();
394         } finally {
395             this.lock.unlock();
396         }
397     }
398 
399     protected void requestFailed(final SessionRequest request) {
400         if (this.isShutDown) {
401             return;
402         }
403         @SuppressWarnings("unchecked")
404         T route = (T) request.getAttachment();
405         this.lock.lock();
406         try {
407             this.pending.remove(request);
408             RouteSpecificPool<T, C, E> pool = getPool(route);
409             pool.failed(request, request.getException());
410             processNextPendingRequest();
411         } finally {
412             this.lock.unlock();
413         }
414     }
415 
416     protected void requestTimeout(final SessionRequest request) {
417         if (this.isShutDown) {
418             return;
419         }
420         @SuppressWarnings("unchecked")
421         T route = (T) request.getAttachment();
422         this.lock.lock();
423         try {
424             this.pending.remove(request);
425             RouteSpecificPool<T, C, E> pool = getPool(route);
426             pool.timeout(request);
427             processNextPendingRequest();
428         } finally {
429             this.lock.unlock();
430         }
431     }
432 
433     private int getMax(final T route) {
434         Integer v = this.maxPerRoute.get(route);
435         if (v != null) {
436             return v.intValue();
437         } else {
438             return this.defaultMaxPerRoute;
439         }
440     }
441 
442     public void setMaxTotal(int max) {
443         if (max <= 0) {
444             throw new IllegalArgumentException("Max value may not be negative or zero");
445         }
446         this.lock.lock();
447         try {
448             this.maxTotal = max;
449         } finally {
450             this.lock.unlock();
451         }
452     }
453 
454     public int getMaxTotal() {
455         this.lock.lock();
456         try {
457             return this.maxTotal;
458         } finally {
459             this.lock.unlock();
460         }
461     }
462 
463     public void setDefaultMaxPerRoute(int max) {
464         if (max <= 0) {
465             throw new IllegalArgumentException("Max value may not be negative or zero");
466         }
467         this.lock.lock();
468         try {
469             this.defaultMaxPerRoute = max;
470         } finally {
471             this.lock.unlock();
472         }
473     }
474 
475     public int getDefaultMaxPerRoute() {
476         this.lock.lock();
477         try {
478             return this.defaultMaxPerRoute;
479         } finally {
480             this.lock.unlock();
481         }
482     }
483 
484     public void setMaxPerRoute(final T route, int max) {
485         if (route == null) {
486             throw new IllegalArgumentException("Route may not be null");
487         }
488         if (max <= 0) {
489             throw new IllegalArgumentException("Max value may not be negative or zero");
490         }
491         this.lock.lock();
492         try {
493             this.maxPerRoute.put(route, max);
494         } finally {
495             this.lock.unlock();
496         }
497     }
498 
499     public int getMaxPerRoute(T route) {
500         if (route == null) {
501             throw new IllegalArgumentException("Route may not be null");
502         }
503         this.lock.lock();
504         try {
505             return getMax(route);
506         } finally {
507             this.lock.unlock();
508         }
509     }
510 
511     public PoolStats getTotalStats() {
512         this.lock.lock();
513         try {
514             return new PoolStats(
515                     this.leased.size(),
516                     this.pending.size(),
517                     this.available.size(),
518                     this.maxTotal);
519         } finally {
520             this.lock.unlock();
521         }
522     }
523 
524     public PoolStats getStats(final T route) {
525         if (route == null) {
526             throw new IllegalArgumentException("Route may not be null");
527         }
528         this.lock.lock();
529         try {
530             RouteSpecificPool<T, C, E> pool = getPool(route);
531             return new PoolStats(
532                     pool.getLeasedCount(),
533                     pool.getPendingCount(),
534                     pool.getAvailableCount(),
535                     getMax(route));
536         } finally {
537             this.lock.unlock();
538         }
539     }
540 
541     public void closeIdle(long idletime, final TimeUnit tunit) {
542         if (tunit == null) {
543             throw new IllegalArgumentException("Time unit must not be null.");
544         }
545         long time = tunit.toMillis(idletime);
546         if (time < 0) {
547             time = 0;
548         }
549         long deadline = System.currentTimeMillis() - time;
550         this.lock.lock();
551         try {
552             Iterator<E> it = this.available.iterator();
553             while (it.hasNext()) {
554                 E entry = it.next();
555                 if (entry.getUpdated() <= deadline) {
556                     entry.close();
557                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
558                     pool.remove(entry);
559                     it.remove();
560                 }
561             }
562             processPendingRequests();
563         } finally {
564             this.lock.unlock();
565         }
566     }
567 
568     public void closeExpired() {
569         long now = System.currentTimeMillis();
570         this.lock.lock();
571         try {
572             Iterator<E> it = this.available.iterator();
573             while (it.hasNext()) {
574                 E entry = it.next();
575                 if (entry.isExpired(now)) {
576                     entry.close();
577                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
578                     pool.remove(entry);
579                     it.remove();
580                 }
581             }
582             processPendingRequests();
583         } finally {
584             this.lock.unlock();
585         }
586     }
587 
588     @Override
589     public String toString() {
590         StringBuilder buffer = new StringBuilder();
591         buffer.append("[leased: ");
592         buffer.append(this.leased);
593         buffer.append("][available: ");
594         buffer.append(this.available);
595         buffer.append("][pending: ");
596         buffer.append(this.pending);
597         buffer.append("]");
598         return buffer.toString();
599     }
600 
601     class InternalSessionRequestCallback implements SessionRequestCallback {
602 
603         public void completed(final SessionRequest request) {
604             requestCompleted(request);
605         }
606 
607         public void cancelled(final SessionRequest request) {
608             requestCancelled(request);
609         }
610 
611         public void failed(final SessionRequest request) {
612             requestFailed(request);
613         }
614 
615         public void timeout(final SessionRequest request) {
616             requestTimeout(request);
617         }
618 
619     }
620 
621 }