View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedDeque;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  
39  import org.apache.hc.core5.annotation.Contract;
40  import org.apache.hc.core5.annotation.Experimental;
41  import org.apache.hc.core5.annotation.ThreadingBehavior;
42  import org.apache.hc.core5.concurrent.BasicFuture;
43  import org.apache.hc.core5.concurrent.Cancellable;
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.function.Callback;
46  import org.apache.hc.core5.io.CloseMode;
47  import org.apache.hc.core5.io.ModalCloseable;
48  import org.apache.hc.core5.util.Args;
49  import org.apache.hc.core5.util.Asserts;
50  import org.apache.hc.core5.util.Deadline;
51  import org.apache.hc.core5.util.DeadlineTimeoutException;
52  import org.apache.hc.core5.util.LangUtils;
53  import org.apache.hc.core5.util.TimeValue;
54  import org.apache.hc.core5.util.Timeout;
55  
56  /**
57   * Connection pool with higher concurrency but with lax connection limit guarantees.
58   *
59   * @param <T> route
60   * @param <C> connection object
61   *
62   * @since 5.0
63   */
64  @Contract(threading = ThreadingBehavior.SAFE)
65  @Experimental
66  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
67  
68      private final TimeValue timeToLive;
69      private final ConnPoolListener<T> connPoolListener;
70      private final PoolReusePolicy policy;
71      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
72      private final AtomicBoolean isShutDown;
73  
74      private volatile int defaultMaxPerRoute;
75  
76      /**
77       * @since 5.0
78       */
79      public LaxConnPool(
80              final int defaultMaxPerRoute,
81              final TimeValue timeToLive,
82              final PoolReusePolicy policy,
83              final ConnPoolListener<T> connPoolListener) {
84          super();
85          Args.positive(defaultMaxPerRoute, "Max per route value");
86          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
87          this.connPoolListener = connPoolListener;
88          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
89          this.routeToPool = new ConcurrentHashMap<>();
90          this.isShutDown = new AtomicBoolean(false);
91          this.defaultMaxPerRoute = defaultMaxPerRoute;
92      }
93  
94      public LaxConnPool(final int defaultMaxPerRoute) {
95          this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECONDS, PoolReusePolicy.LIFO, null);
96      }
97  
98      public boolean isShutdown() {
99          return isShutDown.get();
100     }
101 
102     @Override
103     public void close(final CloseMode closeMode) {
104         if (isShutDown.compareAndSet(false, true)) {
105             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
106                 final PerRoutePool<T, C> routePool = it.next();
107                 routePool.shutdown(closeMode);
108             }
109             routeToPool.clear();
110         }
111     }
112 
113     @Override
114     public void close() {
115         close(CloseMode.GRACEFUL);
116     }
117 
118     private PerRoutePool<T, C> getPool(final T route) {
119         PerRoutePool<T, C> routePool = routeToPool.get(route);
120         if (routePool == null) {
121             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
122                     route,
123                     defaultMaxPerRoute,
124                     timeToLive,
125                     policy,
126                     this,
127                     connPoolListener);
128             routePool = routeToPool.putIfAbsent(route, newRoutePool);
129             if (routePool == null) {
130                 routePool = newRoutePool;
131             }
132         }
133         return routePool;
134     }
135 
136     @Override
137     public Future<PoolEntry<T, C>> lease(
138             final T route, final Object state,
139             final Timeout requestTimeout,
140             final FutureCallback<PoolEntry<T, C>> callback) {
141         Args.notNull(route, "Route");
142         Asserts.check(!isShutDown.get(), "Connection pool shut down");
143         final PerRoutePool<T, C> routePool = getPool(route);
144         return routePool.lease(state, requestTimeout, callback);
145     }
146 
147     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
148         return lease(route, state, Timeout.DISABLED, null);
149     }
150 
151     @Override
152     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
153         if (entry == null) {
154             return;
155         }
156         if (isShutDown.get()) {
157             return;
158         }
159         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
160         if (connPoolListener != null) {
161             connPoolListener.onLease(entry.getRoute(), this);
162         }
163         routePool.release(entry, reusable);
164     }
165 
166     public void validatePendingRequests() {
167         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
168             routePool.validatePendingRequests();
169         }
170     }
171 
172     @Override
173     public void setMaxTotal(final int max) {
174     }
175 
176     @Override
177     public int getMaxTotal() {
178         return 0;
179     }
180 
181     @Override
182     public void setDefaultMaxPerRoute(final int max) {
183         Args.positive(max, "Max value");
184         defaultMaxPerRoute = max;
185     }
186 
187     @Override
188     public int getDefaultMaxPerRoute() {
189         return defaultMaxPerRoute;
190     }
191 
192     @Override
193     public void setMaxPerRoute(final T route, final int max) {
194         Args.notNull(route, "Route");
195         final PerRoutePool<T, C> routePool = getPool(route);
196         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
197     }
198 
199     @Override
200     public int getMaxPerRoute(final T route) {
201         Args.notNull(route, "Route");
202         final PerRoutePool<T, C> routePool = getPool(route);
203         return routePool.getMax();
204     }
205 
206     @Override
207     public PoolStats getTotalStats() {
208         int leasedTotal = 0;
209         int pendingTotal = 0;
210         int availableTotal = 0;
211         int maxTotal = 0;
212         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
213             leasedTotal += routePool.getLeasedCount();
214             pendingTotal += routePool.getPendingCount();
215             availableTotal += routePool.getAvailableCount();
216             maxTotal += routePool.getMax();
217         }
218         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
219     }
220 
221     @Override
222     public PoolStats getStats(final T route) {
223         Args.notNull(route, "Route");
224         final PerRoutePool<T, C> routePool = getPool(route);
225         return new PoolStats(
226                 routePool.getLeasedCount(),
227                 routePool.getPendingCount(),
228                 routePool.getAvailableCount(),
229                 routePool.getMax());
230     }
231 
232     @Override
233     public Set<T> getRoutes() {
234         return new HashSet<>(routeToPool.keySet());
235     }
236 
237     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
238         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
239             routePool.enumAvailable(callback);
240         }
241     }
242 
243     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
244         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
245             routePool.enumLeased(callback);
246         }
247     }
248 
249     @Override
250     public void closeIdle(final TimeValue idleTime) {
251         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMillis() : 0);
252         enumAvailable(new Callback<PoolEntry<T, C>>() {
253 
254             @Override
255             public void execute(final PoolEntry<T, C> entry) {
256                 if (entry.getUpdated() <= deadline) {
257                     entry.discardConnection(CloseMode.GRACEFUL);
258                 }
259             }
260 
261         });
262     }
263 
264     @Override
265     public void closeExpired() {
266         final long now = System.currentTimeMillis();
267         enumAvailable(new Callback<PoolEntry<T, C>>() {
268 
269             @Override
270             public void execute(final PoolEntry<T, C> entry) {
271                 if (entry.getExpiryDeadline().isBefore(now)) {
272                     entry.discardConnection(CloseMode.GRACEFUL);
273                 }
274             }
275 
276         });
277     }
278 
279     @Override
280     public String toString() {
281         final PoolStats totalStats = getTotalStats();
282         final StringBuilder buffer = new StringBuilder();
283         buffer.append("[leased: ");
284         buffer.append(totalStats.getLeased());
285         buffer.append("][available: ");
286         buffer.append(totalStats.getAvailable());
287         buffer.append("][pending: ");
288         buffer.append(totalStats.getPending());
289         buffer.append("]");
290         return buffer.toString();
291     }
292 
293     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
294 
295         private final Object state;
296         private final Deadline deadline;
297         private final BasicFuture<PoolEntry<T, C>> future;
298 
299         LeaseRequest(
300                 final Object state,
301                 final Timeout requestTimeout,
302                 final BasicFuture<PoolEntry<T, C>> future) {
303             super();
304             this.state = state;
305             this.deadline = Deadline.calculate(requestTimeout);
306             this.future = future;
307         }
308 
309         BasicFuture<PoolEntry<T, C>> getFuture() {
310             return this.future;
311         }
312 
313         public Object getState() {
314             return this.state;
315         }
316 
317         public Deadline getDeadline() {
318             return this.deadline;
319         }
320 
321         public boolean isDone() {
322             return this.future.isDone();
323         }
324 
325         public void completed(final PoolEntry<T, C> result) {
326             future.completed(result);
327         }
328 
329         public void failed(final Exception ex) {
330             future.failed(ex);
331         }
332 
333         @Override
334         public boolean cancel() {
335             return future.cancel();
336         }
337 
338     }
339 
340     static class PerRoutePool<T, C extends ModalCloseable> {
341 
342         private final T route;
343         private final TimeValue timeToLive;
344         private final PoolReusePolicy policy;
345         private final ConnPoolStats<T> connPoolStats;
346         private final ConnPoolListener<T> connPoolListener;
347         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
348         private final Deque<PoolEntry<T, C>> available;
349         private final Deque<LeaseRequest<T, C>> pending;
350         private final AtomicBoolean terminated;
351 
352         private volatile int max;
353 
354         PerRoutePool(
355                 final T route,
356                 final int max,
357                 final TimeValue timeToLive,
358                 final PoolReusePolicy policy,
359                 final ConnPoolStats<T> connPoolStats,
360                 final ConnPoolListener<T> connPoolListener) {
361             super();
362             this.route = route;
363             this.timeToLive = timeToLive;
364             this.policy = policy;
365             this.connPoolStats = connPoolStats;
366             this.connPoolListener = connPoolListener;
367             this.leased = new ConcurrentHashMap<>();
368             this.available = new ConcurrentLinkedDeque<>();
369             this.pending = new ConcurrentLinkedDeque<>();
370             this.terminated = new AtomicBoolean(false);
371             this.max = max;
372         }
373 
374         public void shutdown(final CloseMode closeMode) {
375             if (terminated.compareAndSet(false, true)) {
376                 PoolEntry<T, C> availableEntry;
377                 while ((availableEntry = available.poll()) != null) {
378                     availableEntry.discardConnection(closeMode);
379                 }
380                 for (final PoolEntry<T, C> entry : leased.keySet()) {
381                     entry.discardConnection(closeMode);
382                 }
383                 leased.clear();
384                 LeaseRequest<T, C> leaseRequest;
385                 while ((leaseRequest = pending.poll()) != null) {
386                     leaseRequest.cancel();
387                 }
388             }
389         }
390 
391         private void addLeased(final PoolEntry<T, C> entry) {
392             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
393                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
394             } else if (connPoolListener != null) {
395                 connPoolListener.onLease(route, connPoolStats);
396             }
397         }
398 
399         private void removeLeased(final PoolEntry<T, C> entry) {
400             if (connPoolListener != null) {
401                 connPoolListener.onRelease(route, connPoolStats);
402             }
403             if (!leased.remove(entry, Boolean.TRUE)) {
404                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
405             }
406         }
407 
408         private PoolEntry<T, C> getAvailableEntry(final Object state) {
409             final PoolEntry<T, C> entry = available.poll();
410             if (entry != null) {
411                 if (entry.getExpiryDeadline().isExpired()) {
412                     entry.discardConnection(CloseMode.GRACEFUL);
413                 }
414                 if (!LangUtils.equals(entry.getState(), state)) {
415                     entry.discardConnection(CloseMode.GRACEFUL);
416                 }
417             }
418             return entry;
419         }
420 
421         public Future<PoolEntry<T, C>> lease(
422                 final Object state,
423                 final Timeout requestTimeout,
424                 final FutureCallback<PoolEntry<T, C>> callback) {
425             Asserts.check(!terminated.get(), "Connection pool shut down");
426             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
427             final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
428             if (availableEntry != null) {
429                 addLeased(availableEntry);
430                 future.completed(availableEntry);
431             } else {
432                 if (pending.isEmpty() && leased.size() < max) {
433                     final PoolEntry<T, C> entry = new PoolEntry<>(route, timeToLive);
434                     addLeased(entry);
435                     future.completed(entry);
436                 } else {
437                     pending.add(new LeaseRequest<>(state, requestTimeout, future));
438                 }
439             }
440             return future;
441         }
442 
443         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
444             removeLeased(releasedEntry);
445             if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
446                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
447             }
448             if (releasedEntry.hasConnection()) {
449                 switch (policy) {
450                     case LIFO:
451                         available.addFirst(releasedEntry);
452                         break;
453                     case FIFO:
454                         available.addLast(releasedEntry);
455                         break;
456                     default:
457                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
458                 }
459             }
460             LeaseRequest<T, C> leaseRequest;
461             while ((leaseRequest = pending.poll()) != null) {
462                 if (leaseRequest.isDone()) {
463                     continue;
464                 }
465                 final Object state = leaseRequest.getState();
466                 final Deadline deadline = leaseRequest.getDeadline();
467 
468                 if (deadline.isExpired()) {
469                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
470                 } else {
471                     final PoolEntry<T, C> availableEntry = getAvailableEntry(state);
472                     if (availableEntry != null) {
473                         addLeased(availableEntry);
474                         leaseRequest.completed(availableEntry);
475                     } else if (leased.size() < max) {
476                         final PoolEntry<T, C> newEntry = new PoolEntry<>(route, timeToLive);
477                         addLeased(newEntry);
478                         leaseRequest.completed(newEntry);
479                     }
480                     break;
481                 }
482             }
483         }
484 
485         public void validatePendingRequests() {
486             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
487             while (it.hasNext()) {
488                 final LeaseRequest<T, C> request = it.next();
489                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
490                 if (future.isCancelled() && !request.isDone()) {
491                     it.remove();
492                 } else {
493                     final Deadline deadline = request.getDeadline();
494                     if (deadline.isExpired()) {
495                         request.failed(DeadlineTimeoutException.from(deadline));
496                     }
497                     if (request.isDone()) {
498                         it.remove();
499                     }
500                 }
501             }
502         }
503 
504         public final T getRoute() {
505             return route;
506         }
507 
508         public int getMax() {
509             return max;
510         }
511 
512         public void setMax(final int max) {
513             this.max = max;
514         }
515 
516         public int getPendingCount() {
517             return pending.size();
518         }
519 
520         public int getLeasedCount() {
521             return leased.size();
522         }
523 
524         public int getAvailableCount() {
525             return available.size();
526         }
527 
528         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
529             for (final Iterator<PoolEntry<T, C>> it = available.iterator(); it.hasNext(); ) {
530                 final PoolEntry<T, C> entry = it.next();
531                 callback.execute(entry);
532                 if (!entry.hasConnection()) {
533                     it.remove();
534                 }
535             }
536         }
537 
538         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
539             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
540                 final PoolEntry<T, C> entry = it.next();
541                 callback.execute(entry);
542                 if (!entry.hasConnection()) {
543                     it.remove();
544                 }
545             }
546         }
547 
548         @Override
549         public String toString() {
550             final StringBuilder buffer = new StringBuilder();
551             buffer.append("[route: ");
552             buffer.append(route);
553             buffer.append("][leased: ");
554             buffer.append(leased.size());
555             buffer.append("][available: ");
556             buffer.append(available.size());
557             buffer.append("][pending: ");
558             buffer.append(pending.size());
559             buffer.append("]");
560             return buffer.toString();
561         }
562 
563     }
564 
565 }