View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.concurrent.BasicFuture;
46  import org.apache.http.concurrent.FutureCallback;
47  import org.apache.http.nio.reactor.ConnectingIOReactor;
48  import org.apache.http.nio.reactor.IOSession;
49  import org.apache.http.nio.reactor.SessionRequest;
50  import org.apache.http.nio.reactor.SessionRequestCallback;
51  import org.apache.http.pool.ConnPool;
52  import org.apache.http.pool.ConnPoolControl;
53  import org.apache.http.pool.PoolEntry;
54  import org.apache.http.pool.PoolStats;
55  
56  /**
57   * Abstract non-blocking connection pool.
58   *
59   * @param <T> route
60   * @param <C> connection object
61   * @param <E> pool entry
62   *
63   * @since 4.2
64   */
65  @ThreadSafe
66  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
67                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
68  
69      private final ConnectingIOReactor ioreactor;
70      private final NIOConnFactory<T, C> connFactory;
71      private final SessionRequestCallback sessionRequestCallback;
72      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
73      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
74      private final Set<SessionRequest> pending;
75      private final Set<E> leased;
76      private final LinkedList<E> available;
77      private final Map<T, Integer> maxPerRoute;
78      private final Lock lock;
79  
80      private volatile boolean isShutDown;
81      private volatile int defaultMaxPerRoute;
82      private volatile int maxTotal;
83  
84      public AbstractNIOConnPool(
85              final ConnectingIOReactor ioreactor,
86              final NIOConnFactory<T, C> connFactory,
87              int defaultMaxPerRoute,
88              int maxTotal) {
89          super();
90          if (ioreactor == null) {
91              throw new IllegalArgumentException("I/O reactor may not be null");
92          }
93          if (connFactory == null) {
94              throw new IllegalArgumentException("Connection factory may not null");
95          }
96          if (defaultMaxPerRoute <= 0) {
97              throw new IllegalArgumentException("Max per route value may not be negative or zero");
98          }
99          if (maxTotal <= 0) {
100             throw new IllegalArgumentException("Max total value may not be negative or zero");
101         }
102         this.ioreactor = ioreactor;
103         this.connFactory = connFactory;
104         this.sessionRequestCallback = new InternalSessionRequestCallback();
105         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
106         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
107         this.pending = new HashSet<SessionRequest>();
108         this.leased = new HashSet<E>();
109         this.available = new LinkedList<E>();
110         this.maxPerRoute = new HashMap<T, Integer>();
111         this.lock = new ReentrantLock();
112         this.defaultMaxPerRoute = defaultMaxPerRoute;
113         this.maxTotal = maxTotal;
114     }
115 
116     protected abstract SocketAddress resolveRemoteAddress(T route);
117 
118     protected abstract SocketAddress resolveLocalAddress(T route);
119 
120     protected abstract E createEntry(T route, C conn);
121 
122     public boolean isShutdown() {
123         return this.isShutDown;
124     }
125 
126     public void shutdown(long waitMs) throws IOException {
127         if (this.isShutDown) {
128             return ;
129         }
130         this.isShutDown = true;
131         this.lock.lock();
132         try {
133             for (SessionRequest sessionRequest: this.pending) {
134                 sessionRequest.cancel();
135             }
136             for (E entry: this.available) {
137                 entry.close();
138             }
139             for (E entry: this.leased) {
140                 entry.close();
141             }
142             for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
143                 pool.shutdown();
144             }
145             this.routeToPool.clear();
146             this.leased.clear();
147             this.pending.clear();
148             this.available.clear();
149             this.leasingRequests.clear();
150             this.ioreactor.shutdown(waitMs);
151         } finally {
152             this.lock.unlock();
153         }
154     }
155 
156     private RouteSpecificPool<T, C, E> getPool(final T route) {
157         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
158         if (pool == null) {
159             pool = new RouteSpecificPool<T, C, E>(route) {
160 
161                 @Override
162                 protected E createEntry(final T route, final C conn) {
163                     return AbstractNIOConnPool.this.createEntry(route, conn);
164                 }
165 
166             };
167             this.routeToPool.put(route, pool);
168         }
169         return pool;
170     }
171 
172     public Future<E> lease(
173             final T route, final Object state,
174             final long connectTimeout, final TimeUnit tunit,
175             final FutureCallback<E> callback) {
176         if (route == null) {
177             throw new IllegalArgumentException("Route may not be null");
178         }
179         if (tunit == null) {
180             throw new IllegalArgumentException("Time unit may not be null.");
181         }
182         if (this.isShutDown) {
183             throw new IllegalStateException("Session pool has been shut down");
184         }
185         this.lock.lock();
186         try {
187             long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
188             BasicFuture<E> future = new BasicFuture<E>(callback);
189             LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
190             this.leasingRequests.add(request);
191 
192             processPendingRequests();
193             return future;
194         } finally {
195             this.lock.unlock();
196         }
197     }
198 
199     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
200         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
201     }
202 
203     public Future<E> lease(final T route, final Object state) {
204         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
205     }
206 
207     public void release(final E entry, boolean reusable) {
208         if (entry == null) {
209             return;
210         }
211         if (this.isShutDown) {
212             return;
213         }
214         this.lock.lock();
215         try {
216             if (this.leased.remove(entry)) {
217                 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
218                 pool.free(entry, reusable);
219                 if (reusable) {
220                     this.available.addFirst(entry);
221                 } else {
222                     entry.close();
223                 }
224                 processPendingRequests();
225             }
226         } finally {
227             this.lock.unlock();
228         }
229     }
230 
231     private void processPendingRequests() {
232         ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
233         while (it.hasNext()) {
234             LeaseRequest<T, C, E> request = it.next();
235 
236             T route = request.getRoute();
237             Object state = request.getState();
238             long deadline = request.getDeadline();
239             BasicFuture<E> future = request.getFuture();
240 
241             long now = System.currentTimeMillis();
242             if (now > deadline) {
243                 it.remove();
244                 future.failed(new TimeoutException());
245                 continue;
246             }
247 
248             RouteSpecificPool<T, C, E> pool = getPool(route);
249             E entry = null;
250             for (;;) {
251                 entry = pool.getFree(state);
252                 if (entry == null) {
253                     break;
254                 }
255                 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
256                     entry.close();
257                     this.available.remove(entry);
258                     pool.free(entry, false);
259                 } else {
260                     break;
261                 }
262             }
263             if (entry != null) {
264                 it.remove();
265                 this.available.remove(entry);
266                 this.leased.add(entry);
267                 future.completed(entry);
268                 continue;
269             }
270 
271             // New connection is needed
272             int maxPerRoute = getMax(route);
273             // Shrink the pool prior to allocating a new connection
274             int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
275             if (excess > 0) {
276                 for (int i = 0; i < excess; i++) {
277                     E lastUsed = pool.getLastUsed();
278                     if (lastUsed == null) {
279                         break;
280                     }
281                     lastUsed.close();
282                     this.available.remove(lastUsed);
283                     pool.remove(lastUsed);
284                 }
285             }
286 
287             if (pool.getAllocatedCount() < maxPerRoute) {
288                 int totalUsed = this.pending.size() + this.leased.size();
289                 int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
290                 if (freeCapacity == 0) {
291                     continue;
292                 }
293                 int totalAvailable = this.available.size();
294                 if (totalAvailable > freeCapacity - 1) {
295                     if (!this.available.isEmpty()) {
296                         E lastUsed = this.available.removeLast();
297                         lastUsed.close();
298                         RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
299                         otherpool.remove(lastUsed);
300                     }
301                 }
302                 it.remove();
303                 SessionRequest sessionRequest = this.ioreactor.connect(
304                         resolveRemoteAddress(route),
305                         resolveLocalAddress(route),
306                         route,
307                         this.sessionRequestCallback);
308                 int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
309                         (int) request.getConnectTimeout() : Integer.MAX_VALUE;
310                 sessionRequest.setConnectTimeout(timout);
311                 this.pending.add(sessionRequest);
312                 pool.addPending(sessionRequest, future);
313             }
314         }
315     }
316 
317     public void validatePendingRequests() {
318         this.lock.lock();
319         try {
320             long now = System.currentTimeMillis();
321             ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
322             while (it.hasNext()) {
323                 LeaseRequest<T, C, E> request = it.next();
324                 long deadline = request.getDeadline();
325                 if (now > deadline) {
326                     it.remove();
327                     BasicFuture<E> future = request.getFuture();
328                     future.failed(new TimeoutException());
329                 }
330             }
331         } finally {
332             this.lock.unlock();
333         }
334     }
335 
336     protected void requestCompleted(final SessionRequest request) {
337         if (this.isShutDown) {
338             return;
339         }
340         @SuppressWarnings("unchecked")
341         T route = (T) request.getAttachment();
342         this.lock.lock();
343         try {
344             this.pending.remove(request);
345             RouteSpecificPool<T, C, E> pool = getPool(route);
346             IOSession session = request.getSession();
347             try {
348                 C conn = this.connFactory.create(route, session);
349                 E entry = pool.createEntry(request, conn);
350                 this.leased.add(entry);
351                 pool.completed(request, entry);
352 
353             } catch (IOException ex) {
354                 pool.failed(request, ex);
355             }
356         } finally {
357             this.lock.unlock();
358         }
359     }
360 
361     protected void requestCancelled(final SessionRequest request) {
362         if (this.isShutDown) {
363             return;
364         }
365         @SuppressWarnings("unchecked")
366         T route = (T) request.getAttachment();
367         this.lock.lock();
368         try {
369             this.pending.remove(request);
370             RouteSpecificPool<T, C, E> pool = getPool(route);
371             pool.cancelled(request);
372             processPendingRequests();
373         } finally {
374             this.lock.unlock();
375         }
376     }
377 
378     protected void requestFailed(final SessionRequest request) {
379         if (this.isShutDown) {
380             return;
381         }
382         @SuppressWarnings("unchecked")
383         T route = (T) request.getAttachment();
384         this.lock.lock();
385         try {
386             this.pending.remove(request);
387             RouteSpecificPool<T, C, E> pool = getPool(route);
388             pool.failed(request, request.getException());
389             processPendingRequests();
390         } finally {
391             this.lock.unlock();
392         }
393     }
394 
395     protected void requestTimeout(final SessionRequest request) {
396         if (this.isShutDown) {
397             return;
398         }
399         @SuppressWarnings("unchecked")
400         T route = (T) request.getAttachment();
401         this.lock.lock();
402         try {
403             this.pending.remove(request);
404             RouteSpecificPool<T, C, E> pool = getPool(route);
405             pool.timeout(request);
406             processPendingRequests();
407         } finally {
408             this.lock.unlock();
409         }
410     }
411 
412     private int getMax(final T route) {
413         Integer v = this.maxPerRoute.get(route);
414         if (v != null) {
415             return v.intValue();
416         } else {
417             return this.defaultMaxPerRoute;
418         }
419     }
420 
421     public void setMaxTotal(int max) {
422         if (max <= 0) {
423             throw new IllegalArgumentException("Max value may not be negative or zero");
424         }
425         this.lock.lock();
426         try {
427             this.maxTotal = max;
428         } finally {
429             this.lock.unlock();
430         }
431     }
432 
433     public int getMaxTotal() {
434         this.lock.lock();
435         try {
436             return this.maxTotal;
437         } finally {
438             this.lock.unlock();
439         }
440     }
441 
442     public void setDefaultMaxPerRoute(int max) {
443         if (max <= 0) {
444             throw new IllegalArgumentException("Max value may not be negative or zero");
445         }
446         this.lock.lock();
447         try {
448             this.defaultMaxPerRoute = max;
449         } finally {
450             this.lock.unlock();
451         }
452     }
453 
454     public int getDefaultMaxPerRoute() {
455         this.lock.lock();
456         try {
457             return this.defaultMaxPerRoute;
458         } finally {
459             this.lock.unlock();
460         }
461     }
462 
463     public void setMaxPerRoute(final T route, int max) {
464         if (route == null) {
465             throw new IllegalArgumentException("Route may not be null");
466         }
467         if (max <= 0) {
468             throw new IllegalArgumentException("Max value may not be negative or zero");
469         }
470         this.lock.lock();
471         try {
472             this.maxPerRoute.put(route, max);
473         } finally {
474             this.lock.unlock();
475         }
476     }
477 
478     public int getMaxPerRoute(T route) {
479         if (route == null) {
480             throw new IllegalArgumentException("Route may not be null");
481         }
482         this.lock.lock();
483         try {
484             return getMax(route);
485         } finally {
486             this.lock.unlock();
487         }
488     }
489 
490     public PoolStats getTotalStats() {
491         this.lock.lock();
492         try {
493             return new PoolStats(
494                     this.leased.size(),
495                     this.pending.size(),
496                     this.available.size(),
497                     this.maxTotal);
498         } finally {
499             this.lock.unlock();
500         }
501     }
502 
503     public PoolStats getStats(final T route) {
504         if (route == null) {
505             throw new IllegalArgumentException("Route may not be null");
506         }
507         this.lock.lock();
508         try {
509             RouteSpecificPool<T, C, E> pool = getPool(route);
510             return new PoolStats(
511                     pool.getLeasedCount(),
512                     pool.getPendingCount(),
513                     pool.getAvailableCount(),
514                     getMax(route));
515         } finally {
516             this.lock.unlock();
517         }
518     }
519 
520     public void closeIdle(long idletime, final TimeUnit tunit) {
521         if (tunit == null) {
522             throw new IllegalArgumentException("Time unit must not be null.");
523         }
524         long time = tunit.toMillis(idletime);
525         if (time < 0) {
526             time = 0;
527         }
528         long deadline = System.currentTimeMillis() - time;
529         this.lock.lock();
530         try {
531             Iterator<E> it = this.available.iterator();
532             while (it.hasNext()) {
533                 E entry = it.next();
534                 if (entry.getUpdated() <= deadline) {
535                     entry.close();
536                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
537                     pool.remove(entry);
538                     it.remove();
539                 }
540             }
541             processPendingRequests();
542         } finally {
543             this.lock.unlock();
544         }
545     }
546 
547     public void closeExpired() {
548         long now = System.currentTimeMillis();
549         this.lock.lock();
550         try {
551             Iterator<E> it = this.available.iterator();
552             while (it.hasNext()) {
553                 E entry = it.next();
554                 if (entry.isExpired(now)) {
555                     entry.close();
556                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
557                     pool.remove(entry);
558                     it.remove();
559                 }
560             }
561             processPendingRequests();
562         } finally {
563             this.lock.unlock();
564         }
565     }
566 
567     @Override
568     public String toString() {
569         StringBuilder buffer = new StringBuilder();
570         buffer.append("[leased: ");
571         buffer.append(this.leased);
572         buffer.append("][available: ");
573         buffer.append(this.available);
574         buffer.append("][pending: ");
575         buffer.append(this.pending);
576         buffer.append("]");
577         return buffer.toString();
578     }
579 
580     class InternalSessionRequestCallback implements SessionRequestCallback {
581 
582         public void completed(final SessionRequest request) {
583             requestCompleted(request);
584         }
585 
586         public void cancelled(final SessionRequest request) {
587             requestCancelled(request);
588         }
589 
590         public void failed(final SessionRequest request) {
591             requestFailed(request);
592         }
593 
594         public void timeout(final SessionRequest request) {
595             requestTimeout(request);
596         }
597 
598     }
599 
600 }