1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import static org.hamcrest.MatcherAssert.assertThat;
30
31 import java.util.LinkedList;
32 import java.util.Queue;
33 import java.util.Random;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
42 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
43 import org.apache.hc.client5.http.protocol.HttpClientContext;
44 import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
45 import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
46 import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
47 import org.apache.hc.core5.concurrent.FutureCallback;
48 import org.apache.hc.core5.http.ContentType;
49 import org.apache.hc.core5.http.HttpHost;
50 import org.apache.hc.core5.http.HttpResponse;
51 import org.apache.hc.core5.http.Message;
52 import org.apache.hc.core5.http.Method;
53 import org.apache.hc.core5.http.URIScheme;
54 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
55 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
56 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
57 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
58 import org.hamcrest.CoreMatchers;
59 import org.junit.jupiter.api.Test;
60
61 abstract class AbstractHttpAsyncFundamentalsTest extends AbstractIntegrationTestBase {
62
63 protected AbstractHttpAsyncFundamentalsTest(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final ServerProtocolLevel serverProtocolLevel) {
64 super(scheme, clientProtocolLevel, serverProtocolLevel);
65 }
66
67 @Test
68 void testSequentialGetRequests() throws Exception {
69 configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
70 final HttpHost target = startServer();
71
72 final TestAsyncClient client = startClient();
73
74 for (int i = 0; i < 3; i++) {
75 final Future<SimpleHttpResponse> future = client.execute(
76 SimpleRequestBuilder.get()
77 .setHttpHost(target)
78 .setPath("/random/2048")
79 .build(), null);
80 final SimpleHttpResponse response = future.get();
81 assertThat(response, CoreMatchers.notNullValue());
82 assertThat(response.getCode(), CoreMatchers.equalTo(200));
83 final String body = response.getBodyText();
84 assertThat(body, CoreMatchers.notNullValue());
85 assertThat(body.length(), CoreMatchers.equalTo(2048));
86 }
87 }
88
89 @Test
90 void testSequentialHeadRequests() throws Exception {
91 configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
92 final HttpHost target = startServer();
93 final TestAsyncClient client = startClient();
94 for (int i = 0; i < 3; i++) {
95 final Future<SimpleHttpResponse> future = client.execute(
96 SimpleRequestBuilder.head()
97 .setHttpHost(target)
98 .setPath("/random/2048")
99 .build(), null);
100 final SimpleHttpResponse response = future.get();
101 assertThat(response, CoreMatchers.notNullValue());
102 assertThat(response.getCode(), CoreMatchers.equalTo(200));
103 final String body = response.getBodyText();
104 assertThat(body, CoreMatchers.nullValue());
105 }
106 }
107
108 @Test
109 void testSequentialPostRequests() throws Exception {
110 configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
111 final HttpHost target = startServer();
112 final TestAsyncClient client = startClient();
113 for (int i = 0; i < 3; i++) {
114 final byte[] b1 = new byte[1024];
115 final Random rnd = new Random(System.currentTimeMillis());
116 rnd.nextBytes(b1);
117 final Future<Message<HttpResponse, byte[]>> future = client.execute(
118 new BasicRequestProducer(Method.GET, target, "/echo/",
119 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
120 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
121 final Message<HttpResponse, byte[]> responseMessage = future.get();
122 assertThat(responseMessage, CoreMatchers.notNullValue());
123 final HttpResponse response = responseMessage.getHead();
124 assertThat(response.getCode(), CoreMatchers.equalTo(200));
125 final byte[] b2 = responseMessage.getBody();
126 assertThat(b1, CoreMatchers.equalTo(b2));
127 }
128 }
129
130 @Test
131 void testConcurrentPostRequests() throws Exception {
132 configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
133 final HttpHost target = startServer();
134 final TestAsyncClient client = startClient();
135 final byte[] b1 = new byte[1024];
136 final Random rnd = new Random(System.currentTimeMillis());
137 rnd.nextBytes(b1);
138
139 final int reqCount = 20;
140
141 final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
142 for (int i = 0; i < reqCount; i++) {
143 final Future<Message<HttpResponse, byte[]>> future = client.execute(
144 new BasicRequestProducer(Method.POST, target, "/echo/",
145 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
146 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
147 queue.add(future);
148 }
149
150 while (!queue.isEmpty()) {
151 final Future<Message<HttpResponse, byte[]>> future = queue.remove();
152 final Message<HttpResponse, byte[]> responseMessage = future.get();
153 assertThat(responseMessage, CoreMatchers.notNullValue());
154 final HttpResponse response = responseMessage.getHead();
155 assertThat(response.getCode(), CoreMatchers.equalTo(200));
156 final byte[] b2 = responseMessage.getBody();
157 assertThat(b1, CoreMatchers.equalTo(b2));
158 }
159 }
160
161 @Test
162 void testRequestExecutionFromCallback() throws Exception {
163 configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
164 final HttpHost target = startServer();
165 final TestAsyncClient client = startClient();
166 final int requestNum = 50;
167 final AtomicInteger count = new AtomicInteger(requestNum);
168 final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
169 final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
170
171 final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
172
173 @Override
174 public void completed(final SimpleHttpResponse result) {
175 try {
176 resultQueue.add(result);
177 if (count.decrementAndGet() > 0) {
178 client.execute(
179 SimpleRequestBuilder.get()
180 .setHttpHost(target)
181 .setPath("/random/2048")
182 .build(), this);
183 }
184 } finally {
185 countDownLatch.countDown();
186 }
187 }
188
189 @Override
190 public void failed(final Exception ex) {
191 countDownLatch.countDown();
192 }
193
194 @Override
195 public void cancelled() {
196 countDownLatch.countDown();
197 }
198 };
199
200 final int threadNum = 5;
201 final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
202 for (int i = 0; i < threadNum; i++) {
203 executorService.execute(() -> {
204 if (!Thread.currentThread().isInterrupted()) {
205 client.execute(
206 SimpleRequestBuilder.get()
207 .setHttpHost(target)
208 .setPath("/random/2048")
209 .build(), callback);
210 }
211 });
212 }
213
214 assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
215
216 executorService.shutdownNow();
217 executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
218
219 for (;;) {
220 final SimpleHttpResponse response = resultQueue.poll();
221 if (response == null) {
222 break;
223 }
224 assertThat(response.getCode(), CoreMatchers.equalTo(200));
225 }
226 }
227
228 @Test
229 void testBadRequest() throws Exception {
230 configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
231 final HttpHost target = startServer();
232 final TestAsyncClient client = startClient();
233 final Future<SimpleHttpResponse> future = client.execute(
234 SimpleRequestBuilder.get()
235 .setHttpHost(target)
236 .setPath("/random/boom")
237 .build(), null);
238 final SimpleHttpResponse response = future.get();
239 assertThat(response, CoreMatchers.notNullValue());
240 assertThat(response.getCode(), CoreMatchers.equalTo(400));
241 }
242
243 }