1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.pool;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.locks.Lock;
41 import java.util.concurrent.locks.ReentrantLock;
42
43 import org.apache.http.annotation.ThreadSafe;
44 import org.apache.http.concurrent.FutureCallback;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 @ThreadSafe
62 public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
63 implements ConnPool<T, E>, ConnPoolControl<T> {
64
65 private final Lock lock;
66 private final ConnFactory<T, C> connFactory;
67 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
68 private final Set<E> leased;
69 private final LinkedList<E> available;
70 private final LinkedList<PoolEntryFuture<E>> pending;
71 private final Map<T, Integer> maxPerRoute;
72
73 private volatile boolean isShutDown;
74 private volatile int defaultMaxPerRoute;
75 private volatile int maxTotal;
76
77 public AbstractConnPool(
78 final ConnFactory<T, C> connFactory,
79 int defaultMaxPerRoute,
80 int maxTotal) {
81 super();
82 if (connFactory == null) {
83 throw new IllegalArgumentException("Connection factory may not null");
84 }
85 if (defaultMaxPerRoute <= 0) {
86 throw new IllegalArgumentException("Max per route value may not be negative or zero");
87 }
88 if (maxTotal <= 0) {
89 throw new IllegalArgumentException("Max total value may not be negative or zero");
90 }
91 this.lock = new ReentrantLock();
92 this.connFactory = connFactory;
93 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
94 this.leased = new HashSet<E>();
95 this.available = new LinkedList<E>();
96 this.pending = new LinkedList<PoolEntryFuture<E>>();
97 this.maxPerRoute = new HashMap<T, Integer>();
98 this.defaultMaxPerRoute = defaultMaxPerRoute;
99 this.maxTotal = maxTotal;
100 }
101
102
103
104
105 protected abstract E createEntry(T route, C conn);
106
107 public boolean isShutdown() {
108 return this.isShutDown;
109 }
110
111
112
113
114 public void shutdown() throws IOException {
115 if (this.isShutDown) {
116 return ;
117 }
118 this.isShutDown = true;
119 this.lock.lock();
120 try {
121 for (E entry: this.available) {
122 entry.close();
123 }
124 for (E entry: this.leased) {
125 entry.close();
126 }
127 for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
128 pool.shutdown();
129 }
130 this.routeToPool.clear();
131 this.leased.clear();
132 this.available.clear();
133 } finally {
134 this.lock.unlock();
135 }
136 }
137
138 private RouteSpecificPool<T, C, E> getPool(final T route) {
139 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
140 if (pool == null) {
141 pool = new RouteSpecificPool<T, C, E>(route) {
142
143 @Override
144 protected E createEntry(C conn) {
145 return AbstractConnPool.this.createEntry(route, conn);
146 }
147
148 };
149 this.routeToPool.put(route, pool);
150 }
151 return pool;
152 }
153
154
155
156
157
158
159
160
161
162 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
163 if (route == null) {
164 throw new IllegalArgumentException("Route may not be null");
165 }
166 if (this.isShutDown) {
167 throw new IllegalStateException("Connection pool shut down");
168 }
169 return new PoolEntryFuture<E>(this.lock, callback) {
170
171 @Override
172 public E getPoolEntry(
173 long timeout,
174 TimeUnit tunit)
175 throws InterruptedException, TimeoutException, IOException {
176 return getPoolEntryBlocking(route, state, timeout, tunit, this);
177 }
178
179 };
180 }
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198 public Future<E> lease(final T route, final Object state) {
199 return lease(route, state, null);
200 }
201
202 private E getPoolEntryBlocking(
203 final T route, final Object state,
204 final long timeout, final TimeUnit tunit,
205 final PoolEntryFuture<E> future)
206 throws IOException, InterruptedException, TimeoutException {
207
208 Date deadline = null;
209 if (timeout > 0) {
210 deadline = new Date
211 (System.currentTimeMillis() + tunit.toMillis(timeout));
212 }
213
214 this.lock.lock();
215 try {
216 RouteSpecificPool<T, C, E> pool = getPool(route);
217 E entry = null;
218 while (entry == null) {
219 if (this.isShutDown) {
220 throw new IllegalStateException("Connection pool shut down");
221 }
222 for (;;) {
223 entry = pool.getFree(state);
224 if (entry == null) {
225 break;
226 }
227 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
228 entry.close();
229 this.available.remove(entry);
230 pool.free(entry, false);
231 } else {
232 break;
233 }
234 }
235 if (entry != null) {
236 this.available.remove(entry);
237 this.leased.add(entry);
238 return entry;
239 }
240
241
242 int maxPerRoute = getMax(route);
243
244 int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
245 if (excess > 0) {
246 for (int i = 0; i < excess; i++) {
247 E lastUsed = pool.getLastUsed();
248 if (lastUsed == null) {
249 break;
250 }
251 lastUsed.close();
252 this.available.remove(lastUsed);
253 pool.remove(lastUsed);
254 }
255 }
256
257 if (pool.getAllocatedCount() < maxPerRoute) {
258 int totalUsed = this.leased.size();
259 int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
260 if (freeCapacity > 0) {
261 int totalAvailable = this.available.size();
262 if (totalAvailable > freeCapacity - 1) {
263 if (!this.available.isEmpty()) {
264 E lastUsed = this.available.removeLast();
265 lastUsed.close();
266 RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
267 otherpool.remove(lastUsed);
268 }
269 }
270 C conn = this.connFactory.create(route);
271 entry = pool.add(conn);
272 this.leased.add(entry);
273 return entry;
274 }
275 }
276
277 boolean success = false;
278 try {
279 pool.queue(future);
280 this.pending.add(future);
281 success = future.await(deadline);
282 } finally {
283
284
285
286
287 pool.unqueue(future);
288 this.pending.remove(future);
289 }
290
291 if (!success && (deadline != null) &&
292 (deadline.getTime() <= System.currentTimeMillis())) {
293 break;
294 }
295 }
296 throw new TimeoutException("Timeout waiting for connection");
297 } finally {
298 this.lock.unlock();
299 }
300 }
301
302 private void notifyPending(final RouteSpecificPool<T, C, E> pool) {
303 PoolEntryFuture<E> future = pool.nextPending();
304 if (future != null) {
305 this.pending.remove(future);
306 } else {
307 future = this.pending.poll();
308 }
309 if (future != null) {
310 future.wakeup();
311 }
312 }
313
314 public void release(E entry, boolean reusable) {
315 this.lock.lock();
316 try {
317 if (this.leased.remove(entry)) {
318 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
319 pool.free(entry, reusable);
320 if (reusable && !this.isShutDown) {
321 this.available.addFirst(entry);
322 } else {
323 entry.close();
324 }
325 notifyPending(pool);
326 }
327 } finally {
328 this.lock.unlock();
329 }
330 }
331
332 private int getMax(final T route) {
333 Integer v = this.maxPerRoute.get(route);
334 if (v != null) {
335 return v.intValue();
336 } else {
337 return this.defaultMaxPerRoute;
338 }
339 }
340
341 public void setMaxTotal(int max) {
342 if (max <= 0) {
343 throw new IllegalArgumentException("Max value may not be negative or zero");
344 }
345 this.lock.lock();
346 try {
347 this.maxTotal = max;
348 } finally {
349 this.lock.unlock();
350 }
351 }
352
353 public int getMaxTotal() {
354 this.lock.lock();
355 try {
356 return this.maxTotal;
357 } finally {
358 this.lock.unlock();
359 }
360 }
361
362 public void setDefaultMaxPerRoute(int max) {
363 if (max <= 0) {
364 throw new IllegalArgumentException("Max value may not be negative or zero");
365 }
366 this.lock.lock();
367 try {
368 this.defaultMaxPerRoute = max;
369 } finally {
370 this.lock.unlock();
371 }
372 }
373
374 public int getDefaultMaxPerRoute() {
375 this.lock.lock();
376 try {
377 return this.defaultMaxPerRoute;
378 } finally {
379 this.lock.unlock();
380 }
381 }
382
383 public void setMaxPerRoute(final T route, int max) {
384 if (route == null) {
385 throw new IllegalArgumentException("Route may not be null");
386 }
387 if (max <= 0) {
388 throw new IllegalArgumentException("Max value may not be negative or zero");
389 }
390 this.lock.lock();
391 try {
392 this.maxPerRoute.put(route, max);
393 } finally {
394 this.lock.unlock();
395 }
396 }
397
398 public int getMaxPerRoute(T route) {
399 if (route == null) {
400 throw new IllegalArgumentException("Route may not be null");
401 }
402 this.lock.lock();
403 try {
404 return getMax(route);
405 } finally {
406 this.lock.unlock();
407 }
408 }
409
410 public PoolStats getTotalStats() {
411 this.lock.lock();
412 try {
413 return new PoolStats(
414 this.leased.size(),
415 this.pending.size(),
416 this.available.size(),
417 this.maxTotal);
418 } finally {
419 this.lock.unlock();
420 }
421 }
422
423 public PoolStats getStats(final T route) {
424 if (route == null) {
425 throw new IllegalArgumentException("Route may not be null");
426 }
427 this.lock.lock();
428 try {
429 RouteSpecificPool<T, C, E> pool = getPool(route);
430 return new PoolStats(
431 pool.getLeasedCount(),
432 pool.getPendingCount(),
433 pool.getAvailableCount(),
434 getMax(route));
435 } finally {
436 this.lock.unlock();
437 }
438 }
439
440
441
442
443
444
445
446
447 public void closeIdle(long idletime, final TimeUnit tunit) {
448 if (tunit == null) {
449 throw new IllegalArgumentException("Time unit must not be null.");
450 }
451 long time = tunit.toMillis(idletime);
452 if (time < 0) {
453 time = 0;
454 }
455 long deadline = System.currentTimeMillis() - time;
456 this.lock.lock();
457 try {
458 Iterator<E> it = this.available.iterator();
459 while (it.hasNext()) {
460 E entry = it.next();
461 if (entry.getUpdated() <= deadline) {
462 entry.close();
463 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
464 pool.remove(entry);
465 it.remove();
466 notifyPending(pool);
467 }
468 }
469 } finally {
470 this.lock.unlock();
471 }
472 }
473
474
475
476
477 public void closeExpired() {
478 long now = System.currentTimeMillis();
479 this.lock.lock();
480 try {
481 Iterator<E> it = this.available.iterator();
482 while (it.hasNext()) {
483 E entry = it.next();
484 if (entry.isExpired(now)) {
485 entry.close();
486 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
487 pool.remove(entry);
488 it.remove();
489 notifyPending(pool);
490 }
491 }
492 } finally {
493 this.lock.unlock();
494 }
495 }
496
497 @Override
498 public String toString() {
499 StringBuilder buffer = new StringBuilder();
500 buffer.append("[leased: ");
501 buffer.append(this.leased);
502 buffer.append("][available: ");
503 buffer.append(this.available);
504 buffer.append("][pending: ");
505 buffer.append(this.pending);
506 buffer.append("]");
507 return buffer.toString();
508 }
509
510 }