1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.hc.client5.http.ConnectionKeepAliveStrategy;
37 import org.apache.hc.client5.http.HttpRoute;
38 import org.apache.hc.client5.http.UserTokenHandler;
39 import org.apache.hc.client5.http.async.AsyncExecCallback;
40 import org.apache.hc.client5.http.async.AsyncExecChain;
41 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
42 import org.apache.hc.client5.http.async.AsyncExecRuntime;
43 import org.apache.hc.client5.http.impl.ProtocolSwitchStrategy;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.Internal;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.CancellableDependency;
49 import org.apache.hc.core5.concurrent.FutureCallback;
50 import org.apache.hc.core5.http.EntityDetails;
51 import org.apache.hc.core5.http.Header;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpRequest;
54 import org.apache.hc.core5.http.HttpResponse;
55 import org.apache.hc.core5.http.HttpStatus;
56 import org.apache.hc.core5.http.ProtocolException;
57 import org.apache.hc.core5.http.ProtocolVersion;
58 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
59 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
60 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
61 import org.apache.hc.core5.http.nio.CapacityChannel;
62 import org.apache.hc.core5.http.nio.DataStreamChannel;
63 import org.apache.hc.core5.http.nio.RequestChannel;
64 import org.apache.hc.core5.http.protocol.HttpContext;
65 import org.apache.hc.core5.http.protocol.HttpProcessor;
66 import org.apache.hc.core5.util.Args;
67 import org.apache.hc.core5.util.TimeValue;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71
72
73
74
75
76
77
78 @Contract(threading = ThreadingBehavior.STATELESS)
79 @Internal
80 class HttpAsyncMainClientExec implements AsyncExecChainHandler {
81
82 private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
83
84 private final HttpProcessor httpProcessor;
85 private final ConnectionKeepAliveStrategy keepAliveStrategy;
86 private final UserTokenHandler userTokenHandler;
87 private final ProtocolSwitchStrategy protocolSwitchStrategy;
88
89 HttpAsyncMainClientExec(final HttpProcessor httpProcessor,
90 final ConnectionKeepAliveStrategy keepAliveStrategy,
91 final UserTokenHandler userTokenHandler) {
92 this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
93 this.keepAliveStrategy = keepAliveStrategy;
94 this.userTokenHandler = userTokenHandler;
95 this.protocolSwitchStrategy = new ProtocolSwitchStrategy();
96 }
97
98 @Override
99 public void execute(
100 final HttpRequest request,
101 final AsyncEntityProducer entityProducer,
102 final AsyncExecChain.Scope scope,
103 final AsyncExecChain chain,
104 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
105 final String exchangeId = scope.exchangeId;
106 final HttpRoute route = scope.route;
107 final CancellableDependency operation = scope.cancellableDependency;
108 final HttpClientContext clientContext = scope.clientContext;
109 final AsyncExecRuntime execRuntime = scope.execRuntime;
110
111 if (LOG.isDebugEnabled()) {
112 LOG.debug("{} executing {} {}", exchangeId, request.getMethod(), request.getRequestUri());
113 }
114
115 final AtomicInteger messageCountDown = new AtomicInteger(2);
116 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
117
118 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
119
120 @Override
121 public void releaseResources() {
122 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
123 if (entityConsumer != null) {
124 entityConsumer.releaseResources();
125 }
126 }
127
128 @Override
129 public void failed(final Exception cause) {
130 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
131 if (entityConsumer != null) {
132 entityConsumer.releaseResources();
133 }
134 execRuntime.markConnectionNonReusable();
135 asyncExecCallback.failed(cause);
136 }
137
138 @Override
139 public void cancel() {
140 if (messageCountDown.get() > 0) {
141 failed(new InterruptedIOException());
142 }
143 }
144
145 @Override
146 public void produceRequest(
147 final RequestChannel channel,
148 final HttpContext context) throws HttpException, IOException {
149
150 clientContext.setRoute(route);
151 clientContext.setRequest(request);
152 httpProcessor.process(request, entityProducer, clientContext);
153
154 channel.sendRequest(request, entityProducer, context);
155 if (entityProducer == null) {
156 messageCountDown.decrementAndGet();
157 }
158 }
159
160 @Override
161 public int available() {
162 return entityProducer.available();
163 }
164
165 @Override
166 public void produce(final DataStreamChannel channel) throws IOException {
167 entityProducer.produce(new DataStreamChannel() {
168
169 @Override
170 public void requestOutput() {
171 channel.requestOutput();
172 }
173
174 @Override
175 public int write(final ByteBuffer src) throws IOException {
176 return channel.write(src);
177 }
178
179 @Override
180 public void endStream(final List<? extends Header> trailers) throws IOException {
181 channel.endStream(trailers);
182 if (messageCountDown.decrementAndGet() <= 0) {
183 asyncExecCallback.completed();
184 }
185 }
186
187 @Override
188 public void endStream() throws IOException {
189 channel.endStream();
190 if (messageCountDown.decrementAndGet() <= 0) {
191 asyncExecCallback.completed();
192 }
193 }
194
195 });
196 }
197
198 @Override
199 public void consumeInformation(
200 final HttpResponse response,
201 final HttpContext context) throws HttpException, IOException {
202 if (response.getCode() == HttpStatus.SC_SWITCHING_PROTOCOLS) {
203 final ProtocolVersion upgradeProtocol = protocolSwitchStrategy.switchProtocol(response);
204 if (upgradeProtocol == null || !upgradeProtocol.getProtocol().equals("TLS")) {
205 throw new ProtocolException("Failure switching protocols");
206 }
207 if (LOG.isDebugEnabled()) {
208 LOG.debug("Switching to {}", upgradeProtocol);
209 }
210 execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
211
212 @Override
213 public void completed(final AsyncExecRuntime result) {
214 LOG.debug("Successfully switched to {}", upgradeProtocol);
215 }
216
217 @Override
218 public void failed(final Exception ex) {
219 asyncExecCallback.failed(ex);
220 }
221
222 @Override
223 public void cancelled() {
224 asyncExecCallback.failed(new InterruptedIOException());
225 }
226
227 });
228 } else {
229 asyncExecCallback.handleInformationResponse(response);
230 }
231 }
232
233 @Override
234 public void consumeResponse(
235 final HttpResponse response,
236 final EntityDetails entityDetails,
237 final HttpContext context) throws HttpException, IOException {
238
239 clientContext.setResponse(response);
240 httpProcessor.process(response, entityDetails, clientContext);
241
242 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
243 if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
244 messageCountDown.decrementAndGet();
245 }
246 final TimeValue keepAliveDuration = keepAliveStrategy.getKeepAliveDuration(response, clientContext);
247 Object userToken = clientContext.getUserToken();
248 if (userToken == null) {
249 userToken = userTokenHandler.getUserToken(route, request, clientContext);
250 clientContext.setUserToken(userToken);
251 }
252 execRuntime.markConnectionReusable(userToken, keepAliveDuration);
253 if (entityDetails == null) {
254 execRuntime.validateConnection();
255 if (messageCountDown.decrementAndGet() <= 0) {
256 asyncExecCallback.completed();
257 }
258 }
259 }
260
261 @Override
262 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
263 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
264 if (entityConsumer != null) {
265 entityConsumer.updateCapacity(capacityChannel);
266 } else {
267 capacityChannel.update(Integer.MAX_VALUE);
268 }
269 }
270
271 @Override
272 public void consume(final ByteBuffer src) throws IOException {
273 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
274 if (entityConsumer != null) {
275 entityConsumer.consume(src);
276 }
277 }
278
279 @Override
280 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
281 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
282 if (entityConsumer != null) {
283 entityConsumer.streamEnd(trailers);
284 } else {
285 execRuntime.validateConnection();
286 }
287 if (messageCountDown.decrementAndGet() <= 0) {
288 asyncExecCallback.completed();
289 }
290 }
291
292 };
293
294 if (LOG.isDebugEnabled()) {
295 operation.setDependency(execRuntime.execute(
296 exchangeId,
297 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
298 clientContext));
299 } else {
300 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
301 }
302 }
303
304 }