View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.util.Comparator;
31  import java.util.HashMap;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicLong;
39  import java.util.concurrent.locks.Lock;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import org.apache.hc.client5.http.impl.ConnPoolSupport;
43  import org.apache.hc.core5.annotation.Contract;
44  import org.apache.hc.core5.annotation.Experimental;
45  import org.apache.hc.core5.annotation.ThreadingBehavior;
46  import org.apache.hc.core5.concurrent.CallbackContribution;
47  import org.apache.hc.core5.concurrent.CompletedFuture;
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.http.HttpConnection;
50  import org.apache.hc.core5.http.HttpVersion;
51  import org.apache.hc.core5.http.ProtocolVersion;
52  import org.apache.hc.core5.io.CloseMode;
53  import org.apache.hc.core5.pool.ManagedConnPool;
54  import org.apache.hc.core5.pool.PoolEntry;
55  import org.apache.hc.core5.pool.PoolStats;
56  import org.apache.hc.core5.util.Args;
57  import org.apache.hc.core5.util.Asserts;
58  import org.apache.hc.core5.util.TimeValue;
59  import org.apache.hc.core5.util.Timeout;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  /**
64   * Experimental connections pool implementation that acts as a caching facade in front of
65   * a standard {@link ManagedConnPool} and shares already leased connections to multiplex
66   * message exchanges over active HTTP/2 connections.
67   * @param <T> route
68   * @param <C> connection object
69   *
70   * @since 5.5
71   */
72  @Contract(threading = ThreadingBehavior.SAFE)
73  @Experimental
74  public class H2SharingConnPool<T, C extends HttpConnection> implements ManagedConnPool<T, C> {
75  
76      private static final Logger LOG = LoggerFactory.getLogger(H2SharingConnPool.class);
77  
78      private final ManagedConnPool<T, C> pool;
79      private final ConcurrentMap<T, PerRoutePool<T, C>> perRouteCache;
80      private final AtomicBoolean closed;
81  
82      public H2SharingConnPool(final ManagedConnPool<T, C> pool) {
83          this.pool = Args.notNull(pool, "Connection pool");
84          this.perRouteCache = new ConcurrentHashMap<>();
85          this.closed = new AtomicBoolean();
86      }
87  
88      @Override
89      public void close(final CloseMode closeMode) {
90          if (closed.compareAndSet(false, true)) {
91              perRouteCache.clear();
92              pool.close(closeMode);
93          }
94      }
95  
96      @Override
97      public void close() throws IOException {
98          if (closed.compareAndSet(false, true)) {
99              perRouteCache.clear();
100             pool.close();
101         }
102     }
103 
104     PerRoutePool<T, C> getPerRoutePool(final T route) {
105         return perRouteCache.computeIfAbsent(route, r -> new PerRoutePool<>());
106     }
107 
108     @Override
109     public Future<PoolEntry<T, C>> lease(final T route,
110                                          final Object state,
111                                          final Timeout requestTimeout,
112                                          final FutureCallback<PoolEntry<T, C>> callback) {
113         Asserts.check(!closed.get(), "Connection pool shut down");
114         if (state == null) {
115             final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
116             if (perRoutePool != null) {
117                 final PoolEntry<T, C> entry = perRoutePool.lease();
118                 if (entry != null) {
119                     if (LOG.isDebugEnabled()) {
120                         LOG.debug("Sharing connection {} for message exchange multiplexing (lease count = {})",
121                                 ConnPoolSupport.getId(entry.getConnection()), perRoutePool.getCount(entry));
122                     }
123                     final Future<PoolEntry<T, C>> future = new CompletedFuture<>(entry);
124                     if (callback != null) {
125                         callback.completed(entry);
126                     }
127                     return future;
128                 }
129             }
130         }
131         LOG.debug("No shared connection available");
132         return pool.lease(route,
133                 state,
134                 requestTimeout,
135                 new CallbackContribution<PoolEntry<T, C>>(callback) {
136 
137                     @Override
138                     public void completed(final PoolEntry<T, C> entry) {
139                         if (state == null) {
140                             final C connection = entry.getConnection();
141                             final ProtocolVersion ver = connection != null ? connection.getProtocolVersion() : null;
142                             if (ver == HttpVersion.HTTP_2_0) {
143                                 final PerRoutePool<T, C> perRoutePool = getPerRoutePool(route);
144                                 final long count = perRoutePool.track(entry);
145                                 if (LOG.isDebugEnabled()) {
146                                     LOG.debug("Connection {} can be shared for message exchange multiplexing (lease count = {})",
147                                             ConnPoolSupport.getId(entry.getConnection()), count);
148                                 }
149                             }
150                         }
151                         if (callback != null) {
152                             callback.completed(entry);
153                         }
154                     }
155 
156                 });
157     }
158 
159     @Override
160     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
161         if (entry == null) {
162             return;
163         }
164         if (closed.get()) {
165             pool.release(entry, reusable);
166             return;
167         }
168         final T route = entry.getRoute();
169         final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
170         if (perRoutePool != null) {
171             final long count = perRoutePool.release(entry, reusable);
172             if (count > 0) {
173                 if (LOG.isDebugEnabled()) {
174                     LOG.debug("Connection {} is being shared for message exchange multiplexing (lease count = {})",
175                             ConnPoolSupport.getId(entry.getConnection()), count);
176                 }
177                 return;
178             }
179         }
180         if (LOG.isDebugEnabled()) {
181             LOG.debug("Releasing connection {} back to the pool", ConnPoolSupport.getId(entry.getConnection()));
182         }
183         pool.release(entry, reusable);
184     }
185 
186     @Override
187     public void setMaxTotal(final int max) {
188         pool.setMaxTotal(max);
189     }
190 
191     @Override
192     public int getMaxTotal() {
193         return pool.getMaxTotal();
194     }
195 
196     @Override
197     public void setDefaultMaxPerRoute(final int max) {
198         pool.setDefaultMaxPerRoute(max);
199     }
200 
201     @Override
202     public int getDefaultMaxPerRoute() {
203         return pool.getDefaultMaxPerRoute();
204     }
205 
206     @Override
207     public void setMaxPerRoute(final T route, final int max) {
208         pool.setMaxPerRoute(route, max);
209     }
210 
211     @Override
212     public int getMaxPerRoute(final T route) {
213         return pool.getMaxPerRoute(route);
214     }
215 
216     @Override
217     public void closeIdle(final TimeValue idleTime) {
218         pool.closeIdle(idleTime);
219     }
220 
221     @Override
222     public void closeExpired() {
223         pool.closeExpired();
224     }
225 
226     @Override
227     public Set<T> getRoutes() {
228         return pool.getRoutes();
229     }
230 
231     @Override
232     public PoolStats getTotalStats() {
233         return pool.getTotalStats();
234     }
235 
236     @Override
237     public PoolStats getStats(final T route) {
238         return pool.getStats(route);
239     }
240 
241     @Override
242     public String toString() {
243         return pool.toString();
244     }
245 
246     static class PerRoutePool<T, C extends HttpConnection> {
247 
248         private final Map<PoolEntry<T, C>, AtomicLong> entryMap;
249         private final Lock lock;
250 
251         PerRoutePool() {
252             this.entryMap = new HashMap<>();
253             this.lock = new ReentrantLock();
254         }
255 
256         AtomicLong getCounter(final PoolEntry<T, C> entry) {
257             return entryMap.computeIfAbsent(entry, e -> new AtomicLong());
258         }
259 
260         long track(final PoolEntry<T, C> entry) {
261             lock.lock();
262             try {
263                 final AtomicLong counter = getCounter(entry);
264                 return counter.incrementAndGet();
265             } finally {
266                 lock.unlock();
267             }
268         }
269 
270         PoolEntry<T, C> lease() {
271             lock.lock();
272             try {
273                 final PoolEntry<T, C> entry = entryMap.entrySet().stream()
274                         .min(Comparator.comparingLong(e -> e.getValue().get()))
275                         .map(Map.Entry::getKey)
276                         .orElse(null);
277                 if (entry == null) {
278                     return null;
279                 }
280                 final AtomicLong counter = getCounter(entry);
281                 counter.incrementAndGet();
282                 return entry;
283             } finally {
284                 lock.unlock();
285             }
286         }
287 
288         long release(final PoolEntry<T, C> entry, final boolean reusable) {
289             lock.lock();
290             try {
291                 final C connection = entry.getConnection();
292                 if (!reusable || connection == null || !connection.isOpen()) {
293                     entryMap.remove(entry);
294                     return 0;
295                 } else {
296                     final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
297                         if (c == null) {
298                             return null;
299                         }
300                         final long count = c.decrementAndGet();
301                         return count > 0 ? c : null;
302                     });
303                     return counter != null ? counter.get() : 0L;
304                 }
305             } finally {
306                 lock.unlock();
307             }
308         }
309 
310         long getCount(final PoolEntry<T, C> entry) {
311             lock.lock();
312             try {
313                 final AtomicLong counter = entryMap.get(entry);
314                 return counter == null ? 0L : counter.get();
315             } finally {
316                 lock.unlock();
317             }
318         }
319 
320     }
321 
322 }