View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicReference;
43  import java.util.concurrent.locks.Condition;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.FutureCallback;
50  import org.apache.http.util.Args;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * Abstract synchronous (blocking) pool of connections.
55   * <p>
56   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
57   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
58   * method on the {@link Future} object returned by the
59   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
60   * to complete.
61   *
62   * @param <T> the route type that represents the opposite endpoint of a pooled
63   *   connection.
64   * @param <C> the connection type.
65   * @param <E> the type of the pool entry containing a pooled connection.
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
69  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
70                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
71  
    // Pool-wide lock. Acquired around accesses to the collections below by the
    // lease/release/stats/enumeration methods of this class.
    private final Lock lock;
    // Condition bound to {@link #lock}. Threads waiting for a connection block on
    // it; it is signalled on release and when a lease future is cancelled.
    private final Condition condition;
    // Factory used to open a new connection when no reusable entry is free.
    private final ConnFactory<T, C> connFactory;
    // Per-route pools, created lazily on first use of a route.
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // Entries currently handed out to callers.
    private final Set<E> leased;
    // Idle entries across all routes. Released entries are added at the head;
    // entries evicted to make room are taken from the tail.
    private final LinkedList<E> available;
    // Lease futures currently blocked waiting for a connection.
    private final LinkedList<Future<E>> pending;
    // Per-route connection limits overriding {@link #defaultMaxPerRoute}.
    private final Map<T, Integer> maxPerRoute;

    // Set once by shutdown(); read without the lock, hence volatile.
    private volatile boolean isShutDown;
    // Limit applied to routes that have no explicit entry in maxPerRoute.
    private volatile int defaultMaxPerRoute;
    // Upper bound on the total number of connections in the pool.
    private volatile int maxTotal;
    // Inactivity period in milliseconds after which a leased entry is validated;
    // a value <= 0 disables validation.
    private volatile int validateAfterInactivity;
85  
86      public AbstractConnPool(
87              final ConnFactory<T, C> connFactory,
88              final int defaultMaxPerRoute,
89              final int maxTotal) {
90          super();
91          this.connFactory = Args.notNull(connFactory, "Connection factory");
92          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
93          this.maxTotal = Args.positive(maxTotal, "Max total value");
94          this.lock = new ReentrantLock();
95          this.condition = this.lock.newCondition();
96          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
97          this.leased = new HashSet<E>();
98          this.available = new LinkedList<E>();
99          this.pending = new LinkedList<Future<E>>();
100         this.maxPerRoute = new HashMap<T, Integer>();
101     }
102 
    /**
     * Creates a new entry for the given connection with the given route.
     * <p>
     * The per-route pools created by this class delegate their own
     * {@code createEntry(conn)} to this method, passing the route they serve.
     *
     * @param route the route the connection belongs to.
     * @param conn the connection to wrap.
     * @return the pool entry wrapping {@code conn}.
     */
    protected abstract E createEntry(T route, C conn);
107 
    /**
     * Hook invoked when a lease future completes with an entry, after the entry
     * has been obtained (and has passed validation, if validation applied) and
     * before the lease callback's {@code completed} method is called.
     * The default implementation does nothing.
     *
     * @param entry the entry being handed to the caller.
     * @since 4.3
     */
    protected void onLease(final E entry) {
    }
113 
    /**
     * Hook invoked from {@link #release(PoolEntry, boolean)} while the pool lock
     * is held, after the entry has either been put back among the available
     * entries or been closed. It is only invoked for entries that were actually
     * leased from this pool. The default implementation does nothing.
     *
     * @param entry the entry that was released.
     * @since 4.3
     */
    protected void onRelease(final E entry) {
    }
119 
    /**
     * Hook invoked while the pool lock is held when an existing idle entry
     * (rather than a newly created connection) is selected to satisfy a lease
     * request. The default implementation does nothing.
     *
     * @param entry the idle entry about to be leased again.
     * @since 4.4
     */
    protected void onReuse(final E entry) {
    }
125 
    /**
     * Checks whether a leased entry is still usable. It is consulted only when
     * {@code validateAfterInactivity} is positive and the entry has not been
     * updated for at least that many milliseconds. When this method returns
     * {@code false} the entry is closed, released as non-reusable, and the lease
     * is retried. The default implementation accepts every entry.
     *
     * @param entry the entry to validate.
     * @return {@code true} if the entry may be handed to the caller.
     * @since 4.4
     */
    protected boolean validate(final E entry) {
        return true;
    }
132 
    /**
     * Tells whether {@link #shutdown()} has been called on this pool.
     * Reads the volatile flag directly, without taking the pool lock.
     *
     * @return {@code true} once shutdown has been initiated.
     */
    public boolean isShutdown() {
        return this.isShutDown;
    }
