1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.ByteBuffer;
33 import java.util.List;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.AuthenticationStrategy;
37 import org.apache.hc.client5.http.EndpointInfo;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.RouteTracker;
40 import org.apache.hc.client5.http.SchemePortResolver;
41 import org.apache.hc.client5.http.async.AsyncExecCallback;
42 import org.apache.hc.client5.http.async.AsyncExecChain;
43 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
44 import org.apache.hc.client5.http.async.AsyncExecRuntime;
45 import org.apache.hc.client5.http.auth.AuthExchange;
46 import org.apache.hc.client5.http.auth.AuthenticationException;
47 import org.apache.hc.client5.http.auth.ChallengeType;
48 import org.apache.hc.client5.http.auth.MalformedChallengeException;
49 import org.apache.hc.client5.http.config.RequestConfig;
50 import org.apache.hc.client5.http.impl.auth.AuthCacheKeeper;
51 import org.apache.hc.client5.http.impl.auth.AuthenticationHandler;
52 import org.apache.hc.client5.http.impl.routing.BasicRouteDirector;
53 import org.apache.hc.client5.http.protocol.HttpClientContext;
54 import org.apache.hc.client5.http.routing.HttpRouteDirector;
55 import org.apache.hc.core5.annotation.Contract;
56 import org.apache.hc.core5.annotation.Internal;
57 import org.apache.hc.core5.annotation.ThreadingBehavior;
58 import org.apache.hc.core5.concurrent.CancellableDependency;
59 import org.apache.hc.core5.concurrent.FutureCallback;
60 import org.apache.hc.core5.http.EntityDetails;
61 import org.apache.hc.core5.http.Header;
62 import org.apache.hc.core5.http.HttpException;
63 import org.apache.hc.core5.http.HttpHost;
64 import org.apache.hc.core5.http.HttpRequest;
65 import org.apache.hc.core5.http.HttpResponse;
66 import org.apache.hc.core5.http.HttpStatus;
67 import org.apache.hc.core5.http.HttpVersion;
68 import org.apache.hc.core5.http.Method;
69 import org.apache.hc.core5.http.message.BasicHttpRequest;
70 import org.apache.hc.core5.http.message.StatusLine;
71 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
72 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
73 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
74 import org.apache.hc.core5.http.nio.CapacityChannel;
75 import org.apache.hc.core5.http.nio.DataStreamChannel;
76 import org.apache.hc.core5.http.nio.RequestChannel;
77 import org.apache.hc.core5.http.protocol.HttpContext;
78 import org.apache.hc.core5.http.protocol.HttpProcessor;
79 import org.apache.hc.core5.util.Args;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
82
83
84
85
86
87
88
89
90 @Contract(threading = ThreadingBehavior.STATELESS)
91 @Internal
92 public final class AsyncConnectExec implements AsyncExecChainHandler {
93
94 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectExec.class);
95
96 private final HttpProcessor proxyHttpProcessor;
97 private final AuthenticationStrategy proxyAuthStrategy;
98 private final AuthenticationHandler authenticator;
99 private final AuthCacheKeeper authCacheKeeper;
100 private final HttpRouteDirector routeDirector;
101
102 public AsyncConnectExec(
103 final HttpProcessor proxyHttpProcessor,
104 final AuthenticationStrategy proxyAuthStrategy,
105 final SchemePortResolver schemePortResolver,
106 final boolean authCachingDisabled) {
107 Args.notNull(proxyHttpProcessor, "Proxy HTTP processor");
108 Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
109 this.proxyHttpProcessor = proxyHttpProcessor;
110 this.proxyAuthStrategy = proxyAuthStrategy;
111 this.authenticator = new AuthenticationHandler();
112 this.authCacheKeeper = authCachingDisabled ? null : new AuthCacheKeeper(schemePortResolver);
113 this.routeDirector = BasicRouteDirector.INSTANCE;
114 }
115
116 static class State {
117
118 State(final HttpRoute route) {
119 tracker = new RouteTracker(route);
120 }
121
122 final RouteTracker tracker;
123
124 volatile boolean challenged;
125 volatile HttpResponse response;
126 volatile boolean tunnelRefused;
127
128 }
129
130 @Override
131 public void execute(
132 final HttpRequest request,
133 final AsyncEntityProducer entityProducer,
134 final AsyncExecChain.Scope scope,
135 final AsyncExecChain chain,
136 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
137 Args.notNull(request, "HTTP request");
138 Args.notNull(scope, "Scope");
139
140 final String exchangeId = scope.exchangeId;
141 final HttpRoute route = scope.route;
142 final CancellableDependency cancellableDependency = scope.cancellableDependency;
143 final HttpClientContext clientContext = scope.clientContext;
144 final AsyncExecRuntime execRuntime = scope.execRuntime;
145 final State state = new State(route);
146
147 if (!execRuntime.isEndpointAcquired()) {
148 final Object userToken = clientContext.getUserToken();
149 if (LOG.isDebugEnabled()) {
150 LOG.debug("{} acquiring connection with route {}", exchangeId, route);
151 }
152 cancellableDependency.setDependency(execRuntime.acquireEndpoint(
153 exchangeId, route, userToken, clientContext, new FutureCallback<AsyncExecRuntime>() {
154
155 @Override
156 public void completed(final AsyncExecRuntime execRuntime) {
157 if (execRuntime.isEndpointConnected()) {
158 try {
159 chain.proceed(request, entityProducer, scope, asyncExecCallback);
160 } catch (final HttpException | IOException ex) {
161 asyncExecCallback.failed(ex);
162 }
163 } else {
164 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
165 }
166 }
167
168 @Override
169 public void failed(final Exception ex) {
170 asyncExecCallback.failed(ex);
171 }
172
173 @Override
174 public void cancelled() {
175 asyncExecCallback.failed(new InterruptedIOException());
176 }
177
178 }));
179 } else {
180 if (execRuntime.isEndpointConnected()) {
181 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
182 } else {
183 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
184 }
185 }
186
187 }
188
189 private void proceedToNextHop(
190 final State state,
191 final HttpRequest request,
192 final AsyncEntityProducer entityProducer,
193 final AsyncExecChain.Scope scope,
194 final AsyncExecChain chain,
195 final AsyncExecCallback asyncExecCallback) {
196 try {
197 doProceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
198 } catch (final RuntimeException ex) {
199 asyncExecCallback.failed(ex);
200 }
201 }
202
203 private void doProceedToNextHop(
204 final State state,
205 final HttpRequest request,
206 final AsyncEntityProducer entityProducer,
207 final AsyncExecChain.Scope scope,
208 final AsyncExecChain chain,
209 final AsyncExecCallback asyncExecCallback) {
210 final RouteTracker tracker = state.tracker;
211 final String exchangeId = scope.exchangeId;
212 final HttpRoute route = scope.route;
213 final AsyncExecRuntime execRuntime = scope.execRuntime;
214 final CancellableDependency operation = scope.cancellableDependency;
215 final HttpClientContext clientContext = scope.clientContext;
216
217 final HttpRoute fact = tracker.toRoute();
218 final int step = routeDirector.nextStep(route, fact);
219
220 switch (step) {
221 case HttpRouteDirector.CONNECT_TARGET:
222 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
223
224 @Override
225 public void completed(final AsyncExecRuntime execRuntime) {
226 tracker.connectTarget(route.isSecure());
227 if (LOG.isDebugEnabled()) {
228 LOG.debug("{} connected to target", exchangeId);
229 }
230 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
231 }
232
233 @Override
234 public void failed(final Exception ex) {
235 asyncExecCallback.failed(ex);
236 }
237
238 @Override
239 public void cancelled() {
240 asyncExecCallback.failed(new InterruptedIOException());
241 }
242
243 }));
244 break;
245
246 case HttpRouteDirector.CONNECT_PROXY:
247 operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
248
249 @Override
250 public void completed(final AsyncExecRuntime execRuntime) {
251 final HttpHost proxy = route.getProxyHost();
252 tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
253 if (LOG.isDebugEnabled()) {
254 LOG.debug("{} connected to proxy", exchangeId);
255 }
256 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
257 }
258
259 @Override
260 public void failed(final Exception ex) {
261 asyncExecCallback.failed(ex);
262 }
263
264 @Override
265 public void cancelled() {
266 asyncExecCallback.failed(new InterruptedIOException());
267 }
268
269 }));
270 break;
271
272 case HttpRouteDirector.TUNNEL_TARGET:
273 final HttpHost proxy = route.getProxyHost();
274 final HttpHost target = route.getTargetHost();
275 if (LOG.isDebugEnabled()) {
276 LOG.debug("{} create tunnel", exchangeId);
277 }
278 createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
279
280 @Override
281 public AsyncDataConsumer handleResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
282 return asyncExecCallback.handleResponse(response, entityDetails);
283 }
284
285 @Override
286 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
287 asyncExecCallback.handleInformationResponse(response);
288 }
289
290 @Override
291 public void completed() {
292 if (!execRuntime.isEndpointConnected()) {
293
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("{} proxy disconnected", exchangeId);
296 }
297 state.tracker.reset();
298 }
299 if (state.challenged) {
300 if (LOG.isDebugEnabled()) {
301 LOG.debug("{} proxy authentication required", exchangeId);
302 }
303 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
304 } else {
305 if (state.tunnelRefused) {
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("{} tunnel refused", exchangeId);
308 }
309 asyncExecCallback.completed();
310 } else {
311 if (LOG.isDebugEnabled()) {
312 LOG.debug("{} tunnel to target created", exchangeId);
313 }
314 tracker.tunnelTarget(false);
315 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
316 }
317 }
318 }
319
320 @Override
321 public void failed(final Exception cause) {
322 execRuntime.markConnectionNonReusable();
323 asyncExecCallback.failed(cause);
324 }
325
326 });
327 break;
328
329 case HttpRouteDirector.TUNNEL_PROXY:
330
331
332
333
334 asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
335 break;
336
337 case HttpRouteDirector.LAYER_PROTOCOL:
338 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
339
340 @Override
341 public void completed(final AsyncExecRuntime asyncExecRuntime) {
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("{} upgraded to TLS", exchangeId);
344 }
345 tracker.layerProtocol(route.isSecure());
346 proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
347 }
348
349 @Override
350 public void failed(final Exception ex) {
351 asyncExecCallback.failed(ex);
352 }
353
354 @Override
355 public void cancelled() {
356 asyncExecCallback.failed(new InterruptedIOException());
357 }
358
359 });
360 break;
361
362 case HttpRouteDirector.UNREACHABLE:
363 asyncExecCallback.failed(new HttpException("Unable to establish route: " +
364 "planned = " + route + "; current = " + fact));
365 break;
366
367 case HttpRouteDirector.COMPLETE:
368 if (LOG.isDebugEnabled()) {
369 LOG.debug("{} route fully established", exchangeId);
370 }
371 proceedConnected(request, entityProducer, scope, chain, asyncExecCallback);
372 break;
373
374 default:
375 throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
376 }
377 }
378
379 private void createTunnel(
380 final State state,
381 final HttpHost proxy,
382 final HttpHost nextHop,
383 final AsyncExecChain.Scope scope,
384 final AsyncExecCallback asyncExecCallback) {
385
386 final CancellableDependency operation = scope.cancellableDependency;
387 final HttpClientContext clientContext = scope.clientContext;
388 final AsyncExecRuntime execRuntime = scope.execRuntime;
389 final String exchangeId = scope.exchangeId;
390
391 final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
392
393 if (authCacheKeeper != null) {
394 authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
395 }
396
397 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
398
399 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
400
401 @Override
402 public void releaseResources() {
403 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
404 if (entityConsumer != null) {
405 entityConsumer.releaseResources();
406 }
407 }
408
409 @Override
410 public void failed(final Exception cause) {
411 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
412 if (entityConsumer != null) {
413 entityConsumer.releaseResources();
414 }
415 asyncExecCallback.failed(cause);
416 }
417
418 @Override
419 public void cancel() {
420 failed(new InterruptedIOException());
421 }
422
423 @Override
424 public void produceRequest(final RequestChannel requestChannel,
425 final HttpContext httpContext) throws HttpException, IOException {
426 final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
427 connect.setVersion(HttpVersion.HTTP_1_1);
428
429 proxyHttpProcessor.process(connect, null, clientContext);
430 authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
431
432 requestChannel.sendRequest(connect, null, clientContext);
433 }
434
435 @Override
436 public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
437 }
438
439 @Override
440 public int available() {
441 return 0;
442 }
443
444 @Override
445 public void consumeInformation(final HttpResponse httpResponse,
446 final HttpContext httpContext) throws HttpException, IOException {
447 }
448
449 @Override
450 public void consumeResponse(final HttpResponse response,
451 final EntityDetails entityDetails,
452 final HttpContext httpContext) throws HttpException, IOException {
453 clientContext.setResponse(response);
454 proxyHttpProcessor.process(response, entityDetails, clientContext);
455
456 final int status = response.getCode();
457 if (status < HttpStatus.SC_SUCCESS) {
458 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
459 }
460
461 if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
462 state.challenged = true;
463 } else {
464 state.challenged = false;
465 if (status >= HttpStatus.SC_REDIRECTION) {
466 state.tunnelRefused = true;
467 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
468 } else if (status == HttpStatus.SC_OK) {
469 clientContext.setProtocolVersion(null);
470 asyncExecCallback.completed();
471 } else {
472 throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
473 }
474 }
475 }
476
477 @Override
478 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
479 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
480 if (entityConsumer != null) {
481 entityConsumer.updateCapacity(capacityChannel);
482 } else {
483 capacityChannel.update(Integer.MAX_VALUE);
484 }
485 }
486
487 @Override
488 public void consume(final ByteBuffer src) throws IOException {
489 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
490 if (entityConsumer != null) {
491 entityConsumer.consume(src);
492 }
493 }
494
495 @Override
496 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
497 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
498 if (entityConsumer != null) {
499 entityConsumer.streamEnd(trailers);
500 }
501 asyncExecCallback.completed();
502 }
503
504 };
505
506 if (LOG.isDebugEnabled()) {
507 operation.setDependency(execRuntime.execute(
508 exchangeId,
509 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
510 clientContext));
511 } else {
512 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
513 }
514
515 }
516
517 private boolean needAuthentication(
518 final AuthExchange proxyAuthExchange,
519 final HttpHost proxy,
520 final HttpResponse response,
521 final HttpClientContext context) throws AuthenticationException, MalformedChallengeException {
522 final RequestConfig config = context.getRequestConfigOrDefault();
523 if (config.isAuthenticationEnabled()) {
524 final boolean proxyAuthRequested = authenticator.isChallenged(proxy, ChallengeType.PROXY, response, proxyAuthExchange, context);
525 final boolean proxyMutualAuthRequired = authenticator.isChallengeExpected(proxyAuthExchange);
526
527 if (authCacheKeeper != null) {
528 if (proxyAuthRequested) {
529 authCacheKeeper.updateOnChallenge(proxy, null, proxyAuthExchange, context);
530 } else {
531 authCacheKeeper.updateOnNoChallenge(proxy, null, proxyAuthExchange, context);
532 }
533 }
534
535 if (proxyAuthRequested || proxyMutualAuthRequired) {
536 final boolean updated = authenticator.handleResponse(proxy, ChallengeType.PROXY, response,
537 proxyAuthStrategy, proxyAuthExchange, context);
538
539 if (authCacheKeeper != null) {
540 authCacheKeeper.updateOnResponse(proxy, null, proxyAuthExchange, context);
541 }
542
543 return updated;
544 }
545 }
546 return false;
547 }
548
549 private void proceedConnected(
550 final HttpRequest request,
551 final AsyncEntityProducer entityProducer,
552 final AsyncExecChain.Scope scope,
553 final AsyncExecChain chain,
554 final AsyncExecCallback asyncExecCallback) {
555 final AsyncExecRuntime execRuntime = scope.execRuntime;
556 final HttpClientContext clientContext = scope.clientContext;
557 final EndpointInfo endpointInfo = execRuntime.getEndpointInfo();
558 if (endpointInfo != null) {
559 clientContext.setSSLSession(endpointInfo.getSslSession());
560 }
561 try {
562 chain.proceed(request, entityProducer, scope, asyncExecCallback);
563 } catch (final HttpException | IOException ex) {
564 asyncExecCallback.failed(ex);
565 }
566 }
567
568 }