1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.conn.tsccm;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.Iterator;
33 import java.util.Queue;
34 import java.util.LinkedList;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.locks.Condition;
38 import java.util.concurrent.locks.Lock;
39 import java.util.concurrent.TimeUnit;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.conn.routing.HttpRoute;
44 import org.apache.http.conn.ClientConnectionOperator;
45 import org.apache.http.conn.ConnectionPoolTimeoutException;
46 import org.apache.http.conn.OperatedClientConnection;
47 import org.apache.http.conn.params.ConnPerRoute;
48 import org.apache.http.conn.params.ConnManagerParams;
49 import org.apache.http.params.HttpParams;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 @Deprecated
69 public class ConnPoolByRoute extends AbstractConnPool {
70
71 private final Log log = LogFactory.getLog(getClass());
72
73 private final Lock poolLock;
74
75
76 protected final ClientConnectionOperator operator;
77
78
79 protected final ConnPerRoute connPerRoute;
80
81
82 protected final Set<BasicPoolEntry> leasedConnections;
83
84
85 protected final Queue<BasicPoolEntry> freeConnections;
86
87
88 protected final Queue<WaitingThread> waitingThreads;
89
90
91 protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
92
93 private final long connTTL;
94
95 private final TimeUnit connTTLTimeUnit;
96
97 protected volatile boolean shutdown;
98
99 protected volatile int maxTotalConnections;
100
101 protected volatile int numConnections;
102
103
104
105
106
107
108 public ConnPoolByRoute(
109 final ClientConnectionOperator operator,
110 final ConnPerRoute connPerRoute,
111 int maxTotalConnections) {
112 this(operator, connPerRoute, maxTotalConnections, -1, TimeUnit.MILLISECONDS);
113 }
114
115
116
117
118 public ConnPoolByRoute(
119 final ClientConnectionOperator operator,
120 final ConnPerRoute connPerRoute,
121 int maxTotalConnections,
122 long connTTL,
123 final TimeUnit connTTLTimeUnit) {
124 super();
125 if (operator == null) {
126 throw new IllegalArgumentException("Connection operator may not be null");
127 }
128 if (connPerRoute == null) {
129 throw new IllegalArgumentException("Connections per route may not be null");
130 }
131 this.poolLock = super.poolLock;
132 this.leasedConnections = super.leasedConnections;
133 this.operator = operator;
134 this.connPerRoute = connPerRoute;
135 this.maxTotalConnections = maxTotalConnections;
136 this.freeConnections = createFreeConnQueue();
137 this.waitingThreads = createWaitingThreadQueue();
138 this.routeToPool = createRouteToPoolMap();
139 this.connTTL = connTTL;
140 this.connTTLTimeUnit = connTTLTimeUnit;
141 }
142
143 protected Lock getLock() {
144 return this.poolLock;
145 }
146
147
148
149
150
151
152 public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
153 this(operator,
154 ConnManagerParams.getMaxConnectionsPerRoute(params),
155 ConnManagerParams.getMaxTotalConnections(params));
156 }
157
158
159
160
161
162
163
164 protected Queue<BasicPoolEntry> createFreeConnQueue() {
165 return new LinkedList<BasicPoolEntry>();
166 }
167
168
169
170
171
172
173
174 protected Queue<WaitingThread> createWaitingThreadQueue() {
175 return new LinkedList<WaitingThread>();
176 }
177
178
179
180
181
182
183
184 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
185 return new HashMap<HttpRoute, RouteSpecificPool>();
186 }
187
188
189
190
191
192
193
194
195
196
197 protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
198 return new RouteSpecificPool(route, this.connPerRoute);
199 }
200
201
202
203
204
205
206
207
208
209
210
211 protected WaitingThread newWaitingThread(Condition cond,
212 RouteSpecificPool rospl) {
213 return new WaitingThread(cond, rospl);
214 }
215
216 private void closeConnection(final BasicPoolEntry entry) {
217 OperatedClientConnection conn = entry.getConnection();
218 if (conn != null) {
219 try {
220 conn.close();
221 } catch (IOException ex) {
222 log.debug("I/O error closing connection", ex);
223 }
224 }
225 }
226
227
228
229
230
231
232
233
234
235
236 protected RouteSpecificPool getRoutePool(HttpRoute route,
237 boolean create) {
238 RouteSpecificPool rospl = null;
239 poolLock.lock();
240 try {
241
242 rospl = routeToPool.get(route);
243 if ((rospl == null) && create) {
244
245 rospl = newRouteSpecificPool(route);
246 routeToPool.put(route, rospl);
247 }
248
249 } finally {
250 poolLock.unlock();
251 }
252
253 return rospl;
254 }
255
256 public int getConnectionsInPool(HttpRoute route) {
257 poolLock.lock();
258 try {
259
260 RouteSpecificPool rospl = getRoutePool(route, false);
261 return (rospl != null) ? rospl.getEntryCount() : 0;
262
263 } finally {
264 poolLock.unlock();
265 }
266 }
267
268 public int getConnectionsInPool() {
269 poolLock.lock();
270 try {
271 return numConnections;
272 } finally {
273 poolLock.unlock();
274 }
275 }
276
277 @Override
278 public PoolEntryRequest requestPoolEntry(
279 final HttpRoute route,
280 final Object state) {
281
282 final WaitingThreadAborter aborter = new WaitingThreadAborter();
283
284 return new PoolEntryRequest() {
285
286 public void abortRequest() {
287 poolLock.lock();
288 try {
289 aborter.abort();
290 } finally {
291 poolLock.unlock();
292 }
293 }
294
295 public BasicPoolEntry getPoolEntry(
296 long timeout,
297 TimeUnit tunit)
298 throws InterruptedException, ConnectionPoolTimeoutException {
299 return getEntryBlocking(route, state, timeout, tunit, aborter);
300 }
301
302 };
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323 protected BasicPoolEntry getEntryBlocking(
324 HttpRoute route, Object state,
325 long timeout, TimeUnit tunit,
326 WaitingThreadAborter aborter)
327 throws ConnectionPoolTimeoutException, InterruptedException {
328
329 Date deadline = null;
330 if (timeout > 0) {
331 deadline = new Date
332 (System.currentTimeMillis() + tunit.toMillis(timeout));
333 }
334
335 BasicPoolEntry entry = null;
336 poolLock.lock();
337 try {
338
339 RouteSpecificPool rospl = getRoutePool(route, true);
340 WaitingThread waitingThread = null;
341
342 while (entry == null) {
343
344 if (shutdown) {
345 throw new IllegalStateException("Connection pool shut down");
346 }
347
348 if (log.isDebugEnabled()) {
349 log.debug("[" + route + "] total kept alive: " + freeConnections.size() +
350 ", total issued: " + leasedConnections.size() +
351 ", total allocated: " + numConnections + " out of " + maxTotalConnections);
352 }
353
354
355
356
357
358
359
360 entry = getFreeEntry(rospl, state);
361 if (entry != null) {
362 break;
363 }
364
365 boolean hasCapacity = rospl.getCapacity() > 0;
366
367 if (log.isDebugEnabled()) {
368 log.debug("Available capacity: " + rospl.getCapacity()
369 + " out of " + rospl.getMaxEntries()
370 + " [" + route + "][" + state + "]");
371 }
372
373 if (hasCapacity && numConnections < maxTotalConnections) {
374
375 entry = createEntry(rospl, operator);
376
377 } else if (hasCapacity && !freeConnections.isEmpty()) {
378
379 deleteLeastUsedEntry();
380
381
382 rospl = getRoutePool(route, true);
383 entry = createEntry(rospl, operator);
384
385 } else {
386
387 if (log.isDebugEnabled()) {
388 log.debug("Need to wait for connection" +
389 " [" + route + "][" + state + "]");
390 }
391
392 if (waitingThread == null) {
393 waitingThread =
394 newWaitingThread(poolLock.newCondition(), rospl);
395 aborter.setWaitingThread(waitingThread);
396 }
397
398 boolean success = false;
399 try {
400 rospl.queueThread(waitingThread);
401 waitingThreads.add(waitingThread);
402 success = waitingThread.await(deadline);
403
404 } finally {
405
406
407
408
409 rospl.removeThread(waitingThread);
410 waitingThreads.remove(waitingThread);
411 }
412
413
414 if (!success && (deadline != null) &&
415 (deadline.getTime() <= System.currentTimeMillis())) {
416 throw new ConnectionPoolTimeoutException
417 ("Timeout waiting for connection from pool");
418 }
419 }
420 }
421
422 } finally {
423 poolLock.unlock();
424 }
425 return entry;
426 }
427
428 @Override
429 public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
430
431 HttpRoute route = entry.getPlannedRoute();
432 if (log.isDebugEnabled()) {
433 log.debug("Releasing connection" +
434 " [" + route + "][" + entry.getState() + "]");
435 }
436
437 poolLock.lock();
438 try {
439 if (shutdown) {
440
441
442 closeConnection(entry);
443 return;
444 }
445
446
447 leasedConnections.remove(entry);
448
449 RouteSpecificPool rospl = getRoutePool(route, true);
450
451 if (reusable && rospl.getCapacity() >= 0) {
452 if (log.isDebugEnabled()) {
453 String s;
454 if (validDuration > 0) {
455 s = "for " + validDuration + " " + timeUnit;
456 } else {
457 s = "indefinitely";
458 }
459 log.debug("Pooling connection" +
460 " [" + route + "][" + entry.getState() + "]; keep alive " + s);
461 }
462 rospl.freeEntry(entry);
463 entry.updateExpiry(validDuration, timeUnit);
464 freeConnections.add(entry);
465 } else {
466 closeConnection(entry);
467 rospl.dropEntry();
468 numConnections--;
469 }
470
471 notifyWaitingThread(rospl);
472
473 } finally {
474 poolLock.unlock();
475 }
476 }
477
478
479
480
481
482
483
484
485
486 protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
487
488 BasicPoolEntry entry = null;
489 poolLock.lock();
490 try {
491 boolean done = false;
492 while(!done) {
493
494 entry = rospl.allocEntry(state);
495
496 if (entry != null) {
497 if (log.isDebugEnabled()) {
498 log.debug("Getting free connection"
499 + " [" + rospl.getRoute() + "][" + state + "]");
500
501 }
502 freeConnections.remove(entry);
503 if (entry.isExpired(System.currentTimeMillis())) {
504
505
506 if (log.isDebugEnabled())
507 log.debug("Closing expired free connection"
508 + " [" + rospl.getRoute() + "][" + state + "]");
509 closeConnection(entry);
510
511
512
513 rospl.dropEntry();
514 numConnections--;
515 } else {
516 leasedConnections.add(entry);
517 done = true;
518 }
519
520 } else {
521 done = true;
522 if (log.isDebugEnabled()) {
523 log.debug("No free connections"
524 + " [" + rospl.getRoute() + "][" + state + "]");
525 }
526 }
527 }
528 } finally {
529 poolLock.unlock();
530 }
531 return entry;
532 }
533
534
535
536
537
538
539
540
541
542
543
544
545 protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
546 ClientConnectionOperator op) {
547
548 if (log.isDebugEnabled()) {
549 log.debug("Creating new connection [" + rospl.getRoute() + "]");
550 }
551
552
553 BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit);
554
555 poolLock.lock();
556 try {
557 rospl.createdEntry(entry);
558 numConnections++;
559 leasedConnections.add(entry);
560 } finally {
561 poolLock.unlock();
562 }
563
564 return entry;
565 }
566
567
568
569
570
571
572
573
574
575
576
577
578
579 protected void deleteEntry(BasicPoolEntry entry) {
580
581 HttpRoute route = entry.getPlannedRoute();
582
583 if (log.isDebugEnabled()) {
584 log.debug("Deleting connection"
585 + " [" + route + "][" + entry.getState() + "]");
586 }
587
588 poolLock.lock();
589 try {
590
591 closeConnection(entry);
592
593 RouteSpecificPool rospl = getRoutePool(route, true);
594 rospl.deleteEntry(entry);
595 numConnections--;
596 if (rospl.isUnused()) {
597 routeToPool.remove(route);
598 }
599
600 } finally {
601 poolLock.unlock();
602 }
603 }
604
605
606
607
608
609
610 protected void deleteLeastUsedEntry() {
611 poolLock.lock();
612 try {
613
614 BasicPoolEntry entry = freeConnections.remove();
615
616 if (entry != null) {
617 deleteEntry(entry);
618 } else if (log.isDebugEnabled()) {
619 log.debug("No free connection to delete");
620 }
621
622 } finally {
623 poolLock.unlock();
624 }
625 }
626
627 @Override
628 protected void handleLostEntry(HttpRoute route) {
629
630 poolLock.lock();
631 try {
632
633 RouteSpecificPool rospl = getRoutePool(route, true);
634 rospl.dropEntry();
635 if (rospl.isUnused()) {
636 routeToPool.remove(route);
637 }
638
639 numConnections--;
640 notifyWaitingThread(rospl);
641
642 } finally {
643 poolLock.unlock();
644 }
645 }
646
647
648
649
650
651
652
653
654
655 protected void notifyWaitingThread(RouteSpecificPool rospl) {
656
657
658
659
660
661
662 WaitingThread waitingThread = null;
663
664 poolLock.lock();
665 try {
666
667 if ((rospl != null) && rospl.hasThread()) {
668 if (log.isDebugEnabled()) {
669 log.debug("Notifying thread waiting on pool" +
670 " [" + rospl.getRoute() + "]");
671 }
672 waitingThread = rospl.nextThread();
673 } else if (!waitingThreads.isEmpty()) {
674 if (log.isDebugEnabled()) {
675 log.debug("Notifying thread waiting on any pool");
676 }
677 waitingThread = waitingThreads.remove();
678 } else if (log.isDebugEnabled()) {
679 log.debug("Notifying no-one, there are no waiting threads");
680 }
681
682 if (waitingThread != null) {
683 waitingThread.wakeup();
684 }
685
686 } finally {
687 poolLock.unlock();
688 }
689 }
690
691
692 @Override
693 public void deleteClosedConnections() {
694 poolLock.lock();
695 try {
696 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
697 while (iter.hasNext()) {
698 BasicPoolEntry entry = iter.next();
699 if (!entry.getConnection().isOpen()) {
700 iter.remove();
701 deleteEntry(entry);
702 }
703 }
704 } finally {
705 poolLock.unlock();
706 }
707 }
708
709
710
711
712
713
714
715
716 @Override
717 public void closeIdleConnections(long idletime, TimeUnit tunit) {
718 if (tunit == null) {
719 throw new IllegalArgumentException("Time unit must not be null.");
720 }
721 if (idletime < 0) {
722 idletime = 0;
723 }
724 if (log.isDebugEnabled()) {
725 log.debug("Closing connections idle longer than " + idletime + " " + tunit);
726 }
727
728 long deadline = System.currentTimeMillis() - tunit.toMillis(idletime);
729 poolLock.lock();
730 try {
731 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
732 while (iter.hasNext()) {
733 BasicPoolEntry entry = iter.next();
734 if (entry.getUpdated() <= deadline) {
735 if (log.isDebugEnabled()) {
736 log.debug("Closing connection last used @ " + new Date(entry.getUpdated()));
737 }
738 iter.remove();
739 deleteEntry(entry);
740 }
741 }
742 } finally {
743 poolLock.unlock();
744 }
745 }
746
747 @Override
748 public void closeExpiredConnections() {
749 log.debug("Closing expired connections");
750 long now = System.currentTimeMillis();
751
752 poolLock.lock();
753 try {
754 Iterator<BasicPoolEntry> iter = freeConnections.iterator();
755 while (iter.hasNext()) {
756 BasicPoolEntry entry = iter.next();
757 if (entry.isExpired(now)) {
758 if (log.isDebugEnabled()) {
759 log.debug("Closing connection expired @ " + new Date(entry.getExpiry()));
760 }
761 iter.remove();
762 deleteEntry(entry);
763 }
764 }
765 } finally {
766 poolLock.unlock();
767 }
768 }
769
770 @Override
771 public void shutdown() {
772 poolLock.lock();
773 try {
774 if (shutdown) {
775 return;
776 }
777 shutdown = true;
778
779
780 Iterator<BasicPoolEntry> iter1 = leasedConnections.iterator();
781 while (iter1.hasNext()) {
782 BasicPoolEntry entry = iter1.next();
783 iter1.remove();
784 closeConnection(entry);
785 }
786
787
788 Iterator<BasicPoolEntry> iter2 = freeConnections.iterator();
789 while (iter2.hasNext()) {
790 BasicPoolEntry entry = iter2.next();
791 iter2.remove();
792
793 if (log.isDebugEnabled()) {
794 log.debug("Closing connection"
795 + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]");
796 }
797 closeConnection(entry);
798 }
799
800
801 Iterator<WaitingThread> iwth = waitingThreads.iterator();
802 while (iwth.hasNext()) {
803 WaitingThread waiter = iwth.next();
804 iwth.remove();
805 waiter.wakeup();
806 }
807
808 routeToPool.clear();
809
810 } finally {
811 poolLock.unlock();
812 }
813 }
814
815
816
817
818 public void setMaxTotalConnections(int max) {
819 poolLock.lock();
820 try {
821 maxTotalConnections = max;
822 } finally {
823 poolLock.unlock();
824 }
825 }
826
827
828
829
830
831 public int getMaxTotalConnections() {
832 return maxTotalConnections;
833 }
834
835 }
836