View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.hc.core5.annotation.Contract;
44  import org.apache.hc.core5.annotation.ThreadingBehavior;
45  import org.apache.hc.core5.concurrent.BasicFuture;
46  import org.apache.hc.core5.concurrent.FutureCallback;
47  import org.apache.hc.core5.function.Callback;
48  import org.apache.hc.core5.io.ModalCloseable;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.util.Args;
51  import org.apache.hc.core5.util.Asserts;
52  import org.apache.hc.core5.util.LangUtils;
53  import org.apache.hc.core5.util.TimeValue;
54  import org.apache.hc.core5.util.Timeout;
55  
56  /**
57   * Connection pool with strict connection limit guarantees.
58   *
59   * @param <T> route
60   * @param <C> connection object
61   *
62   * @since 4.2
63   */
64  @Contract(threading = ThreadingBehavior.SAFE)
65  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
66  
        /** Time-to-live passed to every entry this pool creates. */
        private final TimeValue timeToLive;
        /** Optional listener notified on lease and release; {@code null} disables notifications. */
        private final ConnPoolListener<T> connPoolListener;
        /** Decides whether a released reusable entry is put at the head (LIFO) or the tail (FIFO) of {@code available}. */
        private final PoolReusePolicy policy;
        /** Per-route pools, created on demand by {@code getPool}. */
        private final Map<T, PerRoutePool<T, C>> routeToPool;
        /** Lease requests that could not be satisfied yet, in the order they were queued. */
        private final LinkedList<LeaseRequest<T, C>> leasingRequests;
        /** Entries currently handed out, across all routes. */
        private final Set<PoolEntry<T, C>> leased;
        /** Idle reusable entries, across all routes. */
        private final LinkedList<PoolEntry<T, C>> available;
        /** Finished requests whose futures still have to be notified by {@code fireCallbacks}. */
        private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
        /** Per-route limits that take precedence over {@code defaultMaxPerRoute}. */
        private final Map<T, Integer> maxPerRoute;
        /** Guards the non-concurrent collections above; note that {@code toString()} reads their sizes without taking it. */
        private final Lock lock;
        /** Flipped to {@code true} by the first call to {@code close(CloseMode)}. */
        private final AtomicBoolean isShutDown;

        /** Limit applied to routes that have no entry in {@code maxPerRoute}. */
        private volatile int defaultMaxPerRoute;
        /** Upper bound on the number of concurrently leased entries across all routes. */
        private volatile int maxTotal;
