View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.hc.client5.http.AuthenticationStrategy;
37  import org.apache.hc.client5.http.EndpointInfo;
38  import org.apache.hc.client5.http.HttpRoute;
39  import org.apache.hc.client5.http.RouteTracker;
40  import org.apache.hc.client5.http.SchemePortResolver;
41  import org.apache.hc.client5.http.async.AsyncExecCallback;
42  import org.apache.hc.client5.http.async.AsyncExecChain;
43  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
44  import org.apache.hc.client5.http.async.AsyncExecRuntime;
45  import org.apache.hc.client5.http.auth.AuthExchange;
46  import org.apache.hc.client5.http.auth.AuthenticationException;
47  import org.apache.hc.client5.http.auth.ChallengeType;
48  import org.apache.hc.client5.http.auth.MalformedChallengeException;
49  import org.apache.hc.client5.http.config.RequestConfig;
50  import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
51  import org.apache.hc.client5.http.impl.auth.AuthenticationHandler;
52  import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
53  import org.apache.hc.client5.http.protocol.HttpClientContext;
54  import org.apache.hc.client5.http.routing.HttpRouteDirector;
55  import org.apache.hc.core5.annotation.Contract;
56  import org.apache.hc.core5.annotation.Internal;
57  import org.apache.hc.core5.annotation.ThreadingBehavior;
58  import org.apache.hc.core5.concurrent.CancellableDependency;
59  import org.apache.hc.core5.concurrent.FutureCallback;
60  import org.apache.hc.core5.http.EntityDetails;
61  import org.apache.hc.core5.http.Header;
62  import org.apache.hc.core5.http.HttpException;
63  import org.apache.hc.core5.http.HttpHost;
64  import org.apache.hc.core5.http.HttpRequest;
65  import org.apache.hc.core5.http.HttpResponse;
66  import org.apache.hc.core5.http.HttpStatus;
67  import org.apache.hc.core5.http.HttpVersion;
68  import org.apache.hc.core5.http.Method;
69  import org.apache.hc.core5.http.message.BasicHttpRequest;
70  import org.apache.hc.core5.http.message.StatusLine;
71  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
72  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
73  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
74  import org.apache.hc.core5.http.nio.CapacityChannel;
75  import org.apache.hc.core5.http.nio.DataStreamChannel;
76  import org.apache.hc.core5.http.nio.RequestChannel;
77  import org.apache.hc.core5.http.protocol.HttpContext;
78  import org.apache.hc.core5.http.protocol.HttpProcessor;
79  import org.apache.hc.core5.util.Args;
80  import org.slf4j.Logger;
81  import org.slf4j.LoggerFactory;
82  
83  /**
84   * Request execution handler in the asynchronous request execution chain
85   * that is responsible for establishing connection to the target
86   * origin server as specified by the current connection route.
87   *
88   * @since 5.0
89   */
90  @Contract(threading = ThreadingBehavior.STATELESS)
91  @Internal
92  public final class AsyncConnectExec implements AsyncExecChainHandler {
93  
94      private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
95  
96      private final HttpProcessor proxyHttpProcessor;
97      private final AuthenticationStrategy proxyAuthStrategy;
98      private final AuthenticationHandler authenticator;
99      private final AuthCacheKeeper authCacheKeeper;
100     private final HttpRouteDirector routeDirector;
101 
102     public AsyncConnectExec(
103             final HttpProcessor proxyHttpProcessor,
104             final AuthenticationStrategy proxyAuthStrategy,
105             final SchemePortResolver schemePortResolver,
106             final boolean authCachingDisabled) {
107         Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
108         Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
109         this.proxyHttpProcessor = proxyHttpProcessor;
110         this.proxyAuthStrategy = proxyAuthStrategy;
111         this.authenticator = new AuthenticationHandler();
112         this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
113         this.routeDirector = BasicRouteDirector.INSTANCE;
114     }
115 
116     static class State {
117 
118         State(final HttpRoute route) {
119             tracker = new RouteTracker(route);
120         }
121 
122         final RouteTracker tracker;
123 
124         volatile boolean challenged;
125         volatile HttpResponse response;
126         volatile boolean tunnelRefused;
127 
128     }
129 
130     @Override
131     public void execute(
132             final HttpRequest request,
133             final AsyncEntityProducer entityProducer,
134             final AsyncExecChain.Scope scope,
135             final AsyncExecChain chain,
136             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
137         Args.notNull(request, "HTTP request");
138         Args.notNull(scope, "Scope");
139 
140         final String exchangeId = scope.exchangeId;
141         final HttpRoute route = scope.route;
142         final CancellableDependency cancellableDependency = scope.cancellableDependency;
143         final HttpClientContext clientContext = scope.clientContext;
144         final AsyncExecRuntime execRuntime = scope.execRuntime;
145         final State state = new State(route);
146 
147         if (!execRuntime.isEndpointAcquired()) {
148             final Object userToken = clientContext.getUserToken();
149             if (LOG.isDebugEnabled()) {
150                 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
151             }
152             cancellableDependency.setDependency(execRuntime.acquireEndpoint(
153                     exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
154 
155                         @Override
156                         public void completed(final AsyncExecRuntime execRuntime) {
157                             if (execRuntime.isEndpointConnected()) {
158                                 try {
159                                     chain.proceed(request, entityProducer, scope, asyncExecCallback);
160                                 } catch (final HttpException | IOException ex) {
161                                     asyncExecCallback.failed(ex);
162                                 }
163                             } else {
164                                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
165                             }
166                         }
167 
168                         @Override
169                         public void failed(final Exception ex) {
170                             asyncExecCallback.failed(ex);
171                         }
172 
173                         @Override
174                         public void cancelled() {
175                             asyncExecCallback.failed(new InterruptedIOException());
176                         }
177 
178                     }));
179         } else {
180             if (execRuntime.isEndpointConnected()) {
181                 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
182             } else {
183                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
184             }
185         }
186 
187     }
188 
189     private void proceedToNextHop(
190             final State state,
191             final HttpRequest request,
192             final AsyncEntityProducer entityProducer,
193             final AsyncExecChain.Scope scope,
194             final AsyncExecChain chain,
195             final AsyncExecCallback asyncExecCallback) {
196         try {
197             doProceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
198         } catch (final RuntimeException ex) {
199             asyncExecCallback.failed(ex);
200         }
201     }
202 
203     private void doProceedToNextHop(
204             final State state,
205             final HttpRequest request,
206             final AsyncEntityProducer entityProducer,
207             final AsyncExecChain.Scope scope,
208             final AsyncExecChain chain,
209             final AsyncExecCallback asyncExecCallback) {
210         final RouteTracker tracker = state.tracker;
211         final String exchangeId = scope.exchangeId;
212         final HttpRoute route = scope.route;
213         final AsyncExecRuntime execRuntime = scope.execRuntime;
214         final CancellableDependency operation = scope.cancellableDependency;
215         final HttpClientContext clientContext = scope.clientContext;
216 
217         final HttpRoute fact = tracker.toRoute();
218         final int step = routeDirector.nextStep(route, fact);
219 
220         switch (step) {
221             case HttpRouteDirector.CONNECT_TARGET:
222                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
223 
224                     @Override
225                     public void completed(final AsyncExecRuntime execRuntime) {
226                         tracker.connectTarget(route.isSecure());
227                         if (LOG.isDebugEnabled()) {
228                             LOG.debug("{} connected to target", exchangeId);
229                         }
230                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
231                     }
232 
233                     @Override
234                     public void failed(final Exception ex) {
235                         asyncExecCallback.failed(ex);
236                     }
237 
238                     @Override
239                     public void cancelled() {
240                         asyncExecCallback.failed(new InterruptedIOException());
241                     }
242 
243                 }));
244                 break;
245 
246             case HttpRouteDirector.CONNECT_PROXY:
247                 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
248 
249                     @Override
250                     public void completed(final AsyncExecRuntime execRuntime) {
251                         final HttpHost proxy = route.getProxyHost();
252                         tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
253                         if (LOG.isDebugEnabled()) {
254                             LOG.debug("{} connected to proxy", exchangeId);
255                         }
256                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
257                     }
258 
259                     @Override
260                     public void failed(final Exception ex) {
261                         asyncExecCallback.failed(ex);
262                     }
263 
264                     @Override
265                     public void cancelled() {
266                         asyncExecCallback.failed(new InterruptedIOException());
267                     }
268 
269                 }));
270                 break;
271 
272             case HttpRouteDirector.TUNNEL_TARGET:
273                 final HttpHost proxy = route.getProxyHost();
274                 final HttpHost target = route.getTargetHost();
275                 if (LOG.isDebugEnabled()) {
276                     LOG.debug("{} create tunnel", exchangeId);
277                 }
278                 createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
279 
280                     @Override
281                     public AsyncDataConsumer handleResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
282                         return asyncExecCallback.handleResponse(response, entityDetails);
283                     }
284 
285                     @Override
286                     public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
287                         asyncExecCallback.handleInformationResponse(response);
288                     }
289 
290                     @Override
291                     public void completed() {
292                         if (!execRuntime.isEndpointConnected()) {
293                             // Remote endpoint disconnected. Need to start over
294                             if (LOG.isDebugEnabled()) {
295                                 LOG.debug("{} proxy disconnected", exchangeId);
296                             }
297                             state.tracker.reset();
298                         }
299                         if (state.challenged) {
300                             if (LOG.isDebugEnabled()) {
301                                 LOG.debug("{} proxy authentication required", exchangeId);
302                             }
303                             proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
304                         } else {
305                             if (state.tunnelRefused) {
306                                 if (LOG.isDebugEnabled()) {
307                                     LOG.debug("{} tunnel refused", exchangeId);
308                                 }
309                                 asyncExecCallback.completed();
310                             } else {
311                                 if (LOG.isDebugEnabled()) {
312                                     LOG.debug("{} tunnel to target created", exchangeId);
313                                 }
314                                 tracker.tunnelTarget(false);
315                                 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
316                             }
317                         }
318                     }
319 
320                     @Override
321                     public void failed(final Exception cause) {
322                         execRuntime.markConnectionNonReusable();
323                         asyncExecCallback.failed(cause);
324                     }
325 
326                 });
327                 break;
328 
329             case HttpRouteDirector.TUNNEL_PROXY:
330                 // The most simple example for this case is a proxy chain
331                 // of two proxies, where P1 must be tunnelled to P2.
332                 // route: Source -> P1 -> P2 -> Target (3 hops)
333                 // fact:  Source -> P1 -> Target       (2 hops)
334                 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
335                 break;
336 
337             case HttpRouteDirector.LAYER_PROTOCOL:
338                 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
339 
340                     @Override
341                     public void completed(final AsyncExecRuntime asyncExecRuntime) {
342                         if (LOG.isDebugEnabled()) {
343                             LOG.debug("{} upgraded to TLS", exchangeId);
344                         }
345                         tracker.layerProtocol(route.isSecure());
346                         proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
347                     }
348 
349                     @Override
350                     public void failed(final Exception ex) {
351                         asyncExecCallback.failed(ex);
352                     }
353 
354                     @Override
355                     public void cancelled() {
356                         asyncExecCallback.failed(new InterruptedIOException());
357                     }
358 
359                 });
360                 break;
361 
362             case HttpRouteDirector.UNREACHABLE:
363                 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
364                         "planned = " + route + "; current = " + fact));
365                 break;
366 
367             case HttpRouteDirector.COMPLETE:
368                 if (LOG.isDebugEnabled()) {
369                     LOG.debug("{} route fully established", exchangeId);
370                 }
371                 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
372                 break;
373 
374             default:
375                 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
376         }
377     }
378 
379     private void createTunnel(
380             final State state,
381             final HttpHost proxy,
382             final HttpHost nextHop,
383             final AsyncExecChain.Scope scope,
384             final AsyncExecCallback asyncExecCallback) {
385 
386         final CancellableDependency operation = scope.cancellableDependency;
387         final HttpClientContext clientContext = scope.clientContext;
388         final AsyncExecRuntime execRuntime = scope.execRuntime;
389         final String exchangeId = scope.exchangeId;
390 
391         final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
392 
393         if (authCacheKeeper != null) {
394             authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
395         }
396 
397         final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
398 
399             private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
400 
401             @Override
402             public void releaseResources() {
403                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
404                 if (entityConsumer != null) {
405                     entityConsumer.releaseResources();
406                 }
407             }
408 
409             @Override
410             public void failed(final Exception cause) {
411                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
412                 if (entityConsumer != null) {
413                     entityConsumer.releaseResources();
414                 }
415                 asyncExecCallback.failed(cause);
416             }
417 
418             @Override
419             public void cancel() {
420                 failed(new InterruptedIOException());
421             }
422 
423             @Override
424             public void produceRequest(final RequestChannel requestChannel,
425                                        final HttpContext httpContext) throws HttpException, IOException {
426                 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
427                 connect.setVersion(HttpVersion.HTTP_1_1);
428 
429                 proxyHttpProcessor.process(connect, null, clientContext);
430                 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
431 
432                 requestChannel.sendRequest(connect, null, clientContext);
433             }
434 
435             @Override
436             public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
437             }
438 
439             @Override
440             public int available() {
441                 return 0;
442             }
443 
444             @Override
445             public void consumeInformation(final HttpResponse httpResponse,
446                                            final HttpContext httpContext) throws HttpException, IOException {
447             }
448 
449             @Override
450             public void consumeResponse(final HttpResponse response,
451                                         final EntityDetails entityDetails,
452                                         final HttpContext httpContext) throws HttpException, IOException {
453                 clientContext.setResponse(response);
454                 proxyHttpProcessor.process(response, entityDetails, clientContext);
455 
456                 final int status = response.getCode();
457                 if (status < HttpStatus.SC_SUCCESS) {
458                     throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
459                 }
460 
461                 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
462                     state.challenged = true;
463                 } else {
464                     state.challenged = false;
465                     if (status >= HttpStatus.SC_REDIRECTION) {
466                         state.tunnelRefused = true;
467                         entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
468                     } else if (status == HttpStatus.SC_OK) {
469                         clientContext.setProtocolVersion(null);
470                         asyncExecCallback.completed();
471                     } else {
472                         throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
473                     }
474                 }
475             }
476 
477             @Override
478             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
479                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
480                 if (entityConsumer != null) {
481                     entityConsumer.updateCapacity(capacityChannel);
482                 } else {
483                     capacityChannel.update(Integer.MAX_VALUE);
484                 }
485             }
486 
487             @Override
488             public void consume(final ByteBuffer src) throws IOException {
489                 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
490                 if (entityConsumer != null) {
491                     entityConsumer.consume(src);
492                 }
493             }
494 
495             @Override
496             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
497                 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
498                 if (entityConsumer != null) {
499                     entityConsumer.streamEnd(trailers);
500                 }
501                 asyncExecCallback.completed();
502             }
503 
504         };
505 
506         if (LOG.isDebugEnabled()) {
507             operation.setDependency(execRuntime.execute(
508                     exchangeId,
509                     new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
510                     clientContext));
511         } else {
512             operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
513         }
514 
515     }
516 
517     private boolean needAuthentication(
518             final AuthExchange proxyAuthExchange,
519             final HttpHost proxy,
520             final HttpResponse response,
521             final HttpClientContext context) throws AuthenticationException, MalformedChallengeException {
522         final RequestConfig config = context.getRequestConfigOrDefault();
523         if (config.isAuthenticationEnabled()) {
524             final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
525             final boolean proxyMutualAuthRequired = authenticator.isChallengeExpected(proxyAuthExchange);
526 
527             if (authCacheKeeper != null) {
528                 if (proxyAuthRequested) {
529                     authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
530                 } else {
531                     authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
532                 }
533             }
534 
535             if (proxyAuthRequested || proxyMutualAuthRequired) {
536                 final boolean updated = authenticator.handleResponse(proxy, ChallengeType.PROXY, response,
537                         proxyAuthStrategy, proxyAuthExchange, context);
538 
539                 if (authCacheKeeper != null) {
540                     authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
541                 }
542 
543                 return updated;
544             }
545         }
546         return false;
547     }
548 
549     private void proceedConnected(
550             final HttpRequest request,
551             final AsyncEntityProducer entityProducer,
552             final AsyncExecChain.Scope scope,
553             final AsyncExecChain chain,
554             final AsyncExecCallback asyncExecCallback) {
555         final AsyncExecRuntime execRuntime = scope.execRuntime;
556         final HttpClientContext clientContext = scope.clientContext;
557         final EndpointInfo endpointInfo = execRuntime.getEndpointInfo();
558         if (endpointInfo != null) {
559             clientContext.setSSLSession(endpointInfo.getSslSession());
560         }
561         try {
562             chain.proceed(request, entityProducer, scope, asyncExecCallback);
563         } catch (final HttpException | IOException ex) {
564             asyncExecCallback.failed(ex);
565         }
566     }
567 
568 }