View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.util.Args;
46  import org.apache.http.util.Asserts;
47  
48  /**
49   * Abstract synchronous (blocking) pool of connections.
50   * <p/>
51   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
52   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
53   * method on the {@link Future} object returned by the
54   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
55   * to complete.
56   *
57   * @param <T> the route type that represents the opposite endpoint of a pooled
58   *   connection.
59   * @param <C> the connection type.
60   * @param <E> the type of the pool entry containing a pooled connection.
61   * @since 4.2
62   */
63  @ThreadSafe
64  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
66  
67      private final Lock lock;
68      private final ConnFactory<T, C> connFactory;
69      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
70      private final Set<E> leased;
71      private final LinkedList<E> available;
72      private final LinkedList<PoolEntryFuture<E>> pending;
73      private final Map<T, Integer> maxPerRoute;
74  
75      private volatile boolean isShutDown;
76      private volatile int defaultMaxPerRoute;
77      private volatile int maxTotal;
78  
79      public AbstractConnPool(
80              final ConnFactory<T, C> connFactory,
81              final int defaultMaxPerRoute,
82              final int maxTotal) {
83          super();
84          this.connFactory = Args.notNull(connFactory, "Connection factory");
85          this.defaultMaxPerRoute = Args.notNegative(defaultMaxPerRoute, "Max per route value");
86          this.maxTotal = Args.notNegative(maxTotal, "Max total value");
87          this.lock = new ReentrantLock();
88          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
89          this.leased = new HashSet<E>();
90          this.available = new LinkedList<E>();
91          this.pending = new LinkedList<PoolEntryFuture<E>>();
92          this.maxPerRoute = new HashMap<T, Integer>();
93      }
94  
95      /**
96       * Creates a new entry for the given connection with the given route.
97       */
98      protected abstract E createEntry(T route, C conn);
99  
100     /**
101      * @since 4.3
102      */
103     protected void onLease(final E entry) {
104     }
105 
106     /**
107      * @since 4.3
108      */
109     protected void onRelease(final E entry) {
110     }
111 
112     public boolean isShutdown() {
113         return this.isShutDown;
114     }
115 
116     /**
117      * Shuts down the pool.
118      */
119     public void shutdown() throws IOException {
120         if (this.isShutDown) {
121             return ;
122         }
123         this.isShutDown = true;
124         this.lock.lock();
125         try {
126             for (final E entry: this.available) {
127                 entry.close();
128             }
129             for (final E entry: this.leased) {
130                 entry.close();
131             }
132             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
133                 pool.shutdown();
134             }
135             this.routeToPool.clear();
136             this.leased.clear();
137             this.available.clear();
138         } finally {
139             this.lock.unlock();
140         }
141     }
142 
143     private RouteSpecificPool<T, C, E> getPool(final T route) {
144         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
145         if (pool == null) {
146             pool = new RouteSpecificPool<T, C, E>(route) {
147 
148                 @Override
149                 protected E createEntry(final C conn) {
150                     return AbstractConnPool.this.createEntry(route, conn);
151                 }
152 
153             };
154             this.routeToPool.put(route, pool);
155         }
156         return pool;
157     }
158 
159     /**
160      * {@inheritDoc}
161      * <p/>
162      * Please note that this class does not maintain its own pool of execution
163      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
164      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
165      * returned by this method in order for the lease operation to complete.
166      */
167     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
168         Args.notNull(route, "Route");
169         Asserts.check(!this.isShutDown, "Connection pool shut down");
170         return new PoolEntryFuture<E>(this.lock, callback) {
171 
172             @Override
173             public E getPoolEntry(
174                     final long timeout,
175                     final TimeUnit tunit)
176                         throws InterruptedException, TimeoutException, IOException {
177                 final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
178                 onLease(entry);
179                 return entry;
180             }
181 
182         };
183     }
184 
185     /**
186      * Attempts to lease a connection for the given route and with the given
187      * state from the pool.
188      * <p/>
189      * Please note that this class does not maintain its own pool of execution
190      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
191      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
192      * returned by this method in order for the lease operation to complete.
193      *
194      * @param route route of the connection.
195      * @param state arbitrary object that represents a particular state
196      *  (usually a security principal or a unique token identifying
197      *  the user whose credentials have been used while establishing the connection).
198      *  May be <code>null</code>.
199      * @return future for a leased pool entry.
200      */
201     public Future<E> lease(final T route, final Object state) {
202         return lease(route, state, null);
203     }
204 
205     private E getPoolEntryBlocking(
206             final T route, final Object state,
207             final long timeout, final TimeUnit tunit,
208             final PoolEntryFuture<E> future)
209                 throws IOException, InterruptedException, TimeoutException {
210 
211         Date deadline = null;
212         if (timeout > 0) {
213             deadline = new Date
214                 (System.currentTimeMillis() + tunit.toMillis(timeout));
215         }
216 
217         this.lock.lock();
218         try {
219             final RouteSpecificPool<T, C, E> pool = getPool(route);
220             E entry = null;
221             while (entry == null) {
222                 Asserts.check(!this.isShutDown, "Connection pool shut down");
223                 for (;;) {
224                     entry = pool.getFree(state);
225                     if (entry == null) {
226                         break;
227                     }
228                     if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
229                         entry.close();
230                         this.available.remove(entry);
231                         pool.free(entry, false);
232                     } else {
233                         break;
234                     }
235                 }
236                 if (entry != null) {
237                     this.available.remove(entry);
238                     this.leased.add(entry);
239                     return entry;
240                 }
241 
242                 // New connection is needed
243                 final int maxPerRoute = getMax(route);
244                 // Shrink the pool prior to allocating a new connection
245                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
246                 if (excess > 0) {
247                     for (int i = 0; i < excess; i++) {
248                         final E lastUsed = pool.getLastUsed();
249                         if (lastUsed == null) {
250                             break;
251                         }
252                         lastUsed.close();
253                         this.available.remove(lastUsed);
254                         pool.remove(lastUsed);
255                     }
256                 }
257 
258                 if (pool.getAllocatedCount() < maxPerRoute) {
259                     final int totalUsed = this.leased.size();
260                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
261                     if (freeCapacity > 0) {
262                         final int totalAvailable = this.available.size();
263                         if (totalAvailable > freeCapacity - 1) {
264                             if (!this.available.isEmpty()) {
265                                 final E lastUsed = this.available.removeLast();
266                                 lastUsed.close();
267                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
268                                 otherpool.remove(lastUsed);
269                             }
270                         }
271                         final C conn = this.connFactory.create(route);
272                         entry = pool.add(conn);
273                         this.leased.add(entry);
274                         return entry;
275                     }
276                 }
277 
278                 boolean success = false;
279                 try {
280                     pool.queue(future);
281                     this.pending.add(future);
282                     success = future.await(deadline);
283                 } finally {
284                     // In case of 'success', we were woken up by the
285                     // connection pool and should now have a connection
286                     // waiting for us, or else we're shutting down.
287                     // Just continue in the loop, both cases are checked.
288                     pool.unqueue(future);
289                     this.pending.remove(future);
290                 }
291                 // check for spurious wakeup vs. timeout
292                 if (!success && (deadline != null) &&
293                     (deadline.getTime() <= System.currentTimeMillis())) {
294                     break;
295                 }
296             }
297             throw new TimeoutException("Timeout waiting for connection");
298         } finally {
299             this.lock.unlock();
300         }
301     }
302 
303     public void release(final E entry, final boolean reusable) {
304         this.lock.lock();
305         try {
306             if (this.leased.remove(entry)) {
307                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
308                 pool.free(entry, reusable);
309                 if (reusable && !this.isShutDown) {
310                     this.available.addFirst(entry);
311                     onRelease(entry);
312                 } else {
313                     entry.close();
314                 }
315                 PoolEntryFuture<E> future = pool.nextPending();
316                 if (future != null) {
317                     this.pending.remove(future);
318                 } else {
319                     future = this.pending.poll();
320                 }
321                 if (future != null) {
322                     future.wakeup();
323                 }
324             }
325         } finally {
326             this.lock.unlock();
327         }
328     }
329 
330     private int getMax(final T route) {
331         final Integer v = this.maxPerRoute.get(route);
332         if (v != null) {
333             return v.intValue();
334         } else {
335             return this.defaultMaxPerRoute;
336         }
337     }
338 
339     public void setMaxTotal(final int max) {
340         Args.notNegative(max, "Max value");
341         this.lock.lock();
342         try {
343             this.maxTotal = max;
344         } finally {
345             this.lock.unlock();
346         }
347     }
348 
349     public int getMaxTotal() {
350         this.lock.lock();
351         try {
352             return this.maxTotal;
353         } finally {
354             this.lock.unlock();
355         }
356     }
357 
358     public void setDefaultMaxPerRoute(final int max) {
359         Args.notNegative(max, "Max per route value");
360         this.lock.lock();
361         try {
362             this.defaultMaxPerRoute = max;
363         } finally {
364             this.lock.unlock();
365         }
366     }
367 
368     public int getDefaultMaxPerRoute() {
369         this.lock.lock();
370         try {
371             return this.defaultMaxPerRoute;
372         } finally {
373             this.lock.unlock();
374         }
375     }
376 
377     public void setMaxPerRoute(final T route, final int max) {
378         Args.notNull(route, "Route");
379         Args.notNegative(max, "Max per route value");
380         this.lock.lock();
381         try {
382             this.maxPerRoute.put(route, Integer.valueOf(max));
383         } finally {
384             this.lock.unlock();
385         }
386     }
387 
388     public int getMaxPerRoute(final T route) {
389         Args.notNull(route, "Route");
390         this.lock.lock();
391         try {
392             return getMax(route);
393         } finally {
394             this.lock.unlock();
395         }
396     }
397 
398     public PoolStats getTotalStats() {
399         this.lock.lock();
400         try {
401             return new PoolStats(
402                     this.leased.size(),
403                     this.pending.size(),
404                     this.available.size(),
405                     this.maxTotal);
406         } finally {
407             this.lock.unlock();
408         }
409     }
410 
411     public PoolStats getStats(final T route) {
412         Args.notNull(route, "Route");
413         this.lock.lock();
414         try {
415             final RouteSpecificPool<T, C, E> pool = getPool(route);
416             return new PoolStats(
417                     pool.getLeasedCount(),
418                     pool.getPendingCount(),
419                     pool.getAvailableCount(),
420                     getMax(route));
421         } finally {
422             this.lock.unlock();
423         }
424     }
425 
426     /**
427      * Enumerates all available connections.
428      *
429      * @since 4.3
430      */
431     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
432         this.lock.lock();
433         try {
434             final Iterator<E> it = this.available.iterator();
435             while (it.hasNext()) {
436                 final E entry = it.next();
437                 callback.process(entry);
438                 if (entry.isClosed()) {
439                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
440                     pool.remove(entry);
441                     it.remove();
442                 }
443             }
444             purgePoolMap();
445         } finally {
446             this.lock.unlock();
447         }
448     }
449 
450     /**
451      * Enumerates all leased connections.
452      *
453      * @since 4.3
454      */
455     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
456         this.lock.lock();
457         try {
458             final Iterator<E> it = this.leased.iterator();
459             while (it.hasNext()) {
460                 final E entry = it.next();
461                 callback.process(entry);
462             }
463         } finally {
464             this.lock.unlock();
465         }
466     }
467 
468     private void purgePoolMap() {
469         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
470         while (it.hasNext()) {
471             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
472             final RouteSpecificPool<T, C, E> pool = entry.getValue();
473             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
474                 it.remove();
475             }
476         }
477     }
478 
479     /**
480      * Closes connections that have been idle longer than the given period
481      * of time and evicts them from the pool.
482      *
483      * @param idletime maximum idle time.
484      * @param tunit time unit.
485      */
486     public void closeIdle(final long idletime, final TimeUnit tunit) {
487         Args.notNull(tunit, "Time unit");
488         long time = tunit.toMillis(idletime);
489         if (time < 0) {
490             time = 0;
491         }
492         final long deadline = System.currentTimeMillis() - time;
493         enumAvailable(new PoolEntryCallback<T, C>() {
494 
495             public void process(final PoolEntry<T, C> entry) {
496                 if (entry.getUpdated() <= deadline) {
497                     entry.close();
498                 }
499             }
500 
501         });
502     }
503 
504     /**
505      * Closes expired connections and evicts them from the pool.
506      */
507     public void closeExpired() {
508         final long now = System.currentTimeMillis();
509         enumAvailable(new PoolEntryCallback<T, C>() {
510 
511             public void process(final PoolEntry<T, C> entry) {
512                 if (entry.isExpired(now)) {
513                     entry.close();
514                 }
515             }
516 
517         });
518     }
519 
520     @Override
521     public String toString() {
522         final StringBuilder buffer = new StringBuilder();
523         buffer.append("[leased: ");
524         buffer.append(this.leased);
525         buffer.append("][available: ");
526         buffer.append(this.available);
527         buffer.append("][pending: ");
528         buffer.append(this.pending);
529         buffer.append("]");
530         return buffer.toString();
531     }
532 
533 }