View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.util.Args;
46  import org.apache.http.util.Asserts;
47  
48  /**
49   * Abstract synchronous (blocking) pool of connections.
 * <p>
51   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
52   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
53   * method on the {@link Future} object returned by the
54   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
55   * to complete.
56   *
57   * @param <T> the route type that represents the opposite endpoint of a pooled
58   *   connection.
59   * @param <C> the connection type.
60   * @param <E> the type of the pool entry containing a pooled connection.
61   * @since 4.2
62   */
63  @ThreadSafe
64  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
66  
    /** Lock acquired by the pool operations before they touch the collections below. */
    private final Lock lock;
    /** Factory asked for a fresh connection when a lease cannot be served from idle entries. */
    private final ConnFactory<T, C> connFactory;
    /** Per-route sub-pools, created lazily the first time a route is looked up. */
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    /** Entries currently handed out to callers. */
    private final Set<E> leased;
    /** Idle entries; released entries are added at the head, capacity eviction removes from the tail. */
    private final LinkedList<E> available;
    /** Lease requests that are currently blocked waiting for an entry. */
    private final LinkedList<PoolEntryFuture<E>> pending;
    /** Explicit per-route limits; routes without a mapping fall back to {@code defaultMaxPerRoute}. */
    private final Map<T, Integer> maxPerRoute;

    /** Set once by {@code shutdown()}; checked before and during lease operations. */
    private volatile boolean isShutDown;
    /** Limit applied to routes that have no explicit entry in {@code maxPerRoute}. */
    private volatile int defaultMaxPerRoute;
    /** Upper bound used when deciding whether a new connection may be created. */
    private volatile int maxTotal;
    /** Inactivity threshold in milliseconds after which an idle entry is validated; non-positive disables the check. */
    private volatile int validateAfterInactivity;
79  
80      public AbstractConnPool(
81              final ConnFactory<T, C> connFactory,
82              final int defaultMaxPerRoute,
83              final int maxTotal) {
84          super();
85          this.connFactory = Args.notNull(connFactory, "Connection factory");
86          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
87          this.maxTotal = Args.positive(maxTotal, "Max total value");
88          this.lock = new ReentrantLock();
89          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
90          this.leased = new HashSet<E>();
91          this.available = new LinkedList<E>();
92          this.pending = new LinkedList<PoolEntryFuture<E>>();
93          this.maxPerRoute = new HashMap<T, Integer>();
94      }
95  
    /**
     * Creates a new entry for the given connection with the given route.
     * <p>
     * The pool calls this through the route-specific pool whenever a newly
     * created connection has to be wrapped in a pool entry.
     *
     * @param route the route the connection belongs to.
     * @param conn the connection to wrap.
     * @return the pool entry representing the connection.
     */
    protected abstract E createEntry(T route, C conn);
100 
    /**
     * Hook invoked by the future returned from
     * {@link #lease(Object, Object, FutureCallback)} right after an entry has
     * been obtained for the caller. The default implementation does nothing.
     *
     * @param entry the entry that has just been leased.
     * @since 4.3
     */
    protected void onLease(final E entry) {
    }
106 
    /**
     * Hook invoked by {@link #release(PoolEntry, boolean)} after a reusable
     * entry has been put back among the available entries. It runs while the
     * pool lock is held. The default implementation does nothing.
     *
     * @param entry the entry that has just been returned to the pool.
     * @since 4.3
     */
    protected void onRelease(final E entry) {
    }
112 
    /**
     * Hook consulted while leasing when an idle entry has not been updated for
     * at least {@link #getValidateAfterInactivity()} milliseconds. An entry
     * for which this method returns {@code false} is closed and not handed out.
     * The default implementation accepts every entry.
     *
     * @param entry the idle entry about to be leased.
     * @return {@code true} if the entry may be leased.
     * @since 4.4
     */
    protected boolean validate(final E entry) {
        return true;
    }
