View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.integration;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.Collection;
35  import java.util.List;
36  import java.util.Queue;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  
41  import org.apache.http.ConnectionClosedException;
42  import org.apache.http.HttpEntityEnclosingRequest;
43  import org.apache.http.HttpException;
44  import org.apache.http.HttpHeaders;
45  import org.apache.http.HttpHost;
46  import org.apache.http.HttpRequest;
47  import org.apache.http.HttpResponse;
48  import org.apache.http.HttpStatus;
49  import org.apache.http.HttpVersion;
50  import org.apache.http.entity.ContentType;
51  import org.apache.http.entity.StringEntity;
52  import org.apache.http.message.BasicHttpEntityEnclosingRequest;
53  import org.apache.http.message.BasicHttpRequest;
54  import org.apache.http.message.BasicHttpResponse;
55  import org.apache.http.nio.entity.NStringEntity;
56  import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
57  import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
58  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
59  import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
60  import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
61  import org.apache.http.nio.protocol.HttpAsyncExchange;
62  import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
63  import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
64  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
65  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
66  import org.apache.http.nio.reactor.ListenerEndpoint;
67  import org.apache.http.nio.testserver.HttpCoreNIOTestBase;
68  import org.apache.http.protocol.HttpContext;
69  import org.apache.http.protocol.HttpProcessor;
70  import org.apache.http.protocol.HttpRequestHandler;
71  import org.apache.http.protocol.ImmutableHttpProcessor;
72  import org.apache.http.protocol.RequestConnControl;
73  import org.apache.http.protocol.RequestContent;
74  import org.apache.http.protocol.RequestTargetHost;
75  import org.apache.http.protocol.RequestUserAgent;
76  import org.apache.http.util.EntityUtils;
77  import org.junit.After;
78  import org.junit.Assert;
79  import org.junit.Before;
80  import org.junit.Test;
81  import org.junit.runner.RunWith;
82  import org.junit.runners.Parameterized;
83  
84  /**
85   * HttpCore NIO integration tests for pipelined request processing.
86   */
87  @RunWith(Parameterized.class)
88  public class TestHttpAsyncHandlersPipelining extends HttpCoreNIOTestBase {
89  
90      @Parameterized.Parameters(name = "{0}")
91      public static Collection<Object[]> protocols() {
92          return Arrays.asList(new Object[][]{
93                  {ProtocolScheme.http},
94                  {ProtocolScheme.https},
95          });
96      }
97  
98      public TestHttpAsyncHandlersPipelining(final ProtocolScheme scheme) {
99          super(scheme);
100     }
101 
102     public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
103             new RequestContent(),
104             new RequestTargetHost(),
105             new RequestConnControl(),
106             new RequestUserAgent("TEST-CLIENT/1.1"));
107 
108     @Before
109     public void setUp() throws Exception {
110         initServer();
111         initClient();
112     }
113 
114     @After
115     public void tearDown() throws Exception {
116         shutDownClient();
117         shutDownServer();
118     }
119 
120     private HttpHost start() throws Exception {
121         this.server.start();
122         this.client.setHttpProcessor(DEFAULT_HTTP_PROC);
123         this.client.start();
124 
125         final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
126         endpoint.waitFor();
127 
128         final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
129         return new HttpHost("localhost", address.getPort(), getScheme().name());
130     }
131 
132     private static String createRequestUri(final String pattern, final int count) {
133         return pattern + "x" + count;
134     }
135 
136     private static String createExpectedString(final String pattern, final int count) {
137         final StringBuilder buffer = new StringBuilder();
138         for (int i = 0; i < count; i++) {
139             buffer.append(pattern);
140         }
141         return buffer.toString();
142     }
143 
144     @Test
145     public void testHttpGets() throws Exception {
146         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
147         final HttpHost target = start();
148 
149         this.client.setMaxPerRoute(3);
150         this.client.setMaxTotal(3);
151 
152         final String pattern = RndTestPatternGenerator.generateText();
153         final int count = RndTestPatternGenerator.generateCount(1000);
154 
155         final String expectedPattern = createExpectedString(pattern, count);
156 
157         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
158         for (int i = 0; i < 10; i++) {
159             final String requestUri = createRequestUri(pattern, count);
160             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
161                     new BasicHttpRequest("GET", requestUri),
162                     new BasicHttpRequest("GET", requestUri),
163                     new BasicHttpRequest("GET", requestUri));
164             queue.add(future);
165         }
166 
167         while (!queue.isEmpty()) {
168             final Future<List<HttpResponse>> future = queue.remove();
169             final List<HttpResponse> responses = future.get();
170             Assert.assertNotNull(responses);
171             Assert.assertEquals(3, responses.size());
172             for (final HttpResponse response: responses) {
173                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
174                 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
175             }
176         }
177     }
178 
179     @Test
180     public void testHttpHeads() throws Exception {
181         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
182         final HttpHost target = start();
183 
184         this.client.setMaxPerRoute(3);
185         this.client.setMaxTotal(3);
186 
187         final String pattern = RndTestPatternGenerator.generateText();
188         final int count = RndTestPatternGenerator.generateCount(1000);
189 
190         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
191         for (int i = 0; i < 10; i++) {
192             final String requestUri = createRequestUri(pattern, count);
193             final HttpRequest head1 = new BasicHttpRequest("HEAD", requestUri);
194             final HttpRequest head2 = new BasicHttpRequest("HEAD", requestUri);
195             final BasicHttpEntityEnclosingRequest post1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
196             post1.setEntity(new NStringEntity("stuff", ContentType.TEXT_PLAIN));
197             final Future<List<HttpResponse>> future = this.client.executePipelined(target, head1, head2, post1);
198             queue.add(future);
199         }
200 
201         while (!queue.isEmpty()) {
202             final Future<List<HttpResponse>> future = queue.remove();
203             final List<HttpResponse> responses = future.get();
204             Assert.assertNotNull(responses);
205             Assert.assertEquals(3, responses.size());
206             for (final HttpResponse response: responses) {
207                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
208             }
209         }
210     }
211 
212     @Test
213     public void testHttpPosts() throws Exception {
214         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
215         final HttpHost target = start();
216 
217         this.client.setMaxPerRoute(3);
218         this.client.setMaxTotal(3);
219 
220         final String pattern = RndTestPatternGenerator.generateText();
221         final int count = RndTestPatternGenerator.generateCount(1000);
222 
223         final String expectedPattern = createExpectedString(pattern, count);
224 
225         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
226         for (int i = 0; i < 10; i++) {
227             final String requestUri = createRequestUri(pattern, count);
228             final HttpEntityEnclosingRequest request1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
229             final NStringEntity entity1 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
230             entity1.setChunked(RndTestPatternGenerator.generateBoolean());
231             request1.setEntity(entity1);
232             final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
233             final NStringEntity entity2 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
234             entity2.setChunked(RndTestPatternGenerator.generateBoolean());
235             request2.setEntity(entity2);
236             final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
237             final NStringEntity entity3 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
238             entity3.setChunked(RndTestPatternGenerator.generateBoolean());
239             request3.setEntity(entity3);
240             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
241                     request1, request2, request3);
242             queue.add(future);
243         }
244 
245         while (!queue.isEmpty()) {
246             final Future<List<HttpResponse>> future = queue.remove();
247             final List<HttpResponse> responses = future.get();
248             Assert.assertNotNull(responses);
249             Assert.assertEquals(3, responses.size());
250             for (final HttpResponse response: responses) {
251                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
252                 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
253             }
254         }
255     }
256 
257     @Test
258     public void testHttpDelayedResponse() throws Exception {
259 
260         class DelayedRequestHandler implements HttpAsyncRequestHandler<HttpRequest> {
261 
262             private final SimpleRequestHandler requestHandler;
263 
264             public DelayedRequestHandler() {
265                 super();
266                 this.requestHandler = new SimpleRequestHandler();
267             }
268 
269             @Override
270             public HttpAsyncRequestConsumer<HttpRequest> processRequest(
271                     final HttpRequest request,
272                     final HttpContext context) {
273                 return new BasicAsyncRequestConsumer();
274             }
275 
276             @Override
277             public void handle(
278                     final HttpRequest request,
279                     final HttpAsyncExchange httpexchange,
280                     final HttpContext context) throws HttpException, IOException {
281                 final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK");
282                 new Thread() {
283                     @Override
284                     public void run() {
285                         // Wait a bit, to make sure this is delayed.
286                         try { Thread.sleep(100); } catch(final InterruptedException ie) {}
287                         // Set the entity after delaying...
288                         try {
289                             requestHandler.handle(request, response, context);
290                         } catch (final Exception ex) {
291                             response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
292                         }
293                         httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
294                     }
295                 }.start();
296             }
297 
298         }
299 
300         this.server.registerHandler("*", new DelayedRequestHandler());
301         final HttpHost target = start();
302 
303         this.client.setMaxPerRoute(3);
304         this.client.setMaxTotal(3);
305 
306         final String pattern1 = RndTestPatternGenerator.generateText();
307         final String pattern2 = RndTestPatternGenerator.generateText();
308         final String pattern3 = RndTestPatternGenerator.generateText();
309         final int count = RndTestPatternGenerator.generateCount(1000);
310 
311         final String expectedPattern1 = createExpectedString(pattern1, count);
312         final String expectedPattern2 = createExpectedString(pattern2, count);
313         final String expectedPattern3 = createExpectedString(pattern3, count);
314 
315         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
316         for (int i = 0; i < 1; i++) {
317             final HttpRequest request1 = new BasicHttpRequest("GET", createRequestUri(pattern1, count));
318             final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST",
319                     createRequestUri(pattern2, count));
320             final NStringEntity entity2 = new NStringEntity(expectedPattern2, ContentType.DEFAULT_TEXT);
321             entity2.setChunked(RndTestPatternGenerator.generateBoolean());
322             request2.setEntity(entity2);
323             final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST",
324                     createRequestUri(pattern3, count));
325             final NStringEntity entity3 = new NStringEntity(expectedPattern3, ContentType.DEFAULT_TEXT);
326             entity3.setChunked(RndTestPatternGenerator.generateBoolean());
327             request3.setEntity(entity3);
328             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
329                     request1, request2, request3);
330             queue.add(future);
331         }
332 
333         while (!queue.isEmpty()) {
334             final Future<List<HttpResponse>> future = queue.remove();
335             final List<HttpResponse> responses = future.get();
336             Assert.assertNotNull(responses);
337             Assert.assertEquals(3, responses.size());
338             for (final HttpResponse response: responses) {
339                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
340             }
341             Assert.assertEquals(expectedPattern1, EntityUtils.toString(responses.get(0).getEntity()));
342             Assert.assertEquals(expectedPattern2, EntityUtils.toString(responses.get(1).getEntity()));
343             Assert.assertEquals(expectedPattern3, EntityUtils.toString(responses.get(2).getEntity()));
344         }
345     }
346 
347     @Test
348     public void testUnexpectedConnectionClosure() throws Exception {
349         this.server.registerHandler("*", new BasicAsyncRequestHandler(new HttpRequestHandler() {
350 
351             @Override
352             public void handle(
353                     final HttpRequest request,
354                     final HttpResponse response,
355                     final HttpContext context) throws HttpException, IOException {
356                 response.setStatusCode(HttpStatus.SC_OK);
357                 response.setEntity(new StringEntity("all is well", ContentType.TEXT_PLAIN));
358             }
359 
360         }));
361         this.server.registerHandler("/boom", new BasicAsyncRequestHandler(new HttpRequestHandler() {
362 
363             @Override
364             public void handle(
365                     final HttpRequest request,
366                     final HttpResponse response,
367                     final HttpContext context) throws HttpException, IOException {
368                 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
369                 response.setHeader(HttpHeaders.CONNECTION, "Close");
370                 response.setEntity(new StringEntity("boooooom!!!!!", ContentType.TEXT_PLAIN));
371             }
372 
373         }));
374         final HttpHost target = start();
375 
376         this.client.setMaxPerRoute(3);
377         this.client.setMaxTotal(3);
378 
379         for (int i = 0; i < 3; i++) {
380 
381             final HttpAsyncRequestProducer p1 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
382             final HttpAsyncRequestProducer p2 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
383             final HttpAsyncRequestProducer p3 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/boom"));
384             final List<HttpAsyncRequestProducer> requestProducers = new ArrayList<HttpAsyncRequestProducer>();
385             requestProducers.add(p1);
386             requestProducers.add(p2);
387             requestProducers.add(p3);
388 
389             final HttpAsyncResponseConsumer<HttpResponse> c1 = new BasicAsyncResponseConsumer();
390             final HttpAsyncResponseConsumer<HttpResponse> c2 = new BasicAsyncResponseConsumer();
391             final HttpAsyncResponseConsumer<HttpResponse> c3 = new BasicAsyncResponseConsumer();
392             final List<HttpAsyncResponseConsumer<HttpResponse>> responseConsumers = new ArrayList<HttpAsyncResponseConsumer<HttpResponse>>();
393             responseConsumers.add(c1);
394             responseConsumers.add(c2);
395             responseConsumers.add(c3);
396 
397             final Future<List<HttpResponse>> future = this.client.executePipelined(target, requestProducers, responseConsumers, null, null);
398             try {
399                 future.get();
400             } catch (final ExecutionException ex) {
401                 final Throwable cause = ex.getCause();
402                 Assert.assertTrue(cause instanceof ConnectionClosedException);
403             }
404 
405             Assert.assertTrue(c1.isDone());
406             Assert.assertNotNull(c1.getResult());
407             Assert.assertTrue(c2.isDone());
408             Assert.assertNotNull(c2.getResult());
409             Assert.assertTrue(c2.isDone());
410             Assert.assertNotNull(c3.getResult());
411         }
412     }
413 
414 }