1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.util.concurrent.Future;
30
31 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
32 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
33 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
34 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
35 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
36 import org.apache.hc.client5.http.protocol.HttpClientContext;
37 import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
38 import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
39 import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
40 import org.apache.hc.core5.http.ContentType;
41 import org.apache.hc.core5.http.EndpointDetails;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.HttpResponse;
44 import org.apache.hc.core5.http.HttpStatus;
45 import org.apache.hc.core5.http.URIScheme;
46 import org.apache.hc.core5.http.protocol.HttpContext;
47 import org.apache.hc.core5.http.protocol.HttpCoreContext;
48 import org.apache.hc.core5.net.URIAuthority;
49 import org.junit.jupiter.api.Assertions;
50 import org.junit.jupiter.api.Test;
51
52 class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase {
53
/**
 * Configures the integration test base for the {@code http} scheme with the
 * standard client and standard server protocol levels.
 */
public TestHttp1AsyncStatefulConnManagement() {
    super(
            URIScheme.HTTP,
            ClientProtocolLevel.STANDARD,
            ServerProtocolLevel.STANDARD);
}
57
58 @Test
59 void testStatefulConnections() throws Exception {
60 configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
61
62 @Override
63 protected SimpleHttpResponse handle(
64 final SimpleHttpRequest request,
65 final HttpCoreContext context) {
66 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
67 response.setBody("Whatever", ContentType.TEXT_PLAIN);
68 return response;
69 }
70 }));
71 final HttpHost target = startServer();
72
73 configureClient(builder -> builder
74 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
75
76 final TestAsyncClient client = startClient();
77
78 final int workerCount = 2;
79 final int requestCount = 5;
80
81 final HttpContext[] contexts = new HttpContext[workerCount];
82 final HttpWorker[] workers = new HttpWorker[workerCount];
83 for (int i = 0; i < contexts.length; i++) {
84 final HttpClientContext context = HttpClientContext.create();
85 contexts[i] = context;
86 workers[i] = new HttpWorker(
87 "user" + i,
88 context, requestCount, target, client);
89 }
90
91 for (final HttpWorker worker : workers) {
92 worker.start();
93 }
94 for (final HttpWorker worker : workers) {
95 worker.join(TIMEOUT.toMilliseconds());
96 }
97 for (final HttpWorker worker : workers) {
98 final Exception ex = worker.getException();
99 if (ex != null) {
100 throw ex;
101 }
102 Assertions.assertEquals(requestCount, worker.getCount());
103 }
104
105 for (final HttpContext context : contexts) {
106 final String state0 = (String) context.getAttribute("r0");
107 Assertions.assertNotNull(state0);
108 for (int r = 1; r < requestCount; r++) {
109 Assertions.assertEquals(state0, context.getAttribute("r" + r));
110 }
111 }
112
113 }
114
115 static class HttpWorker extends Thread {
116
117 private final String uid;
118 private final HttpClientContext context;
119 private final int requestCount;
120 private final HttpHost target;
121 private final CloseableHttpAsyncClient httpclient;
122
123 private volatile Exception exception;
124 private volatile int count;
125
126 public HttpWorker(
127 final String uid,
128 final HttpClientContext context,
129 final int requestCount,
130 final HttpHost target,
131 final CloseableHttpAsyncClient httpclient) {
132 super();
133 this.uid = uid;
134 this.context = context;
135 this.requestCount = requestCount;
136 this.target = target;
137 this.httpclient = httpclient;
138 this.count = 0;
139 }
140
141 public int getCount() {
142 return count;
143 }
144
145 public Exception getException() {
146 return exception;
147 }
148
149 @Override
150 public void run() {
151 try {
152 context.setAttribute("user", uid);
153 for (int r = 0; r < requestCount; r++) {
154 final SimpleHttpRequest request = SimpleRequestBuilder.get()
155 .setHttpHost(target)
156 .setPath("/")
157 .build();
158 final Future<SimpleHttpResponse> future = httpclient.execute(request, null);
159 future.get();
160
161 count++;
162 final EndpointDetails endpointDetails = context.getEndpointDetails();
163 final String connuid = Integer.toHexString(System.identityHashCode(endpointDetails));
164 context.setAttribute("r" + r, connuid);
165 }
166
167 } catch (final Exception ex) {
168 exception = ex;
169 }
170 }
171
172 }
173
174 @Test
175 void testRouteSpecificPoolRecylcing() throws Exception {
176 configureServer(bootstrap -> bootstrap.register("*", () -> new AbstractSimpleServerExchangeHandler() {
177
178 @Override
179 protected SimpleHttpResponse handle(
180 final SimpleHttpRequest request,
181 final HttpCoreContext context) {
182 final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
183 response.setBody("Whatever", ContentType.TEXT_PLAIN);
184 return response;
185 }
186 }));
187 final HttpHost target = startServer();
188
189
190
191
192
193 configureClient(builder -> builder
194 .setUserTokenHandler((route, context) -> context.getAttribute("user")));
195
196 final TestAsyncClient client = startClient();
197
198 final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
199
200 final int maxConn = 2;
201
202 connManager.setMaxTotal(maxConn);
203 connManager.setDefaultMaxPerRoute(maxConn);
204
205
206 final HttpContext context1 = HttpClientContext.create();
207 context1.setAttribute("user", "stuff");
208
209 final SimpleHttpRequest request1 = SimpleRequestBuilder.get()
210 .setHttpHost(target)
211 .setPath("/")
212 .build();
213 final Future<SimpleHttpResponse> future1 = client.execute(request1, context1, null);
214 final HttpResponse response1 = future1.get();
215 Assertions.assertNotNull(response1);
216 Assertions.assertEquals(200, response1.getCode());
217
218
219
220
221
222 Thread.sleep(100);
223
224
225
226 final HttpContext context2 = HttpClientContext.create();
227
228 final SimpleHttpRequest request2 = SimpleRequestBuilder.get()
229 .setScheme(target.getSchemeName())
230 .setAuthority(new URIAuthority("127.0.0.1", target.getPort()))
231 .setPath("/")
232 .build();
233 final Future<SimpleHttpResponse> future2 = client.execute(request2, context2, null);
234 final HttpResponse response2 = future2.get();
235 Assertions.assertNotNull(response2);
236 Assertions.assertEquals(200, response2.getCode());
237
238
239
240
241
242 Thread.sleep(100);
243
244
245
246
247
248
249 final HttpContext context3 = HttpClientContext.create();
250
251 final SimpleHttpRequest request3 = SimpleRequestBuilder.get()
252 .setHttpHost(target)
253 .setPath("/")
254 .build();
255 final Future<SimpleHttpResponse> future3 = client.execute(request3, context3, null);
256 final HttpResponse response3 = future3.get();
257 Assertions.assertNotNull(response3);
258 Assertions.assertEquals(200, response3.getCode());
259 }
260
261 }