View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.client.integration;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.util.concurrent.Future;
33  
34  import org.apache.http.HttpAsyncTestBase;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpException;
37  import org.apache.http.HttpHost;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.client.UserTokenHandler;
42  import org.apache.http.client.methods.HttpGet;
43  import org.apache.http.config.ConnectionConfig;
44  import org.apache.http.entity.ContentType;
45  import org.apache.http.impl.DefaultConnectionReuseStrategy;
46  import org.apache.http.impl.DefaultHttpResponseFactory;
47  import org.apache.http.impl.nio.DefaultNHttpServerConnection;
48  import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
49  import org.apache.http.impl.nio.client.HttpAsyncClients;
50  import org.apache.http.impl.nio.conn.CPoolUtils;
51  import org.apache.http.nio.ContentDecoder;
52  import org.apache.http.nio.IOControl;
53  import org.apache.http.nio.NHttpClientConnection;
54  import org.apache.http.nio.NHttpConnectionFactory;
55  import org.apache.http.nio.client.HttpAsyncClient;
56  import org.apache.http.nio.entity.NStringEntity;
57  import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
58  import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
59  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
60  import org.apache.http.nio.protocol.HttpAsyncExpectationVerifier;
61  import org.apache.http.nio.protocol.HttpAsyncRequestHandlerMapper;
62  import org.apache.http.nio.protocol.HttpAsyncService;
63  import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper;
64  import org.apache.http.nio.reactor.IOEventDispatch;
65  import org.apache.http.nio.reactor.IOReactorStatus;
66  import org.apache.http.nio.reactor.ListenerEndpoint;
67  import org.apache.http.pool.PoolEntry;
68  import org.apache.http.protocol.BasicHttpContext;
69  import org.apache.http.protocol.HttpContext;
70  import org.apache.http.protocol.HttpRequestHandler;
71  import org.junit.After;
72  import org.junit.Assert;
73  import org.junit.Before;
74  import org.junit.Test;
75  
76  public class TestStatefulConnManagement extends HttpAsyncTestBase {
77  
78      @Before
79      public void setUp() throws Exception {
80          initServer();
81          initConnectionManager();
82      }
83  
84      @After
85      public void tearDown() throws Exception {
86          shutDownClient();
87          shutDownServer();
88      }
89  
90      @Override
91      protected NHttpConnectionFactory<DefaultNHttpServerConnection> createServerConnectionFactory(
92              final ConnectionConfig config) throws Exception {
93          return new DefaultNHttpServerConnectionFactory(config);
94      }
95  
96      @Override
97      protected String getSchemeName() {
98          return "http";
99      }
100 
101     private HttpHost start(
102             final HttpAsyncRequestHandlerMapper requestHandlerResolver,
103             final HttpAsyncExpectationVerifier expectationVerifier) throws Exception {
104         final HttpAsyncService serviceHandler = new HttpAsyncService(
105                 this.serverHttpProc,
106                 new DefaultConnectionReuseStrategy(),
107                 new DefaultHttpResponseFactory(),
108                 requestHandlerResolver,
109                 expectationVerifier);
110         this.server.start(serviceHandler);
111         this.httpclient.start();
112 
113         final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
114         endpoint.waitFor();
115 
116         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
117         final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
118         return new HttpHost("localhost", address.getPort(), getSchemeName());
119     }
120 
121     static class SimpleService implements HttpRequestHandler {
122 
123         public SimpleService() {
124             super();
125         }
126 
127         public void handle(
128                 final HttpRequest request,
129                 final HttpResponse response,
130                 final HttpContext context) throws HttpException, IOException {
131             response.setStatusCode(HttpStatus.SC_OK);
132             final NStringEntity entity = new NStringEntity("Whatever");
133             response.setEntity(entity);
134         }
135     }
136 
137     @Test
138     public void testStatefulConnections() throws Exception {
139         final UriHttpAsyncRequestHandlerMapper registry = new UriHttpAsyncRequestHandlerMapper();
140         registry.register("*", new BasicAsyncRequestHandler(new SimpleService()));
141 
142         final UserTokenHandler userTokenHandler = new UserTokenHandler() {
143 
144             public Object getUserToken(final HttpContext context) {
145                 return context.getAttribute("user");
146             }
147 
148         };
149 
150         this.httpclient = HttpAsyncClients.custom()
151                 .setConnectionManager(this.connMgr)
152                 .setUserTokenHandler(userTokenHandler)
153                 .build();
154 
155         final HttpHost target = start(registry, null);
156 
157         final int workerCount = 2;
158         final int requestCount = 5;
159 
160         final HttpContext[] contexts = new HttpContext[workerCount];
161         final HttpWorker[] workers = new HttpWorker[workerCount];
162         for (int i = 0; i < contexts.length; i++) {
163             final HttpContext context = new BasicHttpContext();
164             final Object token = Integer.valueOf(i);
165             context.setAttribute("user", token);
166             contexts[i] = context;
167             workers[i] = new HttpWorker(context, requestCount, target, this.httpclient);
168         }
169 
170         for (final HttpWorker worker : workers) {
171             worker.start();
172         }
173         for (final HttpWorker worker : workers) {
174             worker.join(10000);
175         }
176         for (final HttpWorker worker : workers) {
177             final Exception ex = worker.getException();
178             if (ex != null) {
179                 throw ex;
180             }
181             Assert.assertEquals(requestCount, worker.getCount());
182         }
183 
184         for (final HttpContext context : contexts) {
185             final Integer id = (Integer) context.getAttribute("user");
186 
187             for (int r = 1; r < requestCount; r++) {
188                 final Integer state = (Integer) context.getAttribute("r" + r);
189                 Assert.assertEquals(id, state);
190             }
191         }
192 
193     }
194 
195     static class HttpWorker extends Thread {
196 
197         private final HttpContext context;
198         private final int requestCount;
199         private final HttpHost target;
200         private final HttpAsyncClient httpclient;
201 
202         private volatile Exception exception;
203         private volatile int count;
204 
205         public HttpWorker(
206                 final HttpContext context,
207                 final int requestCount,
208                 final HttpHost target,
209                 final HttpAsyncClient httpclient) {
210             super();
211             this.context = context;
212             this.requestCount = requestCount;
213             this.target = target;
214             this.httpclient = httpclient;
215             this.count = 0;
216         }
217 
218         public int getCount() {
219             return this.count;
220         }
221 
222         public Exception getException() {
223             return this.exception;
224         }
225 
226         @Override
227         public void run() {
228             try {
229                 for (int r = 0; r < this.requestCount; r++) {
230                     final HttpGet httpget = new HttpGet("/");
231                     final Future<Object> future = this.httpclient.execute(
232                             new BasicAsyncRequestProducer(this.target, httpget),
233                             new AbstractAsyncResponseConsumer<Object>() {
234 
235                                 @Override
236                                 protected void onResponseReceived(final HttpResponse response) {
237                                 }
238 
239                                 @Override
240                                 protected void onEntityEnclosed(
241                                         final HttpEntity entity,
242                                         final ContentType contentType) throws IOException {
243                                 }
244 
245                                 @Override
246                                 protected void onContentReceived(
247                                         final ContentDecoder decoder,
248                                         final IOControl ioctrl) throws IOException {
249                                     final ByteBuffer buf = ByteBuffer.allocate(2048);
250                                     decoder.read(buf);
251                                 }
252 
253                                 @Override
254                                 protected Object buildResult(final HttpContext context) throws Exception {
255                                     final NHttpClientConnection conn = (NHttpClientConnection) context.getAttribute(
256                                             IOEventDispatch.CONNECTION_KEY);
257 
258                                     final PoolEntry<?, ?> entry = CPoolUtils.getPoolEntry(conn);
259                                     return entry.getState();
260                                 }
261 
262                                 @Override
263                                 protected void releaseResources() {
264                                 }
265 
266                             },
267                             this.context,
268                             null);
269                     this.count++;
270                     final Object state = future.get();
271                     this.context.setAttribute("r" + r, state);
272                 }
273 
274             } catch (final Exception ex) {
275                 this.exception = ex;
276             }
277         }
278 
279     }
280 
281     @Test
282     public void testRouteSpecificPoolRecylcing() throws Exception {
283         // This tests what happens when a maxed connection pool needs
284         // to kill the last idle connection to a route to build a new
285         // one to the same route.
286         final UserTokenHandler userTokenHandler = new UserTokenHandler() {
287 
288             public Object getUserToken(final HttpContext context) {
289                 return context.getAttribute("user");
290             }
291 
292         };
293 
294         this.httpclient = HttpAsyncClients.custom()
295                 .setConnectionManager(this.connMgr)
296                 .setUserTokenHandler(userTokenHandler)
297                 .build();
298 
299         final int maxConn = 2;
300         // We build a client with 2 max active // connections, and 2 max per route.
301         this.connMgr.setMaxTotal(maxConn);
302         this.connMgr.setDefaultMaxPerRoute(maxConn);
303 
304         final UriHttpAsyncRequestHandlerMapper registry = new UriHttpAsyncRequestHandlerMapper();
305         registry.register("*", new BasicAsyncRequestHandler(new SimpleService()));
306 
307         final HttpHost target = start(registry, null);
308 
309         // Bottom of the pool : a *keep alive* connection to Route 1.
310         final HttpContext context1 = new BasicHttpContext();
311         context1.setAttribute("user", "stuff");
312 
313         final Future<HttpResponse> future1 = this.httpclient.execute(
314                 target, new HttpGet("/"), context1, null);
315         final HttpResponse response1 = future1.get();
316         Assert.assertNotNull(response1);
317         Assert.assertEquals(200, response1.getStatusLine().getStatusCode());
318 
319         // The ConnPoolByRoute now has 1 free connection, out of 2 max
320         // The ConnPoolByRoute has one RouteSpcfcPool, that has one free connection
321         // for [localhost][stuff]
322 
323         Thread.sleep(100);
324 
325         // Send a very simple HTTP get (it MUST be simple, no auth, no proxy, no 302, no 401, ...)
326         // Send it to another route. Must be a keepalive.
327         final HttpContext context2 = new BasicHttpContext();
328 
329         final Future<HttpResponse> future2 = this.httpclient.execute(
330                 new HttpHost("127.0.0.1", target.getPort(), target.getSchemeName()),
331                 new HttpGet("/"), context2, null);
332         final HttpResponse response2 = future2.get();
333         Assert.assertNotNull(response2);
334         Assert.assertEquals(200, response2.getStatusLine().getStatusCode());
335 
336         // ConnPoolByRoute now has 2 free connexions, out of its 2 max.
337         // The [localhost][stuff] RouteSpcfcPool is the same as earlier
338         // And there is a [127.0.0.1][null] pool with 1 free connection
339 
340         Thread.sleep(100);
341 
342         // This will put the ConnPoolByRoute to the targeted state :
343         // [localhost][stuff] will not get reused because this call is [localhost][null]
344         // So the ConnPoolByRoute will need to kill one connection (it is maxed out globally).
345         // The killed conn is the oldest, which means the first HTTPGet ([localhost][stuff]).
346         // When this happens, the RouteSpecificPool becomes empty.
347         final HttpContext context3 = new BasicHttpContext();
348         final Future<HttpResponse> future3 = this.httpclient.execute(
349                 target, new HttpGet("/"), context3, null);
350         final HttpResponse response3 = future3.get();
351         Assert.assertNotNull(response3);
352         Assert.assertEquals(200, response3.getStatusLine().getStatusCode());
353     }
354 
355 }