View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedDeque;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.hc.core5.annotation.Contract;
41  import org.apache.hc.core5.annotation.Experimental;
42  import org.apache.hc.core5.annotation.ThreadingBehavior;
43  import org.apache.hc.core5.concurrent.BasicFuture;
44  import org.apache.hc.core5.concurrent.Cancellable;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.io.ModalCloseable;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.util.Args;
50  import org.apache.hc.core5.util.Asserts;
51  import org.apache.hc.core5.util.LangUtils;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  
55  /**
56   * Connection pool with higher concurrency but with lax connection limit guarantees.
57   *
58   * @param <T> route
59   * @param <C> connection object
60   *
61   * @since 5.0
62   */
63  @Contract(threading = ThreadingBehavior.SAFE)
64  @Experimental
65  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
66  
67      private final TimeValue timeToLive;
68      private final ConnPoolListener<T> connPoolListener;
69      private final PoolReusePolicy policy;
70      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
71      private final AtomicBoolean isShutDown;
72  
73      private volatile int defaultMaxPerRoute;
74  
75      /**
76       * @since 5.0
77       */
78      public LaxConnPool(
79              final int defaultMaxPerRoute,
80              final TimeValue timeToLive,
81              final PoolReusePolicy policy,
82              final ConnPoolListener<T> connPoolListener) {
83          super();
84          Args.positive(defaultMaxPerRoute, "Max per route value");
85          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
86          this.connPoolListener = connPoolListener;
87          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
88          this.routeToPool = new ConcurrentHashMap<>();
89          this.isShutDown = new AtomicBoolean(false);
90          this.defaultMaxPerRoute = defaultMaxPerRoute;
91      }
92  
93      public LaxConnPool(final int defaultMaxPerRoute) {
94          this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECONDS, PoolReusePolicy.LIFO, null);
95      }
96  
97      public boolean isShutdown() {
98          return isShutDown.get();
99      }
100 
101     @Override
102     public void close(final CloseMode closeMode) {
103         if (isShutDown.compareAndSet(false, true)) {
104             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
105                 final PerRoutePool<T, C> routePool = it.next();
106                 routePool.shutdown(closeMode);
107             }
108             routeToPool.clear();
109         }
110     }
111 
112     @Override
113     public void close() {
114         close(CloseMode.GRACEFUL);
115     }
116 
117     private PerRoutePool<T, C> getPool(final T route) {
118         PerRoutePool<T, C> routePool = routeToPool.get(route);
119         if (routePool == null) {
120             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
121                     route,
122                     defaultMaxPerRoute,
123                     timeToLive,
124                     policy,
125                     this,
126                     connPoolListener);
127             routePool = routeToPool.putIfAbsent(route, newRoutePool);
128             if (routePool == null) {
129                 routePool = newRoutePool;
130             }
131         }
132         return routePool;
133     }
134 
135     @Override
136     public Future<PoolEntry<T, C>> lease(
137             final T route, final Object state,
138             final Timeout requestTimeout,
139             final FutureCallback<PoolEntry<T, C>> callback) {
140         Args.notNull(route, "Route");
141         Asserts.check(!isShutDown.get(), "Connection pool shut down");
142         final PerRoutePool<T, C> routePool = getPool(route);
143         return routePool.lease(state, requestTimeout, callback);
144     }
145 
146     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
147         return lease(route, state, Timeout.DISABLED, null);
148     }
149 
150     @Override
151     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
152         if (entry == null) {
153             return;
154         }
155         if (isShutDown.get()) {
156             return;
157         }
158         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
159         if (connPoolListener != null) {
160             connPoolListener.onLease(entry.getRoute(), this);
161         }
162         routePool.release(entry, reusable);
163     }
164 
165     public void validatePendingRequests() {
166         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
167             routePool.validatePendingRequests();
168         }
169     }
170 
171     @Override
172     public void setMaxTotal(final int max) {
173     }
174 
175     @Override
176     public int getMaxTotal() {
177         return 0;
178     }
179 
180     @Override
181     public void setDefaultMaxPerRoute(final int max) {
182         Args.positive(max, "Max value");
183         defaultMaxPerRoute = max;
184     }
185 
186     @Override
187     public int getDefaultMaxPerRoute() {
188         return defaultMaxPerRoute;
189     }
190 
191     @Override
192     public void setMaxPerRoute(final T route, final int max) {
193         Args.notNull(route, "Route");
194         final PerRoutePool<T, C> routePool = getPool(route);
195         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
196     }
197 
198     @Override
199     public int getMaxPerRoute(final T route) {
200         Args.notNull(route, "Route");
201         final PerRoutePool<T, C> routePool = getPool(route);
202         return routePool.getMax();
203     }
204 
205     @Override
206     public PoolStats getTotalStats() {
207         int leasedTotal = 0;
208         int pendingTotal = 0;
209         int availableTotal = 0;
210         int maxTotal = 0;
211         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
212             leasedTotal += routePool.getLeasedCount();
213             pendingTotal += routePool.getPendingCount();
214             availableTotal += routePool.getAvailableCount();
215             maxTotal += routePool.getMax();
216         }
217         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
218     }
219 
220     @Override
221     public PoolStats getStats(final T route) {
222         Args.notNull(route, "Route");
223         final PerRoutePool<T, C> routePool = getPool(route);
224         return new PoolStats(
225                 routePool.getLeasedCount(),
226                 routePool.getPendingCount(),
227                 routePool.getAvailableCount(),
228                 routePool.getMax());
229     }
230 
231     @Override
232     public Set<T> getRoutes() {
233         return new HashSet<>(routeToPool.keySet());
234     }
235 
236     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
237         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
238             routePool.enumAvailable(callback);
239         }
240     }
241 
242     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
243         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
244             routePool.enumLeased(callback);
245         }
246     }
247 
248     @Override
249     public void closeIdle(final TimeValue idleTime) {
250         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
251         enumAvailable(new Callback<PoolEntry<T, C>>() {
252 
253             @Override
254             public void execute(final PoolEntry<T, C> entry) {
255                 if (entry.getUpdated() <= deadline) {
256                     entry.discardConnection(CloseMode.GRACEFUL);
257                 }
258             }
259 
260         });
261     }
262 
263     @Override
264     public void closeExpired() {
265         final long now = System.currentTimeMillis();
266         enumAvailable(new Callback<PoolEntry<T, C>>() {
267 
268             @Override
269             public void execute(final PoolEntry<T, C> entry) {
270                 if (entry.getExpiry() < now) {
271                     entry.discardConnection(CloseMode.GRACEFUL);
272                 }
273             }
274 
275         });
276     }
277 
278     @Override
279     public String toString() {
280         final PoolStats totalStats = getTotalStats();
281         final StringBuilder buffer = new StringBuilder();
282         buffer.append("[leased: ");
283         buffer.append(totalStats.getLeased());
284         buffer.append("][available: ");
285         buffer.append(totalStats.getAvailable());
286         buffer.append("][pending: ");
287         buffer.append(totalStats.getPending());
288         buffer.append("]");
289         return buffer.toString();
290     }
291 
292     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
293 
294         private final Object state;
295         private final long deadline;
296         private final BasicFuture<PoolEntry<T, C>> future;
297 
298         LeaseRequest(
299                 final Object state,
300                 final TimeValue requestTimeout,
301                 final BasicFuture<PoolEntry<T, C>> future) {
302             super();
303             this.state = state;
304             this.deadline = TimeValue.calculateDeadline(System.currentTimeMillis(), requestTimeout);
305             this.future = future;
306         }
307 
308         BasicFuture<PoolEntry<T, C>> getFuture() {
309             return this.future;
310         }
311 
312         public Object getState() {
313             return this.state;
314         }
315 
316         public long getDeadline() {
317             return this.deadline;
318         }
319 
320         public boolean isDone() {
321             return this.future.isDone();
322         }
323 
324         public void completed(final PoolEntry<T, C> result) {
325             future.completed(result);
326         }
327 
328         public void failed(final Exception ex) {
329             future.failed(ex);
330         }
331 
332         @Override
333         public boolean cancel() {
334             return future.cancel();
335         }
336 
337     }
338 
339     static class PerRoutePool<T, C extends ModalCloseable> {
340 
341         private final T route;
342         private final TimeValue timeToLive;
343         private final PoolReusePolicy policy;
344         private final ConnPoolStats<T> connPoolStats;
345         private final ConnPoolListener<T> connPoolListener;
346         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
347         private final Deque<PoolEntry<T, C>> available;
348         private final Deque<LeaseRequest<T, C>> pending;
349         private final AtomicBoolean terminated;
350 
351         private volatile int max;
352 
353         PerRoutePool(
354                 final T route,
355                 final int max,
356                 final TimeValue timeToLive,
357                 final PoolReusePolicy policy,
358                 final ConnPoolStats<T> connPoolStats,
359                 final ConnPoolListener<T> connPoolListener) {
360             super();
361             this.route = route;
362             this.timeToLive = timeToLive;
363             this.policy = policy;
364             this.connPoolStats = connPoolStats;
365             this.connPoolListener = connPoolListener;
366             this.leased = new ConcurrentHashMap<>();
367             this.available = new ConcurrentLinkedDeque<>();
368             this.pending = new ConcurrentLinkedDeque<>();
369             this.terminated = new AtomicBoolean(false);
370             this.max = max;
371         }
372 
373         public void shutdown(final CloseMode closeMode) {
374             if (terminated.compareAndSet(false, true)) {
375                 PoolEntry<T, C> availableEntry;
376                 while ((availableEntry = available.poll()) != null) {
377                     availableEntry.discardConnection(closeMode);
378                 }
379                 for (final PoolEntry<T, C> entry : leased.keySet()) {
380                     entry.discardConnection(closeMode);
381                 }
382                 leased.clear();
383                 LeaseRequest<T, C> leaseRequest;
384                 while ((leaseRequest = pending.poll()) != null) {
385                     leaseRequest.cancel();
386                 }
387             }
388         }
389 
390         private void addLeased(final PoolEntry<T, C> entry) {
391             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
392                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
393             } else if (connPoolListener != null) {
394                 connPoolListener.onLease(route, connPoolStats);
395             }
396         }
397 
398         private void removeLeased(final PoolEntry<T, C> entry) {
399             if (connPoolListener != null) {
400                 connPoolListener.onRelease(route, connPoolStats);
401             }
402             if (!leased.remove(entry, Boolean.TRUE)) {
403                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
404             }
405         }
406 
407         private PoolEntry<T, C> getAvailableEntry(final Object state) {
408             final PoolEntry<T, C> entry = available.poll();
409             if (entry != null) {
410                 if (entry.getExpiry() < System.currentTimeMillis()) {
411                     entry.discardConnection(CloseMode.GRACEFUL);
412                 }
413                 if (!LangUtils.equals(entry.getState(), state)) {
414                     entry.discardConnection(CloseMode.GRACEFUL);
415                 }
416             }
417             return entry;
418         }
419 
420         public Future<PoolEntry<T, C>> lease(
421                 final Object state,
422                 final TimeValue requestTimeout,
423                 final FutureCallback<PoolEntry<T, C>> callback) {
424             Asserts.check(!terminated.get(), "Connection pool shut down");
425             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
426             final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
427             if (availableEntry != null) {
428                 addLeased(availableEntry);
429                 future.completed(availableEntry);
430             } else {
431                 if (pending.isEmpty() && leased.size() < max) {
432                     final PoolEntry<T, C> entry = new PoolEntry<>(route, timeToLive);
433                     addLeased(entry);
434                     future.completed(entry);
435                 } else {
436                     pending.add(new LeaseRequest<>(state, requestTimeout, future));
437                 }
438             }
439             return future;
440         }
441 
442         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
443             removeLeased(releasedEntry);
444             if (!reusable || releasedEntry.getExpiry() < System.currentTimeMillis()) {
445                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
446             }
447             if (releasedEntry.hasConnection()) {
448                 switch (policy) {
449                     case LIFO:
450                         available.addFirst(releasedEntry);
451                         break;
452                     case FIFO:
453                         available.addLast(releasedEntry);
454                         break;
455                     default:
456                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
457                 }
458             }
459             LeaseRequest<T, C> leaseRequest;
460             while ((leaseRequest = pending.poll()) != null) {
461                 if (leaseRequest.isDone()) {
462                     continue;
463                 }
464                 final Object state = leaseRequest.getState();
465                 final long deadline = leaseRequest.getDeadline();
466 
467                 final long now = System.currentTimeMillis();
468                 if (now > deadline) {
469                     leaseRequest.failed(new TimeoutException());
470                 } else {
471                     final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
472                     if (availableEntry != null) {
473                         addLeased(releasedEntry);
474                         leaseRequest.completed(availableEntry);
475                     } else if (leased.size() < max) {
476                         final PoolEntry<T, C> newEntry = new PoolEntry<>(route, timeToLive);
477                         addLeased(newEntry);
478                         leaseRequest.completed(newEntry);
479                     }
480                     break;
481                 }
482             }
483         }
484 
485         public void validatePendingRequests() {
486             final long now = System.currentTimeMillis();
487             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
488             while (it.hasNext()) {
489                 final LeaseRequest<T, C> request = it.next();
490                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
491                 if (future.isCancelled() && !request.isDone()) {
492                     it.remove();
493                 } else {
494                     final long deadline = request.getDeadline();
495                     if (now > deadline) {
496                         request.failed(new TimeoutException());
497                     }
498                     if (request.isDone()) {
499                         it.remove();
500                     }
501                 }
502             }
503         }
504 
505         public final T getRoute() {
506             return route;
507         }
508 
509         public int getMax() {
510             return max;
511         }
512 
513         public void setMax(final int max) {
514             this.max = max;
515         }
516 
517         public int getPendingCount() {
518             return pending.size();
519         }
520 
521         public int getLeasedCount() {
522             return leased.size();
523         }
524 
525         public int getAvailableCount() {
526             return available.size();
527         }
528 
529         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
530             for (final Iterator<PoolEntry<T, C>> it = available.iterator(); it.hasNext(); ) {
531                 final PoolEntry<T, C> entry = it.next();
532                 callback.execute(entry);
533                 if (!entry.hasConnection()) {
534                     it.remove();
535                 }
536             }
537         }
538 
539         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
540             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
541                 final PoolEntry<T, C> entry = it.next();
542                 callback.execute(entry);
543                 if (!entry.hasConnection()) {
544                     it.remove();
545                 }
546             }
547         }
548 
549         @Override
550         public String toString() {
551             final StringBuilder buffer = new StringBuilder();
552             buffer.append("[route: ");
553             buffer.append(route);
554             buffer.append("][leased: ");
555             buffer.append(leased.size());
556             buffer.append("][available: ");
557             buffer.append(available.size());
558             buffer.append("][pending: ");
559             buffer.append(pending.size());
560             buffer.append("]");
561             return buffer.toString();
562         }
563 
564     }
565 
566 }