1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.nio.ByteBuffer;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.apache.hc.client5.http.HttpRoute;
36 import org.apache.hc.client5.http.async.AsyncExecCallback;
37 import org.apache.hc.client5.http.async.AsyncExecChain;
38 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
39 import org.apache.hc.client5.http.async.AsyncExecRuntime;
40 import org.apache.hc.client5.http.protocol.HttpClientContext;
41 import org.apache.hc.core5.annotation.Contract;
42 import org.apache.hc.core5.annotation.Internal;
43 import org.apache.hc.core5.annotation.ThreadingBehavior;
44 import org.apache.hc.core5.concurrent.CancellableDependency;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpRequest;
49 import org.apache.hc.core5.http.HttpResponse;
50 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
51 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
52 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
53 import org.apache.hc.core5.http.nio.CapacityChannel;
54 import org.apache.hc.core5.http.nio.DataStreamChannel;
55 import org.apache.hc.core5.http.nio.RequestChannel;
56 import org.apache.hc.core5.http.protocol.HttpContext;
57 import org.apache.hc.core5.http.protocol.HttpProcessor;
58 import org.apache.hc.core5.util.Args;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62
63
64
65
66
67
68
69 @Contract(threading = ThreadingBehavior.STATELESS)
70 @Internal
71 public class H2AsyncMainClientExec implements AsyncExecChainHandler {
72
73 private static final Logger LOG = LoggerFactory.getLogger(H2AsyncMainClientExec.class);
74
75 private final HttpProcessor httpProcessor;
76
77 H2AsyncMainClientExec(final HttpProcessor httpProcessor) {
78 this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
79 }
80
81 @Override
82 public void execute(
83 final HttpRequest request,
84 final AsyncEntityProducer entityProducer,
85 final AsyncExecChain.Scope scope,
86 final AsyncExecChain chain,
87 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
88 final String exchangeId = scope.exchangeId;
89 final HttpRoute route = scope.route;
90 final CancellableDependency operation = scope.cancellableDependency;
91 final HttpClientContext clientContext = scope.clientContext;
92 final AsyncExecRuntime execRuntime = scope.execRuntime;
93
94 if (LOG.isDebugEnabled()) {
95 LOG.debug("{} executing {} {}", exchangeId, request.getMethod(), request.getRequestUri());
96 }
97
98 final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
99
100 private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
101
102 @Override
103 public void releaseResources() {
104 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
105 if (entityConsumer != null) {
106 entityConsumer.releaseResources();
107 }
108 }
109
110 @Override
111 public void failed(final Exception cause) {
112 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
113 if (entityConsumer != null) {
114 entityConsumer.releaseResources();
115 }
116 execRuntime.markConnectionNonReusable();
117 asyncExecCallback.failed(cause);
118 }
119
120 @Override
121 public void cancel() {
122 failed(new InterruptedIOException());
123 }
124
125 @Override
126 public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
127 clientContext.setRequest(request);
128 clientContext.setRoute(route);
129 httpProcessor.process(request, entityProducer, clientContext);
130
131 channel.sendRequest(request, entityProducer, context);
132 }
133
134 @Override
135 public int available() {
136 return entityProducer.available();
137 }
138
139 @Override
140 public void produce(final DataStreamChannel channel) throws IOException {
141 entityProducer.produce(channel);
142 }
143
144 @Override
145 public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
146 }
147
148 @Override
149 public void consumeResponse(
150 final HttpResponse response,
151 final EntityDetails entityDetails,
152 final HttpContext context) throws HttpException, IOException {
153
154 clientContext.setResponse(response);
155 httpProcessor.process(response, entityDetails, clientContext);
156
157 entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
158 if (entityDetails == null) {
159 execRuntime.validateConnection();
160 asyncExecCallback.completed();
161 }
162 }
163
164 @Override
165 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
166 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
167 if (entityConsumer != null) {
168 entityConsumer.updateCapacity(capacityChannel);
169 } else {
170 capacityChannel.update(Integer.MAX_VALUE);
171 }
172 }
173
174 @Override
175 public void consume(final ByteBuffer src) throws IOException {
176 final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
177 if (entityConsumer != null) {
178 entityConsumer.consume(src);
179 }
180 }
181
182 @Override
183 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
184 final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
185 if (entityConsumer != null) {
186 entityConsumer.streamEnd(trailers);
187 } else {
188 execRuntime.validateConnection();
189 }
190 asyncExecCallback.completed();
191 }
192
193 };
194
195 if (LOG.isDebugEnabled()) {
196 operation.setDependency(execRuntime.execute(
197 exchangeId,
198 new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
199 clientContext));
200 } else {
201 operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
202 }
203 }
204
205 }