View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  
46  /**
47   * Abstract synchronous (blocking) pool of connections.
48   * <p/>
49   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
50   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
51   * method on the {@link Future} object returned by the
52   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
53   * to complete.
54   *
55   * @param <T> the route type that represents the opposite endpoint of a pooled
56   *   connection.
57   * @param <C> the connection type.
58   * @param <E> the type of the pool entry containing a pooled connection.
59   * @since 4.2
60   */
61  @ThreadSafe
62  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
63                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
64  
65      private final Lock lock;
66      private final ConnFactory<T, C> connFactory;
67      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
68      private final Set<E> leased;
69      private final LinkedList<E> available;
70      private final LinkedList<PoolEntryFuture<E>> pending;
71      private final Map<T, Integer> maxPerRoute;
72  
73      private volatile boolean isShutDown;
74      private volatile int defaultMaxPerRoute;
75      private volatile int maxTotal;
76  
77      public AbstractConnPool(
78              final ConnFactory<T, C> connFactory,
79              int defaultMaxPerRoute,
80              int maxTotal) {
81          super();
82          if (connFactory == null) {
83              throw new IllegalArgumentException("Connection factory may not null");
84          }
85          if (defaultMaxPerRoute <= 0) {
86              throw new IllegalArgumentException("Max per route value may not be negative or zero");
87          }
88          if (maxTotal <= 0) {
89              throw new IllegalArgumentException("Max total value may not be negative or zero");
90          }
91          this.lock = new ReentrantLock();
92          this.connFactory = connFactory;
93          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
94          this.leased = new HashSet<E>();
95          this.available = new LinkedList<E>();
96          this.pending = new LinkedList<PoolEntryFuture<E>>();
97          this.maxPerRoute = new HashMap<T, Integer>();
98          this.defaultMaxPerRoute = defaultMaxPerRoute;
99          this.maxTotal = maxTotal;
100     }
101 
102     /**
103      * Creates a new entry for the given connection with the given route.
104      */
105     protected abstract E createEntry(T route, C conn);
106 
107     public boolean isShutdown() {
108         return this.isShutDown;
109     }
110 
111     /**
112      * Shuts down the pool.
113      */
114     public void shutdown() throws IOException {
115         if (this.isShutDown) {
116             return ;
117         }
118         this.isShutDown = true;
119         this.lock.lock();
120         try {
121             for (E entry: this.available) {
122                 entry.close();
123             }
124             for (E entry: this.leased) {
125                 entry.close();
126             }
127             for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
128                 pool.shutdown();
129             }
130             this.routeToPool.clear();
131             this.leased.clear();
132             this.available.clear();
133         } finally {
134             this.lock.unlock();
135         }
136     }
137 
138     private RouteSpecificPool<T, C, E> getPool(final T route) {
139         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
140         if (pool == null) {
141             pool = new RouteSpecificPool<T, C, E>(route) {
142 
143                 @Override
144                 protected E createEntry(C conn) {
145                     return AbstractConnPool.this.createEntry(route, conn);
146                 }
147 
148             };
149             this.routeToPool.put(route, pool);
150         }
151         return pool;
152     }
153 
154     /**
155      * {@inheritDoc}
156      * <p/>
157      * Please note that this class does not maintain its own pool of execution
158      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
159      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
160      * returned by this method in order for the lease operation to complete.
161      */
162     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
163         if (route == null) {
164             throw new IllegalArgumentException("Route may not be null");
165         }
166         if (this.isShutDown) {
167             throw new IllegalStateException("Connection pool shut down");
168         }
169         return new PoolEntryFuture<E>(this.lock, callback) {
170 
171             @Override
172             public E getPoolEntry(
173                     long timeout,
174                     TimeUnit tunit)
175                         throws InterruptedException, TimeoutException, IOException {
176                 return getPoolEntryBlocking(route, state, timeout, tunit, this);
177             }
178 
179         };
180     }
181 
182     /**
183      * Attempts to lease a connection for the given route and with the given
184      * state from the pool.
185      * <p/>
186      * Please note that this class does not maintain its own pool of execution
187      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
188      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
189      * returned by this method in order for the lease operation to complete.
190      *
191      * @param route route of the connection.
192      * @param state arbitrary object that represents a particular state
193      *  (usually a security principal or a unique token identifying
194      *  the user whose credentials have been used while establishing the connection).
195      *  May be <code>null</code>.
196      * @return future for a leased pool entry.
197      */
198     public Future<E> lease(final T route, final Object state) {
199         return lease(route, state, null);
200     }
201 
202     private E getPoolEntryBlocking(
203             final T route, final Object state,
204             final long timeout, final TimeUnit tunit,
205             final PoolEntryFuture<E> future)
206                 throws IOException, InterruptedException, TimeoutException {
207 
208         Date deadline = null;
209         if (timeout > 0) {
210             deadline = new Date
211                 (System.currentTimeMillis() + tunit.toMillis(timeout));
212         }
213 
214         this.lock.lock();
215         try {
216             RouteSpecificPool<T, C, E> pool = getPool(route);
217             E entry = null;
218             while (entry == null) {
219                 if (this.isShutDown) {
220                     throw new IllegalStateException("Connection pool shut down");
221                 }
222                 for (;;) {
223                     entry = pool.getFree(state);
224                     if (entry == null) {
225                         break;
226                     }
227                     if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
228                         entry.close();
229                         this.available.remove(entry);
230                         pool.free(entry, false);
231                     } else {
232                         break;
233                     }
234                 }
235                 if (entry != null) {
236                     this.available.remove(entry);
237                     this.leased.add(entry);
238                     return entry;
239                 }
240 
241                 // New connection is needed
242                 int maxPerRoute = getMax(route);
243                 // Shrink the pool prior to allocating a new connection
244                 int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
245                 if (excess > 0) {
246                     for (int i = 0; i < excess; i++) {
247                         E lastUsed = pool.getLastUsed();
248                         if (lastUsed == null) {
249                             break;
250                         }
251                         lastUsed.close();
252                         this.available.remove(lastUsed);
253                         pool.remove(lastUsed);
254                     }
255                 }
256 
257                 if (pool.getAllocatedCount() < maxPerRoute) {
258                     int totalUsed = this.leased.size();
259                     int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
260                     if (freeCapacity > 0) {
261                         int totalAvailable = this.available.size();
262                         if (totalAvailable > freeCapacity - 1) {
263                             if (!this.available.isEmpty()) {
264                                 E lastUsed = this.available.removeLast();
265                                 lastUsed.close();
266                                 RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
267                                 otherpool.remove(lastUsed);
268                             }
269                         }
270                         C conn = this.connFactory.create(route);
271                         entry = pool.add(conn);
272                         this.leased.add(entry);
273                         return entry;
274                     }
275                 }
276 
277                 boolean success = false;
278                 try {
279                     pool.queue(future);
280                     this.pending.add(future);
281                     success = future.await(deadline);
282                 } finally {
283                     // In case of 'success', we were woken up by the
284                     // connection pool and should now have a connection
285                     // waiting for us, or else we're shutting down.
286                     // Just continue in the loop, both cases are checked.
287                     pool.unqueue(future);
288                     this.pending.remove(future);
289                 }
290                 // check for spurious wakeup vs. timeout
291                 if (!success && (deadline != null) &&
292                     (deadline.getTime() <= System.currentTimeMillis())) {
293                     break;
294                 }
295             }
296             throw new TimeoutException("Timeout waiting for connection");
297         } finally {
298             this.lock.unlock();
299         }
300     }
301 
302     private void notifyPending(final RouteSpecificPool<T, C, E> pool) {
303         PoolEntryFuture<E> future = pool.nextPending();
304         if (future != null) {
305             this.pending.remove(future);
306         } else {
307             future = this.pending.poll();
308         }
309         if (future != null) {
310             future.wakeup();
311         }
312     }
313 
314     public void release(E entry, boolean reusable) {
315         this.lock.lock();
316         try {
317             if (this.leased.remove(entry)) {
318                 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
319                 pool.free(entry, reusable);
320                 if (reusable && !this.isShutDown) {
321                     this.available.addFirst(entry);
322                 } else {
323                     entry.close();
324                 }
325                 notifyPending(pool);
326             }
327         } finally {
328             this.lock.unlock();
329         }
330     }
331 
332     private int getMax(final T route) {
333         Integer v = this.maxPerRoute.get(route);
334         if (v != null) {
335             return v.intValue();
336         } else {
337             return this.defaultMaxPerRoute;
338         }
339     }
340 
341     public void setMaxTotal(int max) {
342         if (max <= 0) {
343             throw new IllegalArgumentException("Max value may not be negative or zero");
344         }
345         this.lock.lock();
346         try {
347             this.maxTotal = max;
348         } finally {
349             this.lock.unlock();
350         }
351     }
352 
353     public int getMaxTotal() {
354         this.lock.lock();
355         try {
356             return this.maxTotal;
357         } finally {
358             this.lock.unlock();
359         }
360     }
361 
362     public void setDefaultMaxPerRoute(int max) {
363         if (max <= 0) {
364             throw new IllegalArgumentException("Max value may not be negative or zero");
365         }
366         this.lock.lock();
367         try {
368             this.defaultMaxPerRoute = max;
369         } finally {
370             this.lock.unlock();
371         }
372     }
373 
374     public int getDefaultMaxPerRoute() {
375         this.lock.lock();
376         try {
377             return this.defaultMaxPerRoute;
378         } finally {
379             this.lock.unlock();
380         }
381     }
382 
383     public void setMaxPerRoute(final T route, int max) {
384         if (route == null) {
385             throw new IllegalArgumentException("Route may not be null");
386         }
387         if (max <= 0) {
388             throw new IllegalArgumentException("Max value may not be negative or zero");
389         }
390         this.lock.lock();
391         try {
392             this.maxPerRoute.put(route, max);
393         } finally {
394             this.lock.unlock();
395         }
396     }
397 
398     public int getMaxPerRoute(T route) {
399         if (route == null) {
400             throw new IllegalArgumentException("Route may not be null");
401         }
402         this.lock.lock();
403         try {
404             return getMax(route);
405         } finally {
406             this.lock.unlock();
407         }
408     }
409 
410     public PoolStats getTotalStats() {
411         this.lock.lock();
412         try {
413             return new PoolStats(
414                     this.leased.size(),
415                     this.pending.size(),
416                     this.available.size(),
417                     this.maxTotal);
418         } finally {
419             this.lock.unlock();
420         }
421     }
422 
423     public PoolStats getStats(final T route) {
424         if (route == null) {
425             throw new IllegalArgumentException("Route may not be null");
426         }
427         this.lock.lock();
428         try {
429             RouteSpecificPool<T, C, E> pool = getPool(route);
430             return new PoolStats(
431                     pool.getLeasedCount(),
432                     pool.getPendingCount(),
433                     pool.getAvailableCount(),
434                     getMax(route));
435         } finally {
436             this.lock.unlock();
437         }
438     }
439 
440     /**
441      * Closes connections that have been idle longer than the given period
442      * of time and evicts them from the pool.
443      *
444      * @param idletime maximum idle time.
445      * @param tunit time unit.
446      */
447     public void closeIdle(long idletime, final TimeUnit tunit) {
448         if (tunit == null) {
449             throw new IllegalArgumentException("Time unit must not be null.");
450         }
451         long time = tunit.toMillis(idletime);
452         if (time < 0) {
453             time = 0;
454         }
455         long deadline = System.currentTimeMillis() - time;
456         this.lock.lock();
457         try {
458             Iterator<E> it = this.available.iterator();
459             while (it.hasNext()) {
460                 E entry = it.next();
461                 if (entry.getUpdated() <= deadline) {
462                     entry.close();
463                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
464                     pool.remove(entry);
465                     it.remove();
466                     notifyPending(pool);
467                 }
468             }
469         } finally {
470             this.lock.unlock();
471         }
472     }
473 
474     /**
475      * Closes expired connections and evicts them from the pool.
476      */
477     public void closeExpired() {
478         long now = System.currentTimeMillis();
479         this.lock.lock();
480         try {
481             Iterator<E> it = this.available.iterator();
482             while (it.hasNext()) {
483                 E entry = it.next();
484                 if (entry.isExpired(now)) {
485                     entry.close();
486                     RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
487                     pool.remove(entry);
488                     it.remove();
489                     notifyPending(pool);
490                 }
491             }
492         } finally {
493             this.lock.unlock();
494         }
495     }
496 
497     @Override
498     public String toString() {
499         StringBuilder buffer = new StringBuilder();
500         buffer.append("[leased: ");
501         buffer.append(this.leased);
502         buffer.append("][available: ");
503         buffer.append(this.available);
504         buffer.append("][pending: ");
505         buffer.append(this.pending);
506         buffer.append("]");
507         return buffer.toString();
508     }
509 
510 }