1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.CancellationException;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Function;
42
43 import org.apache.hc.client5.http.HttpRoute;
44 import org.apache.hc.client5.http.async.AsyncExecCallback;
45 import org.apache.hc.client5.http.async.AsyncExecChain;
46 import org.apache.hc.client5.http.async.AsyncExecRuntime;
47 import org.apache.hc.client5.http.auth.AuthSchemeFactory;
48 import org.apache.hc.client5.http.auth.CredentialsProvider;
49 import org.apache.hc.client5.http.config.Configurable;
50 import org.apache.hc.client5.http.config.RequestConfig;
51 import org.apache.hc.client5.http.cookie.CookieSpecFactory;
52 import org.apache.hc.client5.http.cookie.CookieStore;
53 import org.apache.hc.client5.http.impl.ExecSupport;
54 import org.apache.hc.client5.http.protocol.HttpClientContext;
55 import org.apache.hc.client5.http.routing.RoutingSupport;
56 import org.apache.hc.core5.concurrent.Cancellable;
57 import org.apache.hc.core5.concurrent.ComplexFuture;
58 import org.apache.hc.core5.concurrent.DefaultThreadFactory;
59 import org.apache.hc.core5.concurrent.FutureCallback;
60 import org.apache.hc.core5.http.EntityDetails;
61 import org.apache.hc.core5.http.HttpException;
62 import org.apache.hc.core5.http.HttpHost;
63 import org.apache.hc.core5.http.HttpRequest;
64 import org.apache.hc.core5.http.HttpResponse;
65 import org.apache.hc.core5.http.HttpStatus;
66 import org.apache.hc.core5.http.config.Lookup;
67 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
68 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
69 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
71 import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
72 import org.apache.hc.core5.http.nio.DataStreamChannel;
73 import org.apache.hc.core5.http.nio.HandlerFactory;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.support.BasicRequestBuilder;
76 import org.apache.hc.core5.io.CloseMode;
77 import org.apache.hc.core5.io.ModalCloseable;
78 import org.apache.hc.core5.net.URIAuthority;
79 import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
80 import org.apache.hc.core5.util.TimeValue;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
83
84 abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
85
86 private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
87
88 private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
89
90 private final AsyncExecChainElement execChain;
91 private final Lookup<CookieSpecFactory> cookieSpecRegistry;
92 private final Lookup<AuthSchemeFactory> authSchemeRegistry;
93 private final CookieStore cookieStore;
94 private final CredentialsProvider credentialsProvider;
95 private final Function<HttpContext, HttpClientContext> contextAdaptor;
96 private final RequestConfig defaultConfig;
97 private final ConcurrentLinkedQueue<Closeable> closeables;
98 private final ScheduledExecutorService scheduledExecutorService;
99 private final AsyncExecChain.Scheduler scheduler;
100
101 InternalAbstractHttpAsyncClient(
102 final DefaultConnectingIOReactor ioReactor,
103 final AsyncPushConsumerRegistry pushConsumerRegistry,
104 final ThreadFactory threadFactory,
105 final AsyncExecChainElement execChain,
106 final Lookup<CookieSpecFactory> cookieSpecRegistry,
107 final Lookup<AuthSchemeFactory> authSchemeRegistry,
108 final CookieStore cookieStore,
109 final CredentialsProvider credentialsProvider,
110 final Function<HttpContext, HttpClientContext> contextAdaptor,
111 final RequestConfig defaultConfig,
112 final List<Closeable> closeables) {
113 super(ioReactor, pushConsumerRegistry, threadFactory);
114 this.execChain = execChain;
115 this.cookieSpecRegistry = cookieSpecRegistry;
116 this.authSchemeRegistry = authSchemeRegistry;
117 this.cookieStore = cookieStore;
118 this.credentialsProvider = credentialsProvider;
119 this.contextAdaptor = contextAdaptor;
120 this.defaultConfig = defaultConfig;
121 this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
122 this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
123 this.scheduler = new AsyncExecChain.Scheduler() {
124
125 @Override
126 public void scheduleExecution(
127 final HttpRequest request,
128 final AsyncEntityProducer entityProducer,
129 final AsyncExecChain.Scope scope,
130 final AsyncExecCallback asyncExecCallback,
131 final TimeValue delay) {
132 executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
133 }
134
135 @Override
136 public void scheduleExecution(
137 final HttpRequest request,
138 final AsyncEntityProducer entityProducer,
139 final AsyncExecChain.Scope scope,
140 final AsyncExecChain chain,
141 final AsyncExecCallback asyncExecCallback,
142 final TimeValue delay) {
143 executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
144 }
145 };
146
147 }
148
149 @Override
150 void internalClose(final CloseMode closeMode) {
151 if (this.closeables != null) {
152 Closeable closeable;
153 while ((closeable = this.closeables.poll()) != null) {
154 try {
155 if (closeable instanceof ModalCloseable) {
156 ((ModalCloseable) closeable).close(closeMode);
157 } else {
158 closeable.close();
159 }
160 } catch (final IOException ex) {
161 LOG.error(ex.getMessage(), ex);
162 }
163 }
164 }
165 final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
166 for (final Runnable runnable: runnables) {
167 if (runnable instanceof Cancellable) {
168 ((Cancellable) runnable).cancel();
169 }
170 }
171 }
172
173 private void setupContext(final HttpClientContext context) {
174 if (context.getAuthSchemeRegistry() == null) {
175 context.setAuthSchemeRegistry(authSchemeRegistry);
176 }
177 if (context.getCookieSpecRegistry() == null) {
178 context.setCookieSpecRegistry(cookieSpecRegistry);
179 }
180 if (context.getCookieStore() == null) {
181 context.setCookieStore(cookieStore);
182 }
183 if (context.getCredentialsProvider() == null) {
184 context.setCredentialsProvider(credentialsProvider);
185 }
186 if (context.getRequestConfig() == null) {
187 context.setRequestConfig(defaultConfig);
188 }
189 }
190
191 abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
192
193 abstract HttpRoute determineRoute(HttpHost httpHost, HttpRequest request, HttpClientContext clientContext) throws HttpException;
194
195 @Override
196 protected <T> Future<T> doExecute(
197 final HttpHost target,
198 final AsyncRequestProducer requestProducer,
199 final AsyncResponseConsumer<T> responseConsumer,
200 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
201 final HttpContext context,
202 final FutureCallback<T> callback) {
203 final ComplexFuture<T> future = new ComplexFuture<>(callback);
204 try {
205 if (!isRunning()) {
206 throw new CancellationException("Request execution cancelled");
207 }
208 final HttpClientContext clientContext = contextAdaptor.apply(context);
209 requestProducer.sendRequest((request, entityDetails, c) -> {
210
211 RequestConfig requestConfig = null;
212 if (request instanceof Configurable) {
213 requestConfig = ((Configurable) request).getConfig();
214 }
215 if (requestConfig != null) {
216 clientContext.setRequestConfig(requestConfig);
217 }
218
219 setupContext(clientContext);
220
221 final HttpHost resolvedTarget = target != null ? target : RoutingSupport.determineHost(request);
222 if (resolvedTarget != null) {
223 if (request.getScheme() == null) {
224 request.setScheme(resolvedTarget.getSchemeName());
225 }
226 if (request.getAuthority() == null) {
227 request.setAuthority(new URIAuthority(resolvedTarget));
228 }
229 }
230
231 final HttpRoute route = determineRoute(
232 resolvedTarget,
233 request,
234 clientContext);
235 final String exchangeId = ExecSupport.getNextExchangeId();
236 clientContext.setExchangeId(exchangeId);
237 if (LOG.isDebugEnabled()) {
238 LOG.debug("{} preparing request execution", exchangeId);
239 }
240 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
241
242 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
243 clientContext, execRuntime, scheduler, new AtomicInteger(1));
244 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
245 executeImmediate(
246 BasicRequestBuilder.copy(request).build(),
247 entityDetails != null ? new AsyncEntityProducer() {
248
249 @Override
250 public void releaseResources() {
251 requestProducer.releaseResources();
252 }
253
254 @Override
255 public void failed(final Exception cause) {
256 requestProducer.failed(cause);
257 }
258
259 @Override
260 public boolean isRepeatable() {
261 return requestProducer.isRepeatable();
262 }
263
264 @Override
265 public long getContentLength() {
266 return entityDetails.getContentLength();
267 }
268
269 @Override
270 public String getContentType() {
271 return entityDetails.getContentType();
272 }
273
274 @Override
275 public String getContentEncoding() {
276 return entityDetails.getContentEncoding();
277 }
278
279 @Override
280 public boolean isChunked() {
281 return entityDetails.isChunked();
282 }
283
284 @Override
285 public Set<String> getTrailerNames() {
286 return entityDetails.getTrailerNames();
287 }
288
289 @Override
290 public int available() {
291 return requestProducer.available();
292 }
293
294 @Override
295 public void produce(final DataStreamChannel channel) throws IOException {
296 if (outputTerminated.get()) {
297 channel.endStream();
298 return;
299 }
300 requestProducer.produce(channel);
301 }
302
303 } : null,
304 scope,
305 execChain::execute,
306 new AsyncExecCallback() {
307
308 @Override
309 public AsyncDataConsumer handleResponse(
310 final HttpResponse response,
311 final EntityDetails entityDetails) throws HttpException, IOException {
312 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
313 outputTerminated.set(true);
314 requestProducer.releaseResources();
315 }
316 responseConsumer.consumeResponse(response, entityDetails, c,
317 new FutureCallback<T>() {
318
319 @Override
320 public void completed(final T result) {
321 future.completed(result);
322 }
323
324 @Override
325 public void failed(final Exception ex) {
326 future.failed(ex);
327 }
328
329 @Override
330 public void cancelled() {
331 future.cancel();
332 }
333
334 });
335 return entityDetails != null ? responseConsumer : null;
336 }
337
338 @Override
339 public void handleInformationResponse(
340 final HttpResponse response) throws HttpException, IOException {
341 responseConsumer.informationResponse(response, c);
342 }
343
344 @Override
345 public void completed() {
346 if (LOG.isDebugEnabled()) {
347 LOG.debug("{} message exchange successfully completed", exchangeId);
348 }
349 try {
350 execRuntime.releaseEndpoint();
351 } finally {
352 responseConsumer.releaseResources();
353 requestProducer.releaseResources();
354 }
355 }
356
357 @Override
358 public void failed(final Exception cause) {
359 if (LOG.isDebugEnabled()) {
360 LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
361 }
362 try {
363 execRuntime.discardEndpoint();
364 responseConsumer.failed(cause);
365 } finally {
366 try {
367 future.failed(cause);
368 } finally {
369 responseConsumer.releaseResources();
370 requestProducer.releaseResources();
371 }
372 }
373 }
374
375 });
376 }, clientContext);
377 } catch (final HttpException | IOException | IllegalStateException ex) {
378 future.failed(ex);
379 }
380 return future;
381 }
382
383 void executeImmediate(
384 final HttpRequest request,
385 final AsyncEntityProducer entityProducer,
386 final AsyncExecChain.Scope scope,
387 final AsyncExecChain chain,
388 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
389 chain.proceed(request, entityProducer, scope, asyncExecCallback);
390 }
391
392 void executeScheduled(
393 final HttpRequest request,
394 final AsyncEntityProducer entityProducer,
395 final AsyncExecChain.Scope scope,
396 final AsyncExecChain chain,
397 final AsyncExecCallback asyncExecCallback,
398 final TimeValue delay) {
399 final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
400 request, entityProducer, scope, chain, asyncExecCallback, delay);
401 if (TimeValue.isPositive(delay)) {
402 scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
403 } else {
404 scheduledExecutorService.execute(scheduledTask);
405 }
406 }
407
408 class ScheduledRequestExecution implements Runnable, Cancellable {
409
410 final HttpRequest request;
411 final AsyncEntityProducer entityProducer;
412 final AsyncExecChain.Scope scope;
413 final AsyncExecChain chain;
414 final AsyncExecCallback asyncExecCallback;
415 final TimeValue delay;
416
417 ScheduledRequestExecution(final HttpRequest request,
418 final AsyncEntityProducer entityProducer,
419 final AsyncExecChain.Scope scope,
420 final AsyncExecChain chain,
421 final AsyncExecCallback asyncExecCallback,
422 final TimeValue delay) {
423 this.request = request;
424 this.entityProducer = entityProducer;
425 this.scope = scope;
426 this.chain = chain;
427 this.asyncExecCallback = asyncExecCallback;
428 this.delay = delay;
429 }
430
431 @Override
432 public void run() {
433 try {
434 chain.proceed(request, entityProducer, scope, asyncExecCallback);
435 } catch (final Exception ex) {
436 asyncExecCallback.failed(ex);
437 }
438 }
439
440 @Override
441 public boolean cancel() {
442 asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
443 return true;
444 }
445
446 }
447
448 }