View Javadoc
/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicReference;
43  import java.util.concurrent.locks.Condition;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.FutureCallback;
50  import org.apache.http.util.Args;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * Abstract synchronous (blocking) pool of connections.
55   * <p>
56   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
57   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
58   * method on the {@link Future} object returned by the
59   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
60   * to complete.
61   *
62   * @param <T> the route type that represents the opposite endpoint of a pooled
63   *   connection.
64   * @param <C> the connection type.
65   * @param <E> the type of the pool entry containing a pooled connection.
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
69  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
70                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
71  
72      private final Lock lock;
73      private final Condition condition;
74      private final ConnFactory<T, C> connFactory;
75      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
76      private final Set<E> leased;
77      private final LinkedList<E> available;
78      private final LinkedList<Future<E>> pending;
79      private final Map<T, Integer> maxPerRoute;
80  
81      private volatile boolean isShutDown;
82      private volatile int defaultMaxPerRoute;
83      private volatile int maxTotal;
84      private volatile int validateAfterInactivity;
85  
86      public AbstractConnPool(
87              final ConnFactory<T, C> connFactory,
88              final int defaultMaxPerRoute,
89              final int maxTotal) {
90          super();
91          this.connFactory = Args.notNull(connFactory, "Connection factory");
92          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
93          this.maxTotal = Args.positive(maxTotal, "Max total value");
94          this.lock = new ReentrantLock();
95          this.condition = this.lock.newCondition();
96          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
97          this.leased = new HashSet<E>();
98          this.available = new LinkedList<E>();
99          this.pending = new LinkedList<Future<E>>();
100         this.maxPerRoute = new HashMap<T, Integer>();
101     }
102 
103     /**
104      * Creates a new entry for the given connection with the given route.
105      */
106     protected abstract E createEntry(T route, C conn);
107 
108     /**
109      * @since 4.3
110      */
111     protected void onLease(final E entry) {
112     }
113 
114     /**
115      * @since 4.3
116      */
117     protected void onRelease(final E entry) {
118     }
119 
120     /**
121      * @since 4.4
122      */
123     protected void onReuse(final E entry) {
124     }
125 
126     /**
127      * @since 4.4
128      */
129     protected boolean validate(final E entry) {
130         return true;
131     }
132 
133     public boolean isShutdown() {
134         return this.isShutDown;
135     }
136 
137     /**
138      * Shuts down the pool.
139      */
140     public void shutdown() throws IOException {
141         if (this.isShutDown) {
142             return ;
143         }
144         this.isShutDown = true;
145         this.lock.lock();
146         try {
147             for (final E entry: this.available) {
148                 entry.close();
149             }
150             for (final E entry: this.leased) {
151                 entry.close();
152             }
153             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
154                 pool.shutdown();
155             }
156             this.routeToPool.clear();
157             this.leased.clear();
158             this.available.clear();
159         } finally {
160             this.lock.unlock();
161         }
162     }
163 
164     private RouteSpecificPool<T, C, E> getPool(final T route) {
165         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
166         if (pool == null) {
167             pool = new RouteSpecificPool<T, C, E>(route) {
168 
169                 @Override
170                 protected E createEntry(final C conn) {
171                     return AbstractConnPool.this.createEntry(route, conn);
172                 }
173 
174             };
175             this.routeToPool.put(route, pool);
176         }
177         return pool;
178     }
179 
180     /**
181      * {@inheritDoc}
182      * <p>
183      * Please note that this class does not maintain its own pool of execution
184      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
185      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
186      * returned by this method in order for the lease operation to complete.
187      */
188     @Override
189     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
190         Args.notNull(route, "Route");
191         Asserts.check(!this.isShutDown, "Connection pool shut down");
192 
193         return new Future<E>() {
194 
195             private final AtomicBoolean cancelled = new AtomicBoolean(false);
196             private final AtomicBoolean done = new AtomicBoolean(false);
197             private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
198 
199             @Override
200             public boolean cancel(final boolean mayInterruptIfRunning) {
201                 if (cancelled.compareAndSet(false, true)) {
202                     done.set(true);
203                     lock.lock();
204                     try {
205                         condition.signalAll();
206                     } finally {
207                         lock.unlock();
208                     }
209                     if (callback != null) {
210                         callback.cancelled();
211                     }
212                     return true;
213                 }
214                 return false;
215             }
216 
217             @Override
218             public boolean isCancelled() {
219                 return cancelled.get();
220             }
221 
222             @Override
223             public boolean isDone() {
224                 return done.get();
225             }
226 
227             @Override
228             public E get() throws InterruptedException, ExecutionException {
229                 try {
230                     return get(0L, TimeUnit.MILLISECONDS);
231                 } catch (final TimeoutException ex) {
232                     throw new ExecutionException(ex);
233                 }
234             }
235 
236             @Override
237             public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
238                 final E entry = entryRef.get();
239                 if (entry != null) {
240                     return entry;
241                 }
242                 synchronized (this) {
243                     try {
244                         for (;;) {
245                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
246                             if (validateAfterInactivity > 0)  {
247                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
248                                     if (!validate(leasedEntry)) {
249                                         leasedEntry.close();
250                                         release(leasedEntry, false);
251                                         continue;
252                                     }
253                                 }
254                             }
255                             entryRef.set(leasedEntry);
256                             done.set(true);
257                             onLease(leasedEntry);
258                             if (callback != null) {
259                                 callback.completed(leasedEntry);
260                             }
261                             return leasedEntry;
262                         }
263                     } catch (final IOException ex) {
264                         done.set(true);
265                         if (callback != null) {
266                             callback.failed(ex);
267                         }
268                         throw new ExecutionException(ex);
269                     }
270                 }
271             }
272 
273         };
274     }
275 
276     /**
277      * Attempts to lease a connection for the given route and with the given
278      * state from the pool.
279      * <p>
280      * Please note that this class does not maintain its own pool of execution
281      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
282      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
283      * returned by this method in order for the lease operation to complete.
284      *
285      * @param route route of the connection.
286      * @param state arbitrary object that represents a particular state
287      *  (usually a security principal or a unique token identifying
288      *  the user whose credentials have been used while establishing the connection).
289      *  May be {@code null}.
290      * @return future for a leased pool entry.
291      */
292     public Future<E> lease(final T route, final Object state) {
293         return lease(route, state, null);
294     }
295 
296     private E getPoolEntryBlocking(
297             final T route, final Object state,
298             final long timeout, final TimeUnit timeUnit,
299             final Future<E> future) throws IOException, InterruptedException, TimeoutException {
300 
301         Date deadline = null;
302         if (timeout > 0) {
303             deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
304         }
305         this.lock.lock();
306         try {
307             final RouteSpecificPool<T, C, E> pool = getPool(route);
308             E entry;
309             for (;;) {
310                 Asserts.check(!this.isShutDown, "Connection pool shut down");
311                 for (;;) {
312                     entry = pool.getFree(state);
313                     if (entry == null) {
314                         break;
315                     }
316                     if (entry.isExpired(System.currentTimeMillis())) {
317                         entry.close();
318                     }
319                     if (entry.isClosed()) {
320                         this.available.remove(entry);
321                         pool.free(entry, false);
322                     } else {
323                         break;
324                     }
325                 }
326                 if (entry != null) {
327                     this.available.remove(entry);
328                     this.leased.add(entry);
329                     onReuse(entry);
330                     return entry;
331                 }
332 
333                 // New connection is needed
334                 final int maxPerRoute = getMax(route);
335                 // Shrink the pool prior to allocating a new connection
336                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
337                 if (excess > 0) {
338                     for (int i = 0; i < excess; i++) {
339                         final E lastUsed = pool.getLastUsed();
340                         if (lastUsed == null) {
341                             break;
342                         }
343                         lastUsed.close();
344                         this.available.remove(lastUsed);
345                         pool.remove(lastUsed);
346                     }
347                 }
348 
349                 if (pool.getAllocatedCount() < maxPerRoute) {
350                     final int totalUsed = this.leased.size();
351                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
352                     if (freeCapacity > 0) {
353                         final int totalAvailable = this.available.size();
354                         if (totalAvailable > freeCapacity - 1) {
355                             if (!this.available.isEmpty()) {
356                                 final E lastUsed = this.available.removeLast();
357                                 lastUsed.close();
358                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
359                                 otherpool.remove(lastUsed);
360                             }
361                         }
362                         final C conn = this.connFactory.create(route);
363                         entry = pool.add(conn);
364                         this.leased.add(entry);
365                         return entry;
366                     }
367                 }
368 
369                 boolean success = false;
370                 try {
371                     if (future.isCancelled()) {
372                         throw new InterruptedException("Operation interrupted");
373                     }
374                     pool.queue(future);
375                     this.pending.add(future);
376                     if (deadline != null) {
377                         success = this.condition.awaitUntil(deadline);
378                     } else {
379                         this.condition.await();
380                         success = true;
381                     }
382                     if (future.isCancelled()) {
383                         throw new InterruptedException("Operation interrupted");
384                     }
385                 } finally {
386                     // In case of 'success', we were woken up by the
387                     // connection pool and should now have a connection
388                     // waiting for us, or else we're shutting down.
389                     // Just continue in the loop, both cases are checked.
390                     pool.unqueue(future);
391                     this.pending.remove(future);
392                 }
393                 // check for spurious wakeup vs. timeout
394                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
395                     break;
396                 }
397             }
398             throw new TimeoutException("Timeout waiting for connection");
399         } finally {
400             this.lock.unlock();
401         }
402     }
403 
404     @Override
405     public void release(final E entry, final boolean reusable) {
406         this.lock.lock();
407         try {
408             if (this.leased.remove(entry)) {
409                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
410                 pool.free(entry, reusable);
411                 if (reusable && !this.isShutDown) {
412                     this.available.addFirst(entry);
413                 } else {
414                     entry.close();
415                 }
416                 onRelease(entry);
417                 Future<E> future = pool.nextPending();
418                 if (future != null) {
419                     this.pending.remove(future);
420                 } else {
421                     future = this.pending.poll();
422                 }
423                 if (future != null) {
424                     this.condition.signalAll();
425                 }
426             }
427         } finally {
428             this.lock.unlock();
429         }
430     }
431 
432     private int getMax(final T route) {
433         final Integer v = this.maxPerRoute.get(route);
434         return v != null ? v.intValue() : this.defaultMaxPerRoute;
435     }
436 
437     @Override
438     public void setMaxTotal(final int max) {
439         Args.positive(max, "Max value");
440         this.lock.lock();
441         try {
442             this.maxTotal = max;
443         } finally {
444             this.lock.unlock();
445         }
446     }
447 
448     @Override
449     public int getMaxTotal() {
450         this.lock.lock();
451         try {
452             return this.maxTotal;
453         } finally {
454             this.lock.unlock();
455         }
456     }
457 
458     @Override
459     public void setDefaultMaxPerRoute(final int max) {
460         Args.positive(max, "Max per route value");
461         this.lock.lock();
462         try {
463             this.defaultMaxPerRoute = max;
464         } finally {
465             this.lock.unlock();
466         }
467     }
468 
469     @Override
470     public int getDefaultMaxPerRoute() {
471         this.lock.lock();
472         try {
473             return this.defaultMaxPerRoute;
474         } finally {
475             this.lock.unlock();
476         }
477     }
478 
479     @Override
480     public void setMaxPerRoute(final T route, final int max) {
481         Args.notNull(route, "Route");
482         this.lock.lock();
483         try {
484             if (max > -1) {
485                 this.maxPerRoute.put(route, Integer.valueOf(max));
486             } else {
487                 this.maxPerRoute.remove(route);
488             }
489         } finally {
490             this.lock.unlock();
491         }
492     }
493 
494     @Override
495     public int getMaxPerRoute(final T route) {
496         Args.notNull(route, "Route");
497         this.lock.lock();
498         try {
499             return getMax(route);
500         } finally {
501             this.lock.unlock();
502         }
503     }
504 
505     @Override
506     public PoolStats getTotalStats() {
507         this.lock.lock();
508         try {
509             return new PoolStats(
510                     this.leased.size(),
511                     this.pending.size(),
512                     this.available.size(),
513                     this.maxTotal);
514         } finally {
515             this.lock.unlock();
516         }
517     }
518 
519     @Override
520     public PoolStats getStats(final T route) {
521         Args.notNull(route, "Route");
522         this.lock.lock();
523         try {
524             final RouteSpecificPool<T, C, E> pool = getPool(route);
525             return new PoolStats(
526                     pool.getLeasedCount(),
527                     pool.getPendingCount(),
528                     pool.getAvailableCount(),
529                     getMax(route));
530         } finally {
531             this.lock.unlock();
532         }
533     }
534 
535     /**
536      * Returns snapshot of all knows routes
537      * @return the set of routes
538      *
539      * @since 4.4
540      */
541     public Set<T> getRoutes() {
542         this.lock.lock();
543         try {
544             return new HashSet<T>(routeToPool.keySet());
545         } finally {
546             this.lock.unlock();
547         }
548     }
549 
550     /**
551      * Enumerates all available connections.
552      *
553      * @since 4.3
554      */
555     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
556         this.lock.lock();
557         try {
558             final Iterator<E> it = this.available.iterator();
559             while (it.hasNext()) {
560                 final E entry = it.next();
561                 callback.process(entry);
562                 if (entry.isClosed()) {
563                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
564                     pool.remove(entry);
565                     it.remove();
566                 }
567             }
568             purgePoolMap();
569         } finally {
570             this.lock.unlock();
571         }
572     }
573 
574     /**
575      * Enumerates all leased connections.
576      *
577      * @since 4.3
578      */
579     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
580         this.lock.lock();
581         try {
582             final Iterator<E> it = this.leased.iterator();
583             while (it.hasNext()) {
584                 final E entry = it.next();
585                 callback.process(entry);
586             }
587         } finally {
588             this.lock.unlock();
589         }
590     }
591 
592     private void purgePoolMap() {
593         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
594         while (it.hasNext()) {
595             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
596             final RouteSpecificPool<T, C, E> pool = entry.getValue();
597             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
598                 it.remove();
599             }
600         }
601     }
602 
603     /**
604      * Closes connections that have been idle longer than the given period
605      * of time and evicts them from the pool.
606      *
607      * @param idletime maximum idle time.
608      * @param timeUnit time unit.
609      */
610     public void closeIdle(final long idletime, final TimeUnit timeUnit) {
611         Args.notNull(timeUnit, "Time unit");
612         long time = timeUnit.toMillis(idletime);
613         if (time < 0) {
614             time = 0;
615         }
616         final long deadline = System.currentTimeMillis() - time;
617         enumAvailable(new PoolEntryCallback<T, C>() {
618 
619             @Override
620             public void process(final PoolEntry<T, C> entry) {
621                 if (entry.getUpdated() <= deadline) {
622                     entry.close();
623                 }
624             }
625 
626         });
627     }
628 
629     /**
630      * Closes expired connections and evicts them from the pool.
631      */
632     public void closeExpired() {
633         final long now = System.currentTimeMillis();
634         enumAvailable(new PoolEntryCallback<T, C>() {
635 
636             @Override
637             public void process(final PoolEntry<T, C> entry) {
638                 if (entry.isExpired(now)) {
639                     entry.close();
640                 }
641             }
642 
643         });
644     }
645 
646     /**
647      * @return the number of milliseconds
648      * @since 4.4
649      */
650     public int getValidateAfterInactivity() {
651         return this.validateAfterInactivity;
652     }
653 
654     /**
655      * @param ms the number of milliseconds
656      * @since 4.4
657      */
658     public void setValidateAfterInactivity(final int ms) {
659         this.validateAfterInactivity = ms;
660     }
661 
662     @Override
663     public String toString() {
664         this.lock.lock();
665         try {
666             final StringBuilder buffer = new StringBuilder();
667             buffer.append("[leased: ");
668             buffer.append(this.leased);
669             buffer.append("][available: ");
670             buffer.append(this.available);
671             buffer.append("][pending: ");
672             buffer.append(this.pending);
673             buffer.append("]");
674             return buffer.toString();
675         } finally {
676             this.lock.unlock();
677         }
678     }
679 
680 }