1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.conn.tsccm;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.Iterator;
33 import java.util.LinkedList;
34 import java.util.Map;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.locks.Condition;
39 import java.util.concurrent.locks.Lock;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.conn.ClientConnectionOperator;
44 import org.apache.http.conn.ConnectionPoolTimeoutException;
45 import org.apache.http.conn.OperatedClientConnection;
46 import org.apache.http.conn.params.ConnManagerParams;
47 import org.apache.http.conn.params.ConnPerRoute;
48 import org.apache.http.conn.routing.HttpRoute;
49 import org.apache.http.params.HttpParams;
50 import org.apache.http.util.Args;
51 import org.apache.http.util.Asserts;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * Connection pool that organizes its entries by {@link HttpRoute}.
 * <p>
 * Besides one {@link RouteSpecificPool} per route (see {@link #routeToPool}),
 * the pool keeps pool-wide collections of free entries, leased entries and
 * waiting threads, plus a counter that is compared against a global
 * maximum number of connections. All state changes in this class are made
 * while holding {@link #poolLock}.
 * <p>
 * The class is annotated as deprecated; the annotation itself does not
 * name a replacement.
 */
@Deprecated
public class ConnPoolByRoute extends AbstractConnPool {

    /** Logger named after the concrete runtime class of this pool. */
    private final Log log = LogFactory.getLog(getClass());

    /** Lock guarding the pool state; initialized from the superclass lock. */
    private final Lock poolLock;

    /** Connection operator handed to every newly created pool entry. */
    protected final ClientConnectionOperator operator;

    /** Per-route connection limits handed to every new route-specific pool. */
    protected final ConnPerRoute connPerRoute;

    /** Entries currently handed out to callers; initialized from the superclass set. */
    protected final Set<BasicPoolEntry> leasedConnections;

    /** Entries that were released as reusable and are available for allocation. */
    protected final Queue<BasicPoolEntry> freeConnections;

    /** Threads currently waiting for a connection, across all routes. */
    protected final Queue<WaitingThread> waitingThreads;

    /** Route-specific pools, keyed by the route they serve. */
    protected final Map<HttpRoute, RouteSpecificPool> routeToPool;

    /** Time-to-live value passed to every newly created pool entry. */
    private final long connTTL;

    /** Unit of {@link #connTTL}. */
    private final TimeUnit connTTLTimeUnit;

    /** Set to {@code true} once {@link #shutdown()} has run. */
    protected volatile boolean shutdown;

    /** Upper bound that {@link #numConnections} is compared against before creating entries. */
    protected volatile int maxTotalConnections;

    /** Number of entries currently accounted for by this pool (leased and free). */
    protected volatile int numConnections;
104
105
106
107
108
109
/**
 * Creates a pool by delegating to the five-argument constructor with a
 * connection TTL of {@code -1} milliseconds.
 * NOTE(review): a negative TTL presumably means "no TTL limit" -- confirm
 * against {@code BasicPoolEntry}.
 *
 * @param operator            the connection operator for new entries, not {@code null}
 * @param connPerRoute        the per-route connection limits, not {@code null}
 * @param maxTotalConnections the maximum total number of connections
 */
public ConnPoolByRoute(final ClientConnectionOperator operator,
                       final ConnPerRoute connPerRoute,
                       final int maxTotalConnections) {
    this(operator, connPerRoute, maxTotalConnections, -1L, TimeUnit.MILLISECONDS);
}
116
117
118
119
/**
 * Creates a pool with an explicit connection time-to-live.
 *
 * @param operator            the connection operator for new entries, not {@code null}
 * @param connPerRoute        the per-route connection limits, not {@code null}
 * @param maxTotalConnections the maximum total number of connections
 * @param connTTL             time-to-live value passed to every new pool entry
 * @param connTTLTimeUnit     unit of {@code connTTL}
 */
public ConnPoolByRoute(final ClientConnectionOperator operator,
                       final ConnPerRoute connPerRoute,
                       final int maxTotalConnections,
                       final long connTTL,
                       final TimeUnit connTTLTimeUnit) {
    super();
    // Validate both mandatory collaborators before touching any state.
    Args.notNull(operator, "Connection operator");
    Args.notNull(connPerRoute, "Connections per route");

    // Plain configuration values.
    this.connTTL = connTTL;
    this.connTTLTimeUnit = connTTLTimeUnit;
    this.maxTotalConnections = maxTotalConnections;
    this.connPerRoute = connPerRoute;
    this.operator = operator;

    // State shared with the superclass.
    this.leasedConnections = super.leasedConnections;
    this.poolLock = super.poolLock;

    // Factory hooks; invoked last and in this fixed order because
    // subclasses may override them.
    this.freeConnections = createFreeConnQueue();
    this.waitingThreads = createWaitingThreadQueue();
    this.routeToPool = createRouteToPoolMap();
}
140
141 protected Lock getLock() {
142 return this.poolLock;
143 }
144
145
146
147
148
149
/**
 * Creates a pool whose per-route and total connection limits are read
 * from the given parameters via {@link ConnManagerParams}.
 *
 * @param operator the connection operator for new entries
 * @param params   the parameters holding the connection limits
 */
@Deprecated
public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
    this(
        operator,
        ConnManagerParams.getMaxConnectionsPerRoute(params),
        ConnManagerParams.getMaxTotalConnections(params));
}
156
157
158
159
160
161
162
163 protected Queue<BasicPoolEntry> createFreeConnQueue() {
164 return new LinkedList<BasicPoolEntry>();
165 }
166
167
168
169
170
171
172
173 protected Queue<WaitingThread> createWaitingThreadQueue() {
174 return new LinkedList<WaitingThread>();
175 }
176
177
178
179
180
181
182
183 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
184 return new HashMap<HttpRoute, RouteSpecificPool>();
185 }
186
187
188
189
190
191
192
193
194
195
196 protected RouteSpecificPool newRouteSpecificPool(final HttpRoute route) {
197 return new RouteSpecificPool(route, this.connPerRoute);
198 }
199
200
201
202
203
204
205
206
207
208
209
210 protected WaitingThread newWaitingThread(final Condition cond,
211 final RouteSpecificPool rospl) {
212 return new WaitingThread(cond, rospl);
213 }
214
215 private void closeConnection(final BasicPoolEntry entry) {
216 final OperatedClientConnection conn = entry.getConnection();
217 if (conn != null) {
218 try {
219 conn.close();
220 } catch (final IOException ex) {
221 log.debug("I/O error closing connection", ex);
222 }
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235 protected RouteSpecificPool getRoutePool(final HttpRoute route,
236 final boolean create) {
237 RouteSpecificPool rospl = null;
238 poolLock.lock();
239 try {
240
241 rospl = routeToPool.get(route);
242 if ((rospl == null) && create) {
243
244 rospl = newRouteSpecificPool(route);
245 routeToPool.put(route, rospl);
246 }
247
248 } finally {
249 poolLock.unlock();
250 }
251
252 return rospl;
253 }
254
255 public int getConnectionsInPool(final HttpRoute route) {
256 poolLock.lock();
257 try {
258
259 final RouteSpecificPool rospl = getRoutePool(route, false);
260 return (rospl != null) ? rospl.getEntryCount() : 0;
261
262 } finally {
263 poolLock.unlock();
264 }
265 }
266
267 public int getConnectionsInPool() {
268 poolLock.lock();
269 try {
270 return numConnections;
271 } finally {
272 poolLock.unlock();
273 }
274 }
275
276 @Override
277 public PoolEntryRequest requestPoolEntry(
278 final HttpRoute route,
279 final Object state) {
280
281 final WaitingThreadAborter aborter = new WaitingThreadAborter();
282
283 return new PoolEntryRequest() {
284
285 public void abortRequest() {
286 poolLock.lock();
287 try {
288 aborter.abort();
289 } finally {
290 poolLock.unlock();
291 }
292 }
293
294 public BasicPoolEntry getPoolEntry(
295 final long timeout,
296 final TimeUnit tunit)
297 throws InterruptedException, ConnectionPoolTimeoutException {
298 return getEntryBlocking(route, state, timeout, tunit, aborter);
299 }
300
301 };
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322 protected BasicPoolEntry getEntryBlocking(
323 final HttpRoute route, final Object state,
324 final long timeout, final TimeUnit tunit,
325 final WaitingThreadAborter aborter)
326 throws ConnectionPoolTimeoutException, InterruptedException {
327
328 Date deadline = null;
329 if (timeout > 0) {
330 deadline = new Date
331 (System.currentTimeMillis() + tunit.toMillis(timeout));
332 }
333
334 BasicPoolEntry entry = null;
335 poolLock.lock();
336 try {
337
338 RouteSpecificPool rospl = getRoutePool(route, true);
339 WaitingThread waitingThread = null;
340
341 while (entry == null) {
342 Asserts.check(!shutdown, "Connection pool shut down");
343
344 if (log.isDebugEnabled()) {
345 log.debug("[" + route + "] total kept alive: " + freeConnections.size() +
346 ", total issued: " + leasedConnections.size() +
347 ", total allocated: " + numConnections + " out of " + maxTotalConnections);
348 }
349
350
351
352
353
354
355
356 entry = getFreeEntry(rospl, state);
357 if (entry != null) {
358 break;
359 }
360
361 final boolean hasCapacity = rospl.getCapacity() > 0;
362
363 if (log.isDebugEnabled()) {
364 log.debug("Available capacity: " + rospl.getCapacity()
365 + " out of " + rospl.getMaxEntries()
366 + " [" + route + "][" + state + "]");
367 }
368
369 if (hasCapacity && numConnections < maxTotalConnections) {
370
371 entry = createEntry(rospl, operator);
372
373 } else if (hasCapacity && !freeConnections.isEmpty()) {
374
375 deleteLeastUsedEntry();
376
377
378 rospl = getRoutePool(route, true);
379 entry = createEntry(rospl, operator);
380
381 } else {
382
383 if (log.isDebugEnabled()) {
384 log.debug("Need to wait for connection" +
385 " [" + route + "][" + state + "]");
386 }
387
388 if (waitingThread == null) {
389 waitingThread =
390 newWaitingThread(poolLock.newCondition(), rospl);
391 aborter.setWaitingThread(waitingThread);
392 }
393
394 boolean success = false;
395 try {
396 rospl.queueThread(waitingThread);
397 waitingThreads.add(waitingThread);
398 success = waitingThread.await(deadline);
399
400 } finally {
401
402
403
404
405 rospl.removeThread(waitingThread);
406 waitingThreads.remove(waitingThread);
407 }
408
409
410 if (!success && (deadline != null) &&
411 (deadline.getTime() <= System.currentTimeMillis())) {
412 throw new ConnectionPoolTimeoutException
413 ("Timeout waiting for connection from pool");
414 }
415 }
416 }
417
418 } finally {
419 poolLock.unlock();
420 }
421 return entry;
422 }
423
424 @Override
425 public void freeEntry(final BasicPoolEntry entry, final boolean reusable, final long validDuration, final TimeUnit timeUnit) {
426
427 final HttpRoute route = entry.getPlannedRoute();
428 if (log.isDebugEnabled()) {
429 log.debug("Releasing connection" +
430 " [" + route + "][" + entry.getState() + "]");
431 }
432
433 poolLock.lock();
434 try {
435 if (shutdown) {
436
437
438 closeConnection(entry);
439 return;
440 }
441
442
443 leasedConnections.remove(entry);
444
445 final RouteSpecificPool rospl = getRoutePool(route, true);
446
447 if (reusable && rospl.getCapacity() >= 0) {
448 if (log.isDebugEnabled()) {
449 String s;
450 if (validDuration > 0) {
451 s = "for " + validDuration + " " + timeUnit;
452 } else {
453 s = "indefinitely";
454 }
455 log.debug("Pooling connection" +
456 " [" + route + "][" + entry.getState() + "]; keep alive " + s);
457 }
458 rospl.freeEntry(entry);
459 entry.updateExpiry(validDuration, timeUnit);
460 freeConnections.add(entry);
461 } else {
462 closeConnection(entry);
463 rospl.dropEntry();
464 numConnections--;
465 }
466
467 notifyWaitingThread(rospl);
468
469 } finally {
470 poolLock.unlock();
471 }
472 }
473
474
475
476
477
478
479
480
481
482 protected BasicPoolEntry getFreeEntry(final RouteSpecificPool rospl, final Object state) {
483
484 BasicPoolEntry entry = null;
485 poolLock.lock();
486 try {
487 boolean done = false;
488 while(!done) {
489
490 entry = rospl.allocEntry(state);
491
492 if (entry != null) {
493 if (log.isDebugEnabled()) {
494 log.debug("Getting free connection"
495 + " [" + rospl.getRoute() + "][" + state + "]");
496
497 }
498 freeConnections.remove(entry);
499 if (entry.isExpired(System.currentTimeMillis())) {
500
501
502 if (log.isDebugEnabled()) {
503 log.debug("Closing expired free connection"
504 + " [" + rospl.getRoute() + "][" + state + "]");
505 }
506 closeConnection(entry);
507
508
509
510 rospl.dropEntry();
511 numConnections--;
512 } else {
513 leasedConnections.add(entry);
514 done = true;
515 }
516
517 } else {
518 done = true;
519 if (log.isDebugEnabled()) {
520 log.debug("No free connections"
521 + " [" + rospl.getRoute() + "][" + state + "]");
522 }
523 }
524 }
525 } finally {
526 poolLock.unlock();
527 }
528 return entry;
529 }
530
531
532
533
534
535
536
537
538
539
540
541
542 protected BasicPoolEntry createEntry(final RouteSpecificPool rospl,
543 final ClientConnectionOperator op) {
544
545 if (log.isDebugEnabled()) {
546 log.debug("Creating new connection [" + rospl.getRoute() + "]");
547 }
548
549
550 final BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit);
551
552 poolLock.lock();
553 try {
554 rospl.createdEntry(entry);
555 numConnections++;
556 leasedConnections.add(entry);
557 } finally {
558 poolLock.unlock();
559 }
560
561 return entry;
562 }
563
564
565
566
567
568
569
570
571
572
573
574
575
576 protected void deleteEntry(final BasicPoolEntry entry) {
577
578 final HttpRoute route = entry.getPlannedRoute();
579
580 if (log.isDebugEnabled()) {
581 log.debug("Deleting connection"
582 + " [" + route + "][" + entry.getState() + "]");
583 }
584
585 poolLock.lock();
586 try {
587
588 closeConnection(entry);
589
590 final RouteSpecificPool rospl = getRoutePool(route, true);
591 rospl.deleteEntry(entry);
592 numConnections--;
593 if (rospl.isUnused()) {
594 routeToPool.remove(route);
595 }
596
597 } finally {
598 poolLock.unlock();
599 }
600 }
601
602
603
604
605
606
607 protected void deleteLeastUsedEntry() {
608 poolLock.lock();
609 try {
610
611 final BasicPoolEntry entry = freeConnections.remove();
612
613 if (entry != null) {
614 deleteEntry(entry);
615 } else if (log.isDebugEnabled()) {
616 log.debug("No free connection to delete");
617 }
618
619 } finally {
620 poolLock.unlock();
621 }
622 }
623
624 @Override
625 protected void handleLostEntry(final HttpRoute route) {
626
627 poolLock.lock();
628 try {
629
630 final RouteSpecificPool rospl = getRoutePool(route, true);
631 rospl.dropEntry();
632 if (rospl.isUnused()) {
633 routeToPool.remove(route);
634 }
635
636 numConnections--;
637 notifyWaitingThread(rospl);
638
639 } finally {
640 poolLock.unlock();
641 }
642 }
643
644
645
646
647
648
649
650
651
652 protected void notifyWaitingThread(final RouteSpecificPool rospl) {
653
654
655
656
657
658
659 WaitingThread waitingThread = null;
660
661 poolLock.lock();
662 try {
663
664 if ((rospl != null) && rospl.hasThread()) {
665 if (log.isDebugEnabled()) {
666 log.debug("Notifying thread waiting on pool" +
667 " [" + rospl.getRoute() + "]");
668 }
669 waitingThread = rospl.nextThread();
670 } else if (!waitingThreads.isEmpty()) {
671 if (log.isDebugEnabled()) {
672 log.debug("Notifying thread waiting on any pool");
673 }
674 waitingThread = waitingThreads.remove();
675 } else if (log.isDebugEnabled()) {
676 log.debug("Notifying no-one, there are no waiting threads");
677 }
678
679 if (waitingThread != null) {
680 waitingThread.wakeup();
681 }
682
683 } finally {
684 poolLock.unlock();
685 }
686 }
687
688
689 @Override
690 public void deleteClosedConnections() {
691 poolLock.lock();
692 try {
693 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
694 while (iter.hasNext()) {
695 final BasicPoolEntry entry = iter.next();
696 if (!entry.getConnection().isOpen()) {
697 iter.remove();
698 deleteEntry(entry);
699 }
700 }
701 } finally {
702 poolLock.unlock();
703 }
704 }
705
706
707
708
709
710
711
712
713 @Override
714 public void closeIdleConnections(long idletime, final TimeUnit tunit) {
715 Args.notNull(tunit, "Time unit");
716 if (idletime < 0) {
717 idletime = 0;
718 }
719 if (log.isDebugEnabled()) {
720 log.debug("Closing connections idle longer than " + idletime + " " + tunit);
721 }
722
723 final long deadline = System.currentTimeMillis() - tunit.toMillis(idletime);
724 poolLock.lock();
725 try {
726 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
727 while (iter.hasNext()) {
728 final BasicPoolEntry entry = iter.next();
729 if (entry.getUpdated() <= deadline) {
730 if (log.isDebugEnabled()) {
731 log.debug("Closing connection last used @ " + new Date(entry.getUpdated()));
732 }
733 iter.remove();
734 deleteEntry(entry);
735 }
736 }
737 } finally {
738 poolLock.unlock();
739 }
740 }
741
742 @Override
743 public void closeExpiredConnections() {
744 log.debug("Closing expired connections");
745 final long now = System.currentTimeMillis();
746
747 poolLock.lock();
748 try {
749 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
750 while (iter.hasNext()) {
751 final BasicPoolEntry entry = iter.next();
752 if (entry.isExpired(now)) {
753 if (log.isDebugEnabled()) {
754 log.debug("Closing connection expired @ " + new Date(entry.getExpiry()));
755 }
756 iter.remove();
757 deleteEntry(entry);
758 }
759 }
760 } finally {
761 poolLock.unlock();
762 }
763 }
764
765 @Override
766 public void shutdown() {
767 poolLock.lock();
768 try {
769 if (shutdown) {
770 return;
771 }
772 shutdown = true;
773
774
775 final Iterator<BasicPoolEntry> iter1 = leasedConnections.iterator();
776 while (iter1.hasNext()) {
777 final BasicPoolEntry entry = iter1.next();
778 iter1.remove();
779 closeConnection(entry);
780 }
781
782
783 final Iterator<BasicPoolEntry> iter2 = freeConnections.iterator();
784 while (iter2.hasNext()) {
785 final BasicPoolEntry entry = iter2.next();
786 iter2.remove();
787
788 if (log.isDebugEnabled()) {
789 log.debug("Closing connection"
790 + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]");
791 }
792 closeConnection(entry);
793 }
794
795
796 final Iterator<WaitingThread> iwth = waitingThreads.iterator();
797 while (iwth.hasNext()) {
798 final WaitingThread waiter = iwth.next();
799 iwth.remove();
800 waiter.wakeup();
801 }
802
803 routeToPool.clear();
804
805 } finally {
806 poolLock.unlock();
807 }
808 }
809
810
811
812
813 public void setMaxTotalConnections(final int max) {
814 poolLock.lock();
815 try {
816 maxTotalConnections = max;
817 } finally {
818 poolLock.unlock();
819 }
820 }
821
822
823
824
825
826 public int getMaxTotalConnections() {
827 return maxTotalConnections;
828 }
829
830 }
831