View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedDeque;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  
40  import org.apache.hc.core5.annotation.Contract;
41  import org.apache.hc.core5.annotation.Experimental;
42  import org.apache.hc.core5.annotation.ThreadingBehavior;
43  import org.apache.hc.core5.concurrent.BasicFuture;
44  import org.apache.hc.core5.concurrent.Cancellable;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.io.GracefullyCloseable;
48  import org.apache.hc.core5.io.ShutdownType;
49  import org.apache.hc.core5.util.Args;
50  import org.apache.hc.core5.util.Asserts;
51  import org.apache.hc.core5.util.LangUtils;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  
55  /**
56   * Connection pool with higher concurrency but with lax connection limit guarantees.
57   *
58   * @param <T> route
59   * @param <C> connection object
60   *
61   * @since 5.0
62   */
63  @Contract(threading = ThreadingBehavior.SAFE)
64  @Experimental
65  public class LaxConnPool<T, C extends GracefullyCloseable> implements ManagedConnPool<T, C> {
66  
67      private final TimeValue timeToLive;
68      private final ConnPoolListener<T> connPoolListener;
69      private final PoolReusePolicy policy;
70      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
71      private final AtomicBoolean isShutDown;
72  
73      private volatile int defaultMaxPerRoute;
74  
75      /**
76       * @since 5.0
77       */
78      public LaxConnPool(
79              final int defaultMaxPerRoute,
80              final TimeValue timeToLive,
81              final PoolReusePolicy policy,
82              final ConnPoolListener<T> connPoolListener) {
83          super();
84          Args.positive(defaultMaxPerRoute, "Max per route value");
85          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
86          this.connPoolListener = connPoolListener;
87          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
88          this.routeToPool = new ConcurrentHashMap<>();
89          this.isShutDown = new AtomicBoolean(false);
90          this.defaultMaxPerRoute = defaultMaxPerRoute;
91      }
92  
93      public LaxConnPool(final int defaultMaxPerRoute) {
94          this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECONDS, PoolReusePolicy.LIFO, null);
95      }
96  
97      public boolean isShutdown() {
98          return isShutDown.get();
99      }
100 
101     @Override
102     public void shutdown(final ShutdownType shutdownType) {
103         if (isShutDown.compareAndSet(false, true)) {
104             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
105                 final PerRoutePool<T, C> routePool = it.next();
106                 routePool.shutdown(shutdownType);
107             }
108             routeToPool.clear();
109         }
110     }
111 
112     @Override
113     public void close() {
114         shutdown(ShutdownType.GRACEFUL);
115     }
116 
117     private PerRoutePool<T, C> getPool(final T route) {
118         PerRoutePool<T, C> routePool = routeToPool.get(route);
119         if (routePool == null) {
120             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
121                     route,
122                     defaultMaxPerRoute,
123                     timeToLive,
124                     policy,
125                     this,
126                     connPoolListener);
127             routePool = routeToPool.putIfAbsent(route, newRoutePool);
128             if (routePool == null) {
129                 routePool = newRoutePool;
130             }
131         }
132         return routePool;
133     }
134 
135     @Override
136     public Future<PoolEntry<T, C>> lease(
137             final T route, final Object state,
138             final Timeout requestTimeout,
139             final FutureCallback<PoolEntry<T, C>> callback) {
140         Args.notNull(route, "Route");
141         Asserts.check(!isShutDown.get(), "Connection pool shut down");
142         final PerRoutePool<T, C> routePool = getPool(route);
143         return routePool.lease(state, requestTimeout, callback);
144     }
145 
146     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
147         return lease(route, state, Timeout.DISABLED, null);
148     }
149 
150     @Override
151     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
152         if (entry == null) {
153             return;
154         }
155         if (isShutDown.get()) {
156             return;
157         }
158         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
159         if (connPoolListener != null) {
160             connPoolListener.onLease(entry.getRoute(), this);
161         }
162         routePool.release(entry, reusable);
163     }
164 
165     public void validatePendingRequests() {
166         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
167             routePool.validatePendingRequests();
168         }
169     }
170 
171     @Override
172     public void setMaxTotal(final int max) {
173     }
174 
175     @Override
176     public int getMaxTotal() {
177         return 0;
178     }
179 
180     @Override
181     public void setDefaultMaxPerRoute(final int max) {
182         Args.positive(max, "Max value");
183         defaultMaxPerRoute = max;
184     }
185 
186     @Override
187     public int getDefaultMaxPerRoute() {
188         return defaultMaxPerRoute;
189     }
190 
191     @Override
192     public void setMaxPerRoute(final T route, final int max) {
193         Args.notNull(route, "Route");
194         Args.positive(max, "Max value");
195         final PerRoutePool<T, C> routePool = getPool(route);
196         routePool.setMax(max);
197     }
198 
199     @Override
200     public int getMaxPerRoute(final T route) {
201         Args.notNull(route, "Route");
202         final PerRoutePool<T, C> routePool = getPool(route);
203         return routePool.getMax();
204     }
205 
206     @Override
207     public PoolStats getTotalStats() {
208         int leasedTotal = 0;
209         int pendingTotal = 0;
210         int availableTotal = 0;
211         int maxTotal = 0;
212         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
213             leasedTotal += routePool.getLeasedCount();
214             pendingTotal += routePool.getPendingCount();
215             availableTotal += routePool.getAvailableCount();
216             maxTotal += routePool.getMax();
217         }
218         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
219     }
220 
221     @Override
222     public PoolStats getStats(final T route) {
223         Args.notNull(route, "Route");
224         final PerRoutePool<T, C> routePool = getPool(route);
225         return new PoolStats(
226                 routePool.getLeasedCount(),
227                 routePool.getPendingCount(),
228                 routePool.getAvailableCount(),
229                 routePool.getMax());
230     }
231 
232     @Override
233     public Set<T> getRoutes() {
234         return new HashSet<>(routeToPool.keySet());
235     }
236 
237     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
238         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
239             routePool.enumAvailable(callback);
240         }
241     }
242 
243     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
244         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
245             routePool.enumLeased(callback);
246         }
247     }
248 
249     @Override
250     public void closeIdle(final TimeValue idleTime) {
251         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
252         enumAvailable(new Callback<PoolEntry<T, C>>() {
253 
254             @Override
255             public void execute(final PoolEntry<T, C> entry) {
256                 if (entry.getUpdated() <= deadline) {
257                     entry.discardConnection(ShutdownType.GRACEFUL);
258                 }
259             }
260 
261         });
262     }
263 
264     @Override
265     public void closeExpired() {
266         final long now = System.currentTimeMillis();
267         enumAvailable(new Callback<PoolEntry<T, C>>() {
268 
269             @Override
270             public void execute(final PoolEntry<T, C> entry) {
271                 if (entry.getExpiry() < now) {
272                     entry.discardConnection(ShutdownType.GRACEFUL);
273                 }
274             }
275 
276         });
277     }
278 
279     @Override
280     public String toString() {
281         final PoolStats totalStats = getTotalStats();
282         final StringBuilder buffer = new StringBuilder();
283         buffer.append("[leased: ");
284         buffer.append(totalStats.getLeased());
285         buffer.append("][available: ");
286         buffer.append(totalStats.getAvailable());
287         buffer.append("][pending: ");
288         buffer.append(totalStats.getPending());
289         buffer.append("]");
290         return buffer.toString();
291     }
292 
293     static class LeaseRequest<T, C extends GracefullyCloseable> implements Cancellable {
294 
295         private final Object state;
296         private final long deadline;
297         private final BasicFuture<PoolEntry<T, C>> future;
298 
299         LeaseRequest(
300                 final Object state,
301                 final TimeValue requestTimeout,
302                 final BasicFuture<PoolEntry<T, C>> future) {
303             super();
304             this.state = state;
305             this.deadline = TimeValue.calculateDeadline(System.currentTimeMillis(), requestTimeout);
306             this.future = future;
307         }
308 
309         BasicFuture<PoolEntry<T, C>> getFuture() {
310             return this.future;
311         }
312 
313         public Object getState() {
314             return this.state;
315         }
316 
317         public long getDeadline() {
318             return this.deadline;
319         }
320 
321         public boolean isDone() {
322             return this.future.isDone();
323         }
324 
325         public void completed(final PoolEntry<T, C> result) {
326             future.completed(result);
327         }
328 
329         public void failed(final Exception ex) {
330             future.failed(ex);
331         }
332 
333         @Override
334         public boolean cancel() {
335             return future.cancel();
336         }
337 
338     }
339 
340     static class PerRoutePool<T, C extends GracefullyCloseable> {
341 
342         private final T route;
343         private final TimeValue timeToLive;
344         private final PoolReusePolicy policy;
345         private final ConnPoolStats<T> connPoolStats;
346         private final ConnPoolListener<T> connPoolListener;
347         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
348         private final Deque<PoolEntry<T, C>> available;
349         private final Deque<LeaseRequest<T, C>> pending;
350         private final AtomicBoolean terminated;
351 
352         private volatile int max;
353 
354         PerRoutePool(
355                 final T route,
356                 final int max,
357                 final TimeValue timeToLive,
358                 final PoolReusePolicy policy,
359                 final ConnPoolStats<T> connPoolStats,
360                 final ConnPoolListener<T> connPoolListener) {
361             super();
362             this.route = route;
363             this.timeToLive = timeToLive;
364             this.policy = policy;
365             this.connPoolStats = connPoolStats;
366             this.connPoolListener = connPoolListener;
367             this.leased = new ConcurrentHashMap<>();
368             this.available = new ConcurrentLinkedDeque<>();
369             this.pending = new ConcurrentLinkedDeque<>();
370             this.terminated = new AtomicBoolean(false);
371             this.max = max;
372         }
373 
374         public void shutdown(final ShutdownType shutdownType) {
375             if (terminated.compareAndSet(false, true)) {
376                 PoolEntry<T, C> availableEntry;
377                 while ((availableEntry = available.poll()) != null) {
378                     availableEntry.discardConnection(shutdownType);
379                 }
380                 for (final PoolEntry<T, C> entry : leased.keySet()) {
381                     entry.discardConnection(shutdownType);
382                 }
383                 leased.clear();
384                 LeaseRequest<T, C> leaseRequest;
385                 while ((leaseRequest = pending.poll()) != null) {
386                     leaseRequest.cancel();
387                 }
388             }
389         }
390 
391         private void addLeased(final PoolEntry<T, C> entry) {
392             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
393                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
394             } else if (connPoolListener != null) {
395                 connPoolListener.onLease(route, connPoolStats);
396             }
397         }
398 
399         private void removeLeased(final PoolEntry<T, C> entry) {
400             if (connPoolListener != null) {
401                 connPoolListener.onRelease(route, connPoolStats);
402             }
403             if (!leased.remove(entry, Boolean.TRUE)) {
404                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
405             }
406         }
407 
408         private PoolEntry<T, C> getAvailableEntry(final Object state) {
409             final PoolEntry<T, C> entry = available.poll();
410             if (entry != null) {
411                 if (entry.getExpiry() < System.currentTimeMillis()) {
412                     entry.discardConnection(ShutdownType.GRACEFUL);
413                 }
414                 if (!LangUtils.equals(entry.getState(), state)) {
415                     entry.discardConnection(ShutdownType.GRACEFUL);
416                 }
417             }
418             return entry;
419         }
420 
421         public Future<PoolEntry<T, C>> lease(
422                 final Object state,
423                 final TimeValue requestTimeout,
424                 final FutureCallback<PoolEntry<T, C>> callback) {
425             Asserts.check(!terminated.get(), "Connection pool shut down");
426             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
427             final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
428             if (availableEntry != null) {
429                 addLeased(availableEntry);
430                 future.completed(availableEntry);
431             } else {
432                 if (pending.isEmpty() && leased.size() < max) {
433                     final PoolEntry<T, C> entry = new PoolEntry<>(route, timeToLive);
434                     addLeased(entry);
435                     future.completed(entry);
436                 } else {
437                     pending.add(new LeaseRequest<>(state, requestTimeout, future));
438                 }
439             }
440             return future;
441         }
442 
443         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
444             removeLeased(releasedEntry);
445             if (!reusable || releasedEntry.getExpiry() < System.currentTimeMillis()) {
446                 releasedEntry.discardConnection(ShutdownType.GRACEFUL);
447             }
448             if (releasedEntry.hasConnection()) {
449                 switch (policy) {
450                     case LIFO:
451                         available.addFirst(releasedEntry);
452                         break;
453                     case FIFO:
454                         available.addLast(releasedEntry);
455                         break;
456                     default:
457                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
458                 }
459             }
460             LeaseRequest<T, C> leaseRequest;
461             while ((leaseRequest = pending.poll()) != null) {
462                 if (leaseRequest.isDone()) {
463                     continue;
464                 }
465                 final Object state = leaseRequest.getState();
466                 final long deadline = leaseRequest.getDeadline();
467 
468                 final long now = System.currentTimeMillis();
469                 if (now > deadline) {
470                     leaseRequest.failed(new TimeoutException());
471                 } else {
472                     final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
473                     if (availableEntry != null) {
474                         addLeased(releasedEntry);
475                         leaseRequest.completed(availableEntry);
476                     } else if (leased.size() < max) {
477                         final PoolEntry<T, C> newEntry = new PoolEntry<>(route, timeToLive);
478                         addLeased(newEntry);
479                         leaseRequest.completed(newEntry);
480                     }
481                     break;
482                 }
483             }
484         }
485 
486         public void validatePendingRequests() {
487             final long now = System.currentTimeMillis();
488             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
489             while (it.hasNext()) {
490                 final LeaseRequest<T, C> request = it.next();
491                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
492                 if (future.isCancelled() && !request.isDone()) {
493                     it.remove();
494                 } else {
495                     final long deadline = request.getDeadline();
496                     if (now > deadline) {
497                         request.failed(new TimeoutException());
498                     }
499                     if (request.isDone()) {
500                         it.remove();
501                     }
502                 }
503             }
504         }
505 
506         public final T getRoute() {
507             return route;
508         }
509 
510         public int getMax() {
511             return max;
512         }
513 
514         public void setMax(final int max) {
515             this.max = max;
516         }
517 
518         public int getPendingCount() {
519             return pending.size();
520         }
521 
522         public int getLeasedCount() {
523             return leased.size();
524         }
525 
526         public int getAvailableCount() {
527             return available.size();
528         }
529 
530         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
531             for (final Iterator<PoolEntry<T, C>> it = available.iterator(); it.hasNext(); ) {
532                 final PoolEntry<T, C> entry = it.next();
533                 callback.execute(entry);
534                 if (!entry.hasConnection()) {
535                     it.remove();
536                 }
537             }
538         }
539 
540         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
541             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
542                 final PoolEntry<T, C> entry = it.next();
543                 callback.execute(entry);
544                 if (!entry.hasConnection()) {
545                     it.remove();
546                 }
547             }
548         }
549 
550         @Override
551         public String toString() {
552             final StringBuilder buffer = new StringBuilder();
553             buffer.append("[route: ");
554             buffer.append(route);
555             buffer.append("][leased: ");
556             buffer.append(leased.size());
557             buffer.append("][available: ");
558             buffer.append(available.size());
559             buffer.append("][pending: ");
560             buffer.append(pending.size());
561             buffer.append("]");
562             return buffer.toString();
563         }
564 
565     }
566 
567 }