View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.Set;
33  import java.util.concurrent.CancellationException;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.ScheduledExecutorService;
38  import java.util.concurrent.ThreadFactory;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.function.Function;
42  
43  import org.apache.hc.client5.http.HttpRoute;
44  import org.apache.hc.client5.http.async.AsyncExecCallback;
45  import org.apache.hc.client5.http.async.AsyncExecChain;
46  import org.apache.hc.client5.http.async.AsyncExecRuntime;
47  import org.apache.hc.client5.http.auth.AuthSchemeFactory;
48  import org.apache.hc.client5.http.auth.CredentialsProvider;
49  import org.apache.hc.client5.http.config.Configurable;
50  import org.apache.hc.client5.http.config.RequestConfig;
51  import org.apache.hc.client5.http.cookie.CookieSpecFactory;
52  import org.apache.hc.client5.http.cookie.CookieStore;
53  import org.apache.hc.client5.http.impl.ExecSupport;
54  import org.apache.hc.client5.http.protocol.HttpClientContext;
55  import org.apache.hc.client5.http.routing.RoutingSupport;
56  import org.apache.hc.core5.concurrent.Cancellable;
57  import org.apache.hc.core5.concurrent.ComplexFuture;
58  import org.apache.hc.core5.concurrent.DefaultThreadFactory;
59  import org.apache.hc.core5.concurrent.FutureCallback;
60  import org.apache.hc.core5.http.EntityDetails;
61  import org.apache.hc.core5.http.HttpException;
62  import org.apache.hc.core5.http.HttpHost;
63  import org.apache.hc.core5.http.HttpRequest;
64  import org.apache.hc.core5.http.HttpResponse;
65  import org.apache.hc.core5.http.HttpStatus;
66  import org.apache.hc.core5.http.config.Lookup;
67  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
68  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
69  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
70  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
71  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
72  import org.apache.hc.core5.http.nio.DataStreamChannel;
73  import org.apache.hc.core5.http.nio.HandlerFactory;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.support.BasicRequestBuilder;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.io.ModalCloseable;
78  import org.apache.hc.core5.net.URIAuthority;
79  import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
80  import org.apache.hc.core5.util.TimeValue;
81  import org.slf4j.Logger;
82  import org.slf4j.LoggerFactory;
83  
84  abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
85  
86      private final static ThreadFactory SCHEDULER_THREAD_FACTORY = new DefaultThreadFactory("Scheduled-executor", true);
87  
88      private static final Logger LOG = LoggerFactory.getLogger(InternalAbstractHttpAsyncClient.class);
89  
90      private final AsyncExecChainElement execChain;
91      private final Lookup<CookieSpecFactory> cookieSpecRegistry;
92      private final Lookup<AuthSchemeFactory> authSchemeRegistry;
93      private final CookieStore cookieStore;
94      private final CredentialsProvider credentialsProvider;
95      private final Function<HttpContext, HttpClientContext> contextAdaptor;
96      private final RequestConfig defaultConfig;
97      private final ConcurrentLinkedQueue<Closeable> closeables;
98      private final ScheduledExecutorService scheduledExecutorService;
99      private final AsyncExecChain.Scheduler scheduler;
100 
101     InternalAbstractHttpAsyncClient(
102             final DefaultConnectingIOReactor ioReactor,
103             final AsyncPushConsumerRegistry pushConsumerRegistry,
104             final ThreadFactory threadFactory,
105             final AsyncExecChainElement execChain,
106             final Lookup<CookieSpecFactory> cookieSpecRegistry,
107             final Lookup<AuthSchemeFactory> authSchemeRegistry,
108             final CookieStore cookieStore,
109             final CredentialsProvider credentialsProvider,
110             final Function<HttpContext, HttpClientContext> contextAdaptor,
111             final RequestConfig defaultConfig,
112             final List<Closeable> closeables) {
113         super(ioReactor, pushConsumerRegistry, threadFactory);
114         this.execChain = execChain;
115         this.cookieSpecRegistry = cookieSpecRegistry;
116         this.authSchemeRegistry = authSchemeRegistry;
117         this.cookieStore = cookieStore;
118         this.credentialsProvider = credentialsProvider;
119         this.contextAdaptor = contextAdaptor;
120         this.defaultConfig = defaultConfig;
121         this.closeables = closeables != null ? new ConcurrentLinkedQueue<>(closeables) : null;
122         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(SCHEDULER_THREAD_FACTORY);
123         this.scheduler = new AsyncExecChain.Scheduler() {
124 
125             @Override
126             public void scheduleExecution(
127                     final HttpRequest request,
128                     final AsyncEntityProducer entityProducer,
129                     final AsyncExecChain.Scope scope,
130                     final AsyncExecCallback asyncExecCallback,
131                     final TimeValue delay) {
132                 executeScheduled(request, entityProducer, scope, execChain::execute, asyncExecCallback, delay);
133             }
134 
135             @Override
136             public void scheduleExecution(
137                     final HttpRequest request,
138                     final AsyncEntityProducer entityProducer,
139                     final AsyncExecChain.Scope scope,
140                     final AsyncExecChain chain,
141                     final AsyncExecCallback asyncExecCallback,
142                     final TimeValue delay) {
143                 executeScheduled(request, entityProducer, scope, chain, asyncExecCallback, delay);
144             }
145         };
146 
147     }
148 
149     @Override
150     void internalClose(final CloseMode closeMode) {
151         if (this.closeables != null) {
152             Closeable closeable;
153             while ((closeable = this.closeables.poll()) != null) {
154                 try {
155                     if (closeable instanceof ModalCloseable) {
156                         ((ModalCloseable) closeable).close(closeMode);
157                     } else {
158                         closeable.close();
159                     }
160                 } catch (final IOException ex) {
161                     LOG.error(ex.getMessage(), ex);
162                 }
163             }
164         }
165         final List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
166         for (final Runnable runnable: runnables) {
167             if (runnable instanceof Cancellable) {
168                 ((Cancellable) runnable).cancel();
169             }
170         }
171     }
172 
173     private void setupContext(final HttpClientContext context) {
174         if (context.getAuthSchemeRegistry() == null) {
175             context.setAuthSchemeRegistry(authSchemeRegistry);
176         }
177         if (context.getCookieSpecRegistry() == null) {
178             context.setCookieSpecRegistry(cookieSpecRegistry);
179         }
180         if (context.getCookieStore() == null) {
181             context.setCookieStore(cookieStore);
182         }
183         if (context.getCredentialsProvider() == null) {
184             context.setCredentialsProvider(credentialsProvider);
185         }
186         if (context.getRequestConfig() == null) {
187             context.setRequestConfig(defaultConfig);
188         }
189     }
190 
191     abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
192 
193     abstract HttpRoute determineRoute(HttpHost httpHost, HttpRequest request, HttpClientContext clientContext) throws HttpException;
194 
195     @Override
196     protected <T> Future<T> doExecute(
197             final HttpHost target,
198             final AsyncRequestProducer requestProducer,
199             final AsyncResponseConsumer<T> responseConsumer,
200             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
201             final HttpContext context,
202             final FutureCallback<T> callback) {
203         final ComplexFuture<T> future = new ComplexFuture<>(callback);
204         try {
205             if (!isRunning()) {
206                 throw new CancellationException("Request execution cancelled");
207             }
208             final HttpClientContext clientContext = contextAdaptor.apply(context);
209             requestProducer.sendRequest((request, entityDetails, c) -> {
210 
211                 RequestConfig requestConfig = null;
212                 if (request instanceof Configurable) {
213                     requestConfig = ((Configurable) request).getConfig();
214                 }
215                 if (requestConfig != null) {
216                     clientContext.setRequestConfig(requestConfig);
217                 }
218 
219                 setupContext(clientContext);
220 
221                 final HttpHost resolvedTarget = target != null ? target : RoutingSupport.determineHost(request);
222                 if (resolvedTarget != null) {
223                     if (request.getScheme() == null) {
224                         request.setScheme(resolvedTarget.getSchemeName());
225                     }
226                     if (request.getAuthority() == null) {
227                         request.setAuthority(new URIAuthority(resolvedTarget));
228                     }
229                 }
230 
231                 final HttpRoute route = determineRoute(
232                         resolvedTarget,
233                         request,
234                         clientContext);
235                 final String exchangeId = ExecSupport.getNextExchangeId();
236                 clientContext.setExchangeId(exchangeId);
237                 if (LOG.isDebugEnabled()) {
238                     LOG.debug("{} preparing request execution", exchangeId);
239                 }
240                 final AsyncExecRuntime execRuntime = createAsyncExecRuntime(pushHandlerFactory);
241 
242                 final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, future,
243                         clientContext, execRuntime, scheduler, new AtomicInteger(1));
244                 final AtomicBoolean outputTerminated = new AtomicBoolean(false);
245                 executeImmediate(
246                         BasicRequestBuilder.copy(request).build(),
247                         entityDetails != null ? new AsyncEntityProducer() {
248 
249                             @Override
250                             public void releaseResources() {
251                                 requestProducer.releaseResources();
252                             }
253 
254                             @Override
255                             public void failed(final Exception cause) {
256                                 requestProducer.failed(cause);
257                             }
258 
259                             @Override
260                             public boolean isRepeatable() {
261                                 return requestProducer.isRepeatable();
262                             }
263 
264                             @Override
265                             public long getContentLength() {
266                                 return entityDetails.getContentLength();
267                             }
268 
269                             @Override
270                             public String getContentType() {
271                                 return entityDetails.getContentType();
272                             }
273 
274                             @Override
275                             public String getContentEncoding() {
276                                 return entityDetails.getContentEncoding();
277                             }
278 
279                             @Override
280                             public boolean isChunked() {
281                                 return entityDetails.isChunked();
282                             }
283 
284                             @Override
285                             public Set<String> getTrailerNames() {
286                                 return entityDetails.getTrailerNames();
287                             }
288 
289                             @Override
290                             public int available() {
291                                 return requestProducer.available();
292                             }
293 
294                             @Override
295                             public void produce(final DataStreamChannel channel) throws IOException {
296                                 if (outputTerminated.get()) {
297                                     channel.endStream();
298                                     return;
299                                 }
300                                 requestProducer.produce(channel);
301                             }
302 
303                         } : null,
304                         scope,
305                         execChain::execute,
306                         new AsyncExecCallback() {
307 
308                             @Override
309                             public AsyncDataConsumer handleResponse(
310                                     final HttpResponse response,
311                                     final EntityDetails entityDetails) throws HttpException, IOException {
312                                 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
313                                     outputTerminated.set(true);
314                                     requestProducer.releaseResources();
315                                 }
316                                 responseConsumer.consumeResponse(response, entityDetails, c,
317                                         new FutureCallback<T>() {
318 
319                                             @Override
320                                             public void completed(final T result) {
321                                                 future.completed(result);
322                                             }
323 
324                                             @Override
325                                             public void failed(final Exception ex) {
326                                                 future.failed(ex);
327                                             }
328 
329                                             @Override
330                                             public void cancelled() {
331                                                 future.cancel();
332                                             }
333 
334                                         });
335                                 return entityDetails != null ? responseConsumer : null;
336                             }
337 
338                             @Override
339                             public void handleInformationResponse(
340                                     final HttpResponse response) throws HttpException, IOException {
341                                 responseConsumer.informationResponse(response, c);
342                             }
343 
344                             @Override
345                             public void completed() {
346                                 if (LOG.isDebugEnabled()) {
347                                     LOG.debug("{} message exchange successfully completed", exchangeId);
348                                 }
349                                 try {
350                                     execRuntime.releaseEndpoint();
351                                 } finally {
352                                     responseConsumer.releaseResources();
353                                     requestProducer.releaseResources();
354                                 }
355                             }
356 
357                             @Override
358                             public void failed(final Exception cause) {
359                                 if (LOG.isDebugEnabled()) {
360                                     LOG.debug("{} request failed: {}", exchangeId, cause.getMessage());
361                                 }
362                                 try {
363                                     execRuntime.discardEndpoint();
364                                     responseConsumer.failed(cause);
365                                 } finally {
366                                     try {
367                                         future.failed(cause);
368                                     } finally {
369                                         responseConsumer.releaseResources();
370                                         requestProducer.releaseResources();
371                                     }
372                                 }
373                             }
374 
375                         });
376             }, clientContext);
377         } catch (final HttpException | IOException | IllegalStateException ex) {
378             future.failed(ex);
379         }
380         return future;
381     }
382 
383     void executeImmediate(
384             final HttpRequest request,
385             final AsyncEntityProducer entityProducer,
386             final AsyncExecChain.Scope scope,
387             final AsyncExecChain chain,
388             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
389         chain.proceed(request, entityProducer, scope, asyncExecCallback);
390     }
391 
392     void executeScheduled(
393             final HttpRequest request,
394             final AsyncEntityProducer entityProducer,
395             final AsyncExecChain.Scope scope,
396             final AsyncExecChain chain,
397             final AsyncExecCallback asyncExecCallback,
398             final TimeValue delay) {
399         final ScheduledRequestExecution scheduledTask = new ScheduledRequestExecution(
400                 request, entityProducer, scope, chain, asyncExecCallback, delay);
401         if (TimeValue.isPositive(delay)) {
402             scheduledExecutorService.schedule(scheduledTask, delay.getDuration(), delay.getTimeUnit());
403         } else {
404             scheduledExecutorService.execute(scheduledTask);
405         }
406     }
407 
408     class ScheduledRequestExecution implements Runnable, Cancellable {
409 
410         final HttpRequest request;
411         final AsyncEntityProducer entityProducer;
412         final AsyncExecChain.Scope scope;
413         final AsyncExecChain chain;
414         final AsyncExecCallback asyncExecCallback;
415         final TimeValue delay;
416 
417         ScheduledRequestExecution(final HttpRequest request,
418                                   final AsyncEntityProducer entityProducer,
419                                   final AsyncExecChain.Scope scope,
420                                   final AsyncExecChain chain,
421                                   final AsyncExecCallback asyncExecCallback,
422                                   final TimeValue delay) {
423             this.request = request;
424             this.entityProducer = entityProducer;
425             this.scope = scope;
426             this.chain = chain;
427             this.asyncExecCallback = asyncExecCallback;
428             this.delay = delay;
429         }
430 
431         @Override
432         public void run() {
433             try {
434                 chain.proceed(request, entityProducer, scope, asyncExecCallback);
435             } catch (final Exception ex) {
436                 asyncExecCallback.failed(ex);
437             }
438         }
439 
440         @Override
441         public boolean cancel() {
442             asyncExecCallback.failed(new CancellationException("Request execution cancelled"));
443             return true;
444         }
445 
446     }
447 
448 }