1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.pool;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.locks.Lock;
41 import java.util.concurrent.locks.ReentrantLock;
42
43 import org.apache.http.annotation.ThreadSafe;
44 import org.apache.http.concurrent.FutureCallback;
45 import org.apache.http.util.Args;
46 import org.apache.http.util.Asserts;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 @ThreadSafe
64 public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
65 implements ConnPool<T, E>, ConnPoolControl<T> {
66
67 private final Lock lock;
68 private final ConnFactory<T, C> connFactory;
69 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
70 private final Set<E> leased;
71 private final LinkedList<E> available;
72 private final LinkedList<PoolEntryFuture<E>> pending;
73 private final Map<T, Integer> maxPerRoute;
74
75 private volatile boolean isShutDown;
76 private volatile int defaultMaxPerRoute;
77 private volatile int maxTotal;
78
/**
 * Creates a pool that obtains new connections from the given factory.
 *
 * @param connFactory factory asked for a new connection whenever the pool
 *   decides to allocate one; checked with {@code Args.notNull}.
 * @param defaultMaxPerRoute initial limit used for routes that have no
 *   explicit per-route limit; checked with {@code Args.notNegative}.
 * @param maxTotal initial total limit; checked with {@code Args.notNegative}.
 */
public AbstractConnPool(
        final ConnFactory<T, C> connFactory,
        final int defaultMaxPerRoute,
        final int maxTotal) {
    super();
    // Validate every argument first, in declaration order, so that the
    // first offending argument is the one that gets reported.
    Args.notNull(connFactory, "Connection factory");
    Args.notNegative(defaultMaxPerRoute, "Max per route value");
    Args.notNegative(maxTotal, "Max total value");

    // Collaborators and limits.
    this.connFactory = connFactory;
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;

    // Bookkeeping state; all of it is accessed under this lock.
    this.lock = new ReentrantLock();
    this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
    this.maxPerRoute = new HashMap<T, Integer>();
    this.leased = new HashSet<E>();
    this.available = new LinkedList<E>();
    this.pending = new LinkedList<PoolEntryFuture<E>>();
}
94
95
96
97
/**
 * Creates the pool entry that wraps a connection for a route. The pool
 * calls this through the route-specific pools it builds in
 * {@code getPool}, passing the route the pool was created for.
 *
 * @param route the route the connection belongs to.
 * @param conn the connection to wrap.
 * @return the entry representing {@code conn} inside the pool.
 */
protected abstract E createEntry(T route, C conn);
99
100 public boolean isShutdown() {
101 return this.isShutDown;
102 }
103
104
105
106
107 public void shutdown() throws IOException {
108 if (this.isShutDown) {
109 return ;
110 }
111 this.isShutDown = true;
112 this.lock.lock();
113 try {
114 for (final E entry: this.available) {
115 entry.close();
116 }
117 for (final E entry: this.leased) {
118 entry.close();
119 }
120 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
121 pool.shutdown();
122 }
123 this.routeToPool.clear();
124 this.leased.clear();
125 this.available.clear();
126 } finally {
127 this.lock.unlock();
128 }
129 }
130
131 private RouteSpecificPool<T, C, E> getPool(final T route) {
132 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
133 if (pool == null) {
134 pool = new RouteSpecificPool<T, C, E>(route) {
135
136 @Override
137 protected E createEntry(final C conn) {
138 return AbstractConnPool.this.createEntry(route, conn);
139 }
140
141 };
142 this.routeToPool.put(route, pool);
143 }
144 return pool;
145 }
146
147
148
149
150
151
152
153
154
155 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
156 Args.notNull(route, "Route");
157 Asserts.check(!this.isShutDown, "Connection pool shut down");
158 return new PoolEntryFuture<E>(this.lock, callback) {
159
160 @Override
161 public E getPoolEntry(
162 final long timeout,
163 final TimeUnit tunit)
164 throws InterruptedException, TimeoutException, IOException {
165 return getPoolEntryBlocking(route, state, timeout, tunit, this);
166 }
167
168 };
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 public Future<E> lease(final T route, final Object state) {
188 return lease(route, state, null);
189 }
190
191 private E getPoolEntryBlocking(
192 final T route, final Object state,
193 final long timeout, final TimeUnit tunit,
194 final PoolEntryFuture<E> future)
195 throws IOException, InterruptedException, TimeoutException {
196
197 Date deadline = null;
198 if (timeout > 0) {
199 deadline = new Date
200 (System.currentTimeMillis() + tunit.toMillis(timeout));
201 }
202
203 this.lock.lock();
204 try {
205 final RouteSpecificPool<T, C, E> pool = getPool(route);
206 E entry = null;
207 while (entry == null) {
208 Asserts.check(!this.isShutDown, "Connection pool shut down");
209 for (;;) {
210 entry = pool.getFree(state);
211 if (entry == null) {
212 break;
213 }
214 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
215 entry.close();
216 this.available.remove(entry);
217 pool.free(entry, false);
218 } else {
219 break;
220 }
221 }
222 if (entry != null) {
223 this.available.remove(entry);
224 this.leased.add(entry);
225 return entry;
226 }
227
228
229 final int maxPerRoute = getMax(route);
230
231 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
232 if (excess > 0) {
233 for (int i = 0; i < excess; i++) {
234 final E lastUsed = pool.getLastUsed();
235 if (lastUsed == null) {
236 break;
237 }
238 lastUsed.close();
239 this.available.remove(lastUsed);
240 pool.remove(lastUsed);
241 }
242 }
243
244 if (pool.getAllocatedCount() < maxPerRoute) {
245 final int totalUsed = this.leased.size();
246 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
247 if (freeCapacity > 0) {
248 final int totalAvailable = this.available.size();
249 if (totalAvailable > freeCapacity - 1) {
250 if (!this.available.isEmpty()) {
251 final E lastUsed = this.available.removeLast();
252 lastUsed.close();
253 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
254 otherpool.remove(lastUsed);
255 }
256 }
257 final C conn = this.connFactory.create(route);
258 entry = pool.add(conn);
259 this.leased.add(entry);
260 return entry;
261 }
262 }
263
264 boolean success = false;
265 try {
266 pool.queue(future);
267 this.pending.add(future);
268 success = future.await(deadline);
269 } finally {
270
271
272
273
274 pool.unqueue(future);
275 this.pending.remove(future);
276 }
277
278 if (!success && (deadline != null) &&
279 (deadline.getTime() <= System.currentTimeMillis())) {
280 break;
281 }
282 }
283 throw new TimeoutException("Timeout waiting for connection");
284 } finally {
285 this.lock.unlock();
286 }
287 }
288
289 private void notifyPending(final RouteSpecificPool<T, C, E> pool) {
290 PoolEntryFuture<E> future = pool.nextPending();
291 if (future != null) {
292 this.pending.remove(future);
293 } else {
294 future = this.pending.poll();
295 }
296 if (future != null) {
297 future.wakeup();
298 }
299 }
300
301 public void release(final E entry, final boolean reusable) {
302 this.lock.lock();
303 try {
304 if (this.leased.remove(entry)) {
305 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
306 pool.free(entry, reusable);
307 if (reusable && !this.isShutDown) {
308 this.available.addFirst(entry);
309 } else {
310 entry.close();
311 }
312 notifyPending(pool);
313 }
314 } finally {
315 this.lock.unlock();
316 }
317 }
318
319 private int getMax(final T route) {
320 final Integer v = this.maxPerRoute.get(route);
321 if (v != null) {
322 return v.intValue();
323 } else {
324 return this.defaultMaxPerRoute;
325 }
326 }
327
328 public void setMaxTotal(final int max) {
329 Args.notNegative(max, "Max value");
330 this.lock.lock();
331 try {
332 this.maxTotal = max;
333 } finally {
334 this.lock.unlock();
335 }
336 }
337
338 public int getMaxTotal() {
339 this.lock.lock();
340 try {
341 return this.maxTotal;
342 } finally {
343 this.lock.unlock();
344 }
345 }
346
347 public void setDefaultMaxPerRoute(final int max) {
348 Args.notNegative(max, "Max per route value");
349 this.lock.lock();
350 try {
351 this.defaultMaxPerRoute = max;
352 } finally {
353 this.lock.unlock();
354 }
355 }
356
357 public int getDefaultMaxPerRoute() {
358 this.lock.lock();
359 try {
360 return this.defaultMaxPerRoute;
361 } finally {
362 this.lock.unlock();
363 }
364 }
365
366 public void setMaxPerRoute(final T route, final int max) {
367 Args.notNull(route, "Route");
368 Args.notNegative(max, "Max per route value");
369 this.lock.lock();
370 try {
371 this.maxPerRoute.put(route, max);
372 } finally {
373 this.lock.unlock();
374 }
375 }
376
377 public int getMaxPerRoute(final T route) {
378 Args.notNull(route, "Route");
379 this.lock.lock();
380 try {
381 return getMax(route);
382 } finally {
383 this.lock.unlock();
384 }
385 }
386
387 public PoolStats getTotalStats() {
388 this.lock.lock();
389 try {
390 return new PoolStats(
391 this.leased.size(),
392 this.pending.size(),
393 this.available.size(),
394 this.maxTotal);
395 } finally {
396 this.lock.unlock();
397 }
398 }
399
400 public PoolStats getStats(final T route) {
401 Args.notNull(route, "Route");
402 this.lock.lock();
403 try {
404 final RouteSpecificPool<T, C, E> pool = getPool(route);
405 return new PoolStats(
406 pool.getLeasedCount(),
407 pool.getPendingCount(),
408 pool.getAvailableCount(),
409 getMax(route));
410 } finally {
411 this.lock.unlock();
412 }
413 }
414
415
416
417
418
419
420
421
422 public void closeIdle(final long idletime, final TimeUnit tunit) {
423 Args.notNull(tunit, "Time unit");
424 long time = tunit.toMillis(idletime);
425 if (time < 0) {
426 time = 0;
427 }
428 final long deadline = System.currentTimeMillis() - time;
429 this.lock.lock();
430 try {
431 final Iterator<E> it = this.available.iterator();
432 while (it.hasNext()) {
433 final E entry = it.next();
434 if (entry.getUpdated() <= deadline) {
435 entry.close();
436 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
437 pool.remove(entry);
438 it.remove();
439 notifyPending(pool);
440 }
441 }
442 } finally {
443 this.lock.unlock();
444 }
445 }
446
447
448
449
450 public void closeExpired() {
451 final long now = System.currentTimeMillis();
452 this.lock.lock();
453 try {
454 final Iterator<E> it = this.available.iterator();
455 while (it.hasNext()) {
456 final E entry = it.next();
457 if (entry.isExpired(now)) {
458 entry.close();
459 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
460 pool.remove(entry);
461 it.remove();
462 notifyPending(pool);
463 }
464 }
465 } finally {
466 this.lock.unlock();
467 }
468 }
469
470 @Override
471 public String toString() {
472 final StringBuilder buffer = new StringBuilder();
473 buffer.append("[leased: ");
474 buffer.append(this.leased);
475 buffer.append("][available: ");
476 buffer.append(this.available);
477 buffer.append("][pending: ");
478 buffer.append(this.pending);
479 buffer.append("]");
480 return buffer.toString();
481 }
482
483 }