View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.client.integration;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.concurrent.Future;
32  
33  import org.apache.http.localserver.HttpAsyncTestBase;
34  import org.apache.http.HttpEntity;
35  import org.apache.http.HttpException;
36  import org.apache.http.HttpHost;
37  import org.apache.http.HttpRequest;
38  import org.apache.http.HttpResponse;
39  import org.apache.http.HttpStatus;
40  import org.apache.http.client.UserTokenHandler;
41  import org.apache.http.client.methods.HttpGet;
42  import org.apache.http.entity.ContentType;
43  import org.apache.http.impl.nio.conn.CPoolUtils;
44  import org.apache.http.nio.ContentDecoder;
45  import org.apache.http.nio.IOControl;
46  import org.apache.http.nio.NHttpClientConnection;
47  import org.apache.http.nio.client.HttpAsyncClient;
48  import org.apache.http.nio.entity.NStringEntity;
49  import org.apache.http.nio.protocol.AbstractAsyncResponseConsumer;
50  import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
51  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
52  import org.apache.http.nio.reactor.IOEventDispatch;
53  import org.apache.http.pool.PoolEntry;
54  import org.apache.http.protocol.BasicHttpContext;
55  import org.apache.http.protocol.HttpContext;
56  import org.apache.http.protocol.HttpRequestHandler;
57  import org.junit.Assert;
58  import org.junit.Test;
59  
60  public class TestStatefulConnManagement extends HttpAsyncTestBase {
61  
62      static class SimpleService implements HttpRequestHandler {
63  
64          public SimpleService() {
65              super();
66          }
67  
68          @Override
69          public void handle(
70                  final HttpRequest request,
71                  final HttpResponse response,
72                  final HttpContext context) throws HttpException, IOException {
73              response.setStatusCode(HttpStatus.SC_OK);
74              final NStringEntity entity = new NStringEntity("Whatever");
75              response.setEntity(entity);
76          }
77      }
78  
79      @Test
80      public void testStatefulConnections() throws Exception {
81          this.serverBootstrap.registerHandler("*", new BasicAsyncRequestHandler(new SimpleService()));
82  
83          final UserTokenHandler userTokenHandler = new UserTokenHandler() {
84  
85              @Override
86              public Object getUserToken(final HttpContext context) {
87                  return context.getAttribute("user");
88              }
89  
90          };
91          this.clientBuilder.setUserTokenHandler(userTokenHandler);
92          final HttpHost target = start();
93  
94          final int workerCount = 2;
95          final int requestCount = 5;
96  
97          final HttpContext[] contexts = new HttpContext[workerCount];
98          final HttpWorker[] workers = new HttpWorker[workerCount];
99          for (int i = 0; i < contexts.length; i++) {
100             final HttpContext context = new BasicHttpContext();
101             final Object token = Integer.valueOf(i);
102             context.setAttribute("user", token);
103             contexts[i] = context;
104             workers[i] = new HttpWorker(context, requestCount, target, this.httpclient);
105         }
106 
107         for (final HttpWorker worker : workers) {
108             worker.start();
109         }
110         for (final HttpWorker worker : workers) {
111             worker.join(10000);
112         }
113         for (final HttpWorker worker : workers) {
114             final Exception ex = worker.getException();
115             if (ex != null) {
116                 throw ex;
117             }
118             Assert.assertEquals(requestCount, worker.getCount());
119         }
120 
121         for (final HttpContext context : contexts) {
122             final Integer id = (Integer) context.getAttribute("user");
123 
124             for (int r = 1; r < requestCount; r++) {
125                 final Integer state = (Integer) context.getAttribute("r" + r);
126                 Assert.assertEquals(id, state);
127             }
128         }
129 
130     }
131 
132     static class HttpWorker extends Thread {
133 
134         private final HttpContext context;
135         private final int requestCount;
136         private final HttpHost target;
137         private final HttpAsyncClient httpclient;
138 
139         private volatile Exception exception;
140         private volatile int count;
141 
142         public HttpWorker(
143                 final HttpContext context,
144                 final int requestCount,
145                 final HttpHost target,
146                 final HttpAsyncClient httpclient) {
147             super();
148             this.context = context;
149             this.requestCount = requestCount;
150             this.target = target;
151             this.httpclient = httpclient;
152             this.count = 0;
153         }
154 
155         public int getCount() {
156             return this.count;
157         }
158 
159         public Exception getException() {
160             return this.exception;
161         }
162 
163         @Override
164         public void run() {
165             try {
166                 for (int r = 0; r < this.requestCount; r++) {
167                     final HttpGet httpget = new HttpGet("/");
168                     final Future<Object> future = this.httpclient.execute(
169                             new BasicAsyncRequestProducer(this.target, httpget),
170                             new AbstractAsyncResponseConsumer<Object>() {
171 
172                                 @Override
173                                 protected void onResponseReceived(final HttpResponse response) {
174                                 }
175 
176                                 @Override
177                                 protected void onEntityEnclosed(
178                                         final HttpEntity entity,
179                                         final ContentType contentType) throws IOException {
180                                 }
181 
182                                 @Override
183                                 protected void onContentReceived(
184                                         final ContentDecoder decoder,
185                                         final IOControl ioctrl) throws IOException {
186                                     final ByteBuffer buf = ByteBuffer.allocate(2048);
187                                     decoder.read(buf);
188                                 }
189 
190                                 @Override
191                                 protected Object buildResult(final HttpContext context) throws Exception {
192                                     final NHttpClientConnection conn = (NHttpClientConnection) context.getAttribute(
193                                             IOEventDispatch.CONNECTION_KEY);
194 
195                                     final PoolEntry<?, ?> entry = CPoolUtils.getPoolEntry(conn);
196                                     return entry.getState();
197                                 }
198 
199                                 @Override
200                                 protected void releaseResources() {
201                                 }
202 
203                             },
204                             this.context,
205                             null);
206                     this.count++;
207                     final Object state = future.get();
208                     this.context.setAttribute("r" + r, state);
209                 }
210 
211             } catch (final Exception ex) {
212                 this.exception = ex;
213             }
214         }
215 
216     }
217 
218     @Test
219     public void testRouteSpecificPoolRecylcing() throws Exception {
220         this.serverBootstrap.registerHandler("*", new BasicAsyncRequestHandler(new SimpleService()));
221         // This tests what happens when a maxed connection pool needs
222         // to kill the last idle connection to a route to build a new
223         // one to the same route.
224         final UserTokenHandler userTokenHandler = new UserTokenHandler() {
225 
226             @Override
227             public Object getUserToken(final HttpContext context) {
228                 return context.getAttribute("user");
229             }
230 
231         };
232         this.clientBuilder.setUserTokenHandler(userTokenHandler);
233 
234         final HttpHost target = start();
235         final int maxConn = 2;
236         // We build a client with 2 max active // connections, and 2 max per route.
237         this.connMgr.setMaxTotal(maxConn);
238         this.connMgr.setDefaultMaxPerRoute(maxConn);
239 
240         // Bottom of the pool : a *keep alive* connection to Route 1.
241         final HttpContext context1 = new BasicHttpContext();
242         context1.setAttribute("user", "stuff");
243 
244         final Future<HttpResponse> future1 = this.httpclient.execute(
245                 target, new HttpGet("/"), context1, null);
246         final HttpResponse response1 = future1.get();
247         Assert.assertNotNull(response1);
248         Assert.assertEquals(200, response1.getStatusLine().getStatusCode());
249 
250         // The ConnPoolByRoute now has 1 free connection, out of 2 max
251         // The ConnPoolByRoute has one RouteSpcfcPool, that has one free connection
252         // for [localhost][stuff]
253 
254         Thread.sleep(100);
255 
256         // Send a very simple HTTP get (it MUST be simple, no auth, no proxy, no 302, no 401, ...)
257         // Send it to another route. Must be a keepalive.
258         final HttpContext context2 = new BasicHttpContext();
259 
260         final Future<HttpResponse> future2 = this.httpclient.execute(
261                 new HttpHost("127.0.0.1", target.getPort(), target.getSchemeName()),
262                 new HttpGet("/"), context2, null);
263         final HttpResponse response2 = future2.get();
264         Assert.assertNotNull(response2);
265         Assert.assertEquals(200, response2.getStatusLine().getStatusCode());
266 
267         // ConnPoolByRoute now has 2 free connexions, out of its 2 max.
268         // The [localhost][stuff] RouteSpcfcPool is the same as earlier
269         // And there is a [127.0.0.1][null] pool with 1 free connection
270 
271         Thread.sleep(100);
272 
273         // This will put the ConnPoolByRoute to the targeted state :
274         // [localhost][stuff] will not get reused because this call is [localhost][null]
275         // So the ConnPoolByRoute will need to kill one connection (it is maxed out globally).
276         // The killed conn is the oldest, which means the first HTTPGet ([localhost][stuff]).
277         // When this happens, the RouteSpecificPool becomes empty.
278         final HttpContext context3 = new BasicHttpContext();
279         final Future<HttpResponse> future3 = this.httpclient.execute(
280                 target, new HttpGet("/"), context3, null);
281         final HttpResponse response3 = future3.get();
282         Assert.assertNotNull(response3);
283         Assert.assertEquals(200, response3.getStatusLine().getStatusCode());
284     }
285 
286 }