View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.client.methods;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.nio.CharBuffer;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.http.Consts;
38  import org.apache.http.HttpAsyncTestBase;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpHost;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.config.ConnectionConfig;
43  import org.apache.http.entity.ContentType;
44  import org.apache.http.impl.DefaultConnectionReuseStrategy;
45  import org.apache.http.impl.DefaultHttpResponseFactory;
46  import org.apache.http.impl.nio.DefaultNHttpServerConnection;
47  import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
48  import org.apache.http.impl.nio.client.HttpAsyncClients;
49  import org.apache.http.localserver.EchoHandler;
50  import org.apache.http.localserver.RandomHandler;
51  import org.apache.http.nio.IOControl;
52  import org.apache.http.nio.NHttpConnectionFactory;
53  import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
54  import org.apache.http.nio.protocol.HttpAsyncExpectationVerifier;
55  import org.apache.http.nio.protocol.HttpAsyncRequestHandlerMapper;
56  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
57  import org.apache.http.nio.protocol.HttpAsyncService;
58  import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper;
59  import org.apache.http.nio.reactor.IOReactorStatus;
60  import org.apache.http.nio.reactor.ListenerEndpoint;
61  import org.apache.http.protocol.HttpContext;
62  import org.junit.After;
63  import org.junit.Assert;
64  import org.junit.Before;
65  import org.junit.Test;
66  import org.mockito.Mockito;
67  
68  public class TestAsyncConsumers extends HttpAsyncTestBase {
69  
70      @Before
71      public void setUp() throws Exception {
72          initServer();
73          initConnectionManager();
74          this.httpclient = HttpAsyncClients.custom()
75                  .setConnectionManager(this.connMgr)
76                  .build();
77      }
78  
79      @After
80      public void tearDown() throws Exception {
81          shutDownClient();
82          shutDownServer();
83      }
84  
85      @Override
86      protected NHttpConnectionFactory<DefaultNHttpServerConnection> createServerConnectionFactory(
87              final ConnectionConfig config) throws Exception {
88          return new DefaultNHttpServerConnectionFactory(config);
89      }
90  
91      @Override
92      protected String getSchemeName() {
93          return "http";
94      }
95  
96      private HttpHost start(
97              final HttpAsyncRequestHandlerMapper requestHandlerResolver,
98              final HttpAsyncExpectationVerifier expectationVerifier) throws Exception {
99          final HttpAsyncService serviceHandler = new HttpAsyncService(
100                 this.serverHttpProc,
101                 new DefaultConnectionReuseStrategy(),
102                 new DefaultHttpResponseFactory(),
103                 requestHandlerResolver,
104                 expectationVerifier);
105         this.server.start(serviceHandler);
106         this.httpclient.start();
107 
108         final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
109         endpoint.waitFor();
110 
111         Assert.assertEquals("Test server status", IOReactorStatus.ACTIVE, this.server.getStatus());
112         final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
113         return new HttpHost("localhost", address.getPort(), getSchemeName());
114     }
115 
116     private HttpHost start() throws Exception {
117         final UriHttpAsyncRequestHandlerMapper registry = new UriHttpAsyncRequestHandlerMapper();
118         registry.register("/echo/*", new BasicAsyncRequestHandler(new EchoHandler()));
119         registry.register("/random/*", new BasicAsyncRequestHandler(new RandomHandler()));
120         return start(registry, null);
121     }
122 
123     static class ByteCountingConsumer extends AsyncByteConsumer<Long> {
124 
125         public ByteCountingConsumer() {
126             super();
127         }
128 
129         public ByteCountingConsumer(final int bufSize) {
130             super(bufSize);
131         }
132 
133         private final AtomicLong count = new AtomicLong(0);
134 
135         @Override
136         protected void onResponseReceived(final HttpResponse response) {
137         }
138 
139         @Override
140         protected void onByteReceived(final ByteBuffer buf, final IOControl ioctrl) {
141             this.count.addAndGet(buf.remaining());
142         }
143 
144         @Override
145         protected Long buildResult(final HttpContext context) throws Exception {
146             return count.get();
147         }
148 
149     }
150 
151     @Test
152     public void testByteConsumer() throws Exception {
153         final HttpHost target = start();
154         for (int i = 0; i < 5; i++) {
155             final HttpAsyncRequestProducer httpget = HttpAsyncMethods.createGet(target.toURI() + "/random/20480");
156             final AsyncByteConsumer<Long> consumer = new ByteCountingConsumer();
157             final Future<Long> future = this.httpclient.execute(httpget, consumer, null);
158             final Long count = future.get();
159             Assert.assertEquals(20480, count.longValue());
160         }
161     }
162 
163     @Test
164     public void testByteConsumerSmallBufffer() throws Exception {
165         final HttpHost target = start();
166         for (int i = 0; i < 5; i++) {
167             final HttpAsyncRequestProducer httpget = HttpAsyncMethods.createGet(target.toURI() + "/random/20480");
168             final AsyncByteConsumer<Long> consumer = new ByteCountingConsumer(512);
169             final Future<Long> future = this.httpclient.execute(httpget, consumer, null);
170             final Long count = future.get();
171             Assert.assertEquals(20480, count.longValue());
172         }
173     }
174 
175     static class BufferingCharConsumer extends AsyncCharConsumer<String> {
176 
177         public BufferingCharConsumer() {
178             super();
179         }
180 
181         public BufferingCharConsumer(final int bufSize) {
182             super(bufSize);
183         }
184 
185         private final StringBuilder sb = new StringBuilder();
186 
187         @Override
188         public void onResponseReceived(final HttpResponse response) {
189         }
190 
191         @Override
192         protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
193             while (buf.hasRemaining()) {
194                 this.sb.append(buf.get());
195             }
196         }
197 
198         @Override
199         protected void releaseResources() {
200             super.releaseResources();
201             this.sb.setLength(0);
202         }
203 
204         @Override
205         protected String buildResult(final HttpContext context) throws Exception {
206             return this.sb.toString();
207         }
208 
209     }
210 
211     @Test
212     public void testCharConsumer() throws Exception {
213         final HttpHost target = start();
214         final StringBuilder sb = new StringBuilder();
215         for (int i= 0; i < 25; i++) {
216             sb.append("blah blah blah blah\r\n");
217             sb.append("yada yada yada yada\r\n");
218         }
219         final String s = sb.toString();
220 
221         for (int i = 0; i < 5; i++) {
222             final HttpAsyncRequestProducer httppost = HttpAsyncMethods.createPost(
223                     target.toURI() + "/echo/stuff", s,
224                     ContentType.create("text/plain", Consts.ASCII));
225             final AsyncCharConsumer<String> consumer = new BufferingCharConsumer();
226             final Future<String> future = this.httpclient.execute(httppost, consumer, null);
227             final String result = future.get();
228             Assert.assertEquals(s, result);
229         }
230     }
231 
232     @Test
233     public void testCharConsumerSmallBufffer() throws Exception {
234         final HttpHost target = start();
235         final StringBuilder sb = new StringBuilder();
236         for (int i= 0; i < 25; i++) {
237             sb.append("blah blah blah blah\r\n");
238             sb.append("yada yada yada yada\r\n");
239         }
240         final String s = sb.toString();
241 
242         for (int i = 0; i < 5; i++) {
243             final HttpAsyncRequestProducer httppost = HttpAsyncMethods.createPost(
244                     target.toURI() + "/echo/stuff", s,
245                     ContentType.create("text/plain", Consts.ASCII));
246             final AsyncCharConsumer<String> consumer = new BufferingCharConsumer(512);
247             final Future<String> future = this.httpclient.execute(httppost, consumer, null);
248             final String result = future.get();
249             Assert.assertEquals(s, result);
250         }
251     }
252 
253     @Test
254     public void testResourceReleaseOnSuccess() throws Exception {
255         final HttpHost target = start();
256         final StringBuilder sb = new StringBuilder();
257         for (int i= 0; i < 25; i++) {
258             sb.append("blah blah blah blah\r\n");
259             sb.append("yada yada yada yada\r\n");
260         }
261         final String s = sb.toString();
262 
263         final HttpAsyncRequestProducer httppost = HttpAsyncMethods.createPost(
264                 target.toURI() + "/echo/stuff", s,
265                 ContentType.create("text/plain", Consts.ASCII));
266         final BufferingCharConsumer consumer = Mockito.spy(new BufferingCharConsumer());
267         final Future<String> future = this.httpclient.execute(httppost, consumer, null);
268         final String result = future.get();
269         Assert.assertEquals(s, result);
270         Mockito.verify(consumer).buildResult(Mockito.any(HttpContext.class));
271         Mockito.verify(consumer).releaseResources();
272     }
273 
274     @Test
275     public void testResourceReleaseOnException() throws Exception {
276         final HttpHost target = start();
277         final HttpAsyncRequestProducer httppost = HttpAsyncMethods.createPost(
278                 target.toURI() + "/echo/stuff", "stuff",
279                 ContentType.create("text/plain", Consts.ASCII));
280         final AsyncCharConsumer<String> consumer = Mockito.spy(new BufferingCharConsumer());
281         Mockito.doThrow(new IOException("Kaboom")).when(consumer).onCharReceived(
282                 Mockito.any(CharBuffer.class), Mockito.any(IOControl.class));
283 
284         final Future<String> future = this.httpclient.execute(httppost, consumer, null);
285         try {
286             future.get();
287             Assert.fail("ExecutionException expected");
288         } catch (final ExecutionException ex) {
289             final Throwable t = ex.getCause();
290             Assert.assertNotNull(t);
291             Assert.assertTrue(t instanceof IOException);
292             Assert.assertEquals("Kaboom", t.getMessage());
293         }
294         Mockito.verify(consumer).releaseResources();
295     }
296 
297     @Test
298     public void testResourceReleaseOnBuildFailure() throws Exception {
299         final HttpHost target = start();
300         final HttpAsyncRequestProducer httppost = HttpAsyncMethods.createPost(
301                 target.toURI() + "/echo/stuff", "stuff",
302                 ContentType.create("text/plain", Consts.ASCII));
303         final BufferingCharConsumer consumer = Mockito.spy(new BufferingCharConsumer());
304         Mockito.doThrow(new HttpException("Kaboom")).when(consumer).buildResult(Mockito.any(HttpContext.class));
305 
306         final Future<String> future = this.httpclient.execute(httppost, consumer, null);
307         try {
308             future.get();
309             Assert.fail("ExecutionException expected");
310         } catch (final ExecutionException ex) {
311             final Throwable t = ex.getCause();
312             Assert.assertNotNull(t);
313             Assert.assertTrue(t instanceof HttpException);
314             Assert.assertEquals("Kaboom", t.getMessage());
315         }
316         Mockito.verify(consumer).releaseResources();
317     }
318 
319 }