1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.testing.sync;
29
30 import java.io.ByteArrayInputStream;
31 import java.net.URI;
32 import java.nio.charset.StandardCharsets;
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import org.apache.hc.client5.http.classic.methods.HttpGet;
37 import org.apache.hc.client5.http.classic.methods.HttpPost;
38 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
39 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
40 import org.apache.hc.client5.testing.classic.RandomHandler;
41 import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
42 import org.apache.hc.client5.testing.extension.sync.TestClient;
43 import org.apache.hc.core5.http.ClassicHttpRequest;
44 import org.apache.hc.core5.http.ContentType;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HeaderElements;
48 import org.apache.hc.core5.http.HttpHeaders;
49 import org.apache.hc.core5.http.HttpHost;
50 import org.apache.hc.core5.http.HttpResponse;
51 import org.apache.hc.core5.http.HttpResponseInterceptor;
52 import org.apache.hc.core5.http.URIScheme;
53 import org.apache.hc.core5.http.impl.HttpProcessors;
54 import org.apache.hc.core5.http.io.entity.EntityUtils;
55 import org.apache.hc.core5.http.io.entity.InputStreamEntity;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.junit.jupiter.api.Assertions;
58 import org.junit.jupiter.api.Test;
59
60 class TestConnectionReuse extends AbstractIntegrationTestBase {
61
/**
 * Creates the test fixture, passing {@link URIScheme#HTTP} and
 * {@link ClientProtocolLevel#STANDARD} to the integration test base class.
 */
public TestConnectionReuse() {
    super(
            URIScheme.HTTP,
            ClientProtocolLevel.STANDARD);
}
65
66 @Test
67 void testReuseOfPersistentConnections() throws Exception {
68 configureServer(bootstrap -> bootstrap
69 .register("/random/*", new RandomHandler()));
70 final HttpHost target = startServer();
71
72 final TestClient client = client();
73 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
74 connManager.setMaxTotal(5);
75 connManager.setDefaultMaxPerRoute(5);
76
77 final WorkerThread[] workers = new WorkerThread[10];
78 for (int i = 0; i < workers.length; i++) {
79 workers[i] = new WorkerThread(
80 client,
81 target,
82 new URI("/random/2000"),
83 10, false);
84 }
85
86 for (final WorkerThread worker : workers) {
87 worker.start();
88 }
89 for (final WorkerThread worker : workers) {
90 worker.join(10000);
91 final Exception ex = worker.getException();
92 if (ex != null) {
93 throw ex;
94 }
95 }
96
97
98 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
99
100 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
101 }
102
103 @Test
104 void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
105 configureServer(bootstrap -> bootstrap
106 .register("/random/*", new RandomHandler()));
107 final HttpHost target = startServer();
108
109 final TestClient client = client();
110 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
111 connManager.setMaxTotal(5);
112 connManager.setDefaultMaxPerRoute(5);
113
114 final WorkerThread[] workers = new WorkerThread[10];
115 for (int i = 0; i < workers.length; i++) {
116 final List<ClassicHttpRequest> requests = new ArrayList<>();
117 for (int j = 0; j < 10; j++) {
118 final HttpPost post = new HttpPost(new URI("/random/2000"));
119
120 post.setEntity(new InputStreamEntity(
121 new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
122 ContentType.APPLICATION_OCTET_STREAM));
123 requests.add(post);
124 }
125 workers[i] = new WorkerThread(client, target, false, requests);
126 }
127
128 for (final WorkerThread worker : workers) {
129 worker.start();
130 }
131 for (final WorkerThread worker : workers) {
132 worker.join(10000);
133 final Exception ex = worker.getException();
134 if (ex != null) {
135 throw ex;
136 }
137 }
138
139
140 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
141
142 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
143 }
144
145 private static class AlwaysCloseConn implements HttpResponseInterceptor {
146
147 @Override
148 public void process(
149 final HttpResponse response,
150 final EntityDetails entityDetails,
151 final HttpContext context) {
152 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153 }
154
155 }
156
157 @Test
158 void testReuseOfClosedConnections() throws Exception {
159 configureServer(bootstrap -> bootstrap
160 .setHttpProcessor(HttpProcessors.customServer(null)
161 .add(new AlwaysCloseConn())
162 .build()));
163 final HttpHost target = startServer();
164
165 final TestClient client = client();
166 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
167 connManager.setMaxTotal(5);
168 connManager.setDefaultMaxPerRoute(5);
169
170 final WorkerThread[] workers = new WorkerThread[10];
171 for (int i = 0; i < workers.length; i++) {
172 workers[i] = new WorkerThread(
173 client,
174 target,
175 new URI("/random/2000"),
176 10, false);
177 }
178
179 for (final WorkerThread worker : workers) {
180 worker.start();
181 }
182 for (final WorkerThread worker : workers) {
183 worker.join(10000);
184 final Exception ex = worker.getException();
185 if (ex != null) {
186 throw ex;
187 }
188 }
189
190
191 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
192
193 Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
194 }
195
196 @Test
197 void testReuseOfAbortedConnections() throws Exception {
198 configureServer(bootstrap -> bootstrap
199 .register("/random/*", new RandomHandler()));
200 final HttpHost target = startServer();
201
202 final TestClient client = client();
203 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
204 connManager.setMaxTotal(5);
205 connManager.setDefaultMaxPerRoute(5);
206
207 final WorkerThread[] workers = new WorkerThread[10];
208 for (int i = 0; i < workers.length; i++) {
209 workers[i] = new WorkerThread(
210 client,
211 target,
212 new URI("/random/2000"),
213 10, true);
214 }
215
216 for (final WorkerThread worker : workers) {
217 worker.start();
218 }
219 for (final WorkerThread worker : workers) {
220 worker.join(10000);
221 final Exception ex = worker.getException();
222 if (ex != null) {
223 throw ex;
224 }
225 }
226
227
228 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
229
230 Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
231 }
232
233 @Test
234 void testKeepAliveHeaderRespected() throws Exception {
235 configureServer(bootstrap -> bootstrap
236 .setHttpProcessor(HttpProcessors.customServer(null)
237 .add(new ResponseKeepAlive())
238 .build())
239 .register("/random/*", new RandomHandler()));
240 final HttpHost target = startServer();
241
242 final TestClient client = client();
243 final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
244 connManager.setMaxTotal(1);
245 connManager.setDefaultMaxPerRoute(1);
246
247 client.execute(target, new HttpGet("/random/2000"), response -> {
248 EntityUtils.consume(response.getEntity());
249 return null;
250 });
251
252 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
253
254 client.execute(target, new HttpGet("/random/2000"), response -> {
255 EntityUtils.consume(response.getEntity());
256 return null;
257 });
258
259 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
260
261
262 Thread.sleep(1100);
263 client.execute(target, new HttpGet("/random/2000"), response -> {
264 EntityUtils.consume(response.getEntity());
265 return null;
266 });
267
268 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
269
270
271
272 Thread.sleep(500);
273 client.execute(target, new HttpGet("/random/2000"), response -> {
274 EntityUtils.consume(response.getEntity());
275 return null;
276 });
277
278
279 Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
280 Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
281 }
282
283 private static class WorkerThread extends Thread {
284
285 private final HttpHost target;
286 private final CloseableHttpClient httpclient;
287 private final boolean forceClose;
288 private final List<ClassicHttpRequest> requests;
289
290 private volatile Exception exception;
291
292 public WorkerThread(
293 final CloseableHttpClient httpclient,
294 final HttpHost target,
295 final URI requestURI,
296 final int repetitions,
297 final boolean forceClose) {
298 super();
299 this.httpclient = httpclient;
300 this.target = target;
301 this.forceClose = forceClose;
302 this.requests = new ArrayList<>(repetitions);
303 for (int i = 0; i < repetitions; i++) {
304 requests.add(new HttpGet(requestURI));
305 }
306 }
307
308 public WorkerThread(
309 final CloseableHttpClient httpclient,
310 final HttpHost target,
311 final boolean forceClose,
312 final List<ClassicHttpRequest> requests) {
313 super();
314 this.httpclient = httpclient;
315 this.target = target;
316 this.forceClose = forceClose;
317 this.requests = requests;
318 }
319
320 @Override
321 public void run() {
322 try {
323 for (final ClassicHttpRequest request : requests) {
324 this.httpclient.execute(this.target, request, response -> {
325 if (this.forceClose) {
326 response.close();
327 } else {
328 EntityUtils.consume(response.getEntity());
329 }
330 return null;
331 });
332 }
333 } catch (final Exception ex) {
334 this.exception = ex;
335 }
336 }
337
338 public Exception getException() {
339 return exception;
340 }
341
342 }
343
344
345
346 private static class ResponseKeepAlive implements HttpResponseInterceptor {
347 @Override
348 public void process(
349 final HttpResponse response,
350 final EntityDetails entityDetails,
351 final HttpContext context) {
352 final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
353 if(connection != null) {
354 if(!connection.getValue().equalsIgnoreCase("Close")) {
355 response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
356 }
357 }
358 }
359 }
360
361 }