View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadingBehavior;
44  import org.apache.http.annotation.Contract;
45  import org.apache.http.concurrent.FutureCallback;
46  import org.apache.http.util.Args;
47  import org.apache.http.util.Asserts;
48  
49  /**
50   * Abstract synchronous (blocking) pool of connections.
51   * <p>
52   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
53   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
54   * method on the {@link Future} object returned by the
55   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
56   * to complete.
57   *
58   * @param <T> the route type that represents the opposite endpoint of a pooled
59   *   connection.
60   * @param <C> the connection type.
61   * @param <E> the type of the pool entry containing a pooled connection.
62   * @since 4.2
63   */
64  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
65  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
66                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
67  
68      private final Lock lock;
69      private final ConnFactory<T, C> connFactory;
70      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
71      private final Set<E> leased;
72      private final LinkedList<E> available;
73      private final LinkedList<PoolEntryFuture<E>> pending;
74      private final Map<T, Integer> maxPerRoute;
75  
76      private volatile boolean isShutDown;
77      private volatile int defaultMaxPerRoute;
78      private volatile int maxTotal;
79      private volatile int validateAfterInactivity;
80  
81      public AbstractConnPool(
82              final ConnFactory<T, C> connFactory,
83              final int defaultMaxPerRoute,
84              final int maxTotal) {
85          super();
86          this.connFactory = Args.notNull(connFactory, "Connection factory");
87          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
88          this.maxTotal = Args.positive(maxTotal, "Max total value");
89          this.lock = new ReentrantLock();
90          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
91          this.leased = new HashSet<E>();
92          this.available = new LinkedList<E>();
93          this.pending = new LinkedList<PoolEntryFuture<E>>();
94          this.maxPerRoute = new HashMap<T, Integer>();
95      }
96  
97      /**
98       * Creates a new entry for the given connection with the given route.
99       */
100     protected abstract E createEntry(T route, C conn);
101 
102     /**
103      * @since 4.3
104      */
105     protected void onLease(final E entry) {
106     }
107 
108     /**
109      * @since 4.3
110      */
111     protected void onRelease(final E entry) {
112     }
113 
114     /**
115      * @since 4.4
116      */
117     protected void onReuse(final E entry) {
118     }
119 
120     /**
121      * @since 4.4
122      */
123     protected boolean validate(final E entry) {
124         return true;
125     }
126 
127     public boolean isShutdown() {
128         return this.isShutDown;
129     }
130 
131     /**
132      * Shuts down the pool.
133      */
134     public void shutdown() throws IOException {
135         if (this.isShutDown) {
136             return ;
137         }
138         this.isShutDown = true;
139         this.lock.lock();
140         try {
141             for (final E entry: this.available) {
142                 entry.close();
143             }
144             for (final E entry: this.leased) {
145                 entry.close();
146             }
147             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
148                 pool.shutdown();
149             }
150             this.routeToPool.clear();
151             this.leased.clear();
152             this.available.clear();
153         } finally {
154             this.lock.unlock();
155         }
156     }
157 
158     private RouteSpecificPool<T, C, E> getPool(final T route) {
159         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
160         if (pool == null) {
161             pool = new RouteSpecificPool<T, C, E>(route) {
162 
163                 @Override
164                 protected E createEntry(final C conn) {
165                     return AbstractConnPool.this.createEntry(route, conn);
166                 }
167 
168             };
169             this.routeToPool.put(route, pool);
170         }
171         return pool;
172     }
173 
174     /**
175      * {@inheritDoc}
176      * <p>
177      * Please note that this class does not maintain its own pool of execution
178      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
179      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
180      * returned by this method in order for the lease operation to complete.
181      */
182     @Override
183     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
184         Args.notNull(route, "Route");
185         Asserts.check(!this.isShutDown, "Connection pool shut down");
186         return new PoolEntryFuture<E>(this.lock, callback) {
187 
188             @Override
189             public E getPoolEntry(
190                     final long timeout,
191                     final TimeUnit tunit)
192                         throws InterruptedException, TimeoutException, IOException {
193                 final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
194                 onLease(entry);
195                 return entry;
196             }
197 
198         };
199     }
200 
201     /**
202      * Attempts to lease a connection for the given route and with the given
203      * state from the pool.
204      * <p>
205      * Please note that this class does not maintain its own pool of execution
206      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
207      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
208      * returned by this method in order for the lease operation to complete.
209      *
210      * @param route route of the connection.
211      * @param state arbitrary object that represents a particular state
212      *  (usually a security principal or a unique token identifying
213      *  the user whose credentials have been used while establishing the connection).
214      *  May be {@code null}.
215      * @return future for a leased pool entry.
216      */
217     public Future<E> lease(final T route, final Object state) {
218         return lease(route, state, null);
219     }
220 
221     private E getPoolEntryBlocking(
222             final T route, final Object state,
223             final long timeout, final TimeUnit tunit,
224             final PoolEntryFuture<E> future)
225                 throws IOException, InterruptedException, TimeoutException {
226 
227         Date deadline = null;
228         if (timeout > 0) {
229             deadline = new Date
230                 (System.currentTimeMillis() + tunit.toMillis(timeout));
231         }
232 
233         this.lock.lock();
234         try {
235             final RouteSpecificPool<T, C, E> pool = getPool(route);
236             E entry = null;
237             while (entry == null) {
238                 Asserts.check(!this.isShutDown, "Connection pool shut down");
239                 for (;;) {
240                     entry = pool.getFree(state);
241                     if (entry == null) {
242                         break;
243                     }
244                     if (entry.isExpired(System.currentTimeMillis())) {
245                         entry.close();
246                     } else if (this.validateAfterInactivity > 0) {
247                         if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
248                             if (!validate(entry)) {
249                                 entry.close();
250                             }
251                         }
252                     }
253                     if (entry.isClosed()) {
254                         this.available.remove(entry);
255                         pool.free(entry, false);
256                     } else {
257                         break;
258                     }
259                 }
260                 if (entry != null) {
261                     this.available.remove(entry);
262                     this.leased.add(entry);
263                     onReuse(entry);
264                     return entry;
265                 }
266 
267                 // New connection is needed
268                 final int maxPerRoute = getMax(route);
269                 // Shrink the pool prior to allocating a new connection
270                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
271                 if (excess > 0) {
272                     for (int i = 0; i < excess; i++) {
273                         final E lastUsed = pool.getLastUsed();
274                         if (lastUsed == null) {
275                             break;
276                         }
277                         lastUsed.close();
278                         this.available.remove(lastUsed);
279                         pool.remove(lastUsed);
280                     }
281                 }
282 
283                 if (pool.getAllocatedCount() < maxPerRoute) {
284                     final int totalUsed = this.leased.size();
285                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
286                     if (freeCapacity > 0) {
287                         final int totalAvailable = this.available.size();
288                         if (totalAvailable > freeCapacity - 1) {
289                             if (!this.available.isEmpty()) {
290                                 final E lastUsed = this.available.removeLast();
291                                 lastUsed.close();
292                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
293                                 otherpool.remove(lastUsed);
294                             }
295                         }
296                         final C conn = this.connFactory.create(route);
297                         entry = pool.add(conn);
298                         this.leased.add(entry);
299                         return entry;
300                     }
301                 }
302 
303                 boolean success = false;
304                 try {
305                     pool.queue(future);
306                     this.pending.add(future);
307                     success = future.await(deadline);
308                 } finally {
309                     // In case of 'success', we were woken up by the
310                     // connection pool and should now have a connection
311                     // waiting for us, or else we're shutting down.
312                     // Just continue in the loop, both cases are checked.
313                     pool.unqueue(future);
314                     this.pending.remove(future);
315                 }
316                 // check for spurious wakeup vs. timeout
317                 if (!success && (deadline != null) &&
318                     (deadline.getTime() <= System.currentTimeMillis())) {
319                     break;
320                 }
321             }
322             throw new TimeoutException("Timeout waiting for connection");
323         } finally {
324             this.lock.unlock();
325         }
326     }
327 
328     @Override
329     public void release(final E entry, final boolean reusable) {
330         this.lock.lock();
331         try {
332             if (this.leased.remove(entry)) {
333                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
334                 pool.free(entry, reusable);
335                 if (reusable && !this.isShutDown) {
336                     this.available.addFirst(entry);
337                 } else {
338                     entry.close();
339                 }
340                 onRelease(entry);
341                 PoolEntryFuture<E> future = pool.nextPending();
342                 if (future != null) {
343                     this.pending.remove(future);
344                 } else {
345                     future = this.pending.poll();
346                 }
347                 if (future != null) {
348                     future.wakeup();
349                 }
350             }
351         } finally {
352             this.lock.unlock();
353         }
354     }
355 
356     private int getMax(final T route) {
357         final Integer v = this.maxPerRoute.get(route);
358         if (v != null) {
359             return v.intValue();
360         } else {
361             return this.defaultMaxPerRoute;
362         }
363     }
364 
365     @Override
366     public void setMaxTotal(final int max) {
367         Args.positive(max, "Max value");
368         this.lock.lock();
369         try {
370             this.maxTotal = max;
371         } finally {
372             this.lock.unlock();
373         }
374     }
375 
376     @Override
377     public int getMaxTotal() {
378         this.lock.lock();
379         try {
380             return this.maxTotal;
381         } finally {
382             this.lock.unlock();
383         }
384     }
385 
386     @Override
387     public void setDefaultMaxPerRoute(final int max) {
388         Args.positive(max, "Max per route value");
389         this.lock.lock();
390         try {
391             this.defaultMaxPerRoute = max;
392         } finally {
393             this.lock.unlock();
394         }
395     }
396 
397     @Override
398     public int getDefaultMaxPerRoute() {
399         this.lock.lock();
400         try {
401             return this.defaultMaxPerRoute;
402         } finally {
403             this.lock.unlock();
404         }
405     }
406 
407     @Override
408     public void setMaxPerRoute(final T route, final int max) {
409         Args.notNull(route, "Route");
410         Args.positive(max, "Max per route value");
411         this.lock.lock();
412         try {
413             this.maxPerRoute.put(route, Integer.valueOf(max));
414         } finally {
415             this.lock.unlock();
416         }
417     }
418 
419     @Override
420     public int getMaxPerRoute(final T route) {
421         Args.notNull(route, "Route");
422         this.lock.lock();
423         try {
424             return getMax(route);
425         } finally {
426             this.lock.unlock();
427         }
428     }
429 
430     @Override
431     public PoolStats getTotalStats() {
432         this.lock.lock();
433         try {
434             return new PoolStats(
435                     this.leased.size(),
436                     this.pending.size(),
437                     this.available.size(),
438                     this.maxTotal);
439         } finally {
440             this.lock.unlock();
441         }
442     }
443 
444     @Override
445     public PoolStats getStats(final T route) {
446         Args.notNull(route, "Route");
447         this.lock.lock();
448         try {
449             final RouteSpecificPool<T, C, E> pool = getPool(route);
450             return new PoolStats(
451                     pool.getLeasedCount(),
452                     pool.getPendingCount(),
453                     pool.getAvailableCount(),
454                     getMax(route));
455         } finally {
456             this.lock.unlock();
457         }
458     }
459 
460     /**
461      * Returns snapshot of all knows routes
462      * @return the set of routes
463      *
464      * @since 4.4
465      */
466     public Set<T> getRoutes() {
467         this.lock.lock();
468         try {
469             return new HashSet<T>(routeToPool.keySet());
470         } finally {
471             this.lock.unlock();
472         }
473     }
474 
475     /**
476      * Enumerates all available connections.
477      *
478      * @since 4.3
479      */
480     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
481         this.lock.lock();
482         try {
483             final Iterator<E> it = this.available.iterator();
484             while (it.hasNext()) {
485                 final E entry = it.next();
486                 callback.process(entry);
487                 if (entry.isClosed()) {
488                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
489                     pool.remove(entry);
490                     it.remove();
491                 }
492             }
493             purgePoolMap();
494         } finally {
495             this.lock.unlock();
496         }
497     }
498 
499     /**
500      * Enumerates all leased connections.
501      *
502      * @since 4.3
503      */
504     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
505         this.lock.lock();
506         try {
507             final Iterator<E> it = this.leased.iterator();
508             while (it.hasNext()) {
509                 final E entry = it.next();
510                 callback.process(entry);
511             }
512         } finally {
513             this.lock.unlock();
514         }
515     }
516 
517     private void purgePoolMap() {
518         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
519         while (it.hasNext()) {
520             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
521             final RouteSpecificPool<T, C, E> pool = entry.getValue();
522             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
523                 it.remove();
524             }
525         }
526     }
527 
528     /**
529      * Closes connections that have been idle longer than the given period
530      * of time and evicts them from the pool.
531      *
532      * @param idletime maximum idle time.
533      * @param tunit time unit.
534      */
535     public void closeIdle(final long idletime, final TimeUnit tunit) {
536         Args.notNull(tunit, "Time unit");
537         long time = tunit.toMillis(idletime);
538         if (time < 0) {
539             time = 0;
540         }
541         final long deadline = System.currentTimeMillis() - time;
542         enumAvailable(new PoolEntryCallback<T, C>() {
543 
544             @Override
545             public void process(final PoolEntry<T, C> entry) {
546                 if (entry.getUpdated() <= deadline) {
547                     entry.close();
548                 }
549             }
550 
551         });
552     }
553 
554     /**
555      * Closes expired connections and evicts them from the pool.
556      */
557     public void closeExpired() {
558         final long now = System.currentTimeMillis();
559         enumAvailable(new PoolEntryCallback<T, C>() {
560 
561             @Override
562             public void process(final PoolEntry<T, C> entry) {
563                 if (entry.isExpired(now)) {
564                     entry.close();
565                 }
566             }
567 
568         });
569     }
570 
571     /**
572      * @return the number of milliseconds
573      * @since 4.4
574      */
575     public int getValidateAfterInactivity() {
576         return this.validateAfterInactivity;
577     }
578 
579     /**
580      * @param ms the number of milliseconds
581      * @since 4.4
582      */
583     public void setValidateAfterInactivity(final int ms) {
584         this.validateAfterInactivity = ms;
585     }
586 
587     @Override
588     public String toString() {
589         final StringBuilder buffer = new StringBuilder();
590         buffer.append("[leased: ");
591         buffer.append(this.leased);
592         buffer.append("][available: ");
593         buffer.append(this.available);
594         buffer.append("][pending: ");
595         buffer.append(this.pending);
596         buffer.append("]");
597         return buffer.toString();
598     }
599 
600 }