View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicReference;
43  import java.util.concurrent.locks.Condition;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.FutureCallback;
50  import org.apache.http.util.Args;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * Abstract synchronous (blocking) pool of connections.
55   * <p>
56   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
57   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
58   * method on the {@link Future} object returned by the
59   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
60   * to complete.
61   *
62   * @param <T> the route type that represents the opposite endpoint of a pooled
63   *   connection.
64   * @param <C> the connection type.
65   * @param <E> the type of the pool entry containing a pooled connection.
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
69  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
70                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
71  
72      private final Lock lock;
73      private final Condition condition;
74      private final ConnFactory<T, C> connFactory;
75      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
76      private final Set<E> leased;
77      private final LinkedList<E> available;
78      private final LinkedList<Future<E>> pending;
79      private final Map<T, Integer> maxPerRoute;
80  
81      private volatile boolean isShutDown;
82      private volatile int defaultMaxPerRoute;
83      private volatile int maxTotal;
84      private volatile int validateAfterInactivity;
85  
86      public AbstractConnPool(
87              final ConnFactory<T, C> connFactory,
88              final int defaultMaxPerRoute,
89              final int maxTotal) {
90          super();
91          this.connFactory = Args.notNull(connFactory, "Connection factory");
92          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
93          this.maxTotal = Args.positive(maxTotal, "Max total value");
94          this.lock = new ReentrantLock();
95          this.condition = this.lock.newCondition();
96          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
97          this.leased = new HashSet<E>();
98          this.available = new LinkedList<E>();
99          this.pending = new LinkedList<Future<E>>();
100         this.maxPerRoute = new HashMap<T, Integer>();
101     }
102 
103     /**
104      * Creates a new entry for the given connection with the given route.
105      */
106     protected abstract E createEntry(T route, C conn);
107 
108     /**
109      * @since 4.3
110      */
111     protected void onLease(final E entry) {
112     }
113 
114     /**
115      * @since 4.3
116      */
117     protected void onRelease(final E entry) {
118     }
119 
120     /**
121      * @since 4.4
122      */
123     protected void onReuse(final E entry) {
124     }
125 
126     /**
127      * @since 4.4
128      */
129     protected boolean validate(final E entry) {
130         return true;
131     }
132 
133     public boolean isShutdown() {
134         return this.isShutDown;
135     }
136 
137     /**
138      * Shuts down the pool.
139      */
140     public void shutdown() throws IOException {
141         if (this.isShutDown) {
142             return ;
143         }
144         this.isShutDown = true;
145         this.lock.lock();
146         try {
147             for (final E entry: this.available) {
148                 entry.close();
149             }
150             for (final E entry: this.leased) {
151                 entry.close();
152             }
153             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
154                 pool.shutdown();
155             }
156             this.routeToPool.clear();
157             this.leased.clear();
158             this.available.clear();
159         } finally {
160             this.lock.unlock();
161         }
162     }
163 
164     private RouteSpecificPool<T, C, E> getPool(final T route) {
165         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
166         if (pool == null) {
167             pool = new RouteSpecificPool<T, C, E>(route) {
168 
169                 @Override
170                 protected E createEntry(final C conn) {
171                     return AbstractConnPool.this.createEntry(route, conn);
172                 }
173 
174             };
175             this.routeToPool.put(route, pool);
176         }
177         return pool;
178     }
179 
180     /**
181      * {@inheritDoc}
182      * <p>
183      * Please note that this class does not maintain its own pool of execution
184      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
185      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
186      * returned by this method in order for the lease operation to complete.
187      */
188     @Override
189     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
190         Args.notNull(route, "Route");
191         Asserts.check(!this.isShutDown, "Connection pool shut down");
192 
193         return new Future<E>() {
194 
195             private final AtomicBoolean cancelled = new AtomicBoolean(false);
196             private final AtomicBoolean done = new AtomicBoolean(false);
197             private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
198 
199             @Override
200             public boolean cancel(final boolean mayInterruptIfRunning) {
201                 if (cancelled.compareAndSet(false, true)) {
202                     done.set(true);
203                     lock.lock();
204                     try {
205                         condition.signalAll();
206                     } finally {
207                         lock.unlock();
208                     }
209                     if (callback != null) {
210                         callback.cancelled();
211                     }
212                     return true;
213                 } else {
214                     return false;
215                 }
216             }
217 
218             @Override
219             public boolean isCancelled() {
220                 return cancelled.get();
221             }
222 
223             @Override
224             public boolean isDone() {
225                 return done.get();
226             }
227 
228             @Override
229             public E get() throws InterruptedException, ExecutionException {
230                 try {
231                     return get(0L, TimeUnit.MILLISECONDS);
232                 } catch (final TimeoutException ex) {
233                     throw new ExecutionException(ex);
234                 }
235             }
236 
237             @Override
238             public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
239                 final E entry = entryRef.get();
240                 if (entry != null) {
241                     return entry;
242                 }
243                 synchronized (this) {
244                     try {
245                         for (;;) {
246                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
247                             if (validateAfterInactivity > 0)  {
248                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
249                                     if (!validate(leasedEntry)) {
250                                         leasedEntry.close();
251                                         release(leasedEntry, false);
252                                         continue;
253                                     }
254                                 }
255                             }
256                             entryRef.set(leasedEntry);
257                             done.set(true);
258                             onLease(leasedEntry);
259                             if (callback != null) {
260                                 callback.completed(leasedEntry);
261                             }
262                             return leasedEntry;
263                         }
264                     } catch (final IOException ex) {
265                         done.set(true);
266                         if (callback != null) {
267                             callback.failed(ex);
268                         }
269                         throw new ExecutionException(ex);
270                     }
271                 }
272             }
273 
274         };
275     }
276 
277     /**
278      * Attempts to lease a connection for the given route and with the given
279      * state from the pool.
280      * <p>
281      * Please note that this class does not maintain its own pool of execution
282      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
283      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
284      * returned by this method in order for the lease operation to complete.
285      *
286      * @param route route of the connection.
287      * @param state arbitrary object that represents a particular state
288      *  (usually a security principal or a unique token identifying
289      *  the user whose credentials have been used while establishing the connection).
290      *  May be {@code null}.
291      * @return future for a leased pool entry.
292      */
293     public Future<E> lease(final T route, final Object state) {
294         return lease(route, state, null);
295     }
296 
297     private E getPoolEntryBlocking(
298             final T route, final Object state,
299             final long timeout, final TimeUnit tunit,
300             final Future<E> future) throws IOException, InterruptedException, TimeoutException {
301 
302         Date deadline = null;
303         if (timeout > 0) {
304             deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
305         }
306         this.lock.lock();
307         try {
308             final RouteSpecificPool<T, C, E> pool = getPool(route);
309             E entry;
310             for (;;) {
311                 Asserts.check(!this.isShutDown, "Connection pool shut down");
312                 for (;;) {
313                     entry = pool.getFree(state);
314                     if (entry == null) {
315                         break;
316                     }
317                     if (entry.isExpired(System.currentTimeMillis())) {
318                         entry.close();
319                     }
320                     if (entry.isClosed()) {
321                         this.available.remove(entry);
322                         pool.free(entry, false);
323                     } else {
324                         break;
325                     }
326                 }
327                 if (entry != null) {
328                     this.available.remove(entry);
329                     this.leased.add(entry);
330                     onReuse(entry);
331                     return entry;
332                 }
333 
334                 // New connection is needed
335                 final int maxPerRoute = getMax(route);
336                 // Shrink the pool prior to allocating a new connection
337                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
338                 if (excess > 0) {
339                     for (int i = 0; i < excess; i++) {
340                         final E lastUsed = pool.getLastUsed();
341                         if (lastUsed == null) {
342                             break;
343                         }
344                         lastUsed.close();
345                         this.available.remove(lastUsed);
346                         pool.remove(lastUsed);
347                     }
348                 }
349 
350                 if (pool.getAllocatedCount() < maxPerRoute) {
351                     final int totalUsed = this.leased.size();
352                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
353                     if (freeCapacity > 0) {
354                         final int totalAvailable = this.available.size();
355                         if (totalAvailable > freeCapacity - 1) {
356                             if (!this.available.isEmpty()) {
357                                 final E lastUsed = this.available.removeLast();
358                                 lastUsed.close();
359                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
360                                 otherpool.remove(lastUsed);
361                             }
362                         }
363                         final C conn = this.connFactory.create(route);
364                         entry = pool.add(conn);
365                         this.leased.add(entry);
366                         return entry;
367                     }
368                 }
369 
370                 boolean success = false;
371                 try {
372                     if (future.isCancelled()) {
373                         throw new InterruptedException("Operation interrupted");
374                     }
375                     pool.queue(future);
376                     this.pending.add(future);
377                     if (deadline != null) {
378                         success = this.condition.awaitUntil(deadline);
379                     } else {
380                         this.condition.await();
381                         success = true;
382                     }
383                     if (future.isCancelled()) {
384                         throw new InterruptedException("Operation interrupted");
385                     }
386                 } finally {
387                     // In case of 'success', we were woken up by the
388                     // connection pool and should now have a connection
389                     // waiting for us, or else we're shutting down.
390                     // Just continue in the loop, both cases are checked.
391                     pool.unqueue(future);
392                     this.pending.remove(future);
393                 }
394                 // check for spurious wakeup vs. timeout
395                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
396                     break;
397                 }
398             }
399             throw new TimeoutException("Timeout waiting for connection");
400         } finally {
401             this.lock.unlock();
402         }
403     }
404 
405     @Override
406     public void release(final E entry, final boolean reusable) {
407         this.lock.lock();
408         try {
409             if (this.leased.remove(entry)) {
410                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
411                 pool.free(entry, reusable);
412                 if (reusable && !this.isShutDown) {
413                     this.available.addFirst(entry);
414                 } else {
415                     entry.close();
416                 }
417                 onRelease(entry);
418                 Future<E> future = pool.nextPending();
419                 if (future != null) {
420                     this.pending.remove(future);
421                 } else {
422                     future = this.pending.poll();
423                 }
424                 if (future != null) {
425                     this.condition.signalAll();
426                 }
427             }
428         } finally {
429             this.lock.unlock();
430         }
431     }
432 
433     private int getMax(final T route) {
434         final Integer v = this.maxPerRoute.get(route);
435         if (v != null) {
436             return v.intValue();
437         } else {
438             return this.defaultMaxPerRoute;
439         }
440     }
441 
442     @Override
443     public void setMaxTotal(final int max) {
444         Args.positive(max, "Max value");
445         this.lock.lock();
446         try {
447             this.maxTotal = max;
448         } finally {
449             this.lock.unlock();
450         }
451     }
452 
453     @Override
454     public int getMaxTotal() {
455         this.lock.lock();
456         try {
457             return this.maxTotal;
458         } finally {
459             this.lock.unlock();
460         }
461     }
462 
463     @Override
464     public void setDefaultMaxPerRoute(final int max) {
465         Args.positive(max, "Max per route value");
466         this.lock.lock();
467         try {
468             this.defaultMaxPerRoute = max;
469         } finally {
470             this.lock.unlock();
471         }
472     }
473 
474     @Override
475     public int getDefaultMaxPerRoute() {
476         this.lock.lock();
477         try {
478             return this.defaultMaxPerRoute;
479         } finally {
480             this.lock.unlock();
481         }
482     }
483 
484     @Override
485     public void setMaxPerRoute(final T route, final int max) {
486         Args.notNull(route, "Route");
487         Args.positive(max, "Max per route value");
488         this.lock.lock();
489         try {
490             this.maxPerRoute.put(route, Integer.valueOf(max));
491         } finally {
492             this.lock.unlock();
493         }
494     }
495 
496     @Override
497     public int getMaxPerRoute(final T route) {
498         Args.notNull(route, "Route");
499         this.lock.lock();
500         try {
501             return getMax(route);
502         } finally {
503             this.lock.unlock();
504         }
505     }
506 
507     @Override
508     public PoolStats getTotalStats() {
509         this.lock.lock();
510         try {
511             return new PoolStats(
512                     this.leased.size(),
513                     this.pending.size(),
514                     this.available.size(),
515                     this.maxTotal);
516         } finally {
517             this.lock.unlock();
518         }
519     }
520 
521     @Override
522     public PoolStats getStats(final T route) {
523         Args.notNull(route, "Route");
524         this.lock.lock();
525         try {
526             final RouteSpecificPool<T, C, E> pool = getPool(route);
527             return new PoolStats(
528                     pool.getLeasedCount(),
529                     pool.getPendingCount(),
530                     pool.getAvailableCount(),
531                     getMax(route));
532         } finally {
533             this.lock.unlock();
534         }
535     }
536 
537     /**
538      * Returns snapshot of all knows routes
539      * @return the set of routes
540      *
541      * @since 4.4
542      */
543     public Set<T> getRoutes() {
544         this.lock.lock();
545         try {
546             return new HashSet<T>(routeToPool.keySet());
547         } finally {
548             this.lock.unlock();
549         }
550     }
551 
552     /**
553      * Enumerates all available connections.
554      *
555      * @since 4.3
556      */
557     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
558         this.lock.lock();
559         try {
560             final Iterator<E> it = this.available.iterator();
561             while (it.hasNext()) {
562                 final E entry = it.next();
563                 callback.process(entry);
564                 if (entry.isClosed()) {
565                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
566                     pool.remove(entry);
567                     it.remove();
568                 }
569             }
570             purgePoolMap();
571         } finally {
572             this.lock.unlock();
573         }
574     }
575 
576     /**
577      * Enumerates all leased connections.
578      *
579      * @since 4.3
580      */
581     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
582         this.lock.lock();
583         try {
584             final Iterator<E> it = this.leased.iterator();
585             while (it.hasNext()) {
586                 final E entry = it.next();
587                 callback.process(entry);
588             }
589         } finally {
590             this.lock.unlock();
591         }
592     }
593 
594     private void purgePoolMap() {
595         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
596         while (it.hasNext()) {
597             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
598             final RouteSpecificPool<T, C, E> pool = entry.getValue();
599             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
600                 it.remove();
601             }
602         }
603     }
604 
605     /**
606      * Closes connections that have been idle longer than the given period
607      * of time and evicts them from the pool.
608      *
609      * @param idletime maximum idle time.
610      * @param tunit time unit.
611      */
612     public void closeIdle(final long idletime, final TimeUnit tunit) {
613         Args.notNull(tunit, "Time unit");
614         long time = tunit.toMillis(idletime);
615         if (time < 0) {
616             time = 0;
617         }
618         final long deadline = System.currentTimeMillis() - time;
619         enumAvailable(new PoolEntryCallback<T, C>() {
620 
621             @Override
622             public void process(final PoolEntry<T, C> entry) {
623                 if (entry.getUpdated() <= deadline) {
624                     entry.close();
625                 }
626             }
627 
628         });
629     }
630 
631     /**
632      * Closes expired connections and evicts them from the pool.
633      */
634     public void closeExpired() {
635         final long now = System.currentTimeMillis();
636         enumAvailable(new PoolEntryCallback<T, C>() {
637 
638             @Override
639             public void process(final PoolEntry<T, C> entry) {
640                 if (entry.isExpired(now)) {
641                     entry.close();
642                 }
643             }
644 
645         });
646     }
647 
648     /**
649      * @return the number of milliseconds
650      * @since 4.4
651      */
652     public int getValidateAfterInactivity() {
653         return this.validateAfterInactivity;
654     }
655 
656     /**
657      * @param ms the number of milliseconds
658      * @since 4.4
659      */
660     public void setValidateAfterInactivity(final int ms) {
661         this.validateAfterInactivity = ms;
662     }
663 
664     @Override
665     public String toString() {
666         final StringBuilder buffer = new StringBuilder();
667         buffer.append("[leased: ");
668         buffer.append(this.leased);
669         buffer.append("][available: ");
670         buffer.append(this.available);
671         buffer.append("][pending: ");
672         buffer.append(this.pending);
673         buffer.append("]");
674         return buffer.toString();
675     }
676 
677 }