1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.pool;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.ListIterator;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.locks.Lock;
42 import java.util.concurrent.locks.ReentrantLock;
43
44 import org.apache.http.annotation.ThreadSafe;
45 import org.apache.http.concurrent.BasicFuture;
46 import org.apache.http.concurrent.FutureCallback;
47 import org.apache.http.nio.reactor.ConnectingIOReactor;
48 import org.apache.http.nio.reactor.IOSession;
49 import org.apache.http.nio.reactor.SessionRequest;
50 import org.apache.http.nio.reactor.SessionRequestCallback;
51 import org.apache.http.pool.ConnPool;
52 import org.apache.http.pool.ConnPoolControl;
53 import org.apache.http.pool.PoolEntry;
54 import org.apache.http.pool.PoolStats;
55
56
57
58
59
60
61
62
63
64
65 @ThreadSafe
66 public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
67 implements ConnPool<T, E>, ConnPoolControl<T> {
68
69 private final ConnectingIOReactor ioreactor;
70 private final NIOConnFactory<T, C> connFactory;
71 private final SessionRequestCallback sessionRequestCallback;
72 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
73 private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
74 private final Set<SessionRequest> pending;
75 private final Set<E> leased;
76 private final LinkedList<E> available;
77 private final Map<T, Integer> maxPerRoute;
78 private final Lock lock;
79
80 private volatile boolean isShutDown;
81 private volatile int defaultMaxPerRoute;
82 private volatile int maxTotal;
83
84 public AbstractNIOConnPool(
85 final ConnectingIOReactor ioreactor,
86 final NIOConnFactory<T, C> connFactory,
87 int defaultMaxPerRoute,
88 int maxTotal) {
89 super();
90 if (ioreactor == null) {
91 throw new IllegalArgumentException("I/O reactor may not be null");
92 }
93 if (connFactory == null) {
94 throw new IllegalArgumentException("Connection factory may not null");
95 }
96 if (defaultMaxPerRoute <= 0) {
97 throw new IllegalArgumentException("Max per route value may not be negative or zero");
98 }
99 if (maxTotal <= 0) {
100 throw new IllegalArgumentException("Max total value may not be negative or zero");
101 }
102 this.ioreactor = ioreactor;
103 this.connFactory = connFactory;
104 this.sessionRequestCallback = new InternalSessionRequestCallback();
105 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
106 this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
107 this.pending = new HashSet<SessionRequest>();
108 this.leased = new HashSet<E>();
109 this.available = new LinkedList<E>();
110 this.maxPerRoute = new HashMap<T, Integer>();
111 this.lock = new ReentrantLock();
112 this.defaultMaxPerRoute = defaultMaxPerRoute;
113 this.maxTotal = maxTotal;
114 }
115
116 protected abstract SocketAddress resolveRemoteAddress(T route);
117
118 protected abstract SocketAddress resolveLocalAddress(T route);
119
120 protected abstract E createEntry(T route, C conn);
121
122 public boolean isShutdown() {
123 return this.isShutDown;
124 }
125
126 public void shutdown(long waitMs) throws IOException {
127 if (this.isShutDown) {
128 return ;
129 }
130 this.isShutDown = true;
131 this.lock.lock();
132 try {
133 for (SessionRequest sessionRequest: this.pending) {
134 sessionRequest.cancel();
135 }
136 for (E entry: this.available) {
137 entry.close();
138 }
139 for (E entry: this.leased) {
140 entry.close();
141 }
142 for (RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
143 pool.shutdown();
144 }
145 this.routeToPool.clear();
146 this.leased.clear();
147 this.pending.clear();
148 this.available.clear();
149 this.leasingRequests.clear();
150 this.ioreactor.shutdown(waitMs);
151 } finally {
152 this.lock.unlock();
153 }
154 }
155
156 private RouteSpecificPool<T, C, E> getPool(final T route) {
157 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
158 if (pool == null) {
159 pool = new RouteSpecificPool<T, C, E>(route) {
160
161 @Override
162 protected E createEntry(final T route, final C conn) {
163 return AbstractNIOConnPool.this.createEntry(route, conn);
164 }
165
166 };
167 this.routeToPool.put(route, pool);
168 }
169 return pool;
170 }
171
172 public Future<E> lease(
173 final T route, final Object state,
174 final long connectTimeout, final TimeUnit tunit,
175 final FutureCallback<E> callback) {
176 if (route == null) {
177 throw new IllegalArgumentException("Route may not be null");
178 }
179 if (tunit == null) {
180 throw new IllegalArgumentException("Time unit may not be null.");
181 }
182 if (this.isShutDown) {
183 throw new IllegalStateException("Session pool has been shut down");
184 }
185 this.lock.lock();
186 try {
187 long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
188 BasicFuture<E> future = new BasicFuture<E>(callback);
189 LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, future);
190 this.leasingRequests.add(request);
191
192 processPendingRequests();
193 return future;
194 } finally {
195 this.lock.unlock();
196 }
197 }
198
199 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
200 return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
201 }
202
203 public Future<E> lease(final T route, final Object state) {
204 return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
205 }
206
207 public void release(final E entry, boolean reusable) {
208 if (entry == null) {
209 return;
210 }
211 if (this.isShutDown) {
212 return;
213 }
214 this.lock.lock();
215 try {
216 if (this.leased.remove(entry)) {
217 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
218 pool.free(entry, reusable);
219 if (reusable) {
220 this.available.addFirst(entry);
221 } else {
222 entry.close();
223 }
224 processPendingRequests();
225 }
226 } finally {
227 this.lock.unlock();
228 }
229 }
230
231 private void processPendingRequests() {
232 ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
233 while (it.hasNext()) {
234 LeaseRequest<T, C, E> request = it.next();
235
236 T route = request.getRoute();
237 Object state = request.getState();
238 long deadline = request.getDeadline();
239 BasicFuture<E> future = request.getFuture();
240
241 long now = System.currentTimeMillis();
242 if (now > deadline) {
243 it.remove();
244 future.failed(new TimeoutException());
245 continue;
246 }
247
248 RouteSpecificPool<T, C, E> pool = getPool(route);
249 E entry = null;
250 for (;;) {
251 entry = pool.getFree(state);
252 if (entry == null) {
253 break;
254 }
255 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
256 entry.close();
257 this.available.remove(entry);
258 pool.free(entry, false);
259 } else {
260 break;
261 }
262 }
263 if (entry != null) {
264 it.remove();
265 this.available.remove(entry);
266 this.leased.add(entry);
267 future.completed(entry);
268 continue;
269 }
270
271
272 int maxPerRoute = getMax(route);
273
274 int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
275 if (excess > 0) {
276 for (int i = 0; i < excess; i++) {
277 E lastUsed = pool.getLastUsed();
278 if (lastUsed == null) {
279 break;
280 }
281 lastUsed.close();
282 this.available.remove(lastUsed);
283 pool.remove(lastUsed);
284 }
285 }
286
287 if (pool.getAllocatedCount() < maxPerRoute) {
288 int totalUsed = this.pending.size() + this.leased.size();
289 int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
290 if (freeCapacity == 0) {
291 continue;
292 }
293 int totalAvailable = this.available.size();
294 if (totalAvailable > freeCapacity - 1) {
295 if (!this.available.isEmpty()) {
296 E lastUsed = this.available.removeLast();
297 lastUsed.close();
298 RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
299 otherpool.remove(lastUsed);
300 }
301 }
302 it.remove();
303 SessionRequest sessionRequest = this.ioreactor.connect(
304 resolveRemoteAddress(route),
305 resolveLocalAddress(route),
306 route,
307 this.sessionRequestCallback);
308 int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
309 (int) request.getConnectTimeout() : Integer.MAX_VALUE;
310 sessionRequest.setConnectTimeout(timout);
311 this.pending.add(sessionRequest);
312 pool.addPending(sessionRequest, future);
313 }
314 }
315 }
316
317 public void validatePendingRequests() {
318 this.lock.lock();
319 try {
320 long now = System.currentTimeMillis();
321 ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
322 while (it.hasNext()) {
323 LeaseRequest<T, C, E> request = it.next();
324 long deadline = request.getDeadline();
325 if (now > deadline) {
326 it.remove();
327 BasicFuture<E> future = request.getFuture();
328 future.failed(new TimeoutException());
329 }
330 }
331 } finally {
332 this.lock.unlock();
333 }
334 }
335
336 protected void requestCompleted(final SessionRequest request) {
337 if (this.isShutDown) {
338 return;
339 }
340 @SuppressWarnings("unchecked")
341 T route = (T) request.getAttachment();
342 this.lock.lock();
343 try {
344 this.pending.remove(request);
345 RouteSpecificPool<T, C, E> pool = getPool(route);
346 IOSession session = request.getSession();
347 try {
348 C conn = this.connFactory.create(route, session);
349 E entry = pool.createEntry(request, conn);
350 this.leased.add(entry);
351 pool.completed(request, entry);
352
353 } catch (IOException ex) {
354 pool.failed(request, ex);
355 }
356 } finally {
357 this.lock.unlock();
358 }
359 }
360
361 protected void requestCancelled(final SessionRequest request) {
362 if (this.isShutDown) {
363 return;
364 }
365 @SuppressWarnings("unchecked")
366 T route = (T) request.getAttachment();
367 this.lock.lock();
368 try {
369 this.pending.remove(request);
370 RouteSpecificPool<T, C, E> pool = getPool(route);
371 pool.cancelled(request);
372 processPendingRequests();
373 } finally {
374 this.lock.unlock();
375 }
376 }
377
378 protected void requestFailed(final SessionRequest request) {
379 if (this.isShutDown) {
380 return;
381 }
382 @SuppressWarnings("unchecked")
383 T route = (T) request.getAttachment();
384 this.lock.lock();
385 try {
386 this.pending.remove(request);
387 RouteSpecificPool<T, C, E> pool = getPool(route);
388 pool.failed(request, request.getException());
389 processPendingRequests();
390 } finally {
391 this.lock.unlock();
392 }
393 }
394
395 protected void requestTimeout(final SessionRequest request) {
396 if (this.isShutDown) {
397 return;
398 }
399 @SuppressWarnings("unchecked")
400 T route = (T) request.getAttachment();
401 this.lock.lock();
402 try {
403 this.pending.remove(request);
404 RouteSpecificPool<T, C, E> pool = getPool(route);
405 pool.timeout(request);
406 processPendingRequests();
407 } finally {
408 this.lock.unlock();
409 }
410 }
411
412 private int getMax(final T route) {
413 Integer v = this.maxPerRoute.get(route);
414 if (v != null) {
415 return v.intValue();
416 } else {
417 return this.defaultMaxPerRoute;
418 }
419 }
420
421 public void setMaxTotal(int max) {
422 if (max <= 0) {
423 throw new IllegalArgumentException("Max value may not be negative or zero");
424 }
425 this.lock.lock();
426 try {
427 this.maxTotal = max;
428 } finally {
429 this.lock.unlock();
430 }
431 }
432
433 public int getMaxTotal() {
434 this.lock.lock();
435 try {
436 return this.maxTotal;
437 } finally {
438 this.lock.unlock();
439 }
440 }
441
442 public void setDefaultMaxPerRoute(int max) {
443 if (max <= 0) {
444 throw new IllegalArgumentException("Max value may not be negative or zero");
445 }
446 this.lock.lock();
447 try {
448 this.defaultMaxPerRoute = max;
449 } finally {
450 this.lock.unlock();
451 }
452 }
453
454 public int getDefaultMaxPerRoute() {
455 this.lock.lock();
456 try {
457 return this.defaultMaxPerRoute;
458 } finally {
459 this.lock.unlock();
460 }
461 }
462
463 public void setMaxPerRoute(final T route, int max) {
464 if (route == null) {
465 throw new IllegalArgumentException("Route may not be null");
466 }
467 if (max <= 0) {
468 throw new IllegalArgumentException("Max value may not be negative or zero");
469 }
470 this.lock.lock();
471 try {
472 this.maxPerRoute.put(route, max);
473 } finally {
474 this.lock.unlock();
475 }
476 }
477
478 public int getMaxPerRoute(T route) {
479 if (route == null) {
480 throw new IllegalArgumentException("Route may not be null");
481 }
482 this.lock.lock();
483 try {
484 return getMax(route);
485 } finally {
486 this.lock.unlock();
487 }
488 }
489
490 public PoolStats getTotalStats() {
491 this.lock.lock();
492 try {
493 return new PoolStats(
494 this.leased.size(),
495 this.pending.size(),
496 this.available.size(),
497 this.maxTotal);
498 } finally {
499 this.lock.unlock();
500 }
501 }
502
503 public PoolStats getStats(final T route) {
504 if (route == null) {
505 throw new IllegalArgumentException("Route may not be null");
506 }
507 this.lock.lock();
508 try {
509 RouteSpecificPool<T, C, E> pool = getPool(route);
510 return new PoolStats(
511 pool.getLeasedCount(),
512 pool.getPendingCount(),
513 pool.getAvailableCount(),
514 getMax(route));
515 } finally {
516 this.lock.unlock();
517 }
518 }
519
520 public void closeIdle(long idletime, final TimeUnit tunit) {
521 if (tunit == null) {
522 throw new IllegalArgumentException("Time unit must not be null.");
523 }
524 long time = tunit.toMillis(idletime);
525 if (time < 0) {
526 time = 0;
527 }
528 long deadline = System.currentTimeMillis() - time;
529 this.lock.lock();
530 try {
531 Iterator<E> it = this.available.iterator();
532 while (it.hasNext()) {
533 E entry = it.next();
534 if (entry.getUpdated() <= deadline) {
535 entry.close();
536 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
537 pool.remove(entry);
538 it.remove();
539 }
540 }
541 processPendingRequests();
542 } finally {
543 this.lock.unlock();
544 }
545 }
546
547 public void closeExpired() {
548 long now = System.currentTimeMillis();
549 this.lock.lock();
550 try {
551 Iterator<E> it = this.available.iterator();
552 while (it.hasNext()) {
553 E entry = it.next();
554 if (entry.isExpired(now)) {
555 entry.close();
556 RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
557 pool.remove(entry);
558 it.remove();
559 }
560 }
561 processPendingRequests();
562 } finally {
563 this.lock.unlock();
564 }
565 }
566
567 @Override
568 public String toString() {
569 StringBuilder buffer = new StringBuilder();
570 buffer.append("[leased: ");
571 buffer.append(this.leased);
572 buffer.append("][available: ");
573 buffer.append(this.available);
574 buffer.append("][pending: ");
575 buffer.append(this.pending);
576 buffer.append("]");
577 return buffer.toString();
578 }
579
580 class InternalSessionRequestCallback implements SessionRequestCallback {
581
582 public void completed(final SessionRequest request) {
583 requestCompleted(request);
584 }
585
586 public void cancelled(final SessionRequest request) {
587 requestCancelled(request);
588 }
589
590 public void failed(final SessionRequest request) {
591 requestFailed(request);
592 }
593
594 public void timeout(final SessionRequest request) {
595 requestTimeout(request);
596 }
597
598 }
599
600 }