View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.util.Args;
46  import org.apache.http.util.Asserts;
47  
48  /**
49   * Abstract synchronous (blocking) pool of connections.
50   * <p>
51   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
52   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
53   * method on the {@link Future} object returned by the
54   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
55   * to complete.
56   *
57   * @param <T> the route type that represents the opposite endpoint of a pooled
58   *   connection.
59   * @param <C> the connection type.
60   * @param <E> the type of the pool entry containing a pooled connection.
61   * @since 4.2
62   */
63  @ThreadSafe
64  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
66  
67      private final Lock lock;
68      private final ConnFactory<T, C> connFactory;
69      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
70      private final Set<E> leased;
71      private final LinkedList<E> available;
72      private final LinkedList<PoolEntryFuture<E>> pending;
73      private final Map<T, Integer> maxPerRoute;
74  
75      private volatile boolean isShutDown;
76      private volatile int defaultMaxPerRoute;
77      private volatile int maxTotal;
78      private volatile int validateAfterInactivity;
79  
80      public AbstractConnPool(
81              final ConnFactory<T, C> connFactory,
82              final int defaultMaxPerRoute,
83              final int maxTotal) {
84          super();
85          this.connFactory = Args.notNull(connFactory, "Connection factory");
86          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
87          this.maxTotal = Args.positive(maxTotal, "Max total value");
88          this.lock = new ReentrantLock();
89          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
90          this.leased = new HashSet<E>();
91          this.available = new LinkedList<E>();
92          this.pending = new LinkedList<PoolEntryFuture<E>>();
93          this.maxPerRoute = new HashMap<T, Integer>();
94      }
95  
96      /**
97       * Creates a new entry for the given connection with the given route.
98       */
99      protected abstract E createEntry(T route, C conn);
100 
101     /**
102      * @since 4.3
103      */
104     protected void onLease(final E entry) {
105     }
106 
107     /**
108      * @since 4.3
109      */
110     protected void onRelease(final E entry) {
111     }
112 
113     /**
114      * @since 4.4
115      */
116     protected boolean validate(final E entry) {
117         return true;
118     }
119 
120     public boolean isShutdown() {
121         return this.isShutDown;
122     }
123 
124     /**
125      * Shuts down the pool.
126      */
127     public void shutdown() throws IOException {
128         if (this.isShutDown) {
129             return ;
130         }
131         this.isShutDown = true;
132         this.lock.lock();
133         try {
134             for (final E entry: this.available) {
135                 entry.close();
136             }
137             for (final E entry: this.leased) {
138                 entry.close();
139             }
140             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
141                 pool.shutdown();
142             }
143             this.routeToPool.clear();
144             this.leased.clear();
145             this.available.clear();
146         } finally {
147             this.lock.unlock();
148         }
149     }
150 
151     private RouteSpecificPool<T, C, E> getPool(final T route) {
152         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
153         if (pool == null) {
154             pool = new RouteSpecificPool<T, C, E>(route) {
155 
156                 @Override
157                 protected E createEntry(final C conn) {
158                     return AbstractConnPool.this.createEntry(route, conn);
159                 }
160 
161             };
162             this.routeToPool.put(route, pool);
163         }
164         return pool;
165     }
166 
167     /**
168      * {@inheritDoc}
169      * <p>
170      * Please note that this class does not maintain its own pool of execution
171      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
172      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
173      * returned by this method in order for the lease operation to complete.
174      */
175     @Override
176     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
177         Args.notNull(route, "Route");
178         Asserts.check(!this.isShutDown, "Connection pool shut down");
179         return new PoolEntryFuture<E>(this.lock, callback) {
180 
181             @Override
182             public E getPoolEntry(
183                     final long timeout,
184                     final TimeUnit tunit)
185                         throws InterruptedException, TimeoutException, IOException {
186                 final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
187                 onLease(entry);
188                 return entry;
189             }
190 
191         };
192     }
193 
194     /**
195      * Attempts to lease a connection for the given route and with the given
196      * state from the pool.
197      * <p>
198      * Please note that this class does not maintain its own pool of execution
199      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
200      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
201      * returned by this method in order for the lease operation to complete.
202      *
203      * @param route route of the connection.
204      * @param state arbitrary object that represents a particular state
205      *  (usually a security principal or a unique token identifying
206      *  the user whose credentials have been used while establishing the connection).
207      *  May be {@code null}.
208      * @return future for a leased pool entry.
209      */
210     public Future<E> lease(final T route, final Object state) {
211         return lease(route, state, null);
212     }
213 
214     private E getPoolEntryBlocking(
215             final T route, final Object state,
216             final long timeout, final TimeUnit tunit,
217             final PoolEntryFuture<E> future)
218                 throws IOException, InterruptedException, TimeoutException {
219 
220         Date deadline = null;
221         if (timeout > 0) {
222             deadline = new Date
223                 (System.currentTimeMillis() + tunit.toMillis(timeout));
224         }
225 
226         this.lock.lock();
227         try {
228             final RouteSpecificPool<T, C, E> pool = getPool(route);
229             E entry = null;
230             while (entry == null) {
231                 Asserts.check(!this.isShutDown, "Connection pool shut down");
232                 for (;;) {
233                     entry = pool.getFree(state);
234                     if (entry == null) {
235                         break;
236                     }
237                     if (entry.isExpired(System.currentTimeMillis())) {
238                         entry.close();
239                     } else if (this.validateAfterInactivity > 0) {
240                         if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
241                             if (!validate(entry)) {
242                                 entry.close();
243                             }
244                         }
245                     }
246                     if (entry.isClosed()) {
247                         this.available.remove(entry);
248                         pool.free(entry, false);
249                     } else {
250                         break;
251                     }
252                 }
253                 if (entry != null) {
254                     this.available.remove(entry);
255                     this.leased.add(entry);
256                     return entry;
257                 }
258 
259                 // New connection is needed
260                 final int maxPerRoute = getMax(route);
261                 // Shrink the pool prior to allocating a new connection
262                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
263                 if (excess > 0) {
264                     for (int i = 0; i < excess; i++) {
265                         final E lastUsed = pool.getLastUsed();
266                         if (lastUsed == null) {
267                             break;
268                         }
269                         lastUsed.close();
270                         this.available.remove(lastUsed);
271                         pool.remove(lastUsed);
272                     }
273                 }
274 
275                 if (pool.getAllocatedCount() < maxPerRoute) {
276                     final int totalUsed = this.leased.size();
277                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
278                     if (freeCapacity > 0) {
279                         final int totalAvailable = this.available.size();
280                         if (totalAvailable > freeCapacity - 1) {
281                             if (!this.available.isEmpty()) {
282                                 final E lastUsed = this.available.removeLast();
283                                 lastUsed.close();
284                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
285                                 otherpool.remove(lastUsed);
286                             }
287                         }
288                         final C conn = this.connFactory.create(route);
289                         entry = pool.add(conn);
290                         this.leased.add(entry);
291                         return entry;
292                     }
293                 }
294 
295                 boolean success = false;
296                 try {
297                     pool.queue(future);
298                     this.pending.add(future);
299                     success = future.await(deadline);
300                 } finally {
301                     // In case of 'success', we were woken up by the
302                     // connection pool and should now have a connection
303                     // waiting for us, or else we're shutting down.
304                     // Just continue in the loop, both cases are checked.
305                     pool.unqueue(future);
306                     this.pending.remove(future);
307                 }
308                 // check for spurious wakeup vs. timeout
309                 if (!success && (deadline != null) &&
310                     (deadline.getTime() <= System.currentTimeMillis())) {
311                     break;
312                 }
313             }
314             throw new TimeoutException("Timeout waiting for connection");
315         } finally {
316             this.lock.unlock();
317         }
318     }
319 
320     @Override
321     public void release(final E entry, final boolean reusable) {
322         this.lock.lock();
323         try {
324             if (this.leased.remove(entry)) {
325                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
326                 pool.free(entry, reusable);
327                 if (reusable && !this.isShutDown) {
328                     this.available.addFirst(entry);
329                     onRelease(entry);
330                 } else {
331                     entry.close();
332                 }
333                 PoolEntryFuture<E> future = pool.nextPending();
334                 if (future != null) {
335                     this.pending.remove(future);
336                 } else {
337                     future = this.pending.poll();
338                 }
339                 if (future != null) {
340                     future.wakeup();
341                 }
342             }
343         } finally {
344             this.lock.unlock();
345         }
346     }
347 
348     private int getMax(final T route) {
349         final Integer v = this.maxPerRoute.get(route);
350         if (v != null) {
351             return v.intValue();
352         } else {
353             return this.defaultMaxPerRoute;
354         }
355     }
356 
357     @Override
358     public void setMaxTotal(final int max) {
359         Args.positive(max, "Max value");
360         this.lock.lock();
361         try {
362             this.maxTotal = max;
363         } finally {
364             this.lock.unlock();
365         }
366     }
367 
368     @Override
369     public int getMaxTotal() {
370         this.lock.lock();
371         try {
372             return this.maxTotal;
373         } finally {
374             this.lock.unlock();
375         }
376     }
377 
378     @Override
379     public void setDefaultMaxPerRoute(final int max) {
380         Args.positive(max, "Max per route value");
381         this.lock.lock();
382         try {
383             this.defaultMaxPerRoute = max;
384         } finally {
385             this.lock.unlock();
386         }
387     }
388 
389     @Override
390     public int getDefaultMaxPerRoute() {
391         this.lock.lock();
392         try {
393             return this.defaultMaxPerRoute;
394         } finally {
395             this.lock.unlock();
396         }
397     }
398 
399     @Override
400     public void setMaxPerRoute(final T route, final int max) {
401         Args.notNull(route, "Route");
402         Args.positive(max, "Max per route value");
403         this.lock.lock();
404         try {
405             this.maxPerRoute.put(route, Integer.valueOf(max));
406         } finally {
407             this.lock.unlock();
408         }
409     }
410 
411     @Override
412     public int getMaxPerRoute(final T route) {
413         Args.notNull(route, "Route");
414         this.lock.lock();
415         try {
416             return getMax(route);
417         } finally {
418             this.lock.unlock();
419         }
420     }
421 
422     @Override
423     public PoolStats getTotalStats() {
424         this.lock.lock();
425         try {
426             return new PoolStats(
427                     this.leased.size(),
428                     this.pending.size(),
429                     this.available.size(),
430                     this.maxTotal);
431         } finally {
432             this.lock.unlock();
433         }
434     }
435 
436     @Override
437     public PoolStats getStats(final T route) {
438         Args.notNull(route, "Route");
439         this.lock.lock();
440         try {
441             final RouteSpecificPool<T, C, E> pool = getPool(route);
442             return new PoolStats(
443                     pool.getLeasedCount(),
444                     pool.getPendingCount(),
445                     pool.getAvailableCount(),
446                     getMax(route));
447         } finally {
448             this.lock.unlock();
449         }
450     }
451 
452     /**
453      * Returns snapshot of all knows routes
454      * @return the set of routes
455      *
456      * @since 4.4
457      */
458     public Set<T> getRoutes() {
459         this.lock.lock();
460         try {
461             return new HashSet<T>(routeToPool.keySet());
462         } finally {
463             this.lock.unlock();
464         }
465     }
466 
467     /**
468      * Enumerates all available connections.
469      *
470      * @since 4.3
471      */
472     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
473         this.lock.lock();
474         try {
475             final Iterator<E> it = this.available.iterator();
476             while (it.hasNext()) {
477                 final E entry = it.next();
478                 callback.process(entry);
479                 if (entry.isClosed()) {
480                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
481                     pool.remove(entry);
482                     it.remove();
483                 }
484             }
485             purgePoolMap();
486         } finally {
487             this.lock.unlock();
488         }
489     }
490 
491     /**
492      * Enumerates all leased connections.
493      *
494      * @since 4.3
495      */
496     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
497         this.lock.lock();
498         try {
499             final Iterator<E> it = this.leased.iterator();
500             while (it.hasNext()) {
501                 final E entry = it.next();
502                 callback.process(entry);
503             }
504         } finally {
505             this.lock.unlock();
506         }
507     }
508 
509     private void purgePoolMap() {
510         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
511         while (it.hasNext()) {
512             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
513             final RouteSpecificPool<T, C, E> pool = entry.getValue();
514             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
515                 it.remove();
516             }
517         }
518     }
519 
520     /**
521      * Closes connections that have been idle longer than the given period
522      * of time and evicts them from the pool.
523      *
524      * @param idletime maximum idle time.
525      * @param tunit time unit.
526      */
527     public void closeIdle(final long idletime, final TimeUnit tunit) {
528         Args.notNull(tunit, "Time unit");
529         long time = tunit.toMillis(idletime);
530         if (time < 0) {
531             time = 0;
532         }
533         final long deadline = System.currentTimeMillis() - time;
534         enumAvailable(new PoolEntryCallback<T, C>() {
535 
536             @Override
537             public void process(final PoolEntry<T, C> entry) {
538                 if (entry.getUpdated() <= deadline) {
539                     entry.close();
540                 }
541             }
542 
543         });
544     }
545 
546     /**
547      * Closes expired connections and evicts them from the pool.
548      */
549     public void closeExpired() {
550         final long now = System.currentTimeMillis();
551         enumAvailable(new PoolEntryCallback<T, C>() {
552 
553             @Override
554             public void process(final PoolEntry<T, C> entry) {
555                 if (entry.isExpired(now)) {
556                     entry.close();
557                 }
558             }
559 
560         });
561     }
562 
563     /**
564      * @return the number of milliseconds
565      * @since 4.4
566      */
567     public int getValidateAfterInactivity() {
568         return this.validateAfterInactivity;
569     }
570 
571     /**
572      * @param ms the number of milliseconds
573      * @since 4.4
574      */
575     public void setValidateAfterInactivity(final int ms) {
576         this.validateAfterInactivity = ms;
577     }
578 
579     @Override
580     public String toString() {
581         final StringBuilder buffer = new StringBuilder();
582         buffer.append("[leased: ");
583         buffer.append(this.leased);
584         buffer.append("][available: ");
585         buffer.append(this.available);
586         buffer.append("][pending: ");
587         buffer.append(this.pending);
588         buffer.append("]");
589         return buffer.toString();
590     }
591 
592 }