View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.http.annotation.ThreadSafe;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.util.Args;
46  import org.apache.http.util.Asserts;
47  
48  /**
49   * Abstract synchronous (blocking) pool of connections.
50   * <p/>
51   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
52   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
53   * method on the {@link Future} object returned by the
54   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
55   * to complete.
56   *
57   * @param <T> the route type that represents the opposite endpoint of a pooled
58   *   connection.
59   * @param <C> the connection type.
60   * @param <E> the type of the pool entry containing a pooled connection.
61   * @since 4.2
62   */
63  @ThreadSafe
64  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
66  
67      private final Lock lock;
68      private final ConnFactory<T, C> connFactory;
69      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
70      private final Set<E> leased;
71      private final LinkedList<E> available;
72      private final LinkedList<PoolEntryFuture<E>> pending;
73      private final Map<T, Integer> maxPerRoute;
74  
75      private volatile boolean isShutDown;
76      private volatile int defaultMaxPerRoute;
77      private volatile int maxTotal;
78  
79      public AbstractConnPool(
80              final ConnFactory<T, C> connFactory,
81              final int defaultMaxPerRoute,
82              final int maxTotal) {
83          super();
84          this.connFactory = Args.notNull(connFactory, "Connection factory");
85          this.defaultMaxPerRoute = Args.notNegative(defaultMaxPerRoute, "Max per route value");
86          this.maxTotal = Args.notNegative(maxTotal, "Max total value");
87          this.lock = new ReentrantLock();
88          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
89          this.leased = new HashSet<E>();
90          this.available = new LinkedList<E>();
91          this.pending = new LinkedList<PoolEntryFuture<E>>();
92          this.maxPerRoute = new HashMap<T, Integer>();
93      }
94  
95      /**
96       * Creates a new entry for the given connection with the given route.
97       */
98      protected abstract E createEntry(T route, C conn);
99  
100     public boolean isShutdown() {
101         return this.isShutDown;
102     }
103 
104     /**
105      * Shuts down the pool.
106      */
107     public void shutdown() throws IOException {
108         if (this.isShutDown) {
109             return ;
110         }
111         this.isShutDown = true;
112         this.lock.lock();
113         try {
114             for (final E entry: this.available) {
115                 entry.close();
116             }
117             for (final E entry: this.leased) {
118                 entry.close();
119             }
120             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
121                 pool.shutdown();
122             }
123             this.routeToPool.clear();
124             this.leased.clear();
125             this.available.clear();
126         } finally {
127             this.lock.unlock();
128         }
129     }
130 
131     private RouteSpecificPool<T, C, E> getPool(final T route) {
132         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
133         if (pool == null) {
134             pool = new RouteSpecificPool<T, C, E>(route) {
135 
136                 @Override
137                 protected E createEntry(final C conn) {
138                     return AbstractConnPool.this.createEntry(route, conn);
139                 }
140 
141             };
142             this.routeToPool.put(route, pool);
143         }
144         return pool;
145     }
146 
147     /**
148      * {@inheritDoc}
149      * <p/>
150      * Please note that this class does not maintain its own pool of execution
151      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
152      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
153      * returned by this method in order for the lease operation to complete.
154      */
155     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
156         Args.notNull(route, "Route");
157         Asserts.check(!this.isShutDown, "Connection pool shut down");
158         return new PoolEntryFuture<E>(this.lock, callback) {
159 
160             @Override
161             public E getPoolEntry(
162                     final long timeout,
163                     final TimeUnit tunit)
164                         throws InterruptedException, TimeoutException, IOException {
165                 return getPoolEntryBlocking(route, state, timeout, tunit, this);
166             }
167 
168         };
169     }
170 
171     /**
172      * Attempts to lease a connection for the given route and with the given
173      * state from the pool.
174      * <p/>
175      * Please note that this class does not maintain its own pool of execution
176      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
177      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
178      * returned by this method in order for the lease operation to complete.
179      *
180      * @param route route of the connection.
181      * @param state arbitrary object that represents a particular state
182      *  (usually a security principal or a unique token identifying
183      *  the user whose credentials have been used while establishing the connection).
184      *  May be <code>null</code>.
185      * @return future for a leased pool entry.
186      */
187     public Future<E> lease(final T route, final Object state) {
188         return lease(route, state, null);
189     }
190 
191     private E getPoolEntryBlocking(
192             final T route, final Object state,
193             final long timeout, final TimeUnit tunit,
194             final PoolEntryFuture<E> future)
195                 throws IOException, InterruptedException, TimeoutException {
196 
197         Date deadline = null;
198         if (timeout > 0) {
199             deadline = new Date
200                 (System.currentTimeMillis() + tunit.toMillis(timeout));
201         }
202 
203         this.lock.lock();
204         try {
205             final RouteSpecificPool<T, C, E> pool = getPool(route);
206             E entry = null;
207             while (entry == null) {
208                 Asserts.check(!this.isShutDown, "Connection pool shut down");
209                 for (;;) {
210                     entry = pool.getFree(state);
211                     if (entry == null) {
212                         break;
213                     }
214                     if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
215                         entry.close();
216                         this.available.remove(entry);
217                         pool.free(entry, false);
218                     } else {
219                         break;
220                     }
221                 }
222                 if (entry != null) {
223                     this.available.remove(entry);
224                     this.leased.add(entry);
225                     return entry;
226                 }
227 
228                 // New connection is needed
229                 final int maxPerRoute = getMax(route);
230                 // Shrink the pool prior to allocating a new connection
231                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
232                 if (excess > 0) {
233                     for (int i = 0; i < excess; i++) {
234                         final E lastUsed = pool.getLastUsed();
235                         if (lastUsed == null) {
236                             break;
237                         }
238                         lastUsed.close();
239                         this.available.remove(lastUsed);
240                         pool.remove(lastUsed);
241                     }
242                 }
243 
244                 if (pool.getAllocatedCount() < maxPerRoute) {
245                     final int totalUsed = this.leased.size();
246                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
247                     if (freeCapacity > 0) {
248                         final int totalAvailable = this.available.size();
249                         if (totalAvailable > freeCapacity - 1) {
250                             if (!this.available.isEmpty()) {
251                                 final E lastUsed = this.available.removeLast();
252                                 lastUsed.close();
253                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
254                                 otherpool.remove(lastUsed);
255                             }
256                         }
257                         final C conn = this.connFactory.create(route);
258                         entry = pool.add(conn);
259                         this.leased.add(entry);
260                         return entry;
261                     }
262                 }
263 
264                 boolean success = false;
265                 try {
266                     pool.queue(future);
267                     this.pending.add(future);
268                     success = future.await(deadline);
269                 } finally {
270                     // In case of 'success', we were woken up by the
271                     // connection pool and should now have a connection
272                     // waiting for us, or else we're shutting down.
273                     // Just continue in the loop, both cases are checked.
274                     pool.unqueue(future);
275                     this.pending.remove(future);
276                 }
277                 // check for spurious wakeup vs. timeout
278                 if (!success && (deadline != null) &&
279                     (deadline.getTime() <= System.currentTimeMillis())) {
280                     break;
281                 }
282             }
283             throw new TimeoutException("Timeout waiting for connection");
284         } finally {
285             this.lock.unlock();
286         }
287     }
288 
289     private void notifyPending(final RouteSpecificPool<T, C, E> pool) {
290         PoolEntryFuture<E> future = pool.nextPending();
291         if (future != null) {
292             this.pending.remove(future);
293         } else {
294             future = this.pending.poll();
295         }
296         if (future != null) {
297             future.wakeup();
298         }
299     }
300 
301     public void release(final E entry, final boolean reusable) {
302         this.lock.lock();
303         try {
304             if (this.leased.remove(entry)) {
305                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
306                 pool.free(entry, reusable);
307                 if (reusable && !this.isShutDown) {
308                     this.available.addFirst(entry);
309                 } else {
310                     entry.close();
311                 }
312                 notifyPending(pool);
313             }
314         } finally {
315             this.lock.unlock();
316         }
317     }
318 
319     private int getMax(final T route) {
320         final Integer v = this.maxPerRoute.get(route);
321         if (v != null) {
322             return v.intValue();
323         } else {
324             return this.defaultMaxPerRoute;
325         }
326     }
327 
328     public void setMaxTotal(final int max) {
329         Args.notNegative(max, "Max value");
330         this.lock.lock();
331         try {
332             this.maxTotal = max;
333         } finally {
334             this.lock.unlock();
335         }
336     }
337 
338     public int getMaxTotal() {
339         this.lock.lock();
340         try {
341             return this.maxTotal;
342         } finally {
343             this.lock.unlock();
344         }
345     }
346 
347     public void setDefaultMaxPerRoute(final int max) {
348         Args.notNegative(max, "Max per route value");
349         this.lock.lock();
350         try {
351             this.defaultMaxPerRoute = max;
352         } finally {
353             this.lock.unlock();
354         }
355     }
356 
357     public int getDefaultMaxPerRoute() {
358         this.lock.lock();
359         try {
360             return this.defaultMaxPerRoute;
361         } finally {
362             this.lock.unlock();
363         }
364     }
365 
366     public void setMaxPerRoute(final T route, final int max) {
367         Args.notNull(route, "Route");
368         Args.notNegative(max, "Max per route value");
369         this.lock.lock();
370         try {
371             this.maxPerRoute.put(route, max);
372         } finally {
373             this.lock.unlock();
374         }
375     }
376 
377     public int getMaxPerRoute(final T route) {
378         Args.notNull(route, "Route");
379         this.lock.lock();
380         try {
381             return getMax(route);
382         } finally {
383             this.lock.unlock();
384         }
385     }
386 
387     public PoolStats getTotalStats() {
388         this.lock.lock();
389         try {
390             return new PoolStats(
391                     this.leased.size(),
392                     this.pending.size(),
393                     this.available.size(),
394                     this.maxTotal);
395         } finally {
396             this.lock.unlock();
397         }
398     }
399 
400     public PoolStats getStats(final T route) {
401         Args.notNull(route, "Route");
402         this.lock.lock();
403         try {
404             final RouteSpecificPool<T, C, E> pool = getPool(route);
405             return new PoolStats(
406                     pool.getLeasedCount(),
407                     pool.getPendingCount(),
408                     pool.getAvailableCount(),
409                     getMax(route));
410         } finally {
411             this.lock.unlock();
412         }
413     }
414 
415     /**
416      * Closes connections that have been idle longer than the given period
417      * of time and evicts them from the pool.
418      *
419      * @param idletime maximum idle time.
420      * @param tunit time unit.
421      */
422     public void closeIdle(final long idletime, final TimeUnit tunit) {
423         Args.notNull(tunit, "Time unit");
424         long time = tunit.toMillis(idletime);
425         if (time < 0) {
426             time = 0;
427         }
428         final long deadline = System.currentTimeMillis() - time;
429         this.lock.lock();
430         try {
431             final Iterator<E> it = this.available.iterator();
432             while (it.hasNext()) {
433                 final E entry = it.next();
434                 if (entry.getUpdated() <= deadline) {
435                     entry.close();
436                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
437                     pool.remove(entry);
438                     it.remove();
439                     notifyPending(pool);
440                 }
441             }
442         } finally {
443             this.lock.unlock();
444         }
445     }
446 
447     /**
448      * Closes expired connections and evicts them from the pool.
449      */
450     public void closeExpired() {
451         final long now = System.currentTimeMillis();
452         this.lock.lock();
453         try {
454             final Iterator<E> it = this.available.iterator();
455             while (it.hasNext()) {
456                 final E entry = it.next();
457                 if (entry.isExpired(now)) {
458                     entry.close();
459                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
460                     pool.remove(entry);
461                     it.remove();
462                     notifyPending(pool);
463                 }
464             }
465         } finally {
466             this.lock.unlock();
467         }
468     }
469 
470     @Override
471     public String toString() {
472         final StringBuilder buffer = new StringBuilder();
473         buffer.append("[leased: ");
474         buffer.append(this.leased);
475         buffer.append("][available: ");
476         buffer.append(this.available);
477         buffer.append("][pending: ");
478         buffer.append(this.pending);
479         buffer.append("]");
480         return buffer.toString();
481     }
482 
483 }