View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.EndpointInfo;
34  import org.apache.hc.client5.http.HttpRoute;
35  import org.apache.hc.client5.http.async.AsyncExecRuntime;
36  import org.apache.hc.client5.http.config.RequestConfig;
37  import org.apache.hc.client5.http.config.TlsConfig;
38  import org.apache.hc.client5.http.impl.ConnPoolSupport;
39  import org.apache.hc.client5.http.impl.Operations;
40  import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
41  import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
42  import org.apache.hc.client5.http.protocol.HttpClientContext;
43  import org.apache.hc.core5.concurrent.CallbackContribution;
44  import org.apache.hc.core5.concurrent.Cancellable;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
47  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48  import org.apache.hc.core5.http.nio.HandlerFactory;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.reactor.ConnectionInitiator;
51  import org.apache.hc.core5.util.TimeValue;
52  import org.apache.hc.core5.util.Timeout;
53  import org.slf4j.Logger;
54  
55  class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
56  
57      static class ReUseData {
58  
59          final Object state;
60          final TimeValue validDuration;
61  
62        ReUseData(final Object state, final TimeValue validDuration) {
63          this.state = state;
64          this.validDuration = validDuration;
65        }
66  
67      }
68  
69      private final Logger log;
70      private final AsyncClientConnectionManager manager;
71      private final ConnectionInitiator connectionInitiator;
72      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
73      /**
74       * @deprecated TLS should be configured by the connection manager
75       */
76      @Deprecated
77      private final TlsConfig tlsConfig;
78      private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
79      private final AtomicReference<ReUseData> reuseDataRef;
80  
81      InternalHttpAsyncExecRuntime(
82              final Logger log,
83              final AsyncClientConnectionManager manager,
84              final ConnectionInitiator connectionInitiator,
85              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
86              final TlsConfig tlsConfig) {
87          super();
88          this.log = log;
89          this.manager = manager;
90          this.connectionInitiator = connectionInitiator;
91          this.pushHandlerFactory = pushHandlerFactory;
92          this.tlsConfig = tlsConfig;
93          this.endpointRef = new AtomicReference<>();
94          this.reuseDataRef = new AtomicReference<>();
95      }
96  
97      @Override
98      public boolean isEndpointAcquired() {
99          return endpointRef.get() != null;
100     }
101 
102     @Override
103     public Cancellable acquireEndpoint(
104             final String id,
105             final HttpRoute route,
106             final Object object,
107             final HttpClientContext context,
108             final FutureCallback<AsyncExecRuntime> callback) {
109         if (endpointRef.get() == null) {
110             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
111             final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
112             if (log.isDebugEnabled()) {
113                 log.debug("{} acquiring endpoint ({})", id, connectionRequestTimeout);
114             }
115             return Operations.cancellable(manager.lease(
116                     id,
117                     route,
118                     object,
119                     connectionRequestTimeout,
120                     new FutureCallback<AsyncConnectionEndpoint>() {
121 
122                         @Override
123                         public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
124                             endpointRef.set(connectionEndpoint);
125                             if (log.isDebugEnabled()) {
126                                 log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
127                             }
128                             callback.completed(InternalHttpAsyncExecRuntime.this);
129                         }
130 
131                         @Override
132                         public void failed(final Exception ex) {
133                             callback.failed(ex);
134                         }
135 
136                         @Override
137                         public void cancelled() {
138                             callback.cancelled();
139                         }
140                     }));
141         }
142         callback.completed(this);
143         return Operations.nonCancellable();
144     }
145 
146     private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
147         try {
148             endpoint.close(CloseMode.IMMEDIATE);
149             if (log.isDebugEnabled()) {
150                 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
151             }
152         } finally {
153             if (log.isDebugEnabled()) {
154                 log.debug("{} discarding endpoint", ConnPoolSupport.getId(endpoint));
155             }
156             manager.release(endpoint, null, TimeValue.ZERO_MILLISECONDS);
157         }
158     }
159 
160     @Override
161     public void releaseEndpoint() {
162         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
163         if (endpoint != null) {
164             final ReUseData reUseData = reuseDataRef.getAndSet(null);
165             if (reUseData != null) {
166                 if (log.isDebugEnabled()) {
167                     log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
168                 }
169                 manager.release(endpoint, reUseData.state, reUseData.validDuration);
170             } else {
171                 discardEndpoint(endpoint);
172             }
173         }
174     }
175 
176     @Override
177     public void discardEndpoint() {
178         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
179         if (endpoint != null) {
180             discardEndpoint(endpoint);
181         }
182     }
183 
184     @Override
185     public boolean validateConnection() {
186         if (reuseDataRef != null) {
187             final AsyncConnectionEndpoint endpoint = endpointRef.get();
188             return endpoint != null && endpoint.isConnected();
189         }
190         final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
191         if (endpoint != null) {
192             discardEndpoint(endpoint);
193         }
194         return false;
195     }
196 
197     AsyncConnectionEndpoint ensureValid() {
198         final AsyncConnectionEndpoint endpoint = endpointRef.get();
199         if (endpoint == null) {
200             throw new IllegalStateException("Endpoint not acquired / already released");
201         }
202         return endpoint;
203     }
204 
205     @Override
206     public boolean isEndpointConnected() {
207         final AsyncConnectionEndpoint endpoint = endpointRef.get();
208         return endpoint != null && endpoint.isConnected();
209     }
210 
211     @Override
212     public Cancellable connectEndpoint(
213             final HttpClientContext context,
214             final FutureCallback<AsyncExecRuntime> callback) {
215         final AsyncConnectionEndpoint endpoint = ensureValid();
216         if (endpoint.isConnected()) {
217             callback.completed(this);
218             return Operations.nonCancellable();
219         }
220         final RequestConfig requestConfig = context.getRequestConfigOrDefault();
221         @SuppressWarnings("deprecation")
222         final Timeout connectTimeout = requestConfig.getConnectTimeout();
223         if (log.isDebugEnabled()) {
224             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
225         }
226         return Operations.cancellable(manager.connect(
227                 endpoint,
228                 connectionInitiator,
229                 connectTimeout,
230                 tlsConfig,
231                 context,
232                 new CallbackContribution<AsyncConnectionEndpoint>(callback) {
233 
234                     @Override
235                     public void completed(final AsyncConnectionEndpoint endpoint) {
236                         if (log.isDebugEnabled()) {
237                             log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
238                         }
239                         if (callback != null) {
240                             callback.completed(InternalHttpAsyncExecRuntime.this);
241                         }
242                     }
243 
244         }));
245 
246     }
247 
248     @Override
249     public void disconnectEndpoint() {
250         final AsyncConnectionEndpoint endpoint = endpointRef.get();
251         if (endpoint != null) {
252             endpoint.close(CloseMode.GRACEFUL);
253         }
254     }
255 
256     @Override
257     public void upgradeTls(final HttpClientContext context) {
258         upgradeTls(context, null);
259     }
260 
261     @Override
262     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
263         final AsyncConnectionEndpoint endpoint = ensureValid();
264         if (log.isDebugEnabled()) {
265             log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
266         }
267         manager.upgrade(endpoint, tlsConfig, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
268 
269             @Override
270             public void completed(final AsyncConnectionEndpoint endpoint) {
271                 if (callback != null) {
272                     callback.completed(InternalHttpAsyncExecRuntime.this);
273                 }
274             }
275 
276         });
277     }
278 
279     @Override
280     public EndpointInfo getEndpointInfo() {
281         final AsyncConnectionEndpoint endpoint = endpointRef.get();
282         return endpoint != null ? endpoint.getInfo() : null;
283     }
284 
285     @Override
286     public Cancellable execute(
287             final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
288         final AsyncConnectionEndpoint endpoint = ensureValid();
289         if (endpoint.isConnected()) {
290             if (log.isDebugEnabled()) {
291                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
292             }
293             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
294             final Timeout responseTimeout = requestConfig.getResponseTimeout();
295             if (responseTimeout != null) {
296                 endpoint.setSocketTimeout(responseTimeout);
297             }
298             endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
299             if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
300                 return () -> {
301                     exchangeHandler.cancel();
302                     return true;
303                 };
304             }
305         } else {
306             connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
307 
308                 @Override
309                 public void completed(final AsyncExecRuntime runtime) {
310                     if (log.isDebugEnabled()) {
311                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
312                     }
313                     try {
314                         endpoint.execute(id, exchangeHandler, pushHandlerFactory, context);
315                     } catch (final RuntimeException ex) {
316                         failed(ex);
317                     }
318                 }
319 
320                 @Override
321                 public void failed(final Exception ex) {
322                     exchangeHandler.failed(ex);
323                 }
324 
325                 @Override
326                 public void cancelled() {
327                     exchangeHandler.failed(new InterruptedIOException());
328                 }
329 
330             });
331         }
332         return Operations.nonCancellable();
333     }
334 
335     @Override
336     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
337         reuseDataRef.set(new ReUseData(newState, newValidDuration));
338     }
339 
340     @Override
341     public void markConnectionNonReusable() {
342         reuseDataRef.set(null);
343     }
344 
345     @Override
346     public AsyncExecRuntime fork() {
347         return new InternalHttpAsyncExecRuntime(log, manager, connectionInitiator, pushHandlerFactory, tlsConfig);
348     }
349 
350 }