View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.integration;
29  
30  import java.io.ByteArrayOutputStream;
31  import java.io.IOException;
32  import java.io.InputStream;
33  import java.io.UnsupportedEncodingException;
34  import java.net.InetSocketAddress;
35  import java.nio.charset.Charset;
36  import java.util.LinkedList;
37  import java.util.Queue;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.ExecutorService;
40  import java.util.concurrent.Executors;
41  
42  import org.apache.http.HttpCoreNIOTestBase;
43  import org.apache.http.HttpEntity;
44  import org.apache.http.HttpEntityEnclosingRequest;
45  import org.apache.http.HttpException;
46  import org.apache.http.HttpRequest;
47  import org.apache.http.HttpRequestInterceptor;
48  import org.apache.http.HttpResponse;
49  import org.apache.http.HttpResponseInterceptor;
50  import org.apache.http.HttpStatus;
51  import org.apache.http.HttpVersion;
52  import org.apache.http.LoggingClientConnectionFactory;
53  import org.apache.http.LoggingServerConnectionFactory;
54  import org.apache.http.entity.ContentType;
55  import org.apache.http.entity.StringEntity;
56  import org.apache.http.impl.DefaultConnectionReuseStrategy;
57  import org.apache.http.impl.DefaultHttpResponseFactory;
58  import org.apache.http.impl.nio.DefaultNHttpClientConnection;
59  import org.apache.http.impl.nio.DefaultNHttpServerConnection;
60  import org.apache.http.message.BasicHttpEntityEnclosingRequest;
61  import org.apache.http.message.BasicHttpRequest;
62  import org.apache.http.nio.NHttpConnectionFactory;
63  import org.apache.http.nio.entity.NByteArrayEntity;
64  import org.apache.http.nio.entity.NStringEntity;
65  import org.apache.http.nio.protocol.HttpRequestExecutionHandler;
66  import org.apache.http.nio.protocol.ThrottlingHttpClientHandler;
67  import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler;
68  import org.apache.http.nio.reactor.IOReactorStatus;
69  import org.apache.http.nio.reactor.ListenerEndpoint;
70  import org.apache.http.nio.reactor.SessionRequest;
71  import org.apache.http.params.CoreProtocolPNames;
72  import org.apache.http.params.HttpParams;
73  import org.apache.http.protocol.HTTP;
74  import org.apache.http.protocol.HttpContext;
75  import org.apache.http.protocol.HttpExpectationVerifier;
76  import org.apache.http.protocol.HttpProcessor;
77  import org.apache.http.protocol.HttpRequestHandler;
78  import org.apache.http.protocol.ImmutableHttpProcessor;
79  import org.apache.http.protocol.RequestConnControl;
80  import org.apache.http.protocol.RequestContent;
81  import org.apache.http.protocol.RequestExpectContinue;
82  import org.apache.http.protocol.RequestTargetHost;
83  import org.apache.http.protocol.RequestUserAgent;
84  import org.apache.http.protocol.ResponseConnControl;
85  import org.apache.http.protocol.ResponseContent;
86  import org.apache.http.protocol.ResponseDate;
87  import org.apache.http.protocol.ResponseServer;
88  import org.apache.http.testserver.SimpleEventListener;
89  import org.apache.http.testserver.SimpleHttpRequestHandlerResolver;
90  import org.apache.http.util.EncodingUtils;
91  import org.junit.After;
92  import org.junit.Assert;
93  import org.junit.Before;
94  import org.junit.Test;
95  
96  /**
97   * HttpCore NIO integration tests using throttling versions of the
98   * protocol handlers.
99   */
100 @Deprecated
101 public class TestThrottlingNHttpHandlers extends HttpCoreNIOTestBase {
102 
103     @Before
104     public void setUp() throws Exception {
105         initServer();
106         initClient();
107     }
108 
109     @After
110     public void tearDown() throws Exception {
111         shutDownClient();
112         shutDownServer();
113     }
114 
115     @Override
116     protected NHttpConnectionFactory<DefaultNHttpServerConnection> createServerConnectionFactory(
117             final HttpParams params) throws Exception {
118         return new LoggingServerConnectionFactory(params);
119     }
120 
121     @Override
122     protected NHttpConnectionFactory<DefaultNHttpClientConnection> createClientConnectionFactory(
123             final HttpParams params) throws Exception {
124         return new LoggingClientConnectionFactory(params);
125     }
126 
127     private ExecutorService execService;
128 
129     @Before
130     public void initExecService() throws Exception {
131         this.execService = Executors.newCachedThreadPool();
132     }
133 
134     @After
135     public void shutDownExecService() {
136         this.execService.shutdownNow();
137     }
138 
139     private void executeStandardTest(
140             final HttpRequestHandler requestHandler,
141             final HttpRequestExecutionHandler requestExecutionHandler) throws Exception {
142         int connNo = 3;
143         int reqNo = 20;
144         Job[] jobs = new Job[connNo * reqNo];
145         for (int i = 0; i < jobs.length; i++) {
146             jobs[i] = new Job();
147         }
148         Queue<Job> queue = new ConcurrentLinkedQueue<Job>();
149         for (int i = 0; i < jobs.length; i++) {
150             queue.add(jobs[i]);
151         }
152 
153         ThrottlingHttpServiceHandler serviceHandler = new ThrottlingHttpServiceHandler(
154                 this.serverHttpProc,
155                 new DefaultHttpResponseFactory(),
156                 new DefaultConnectionReuseStrategy(),
157                 this.execService,
158                 this.serverParams);
159 
160         serviceHandler.setHandlerResolver(
161                 new SimpleHttpRequestHandlerResolver(requestHandler));
162         serviceHandler.setEventListener(
163                 new SimpleEventListener());
164 
165         ThrottlingHttpClientHandler clientHandler = new ThrottlingHttpClientHandler(
166                 this.clientHttpProc,
167                 requestExecutionHandler,
168                 new DefaultConnectionReuseStrategy(),
169                 this.execService,
170                 this.clientParams);
171 
172         clientHandler.setEventListener(
173                 new SimpleEventListener());
174 
175         this.server.start(serviceHandler);
176         this.client.start(clientHandler);
177 
178         ListenerEndpoint endpoint = this.server.getListenerEndpoint();
179         endpoint.waitFor();
180         InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress();
181 
182         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
183 
184         Queue<SessionRequest> connRequests = new LinkedList<SessionRequest>();
185         for (int i = 0; i < connNo; i++) {
186             SessionRequest sessionRequest = this.client.openConnection(
187                     new InetSocketAddress("localhost", serverAddress.getPort()),
188                     queue);
189             connRequests.add(sessionRequest);
190         }
191 
192         while (!connRequests.isEmpty()) {
193             SessionRequest sessionRequest = connRequests.remove();
194             sessionRequest.waitFor();
195             if (sessionRequest.getException() != null) {
196                 throw sessionRequest.getException();
197             }
198             Assert.assertNotNull(sessionRequest.getSession());
199         }
200 
201         Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
202 
203         for (int i = 0; i < jobs.length; i++) {
204             Job testjob = jobs[i];
205             testjob.waitFor();
206             if (testjob.isSuccessful()) {
207                 Assert.assertEquals(HttpStatus.SC_OK, testjob.getStatusCode());
208                 Assert.assertEquals(testjob.getExpected(), testjob.getResult());
209             } else {
210                 Assert.fail(testjob.getFailureMessage());
211             }
212         }
213     }
214 
215     /**
216      * This test case executes a series of simple (non-pipelined) GET requests
217      * over multiple connections.
218      */
219     @Test
220     public void testSimpleHttpGets() throws Exception {
221         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
222 
223             @Override
224             protected HttpRequest generateRequest(Job testjob) {
225                 String s = testjob.getPattern() + "x" + testjob.getCount();
226                 return new BasicHttpRequest("GET", s);
227             }
228 
229         };
230         executeStandardTest(new RequestHandler(), requestExecutionHandler);
231     }
232 
233     /**
234      * This test case executes a series of simple (non-pipelined) POST requests
235      * with content length delimited content over multiple connections.
236      */
237     @Test
238     public void testSimpleHttpPostsWithContentLength() throws Exception {
239         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
240 
241             @Override
242             protected HttpRequest generateRequest(Job testjob) {
243                 String s = testjob.getPattern() + "x" + testjob.getCount();
244                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
245                 NStringEntity entity = null;
246                 try {
247                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
248                     entity.setChunked(false);
249                 } catch (UnsupportedEncodingException ignore) {
250                 }
251                 r.setEntity(entity);
252                 return r;
253             }
254 
255         };
256         executeStandardTest(new RequestHandler(), requestExecutionHandler);
257     }
258 
259     /**
260      * This test case executes a series of simple (non-pipelined) POST requests
261      * with chunk coded content content over multiple connections.
262      */
263     @Test
264     public void testSimpleHttpPostsChunked() throws Exception {
265         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
266 
267             @Override
268             protected HttpRequest generateRequest(Job testjob) {
269                 String s = testjob.getPattern() + "x" + testjob.getCount();
270                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
271                 NStringEntity entity = null;
272                 try {
273                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
274                     entity.setChunked(true);
275                 } catch (UnsupportedEncodingException ignore) {
276                 }
277                 r.setEntity(entity);
278                 return r;
279             }
280 
281         };
282         executeStandardTest(new RequestHandler(), requestExecutionHandler);
283     }
284 
285     /**
286      * This test case executes a series of simple (non-pipelined) HTTP/1.0
287      * POST requests over multiple persistent connections.
288      */
289     @Test
290     public void testSimpleHttpPostsHTTP10() throws Exception {
291         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
292 
293             @Override
294             protected HttpRequest generateRequest(Job testjob) {
295                 String s = testjob.getPattern() + "x" + testjob.getCount();
296                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s,
297                         HttpVersion.HTTP_1_0);
298                 NStringEntity entity = null;
299                 try {
300                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
301                 } catch (UnsupportedEncodingException ignore) {
302                 }
303                 r.setEntity(entity);
304                 return r;
305             }
306 
307         };
308         executeStandardTest(new RequestHandler(), requestExecutionHandler);
309     }
310 
311     /**
312      * This test case executes a series of simple (non-pipelined) POST requests
313      * over multiple connections using the 'expect: continue' handshake.
314      */
315     @Test
316     public void testHttpPostsWithExpectContinue() throws Exception {
317         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
318 
319             @Override
320             protected HttpRequest generateRequest(Job testjob) {
321                 String s = testjob.getPattern() + "x" + testjob.getCount();
322                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
323                 NStringEntity entity = null;
324                 try {
325                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
326                 } catch (UnsupportedEncodingException ignore) {
327                 }
328                 r.setEntity(entity);
329                 r.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);
330                 return r;
331             }
332 
333         };
334         executeStandardTest(new RequestHandler(), requestExecutionHandler);
335     }
336 
337     /**
338      * This test case executes a series of simple (non-pipelined) POST requests
339      * over multiple connections that do not meet the target server expectations.
340      */
341     @Test
342     public void testHttpPostsWithExpectationVerification() throws Exception {
343         Job[] jobs = new Job[3];
344         jobs[0] = new Job("AAAAA", 10);
345         jobs[1] = new Job("AAAAA", 10);
346         jobs[2] = new Job("BBBBB", 20);
347         Queue<Job> queue = new ConcurrentLinkedQueue<Job>();
348         for (int i = 0; i < jobs.length; i++) {
349             queue.add(jobs[i]);
350         }
351 
352         HttpExpectationVerifier expectationVerifier = new HttpExpectationVerifier() {
353 
354             public void verify(
355                     final HttpRequest request,
356                     final HttpResponse response,
357                     final HttpContext context) throws HttpException {
358                 String s = request.getRequestLine().getUri();
359                 if (!s.equals("AAAAAx10")) {
360                     response.setStatusCode(HttpStatus.SC_EXPECTATION_FAILED);
361                     NByteArrayEntity outgoing = new NByteArrayEntity(
362                             EncodingUtils.getAsciiBytes("Expectation failed"));
363                     response.setEntity(outgoing);
364                 }
365             }
366 
367         };
368 
369         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
370 
371             @Override
372             protected HttpRequest generateRequest(Job testjob) {
373                 String s = testjob.getPattern() + "x" + testjob.getCount();
374                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
375                 NStringEntity entity = null;
376                 try {
377                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
378                 } catch (UnsupportedEncodingException ignore) {
379                 }
380                 r.setEntity(entity);
381                 r.getParams().setBooleanParameter(CoreProtocolPNames.USE_EXPECT_CONTINUE, true);
382                 return r;
383             }
384 
385         };
386 
387         HttpProcessor serverHttpProc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
388                 new ResponseDate(),
389                 new ResponseServer(),
390                 new ResponseContent(),
391                 new ResponseConnControl()
392         });
393 
394         ThrottlingHttpServiceHandler serviceHandler = new ThrottlingHttpServiceHandler(
395                 serverHttpProc,
396                 new DefaultHttpResponseFactory(),
397                 new DefaultConnectionReuseStrategy(),
398                 this.execService,
399                 this.serverParams);
400 
401         serviceHandler.setHandlerResolver(
402                 new SimpleHttpRequestHandlerResolver(new RequestHandler()));
403         serviceHandler.setExpectationVerifier(
404                 expectationVerifier);
405         serviceHandler.setEventListener(
406                 new SimpleEventListener());
407 
408         HttpProcessor clientHttpProc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
409                 new RequestContent(),
410                 new RequestTargetHost(),
411                 new RequestConnControl(),
412                 new RequestUserAgent(),
413                 new RequestExpectContinue()});
414 
415         ThrottlingHttpClientHandler clientHandler = new ThrottlingHttpClientHandler(
416                 clientHttpProc,
417                 requestExecutionHandler,
418                 new DefaultConnectionReuseStrategy(),
419                 this.execService,
420                 this.clientParams);
421 
422         clientHandler.setEventListener(
423                 new SimpleEventListener());
424 
425         this.server.start(serviceHandler);
426         this.client.start(clientHandler);
427 
428         ListenerEndpoint endpoint = this.server.getListenerEndpoint();
429         endpoint.waitFor();
430         InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress();
431 
432         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
433 
434         SessionRequest sessionRequest = this.client.openConnection(
435                 new InetSocketAddress("localhost", serverAddress.getPort()),
436                 queue);
437 
438         sessionRequest.waitFor();
439         if (sessionRequest.getException() != null) {
440             throw sessionRequest.getException();
441         }
442         Assert.assertNotNull(sessionRequest.getSession());
443 
444         Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
445 
446         for (int i = 0; i < 2; i++) {
447             Job testjob = jobs[i];
448             testjob.waitFor();
449             if (testjob.isSuccessful()) {
450                 Assert.assertEquals(testjob.getExpected(), testjob.getResult());
451             } else {
452                 Assert.fail(testjob.getFailureMessage());
453             }
454         }
455         Job failedExpectation = jobs[2];
456         failedExpectation.waitFor();
457         Assert.assertEquals(HttpStatus.SC_EXPECTATION_FAILED, failedExpectation.getStatusCode());
458     }
459 
460     /**
461      * This test case executes a series of simple (non-pipelined) HEAD requests
462      * over multiple connections.
463      */
464     @Test
465     public void testSimpleHttpHeads() throws Exception {
466         int connNo = 3;
467         int reqNo = 20;
468         Job[] jobs = new Job[connNo * reqNo];
469         for (int i = 0; i < jobs.length; i++) {
470             jobs[i] = new Job();
471         }
472         Queue<Job> queue = new ConcurrentLinkedQueue<Job>();
473         for (int i = 0; i < jobs.length; i++) {
474             queue.add(jobs[i]);
475         }
476 
477         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
478 
479             @Override
480             protected HttpRequest generateRequest(Job testjob) {
481                 String s = testjob.getPattern() + "x" + testjob.getCount();
482                 return new BasicHttpRequest("HEAD", s);
483             }
484 
485         };
486 
487         HttpProcessor serverHttpProc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
488                 new ResponseDate(),
489                 new ResponseServer(),
490                 new ResponseContent(),
491                 new ResponseConnControl()
492         });
493 
494         ThrottlingHttpServiceHandler serviceHandler = new ThrottlingHttpServiceHandler(
495                 serverHttpProc,
496                 new DefaultHttpResponseFactory(),
497                 new DefaultConnectionReuseStrategy(),
498                 this.execService,
499                 this.serverParams);
500 
501         serviceHandler.setHandlerResolver(
502                 new SimpleHttpRequestHandlerResolver(new RequestHandler()));
503         serviceHandler.setEventListener(
504                 new SimpleEventListener());
505 
506         HttpProcessor clientHttpProc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
507                 new RequestContent(),
508                 new RequestTargetHost(),
509                 new RequestConnControl(),
510                 new RequestUserAgent(),
511                 new RequestExpectContinue()});
512 
513         ThrottlingHttpClientHandler clientHandler = new ThrottlingHttpClientHandler(
514                 clientHttpProc,
515                 requestExecutionHandler,
516                 new DefaultConnectionReuseStrategy(),
517                 this.execService,
518                 this.clientParams);
519 
520         clientHandler.setEventListener(new SimpleEventListener());
521 
522         this.server.start(serviceHandler);
523         this.client.start(clientHandler);
524 
525         ListenerEndpoint endpoint = this.server.getListenerEndpoint();
526         endpoint.waitFor();
527         InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress();
528 
529         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
530 
531         Queue<SessionRequest> connRequests = new LinkedList<SessionRequest>();
532         for (int i = 0; i < connNo; i++) {
533             SessionRequest sessionRequest = this.client.openConnection(
534                     new InetSocketAddress("localhost", serverAddress.getPort()),
535                     queue);
536             connRequests.add(sessionRequest);
537         }
538 
539         while (!connRequests.isEmpty()) {
540             SessionRequest sessionRequest = connRequests.remove();
541             sessionRequest.waitFor();
542             if (sessionRequest.getException() != null) {
543                 throw sessionRequest.getException();
544             }
545             Assert.assertNotNull(sessionRequest.getSession());
546         }
547 
548         Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
549 
550         for (int i = 0; i < jobs.length; i++) {
551             Job testjob = jobs[i];
552             testjob.waitFor();
553             if (testjob.getFailureMessage() != null) {
554                 Assert.fail(testjob.getFailureMessage());
555             }
556             Assert.assertEquals(HttpStatus.SC_OK, testjob.getStatusCode());
557             Assert.assertNull(testjob.getResult());
558         }
559     }
560 
561     /**
562      * This test case tests if the protocol handler can correctly deal
563      * with requests with partially consumed content.
564      */
565     @Test
566     public void testSimpleHttpPostsContentNotConsumed() throws Exception {
567         HttpRequestHandler requestHandler = new HttpRequestHandler() {
568 
569             public void handle(
570                     final HttpRequest request,
571                     final HttpResponse response,
572                     final HttpContext context) throws HttpException, IOException {
573 
574                 // Request content body has not been consumed!!!
575                 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
576                 NStringEntity outgoing = new NStringEntity("Ooopsie");
577                 response.setEntity(outgoing);
578             }
579 
580         };
581         HttpRequestExecutionHandler requestExecutionHandler = new RequestExecutionHandler() {
582 
583             @Override
584             protected HttpRequest generateRequest(Job testjob) {
585                 String s = testjob.getPattern() + "x" + testjob.getCount();
586                 HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
587                 NStringEntity entity = null;
588                 try {
589                     entity = new NStringEntity(testjob.getExpected(), "US-ASCII");
590                     entity.setChunked(testjob.getCount() % 2 == 0);
591                 } catch (UnsupportedEncodingException ignore) {
592                 }
593                 r.setEntity(entity);
594                 return r;
595             }
596 
597         };
598         int connNo = 3;
599         int reqNo = 20;
600         Job[] jobs = new Job[connNo * reqNo];
601         for (int i = 0; i < jobs.length; i++) {
602             jobs[i] = new Job();
603         }
604         Queue<Job> queue = new ConcurrentLinkedQueue<Job>();
605         for (int i = 0; i < jobs.length; i++) {
606             queue.add(jobs[i]);
607         }
608 
609         HttpProcessor serverHttpProc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
610                 new ResponseDate(),
611                 new ResponseServer(),
612                 new ResponseContent(),
613                 new ResponseConnControl()
614         });
615 
616         ThrottlingHttpServiceHandler serviceHandler = new ThrottlingHttpServiceHandler(
617                 serverHttpProc,
618                 new DefaultHttpResponseFactory(),
619                 new DefaultConnectionReuseStrategy(),
620                 this.execService,
621                 this.serverParams);
622 
623         serviceHandler.setHandlerResolver(
624                 new SimpleHttpRequestHandlerResolver(requestHandler));
625         serviceHandler.setEventListener(
626                 new SimpleEventListener());
627 
628         HttpProcessor clientHttpProc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
629                 new RequestContent(),
630                 new RequestTargetHost(),
631                 new RequestConnControl(),
632                 new RequestUserAgent(),
633                 new RequestExpectContinue()});
634 
635         ThrottlingHttpClientHandler clientHandler = new ThrottlingHttpClientHandler(
636                 clientHttpProc,
637                 requestExecutionHandler,
638                 new DefaultConnectionReuseStrategy(),
639                 this.execService,
640                 this.clientParams);
641 
642         clientHandler.setEventListener(
643                 new SimpleEventListener());
644 
645         this.server.start(serviceHandler);
646         this.client.start(clientHandler);
647 
648         ListenerEndpoint endpoint = this.server.getListenerEndpoint();
649         endpoint.waitFor();
650         InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress();
651 
652         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
653 
654         Queue<SessionRequest> connRequests = new LinkedList<SessionRequest>();
655         for (int i = 0; i < connNo; i++) {
656             SessionRequest sessionRequest = this.client.openConnection(
657                     new InetSocketAddress("localhost", serverAddress.getPort()),
658                     queue);
659             connRequests.add(sessionRequest);
660         }
661 
662         while (!connRequests.isEmpty()) {
663             SessionRequest sessionRequest = connRequests.remove();
664             sessionRequest.waitFor();
665             if (sessionRequest.getException() != null) {
666                 throw sessionRequest.getException();
667             }
668             Assert.assertNotNull(sessionRequest.getSession());
669         }
670 
671         Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
672 
673         for (int i = 0; i < jobs.length; i++) {
674             Job testjob = jobs[i];
675             testjob.waitFor();
676             if (testjob.isSuccessful()) {
677                 Assert.assertEquals(HttpStatus.SC_INTERNAL_SERVER_ERROR, testjob.getStatusCode());
678                 Assert.assertEquals("Ooopsie", testjob.getResult());
679             } else {
680                 Assert.fail(testjob.getFailureMessage());
681             }
682         }
683     }
684 
685     @Test
686     public void testInputThrottling() throws Exception {
687         HttpRequestExecutionHandler requestExecutionHandler = new HttpRequestExecutionHandler() {
688 
689             public void initalizeContext(final HttpContext context, final Object attachment) {
690                 context.setAttribute("queue", attachment);
691             }
692 
693             public HttpRequest submitRequest(final HttpContext context) {
694 
695                 @SuppressWarnings("unchecked")
696                 Queue<Job> queue = (Queue<Job>) context.getAttribute("queue");
697                 if (queue == null) {
698                     throw new IllegalStateException("Queue is null");
699                 }
700 
701                 Job testjob = queue.poll();
702                 context.setAttribute("job", testjob);
703 
704                 if (testjob != null) {
705                     String s = testjob.getPattern() + "x" + testjob.getCount();
706                     HttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest("POST", s);
707                     StringEntity entity = null;
708                     try {
709                         entity = new StringEntity(testjob.getExpected(), "US-ASCII");
710                         entity.setChunked(testjob.getCount() % 2 == 0);
711                     } catch (UnsupportedEncodingException ignore) {
712                     }
713                     r.setEntity(entity);
714                     return r;
715                 } else {
716                     return null;
717                 }
718             }
719 
720             public void handleResponse(final HttpResponse response, final HttpContext context) {
721                 Job testjob = (Job) context.removeAttribute("job");
722                 if (testjob == null) {
723                     throw new IllegalStateException("TestJob is null");
724                 }
725 
726                 int statusCode = response.getStatusLine().getStatusCode();
727                 String content = null;
728 
729                 HttpEntity entity = response.getEntity();
730                 if (entity != null) {
731                     try {
732                         // Simulate slow response handling in order to cause the
733                         // internal content buffer to fill up, forcing the
734                         // protocol handler to throttle input rate
735                         ByteArrayOutputStream outstream = new ByteArrayOutputStream();
736                         InputStream instream = entity.getContent();
737                         byte[] tmp = new byte[2048];
738                         int l;
739                         while((l = instream.read(tmp)) != -1) {
740                             Thread.sleep(1);
741                             outstream.write(tmp, 0, l);
742                         }
743                         ContentType contentType = ContentType.getOrDefault(entity);
744                         Charset charset = contentType.getCharset();
745                         if (charset == null) {
746                             charset = HTTP.DEF_CONTENT_CHARSET;
747                         }
748                         content = new String(outstream.toByteArray(), charset.name());
749                     } catch (InterruptedException ex) {
750                         content = "Interrupted: " + ex.getMessage();
751                     } catch (IOException ex) {
752                         content = "I/O exception: " + ex.getMessage();
753                     }
754                 }
755                 testjob.setResult(statusCode, content);
756             }
757 
758             public void finalizeContext(final HttpContext context) {
759                 Job testjob = (Job) context.removeAttribute("job");
760                 if (testjob != null) {
761                     testjob.fail("Request failed");
762                 }
763             }
764 
765         };
766         int connNo = 3;
767         int reqNo = 20;
768         Job[] jobs = new Job[connNo * reqNo];
769         for (int i = 0; i < jobs.length; i++) {
770             jobs[i] = new Job(10000);
771         }
772         Queue<Job> queue = new ConcurrentLinkedQueue<Job>();
773         for (int i = 0; i < jobs.length; i++) {
774             queue.add(jobs[i]);
775         }
776 
777         HttpProcessor serverHttpProc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
778                 new ResponseDate(),
779                 new ResponseServer(),
780                 new ResponseContent(),
781                 new ResponseConnControl()
782         });
783 
784         ThrottlingHttpServiceHandler serviceHandler = new ThrottlingHttpServiceHandler(
785                 serverHttpProc,
786                 new DefaultHttpResponseFactory(),
787                 new DefaultConnectionReuseStrategy(),
788                 this.execService,
789                 this.serverParams);
790 
791         serviceHandler.setHandlerResolver(
792                 new SimpleHttpRequestHandlerResolver(new RequestHandler()));
793         serviceHandler.setEventListener(
794                 new SimpleEventListener());
795 
796         HttpProcessor clientHttpProc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
797                 new RequestContent(),
798                 new RequestTargetHost(),
799                 new RequestConnControl(),
800                 new RequestUserAgent(),
801                 new RequestExpectContinue()});
802 
803         ThrottlingHttpClientHandler clientHandler = new ThrottlingHttpClientHandler(
804                 clientHttpProc,
805                 requestExecutionHandler,
806                 new DefaultConnectionReuseStrategy(),
807                 this.execService,
808                 this.clientParams);
809 
810         clientHandler.setEventListener(
811                 new SimpleEventListener());
812 
813         this.server.start(serviceHandler);
814         this.client.start(clientHandler);
815 
816         ListenerEndpoint endpoint = this.server.getListenerEndpoint();
817         endpoint.waitFor();
818         InetSocketAddress serverAddress = (InetSocketAddress) endpoint.getAddress();
819 
820         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
821 
822         Queue<SessionRequest> connRequests = new LinkedList<SessionRequest>();
823         for (int i = 0; i < connNo; i++) {
824             SessionRequest sessionRequest = this.client.openConnection(
825                     new InetSocketAddress("localhost", serverAddress.getPort()),
826                     queue);
827             connRequests.add(sessionRequest);
828         }
829 
830         while (!connRequests.isEmpty()) {
831             SessionRequest sessionRequest = connRequests.remove();
832             sessionRequest.waitFor();
833             if (sessionRequest.getException() != null) {
834                 throw sessionRequest.getException();
835             }
836             Assert.assertNotNull(sessionRequest.getSession());
837         }
838 
839         Assert.assertEquals("Test client status", IOReactorStatus.ACTIVE, this.client.getStatus());
840 
841         for (int i = 0; i < jobs.length; i++) {
842             Job testjob = jobs[i];
843             testjob.waitFor();
844             if (testjob.isSuccessful()) {
845                 Assert.assertEquals(HttpStatus.SC_OK, testjob.getStatusCode());
846                 Assert.assertEquals(testjob.getExpected(), testjob.getResult());
847             } else {
848                 Assert.fail(testjob.getFailureMessage());
849             }
850         }
851     }
852 
853 }