View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.locks.Condition;
42  import java.util.concurrent.locks.Lock;
43  import java.util.concurrent.locks.ReentrantLock;
44  
45  import org.apache.http.annotation.Contract;
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.concurrent.FutureCallback;
48  import org.apache.http.util.Args;
49  import org.apache.http.util.Asserts;
50  
51  /**
52   * Abstract synchronous (blocking) pool of connections.
53   * <p>
54   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
55   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
56   * method on the {@link Future} object returned by the
57   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
58   * to complete.
59   *
60   * @param <T> the route type that represents the opposite endpoint of a pooled
61   *   connection.
62   * @param <C> the connection type.
63   * @param <E> the type of the pool entry containing a pooled connection.
64   * @since 4.2
65   */
66  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
67  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
68                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
69  
70      private final Lock lock;
71      private final Condition condition;
72      private final ConnFactory<T, C> connFactory;
73      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
74      private final Set<E> leased;
75      private final LinkedList<E> available;
76      private final LinkedList<Future<E>> pending;
77      private final Map<T, Integer> maxPerRoute;
78  
79      private volatile boolean isShutDown;
80      private volatile int defaultMaxPerRoute;
81      private volatile int maxTotal;
82      private volatile int validateAfterInactivity;
83  
84      public AbstractConnPool(
85              final ConnFactory<T, C> connFactory,
86              final int defaultMaxPerRoute,
87              final int maxTotal) {
88          super();
89          this.connFactory = Args.notNull(connFactory, "Connection factory");
90          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
91          this.maxTotal = Args.positive(maxTotal, "Max total value");
92          this.lock = new ReentrantLock();
93          this.condition = this.lock.newCondition();
94          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
95          this.leased = new HashSet<E>();
96          this.available = new LinkedList<E>();
97          this.pending = new LinkedList<Future<E>>();
98          this.maxPerRoute = new HashMap<T, Integer>();
99      }
100 
101     /**
102      * Creates a new entry for the given connection with the given route.
103      */
104     protected abstract E createEntry(T route, C conn);
105 
106     /**
107      * @since 4.3
108      */
109     protected void onLease(final E entry) {
110     }
111 
112     /**
113      * @since 4.3
114      */
115     protected void onRelease(final E entry) {
116     }
117 
118     /**
119      * @since 4.4
120      */
121     protected void onReuse(final E entry) {
122     }
123 
124     /**
125      * @since 4.4
126      */
127     protected boolean validate(final E entry) {
128         return true;
129     }
130 
131     public boolean isShutdown() {
132         return this.isShutDown;
133     }
134 
135     /**
136      * Shuts down the pool.
137      */
138     public void shutdown() throws IOException {
139         if (this.isShutDown) {
140             return ;
141         }
142         this.isShutDown = true;
143         this.lock.lock();
144         try {
145             for (final E entry: this.available) {
146                 entry.close();
147             }
148             for (final E entry: this.leased) {
149                 entry.close();
150             }
151             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
152                 pool.shutdown();
153             }
154             this.routeToPool.clear();
155             this.leased.clear();
156             this.available.clear();
157         } finally {
158             this.lock.unlock();
159         }
160     }
161 
162     private RouteSpecificPool<T, C, E> getPool(final T route) {
163         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
164         if (pool == null) {
165             pool = new RouteSpecificPool<T, C, E>(route) {
166 
167                 @Override
168                 protected E createEntry(final C conn) {
169                     return AbstractConnPool.this.createEntry(route, conn);
170                 }
171 
172             };
173             this.routeToPool.put(route, pool);
174         }
175         return pool;
176     }
177 
178     /**
179      * {@inheritDoc}
180      * <p>
181      * Please note that this class does not maintain its own pool of execution
182      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
183      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
184      * returned by this method in order for the lease operation to complete.
185      */
186     @Override
187     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
188         Args.notNull(route, "Route");
189         Asserts.check(!this.isShutDown, "Connection pool shut down");
190 
191         return new Future<E>() {
192 
193             private volatile boolean cancelled;
194             private volatile boolean done;
195             private volatile E entry;
196 
197             @Override
198             public boolean cancel(final boolean mayInterruptIfRunning) {
199                 cancelled = true;
200                 lock.lock();
201                 try {
202                     condition.signalAll();
203                 } finally {
204                     lock.unlock();
205                 }
206                 synchronized (this) {
207                     final boolean result = !done;
208                     done = true;
209                     if (callback != null) {
210                         callback.cancelled();
211                     }
212                     return result;
213                 }
214             }
215 
216             @Override
217             public boolean isCancelled() {
218                 return cancelled;
219             }
220 
221             @Override
222             public boolean isDone() {
223                 return done;
224             }
225 
226             @Override
227             public E get() throws InterruptedException, ExecutionException {
228                 try {
229                     return get(0L, TimeUnit.MILLISECONDS);
230                 } catch (TimeoutException ex) {
231                     throw new ExecutionException(ex);
232                 }
233             }
234 
235             @Override
236             public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
237                 if (entry != null) {
238                     return entry;
239                 }
240                 synchronized (this) {
241                     try {
242                         for (;;) {
243                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
244                             if (validateAfterInactivity > 0)  {
245                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
246                                     if (!validate(leasedEntry)) {
247                                         leasedEntry.close();
248                                         release(leasedEntry, false);
249                                         continue;
250                                     }
251                                 }
252                             }
253                             entry = leasedEntry;
254                             done = true;
255                             onLease(entry);
256                             if (callback != null) {
257                                 callback.completed(entry);
258                             }
259                             return entry;
260                         }
261                     } catch (IOException ex) {
262                         done = true;
263                         if (callback != null) {
264                             callback.failed(ex);
265                         }
266                         throw new ExecutionException(ex);
267                     }
268                 }
269             }
270 
271         };
272     }
273 
274     /**
275      * Attempts to lease a connection for the given route and with the given
276      * state from the pool.
277      * <p>
278      * Please note that this class does not maintain its own pool of execution
279      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
280      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
281      * returned by this method in order for the lease operation to complete.
282      *
283      * @param route route of the connection.
284      * @param state arbitrary object that represents a particular state
285      *  (usually a security principal or a unique token identifying
286      *  the user whose credentials have been used while establishing the connection).
287      *  May be {@code null}.
288      * @return future for a leased pool entry.
289      */
290     public Future<E> lease(final T route, final Object state) {
291         return lease(route, state, null);
292     }
293 
294     private E getPoolEntryBlocking(
295             final T route, final Object state,
296             final long timeout, final TimeUnit tunit,
297             final Future<E> future) throws IOException, InterruptedException, TimeoutException {
298 
299         Date deadline = null;
300         if (timeout > 0) {
301             deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
302         }
303         this.lock.lock();
304         try {
305             final RouteSpecificPool<T, C, E> pool = getPool(route);
306             E entry;
307             for (;;) {
308                 Asserts.check(!this.isShutDown, "Connection pool shut down");
309                 for (;;) {
310                     entry = pool.getFree(state);
311                     if (entry == null) {
312                         break;
313                     }
314                     if (entry.isExpired(System.currentTimeMillis())) {
315                         entry.close();
316                     }
317                     if (entry.isClosed()) {
318                         this.available.remove(entry);
319                         pool.free(entry, false);
320                     } else {
321                         break;
322                     }
323                 }
324                 if (entry != null) {
325                     this.available.remove(entry);
326                     this.leased.add(entry);
327                     onReuse(entry);
328                     return entry;
329                 }
330 
331                 // New connection is needed
332                 final int maxPerRoute = getMax(route);
333                 // Shrink the pool prior to allocating a new connection
334                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
335                 if (excess > 0) {
336                     for (int i = 0; i < excess; i++) {
337                         final E lastUsed = pool.getLastUsed();
338                         if (lastUsed == null) {
339                             break;
340                         }
341                         lastUsed.close();
342                         this.available.remove(lastUsed);
343                         pool.remove(lastUsed);
344                     }
345                 }
346 
347                 if (pool.getAllocatedCount() < maxPerRoute) {
348                     final int totalUsed = this.leased.size();
349                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
350                     if (freeCapacity > 0) {
351                         final int totalAvailable = this.available.size();
352                         if (totalAvailable > freeCapacity - 1) {
353                             if (!this.available.isEmpty()) {
354                                 final E lastUsed = this.available.removeLast();
355                                 lastUsed.close();
356                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
357                                 otherpool.remove(lastUsed);
358                             }
359                         }
360                         final C conn = this.connFactory.create(route);
361                         entry = pool.add(conn);
362                         this.leased.add(entry);
363                         return entry;
364                     }
365                 }
366 
367                 boolean success = false;
368                 try {
369                     if (future.isCancelled()) {
370                         throw new InterruptedException("Operation interrupted");
371                     }
372                     pool.queue(future);
373                     this.pending.add(future);
374                     if (deadline != null) {
375                         success = this.condition.awaitUntil(deadline);
376                     } else {
377                         this.condition.await();
378                         success = true;
379                     }
380                     if (future.isCancelled()) {
381                         throw new InterruptedException("Operation interrupted");
382                     }
383                 } finally {
384                     // In case of 'success', we were woken up by the
385                     // connection pool and should now have a connection
386                     // waiting for us, or else we're shutting down.
387                     // Just continue in the loop, both cases are checked.
388                     pool.unqueue(future);
389                     this.pending.remove(future);
390                 }
391                 // check for spurious wakeup vs. timeout
392                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
393                     break;
394                 }
395             }
396             throw new TimeoutException("Timeout waiting for connection");
397         } finally {
398             this.lock.unlock();
399         }
400     }
401 
402     @Override
403     public void release(final E entry, final boolean reusable) {
404         this.lock.lock();
405         try {
406             if (this.leased.remove(entry)) {
407                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
408                 pool.free(entry, reusable);
409                 if (reusable && !this.isShutDown) {
410                     this.available.addFirst(entry);
411                 } else {
412                     entry.close();
413                 }
414                 onRelease(entry);
415                 Future<E> future = pool.nextPending();
416                 if (future != null) {
417                     this.pending.remove(future);
418                 } else {
419                     future = this.pending.poll();
420                 }
421                 if (future != null) {
422                     this.condition.signalAll();
423                 }
424             }
425         } finally {
426             this.lock.unlock();
427         }
428     }
429 
430     private int getMax(final T route) {
431         final Integer v = this.maxPerRoute.get(route);
432         if (v != null) {
433             return v.intValue();
434         } else {
435             return this.defaultMaxPerRoute;
436         }
437     }
438 
439     @Override
440     public void setMaxTotal(final int max) {
441         Args.positive(max, "Max value");
442         this.lock.lock();
443         try {
444             this.maxTotal = max;
445         } finally {
446             this.lock.unlock();
447         }
448     }
449 
450     @Override
451     public int getMaxTotal() {
452         this.lock.lock();
453         try {
454             return this.maxTotal;
455         } finally {
456             this.lock.unlock();
457         }
458     }
459 
460     @Override
461     public void setDefaultMaxPerRoute(final int max) {
462         Args.positive(max, "Max per route value");
463         this.lock.lock();
464         try {
465             this.defaultMaxPerRoute = max;
466         } finally {
467             this.lock.unlock();
468         }
469     }
470 
471     @Override
472     public int getDefaultMaxPerRoute() {
473         this.lock.lock();
474         try {
475             return this.defaultMaxPerRoute;
476         } finally {
477             this.lock.unlock();
478         }
479     }
480 
481     @Override
482     public void setMaxPerRoute(final T route, final int max) {
483         Args.notNull(route, "Route");
484         Args.positive(max, "Max per route value");
485         this.lock.lock();
486         try {
487             this.maxPerRoute.put(route, Integer.valueOf(max));
488         } finally {
489             this.lock.unlock();
490         }
491     }
492 
493     @Override
494     public int getMaxPerRoute(final T route) {
495         Args.notNull(route, "Route");
496         this.lock.lock();
497         try {
498             return getMax(route);
499         } finally {
500             this.lock.unlock();
501         }
502     }
503 
504     @Override
505     public PoolStats getTotalStats() {
506         this.lock.lock();
507         try {
508             return new PoolStats(
509                     this.leased.size(),
510                     this.pending.size(),
511                     this.available.size(),
512                     this.maxTotal);
513         } finally {
514             this.lock.unlock();
515         }
516     }
517 
518     @Override
519     public PoolStats getStats(final T route) {
520         Args.notNull(route, "Route");
521         this.lock.lock();
522         try {
523             final RouteSpecificPool<T, C, E> pool = getPool(route);
524             return new PoolStats(
525                     pool.getLeasedCount(),
526                     pool.getPendingCount(),
527                     pool.getAvailableCount(),
528                     getMax(route));
529         } finally {
530             this.lock.unlock();
531         }
532     }
533 
534     /**
535      * Returns snapshot of all knows routes
536      * @return the set of routes
537      *
538      * @since 4.4
539      */
540     public Set<T> getRoutes() {
541         this.lock.lock();
542         try {
543             return new HashSet<T>(routeToPool.keySet());
544         } finally {
545             this.lock.unlock();
546         }
547     }
548 
549     /**
550      * Enumerates all available connections.
551      *
552      * @since 4.3
553      */
554     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
555         this.lock.lock();
556         try {
557             final Iterator<E> it = this.available.iterator();
558             while (it.hasNext()) {
559                 final E entry = it.next();
560                 callback.process(entry);
561                 if (entry.isClosed()) {
562                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
563                     pool.remove(entry);
564                     it.remove();
565                 }
566             }
567             purgePoolMap();
568         } finally {
569             this.lock.unlock();
570         }
571     }
572 
573     /**
574      * Enumerates all leased connections.
575      *
576      * @since 4.3
577      */
578     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
579         this.lock.lock();
580         try {
581             final Iterator<E> it = this.leased.iterator();
582             while (it.hasNext()) {
583                 final E entry = it.next();
584                 callback.process(entry);
585             }
586         } finally {
587             this.lock.unlock();
588         }
589     }
590 
591     private void purgePoolMap() {
592         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
593         while (it.hasNext()) {
594             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
595             final RouteSpecificPool<T, C, E> pool = entry.getValue();
596             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
597                 it.remove();
598             }
599         }
600     }
601 
602     /**
603      * Closes connections that have been idle longer than the given period
604      * of time and evicts them from the pool.
605      *
606      * @param idletime maximum idle time.
607      * @param tunit time unit.
608      */
609     public void closeIdle(final long idletime, final TimeUnit tunit) {
610         Args.notNull(tunit, "Time unit");
611         long time = tunit.toMillis(idletime);
612         if (time < 0) {
613             time = 0;
614         }
615         final long deadline = System.currentTimeMillis() - time;
616         enumAvailable(new PoolEntryCallback<T, C>() {
617 
618             @Override
619             public void process(final PoolEntry<T, C> entry) {
620                 if (entry.getUpdated() <= deadline) {
621                     entry.close();
622                 }
623             }
624 
625         });
626     }
627 
628     /**
629      * Closes expired connections and evicts them from the pool.
630      */
631     public void closeExpired() {
632         final long now = System.currentTimeMillis();
633         enumAvailable(new PoolEntryCallback<T, C>() {
634 
635             @Override
636             public void process(final PoolEntry<T, C> entry) {
637                 if (entry.isExpired(now)) {
638                     entry.close();
639                 }
640             }
641 
642         });
643     }
644 
645     /**
646      * @return the number of milliseconds
647      * @since 4.4
648      */
649     public int getValidateAfterInactivity() {
650         return this.validateAfterInactivity;
651     }
652 
653     /**
654      * @param ms the number of milliseconds
655      * @since 4.4
656      */
657     public void setValidateAfterInactivity(final int ms) {
658         this.validateAfterInactivity = ms;
659     }
660 
661     @Override
662     public String toString() {
663         final StringBuilder buffer = new StringBuilder();
664         buffer.append("[leased: ");
665         buffer.append(this.leased);
666         buffer.append("][available: ");
667         buffer.append(this.available);
668         buffer.append("][pending: ");
669         buffer.append(this.pending);
670         buffer.append("]");
671         return buffer.toString();
672     }
673 
674 }