119 
120     public boolean isShutdown() {
121         return this.isShutDown;
122     }
123 
124     /**
125      * Shuts down the pool.
126      */
127     public void shutdown() throws IOException {
128         if (this.isShutDown) {
129             return ;
130         }
131         this.isShutDown = true;
132         this.lock.lock();
133         try {
134             for (final E entry: this.available) {
135                 entry.close();
136             }
137             for (final E entry: this.leased) {
138                 entry.close();
139             }
140             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
141                 pool.shutdown();
142             }
143             this.routeToPool.clear();
144             this.leased.clear();
145             this.available.clear();
146         } finally {
147             this.lock.unlock();
148         }
149     }
150 
151     private RouteSpecificPool<T, C, E> getPool(final T route) {
152         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
153         if (pool == null) {
154             pool = new RouteSpecificPool<T, C, E>(route) {
155 
156                 @Override
157                 protected E createEntry(final C conn) {
158                     return AbstractConnPool.this.createEntry(route, conn);
159                 }
160 
161             };
162             this.routeToPool.put(route, pool);
163         }
164         return pool;
165     }
166 
167     /**
168      * {@inheritDoc}
169      * <p/>
170      * Please note that this class does not maintain its own pool of execution
171      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
172      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
173      * returned by this method in order for the lease operation to complete.
174      */
175     @Override
176     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
177         Args.notNull(route, "Route");
178         Asserts.check(!this.isShutDown, "Connection pool shut down");
179         return new PoolEntryFuture<E>(this.lock, callback) {
180 
181             @Override
182             public E getPoolEntry(
183                     final long timeout,
184                     final TimeUnit tunit)
185                         throws InterruptedException, TimeoutException, IOException {
186                 final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
187                 onLease(entry);
188                 return entry;
189             }
190 
191         };
192     }
193 
194     /**
195      * Attempts to lease a connection for the given route and with the given
196      * state from the pool.
197      * <p/>
198      * Please note that this class does not maintain its own pool of execution
199      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
200      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
201      * returned by this method in order for the lease operation to complete.
202      *
203      * @param route route of the connection.
204      * @param state arbitrary object that represents a particular state
205      *  (usually a security principal or a unique token identifying
206      *  the user whose credentials have been used while establishing the connection).
207      *  May be <code>null</code>.
208      * @return future for a leased pool entry.
209      */
210     public Future<E> lease(final T route, final Object state) {
211         return lease(route, state, null);
212     }
213 
214     private E getPoolEntryBlocking(
215             final T route, final Object state,
216             final long timeout, final TimeUnit tunit,
217             final PoolEntryFuture<E> future)
218                 throws IOException, InterruptedException, TimeoutException {
219 
220         Date deadline = null;
221         if (timeout > 0) {
222             deadline = new Date
223                 (System.currentTimeMillis() + tunit.toMillis(timeout));
224         }
225 
226         this.lock.lock();
227         try {
228             final RouteSpecificPool<T, C, E> pool = getPool(route);
229             E entry = null;
230             while (entry == null) {
231                 Asserts.check(!this.isShutDown, "Connection pool shut down");
232                 for (;;) {
233                     entry = pool.getFree(state);
234                     if (entry == null) {
235                         break;
236                     }
237                     if (entry.isExpired(System.currentTimeMillis())) {
238                         entry.close();
239                     } else if (this.validateAfterInactivity > 0) {
240                         if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
241                             if (!validate(entry)) {
242                                 entry.close();
243                             }
244                         }
245                     }
246                     if (entry.isClosed()) {
247                         this.available.remove(entry);
248                         pool.free(entry, false);
249                     } else {
250                         break;
251                     }
252                 }
253                 if (entry != null) {
254                     this.available.remove(entry);
255                     this.leased.add(entry);
256                     return entry;
257                 }
258 
259                 // New connection is needed
260                 final int maxPerRoute = getMax(route);
261                 // Shrink the pool prior to allocating a new connection
262                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
263                 if (excess > 0) {
264                     for (int i = 0; i < excess; i++) {
265                         final E lastUsed = pool.getLastUsed();
266                         if (lastUsed == null) {
267                             break;
268                         }
269                         lastUsed.close();
270                         this.available.remove(lastUsed);
271                         pool.remove(lastUsed);
272                     }
273                 }
274 
275                 if (pool.getAllocatedCount() < maxPerRoute) {
276                     final int totalUsed = this.leased.size();
277                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
278                     if (freeCapacity > 0) {
279                         final int totalAvailable = this.available.size();
280                         if (totalAvailable > freeCapacity - 1) {
281                             if (!this.available.isEmpty()) {
282                                 final E lastUsed = this.available.removeLast();
283                                 lastUsed.close();
284                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
285                                 otherpool.remove(lastUsed);
286                             }
287                         }
288                         final C conn = this.connFactory.create(route);
289                         entry = pool.add(conn);
290                         this.leased.add(entry);
291                         return entry;
292                     }
293                 }
294 
295                 boolean success = false;
296                 try {
297                     pool.queue(future);
298                     this.pending.add(future);
299                     success = future.await(deadline);
300                 } finally {
301                     // In case of 'success', we were woken up by the
302                     // connection pool and should now have a connection
303                     // waiting for us, or else we're shutting down.
304                     // Just continue in the loop, both cases are checked.
305                     pool.unqueue(future);
306                     this.pending.remove(future);
307                 }
308                 // check for spurious wakeup vs. timeout
309                 if (!success && (deadline != null) &&
310                     (deadline.getTime() <= System.currentTimeMillis())) {
311                     break;
312                 }
313             }
314             throw new TimeoutException("Timeout waiting for connection");
315         } finally {
316             this.lock.unlock();
317         }
318     }
319 
320     @Override
321     public void release(final E entry, final boolean reusable) {
322         this.lock.lock();
323         try {
324             if (this.leased.remove(entry)) {
325                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
326                 pool.free(entry, reusable);
327                 if (reusable && !this.isShutDown) {
328                     this.available.addFirst(entry);
329                     onRelease(entry);
330                 } else {
331                     entry.close();
332                 }
333                 PoolEntryFuture<E> future = pool.nextPending();
334                 if (future != null) {
335                     this.pending.remove(future);
336                 } else {
337                     future = this.pending.poll();
338                 }
339                 if (future != null) {
340                     future.wakeup();
341                 }
342             }
343         } finally {
344             this.lock.unlock();
345         }
346     }
347 
348     private int getMax(final T route) {
349         final Integer v = this.maxPerRoute.get(route);
350         if (v != null) {
351             return v.intValue();
352         } else {
353             return this.defaultMaxPerRoute;
354         }
355     }
356 
357     @Override
358     public void setMaxTotal(final int max) {
359         Args.positive(max, "Max value");
360         this.lock.lock();
361         try {
362             this.maxTotal = max;
363         } finally {
364             this.lock.unlock();
365         }
366     }
367 
368     @Override
369     public int getMaxTotal() {
370         this.lock.lock();
371         try {
372             return this.maxTotal;
373         } finally {
374             this.lock.unlock();
375         }
376     }
377 
378     @Override
379     public void setDefaultMaxPerRoute(final int max) {
380         Args.positive(max, "Max per route value");
381         this.lock.lock();
382         try {
383             this.defaultMaxPerRoute = max;
384         } finally {
385             this.lock.unlock();
386         }
387     }
388 
389     @Override
390     public int getDefaultMaxPerRoute() {
391         this.lock.lock();
392         try {
393             return this.defaultMaxPerRoute;
394         } finally {
395             this.lock.unlock();
396         }
397     }
398 
399     @Override
400     public void setMaxPerRoute(final T route, final int max) {
401         Args.notNull(route, "Route");
402         Args.positive(max, "Max per route value");
403         this.lock.lock();
404         try {
405             this.maxPerRoute.put(route, Integer.valueOf(max));
406         } finally {
407             this.lock.unlock();
408         }
409     }
410 
411     @Override
412     public int getMaxPerRoute(final T route) {
413         Args.notNull(route, "Route");
414         this.lock.lock();
415         try {
416             return getMax(route);
417         } finally {
418             this.lock.unlock();
419         }
420     }
421 
422     @Override
423     public PoolStats getTotalStats() {
424         this.lock.lock();
425         try {
426             return new PoolStats(
427                     this.leased.size(),
428                     this.pending.size(),
429                     this.available.size(),
430                     this.maxTotal);
431         } finally {
432             this.lock.unlock();
433         }
434     }
435 
436     @Override
437     public PoolStats getStats(final T route) {
438         Args.notNull(route, "Route");
439         this.lock.lock();
440         try {
441             final RouteSpecificPool<T, C, E> pool = getPool(route);
442             return new PoolStats(
443                     pool.getLeasedCount(),
444                     pool.getPendingCount(),
445                     pool.getAvailableCount(),
446                     getMax(route));
447         } finally {
448             this.lock.unlock();
449         }
450     }
451 
452     /**
453      * Returns snapshot of all knows routes
454      *
455      * @since 4.4
456      */
457     public Set<T> getRoutes() {
458         this.lock.lock();
459         try {
460             return new HashSet<T>(routeToPool.keySet());
461         } finally {
462             this.lock.unlock();
463         }
464     }
465 
466     /**
467      * Enumerates all available connections.
468      *
469      * @since 4.3
470      */
471     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
472         this.lock.lock();
473         try {
474             final Iterator<E> it = this.available.iterator();
475             while (it.hasNext()) {
476                 final E entry = it.next();
477                 callback.process(entry);
478                 if (entry.isClosed()) {
479                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
480                     pool.remove(entry);
481                     it.remove();
482                 }
483             }
484             purgePoolMap();
485         } finally {
486             this.lock.unlock();
487         }
488     }
489 
490     /**
491      * Enumerates all leased connections.
492      *
493      * @since 4.3
494      */
495     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
496         this.lock.lock();
497         try {
498             final Iterator<E> it = this.leased.iterator();
499             while (it.hasNext()) {
500                 final E entry = it.next();
501                 callback.process(entry);
502             }
503         } finally {
504             this.lock.unlock();
505         }
506     }
507 
508     private void purgePoolMap() {
509         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
510         while (it.hasNext()) {
511             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
512             final RouteSpecificPool<T, C, E> pool = entry.getValue();
513             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
514                 it.remove();
515             }
516         }
517     }
518 
519     /**
520      * Closes connections that have been idle longer than the given period
521      * of time and evicts them from the pool.
522      *
523      * @param idletime maximum idle time.
524      * @param tunit time unit.
525      */
526     public void closeIdle(final long idletime, final TimeUnit tunit) {
527         Args.notNull(tunit, "Time unit");
528         long time = tunit.toMillis(idletime);
529         if (time < 0) {
530             time = 0;
531         }
532         final long deadline = System.currentTimeMillis() - time;
533         enumAvailable(new PoolEntryCallback<T, C>() {
534 
535             @Override
536             public void process(final PoolEntry<T, C> entry) {
537                 if (entry.getUpdated() <= deadline) {
538                     entry.close();
539                 }
540             }
541 
542         });
543     }
544 
545     /**
546      * Closes expired connections and evicts them from the pool.
547      */
548     public void closeExpired() {
549         final long now = System.currentTimeMillis();
550         enumAvailable(new PoolEntryCallback<T, C>() {
551 
552             @Override
553             public void process(final PoolEntry<T, C> entry) {
554                 if (entry.isExpired(now)) {
555                     entry.close();
556                 }
557             }
558 
559         });
560     }
561 
562     /**
563      * @since 4.4
564      */
565     public int getValidateAfterInactivity() {
566         return this.validateAfterInactivity;
567     }
568 
569     /**
570      * @since 4.4
571      */
572     public void setValidateAfterInactivity(final int ms) {
573         this.validateAfterInactivity = ms;
574     }
575 
576     @Override
577     public String toString() {
578         final StringBuilder buffer = new StringBuilder();
579         buffer.append("[leased: ");
580         buffer.append(this.leased);
581         buffer.append("][available: ");
582         buffer.append(this.available);
583         buffer.append("][pending: ");
584         buffer.append(this.pending);
585         buffer.append("]");
586         return buffer.toString();
587     }
588 
589 }