1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.EndpointInfo;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.async.AsyncExecRuntime;
36 import org.apache.hc.client5.http.config.RequestConfig;
37 import org.apache.hc.client5.http.config.TlsConfig;
38 import org.apache.hc.client5.http.impl.ConnPoolSupport;
39 import org.apache.hc.client5.http.impl.Operations;
40 import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
41 import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
42 import org.apache.hc.client5.http.protocol.HttpClientContext;
43 import org.apache.hc.core5.concurrent.CallbackContribution;
44 import org.apache.hc.core5.concurrent.Cancellable;
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
47 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48 import org.apache.hc.core5.http.nio.HandlerFactory;
49 import org.apache.hc.core5.io.CloseMode;
50 import org.apache.hc.core5.reactor.ConnectionInitiator;
51 import org.apache.hc.core5.util.TimeValue;
52 import org.apache.hc.core5.util.Timeout;
53 import org.slf4j.Logger;
54
55 class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
56
57 static class ReUseData {
58
59 final Object state;
60 final TimeValue validDuration;
61
62 ReUseData(final Object state, final TimeValue validDuration) {
63 this.state = state;
64 this.validDuration = validDuration;
65 }
66
67 }
68
69 private final Logger log;
70 private final AsyncClientConnectionManager manager;
71 private final ConnectionInitiator connectionInitiator;
72 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
73
74
75
76 @Deprecated
77 private final TlsConfig tlsConfig;
78 private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
79 private final AtomicReference<ReUseData> reuseDataRef;
80
81 InternalHttpAsyncExecRuntime(
82 final Logger log,
83 final AsyncClientConnectionManager manager,
84 final ConnectionInitiator connectionInitiator,
85 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
86 final TlsConfig tlsConfig) {
87 super();
88 this.log = log;
89 this.manager = manager;
90 this.connectionInitiator = connectionInitiator;
91 this.pushHandlerFactory = pushHandlerFactory;
92 this.tlsConfig = tlsConfig;
93 this.endpointRef = new AtomicReference<>();
94 this.reuseDataRef = new AtomicReference<>();
95 }
96
97 @Override
98 public boolean isEndpointAcquired() {
99 return endpointRef.get() != null;
100 }
101
102 @Override
103 public Cancellable acquireEndpoint(
104 final String id,
105 final HttpRoute route,
106 final Object object,
107 final HttpClientContext context,
108 final FutureCallback<AsyncExecRuntime> callback) {
109 if (endpointRef.get() == null) {
110 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
111 final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
112 if (log.isDebugEnabled()) {
113 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
114 }
115 return Operations.cancellable(manager.lease(
116 id,
117 route,
118 object,
119 connectionRequestTimeout,
120 new FutureCallback<AsyncConnectionEndpoint>() {
121
122 @Override
123 public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
124 endpointRef.set(connectionEndpoint);
125 if (log.isDebugEnabled()) {
126 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
127 }
128 callback.completed(InternalHttpAsyncExecRuntime.this);
129 }
130
131 @Override
132 public void failed(final Exception ex) {
133 callback.failed(ex);
134 }
135
136 @Override
137 public void cancelled() {
138 callback.cancelled();
139 }
140 }));
141 }
142 callback.completed(this);
143 return Operations.nonCancellable();
144 }
145
146 private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
147 try {
148 endpoint.close(CloseMode.IMMEDIATE);
149 if (log.isDebugEnabled()) {
150 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
151 }
152 } finally {
153 if (log.isDebugEnabled()) {
154 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
155 }
156 manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
157 }
158 }
159
160 @Override
161 public void releaseEndpoint() {
162 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
163 if (endpoint != null) {
164 final ReUseData reUseData = reuseDataRef.getAndSet(null);
165 if (reUseData != null) {
166 if (log.isDebugEnabled()) {
167 log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
168 }
169 manager.release(endpoint, reUseData.state, reUseData.validDuration);
170 } else {
171 discardEndpoint(endpoint);
172 }
173 }
174 }
175
176 @Override
177 public void discardEndpoint() {
178 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
179 if (endpoint != null) {
180 discardEndpoint(endpoint);
181 }
182 }
183
184 @Override
185 public boolean validateConnection() {
186 if (reuseDataRef != null) {
187 final AsyncConnectionEndpoint endpoint = endpointRef.get();
188 return endpoint != null && endpoint.isConnected();
189 }
190 final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
191 if (endpoint != null) {
192 discardEndpoint(endpoint);
193 }
194 return false;
195 }
196
197 AsyncConnectionEndpoint ensureValid() {
198 final AsyncConnectionEndpoint endpoint = endpointRef.get();
199 if (endpoint == null) {
200 throw new IllegalStateException("Endpoint not acquired / already released");
201 }
202 return endpoint;
203 }
204
205 @Override
206 public boolean isEndpointConnected() {
207 final AsyncConnectionEndpoint endpoint = endpointRef.get();
208 return endpoint != null && endpoint.isConnected();
209 }
210
211 @Override
212 public Cancellable connectEndpoint(
213 final HttpClientContext context,
214 final FutureCallback<AsyncExecRuntime> callback) {
215 final AsyncConnectionEndpoint endpoint = ensureValid();
216 if (endpoint.isConnected()) {
217 callback.completed(this);
218 return Operations.nonCancellable();
219 }
220 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
221 @SuppressWarnings("deprecation")
222 final Timeout connectTimeout = requestConfig.getConnectTimeout();
223 if (log.isDebugEnabled()) {
224 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
225 }
226 return Operations.cancellable(manager.connect(
227 endpoint,
228 connectionInitiator,
229 connectTimeout,
230 tlsConfig,
231 context,
232 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
233
234 @Override
235 public void completed(final AsyncConnectionEndpoint endpoint) {
236 if (log.isDebugEnabled()) {
237 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
238 }
239 if (callback != null) {
240 callback.completed(InternalHttpAsyncExecRuntime.this);
241 }
242 }
243
244 }));
245
246 }
247
248 @Override
249 public void disconnectEndpoint() {
250 final AsyncConnectionEndpoint endpoint = endpointRef.get();
251 if (endpoint != null) {
252 endpoint.close(CloseMode.GRACEFUL);
253 }
254 }
255
256 @Override
257 public void upgradeTls(final HttpClientContext context) {
258 upgradeTls(context, null);
259 }
260
261 @Override
262 public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
263 final AsyncConnectionEndpoint endpoint = ensureValid();
264 if (log.isDebugEnabled()) {
265 log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
266 }
267 manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
268
269 @Override
270 public void completed(final AsyncConnectionEndpoint endpoint) {
271 if (callback != null) {
272 callback.completed(InternalHttpAsyncExecRuntime.this);
273 }
274 }
275
276 });
277 }
278
279 @Override
280 public EndpointInfo getEndpointInfo() {
281 final AsyncConnectionEndpoint endpoint = endpointRef.get();
282 return endpoint != null ? endpoint.getInfo() : null;
283 }
284
285 @Override
286 public Cancellable execute(
287 final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
288 final AsyncConnectionEndpoint endpoint = ensureValid();
289 if (endpoint.isConnected()) {
290 if (log.isDebugEnabled()) {
291 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
292 }
293 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
294 final Timeout responseTimeout = requestConfig.getResponseTimeout();
295 if (responseTimeout != null) {
296 endpoint.setSocketTimeout(responseTimeout);
297 }
298 endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
299 if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
300 return () -> {
301 exchangeHandler.cancel();
302 return true;
303 };
304 }
305 } else {
306 connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
307
308 @Override
309 public void completed(final AsyncExecRuntime runtime) {
310 if (log.isDebugEnabled()) {
311 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
312 }
313 try {
314 endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
315 } catch (final RuntimeException ex) {
316 failed(ex);
317 }
318 }
319
320 @Override
321 public void failed(final Exception ex) {
322 exchangeHandler.failed(ex);
323 }
324
325 @Override
326 public void cancelled() {
327 exchangeHandler.failed(new InterruptedIOException());
328 }
329
330 });
331 }
332 return Operations.nonCancellable();
333 }
334
335 @Override
336 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
337 reuseDataRef.set(new ReUseData(newState, newValidDuration));
338 }
339
340 @Override
341 public void markConnectionNonReusable() {
342 reuseDataRef.set(null);
343 }
344
345 @Override
346 public AsyncExecRuntime fork() {
347 return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
348 }
349
350 }