1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.io.IOException;
30 import java.util.Queue;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.CountDownLatch;
33
34 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
35 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
36 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
37 import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
38 import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
39 import org.apache.hc.client5.http.protocol.HttpClientContext;
40 import org.apache.hc.client5.testing.Result;
41 import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
42 import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
43 import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
44 import org.apache.hc.core5.concurrent.FutureCallback;
45 import org.apache.hc.core5.http.ContentType;
46 import org.apache.hc.core5.http.EntityDetails;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpHost;
49 import org.apache.hc.core5.http.HttpRequest;
50 import org.apache.hc.core5.http.HttpStatus;
51 import org.apache.hc.core5.http.RequestNotExecutedException;
52 import org.apache.hc.core5.http.URIScheme;
53 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
54 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
55 import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
56 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
57 import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
58 import org.apache.hc.core5.http.nio.support.AbstractAsyncRequesterConsumer;
59 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
60 import org.apache.hc.core5.http.nio.support.BasicPushProducer;
61 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
62 import org.apache.hc.core5.http.protocol.HttpContext;
63 import org.apache.hc.core5.http.support.BasicRequestBuilder;
64 import org.apache.hc.core5.http.support.BasicResponseBuilder;
65 import org.apache.hc.core5.http2.config.H2Config;
66 import org.junit.jupiter.api.Assertions;
67 import org.junit.jupiter.api.Test;
68
69 abstract class AbstractH2AsyncFundamentalsTest extends AbstractHttpAsyncFundamentalsTest {
70
71 public AbstractH2AsyncFundamentalsTest(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final ServerProtocolLevel serverProtocolLevel) {
72 super(scheme, clientProtocolLevel, serverProtocolLevel);
73 }
74
75 @Test
76 void testPush() throws Exception {
77 configureServer(bootstrap -> bootstrap
78 .register("/pushy", () -> new AbstractServerExchangeHandler<HttpRequest>() {
79
80 @Override
81 protected AsyncRequestConsumer<HttpRequest> supplyConsumer(
82 final HttpRequest request,
83 final EntityDetails entityDetails,
84 final HttpContext context) throws HttpException {
85
86 return new AbstractAsyncRequesterConsumer<HttpRequest, Void>(new DiscardingEntityConsumer<>()) {
87
88 @Override
89 protected HttpRequest buildResult(final HttpRequest request, final Void entity, final ContentType contentType) {
90 return request;
91 }
92
93 };
94 }
95
96 @Override
97 protected void handle(
98 final HttpRequest request,
99 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
100 final HttpContext context) throws HttpException, IOException {
101 responseTrigger.pushPromise(
102 BasicRequestBuilder.copy(request)
103 .setPath("/aaa")
104 .build(),
105 context,
106 new BasicPushProducer(BasicResponseBuilder.create(HttpStatus.SC_OK)
107 .build(),
108 new StringAsyncEntityProducer("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", ContentType.TEXT_PLAIN)));
109 responseTrigger.pushPromise(
110 BasicRequestBuilder.copy(request)
111 .setPath("/bbb")
112 .build(),
113 context,
114 new BasicPushProducer(
115 BasicResponseBuilder.create(HttpStatus.SC_OK).build(),
116 new StringAsyncEntityProducer("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", ContentType.TEXT_PLAIN)));
117 responseTrigger.submitResponse(
118 new BasicResponseProducer(
119 BasicResponseBuilder.create(HttpStatus.SC_OK).build(),
120 new StringAsyncEntityProducer("I am being very pushy")
121 ),
122 context);
123 }
124
125 }));
126
127 configureClient(builder -> builder
128 .setH2Config(H2Config.custom()
129 .setPushEnabled(true)
130 .build()));
131
132 final HttpHost target = startServer();
133
134 final TestAsyncClient client = startClient();
135
136 client.start();
137
138 final Queue<Result<String>> pushMessageQueue = new ConcurrentLinkedQueue<>();
139 final CountDownLatch latch = new CountDownLatch(3);
140 final HttpClientContext context = HttpClientContext.create();
141 final SimpleHttpRequest request = SimpleRequestBuilder.get()
142 .setHttpHost(target)
143 .setPath("/pushy")
144 .build();
145 client.execute(
146 SimpleRequestProducer.create(request),
147 SimpleResponseConsumer.create(),
148 (r, c) -> new AbstractAsyncPushHandler<SimpleHttpResponse>(SimpleResponseConsumer.create()) {
149
150 @Override
151 protected void handleResponse(final HttpRequest promise,
152 final SimpleHttpResponse response) throws IOException, HttpException {
153 pushMessageQueue.add(new Result<>(promise, response, response.getBodyText()));
154 latch.countDown();
155 }
156
157 @Override
158 protected void handleError(final HttpRequest promise, final Exception cause) {
159 pushMessageQueue.add(new Result<>(promise, cause));
160 latch.countDown();
161 }
162
163 },
164 context,
165 new FutureCallback<SimpleHttpResponse>() {
166
167 @Override
168 public void completed(final SimpleHttpResponse response) {
169 pushMessageQueue.add(new Result<>(request, response, response.getBodyText()));
170 latch.countDown();
171 }
172
173 @Override
174 public void failed(final Exception ex) {
175 pushMessageQueue.add(new Result<>(request, ex));
176 latch.countDown();
177 }
178
179 @Override
180 public void cancelled() {
181 pushMessageQueue.add(new Result<>(request, new RequestNotExecutedException()));
182 latch.countDown();
183 }
184
185 }
186 );
187 Assertions.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
188 Assertions.assertEquals(3, pushMessageQueue.size());
189 for (final Result<String> result : pushMessageQueue) {
190 if (result.isOK()) {
191 Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode());
192 final String path = result.request.getPath();
193 if (path.equals("/pushy")) {
194 Assertions.assertEquals("I am being very pushy", result.content);
195 } else if (path.equals("/aaa")) {
196 Assertions.assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", result.content);
197 } else if (path.equals("/bbb")) {
198 Assertions.assertEquals("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", result.content);
199 } else {
200 Assertions.fail("Unxpected request path: " + path);
201 }
202 }
203 }
204 }
205
206 }