136 
137     /**
138      * Shuts down the pool.
139      */
140     public void shutdown() throws IOException {
141         if (this.isShutDown) {
142             return ;
143         }
144         this.isShutDown = true;
145         this.lock.lock();
146         try {
147             for (final E entry: this.available) {
148                 entry.close();
149             }
150             for (final E entry: this.leased) {
151                 entry.close();
152             }
153             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
154                 pool.shutdown();
155             }
156             this.routeToPool.clear();
157             this.leased.clear();
158             this.available.clear();
159         } finally {
160             this.lock.unlock();
161         }
162     }
163 
164     private RouteSpecificPool<T, C, E> getPool(final T route) {
165         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
166         if (pool == null) {
167             pool = new RouteSpecificPool<T, C, E>(route) {
168 
169                 @Override
170                 protected E createEntry(final C conn) {
171                     return AbstractConnPool.this.createEntry(route, conn);
172                 }
173 
174             };
175             this.routeToPool.put(route, pool);
176         }
177         return pool;
178     }
179 
180     /**
181      * {@inheritDoc}
182      * <p>
183      * Please note that this class does not maintain its own pool of execution
184      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
185      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
186      * returned by this method in order for the lease operation to complete.
187      */
188     @Override
189     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
190         Args.notNull(route, "Route");
191         Asserts.check(!this.isShutDown, "Connection pool shut down");
192 
193         return new Future<E>() {
194 
195             private final AtomicBoolean cancelled = new AtomicBoolean(false);
196             private final AtomicBoolean done = new AtomicBoolean(false);
197             private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
198 
199             @Override
200             public boolean cancel(final boolean mayInterruptIfRunning) {
201                 if (cancelled.compareAndSet(false, true)) {
202                     done.set(true);
203                     lock.lock();
204                     try {
205                         condition.signalAll();
206                     } finally {
207                         lock.unlock();
208                     }
209                     if (callback != null) {
210                         callback.cancelled();
211                     }
212                     return true;
213                 } else {
214                     return false;
215                 }
216             }
217 
218             @Override
219             public boolean isCancelled() {
220                 return cancelled.get();
221             }
222 
223             @Override
224             public boolean isDone() {
225                 return done.get();
226             }
227 
228             @Override
229             public E get() throws InterruptedException, ExecutionException {
230                 try {
231                     return get(0L, TimeUnit.MILLISECONDS);
232                 } catch (final TimeoutException ex) {
233                     throw new ExecutionException(ex);
234                 }
235             }
236 
237             @Override
238             public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
239                 final E entry = entryRef.get();
240                 if (entry != null) {
241                     return entry;
242                 }
243                 synchronized (this) {
244                     try {
245                         for (;;) {
246                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
247                             if (validateAfterInactivity > 0)  {
248                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
249                                     if (!validate(leasedEntry)) {
250                                         leasedEntry.close();
251                                         release(leasedEntry, false);
252                                         continue;
253                                     }
254                                 }
255                             }
256                             entryRef.set(leasedEntry);
257                             done.set(true);
258                             onLease(leasedEntry);
259                             if (callback != null) {
260                                 callback.completed(leasedEntry);
261                             }
262                             return leasedEntry;
263                         }
264                     } catch (final IOException ex) {
265                         done.set(true);
266                         if (callback != null) {
267                             callback.failed(ex);
268                         }
269                         throw new ExecutionException(ex);
270                     }
271                 }
272             }
273 
274         };
275     }
276 
    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool. Equivalent to calling
     * {@link #lease(Object, Object, FutureCallback)} with a {@code null} callback.
     * <p>
     * Please note that this class does not maintain its own pool of execution
     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
     * returned by this method in order for the lease operation to complete.
     *
     * @param route route of the connection.
     * @param state arbitrary object that represents a particular state
     *  (usually a security principal or a unique token identifying
     *  the user whose credentials have been used while establishing the connection).
     *  May be {@code null}.
     * @return future for a leased pool entry.
     */
    public Future<E> lease(final T route, final Object state) {
        return lease(route, state, null);
    }
296 
297     private E getPoolEntryBlocking(
298             final T route, final Object state,
299             final long timeout, final TimeUnit tunit,
300             final Future<E> future) throws IOException, InterruptedException, TimeoutException {
301 
302         Date deadline = null;
303         if (timeout > 0) {
304             deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
305         }
306         this.lock.lock();
307         try {
308             final RouteSpecificPool<T, C, E> pool = getPool(route);
309             E entry;
310             for (;;) {
311                 Asserts.check(!this.isShutDown, "Connection pool shut down");
312                 for (;;) {
313                     entry = pool.getFree(state);
314                     if (entry == null) {
315                         break;
316                     }
317                     if (entry.isExpired(System.currentTimeMillis())) {
318                         entry.close();
319                     }
320                     if (entry.isClosed()) {
321                         this.available.remove(entry);
322                         pool.free(entry, false);
323                     } else {
324                         break;
325                     }
326                 }
327                 if (entry != null) {
328                     this.available.remove(entry);
329                     this.leased.add(entry);
330                     onReuse(entry);
331                     return entry;
332                 }
333 
334                 // New connection is needed
335                 final int maxPerRoute = getMax(route);
336                 // Shrink the pool prior to allocating a new connection
337                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
338                 if (excess > 0) {
339                     for (int i = 0; i < excess; i++) {
340                         final E lastUsed = pool.getLastUsed();
341                         if (lastUsed == null) {
342                             break;
343                         }
344                         lastUsed.close();
345                         this.available.remove(lastUsed);
346                         pool.remove(lastUsed);
347                     }
348                 }
349 
350                 if (pool.getAllocatedCount() < maxPerRoute) {
351                     final int totalUsed = this.leased.size();
352                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
353                     if (freeCapacity > 0) {
354                         final int totalAvailable = this.available.size();
355                         if (totalAvailable > freeCapacity - 1) {
356                             if (!this.available.isEmpty()) {
357                                 final E lastUsed = this.available.removeLast();
358                                 lastUsed.close();
359                                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
360                                 otherpool.remove(lastUsed);
361                             }
362                         }
363                         final C conn = this.connFactory.create(route);
364                         entry = pool.add(conn);
365                         this.leased.add(entry);
366                         return entry;
367                     }
368                 }
369 
370                 boolean success = false;
371                 try {
372                     if (future.isCancelled()) {
373                         throw new InterruptedException("Operation interrupted");
374                     }
375                     pool.queue(future);
376                     this.pending.add(future);
377                     if (deadline != null) {
378                         success = this.condition.awaitUntil(deadline);
379                     } else {
380                         this.condition.await();
381                         success = true;
382                     }
383                     if (future.isCancelled()) {
384                         throw new InterruptedException("Operation interrupted");
385                     }
386                 } finally {
387                     // In case of 'success', we were woken up by the
388                     // connection pool and should now have a connection
389                     // waiting for us, or else we're shutting down.
390                     // Just continue in the loop, both cases are checked.
391                     pool.unqueue(future);
392                     this.pending.remove(future);
393                 }
394                 // check for spurious wakeup vs. timeout
395                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
396                     break;
397                 }
398             }
399             throw new TimeoutException("Timeout waiting for connection");
400         } finally {
401             this.lock.unlock();
402         }
403     }
404 
405     @Override
406     public void release(final E entry, final boolean reusable) {
407         this.lock.lock();
408         try {
409             if (this.leased.remove(entry)) {
410                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
411                 pool.free(entry, reusable);
412                 if (reusable && !this.isShutDown) {
413                     this.available.addFirst(entry);
414                 } else {
415                     entry.close();
416                 }
417                 onRelease(entry);
418                 Future<E> future = pool.nextPending();
419                 if (future != null) {
420                     this.pending.remove(future);
421                 } else {
422                     future = this.pending.poll();
423                 }
424                 if (future != null) {
425                     this.condition.signalAll();
426                 }
427             }
428         } finally {
429             this.lock.unlock();
430         }
431     }
432 
433     private int getMax(final T route) {
434         final Integer v = this.maxPerRoute.get(route);
435         if (v != null) {
436             return v.intValue();
437         } else {
438             return this.defaultMaxPerRoute;
439         }
440     }
441 
442     @Override
443     public void setMaxTotal(final int max) {
444         Args.positive(max, "Max value");
445         this.lock.lock();
446         try {
447             this.maxTotal = max;
448         } finally {
449             this.lock.unlock();
450         }
451     }
452 
453     @Override
454     public int getMaxTotal() {
455         this.lock.lock();
456         try {
457             return this.maxTotal;
458         } finally {
459             this.lock.unlock();
460         }
461     }
462 
463     @Override
464     public void setDefaultMaxPerRoute(final int max) {
465         Args.positive(max, "Max per route value");
466         this.lock.lock();
467         try {
468             this.defaultMaxPerRoute = max;
469         } finally {
470             this.lock.unlock();
471         }
472     }
473 
474     @Override
475     public int getDefaultMaxPerRoute() {
476         this.lock.lock();
477         try {
478             return this.defaultMaxPerRoute;
479         } finally {
480             this.lock.unlock();
481         }
482     }
483 
484     @Override
485     public void setMaxPerRoute(final T route, final int max) {
486         Args.notNull(route, "Route");
487         this.lock.lock();
488         try {
489             if (max > -1) {
490                 this.maxPerRoute.put(route, Integer.valueOf(max));
491             } else {
492                 this.maxPerRoute.remove(route);
493             }
494         } finally {
495             this.lock.unlock();
496         }
497     }
498 
499     @Override
500     public int getMaxPerRoute(final T route) {
501         Args.notNull(route, "Route");
502         this.lock.lock();
503         try {
504             return getMax(route);
505         } finally {
506             this.lock.unlock();
507         }
508     }
509 
510     @Override
511     public PoolStats getTotalStats() {
512         this.lock.lock();
513         try {
514             return new PoolStats(
515                     this.leased.size(),
516                     this.pending.size(),
517                     this.available.size(),
518                     this.maxTotal);
519         } finally {
520             this.lock.unlock();
521         }
522     }
523 
524     @Override
525     public PoolStats getStats(final T route) {
526         Args.notNull(route, "Route");
527         this.lock.lock();
528         try {
529             final RouteSpecificPool<T, C, E> pool = getPool(route);
530             return new PoolStats(
531                     pool.getLeasedCount(),
532                     pool.getPendingCount(),
533                     pool.getAvailableCount(),
534                     getMax(route));
535         } finally {
536             this.lock.unlock();
537         }
538     }
539 
540     /**
541      * Returns snapshot of all knows routes
542      * @return the set of routes
543      *
544      * @since 4.4
545      */
546     public Set<T> getRoutes() {
547         this.lock.lock();
548         try {
549             return new HashSet<T>(routeToPool.keySet());
550         } finally {
551             this.lock.unlock();
552         }
553     }
554 
555     /**
556      * Enumerates all available connections.
557      *
558      * @since 4.3
559      */
560     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
561         this.lock.lock();
562         try {
563             final Iterator<E> it = this.available.iterator();
564             while (it.hasNext()) {
565                 final E entry = it.next();
566                 callback.process(entry);
567                 if (entry.isClosed()) {
568                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
569                     pool.remove(entry);
570                     it.remove();
571                 }
572             }
573             purgePoolMap();
574         } finally {
575             this.lock.unlock();
576         }
577     }
578 
579     /**
580      * Enumerates all leased connections.
581      *
582      * @since 4.3
583      */
584     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
585         this.lock.lock();
586         try {
587             final Iterator<E> it = this.leased.iterator();
588             while (it.hasNext()) {
589                 final E entry = it.next();
590                 callback.process(entry);
591             }
592         } finally {
593             this.lock.unlock();
594         }
595     }
596 
597     private void purgePoolMap() {
598         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
599         while (it.hasNext()) {
600             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
601             final RouteSpecificPool<T, C, E> pool = entry.getValue();
602             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
603                 it.remove();
604             }
605         }
606     }
607 
608     /**
609      * Closes connections that have been idle longer than the given period
610      * of time and evicts them from the pool.
611      *
612      * @param idletime maximum idle time.
613      * @param tunit time unit.
614      */
615     public void closeIdle(final long idletime, final TimeUnit tunit) {
616         Args.notNull(tunit, "Time unit");
617         long time = tunit.toMillis(idletime);
618         if (time < 0) {
619             time = 0;
620         }
621         final long deadline = System.currentTimeMillis() - time;
622         enumAvailable(new PoolEntryCallback<T, C>() {
623 
624             @Override
625             public void process(final PoolEntry<T, C> entry) {
626                 if (entry.getUpdated() <= deadline) {
627                     entry.close();
628                 }
629             }
630 
631         });
632     }
633 
634     /**
635      * Closes expired connections and evicts them from the pool.
636      */
637     public void closeExpired() {
638         final long now = System.currentTimeMillis();
639         enumAvailable(new PoolEntryCallback<T, C>() {
640 
641             @Override
642             public void process(final PoolEntry<T, C> entry) {
643                 if (entry.isExpired(now)) {
644                     entry.close();
645                 }
646             }
647 
648         });
649     }
650 
    /**
     * Returns the inactivity period after which a leased entry is checked with
     * {@link #validate(PoolEntry)} before being handed to the caller. A value
     * that is zero or negative means entries are never validated.
     *
     * @return the number of milliseconds
     * @since 4.4
     */
    public int getValidateAfterInactivity() {
        return this.validateAfterInactivity;
    }
658 
    /**
     * Sets the inactivity period after which a leased entry is checked with
     * {@link #validate(PoolEntry)} before being handed to the caller. A value
     * that is zero or negative disables validation. The value is stored as
     * given, without any range check.
     *
     * @param ms the number of milliseconds
     * @since 4.4
     */
    public void setValidateAfterInactivity(final int ms) {
        this.validateAfterInactivity = ms;
    }
666 
667     @Override
668     public String toString() {
669         final StringBuilder buffer = new StringBuilder();
670         buffer.append("[leased: ");
671         buffer.append(this.leased);
672         buffer.append("][available: ");
673         buffer.append(this.available);
674         buffer.append("][pending: ");
675         buffer.append(this.pending);
676         buffer.append("]");
677         return buffer.toString();
678     }
679 
680 }