1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import static java.util.concurrent.TimeUnit.MILLISECONDS;
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.nio.ByteBuffer;
33
34 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
35 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
36 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
37 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
38 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
39 import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
40 import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
41 import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
42 import org.apache.hc.core5.http.HeaderElements;
43 import org.apache.hc.core5.http.HttpHeaders;
44 import org.apache.hc.core5.http.HttpHost;
45 import org.apache.hc.core5.http.HttpResponse;
46 import org.apache.hc.core5.http.Message;
47 import org.apache.hc.core5.http.URIScheme;
48 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
49 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
50 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
51 import org.apache.hc.core5.reactive.ReactiveServerExchangeHandler;
52 import org.apache.hc.core5.testing.reactive.ReactiveRandomProcessor;
53 import org.hamcrest.CoreMatchers;
54 import org.junit.jupiter.api.Test;
55 import org.junit.jupiter.api.Timeout;
56 import org.junit.jupiter.params.ParameterizedTest;
57 import org.junit.jupiter.params.provider.ValueSource;
58 import org.reactivestreams.Publisher;
59
60 abstract class TestHttp1Reactive extends AbstractHttpReactiveFundamentalsTest {
61
62 public TestHttp1Reactive(final URIScheme scheme) {
63 super(scheme, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD);
64 }
65
66 @ParameterizedTest(name = "{displayName}; concurrent connections: {0}")
67 @ValueSource(ints = {5, 1, 20})
68 @Timeout(value = 60_000, unit = MILLISECONDS)
69 public void testSequentialGetRequestsCloseConnection(final int concurrentConns) throws Exception {
70 configureServer(bootstrap -> bootstrap.register("/random/*", () ->
71 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor())));
72 final HttpHost target = startServer();
73
74 final TestAsyncClient client = startClient();
75
76 final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
77 connManager.setDefaultMaxPerRoute(concurrentConns);
78 connManager.setMaxTotal(100);
79
80 for (int i = 0; i < 3; i++) {
81 final SimpleHttpRequest get = SimpleRequestBuilder.get()
82 .setHttpHost(target)
83 .setPath("/random/2048")
84 .build();
85 get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
86 final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/2048").build();
87 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
88
89 client.execute(request, consumer, null);
90
91 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
92 assertThat(response, CoreMatchers.notNullValue());
93 assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
94 final String body = publisherToString(response.getBody());
95 assertThat(body, CoreMatchers.notNullValue());
96 assertThat(body.length(), CoreMatchers.equalTo(2048));
97 }
98 }
99
100 @Test
101 @Timeout(value = 60_000, unit = MILLISECONDS)
102 public void testSharedPool() throws Exception {
103 configureServer(bootstrap -> bootstrap.register("/random/*", () ->
104 new ReactiveServerExchangeHandler(new ReactiveRandomProcessor())));
105 final HttpHost target = startServer();
106
107 final TestAsyncClient client = startClient();
108
109 final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager();
110
111 final AsyncRequestProducer request1 = AsyncRequestBuilder.get(target + "/random/2048").build();
112 final ReactiveResponseConsumer consumer1 = new ReactiveResponseConsumer();
113
114 client.execute(request1, consumer1, null);
115
116 final Message<HttpResponse, Publisher<ByteBuffer>> response1 = consumer1.getResponseFuture().get();
117 assertThat(response1, CoreMatchers.notNullValue());
118 assertThat(response1.getHead(), CoreMatchers.notNullValue());
119 assertThat(response1.getHead().getCode(), CoreMatchers.equalTo(200));
120 final String body1 = publisherToString(response1.getBody());
121 assertThat(body1, CoreMatchers.notNullValue());
122 assertThat(body1.length(), CoreMatchers.equalTo(2048));
123
124
125 try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
126 .setConnectionManager(connManager)
127 .setConnectionManagerShared(true)
128 .build()) {
129 httpclient2.start();
130 final AsyncRequestProducer request2 = AsyncRequestBuilder.get(target + "/random/2048").build();
131 final ReactiveResponseConsumer consumer2 = new ReactiveResponseConsumer();
132
133 httpclient2.execute(request2, consumer2, null);
134
135 final Message<HttpResponse, Publisher<ByteBuffer>> response2 = consumer2.getResponseFuture().get();
136 assertThat(response2, CoreMatchers.notNullValue());
137 assertThat(response2.getHead().getCode(), CoreMatchers.equalTo(200));
138 final String body2 = publisherToString(response2.getBody());
139 assertThat(body2, CoreMatchers.notNullValue());
140 assertThat(body2.length(), CoreMatchers.equalTo(2048));
141 }
142
143 final AsyncRequestProducer request3 = AsyncRequestBuilder.get(target + "/random/2048").build();
144 final ReactiveResponseConsumer consumer3 = new ReactiveResponseConsumer();
145
146 client.execute(request3, consumer3, null);
147
148 final Message<HttpResponse, Publisher<ByteBuffer>> response3 = consumer3.getResponseFuture().get();
149 assertThat(response3, CoreMatchers.notNullValue());
150 assertThat(response3.getHead().getCode(), CoreMatchers.equalTo(200));
151 final String body3 = publisherToString(response3.getBody());
152 assertThat(body3, CoreMatchers.notNullValue());
153 assertThat(body3.length(), CoreMatchers.equalTo(2048));
154 }
155
156 }