View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.ConnectException;
31  import java.net.InetSocketAddress;
32  import java.net.SocketAddress;
33  import java.net.UnknownHostException;
34  import java.util.Collections;
35  import java.util.concurrent.CancellationException;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.apache.http.concurrent.BasicFuture;
41  import org.apache.http.nio.reactor.ConnectingIOReactor;
42  import org.apache.http.nio.reactor.IOReactorStatus;
43  import org.apache.http.nio.reactor.IOSession;
44  import org.apache.http.nio.reactor.SessionRequest;
45  import org.apache.http.nio.reactor.SessionRequestCallback;
46  import org.apache.http.pool.PoolEntry;
47  import org.apache.http.pool.PoolStats;
48  import org.junit.Assert;
49  import org.junit.Test;
50  import org.mockito.Matchers;
51  import org.mockito.Mockito;
52  
53  public class TestNIOConnPool {
54  
55      static class LocalPoolEntry extends PoolEntry<String, IOSession> {
56  
57          private boolean closed;
58  
59          public LocalPoolEntry(final String route, final IOSession conn) {
60              super(null, route, conn);
61          }
62  
63          @Override
64          public void close() {
65              if (this.closed) {
66                  return;
67              }
68              this.closed = true;
69              getConnection().close();
70          }
71  
72          @Override
73          public boolean isClosed() {
74              return this.closed;
75          }
76  
77      }
78  
79      static class LocalConnFactory implements NIOConnFactory<String, IOSession> {
80  
81          @Override
82          public IOSession create(final String route, final IOSession session) throws IOException {
83              return session;
84          }
85  
86      }
87  
88      static class LocalAddressResolver implements SocketAddressResolver<String> {
89  
90          @Override
91          public SocketAddress resolveLocalAddress(final String route) {
92              return null;
93          }
94  
95          @Override
96          public SocketAddress resolveRemoteAddress(final String route) {
97              return InetSocketAddress.createUnresolved(route, 80);
98          }
99  
100     }
101 
102     static class LocalSessionPool extends AbstractNIOConnPool<String, IOSession, LocalPoolEntry> {
103 
104         public LocalSessionPool(
105                 final ConnectingIOReactor ioreactor, final int defaultMaxPerRoute, final int maxTotal) {
106             super(ioreactor, new LocalConnFactory(), new LocalAddressResolver(), defaultMaxPerRoute, maxTotal);
107         }
108 
109         public LocalSessionPool(
110                 final ConnectingIOReactor ioreactor,
111                 final SocketAddressResolver<String> addressResolver,
112                 final int defaultMaxPerRoute, final int maxTotal) {
113             super(ioreactor, new LocalConnFactory(), addressResolver, defaultMaxPerRoute, maxTotal);
114         }
115 
116         @Override
117         protected LocalPoolEntry createEntry(final String route, final IOSession session) {
118             return new LocalPoolEntry(route, session);
119         }
120 
121     }
122 
123     @Test
124     public void testEmptyPool() throws Exception {
125         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
126         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
127         final PoolStats totals = pool.getTotalStats();
128         Assert.assertEquals(0, totals.getAvailable());
129         Assert.assertEquals(0, totals.getLeased());
130         Assert.assertEquals(0, totals.getPending());
131         Assert.assertEquals(10, totals.getMax());
132         Assert.assertEquals(Collections.emptySet(), pool.getRoutes());
133         final PoolStats stats = pool.getStats("somehost");
134         Assert.assertEquals(0, stats.getAvailable());
135         Assert.assertEquals(0, stats.getLeased());
136         Assert.assertEquals(0, stats.getPending());
137         Assert.assertEquals(2, stats.getMax());
138         Assert.assertEquals("[leased: []][available: []][pending: []]", pool.toString());
139     }
140 
141     @Test
142     public void testInternalLeaseRequest() throws Exception {
143         final LeaseRequest<String, IOSession, LocalPoolEntry> leaseRequest =
144             new LeaseRequest<String, IOSession, LocalPoolEntry>("somehost", null, 0, 0,
145                     new BasicFuture<LocalPoolEntry>(null));
146         Assert.assertEquals("[somehost][null]", leaseRequest.toString());
147     }
148 
149     @Test
150     public void testInvalidConstruction() throws Exception {
151         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
152         try {
153             new LocalSessionPool(null, 1, 1);
154             Assert.fail("IllegalArgumentException should have been thrown");
155         } catch (final IllegalArgumentException expected) {
156         }
157         try {
158             new LocalSessionPool(ioreactor, -1, 1);
159             Assert.fail("IllegalArgumentException should have been thrown");
160         } catch (final IllegalArgumentException expected) {
161         }
162         try {
163             new LocalSessionPool(ioreactor, 1, -1);
164             Assert.fail("IllegalArgumentException should have been thrown");
165         } catch (final IllegalArgumentException expected) {
166         }
167     }
168 
169     @Test
170     public void testSuccessfulConnect() throws Exception {
171         final IOSession iosession = Mockito.mock(IOSession.class);
172         final SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
173         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
174         Mockito.when(sessionRequest.getSession()).thenReturn(iosession);
175         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
176         Mockito.when(ioreactor.connect(
177                 Matchers.any(SocketAddress.class),
178                 Matchers.any(SocketAddress.class),
179                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
180                 thenReturn(sessionRequest);
181         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
182         final Future<LocalPoolEntry> future = pool.lease("somehost", null, 100, TimeUnit.MILLISECONDS, null);
183         Mockito.verify(sessionRequest).setConnectTimeout(100);
184 
185         PoolStats totals = pool.getTotalStats();
186         Assert.assertEquals(0, totals.getAvailable());
187         Assert.assertEquals(0, totals.getLeased());
188         Assert.assertEquals(1, totals.getPending());
189 
190         pool.requestCompleted(sessionRequest);
191 
192         Assert.assertTrue(future.isDone());
193         Assert.assertFalse(future.isCancelled());
194         final LocalPoolEntry entry = future.get();
195         Assert.assertNotNull(entry);
196 
197         totals = pool.getTotalStats();
198         Assert.assertEquals(0, totals.getAvailable());
199         Assert.assertEquals(1, totals.getLeased());
200         Assert.assertEquals(0, totals.getPending());
201     }
202 
203     @Test
204     public void testFailedConnect() throws Exception {
205         final SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
206         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
207         Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
208         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
209         Mockito.when(ioreactor.connect(
210                 Matchers.any(SocketAddress.class),
211                 Matchers.any(SocketAddress.class),
212                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
213                 thenReturn(sessionRequest);
214         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
215         final Future<LocalPoolEntry> future = pool.lease("somehost", null);
216 
217         PoolStats totals = pool.getTotalStats();
218         Assert.assertEquals(0, totals.getAvailable());
219         Assert.assertEquals(0, totals.getLeased());
220         Assert.assertEquals(1, totals.getPending());
221 
222         pool.requestFailed(sessionRequest);
223 
224         Assert.assertTrue(future.isDone());
225         Assert.assertFalse(future.isCancelled());
226         try {
227             future.get();
228             Assert.fail("ExecutionException should have been thrown");
229         } catch (final ExecutionException ex) {
230             Assert.assertTrue(ex.getCause() instanceof IOException);
231         }
232 
233         totals = pool.getTotalStats();
234         Assert.assertEquals(0, totals.getAvailable());
235         Assert.assertEquals(0, totals.getLeased());
236         Assert.assertEquals(0, totals.getPending());
237     }
238 
239     @Test
240     public void testCencelledConnect() throws Exception {
241         final IOSession iosession = Mockito.mock(IOSession.class);
242         final SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
243         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
244         Mockito.when(sessionRequest.getSession()).thenReturn(iosession);
245         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
246         Mockito.when(ioreactor.connect(
247                 Matchers.any(SocketAddress.class),
248                 Matchers.any(SocketAddress.class),
249                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
250                 thenReturn(sessionRequest);
251         Mockito.when(ioreactor.getStatus()).thenReturn(IOReactorStatus.ACTIVE);
252         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
253         final Future<LocalPoolEntry> future = pool.lease("somehost", null);
254 
255         PoolStats totals = pool.getTotalStats();
256         Assert.assertEquals(0, totals.getAvailable());
257         Assert.assertEquals(0, totals.getLeased());
258         Assert.assertEquals(1, totals.getPending());
259 
260         pool.requestCancelled(sessionRequest);
261 
262         Assert.assertTrue(future.isDone());
263         Assert.assertTrue(future.isCancelled());
264         try {
265             future.get();
266             Assert.fail("CancellationException expected");
267         } catch (final CancellationException ignore) {
268         }
269 
270         totals = pool.getTotalStats();
271         Assert.assertEquals(0, totals.getAvailable());
272         Assert.assertEquals(0, totals.getLeased());
273         Assert.assertEquals(0, totals.getPending());
274     }
275 
276     @Test
277     public void testTimeoutConnect() throws Exception {
278         final IOSession iosession = Mockito.mock(IOSession.class);
279         final SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
280         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
281         Mockito.when(sessionRequest.getSession()).thenReturn(iosession);
282         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
283         Mockito.when(ioreactor.connect(
284                 Matchers.any(SocketAddress.class),
285                 Matchers.any(SocketAddress.class),
286                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
287                 thenReturn(sessionRequest);
288         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
289         final Future<LocalPoolEntry> future = pool.lease("somehost", null);
290 
291         PoolStats totals = pool.getTotalStats();
292         Assert.assertEquals(0, totals.getAvailable());
293         Assert.assertEquals(0, totals.getLeased());
294         Assert.assertEquals(1, totals.getPending());
295 
296         pool.requestTimeout(sessionRequest);
297 
298         Assert.assertTrue(future.isDone());
299         Assert.assertFalse(future.isCancelled());
300         try {
301             future.get();
302             Assert.fail("ExecutionException should have been thrown");
303         } catch (final ExecutionException ex) {
304             Assert.assertTrue(ex.getCause() instanceof ConnectException);
305         }
306 
307         totals = pool.getTotalStats();
308         Assert.assertEquals(0, totals.getAvailable());
309         Assert.assertEquals(0, totals.getLeased());
310         Assert.assertEquals(0, totals.getPending());
311     }
312 
313     @Test
314     public void testConnectUnknownHost() throws Exception {
315         final SessionRequest sessionRequest = Mockito.mock(SessionRequest.class);
316         Mockito.when(sessionRequest.getAttachment()).thenReturn("somehost");
317         Mockito.when(sessionRequest.getException()).thenReturn(new IOException());
318         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
319         @SuppressWarnings("unchecked")
320         final SocketAddressResolver<String> addressResolver = Mockito.mock(SocketAddressResolver.class);
321         Mockito.when(addressResolver.resolveRemoteAddress("somehost")).thenThrow(new UnknownHostException());
322         final LocalSessionPool pool = new LocalSessionPool(ioreactor, addressResolver, 2, 10);
323         final Future<LocalPoolEntry> future = pool.lease("somehost", null);
324 
325         Assert.assertTrue(future.isDone());
326         Assert.assertFalse(future.isCancelled());
327         try {
328             future.get();
329             Assert.fail("ExecutionException should have been thrown");
330         } catch (final ExecutionException ex) {
331             Assert.assertTrue(ex.getCause() instanceof UnknownHostException);
332         }
333     }
334 
335     @Test
336     public void testLeaseRelease() throws Exception {
337         final IOSession iosession1 = Mockito.mock(IOSession.class);
338         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
339         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
340         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
341 
342         final IOSession iosession2 = Mockito.mock(IOSession.class);
343         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
344         Mockito.when(sessionRequest2.getAttachment()).thenReturn("otherhost");
345         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
346 
347         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
348         Mockito.when(ioreactor.connect(
349                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
350                 Matchers.any(SocketAddress.class),
351                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
352                 thenReturn(sessionRequest1);
353         Mockito.when(ioreactor.connect(
354                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
355                 Matchers.any(SocketAddress.class),
356                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
357                 thenReturn(sessionRequest2);
358 
359         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
360         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
361         pool.requestCompleted(sessionRequest1);
362         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
363         pool.requestCompleted(sessionRequest1);
364         final Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
365         pool.requestCompleted(sessionRequest2);
366 
367         final LocalPoolEntry entry1 = future1.get();
368         Assert.assertNotNull(entry1);
369         final LocalPoolEntry entry2 = future2.get();
370         Assert.assertNotNull(entry2);
371         final LocalPoolEntry entry3 = future3.get();
372         Assert.assertNotNull(entry3);
373 
374         pool.release(entry1, true);
375         pool.release(entry2, true);
376         pool.release(entry3, false);
377         Mockito.verify(iosession1, Mockito.never()).close();
378         Mockito.verify(iosession2, Mockito.times(1)).close();
379 
380         final PoolStats totals = pool.getTotalStats();
381         Assert.assertEquals(2, totals.getAvailable());
382         Assert.assertEquals(0, totals.getLeased());
383         Assert.assertEquals(0, totals.getPending());
384     }
385 
386     @Test
387     public void testLeaseIllegal() throws Exception {
388         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
389         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
390         try {
391             pool.lease(null, null, 0, TimeUnit.MILLISECONDS, null);
392             Assert.fail("IllegalArgumentException should have been thrown");
393         } catch (final IllegalArgumentException expected) {
394         }
395         try {
396             pool.lease("somehost", null, 0, null, null);
397             Assert.fail("IllegalArgumentException should have been thrown");
398         } catch (final IllegalArgumentException expected) {
399         }
400     }
401 
402     @Test
403     public void testReleaseUnknownEntry() throws Exception {
404         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
405         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
406         pool.release(new LocalPoolEntry("somehost", Mockito.mock(IOSession.class)), true);
407     }
408 
409     @Test
410     public void testMaxLimits() throws Exception {
411         final IOSession iosession1 = Mockito.mock(IOSession.class);
412         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
413         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
414         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
415 
416         final IOSession iosession2 = Mockito.mock(IOSession.class);
417         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
418         Mockito.when(sessionRequest2.getAttachment()).thenReturn("otherhost");
419         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
420 
421         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
422         Mockito.when(ioreactor.connect(
423                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
424                 Matchers.any(SocketAddress.class),
425                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
426                 thenReturn(sessionRequest1);
427         Mockito.when(ioreactor.connect(
428                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
429                 Matchers.any(SocketAddress.class),
430                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
431                 thenReturn(sessionRequest2);
432 
433         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
434         pool.setMaxPerRoute("somehost", 2);
435         pool.setMaxPerRoute("otherhost", 1);
436         pool.setMaxTotal(3);
437 
438         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
439         pool.requestCompleted(sessionRequest1);
440         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
441         pool.requestCompleted(sessionRequest1);
442         final Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
443         pool.requestCompleted(sessionRequest2);
444 
445         final LocalPoolEntry entry1 = future1.get();
446         Assert.assertNotNull(entry1);
447         final LocalPoolEntry entry2 = future2.get();
448         Assert.assertNotNull(entry2);
449         final LocalPoolEntry entry3 = future3.get();
450         Assert.assertNotNull(entry3);
451 
452         pool.release(entry1, true);
453         pool.release(entry2, true);
454         pool.release(entry3, true);
455 
456         final PoolStats totals = pool.getTotalStats();
457         Assert.assertEquals(3, totals.getAvailable());
458         Assert.assertEquals(0, totals.getLeased());
459         Assert.assertEquals(0, totals.getPending());
460 
461         final Future<LocalPoolEntry> future4 = pool.lease("somehost", null);
462         final Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
463         final Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
464         final Future<LocalPoolEntry> future7 = pool.lease("somehost", null);
465         final Future<LocalPoolEntry> future8 = pool.lease("somehost", null);
466         final Future<LocalPoolEntry> future9 = pool.lease("otherhost", null);
467 
468         Assert.assertTrue(future4.isDone());
469         final LocalPoolEntry entry4 = future4.get();
470         Assert.assertNotNull(entry4);
471         Assert.assertTrue(future5.isDone());
472         final LocalPoolEntry entry5 = future5.get();
473         Assert.assertNotNull(entry5);
474         Assert.assertTrue(future6.isDone());
475         final LocalPoolEntry entry6 = future6.get();
476         Assert.assertNotNull(entry6);
477         Assert.assertFalse(future7.isDone());
478         Assert.assertFalse(future8.isDone());
479         Assert.assertFalse(future9.isDone());
480 
481         Mockito.verify(ioreactor, Mockito.times(3)).connect(
482                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
483                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
484 
485         pool.release(entry4, true);
486         pool.release(entry5, false);
487         pool.release(entry6, true);
488 
489         Assert.assertTrue(future7.isDone());
490         Assert.assertFalse(future8.isDone());
491         Assert.assertTrue(future9.isDone());
492 
493         Mockito.verify(ioreactor, Mockito.times(4)).connect(
494                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
495                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
496     }
497 
498     @Test
499     public void testConnectionRedistributionOnTotalMaxLimit() throws Exception {
500         final IOSession iosession1 = Mockito.mock(IOSession.class);
501         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
502         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
503         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
504 
505         final IOSession iosession2 = Mockito.mock(IOSession.class);
506         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
507         Mockito.when(sessionRequest2.getAttachment()).thenReturn("somehost");
508         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
509 
510         final IOSession iosession3 = Mockito.mock(IOSession.class);
511         final SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
512         Mockito.when(sessionRequest3.getAttachment()).thenReturn("otherhost");
513         Mockito.when(sessionRequest3.getSession()).thenReturn(iosession3);
514 
515         final IOSession iosession4 = Mockito.mock(IOSession.class);
516         final SessionRequest sessionRequest4 = Mockito.mock(SessionRequest.class);
517         Mockito.when(sessionRequest4.getAttachment()).thenReturn("otherhost");
518         Mockito.when(sessionRequest4.getSession()).thenReturn(iosession4);
519 
520         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
521         Mockito.when(ioreactor.connect(
522                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
523                 Matchers.any(SocketAddress.class),
524                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
525                 thenReturn(sessionRequest1, sessionRequest2, sessionRequest1);
526         Mockito.when(ioreactor.connect(
527                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
528                 Matchers.any(SocketAddress.class),
529                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
530                 thenReturn(sessionRequest3, sessionRequest4, sessionRequest3);
531 
532         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
533         pool.setMaxPerRoute("somehost", 2);
534         pool.setMaxPerRoute("otherhost", 2);
535         pool.setMaxTotal(2);
536 
537         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
538         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
539         final Future<LocalPoolEntry> future3 = pool.lease("otherhost", null);
540         final Future<LocalPoolEntry> future4 = pool.lease("otherhost", null);
541 
542         Mockito.verify(ioreactor, Mockito.times(2)).connect(
543                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
544                 Matchers.any(SocketAddress.class),
545                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
546 
547         Mockito.verify(ioreactor, Mockito.never()).connect(
548                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
549                 Matchers.any(SocketAddress.class),
550                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
551 
552         pool.requestCompleted(sessionRequest1);
553         pool.requestCompleted(sessionRequest2);
554 
555         Assert.assertTrue(future1.isDone());
556         final LocalPoolEntry entry1 = future1.get();
557         Assert.assertNotNull(entry1);
558         Assert.assertTrue(future2.isDone());
559         final LocalPoolEntry entry2 = future2.get();
560         Assert.assertNotNull(entry2);
561 
562         Assert.assertFalse(future3.isDone());
563         Assert.assertFalse(future4.isDone());
564 
565         PoolStats totals = pool.getTotalStats();
566         Assert.assertEquals(0, totals.getAvailable());
567         Assert.assertEquals(2, totals.getLeased());
568         Assert.assertEquals(0, totals.getPending());
569 
570         pool.release(entry1, true);
571         pool.release(entry2, true);
572 
573         Mockito.verify(ioreactor, Mockito.times(2)).connect(
574                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
575                 Matchers.any(SocketAddress.class),
576                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
577 
578         Mockito.verify(ioreactor, Mockito.times(2)).connect(
579                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
580                 Matchers.any(SocketAddress.class),
581                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
582 
583         pool.requestCompleted(sessionRequest3);
584         pool.requestCompleted(sessionRequest4);
585 
586         Assert.assertTrue(future3.isDone());
587         final LocalPoolEntry entry3 = future3.get();
588         Assert.assertNotNull(entry3);
589         Assert.assertTrue(future4.isDone());
590         final LocalPoolEntry entry4 = future4.get();
591         Assert.assertNotNull(entry4);
592 
593         totals = pool.getTotalStats();
594         Assert.assertEquals(0, totals.getAvailable());
595         Assert.assertEquals(2, totals.getLeased());
596         Assert.assertEquals(0, totals.getPending());
597 
598         final Future<LocalPoolEntry> future5 = pool.lease("somehost", null);
599         final Future<LocalPoolEntry> future6 = pool.lease("otherhost", null);
600 
601         Mockito.verify(ioreactor, Mockito.times(2)).connect(
602                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
603                 Matchers.any(SocketAddress.class),
604                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
605 
606         Mockito.verify(ioreactor, Mockito.times(2)).connect(
607                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
608                 Matchers.any(SocketAddress.class),
609                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
610 
611         pool.release(entry3, true);
612         pool.release(entry4, true);
613 
614         Mockito.verify(ioreactor, Mockito.times(3)).connect(
615                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
616                 Matchers.any(SocketAddress.class),
617                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
618 
619         Mockito.verify(ioreactor, Mockito.times(2)).connect(
620                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
621                 Matchers.any(SocketAddress.class),
622                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
623 
624         pool.requestCompleted(sessionRequest1);
625 
626         Assert.assertTrue(future5.isDone());
627         final LocalPoolEntry entry5 = future5.get();
628         Assert.assertNotNull(entry5);
629         Assert.assertTrue(future6.isDone());
630         final LocalPoolEntry entry6 = future6.get();
631         Assert.assertNotNull(entry6);
632 
633         totals = pool.getTotalStats();
634         Assert.assertEquals(0, totals.getAvailable());
635         Assert.assertEquals(2, totals.getLeased());
636         Assert.assertEquals(0, totals.getPending());
637 
638         pool.release(entry5, true);
639         pool.release(entry6, true);
640 
641         Mockito.verify(ioreactor, Mockito.times(3)).connect(
642                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
643                 Matchers.any(SocketAddress.class),
644                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
645 
646         Mockito.verify(ioreactor, Mockito.times(2)).connect(
647                 Matchers.eq(InetSocketAddress.createUnresolved("otherhost", 80)),
648                 Matchers.any(SocketAddress.class),
649                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
650 
651         totals = pool.getTotalStats();
652         Assert.assertEquals(2, totals.getAvailable());
653         Assert.assertEquals(0, totals.getLeased());
654         Assert.assertEquals(0, totals.getPending());
655     }
656 
657     @Test
658     public void testStatefulConnectionRedistributionOnPerRouteMaxLimit() throws Exception {
659         final IOSession iosession1 = Mockito.mock(IOSession.class);
660         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
661         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
662         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
663 
664         final IOSession iosession2 = Mockito.mock(IOSession.class);
665         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
666         Mockito.when(sessionRequest2.getAttachment()).thenReturn("somehost");
667         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
668 
669         final IOSession iosession3 = Mockito.mock(IOSession.class);
670         final SessionRequest sessionRequest3 = Mockito.mock(SessionRequest.class);
671         Mockito.when(sessionRequest3.getAttachment()).thenReturn("somehost");
672         Mockito.when(sessionRequest3.getSession()).thenReturn(iosession3);
673 
674         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
675         Mockito.when(ioreactor.connect(
676                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
677                 Matchers.any(SocketAddress.class),
678                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
679                 thenReturn(sessionRequest1, sessionRequest2, sessionRequest3);
680 
681         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 10);
682         pool.setMaxPerRoute("somehost", 2);
683         pool.setMaxTotal(2);
684 
685         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
686         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
687 
688         Mockito.verify(ioreactor, Mockito.times(2)).connect(
689                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
690                 Matchers.any(SocketAddress.class),
691                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
692 
693         pool.requestCompleted(sessionRequest1);
694         pool.requestCompleted(sessionRequest2);
695 
696         Assert.assertTrue(future1.isDone());
697         final LocalPoolEntry entry1 = future1.get();
698         Assert.assertNotNull(entry1);
699         Assert.assertTrue(future2.isDone());
700         final LocalPoolEntry entry2 = future2.get();
701         Assert.assertNotNull(entry2);
702 
703         PoolStats totals = pool.getTotalStats();
704         Assert.assertEquals(0, totals.getAvailable());
705         Assert.assertEquals(2, totals.getLeased());
706         Assert.assertEquals(0, totals.getPending());
707 
708         entry1.setState("some-stuff");
709         pool.release(entry1, true);
710         entry2.setState("some-stuff");
711         pool.release(entry2, true);
712 
713         final Future<LocalPoolEntry> future3 = pool.lease("somehost", "some-stuff");
714         final Future<LocalPoolEntry> future4 = pool.lease("somehost", "some-stuff");
715 
716         Assert.assertTrue(future1.isDone());
717         final LocalPoolEntry entry3 = future3.get();
718         Assert.assertNotNull(entry3);
719         Assert.assertTrue(future4.isDone());
720         final LocalPoolEntry entry4 = future4.get();
721         Assert.assertNotNull(entry4);
722 
723         Mockito.verify(ioreactor, Mockito.times(2)).connect(
724                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
725                 Matchers.any(SocketAddress.class),
726                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
727 
728         pool.release(entry3, true);
729         pool.release(entry4, true);
730 
731         totals = pool.getTotalStats();
732         Assert.assertEquals(2, totals.getAvailable());
733         Assert.assertEquals(0, totals.getLeased());
734         Assert.assertEquals(0, totals.getPending());
735 
736         final Future<LocalPoolEntry> future5 = pool.lease("somehost", "some-other-stuff");
737 
738         Assert.assertFalse(future5.isDone());
739 
740         Mockito.verify(ioreactor, Mockito.times(3)).connect(
741                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
742                 Matchers.any(SocketAddress.class),
743                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
744 
745         Mockito.verify(iosession2).close();
746         Mockito.verify(iosession1, Mockito.never()).close();
747 
748         totals = pool.getTotalStats();
749         Assert.assertEquals(1, totals.getAvailable());
750         Assert.assertEquals(0, totals.getLeased());
751         Assert.assertEquals(1, totals.getPending());
752     }
753 
754     @Test
755     public void testCreateNewIfExpired() throws Exception {
756         final IOSession iosession1 = Mockito.mock(IOSession.class);
757         Mockito.when(iosession1.isClosed()).thenReturn(Boolean.TRUE);
758         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
759         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
760         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
761 
762         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
763         Mockito.when(ioreactor.connect(
764                 Matchers.eq(InetSocketAddress.createUnresolved("somehost", 80)),
765                 Matchers.any(SocketAddress.class),
766                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
767                 thenReturn(sessionRequest1);
768 
769         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
770 
771         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
772 
773         Mockito.verify(ioreactor, Mockito.times(1)).connect(
774                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
775                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
776 
777         pool.requestCompleted(sessionRequest1);
778 
779         Assert.assertTrue(future1.isDone());
780         final LocalPoolEntry entry1 = future1.get();
781         Assert.assertNotNull(entry1);
782 
783         entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
784         pool.release(entry1, true);
785 
786         Thread.sleep(200L);
787 
788         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
789 
790         Assert.assertFalse(future2.isDone());
791 
792         Mockito.verify(iosession1).close();
793         Mockito.verify(ioreactor, Mockito.times(2)).connect(
794                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
795                 Matchers.any(), Matchers.any(SessionRequestCallback.class));
796 
797         final PoolStats totals = pool.getTotalStats();
798         Assert.assertEquals(0, totals.getAvailable());
799         Assert.assertEquals(0, totals.getLeased());
800         Assert.assertEquals(1, totals.getPending());
801         Assert.assertEquals(Collections.singleton("somehost"), pool.getRoutes());
802         final PoolStats stats = pool.getStats("somehost");
803         Assert.assertEquals(0, stats.getAvailable());
804         Assert.assertEquals(0, stats.getLeased());
805         Assert.assertEquals(1, stats.getPending());
806     }
807 
808     @Test
809     public void testCloseExpired() throws Exception {
810         final IOSession iosession1 = Mockito.mock(IOSession.class);
811         Mockito.when(iosession1.isClosed()).thenReturn(Boolean.TRUE);
812         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
813         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
814         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
815 
816         final IOSession iosession2 = Mockito.mock(IOSession.class);
817         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
818         Mockito.when(sessionRequest2.getAttachment()).thenReturn("somehost");
819         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
820 
821         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
822         Mockito.when(ioreactor.connect(
823                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
824                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
825                 thenReturn(sessionRequest1, sessionRequest2);
826 
827         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
828 
829         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
830         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
831 
832         pool.requestCompleted(sessionRequest1);
833         pool.requestCompleted(sessionRequest2);
834 
835         Assert.assertTrue(future1.isDone());
836         final LocalPoolEntry entry1 = future1.get();
837         Assert.assertNotNull(entry1);
838         Assert.assertTrue(future2.isDone());
839         final LocalPoolEntry entry2 = future2.get();
840         Assert.assertNotNull(entry2);
841 
842         entry1.updateExpiry(1, TimeUnit.MILLISECONDS);
843         pool.release(entry1, true);
844 
845         Thread.sleep(200);
846 
847         entry2.updateExpiry(1000, TimeUnit.SECONDS);
848         pool.release(entry2, true);
849 
850         pool.closeExpired();
851 
852         Mockito.verify(iosession1).close();
853         Mockito.verify(iosession2, Mockito.never()).close();
854 
855         final PoolStats totals = pool.getTotalStats();
856         Assert.assertEquals(1, totals.getAvailable());
857         Assert.assertEquals(0, totals.getLeased());
858         Assert.assertEquals(0, totals.getPending());
859         final PoolStats stats = pool.getStats("somehost");
860         Assert.assertEquals(1, stats.getAvailable());
861         Assert.assertEquals(0, stats.getLeased());
862         Assert.assertEquals(0, stats.getPending());
863     }
864 
865     @Test
866     public void testCloseIdle() throws Exception {
867         final IOSession iosession1 = Mockito.mock(IOSession.class);
868         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
869         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
870         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
871 
872         final IOSession iosession2 = Mockito.mock(IOSession.class);
873         final SessionRequest sessionRequest2 = Mockito.mock(SessionRequest.class);
874         Mockito.when(sessionRequest2.getAttachment()).thenReturn("somehost");
875         Mockito.when(sessionRequest2.getSession()).thenReturn(iosession2);
876 
877         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
878         Mockito.when(ioreactor.connect(
879                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
880                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
881                 thenReturn(sessionRequest1, sessionRequest2);
882 
883         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
884 
885         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null);
886         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null);
887 
888         pool.requestCompleted(sessionRequest1);
889         pool.requestCompleted(sessionRequest2);
890 
891         Assert.assertTrue(future1.isDone());
892         final LocalPoolEntry entry1 = future1.get();
893         Assert.assertNotNull(entry1);
894         Assert.assertTrue(future2.isDone());
895         final LocalPoolEntry entry2 = future2.get();
896         Assert.assertNotNull(entry2);
897 
898         entry1.updateExpiry(0, TimeUnit.MILLISECONDS);
899         pool.release(entry1, true);
900 
901         Thread.sleep(200L);
902 
903         entry2.updateExpiry(0, TimeUnit.MILLISECONDS);
904         pool.release(entry2, true);
905 
906         pool.closeIdle(50, TimeUnit.MILLISECONDS);
907 
908         Mockito.verify(iosession1).close();
909         Mockito.verify(iosession2, Mockito.never()).close();
910 
911         PoolStats totals = pool.getTotalStats();
912         Assert.assertEquals(1, totals.getAvailable());
913         Assert.assertEquals(0, totals.getLeased());
914         Assert.assertEquals(0, totals.getPending());
915         PoolStats stats = pool.getStats("somehost");
916         Assert.assertEquals(1, stats.getAvailable());
917         Assert.assertEquals(0, stats.getLeased());
918         Assert.assertEquals(0, stats.getPending());
919 
920         pool.closeIdle(-1, TimeUnit.MILLISECONDS);
921 
922         Mockito.verify(iosession2).close();
923 
924         totals = pool.getTotalStats();
925         Assert.assertEquals(0, totals.getAvailable());
926         Assert.assertEquals(0, totals.getLeased());
927         Assert.assertEquals(0, totals.getPending());
928         stats = pool.getStats("somehost");
929         Assert.assertEquals(0, stats.getAvailable());
930         Assert.assertEquals(0, stats.getLeased());
931         Assert.assertEquals(0, stats.getPending());
932     }
933 
934     @Test
935     public void testLeaseRequestTimeout() throws Exception {
936         final IOSession iosession1 = Mockito.mock(IOSession.class);
937         Mockito.when(iosession1.isClosed()).thenReturn(Boolean.TRUE);
938         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
939         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
940         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
941 
942         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
943         Mockito.when(ioreactor.connect(
944                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
945                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
946                 thenReturn(sessionRequest1);
947 
948         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 1, 1);
949 
950         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
951         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
952         final Future<LocalPoolEntry> future3 = pool.lease("somehost", null, 10, TimeUnit.MILLISECONDS, null);
953 
954         pool.requestCompleted(sessionRequest1);
955 
956         Assert.assertTrue(future1.isDone());
957         final LocalPoolEntry entry1 = future1.get();
958         Assert.assertNotNull(entry1);
959         Assert.assertFalse(future2.isDone());
960         Assert.assertFalse(future3.isDone());
961 
962         Thread.sleep(100);
963 
964         pool.validatePendingRequests();
965 
966         Assert.assertFalse(future2.isDone());
967         Assert.assertTrue(future3.isDone());
968     }
969 
970     @Test(expected=IllegalArgumentException.class)
971     public void testCloseIdleInvalid() throws Exception {
972         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
973         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
974         pool.closeIdle(50, null);
975     }
976 
977     @Test(expected=IllegalArgumentException.class)
978     public void testGetStatsInvalid() throws Exception {
979         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
980         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
981         pool.getStats(null);
982     }
983 
984     @Test
985     public void testSetMaxInvalid() throws Exception {
986         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
987         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
988         try {
989             pool.setMaxTotal(-1);
990             Assert.fail("IllegalArgumentException should have been thrown");
991         } catch (final IllegalArgumentException expected) {
992         }
993         try {
994             pool.setMaxPerRoute(null, 1);
995             Assert.fail("IllegalArgumentException should have been thrown");
996         } catch (final IllegalArgumentException expected) {
997         }
998         try {
999             pool.setMaxPerRoute("somehost", -1);
1000             Assert.fail("IllegalArgumentException should have been thrown");
1001         } catch (final IllegalArgumentException expected) {
1002         }
1003         try {
1004             pool.setDefaultMaxPerRoute(-1);
1005             Assert.fail("IllegalArgumentException should have been thrown");
1006         } catch (final IllegalArgumentException expected) {
1007         }
1008     }
1009 
1010     @Test
1011     public void testShutdown() throws Exception {
1012         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
1013         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 2, 2);
1014         pool.shutdown(1000);
1015         Mockito.verify(ioreactor, Mockito.times(1)).shutdown(1000);
1016         pool.shutdown(1000);
1017         Mockito.verify(ioreactor, Mockito.times(1)).shutdown(1000);
1018         try {
1019             pool.lease("somehost", null);
1020             Assert.fail("IllegalStateException should have been thrown");
1021         } catch (final IllegalStateException expected) {
1022         }
1023         // Ignored if shut down
1024         pool.release(new LocalPoolEntry("somehost", Mockito.mock(IOSession.class)), true);
1025         pool.requestCompleted(Mockito.mock(SessionRequest.class));
1026         pool.requestFailed(Mockito.mock(SessionRequest.class));
1027         pool.requestCancelled(Mockito.mock(SessionRequest.class));
1028         pool.requestTimeout(Mockito.mock(SessionRequest.class));
1029     }
1030 
1031     @Test
1032     public void testLeaseRequestCanceled() throws Exception {
1033         final IOSession iosession1 = Mockito.mock(IOSession.class);
1034         Mockito.when(iosession1.isClosed()).thenReturn(Boolean.TRUE);
1035         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
1036         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
1037         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
1038 
1039         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
1040         Mockito.when(ioreactor.connect(
1041                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
1042                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
1043                 thenReturn(sessionRequest1);
1044         Mockito.when(ioreactor.getStatus()).thenReturn(IOReactorStatus.ACTIVE);
1045 
1046         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 1, 1);
1047 
1048         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
1049         future1.cancel(true);
1050 
1051         pool.requestCompleted(sessionRequest1);
1052 
1053         Assert.assertTrue(future1.isDone());
1054         try {
1055             future1.get();
1056             Assert.fail("CancellationException expected");
1057         } catch (final CancellationException ignore) {
1058         }
1059 
1060         final PoolStats totals = pool.getTotalStats();
1061         Assert.assertEquals(1, totals.getAvailable());
1062         Assert.assertEquals(0, totals.getLeased());
1063     }
1064 
1065     @Test
1066     public void testLeaseRequestCanceledWhileConnecting() throws Exception {
1067         final IOSession iosession1 = Mockito.mock(IOSession.class);
1068         Mockito.when(iosession1.isClosed()).thenReturn(Boolean.TRUE);
1069         final SessionRequest sessionRequest1 = Mockito.mock(SessionRequest.class);
1070         Mockito.when(sessionRequest1.getAttachment()).thenReturn("somehost");
1071         Mockito.when(sessionRequest1.getSession()).thenReturn(iosession1);
1072 
1073         final ConnectingIOReactor ioreactor = Mockito.mock(ConnectingIOReactor.class);
1074         Mockito.when(ioreactor.connect(
1075                 Matchers.any(SocketAddress.class), Matchers.any(SocketAddress.class),
1076                 Matchers.any(), Matchers.any(SessionRequestCallback.class))).
1077                 thenReturn(sessionRequest1);
1078         Mockito.when(ioreactor.getStatus()).thenReturn(IOReactorStatus.ACTIVE);
1079 
1080         final LocalSessionPool pool = new LocalSessionPool(ioreactor, 1, 1);
1081 
1082         final Future<LocalPoolEntry> future1 = pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
1083 
1084         pool.requestCompleted(sessionRequest1);
1085 
1086         Assert.assertTrue(future1.isDone());
1087         final LocalPoolEntry entry1 = future1.get();
1088         Assert.assertNotNull(entry1);
1089 
1090         final Future<LocalPoolEntry> future2 = pool.lease("somehost", null, 0, TimeUnit.MILLISECONDS, null);
1091         future2.cancel(true);
1092 
1093         pool.release(entry1, true);
1094 
1095         final PoolStats totals = pool.getTotalStats();
1096         Assert.assertEquals(1, totals.getAvailable());
1097         Assert.assertEquals(0, totals.getLeased());
1098     }
1099 
1100 }