1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.util.Comparator;
31 import java.util.HashMap;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.concurrent.locks.Lock;
40 import java.util.concurrent.locks.ReentrantLock;
41
42 import org.apache.hc.client5.http.impl.ConnPoolSupport;
43 import org.apache.hc.core5.annotation.Contract;
44 import org.apache.hc.core5.annotation.Experimental;
45 import org.apache.hc.core5.annotation.ThreadingBehavior;
46 import org.apache.hc.core5.concurrent.CallbackContribution;
47 import org.apache.hc.core5.concurrent.CompletedFuture;
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.http.HttpConnection;
50 import org.apache.hc.core5.http.HttpVersion;
51 import org.apache.hc.core5.http.ProtocolVersion;
52 import org.apache.hc.core5.io.CloseMode;
53 import org.apache.hc.core5.pool.ManagedConnPool;
54 import org.apache.hc.core5.pool.PoolEntry;
55 import org.apache.hc.core5.pool.PoolStats;
56 import org.apache.hc.core5.util.Args;
57 import org.apache.hc.core5.util.Asserts;
58 import org.apache.hc.core5.util.TimeValue;
59 import org.apache.hc.core5.util.Timeout;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63
64
65
66
67
68
69
70
71
72 @Contract(threading = ThreadingBehavior.SAFE)
73 @Experimental
74 public class H2SharingConnPool<T, C extends HttpConnection> implements ManagedConnPool<T, C> {
75
76 private static final Logger LOG = LoggerFactory.getLogger(H2SharingConnPool.class);
77
78 private final ManagedConnPool<T, C> pool;
79 private final ConcurrentMap<T, PerRoutePool<T, C>> perRouteCache;
80 private final AtomicBoolean closed;
81
82 public H2SharingConnPool(final ManagedConnPool<T, C> pool) {
83 this.pool = Args.notNull(pool, "Connection pool");
84 this.perRouteCache = new ConcurrentHashMap<>();
85 this.closed = new AtomicBoolean();
86 }
87
88 @Override
89 public void close(final CloseMode closeMode) {
90 if (closed.compareAndSet(false, true)) {
91 perRouteCache.clear();
92 pool.close(closeMode);
93 }
94 }
95
96 @Override
97 public void close() throws IOException {
98 if (closed.compareAndSet(false, true)) {
99 perRouteCache.clear();
100 pool.close();
101 }
102 }
103
104 PerRoutePool<T, C> getPerRoutePool(final T route) {
105 return perRouteCache.computeIfAbsent(route, r -> new PerRoutePool<>());
106 }
107
108 @Override
109 public Future<PoolEntry<T, C>> lease(final T route,
110 final Object state,
111 final Timeout requestTimeout,
112 final FutureCallback<PoolEntry<T, C>> callback) {
113 Asserts.check(!closed.get(), "Connection pool shut down");
114 if (state == null) {
115 final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
116 if (perRoutePool != null) {
117 final PoolEntry<T, C> entry = perRoutePool.lease();
118 if (entry != null) {
119 if (LOG.isDebugEnabled()) {
120 LOG.debug("Sharing connection {} for message exchange multiplexing (lease count = {})",
121 ConnPoolSupport.getId(entry.getConnection()), perRoutePool.getCount(entry));
122 }
123 final Future<PoolEntry<T, C>> future = new CompletedFuture<>(entry);
124 if (callback != null) {
125 callback.completed(entry);
126 }
127 return future;
128 }
129 }
130 }
131 LOG.debug("No shared connection available");
132 return pool.lease(route,
133 state,
134 requestTimeout,
135 new CallbackContribution<PoolEntry<T, C>>(callback) {
136
137 @Override
138 public void completed(final PoolEntry<T, C> entry) {
139 if (state == null) {
140 final C connection = entry.getConnection();
141 final ProtocolVersion ver = connection != null ? connection.getProtocolVersion() : null;
142 if (ver == HttpVersion.HTTP_2_0) {
143 final PerRoutePool<T, C> perRoutePool = getPerRoutePool(route);
144 final long count = perRoutePool.track(entry);
145 if (LOG.isDebugEnabled()) {
146 LOG.debug("Connection {} can be shared for message exchange multiplexing (lease count = {})",
147 ConnPoolSupport.getId(entry.getConnection()), count);
148 }
149 }
150 }
151 if (callback != null) {
152 callback.completed(entry);
153 }
154 }
155
156 });
157 }
158
159 @Override
160 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
161 if (entry == null) {
162 return;
163 }
164 if (closed.get()) {
165 pool.release(entry, reusable);
166 return;
167 }
168 final T route = entry.getRoute();
169 final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
170 if (perRoutePool != null) {
171 final long count = perRoutePool.release(entry, reusable);
172 if (count > 0) {
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("Connection {} is being shared for message exchange multiplexing (lease count = {})",
175 ConnPoolSupport.getId(entry.getConnection()), count);
176 }
177 return;
178 }
179 }
180 if (LOG.isDebugEnabled()) {
181 LOG.debug("Releasing connection {} back to the pool", ConnPoolSupport.getId(entry.getConnection()));
182 }
183 pool.release(entry, reusable);
184 }
185
186 @Override
187 public void setMaxTotal(final int max) {
188 pool.setMaxTotal(max);
189 }
190
191 @Override
192 public int getMaxTotal() {
193 return pool.getMaxTotal();
194 }
195
196 @Override
197 public void setDefaultMaxPerRoute(final int max) {
198 pool.setDefaultMaxPerRoute(max);
199 }
200
201 @Override
202 public int getDefaultMaxPerRoute() {
203 return pool.getDefaultMaxPerRoute();
204 }
205
206 @Override
207 public void setMaxPerRoute(final T route, final int max) {
208 pool.setMaxPerRoute(route, max);
209 }
210
211 @Override
212 public int getMaxPerRoute(final T route) {
213 return pool.getMaxPerRoute(route);
214 }
215
216 @Override
217 public void closeIdle(final TimeValue idleTime) {
218 pool.closeIdle(idleTime);
219 }
220
221 @Override
222 public void closeExpired() {
223 pool.closeExpired();
224 }
225
226 @Override
227 public Set<T> getRoutes() {
228 return pool.getRoutes();
229 }
230
231 @Override
232 public PoolStats getTotalStats() {
233 return pool.getTotalStats();
234 }
235
236 @Override
237 public PoolStats getStats(final T route) {
238 return pool.getStats(route);
239 }
240
241 @Override
242 public String toString() {
243 return pool.toString();
244 }
245
246 static class PerRoutePool<T, C extends HttpConnection> {
247
248 private final Map<PoolEntry<T, C>, AtomicLong> entryMap;
249 private final Lock lock;
250
251 PerRoutePool() {
252 this.entryMap = new HashMap<>();
253 this.lock = new ReentrantLock();
254 }
255
256 AtomicLong getCounter(final PoolEntry<T, C> entry) {
257 return entryMap.computeIfAbsent(entry, e -> new AtomicLong());
258 }
259
260 long track(final PoolEntry<T, C> entry) {
261 lock.lock();
262 try {
263 final AtomicLong counter = getCounter(entry);
264 return counter.incrementAndGet();
265 } finally {
266 lock.unlock();
267 }
268 }
269
270 PoolEntry<T, C> lease() {
271 lock.lock();
272 try {
273 final PoolEntry<T, C> entry = entryMap.entrySet().stream()
274 .min(Comparator.comparingLong(e -> e.getValue().get()))
275 .map(Map.Entry::getKey)
276 .orElse(null);
277 if (entry == null) {
278 return null;
279 }
280 final AtomicLong counter = getCounter(entry);
281 counter.incrementAndGet();
282 return entry;
283 } finally {
284 lock.unlock();
285 }
286 }
287
288 long release(final PoolEntry<T, C> entry, final boolean reusable) {
289 lock.lock();
290 try {
291 final C connection = entry.getConnection();
292 if (!reusable || connection == null || !connection.isOpen()) {
293 entryMap.remove(entry);
294 return 0;
295 } else {
296 final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
297 if (c == null) {
298 return null;
299 }
300 final long count = c.decrementAndGet();
301 return count > 0 ? c : null;
302 });
303 return counter != null ? counter.get() : 0L;
304 }
305 } finally {
306 lock.unlock();
307 }
308 }
309
310 long getCount(final PoolEntry<T, C> entry) {
311 lock.lock();
312 try {
313 final AtomicLong counter = entryMap.get(entry);
314 return counter == null ? 0L : counter.get();
315 } finally {
316 lock.unlock();
317 }
318 }
319
320 }
321
322 }