81  
82      /**
83       * @since 5.0
84       */
85      public StrictConnPool(
86              final int defaultMaxPerRoute,
87              final int maxTotal,
88              final TimeValue timeToLive,
89              final PoolReusePolicy policy,
90              final ConnPoolListener<T> connPoolListener) {
91          super();
92          Args.positive(defaultMaxPerRoute, "Max per route value");
93          Args.positive(maxTotal, "Max total value");
94          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95          this.connPoolListener = connPoolListener;
96          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
97          this.routeToPool = new HashMap<>();
98          this.leasingRequests = new LinkedList<>();
99          this.leased = new HashSet<>();
100         this.available = new LinkedList<>();
101         this.completedRequests = new ConcurrentLinkedQueue<>();
102         this.maxPerRoute = new HashMap<>();
103         this.lock = new ReentrantLock();
104         this.isShutDown = new AtomicBoolean(false);
105         this.defaultMaxPerRoute = defaultMaxPerRoute;
106         this.maxTotal = maxTotal;
107     }
108 
109     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
110         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECONDS, PoolReusePolicy.LIFO, null);
111     }
112 
113     public boolean isShutdown() {
114         return this.isShutDown.get();
115     }
116 
117     @Override
118     public void close(final CloseMode closeMode) {
119         if (this.isShutDown.compareAndSet(false, true)) {
120             fireCallbacks();
121             this.lock.lock();
122             try {
123                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
124                     pool.shutdown(closeMode);
125                 }
126                 this.routeToPool.clear();
127                 this.leased.clear();
128                 this.available.clear();
129                 this.leasingRequests.clear();
130             } finally {
131                 this.lock.unlock();
132             }
133         }
134     }
135 
136     @Override
137     public void close() {
138         close(CloseMode.GRACEFUL);
139     }
140 
141     private PerRoutePool<T, C> getPool(final T route) {
142         PerRoutePool<T, C> pool = this.routeToPool.get(route);
143         if (pool == null) {
144             pool = new PerRoutePool<>(route);
145             this.routeToPool.put(route, pool);
146         }
147         return pool;
148     }
149 
150     @Override
151     public Future<PoolEntry<T, C>> lease(
152             final T route, final Object state,
153             final Timeout requestTimeout,
154             final FutureCallback<PoolEntry<T, C>> callback) {
155         Args.notNull(route, "Route");
156         Args.notNull(requestTimeout, "Request timeout");
157         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
158         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
159         this.lock.lock();
160         try {
161             final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
162             final boolean completed = processPendingRequest(request);
163             if (!request.isDone() && !completed) {
164                 this.leasingRequests.add(request);
165             }
166             if (request.isDone()) {
167                 this.completedRequests.add(request);
168             }
169         } finally {
170             this.lock.unlock();
171         }
172         fireCallbacks();
173         return future;
174     }
175 
176     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
177         return lease(route, state, Timeout.DISABLED, null);
178     }
179 
180     @Override
181     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
182         if (entry == null) {
183             return;
184         }
185         if (this.isShutDown.get()) {
186             return;
187         }
188         if (!reusable) {
189             entry.discardConnection(CloseMode.GRACEFUL);
190         }
191         this.lock.lock();
192         try {
193             if (this.leased.remove(entry)) {
194                 if (this.connPoolListener != null) {
195                     this.connPoolListener.onRelease(entry.getRoute(), this);
196                 }
197                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
198                 final boolean keepAlive = entry.hasConnection() && reusable;
199                 pool.free(entry, keepAlive);
200                 if (keepAlive) {
201                     switch (policy) {
202                         case LIFO:
203                             this.available.addFirst(entry);
204                             break;
205                         case FIFO:
206                             this.available.addLast(entry);
207                             break;
208                         default:
209                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
210                     }
211                 } else {
212                     entry.discardConnection(CloseMode.GRACEFUL);
213                 }
214                 processNextPendingRequest();
215             } else {
216                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
217             }
218         } finally {
219             this.lock.unlock();
220         }
221         fireCallbacks();
222     }
223 
224     private void processPendingRequests() {
225         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
226         while (it.hasNext()) {
227             final LeaseRequest<T, C> request = it.next();
228             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
229             if (future.isCancelled()) {
230                 it.remove();
231                 continue;
232             }
233             final boolean completed = processPendingRequest(request);
234             if (request.isDone() || completed) {
235                 it.remove();
236             }
237             if (request.isDone()) {
238                 this.completedRequests.add(request);
239             }
240         }
241     }
242 
243     private void processNextPendingRequest() {
244         final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
245         while (it.hasNext()) {
246             final LeaseRequest<T, C> request = it.next();
247             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
248             if (future.isCancelled()) {
249                 it.remove();
250                 continue;
251             }
252             final boolean completed = processPendingRequest(request);
253             if (request.isDone() || completed) {
254                 it.remove();
255             }
256             if (request.isDone()) {
257                 this.completedRequests.add(request);
258             }
259             if (completed) {
260                 return;
261             }
262         }
263     }
264 
265     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
266         final T route = request.getRoute();
267         final Object state = request.getState();
268         final long deadline = request.getDeadline();
269 
270         final long now = System.currentTimeMillis();
271         if (now > deadline) {
272             request.failed(new TimeoutException());
273             return false;
274         }
275 
276         final PerRoutePool<T, C> pool = getPool(route);
277         PoolEntry<T, C> entry;
278         for (;;) {
279             entry = pool.getFree(state);
280             if (entry == null) {
281                 break;
282             }
283             if (entry.getExpiry() < System.currentTimeMillis()) {
284                 entry.discardConnection(CloseMode.GRACEFUL);
285                 this.available.remove(entry);
286                 pool.free(entry, false);
287             } else {
288                 break;
289             }
290         }
291         if (entry != null) {
292             this.available.remove(entry);
293             this.leased.add(entry);
294             request.completed(entry);
295             if (this.connPoolListener != null) {
296                 this.connPoolListener.onLease(entry.getRoute(), this);
297             }
298             return true;
299         }
300 
301         // New connection is needed
302         final int maxPerRoute = getMax(route);
303         // Shrink the pool prior to allocating a new connection
304         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
305         if (excess > 0) {
306             for (int i = 0; i < excess; i++) {
307                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
308                 if (lastUsed == null) {
309                     break;
310                 }
311                 lastUsed.discardConnection(CloseMode.GRACEFUL);
312                 this.available.remove(lastUsed);
313                 pool.remove(lastUsed);
314             }
315         }
316 
317         if (pool.getAllocatedCount() < maxPerRoute) {
318             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
319             if (freeCapacity == 0) {
320                 return false;
321             }
322             final int totalAvailable = this.available.size();
323             if (totalAvailable > freeCapacity - 1) {
324                 if (!this.available.isEmpty()) {
325                     final PoolEntry<T, C> lastUsed = this.available.removeLast();
326                     lastUsed.discardConnection(CloseMode.GRACEFUL);
327                     final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
328                     otherpool.remove(lastUsed);
329                 }
330             }
331 
332             entry = pool.createEntry(this.timeToLive);
333             this.leased.add(entry);
334             request.completed(entry);
335             if (this.connPoolListener != null) {
336                 this.connPoolListener.onLease(entry.getRoute(), this);
337             }
338             return true;
339         }
340         return false;
341     }
342 
343     private void fireCallbacks() {
344         LeaseRequest<T, C> request;
345         while ((request = this.completedRequests.poll()) != null) {
346             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
347             final Exception ex = request.getException();
348             final PoolEntry<T, C> result = request.getResult();
349             boolean successfullyCompleted = false;
350             if (ex != null) {
351                 future.failed(ex);
352             } else if (result != null) {
353                 if (future.completed(result)) {
354                     successfullyCompleted = true;
355                 }
356             } else {
357                 future.cancel();
358             }
359             if (!successfullyCompleted) {
360                 release(result, true);
361             }
362         }
363     }
364 
365     public void validatePendingRequests() {
366         this.lock.lock();
367         try {
368             final long now = System.currentTimeMillis();
369             final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
370             while (it.hasNext()) {
371                 final LeaseRequest<T, C> request = it.next();
372                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
373                 if (future.isCancelled() && !request.isDone()) {
374                     it.remove();
375                 } else {
376                     final long deadline = request.getDeadline();
377                     if (now > deadline) {
378                         request.failed(new TimeoutException());
379                     }
380                     if (request.isDone()) {
381                         it.remove();
382                         this.completedRequests.add(request);
383                     }
384                 }
385             }
386         } finally {
387             this.lock.unlock();
388         }
389         fireCallbacks();
390     }
391 
392     private int getMax(final T route) {
393         final Integer v = this.maxPerRoute.get(route);
394         if (v != null) {
395             return v.intValue();
396         }
397         return this.defaultMaxPerRoute;
398     }
399 
400     @Override
401     public void setMaxTotal(final int max) {
402         Args.positive(max, "Max value");
403         this.lock.lock();
404         try {
405             this.maxTotal = max;
406         } finally {
407             this.lock.unlock();
408         }
409     }
410 
411     @Override
412     public int getMaxTotal() {
413         this.lock.lock();
414         try {
415             return this.maxTotal;
416         } finally {
417             this.lock.unlock();
418         }
419     }
420 
421     @Override
422     public void setDefaultMaxPerRoute(final int max) {
423         Args.positive(max, "Max value");
424         this.lock.lock();
425         try {
426             this.defaultMaxPerRoute = max;
427         } finally {
428             this.lock.unlock();
429         }
430     }
431 
432     @Override
433     public int getDefaultMaxPerRoute() {
434         this.lock.lock();
435         try {
436             return this.defaultMaxPerRoute;
437         } finally {
438             this.lock.unlock();
439         }
440     }
441 
442     @Override
443     public void setMaxPerRoute(final T route, final int max) {
444         Args.notNull(route, "Route");
445         this.lock.lock();
446         try {
447             if (max > -1) {
448                 this.maxPerRoute.put(route, Integer.valueOf(max));
449             } else {
450                 this.maxPerRoute.remove(route);
451             }
452         } finally {
453             this.lock.unlock();
454         }
455     }
456 
457     @Override
458     public int getMaxPerRoute(final T route) {
459         Args.notNull(route, "Route");
460         this.lock.lock();
461         try {
462             return getMax(route);
463         } finally {
464             this.lock.unlock();
465         }
466     }
467 
468     @Override
469     public PoolStats getTotalStats() {
470         this.lock.lock();
471         try {
472             return new PoolStats(
473                     this.leased.size(),
474                     this.leasingRequests.size(),
475                     this.available.size(),
476                     this.maxTotal);
477         } finally {
478             this.lock.unlock();
479         }
480     }
481 
482     @Override
483     public PoolStats getStats(final T route) {
484         Args.notNull(route, "Route");
485         this.lock.lock();
486         try {
487             final PerRoutePool<T, C> pool = getPool(route);
488             int pendingCount = 0;
489             for (final LeaseRequest<T, C> request: leasingRequests) {
490                 if (LangUtils.equals(route, request.getRoute())) {
491                     pendingCount++;
492                 }
493             }
494             return new PoolStats(
495                     pool.getLeasedCount(),
496                     pendingCount,
497                     pool.getAvailableCount(),
498                     getMax(route));
499         } finally {
500             this.lock.unlock();
501         }
502     }
503 
504     /**
505      * Returns snapshot of all knows routes
506      *
507      * @since 4.4
508      */
509     @Override
510     public Set<T> getRoutes() {
511         this.lock.lock();
512         try {
513             return new HashSet<>(routeToPool.keySet());
514         } finally {
515             this.lock.unlock();
516         }
517     }
518 
519     /**
520      * Enumerates all available connections.
521      *
522      * @since 4.3
523      */
524     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
525         this.lock.lock();
526         try {
527             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
528             while (it.hasNext()) {
529                 final PoolEntry<T, C> entry = it.next();
530                 callback.execute(entry);
531                 if (!entry.hasConnection()) {
532                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
533                     pool.remove(entry);
534                     it.remove();
535                 }
536             }
537             processPendingRequests();
538             purgePoolMap();
539         } finally {
540             this.lock.unlock();
541         }
542     }
543 
544     /**
545      * Enumerates all leased connections.
546      *
547      * @since 4.3
548      */
549     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
550         this.lock.lock();
551         try {
552             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
553             while (it.hasNext()) {
554                 final PoolEntry<T, C> entry = it.next();
555                 callback.execute(entry);
556             }
557             processPendingRequests();
558         } finally {
559             this.lock.unlock();
560         }
561     }
562 
563     private void purgePoolMap() {
564         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
565         while (it.hasNext()) {
566             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
567             final PerRoutePool<T, C> pool = entry.getValue();
568             if (pool.getAllocatedCount() == 0) {
569                 it.remove();
570             }
571         }
572     }
573 
574     @Override
575     public void closeIdle(final TimeValue idleTime) {
576         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
577         enumAvailable(new Callback<PoolEntry<T, C>>() {
578 
579             @Override
580             public void execute(final PoolEntry<T, C> entry) {
581                 if (entry.getUpdated() <= deadline) {
582                     entry.discardConnection(CloseMode.GRACEFUL);
583                 }
584             }
585 
586         });
587     }
588 
589     @Override
590     public void closeExpired() {
591         final long now = System.currentTimeMillis();
592         enumAvailable(new Callback<PoolEntry<T, C>>() {
593 
594             @Override
595             public void execute(final PoolEntry<T, C> entry) {
596                 if (entry.getExpiry() < now) {
597                     entry.discardConnection(CloseMode.GRACEFUL);
598                 }
599             }
600 
601         });
602     }
603 
604     @Override
605     public String toString() {
606         final StringBuilder buffer = new StringBuilder();
607         buffer.append("[leased: ");
608         buffer.append(this.leased.size());
609         buffer.append("][available: ");
610         buffer.append(this.available.size());
611         buffer.append("][pending: ");
612         buffer.append(this.leasingRequests.size());
613         buffer.append("]");
614         return buffer.toString();
615     }
616 
617 
618     static class LeaseRequest<T, C extends ModalCloseable> {
619 
620         private final T route;
621         private final Object state;
622         private final long deadline;
623         private final BasicFuture<PoolEntry<T, C>> future;
624         private final AtomicBoolean completed;
625         private volatile PoolEntry<T, C> result;
626         private volatile Exception ex;
627 
628         /**
629          * Constructor
630          *
631          * @param route route
632          * @param state state
633          * @param requestTimeout timeout to wait in a request queue until kicked off
634          * @param future future callback
635          */
636         public LeaseRequest(
637                 final T route,
638                 final Object state,
639                 final Timeout requestTimeout,
640                 final BasicFuture<PoolEntry<T, C>> future) {
641             super();
642             this.route = route;
643             this.state = state;
644             this.deadline = Timeout.calculateDeadline(System.currentTimeMillis(), requestTimeout);
645             this.future = future;
646             this.completed = new AtomicBoolean(false);
647         }
648 
649         public T getRoute() {
650             return this.route;
651         }
652 
653         public Object getState() {
654             return this.state;
655         }
656 
657         public long getDeadline() {
658             return this.deadline;
659         }
660 
661         public boolean isDone() {
662             return this.completed.get();
663         }
664 
665         public void failed(final Exception ex) {
666             if (this.completed.compareAndSet(false, true)) {
667                 this.ex = ex;
668             }
669         }
670 
671         public void completed(final PoolEntry<T, C> result) {
672             if (this.completed.compareAndSet(false, true)) {
673                 this.result = result;
674             }
675         }
676 
677         public BasicFuture<PoolEntry<T, C>> getFuture() {
678             return this.future;
679         }
680 
681         public PoolEntry<T, C> getResult() {
682             return this.result;
683         }
684 
685         public Exception getException() {
686             return this.ex;
687         }
688 
689         @Override
690         public String toString() {
691             final StringBuilder buffer = new StringBuilder();
692             buffer.append("[");
693             buffer.append(this.route);
694             buffer.append("][");
695             buffer.append(this.state);
696             buffer.append("]");
697             return buffer.toString();
698         }
699 
700     }
701 
702     static class PerRoutePool<T, C extends ModalCloseable> {
703 
704         private final T route;
705         private final Set<PoolEntry<T, C>> leased;
706         private final LinkedList<PoolEntry<T, C>> available;
707 
708         PerRoutePool(final T route) {
709             super();
710             this.route = route;
711             this.leased = new HashSet<>();
712             this.available = new LinkedList<>();
713         }
714 
715         public final T getRoute() {
716             return route;
717         }
718 
719         public int getLeasedCount() {
720             return this.leased.size();
721         }
722 
723         public int getAvailableCount() {
724             return this.available.size();
725         }
726 
727         public int getAllocatedCount() {
728             return this.available.size() + this.leased.size();
729         }
730 
731         public PoolEntry<T, C> getFree(final Object state) {
732             if (!this.available.isEmpty()) {
733                 if (state != null) {
734                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
735                     while (it.hasNext()) {
736                         final PoolEntry<T, C> entry = it.next();
737                         if (state.equals(entry.getState())) {
738                             it.remove();
739                             this.leased.add(entry);
740                             return entry;
741                         }
742                     }
743                 }
744                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
745                 while (it.hasNext()) {
746                     final PoolEntry<T, C> entry = it.next();
747                     if (entry.getState() == null) {
748                         it.remove();
749                         this.leased.add(entry);
750                         return entry;
751                     }
752                 }
753             }
754             return null;
755         }
756 
757         public PoolEntry<T, C> getLastUsed() {
758             return this.available.peekLast();
759         }
760 
761         public boolean remove(final PoolEntry<T, C> entry) {
762             return this.available.remove(entry) || this.leased.remove(entry);
763         }
764 
765         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
766             final boolean found = this.leased.remove(entry);
767             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
768             if (reusable) {
769                 this.available.addFirst(entry);
770             }
771         }
772 
773         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
774             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive);
775             this.leased.add(entry);
776             return entry;
777         }
778 
779         public void shutdown(final CloseMode closeMode) {
780             PoolEntry<T, C> availableEntry;
781             while ((availableEntry = available.poll()) != null) {
782                 availableEntry.discardConnection(closeMode);
783             }
784             for (final PoolEntry<T, C> entry: this.leased) {
785                 entry.discardConnection(closeMode);
786             }
787             this.leased.clear();
788         }
789 
790         @Override
791         public String toString() {
792             final StringBuilder buffer = new StringBuilder();
793             buffer.append("[route: ");
794             buffer.append(this.route);
795             buffer.append("][leased: ");
796             buffer.append(this.leased.size());
797             buffer.append("][available: ");
798             buffer.append(this.available.size());
799             buffer.append("]");
800             return buffer.toString();
801         }
802 
803     }
804 }