View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.testing.sync;
29  
30  import java.io.ByteArrayInputStream;
31  import java.net.URI;
32  import java.nio.charset.StandardCharsets;
33  import java.util.ArrayList;
34  import java.util.List;
35  
36  import org.apache.hc.client5.http.classic.methods.HttpGet;
37  import org.apache.hc.client5.http.classic.methods.HttpPost;
38  import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
39  import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
40  import org.apache.hc.client5.testing.classic.RandomHandler;
41  import org.apache.hc.client5.testing.extension.sync.ClientProtocolLevel;
42  import org.apache.hc.client5.testing.extension.sync.TestClient;
43  import org.apache.hc.core5.http.ClassicHttpRequest;
44  import org.apache.hc.core5.http.ContentType;
45  import org.apache.hc.core5.http.EntityDetails;
46  import org.apache.hc.core5.http.Header;
47  import org.apache.hc.core5.http.HeaderElements;
48  import org.apache.hc.core5.http.HttpHeaders;
49  import org.apache.hc.core5.http.HttpHost;
50  import org.apache.hc.core5.http.HttpResponse;
51  import org.apache.hc.core5.http.HttpResponseInterceptor;
52  import org.apache.hc.core5.http.URIScheme;
53  import org.apache.hc.core5.http.impl.HttpProcessors;
54  import org.apache.hc.core5.http.io.entity.EntityUtils;
55  import org.apache.hc.core5.http.io.entity.InputStreamEntity;
56  import org.apache.hc.core5.http.protocol.HttpContext;
57  import org.junit.jupiter.api.Assertions;
58  import org.junit.jupiter.api.Test;
59  
60  class TestConnectionReuse extends AbstractIntegrationTestBase {
61  
/**
 * Creates the test fixture, passing {@link URIScheme#HTTP} and
 * {@link ClientProtocolLevel#STANDARD} to the integration test base class.
 */
public TestConnectionReuse() {
    super(URIScheme.HTTP, ClientProtocolLevel.STANDARD);
}
65  
66      @Test
67      void testReuseOfPersistentConnections() throws Exception {
68          configureServer(bootstrap -> bootstrap
69                  .register("/random/*", new RandomHandler()));
70          final HttpHost target = startServer();
71  
72          final TestClient client = client();
73          final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
74          connManager.setMaxTotal(5);
75          connManager.setDefaultMaxPerRoute(5);
76  
77          final WorkerThread[] workers = new WorkerThread[10];
78          for (int i = 0; i < workers.length; i++) {
79              workers[i] = new WorkerThread(
80                      client,
81                      target,
82                      new URI("/random/2000"),
83                      10, false);
84          }
85  
86          for (final WorkerThread worker : workers) {
87              worker.start();
88          }
89          for (final WorkerThread worker : workers) {
90              worker.join(10000);
91              final Exception ex = worker.getException();
92              if (ex != null) {
93                  throw ex;
94              }
95          }
96  
97          // Expect leased connections to be returned
98          Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
99          // Expect some connection in the pool
100         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
101     }
102 
103     @Test
104     void testReuseOfPersistentConnectionsWithStreamedRequestAndResponse() throws Exception {
105         configureServer(bootstrap -> bootstrap
106                 .register("/random/*", new RandomHandler()));
107         final HttpHost target = startServer();
108 
109         final TestClient client = client();
110         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
111         connManager.setMaxTotal(5);
112         connManager.setDefaultMaxPerRoute(5);
113 
114         final WorkerThread[] workers = new WorkerThread[10];
115         for (int i = 0; i < workers.length; i++) {
116             final List<ClassicHttpRequest> requests = new ArrayList<>();
117             for (int j = 0; j < 10; j++) {
118                 final HttpPost post = new HttpPost(new URI("/random/2000"));
119                 // non-repeatable
120                 post.setEntity(new InputStreamEntity(
121                         new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)),
122                         ContentType.APPLICATION_OCTET_STREAM));
123                 requests.add(post);
124             }
125             workers[i] = new WorkerThread(client, target, false, requests);
126         }
127 
128         for (final WorkerThread worker : workers) {
129             worker.start();
130         }
131         for (final WorkerThread worker : workers) {
132             worker.join(10000);
133             final Exception ex = worker.getException();
134             if (ex != null) {
135                 throw ex;
136             }
137         }
138 
139         // Expect leased connections to be returned
140         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
141         // Expect some connection in the pool
142         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
143     }
144 
145     private static class AlwaysCloseConn implements HttpResponseInterceptor {
146 
147         @Override
148         public void process(
149                 final HttpResponse response,
150                 final EntityDetails entityDetails,
151                 final HttpContext context) {
152             response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153         }
154 
155     }
156 
157     @Test
158     void testReuseOfClosedConnections() throws Exception {
159         configureServer(bootstrap -> bootstrap
160                 .setHttpProcessor(HttpProcessors.customServer(null)
161                         .add(new AlwaysCloseConn())
162                         .build()));
163         final HttpHost target = startServer();
164 
165         final TestClient client = client();
166         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
167         connManager.setMaxTotal(5);
168         connManager.setDefaultMaxPerRoute(5);
169 
170         final WorkerThread[] workers = new WorkerThread[10];
171         for (int i = 0; i < workers.length; i++) {
172             workers[i] = new WorkerThread(
173                     client,
174                     target,
175                     new URI("/random/2000"),
176                     10, false);
177         }
178 
179         for (final WorkerThread worker : workers) {
180             worker.start();
181         }
182         for (final WorkerThread worker : workers) {
183             worker.join(10000);
184             final Exception ex = worker.getException();
185             if (ex != null) {
186                 throw ex;
187             }
188         }
189 
190         // Expect leased connections to be returned
191         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
192         // Expect zero connections in the pool
193         Assertions.assertEquals(0, connManager.getTotalStats().getAvailable());
194     }
195 
196     @Test
197     void testReuseOfAbortedConnections() throws Exception {
198         configureServer(bootstrap -> bootstrap
199                 .register("/random/*", new RandomHandler()));
200         final HttpHost target = startServer();
201 
202         final TestClient client = client();
203         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
204         connManager.setMaxTotal(5);
205         connManager.setDefaultMaxPerRoute(5);
206 
207         final WorkerThread[] workers = new WorkerThread[10];
208         for (int i = 0; i < workers.length; i++) {
209             workers[i] = new WorkerThread(
210                     client,
211                     target,
212                     new URI("/random/2000"),
213                     10, true);
214         }
215 
216         for (final WorkerThread worker : workers) {
217             worker.start();
218         }
219         for (final WorkerThread worker : workers) {
220             worker.join(10000);
221             final Exception ex = worker.getException();
222             if (ex != null) {
223                 throw ex;
224             }
225         }
226 
227         // Expect leased connections to be returned
228         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
229         // Expect some connections in the pool
230         Assertions.assertTrue(connManager.getTotalStats().getAvailable() > 0);
231     }
232 
233     @Test
234     void testKeepAliveHeaderRespected() throws Exception {
235         configureServer(bootstrap -> bootstrap
236                 .setHttpProcessor(HttpProcessors.customServer(null)
237                         .add(new ResponseKeepAlive())
238                         .build())
239                 .register("/random/*", new RandomHandler()));
240         final HttpHost target = startServer();
241 
242         final TestClient client = client();
243         final PoolingHttpClientConnectionManager connManager = client.getConnectionManager();
244         connManager.setMaxTotal(1);
245         connManager.setDefaultMaxPerRoute(1);
246 
247         client.execute(target, new HttpGet("/random/2000"), response -> {
248             EntityUtils.consume(response.getEntity());
249             return null;
250         });
251 
252         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
253 
254         client.execute(target, new HttpGet("/random/2000"), response -> {
255             EntityUtils.consume(response.getEntity());
256             return null;
257         });
258 
259         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
260 
261         // Now sleep for 1.1 seconds and let the timeout do its work
262         Thread.sleep(1100);
263         client.execute(target, new HttpGet("/random/2000"), response -> {
264             EntityUtils.consume(response.getEntity());
265             return null;
266         });
267 
268         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
269 
270         // Do another request just under the 1 second limit & make
271         // sure we reuse that connection.
272         Thread.sleep(500);
273         client.execute(target, new HttpGet("/random/2000"), response -> {
274             EntityUtils.consume(response.getEntity());
275             return null;
276         });
277 
278         // Expect leased connections to be returned
279         Assertions.assertEquals(0, connManager.getTotalStats().getLeased());
280         Assertions.assertEquals(1, connManager.getTotalStats().getAvailable());
281     }
282 
283     private static class WorkerThread extends Thread {
284 
285         private final HttpHost target;
286         private final CloseableHttpClient httpclient;
287         private final boolean forceClose;
288         private final List<ClassicHttpRequest> requests;
289 
290         private volatile Exception exception;
291 
292         public WorkerThread(
293                 final CloseableHttpClient httpclient,
294                 final HttpHost target,
295                 final URI requestURI,
296                 final int repetitions,
297                 final boolean forceClose) {
298             super();
299             this.httpclient = httpclient;
300             this.target = target;
301             this.forceClose = forceClose;
302             this.requests = new ArrayList<>(repetitions);
303             for (int i = 0; i < repetitions; i++) {
304                 requests.add(new HttpGet(requestURI));
305             }
306         }
307 
308         public WorkerThread(
309                 final CloseableHttpClient httpclient,
310                 final HttpHost target,
311                 final boolean forceClose,
312                 final List<ClassicHttpRequest> requests) {
313             super();
314             this.httpclient = httpclient;
315             this.target = target;
316             this.forceClose = forceClose;
317             this.requests = requests;
318         }
319 
320         @Override
321         public void run() {
322             try {
323                 for (final ClassicHttpRequest request : requests) {
324                     this.httpclient.execute(this.target, request, response -> {
325                         if (this.forceClose) {
326                             response.close();
327                         } else {
328                             EntityUtils.consume(response.getEntity());
329                         }
330                         return null;
331                     });
332                 }
333             } catch (final Exception ex) {
334                 this.exception = ex;
335             }
336         }
337 
338         public Exception getException() {
339             return exception;
340         }
341 
342     }
343 
344     // A very basic keep-alive header interceptor, to add Keep-Alive: timeout=1
345     // if there is no Connection: close header.
346     private static class ResponseKeepAlive implements HttpResponseInterceptor {
347         @Override
348         public void process(
349                 final HttpResponse response,
350                 final EntityDetails entityDetails,
351                 final HttpContext context) {
352             final Header connection = response.getFirstHeader(HttpHeaders.CONNECTION);
353             if(connection != null) {
354                 if(!connection.getValue().equalsIgnoreCase("Close")) {
355                     response.addHeader(HeaderElements.KEEP_ALIVE, "timeout=1");
356                 }
357             }
358         }
359     }
360 
361 }