View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.util.Args;
46  import org.apache.http.util.Asserts;
47  
48  /**
49   * Abstract synchronous (blocking) pool of connections.
50   * <p>
51   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
52   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
53   * method on the {@link Future} object returned by the
54   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
55   * to complete.
56   *
57   * @param <T> the route type that represents the opposite endpoint of a pooled
58   *   connection.
59   * @param <C> the connection type.
60   * @param <E> the type of the pool entry containing a pooled connection.
61   * @since 4.2
62   */
63  @ThreadSafe
64  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
66  
67      private final Lock lock;
68      private final ConnFactory<T, C> connFactory;
69      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
70      private final Set<E> leased;
71      private final LinkedList<E> available;
72      private final LinkedList<PoolEntryFuture<E>> pending;
73      private final Map<T, Integer> maxPerRoute;
74  
75      private volatile boolean isShutDown;
76      private volatile int defaultMaxPerRoute;
77      private volatile int maxTotal;
78      private volatile int validateAfterInactivity;
79  
80      public AbstractConnPool(
81              final ConnFactory<T, C> connFactory,
82              final int defaultMaxPerRoute,
83              final int maxTotal) {
84          super();
85          this.connFactory = Args.notNull(connFactory, "Connection factory");
86          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
87          this.maxTotal = Args.positive(maxTotal, "Max total value");
88          this.lock = new ReentrantLock();
89          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
90          this.leased = new HashSet<E>();
91          this.available = new LinkedList<E>();
92          this.pending = new LinkedList<PoolEntryFuture<E>>();
93          this.maxPerRoute = new HashMap<T, Integer>();
94      }
95  
96      /**
97       * Creates a new entry for the given connection with the given route.
98       */
99      protected abstract E createEntry(T route, C conn);
100 
101     /**
102      * @since 4.3
103      */
104     protected void onLease(final E entry) {
105     }
106 
107     /**
108      * @since 4.3
109      */
110     protected void onRelease(final E entry) {
111     }
112 
113     /**
114      * @since 4.4
115      */
116     protected void onReuse(final E entry) {
117     }
118 
119     /**
120      * @since 4.4
121      */
122     protected boolean validate(final E entry) {
123         return true;
124     }
125 
126     public boolean isShutdown() {
127         return this.isShutDown;
128     }
129 
130     /**
131      * Shuts down the pool.
132      */
133     public void shutdown() throws IOException {
134         if (this.isShutDown) {
135             return ;
136         }
137         this.isShutDown = true;
138         this.lock.lock();
139         try {
140             for (final E entry: this.available) {
141                 entry.close();
142             }
143             for (final E entry: this.leased) {
144                 entry.close();
145             }
146             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
147                 pool.shutdown();
148             }
149             this.routeToPool.clear();
150             this.leased.clear();
151             this.available.clear();
152         } finally {
153             this.lock.unlock();
154         }
155     }
156 
157     private RouteSpecificPool<T, C, E> getPool(final T route) {
158         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
159         if (pool == null) {
160             pool = new RouteSpecificPool<T, C, E>(route) {
161 
162                 @Override
163                 protected E createEntry(final C conn) {
164                     return AbstractConnPool.this.createEntry(route, conn);
165                 }
166 
167             };
168             this.routeToPool.put(route, pool);
169         }
170         return pool;
171     }
172 
173     /**
174      * {@inheritDoc}
175      * <p>
176      * Please note that this class does not maintain its own pool of execution
177      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
178      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
179      * returned by this method in order for the lease operation to complete.
180      */
181     @Override
182     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
183         Args.notNull(route, "Route");
184         Asserts.check(!this.isShutDown, "Connection pool shut down");
185         return new PoolEntryFuture<E>(this.lock, callback) {
186 
187             @Override
188             public E getPoolEntry(
189                     final long timeout,
190                     final TimeUnit tunit)
191                         throws InterruptedException, TimeoutException, IOException {
192                 final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
193                 onLease(entry);
194                 return entry;
195             }
196 
197         };
198     }
199 
200     /**
201      * Attempts to lease a connection for the given route and with the given
202      * state from the pool.
203      * <p>
204      * Please note that this class does not maintain its own pool of execution
205      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
206      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
207      * returned by this method in order for the lease operation to complete.
208      *
209      * @param route route of the connection.
210      * @param state arbitrary object that represents a particular state
211      *  (usually a security principal or a unique token identifying
212      *  the user whose credentials have been used while establishing the connection).
213      *  May be {@code null}.
214      * @return future for a leased pool entry.
215      */
216     public Future<E> lease(final T route, final Object state) {
217         return lease(route, state, null);
218     }
219 
220     private E getPoolEntryBlocking(
221             final T route, final Object state,
222             final long timeout, final TimeUnit tunit,
223             final PoolEntryFuture<E> future)
224                 throws IOException, InterruptedException, TimeoutException {
225 
226         Date deadline = null;
227         if (timeout > 0) {
228             deadline = new Date
229                 (System.currentTimeMillis() + tunit.toMillis(timeout));
230         }
231 
232         this.lock.lock();
233         try {
234             final RouteSpecificPool<T, C, E> pool = getPool(route);
235             E entry = null;
236             while (entry == null) {
237                 Asserts.check(!this.isShutDown, "Connection pool shut down");
238                 for (;;) {
239                     entry = pool.getFree(state);
240                     if (entry == null) {
241                         break;
242                     }
243                     if (entry.isExpired(System.currentTimeMillis())) {
244                         entry.close();
245                     } else if (this.validateAfterInactivity > 0) {
246                         if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
247                             if (!validate(entry)) {
248                                 entry.close();
249                             }
250                         }
251                     }
252                     if (entry.isClosed()) {
253                         this.available.remove(entry);
254                         pool.free(entry, false);
255                     } else {
256                         break;
257                     }
258                 }
259                 if (entry != null) {
260                     this.available.remove(entry);
261                     this.leased.add(entry);
262                     onReuse(entry);
263                     return entry;
264                 }
265 
266                 // New connection is needed
267                 final int maxPerRoute = getMax(route);
268                 // Shrink the pool prior to allocating a new connection
269                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
270                 if (excess > 0) {
271                     for (int i = 0; i < excess; i++) {
272                         final E lastUsed = pool.getLastUsed();
273                         if (lastUsed == null) {
274                             break;
275                         }
276                         lastUsed.close();
277                         this.available.remove(lastUsed);
278                         pool.remove(lastUsed);
279                     }
280                 }
281 
282                 if (pool.getAllocatedCount() < maxPerRoute) {
283                     final int totalUsed = this.leased.size();
284                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
285                     if (freeCapacity > 0) {
286                         final int totalAvailable = this.available.size();
287                         if (totalAvailable > freeCapacity - 1) {
288                             if (!this.available.isEmpty()) {
289                                 final E lastUsed = this.available.removeLast();
290                                 lastUsed.close();
291                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
292                                 otherpool.remove(lastUsed);
293                             }
294                         }
295                         final C conn = this.connFactory.create(route);
296                         entry = pool.add(conn);
297                         this.leased.add(entry);
298                         return entry;
299                     }
300                 }
301 
302                 boolean success = false;
303                 try {
304                     pool.queue(future);
305                     this.pending.add(future);
306                     success = future.await(deadline);
307                 } finally {
308                     // In case of 'success', we were woken up by the
309                     // connection pool and should now have a connection
310                     // waiting for us, or else we're shutting down.
311                     // Just continue in the loop, both cases are checked.
312                     pool.unqueue(future);
313                     this.pending.remove(future);
314                 }
315                 // check for spurious wakeup vs. timeout
316                 if (!success && (deadline != null) &&
317                     (deadline.getTime() <= System.currentTimeMillis())) {
318                     break;
319                 }
320             }
321             throw new TimeoutException("Timeout waiting for connection");
322         } finally {
323             this.lock.unlock();
324         }
325     }
326 
327     @Override
328     public void release(final E entry, final boolean reusable) {
329         this.lock.lock();
330         try {
331             if (this.leased.remove(entry)) {
332                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
333                 pool.free(entry, reusable);
334                 if (reusable && !this.isShutDown) {
335                     this.available.addFirst(entry);
336                     onRelease(entry);
337                 } else {
338                     entry.close();
339                 }
340                 PoolEntryFuture<E> future = pool.nextPending();
341                 if (future != null) {
342                     this.pending.remove(future);
343                 } else {
344                     future = this.pending.poll();
345                 }
346                 if (future != null) {
347                     future.wakeup();
348                 }
349             }
350         } finally {
351             this.lock.unlock();
352         }
353     }
354 
355     private int getMax(final T route) {
356         final Integer v = this.maxPerRoute.get(route);
357         if (v != null) {
358             return v.intValue();
359         } else {
360             return this.defaultMaxPerRoute;
361         }
362     }
363 
364     @Override
365     public void setMaxTotal(final int max) {
366         Args.positive(max, "Max value");
367         this.lock.lock();
368         try {
369             this.maxTotal = max;
370         } finally {
371             this.lock.unlock();
372         }
373     }
374 
375     @Override
376     public int getMaxTotal() {
377         this.lock.lock();
378         try {
379             return this.maxTotal;
380         } finally {
381             this.lock.unlock();
382         }
383     }
384 
385     @Override
386     public void setDefaultMaxPerRoute(final int max) {
387         Args.positive(max, "Max per route value");
388         this.lock.lock();
389         try {
390             this.defaultMaxPerRoute = max;
391         } finally {
392             this.lock.unlock();
393         }
394     }
395 
396     @Override
397     public int getDefaultMaxPerRoute() {
398         this.lock.lock();
399         try {
400             return this.defaultMaxPerRoute;
401         } finally {
402             this.lock.unlock();
403         }
404     }
405 
406     @Override
407     public void setMaxPerRoute(final T route, final int max) {
408         Args.notNull(route, "Route");
409         Args.positive(max, "Max per route value");
410         this.lock.lock();
411         try {
412             this.maxPerRoute.put(route, Integer.valueOf(max));
413         } finally {
414             this.lock.unlock();
415         }
416     }
417 
418     @Override
419     public int getMaxPerRoute(final T route) {
420         Args.notNull(route, "Route");
421         this.lock.lock();
422         try {
423             return getMax(route);
424         } finally {
425             this.lock.unlock();
426         }
427     }
428 
429     @Override
430     public PoolStats getTotalStats() {
431         this.lock.lock();
432         try {
433             return new PoolStats(
434                     this.leased.size(),
435                     this.pending.size(),
436                     this.available.size(),
437                     this.maxTotal);
438         } finally {
439             this.lock.unlock();
440         }
441     }
442 
443     @Override
444     public PoolStats getStats(final T route) {
445         Args.notNull(route, "Route");
446         this.lock.lock();
447         try {
448             final RouteSpecificPool<T, C, E> pool = getPool(route);
449             return new PoolStats(
450                     pool.getLeasedCount(),
451                     pool.getPendingCount(),
452                     pool.getAvailableCount(),
453                     getMax(route));
454         } finally {
455             this.lock.unlock();
456         }
457     }
458 
459     /**
460      * Returns snapshot of all knows routes
461      * @return the set of routes
462      *
463      * @since 4.4
464      */
465     public Set<T> getRoutes() {
466         this.lock.lock();
467         try {
468             return new HashSet<T>(routeToPool.keySet());
469         } finally {
470             this.lock.unlock();
471         }
472     }
473 
474     /**
475      * Enumerates all available connections.
476      *
477      * @since 4.3
478      */
479     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
480         this.lock.lock();
481         try {
482             final Iterator<E> it = this.available.iterator();
483             while (it.hasNext()) {
484                 final E entry = it.next();
485                 callback.process(entry);
486                 if (entry.isClosed()) {
487                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
488                     pool.remove(entry);
489                     it.remove();
490                 }
491             }
492             purgePoolMap();
493         } finally {
494             this.lock.unlock();
495         }
496     }
497 
498     /**
499      * Enumerates all leased connections.
500      *
501      * @since 4.3
502      */
503     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
504         this.lock.lock();
505         try {
506             final Iterator<E> it = this.leased.iterator();
507             while (it.hasNext()) {
508                 final E entry = it.next();
509                 callback.process(entry);
510             }
511         } finally {
512             this.lock.unlock();
513         }
514     }
515 
516     private void purgePoolMap() {
517         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
518         while (it.hasNext()) {
519             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
520             final RouteSpecificPool<T, C, E> pool = entry.getValue();
521             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
522                 it.remove();
523             }
524         }
525     }
526 
527     /**
528      * Closes connections that have been idle longer than the given period
529      * of time and evicts them from the pool.
530      *
531      * @param idletime maximum idle time.
532      * @param tunit time unit.
533      */
534     public void closeIdle(final long idletime, final TimeUnit tunit) {
535         Args.notNull(tunit, "Time unit");
536         long time = tunit.toMillis(idletime);
537         if (time < 0) {
538             time = 0;
539         }
540         final long deadline = System.currentTimeMillis() - time;
541         enumAvailable(new PoolEntryCallback<T, C>() {
542 
543             @Override
544             public void process(final PoolEntry<T, C> entry) {
545                 if (entry.getUpdated() <= deadline) {
546                     entry.close();
547                 }
548             }
549 
550         });
551     }
552 
553     /**
554      * Closes expired connections and evicts them from the pool.
555      */
556     public void closeExpired() {
557         final long now = System.currentTimeMillis();
558         enumAvailable(new PoolEntryCallback<T, C>() {
559 
560             @Override
561             public void process(final PoolEntry<T, C> entry) {
562                 if (entry.isExpired(now)) {
563                     entry.close();
564                 }
565             }
566 
567         });
568     }
569 
570     /**
571      * @return the number of milliseconds
572      * @since 4.4
573      */
574     public int getValidateAfterInactivity() {
575         return this.validateAfterInactivity;
576     }
577 
578     /**
579      * @param ms the number of milliseconds
580      * @since 4.4
581      */
582     public void setValidateAfterInactivity(final int ms) {
583         this.validateAfterInactivity = ms;
584     }
585 
586     @Override
587     public String toString() {
588         final StringBuilder buffer = new StringBuilder();
589         buffer.append("[leased: ");
590         buffer.append(this.leased);
591         buffer.append("][available: ");
592         buffer.append(this.available);
593         buffer.append("][pending: ");
594         buffer.append(this.pending);
595         buffer.append("]");
596         return buffer.toString();
597     }
598 
599 }