View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.hc.core5.annotation.Contract;
44  import org.apache.hc.core5.annotation.ThreadingBehavior;
45  import org.apache.hc.core5.concurrent.BasicFuture;
46  import org.apache.hc.core5.concurrent.FutureCallback;
47  import org.apache.hc.core5.function.Callback;
48  import org.apache.hc.core5.io.GracefullyCloseable;
49  import org.apache.hc.core5.io.ShutdownType;
50  import org.apache.hc.core5.util.Args;
51  import org.apache.hc.core5.util.Asserts;
52  import org.apache.hc.core5.util.LangUtils;
53  import org.apache.hc.core5.util.TimeValue;
54  import org.apache.hc.core5.util.Timeout;
55  
56  /**
57   * Connection pool with strict connection limit guarantees.
58   *
59   * @param <T> route
60   * @param <C> connection object
61   *
62   * @since 4.2
63   */
64  @Contract(threading = ThreadingBehavior.SAFE)
65  public class StrictConnPool<T, C extends GracefullyCloseable> implements ManagedConnPool<T, C> {
66  
67      private final TimeValue timeToLive;
68      private final ConnPoolListener<T> connPoolListener;
69      private final PoolReusePolicy policy;
70      private final Map<T, PerRoutePool<T, C>> routeToPool;
71      private final LinkedList<LeaseRequest<T, C>> leasingRequests;
72      private final Set<PoolEntry<T, C>> leased;
73      private final LinkedList<PoolEntry<T, C>> available;
74      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
75      private final Map<T, Integer> maxPerRoute;
76      private final Lock lock;
77      private final AtomicBoolean isShutDown;
78  
79      private volatile int defaultMaxPerRoute;
80      private volatile int maxTotal;
81  
82      /**
83       * @since 5.0
84       */
85      public StrictConnPool(
86              final int defaultMaxPerRoute,
87              final int maxTotal,
88              final TimeValue timeToLive,
89              final PoolReusePolicy policy,
90              final ConnPoolListener<T> connPoolListener) {
91          super();
92          Args.positive(defaultMaxPerRoute, "Max per route value");
93          Args.positive(maxTotal, "Max total value");
94          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95          this.connPoolListener = connPoolListener;
96          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
97          this.routeToPool = new HashMap<>();
98          this.leasingRequests = new LinkedList<>();
99          this.leased = new HashSet<>();
100         this.available = new LinkedList<>();
101         this.completedRequests = new ConcurrentLinkedQueue<>();
102         this.maxPerRoute = new HashMap<>();
103         this.lock = new ReentrantLock();
104         this.isShutDown = new AtomicBoolean(false);
105         this.defaultMaxPerRoute = defaultMaxPerRoute;
106         this.maxTotal = maxTotal;
107     }
108 
109     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
110         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECONDS, PoolReusePolicy.LIFO, null);
111     }
112 
113     public boolean isShutdown() {
114         return this.isShutDown.get();
115     }
116 
117     @Override
118     public void shutdown(final ShutdownType shutdownType) {
119         if (this.isShutDown.compareAndSet(false, true)) {
120             fireCallbacks();
121             this.lock.lock();
122             try {
123                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
124                     pool.shutdown(shutdownType);
125                 }
126                 this.routeToPool.clear();
127                 this.leased.clear();
128                 this.available.clear();
129                 this.leasingRequests.clear();
130             } finally {
131                 this.lock.unlock();
132             }
133         }
134     }
135 
136     @Override
137     public void close() {
138         shutdown(ShutdownType.GRACEFUL);
139     }
140 
141     private PerRoutePool<T, C> getPool(final T route) {
142         PerRoutePool<T, C> pool = this.routeToPool.get(route);
143         if (pool == null) {
144             pool = new PerRoutePool<>(route);
145             this.routeToPool.put(route, pool);
146         }
147         return pool;
148     }
149 
150     @Override
151     public Future<PoolEntry<T, C>> lease(
152             final T route, final Object state,
153             final Timeout requestTimeout,
154             final FutureCallback<PoolEntry<T, C>> callback) {
155         Args.notNull(route, "Route");
156         Args.notNull(requestTimeout, "Request timeout");
157         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
158         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
159         this.lock.lock();
160         try {
161             final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
162             final boolean completed = processPendingRequest(request);
163             if (!request.isDone() && !completed) {
164                 this.leasingRequests.add(request);
165             }
166             if (request.isDone()) {
167                 this.completedRequests.add(request);
168             }
169         } finally {
170             this.lock.unlock();
171         }
172         fireCallbacks();
173         return future;
174     }
175 
176     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
177         return lease(route, state, Timeout.DISABLED, null);
178     }
179 
180     @Override
181     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
182         if (entry == null) {
183             return;
184         }
185         if (this.isShutDown.get()) {
186             return;
187         }
188         if (!reusable) {
189             entry.discardConnection(ShutdownType.GRACEFUL);
190         }
191         this.lock.lock();
192         try {
193             if (this.leased.remove(entry)) {
194                 if (this.connPoolListener != null) {
195                     this.connPoolListener.onRelease(entry.getRoute(), this);
196                 }
197                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
198                 final boolean keepAlive = entry.hasConnection() && reusable;
199                 pool.free(entry, keepAlive);
200                 if (keepAlive) {
201                     switch (policy) {
202                         case LIFO:
203                             this.available.addFirst(entry);
204                             break;
205                         case FIFO:
206                             this.available.addLast(entry);
207                             break;
208                         default:
209                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
210                     }
211                 } else {
212                     entry.discardConnection(ShutdownType.GRACEFUL);
213                 }
214                 processNextPendingRequest();
215             } else {
216                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
217             }
218         } finally {
219             this.lock.unlock();
220         }
221         fireCallbacks();
222     }
223 
224     private void processPendingRequests() {
225         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
226         while (it.hasNext()) {
227             final LeaseRequest<T, C> request = it.next();
228             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
229             if (future.isCancelled()) {
230                 it.remove();
231                 continue;
232             }
233             final boolean completed = processPendingRequest(request);
234             if (request.isDone() || completed) {
235                 it.remove();
236             }
237             if (request.isDone()) {
238                 this.completedRequests.add(request);
239             }
240         }
241     }
242 
243     private void processNextPendingRequest() {
244         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
245         while (it.hasNext()) {
246             final LeaseRequest<T, C> request = it.next();
247             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
248             if (future.isCancelled()) {
249                 it.remove();
250                 continue;
251             }
252             final boolean completed = processPendingRequest(request);
253             if (request.isDone() || completed) {
254                 it.remove();
255             }
256             if (request.isDone()) {
257                 this.completedRequests.add(request);
258             }
259             if (completed) {
260                 return;
261             }
262         }
263     }
264 
265     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
266         final T route = request.getRoute();
267         final Object state = request.getState();
268         final long deadline = request.getDeadline();
269 
270         final long now = System.currentTimeMillis();
271         if (now > deadline) {
272             request.failed(new TimeoutException());
273             return false;
274         }
275 
276         final PerRoutePool<T, C> pool = getPool(route);
277         PoolEntry<T, C> entry;
278         for (;;) {
279             entry = pool.getFree(state);
280             if (entry == null) {
281                 break;
282             }
283             if (entry.getExpiry() < System.currentTimeMillis()) {
284                 entry.discardConnection(ShutdownType.GRACEFUL);
285                 this.available.remove(entry);
286                 pool.free(entry, false);
287             } else {
288                 break;
289             }
290         }
291         if (entry != null) {
292             this.available.remove(entry);
293             this.leased.add(entry);
294             request.completed(entry);
295             if (this.connPoolListener != null) {
296                 this.connPoolListener.onLease(entry.getRoute(), this);
297             }
298             return true;
299         }
300 
301         // New connection is needed
302         final int maxPerRoute = getMax(route);
303         // Shrink the pool prior to allocating a new connection
304         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
305         if (excess > 0) {
306             for (int i = 0; i < excess; i++) {
307                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
308                 if (lastUsed == null) {
309                     break;
310                 }
311                 lastUsed.discardConnection(ShutdownType.GRACEFUL);
312                 this.available.remove(lastUsed);
313                 pool.remove(lastUsed);
314             }
315         }
316 
317         if (pool.getAllocatedCount() < maxPerRoute) {
318             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
319             if (freeCapacity == 0) {
320                 return false;
321             }
322             final int totalAvailable = this.available.size();
323             if (totalAvailable > freeCapacity - 1) {
324                 if (!this.available.isEmpty()) {
325                     final PoolEntry<T, C> lastUsed = this.available.removeLast();
326                     lastUsed.discardConnection(ShutdownType.GRACEFUL);
327                     final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
328                     otherpool.remove(lastUsed);
329                 }
330             }
331 
332             entry = pool.createEntry(this.timeToLive);
333             this.leased.add(entry);
334             request.completed(entry);
335             if (this.connPoolListener != null) {
336                 this.connPoolListener.onLease(entry.getRoute(), this);
337             }
338             return true;
339         }
340         return false;
341     }
342 
343     private void fireCallbacks() {
344         LeaseRequest<T, C> request;
345         while ((request = this.completedRequests.poll()) != null) {
346             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
347             final Exception ex = request.getException();
348             final PoolEntry<T, C> result = request.getResult();
349             boolean successfullyCompleted = false;
350             if (ex != null) {
351                 future.failed(ex);
352             } else if (result != null) {
353                 if (future.completed(result)) {
354                     successfullyCompleted = true;
355                 }
356             } else {
357                 future.cancel();
358             }
359             if (!successfullyCompleted) {
360                 release(result, true);
361             }
362         }
363     }
364 
365     public void validatePendingRequests() {
366         this.lock.lock();
367         try {
368             final long now = System.currentTimeMillis();
369             final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
370             while (it.hasNext()) {
371                 final LeaseRequest<T, C> request = it.next();
372                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
373                 if (future.isCancelled() && !request.isDone()) {
374                     it.remove();
375                 } else {
376                     final long deadline = request.getDeadline();
377                     if (now > deadline) {
378                         request.failed(new TimeoutException());
379                     }
380                     if (request.isDone()) {
381                         it.remove();
382                         this.completedRequests.add(request);
383                     }
384                 }
385             }
386         } finally {
387             this.lock.unlock();
388         }
389         fireCallbacks();
390     }
391 
392     private int getMax(final T route) {
393         final Integer v = this.maxPerRoute.get(route);
394         if (v != null) {
395             return v.intValue();
396         }
397         return this.defaultMaxPerRoute;
398     }
399 
400     @Override
401     public void setMaxTotal(final int max) {
402         Args.positive(max, "Max value");
403         this.lock.lock();
404         try {
405             this.maxTotal = max;
406         } finally {
407             this.lock.unlock();
408         }
409     }
410 
411     @Override
412     public int getMaxTotal() {
413         this.lock.lock();
414         try {
415             return this.maxTotal;
416         } finally {
417             this.lock.unlock();
418         }
419     }
420 
421     @Override
422     public void setDefaultMaxPerRoute(final int max) {
423         Args.positive(max, "Max value");
424         this.lock.lock();
425         try {
426             this.defaultMaxPerRoute = max;
427         } finally {
428             this.lock.unlock();
429         }
430     }
431 
432     @Override
433     public int getDefaultMaxPerRoute() {
434         this.lock.lock();
435         try {
436             return this.defaultMaxPerRoute;
437         } finally {
438             this.lock.unlock();
439         }
440     }
441 
442     @Override
443     public void setMaxPerRoute(final T route, final int max) {
444         Args.notNull(route, "Route");
445         Args.positive(max, "Max value");
446         this.lock.lock();
447         try {
448             this.maxPerRoute.put(route, Integer.valueOf(max));
449         } finally {
450             this.lock.unlock();
451         }
452     }
453 
454     @Override
455     public int getMaxPerRoute(final T route) {
456         Args.notNull(route, "Route");
457         this.lock.lock();
458         try {
459             return getMax(route);
460         } finally {
461             this.lock.unlock();
462         }
463     }
464 
465     @Override
466     public PoolStats getTotalStats() {
467         this.lock.lock();
468         try {
469             return new PoolStats(
470                     this.leased.size(),
471                     this.leasingRequests.size(),
472                     this.available.size(),
473                     this.maxTotal);
474         } finally {
475             this.lock.unlock();
476         }
477     }
478 
479     @Override
480     public PoolStats getStats(final T route) {
481         Args.notNull(route, "Route");
482         this.lock.lock();
483         try {
484             final PerRoutePool<T, C> pool = getPool(route);
485             int pendingCount = 0;
486             for (final LeaseRequest<T, C> request: leasingRequests) {
487                 if (LangUtils.equals(route, request.getRoute())) {
488                     pendingCount++;
489                 }
490             }
491             return new PoolStats(
492                     pool.getLeasedCount(),
493                     pendingCount,
494                     pool.getAvailableCount(),
495                     getMax(route));
496         } finally {
497             this.lock.unlock();
498         }
499     }
500 
501     /**
502      * Returns snapshot of all knows routes
503      *
504      * @since 4.4
505      */
506     @Override
507     public Set<T> getRoutes() {
508         this.lock.lock();
509         try {
510             return new HashSet<>(routeToPool.keySet());
511         } finally {
512             this.lock.unlock();
513         }
514     }
515 
516     /**
517      * Enumerates all available connections.
518      *
519      * @since 4.3
520      */
521     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
522         this.lock.lock();
523         try {
524             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
525             while (it.hasNext()) {
526                 final PoolEntry<T, C> entry = it.next();
527                 callback.execute(entry);
528                 if (!entry.hasConnection()) {
529                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
530                     pool.remove(entry);
531                     it.remove();
532                 }
533             }
534             processPendingRequests();
535             purgePoolMap();
536         } finally {
537             this.lock.unlock();
538         }
539     }
540 
541     /**
542      * Enumerates all leased connections.
543      *
544      * @since 4.3
545      */
546     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
547         this.lock.lock();
548         try {
549             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
550             while (it.hasNext()) {
551                 final PoolEntry<T, C> entry = it.next();
552                 callback.execute(entry);
553             }
554             processPendingRequests();
555         } finally {
556             this.lock.unlock();
557         }
558     }
559 
560     private void purgePoolMap() {
561         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
562         while (it.hasNext()) {
563             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
564             final PerRoutePool<T, C> pool = entry.getValue();
565             if (pool.getAllocatedCount() == 0) {
566                 it.remove();
567             }
568         }
569     }
570 
571     @Override
572     public void closeIdle(final TimeValue idleTime) {
573         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
574         enumAvailable(new Callback<PoolEntry<T, C>>() {
575 
576             @Override
577             public void execute(final PoolEntry<T, C> entry) {
578                 if (entry.getUpdated() <= deadline) {
579                     entry.discardConnection(ShutdownType.GRACEFUL);
580                 }
581             }
582 
583         });
584     }
585 
586     @Override
587     public void closeExpired() {
588         final long now = System.currentTimeMillis();
589         enumAvailable(new Callback<PoolEntry<T, C>>() {
590 
591             @Override
592             public void execute(final PoolEntry<T, C> entry) {
593                 if (entry.getExpiry() < now) {
594                     entry.discardConnection(ShutdownType.GRACEFUL);
595                 }
596             }
597 
598         });
599     }
600 
601     @Override
602     public String toString() {
603         final StringBuilder buffer = new StringBuilder();
604         buffer.append("[leased: ");
605         buffer.append(this.leased.size());
606         buffer.append("][available: ");
607         buffer.append(this.available.size());
608         buffer.append("][pending: ");
609         buffer.append(this.leasingRequests.size());
610         buffer.append("]");
611         return buffer.toString();
612     }
613 
614 
615     static class LeaseRequest<T, C extends GracefullyCloseable> {
616 
617         private final T route;
618         private final Object state;
619         private final long deadline;
620         private final BasicFuture<PoolEntry<T, C>> future;
621         private final AtomicBoolean completed;
622         private volatile PoolEntry<T, C> result;
623         private volatile Exception ex;
624 
625         /**
626          * Constructor
627          *
628          * @param route route
629          * @param state state
630          * @param requestTimeout timeout to wait in a request queue until kicked off
631          * @param future future callback
632          */
633         public LeaseRequest(
634                 final T route,
635                 final Object state,
636                 final Timeout requestTimeout,
637                 final BasicFuture<PoolEntry<T, C>> future) {
638             super();
639             this.route = route;
640             this.state = state;
641             this.deadline = Timeout.calculateDeadline(System.currentTimeMillis(), requestTimeout);
642             this.future = future;
643             this.completed = new AtomicBoolean(false);
644         }
645 
646         public T getRoute() {
647             return this.route;
648         }
649 
650         public Object getState() {
651             return this.state;
652         }
653 
654         public long getDeadline() {
655             return this.deadline;
656         }
657 
658         public boolean isDone() {
659             return this.completed.get();
660         }
661 
662         public void failed(final Exception ex) {
663             if (this.completed.compareAndSet(false, true)) {
664                 this.ex = ex;
665             }
666         }
667 
668         public void completed(final PoolEntry<T, C> result) {
669             if (this.completed.compareAndSet(false, true)) {
670                 this.result = result;
671             }
672         }
673 
674         public BasicFuture<PoolEntry<T, C>> getFuture() {
675             return this.future;
676         }
677 
678         public PoolEntry<T, C> getResult() {
679             return this.result;
680         }
681 
682         public Exception getException() {
683             return this.ex;
684         }
685 
686         @Override
687         public String toString() {
688             final StringBuilder buffer = new StringBuilder();
689             buffer.append("[");
690             buffer.append(this.route);
691             buffer.append("][");
692             buffer.append(this.state);
693             buffer.append("]");
694             return buffer.toString();
695         }
696 
697     }
698 
699     static class PerRoutePool<T, C extends GracefullyCloseable> {
700 
701         private final T route;
702         private final Set<PoolEntry<T, C>> leased;
703         private final LinkedList<PoolEntry<T, C>> available;
704 
705         PerRoutePool(final T route) {
706             super();
707             this.route = route;
708             this.leased = new HashSet<>();
709             this.available = new LinkedList<>();
710         }
711 
712         public final T getRoute() {
713             return route;
714         }
715 
716         public int getLeasedCount() {
717             return this.leased.size();
718         }
719 
720         public int getAvailableCount() {
721             return this.available.size();
722         }
723 
724         public int getAllocatedCount() {
725             return this.available.size() + this.leased.size();
726         }
727 
728         public PoolEntry<T, C> getFree(final Object state) {
729             if (!this.available.isEmpty()) {
730                 if (state != null) {
731                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
732                     while (it.hasNext()) {
733                         final PoolEntry<T, C> entry = it.next();
734                         if (state.equals(entry.getState())) {
735                             it.remove();
736                             this.leased.add(entry);
737                             return entry;
738                         }
739                     }
740                 }
741                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
742                 while (it.hasNext()) {
743                     final PoolEntry<T, C> entry = it.next();
744                     if (entry.getState() == null) {
745                         it.remove();
746                         this.leased.add(entry);
747                         return entry;
748                     }
749                 }
750             }
751             return null;
752         }
753 
754         public PoolEntry<T, C> getLastUsed() {
755             return this.available.peekLast();
756         }
757 
758         public boolean remove(final PoolEntry<T, C> entry) {
759             return this.available.remove(entry) || this.leased.remove(entry);
760         }
761 
762         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
763             final boolean found = this.leased.remove(entry);
764             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
765             if (reusable) {
766                 this.available.addFirst(entry);
767             }
768         }
769 
770         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
771             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive);
772             this.leased.add(entry);
773             return entry;
774         }
775 
776         public void shutdown(final ShutdownType shutdownType) {
777             PoolEntry<T, C> availableEntry;
778             while ((availableEntry = available.poll()) != null) {
779                 availableEntry.discardConnection(shutdownType);
780             }
781             for (final PoolEntry<T, C> entry: this.leased) {
782                 entry.discardConnection(shutdownType);
783             }
784             this.leased.clear();
785         }
786 
787         @Override
788         public String toString() {
789             final StringBuilder buffer = new StringBuilder();
790             buffer.append("[route: ");
791             buffer.append(this.route);
792             buffer.append("][leased: ");
793             buffer.append(this.leased.size());
794             buffer.append("][available: ");
795             buffer.append(this.available.size());
796             buffer.append("]");
797             return buffer.toString();
798         }
799 
800     }
801 }