1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.EndpointInfo;
34 import org.apache.hc.client5.http.HttpRoute;
35 import org.apache.hc.client5.http.async.AsyncExecRuntime;
36 import org.apache.hc.client5.http.config.RequestConfig;
37 import org.apache.hc.client5.http.impl.ConnPoolSupport;
38 import org.apache.hc.client5.http.impl.Operations;
39 import org.apache.hc.client5.http.protocol.HttpClientContext;
40 import org.apache.hc.core5.concurrent.Cancellable;
41 import org.apache.hc.core5.concurrent.ComplexCancellable;
42 import org.apache.hc.core5.concurrent.FutureCallback;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46 import org.apache.hc.core5.http.nio.HandlerFactory;
47 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.reactor.Command;
50 import org.apache.hc.core5.reactor.IOSession;
51 import org.apache.hc.core5.reactor.ssl.TlsDetails;
52 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
53 import org.apache.hc.core5.util.Identifiable;
54 import org.apache.hc.core5.util.TimeValue;
55 import org.apache.hc.core5.util.Timeout;
56 import org.slf4j.Logger;
57
58 class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
59
60 private final Logger log;
61 private final InternalH2ConnPool connPool;
62 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
63 private final AtomicReference<Endpoint> sessionRef;
64 private volatile boolean reusable;
65
66 InternalH2AsyncExecRuntime(
67 final Logger log,
68 final InternalH2ConnPool connPool,
69 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
70 super();
71 this.log = log;
72 this.connPool = connPool;
73 this.pushHandlerFactory = pushHandlerFactory;
74 this.sessionRef = new AtomicReference<>();
75 }
76
77 @Override
78 public boolean isEndpointAcquired() {
79 return sessionRef.get() != null;
80 }
81
82 @Override
83 public Cancellable acquireEndpoint(
84 final String id,
85 final HttpRoute route,
86 final Object object,
87 final HttpClientContext context,
88 final FutureCallback<AsyncExecRuntime> callback) {
89 if (sessionRef.get() == null) {
90 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
91 @SuppressWarnings("deprecation")
92 final Timeout connectTimeout = requestConfig.getConnectTimeout();
93 if (log.isDebugEnabled()) {
94 log.debug("{} acquiring endpoint ({})", id, connectTimeout);
95 }
96 return Operations.cancellable(connPool.getSession(route, connectTimeout,
97 new FutureCallback<IOSession>() {
98
99 @Override
100 public void completed(final IOSession ioSession) {
101 sessionRef.set(new Endpoint(route, ioSession));
102 reusable = true;
103 if (log.isDebugEnabled()) {
104 log.debug("{} acquired endpoint", id);
105 }
106 callback.completed(InternalH2AsyncExecRuntime.this);
107 }
108
109 @Override
110 public void failed(final Exception ex) {
111 callback.failed(ex);
112 }
113
114 @Override
115 public void cancelled() {
116 callback.cancelled();
117 }
118
119 }));
120 }
121 callback.completed(this);
122 return Operations.nonCancellable();
123 }
124
125 private void closeEndpoint(final Endpoint endpoint) {
126 endpoint.session.close(CloseMode.GRACEFUL);
127 if (log.isDebugEnabled()) {
128 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129 }
130 }
131
132 @Override
133 public void releaseEndpoint() {
134 final Endpoint endpoint = sessionRef.getAndSet(null);
135 if (endpoint != null && !reusable) {
136 closeEndpoint(endpoint);
137 }
138 }
139
140 @Override
141 public void discardEndpoint() {
142 final Endpoint endpoint = sessionRef.getAndSet(null);
143 if (endpoint != null) {
144 closeEndpoint(endpoint);
145 }
146 }
147
148 @Override
149 public boolean validateConnection() {
150 if (reusable) {
151 final Endpoint endpoint = sessionRef.get();
152 return endpoint != null && endpoint.session.isOpen();
153 }
154 final Endpoint endpoint = sessionRef.getAndSet(null);
155 if (endpoint != null) {
156 closeEndpoint(endpoint);
157 }
158 return false;
159 }
160
161 @Override
162 public boolean isEndpointConnected() {
163 final Endpoint endpoint = sessionRef.get();
164 return endpoint != null && endpoint.session.isOpen();
165 }
166
167
168 Endpoint ensureValid() {
169 final Endpoint endpoint = sessionRef.get();
170 if (endpoint == null) {
171 throw new IllegalStateException("I/O session not acquired / already released");
172 }
173 return endpoint;
174 }
175
176 @Override
177 public Cancellable connectEndpoint(
178 final HttpClientContext context,
179 final FutureCallback<AsyncExecRuntime> callback) {
180 final Endpoint endpoint = ensureValid();
181 if (endpoint.session.isOpen()) {
182 callback.completed(this);
183 return Operations.nonCancellable();
184 }
185 final HttpRoute route = endpoint.route;
186 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
187 @SuppressWarnings("deprecation")
188 final Timeout connectTimeout = requestConfig.getConnectTimeout();
189 if (log.isDebugEnabled()) {
190 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
191 }
192 return Operations.cancellable(connPool.getSession(route, connectTimeout,
193 new FutureCallback<IOSession>() {
194
195 @Override
196 public void completed(final IOSession ioSession) {
197 sessionRef.set(new Endpoint(route, ioSession));
198 reusable = true;
199 if (log.isDebugEnabled()) {
200 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
201 }
202 callback.completed(InternalH2AsyncExecRuntime.this);
203 }
204
205 @Override
206 public void failed(final Exception ex) {
207 callback.failed(ex);
208 }
209
210 @Override
211 public void cancelled() {
212 callback.cancelled();
213 }
214
215 }));
216
217 }
218
219 @Override
220 public void disconnectEndpoint() {
221 final Endpoint endpoint = sessionRef.get();
222 if (endpoint != null) {
223 endpoint.session.close(CloseMode.GRACEFUL);
224 }
225 }
226
227 @Override
228 public void upgradeTls(final HttpClientContext context) {
229 throw new UnsupportedOperationException();
230 }
231
232 @Override
233 public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
234 throw new UnsupportedOperationException();
235 }
236
237 @Override
238 public EndpointInfo getEndpointInfo() {
239 final Endpoint endpoint = sessionRef.get();
240 if (endpoint != null && endpoint.session.isOpen()) {
241 if (endpoint.session instanceof TransportSecurityLayer) {
242 final TlsDetails tlsDetails = ((TransportSecurityLayer) endpoint.session).getTlsDetails();
243 return new EndpointInfo(HttpVersion.HTTP_2, tlsDetails != null ? tlsDetails.getSSLSession() : null);
244 }
245 }
246 return null;
247 }
248
249 @Override
250 public Cancellable execute(
251 final String id,
252 final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
253 final ComplexCancellable complexCancellable = new ComplexCancellable();
254 final Endpoint endpoint = ensureValid();
255 final IOSession session = endpoint.session;
256 if (session.isOpen()) {
257 if (log.isDebugEnabled()) {
258 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
259 }
260 context.setProtocolVersion(HttpVersion.HTTP_2);
261 session.enqueue(
262 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
263 Command.Priority.NORMAL);
264 } else {
265 final HttpRoute route = endpoint.route;
266 final RequestConfig requestConfig = context.getRequestConfigOrDefault();
267 @SuppressWarnings("deprecation")
268 final Timeout connectTimeout = requestConfig.getConnectTimeout();
269 connPool.getSession(route, connectTimeout, new FutureCallback<IOSession>() {
270
271 @Override
272 public void completed(final IOSession ioSession) {
273 sessionRef.set(new Endpoint(route, ioSession));
274 reusable = true;
275 if (log.isDebugEnabled()) {
276 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
277 }
278 context.setProtocolVersion(HttpVersion.HTTP_2);
279 session.enqueue(
280 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
281 Command.Priority.NORMAL);
282 }
283
284 @Override
285 public void failed(final Exception ex) {
286 exchangeHandler.failed(ex);
287 }
288
289 @Override
290 public void cancelled() {
291 exchangeHandler.failed(new InterruptedIOException());
292 }
293
294 });
295 }
296 return complexCancellable;
297 }
298
299 @Override
300 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
301 throw new UnsupportedOperationException();
302 }
303
304 @Override
305 public void markConnectionNonReusable() {
306 reusable = false;
307 }
308
309 static class Endpoint implements Identifiable {
310
311 final HttpRoute route;
312 final IOSession session;
313
314 Endpoint(final HttpRoute route, final IOSession session) {
315 this.route = route;
316 this.session = session;
317 }
318
319 @Override
320 public String getId() {
321 return session.getId();
322 }
323
324 }
325
326 @Override
327 public AsyncExecRuntime fork() {
328 return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
329 }
330
331 }