View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.EndpointInfo;
34  import org.apache.hc.client5.http.HttpRoute;
35  import org.apache.hc.client5.http.async.AsyncExecRuntime;
36  import org.apache.hc.client5.http.config.RequestConfig;
37  import org.apache.hc.client5.http.impl.ConnPoolSupport;
38  import org.apache.hc.client5.http.impl.Operations;
39  import org.apache.hc.client5.http.protocol.HttpClientContext;
40  import org.apache.hc.core5.concurrent.Cancellable;
41  import org.apache.hc.core5.concurrent.ComplexCancellable;
42  import org.apache.hc.core5.concurrent.FutureCallback;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46  import org.apache.hc.core5.http.nio.HandlerFactory;
47  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.reactor.ssl.TlsDetails;
52  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
53  import org.apache.hc.core5.util.Identifiable;
54  import org.apache.hc.core5.util.TimeValue;
55  import org.apache.hc.core5.util.Timeout;
56  import org.slf4j.Logger;
57  
58  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
59  
60      private final Logger log;
61      private final InternalH2ConnPool connPool;
62      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
63      private final AtomicReference<Endpoint> sessionRef;
64      private volatile boolean reusable;
65  
66      InternalH2AsyncExecRuntime(
67              final Logger log,
68              final InternalH2ConnPool connPool,
69              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
70          super();
71          this.log = log;
72          this.connPool = connPool;
73          this.pushHandlerFactory = pushHandlerFactory;
74          this.sessionRef = new AtomicReference<>();
75      }
76  
77      @Override
78      public boolean isEndpointAcquired() {
79          return sessionRef.get() != null;
80      }
81  
82      @Override
83      public Cancellable acquireEndpoint(
84              final String id,
85              final HttpRoute route,
86              final Object object,
87              final HttpClientContext context,
88              final FutureCallback<AsyncExecRuntime> callback) {
89          if (sessionRef.get() == null) {
90              final RequestConfig requestConfig = context.getRequestConfigOrDefault();
91              @SuppressWarnings("deprecation")
92              final Timeout connectTimeout = requestConfig.getConnectTimeout();
93              if (log.isDebugEnabled()) {
94                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
95              }
96              return Operations.cancellable(connPool.getSession(route, connectTimeout,
97                      new FutureCallback<IOSession>() {
98  
99                          @Override
100                         public void completed(final IOSession ioSession) {
101                             sessionRef.set(new Endpoint(route, ioSession));
102                             reusable = true;
103                             if (log.isDebugEnabled()) {
104                                 log.debug("{} acquired endpoint", id);
105                             }
106                             callback.completed(InternalH2AsyncExecRuntime.this);
107                         }
108 
109                         @Override
110                         public void failed(final Exception ex) {
111                             callback.failed(ex);
112                         }
113 
114                         @Override
115                         public void cancelled() {
116                             callback.cancelled();
117                         }
118 
119                     }));
120         }
121         callback.completed(this);
122         return Operations.nonCancellable();
123     }
124 
125     private void closeEndpoint(final Endpoint endpoint) {
126         endpoint.session.close(CloseMode.GRACEFUL);
127         if (log.isDebugEnabled()) {
128             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129         }
130     }
131 
132     @Override
133     public void releaseEndpoint() {
134         final Endpoint endpoint = sessionRef.getAndSet(null);
135         if (endpoint != null && !reusable) {
136             closeEndpoint(endpoint);
137         }
138     }
139 
140     @Override
141     public void discardEndpoint() {
142         final Endpoint endpoint = sessionRef.getAndSet(null);
143         if (endpoint != null) {
144             closeEndpoint(endpoint);
145         }
146     }
147 
148     @Override
149     public boolean validateConnection() {
150         if (reusable) {
151             final Endpoint endpoint = sessionRef.get();
152             return endpoint != null && endpoint.session.isOpen();
153         }
154         final Endpoint endpoint = sessionRef.getAndSet(null);
155         if (endpoint != null) {
156             closeEndpoint(endpoint);
157         }
158         return false;
159     }
160 
161     @Override
162     public boolean isEndpointConnected() {
163         final Endpoint endpoint = sessionRef.get();
164         return endpoint != null && endpoint.session.isOpen();
165     }
166 
167 
168     Endpoint ensureValid() {
169         final Endpoint endpoint = sessionRef.get();
170         if (endpoint == null) {
171             throw new IllegalStateException("I/O session not acquired / already released");
172         }
173         return endpoint;
174     }
175 
176     @Override
177     public Cancellable connectEndpoint(
178             final HttpClientContext context,
179             final FutureCallback<AsyncExecRuntime> callback) {
180         final Endpoint endpoint = ensureValid();
181         if (endpoint.session.isOpen()) {
182             callback.completed(this);
183             return Operations.nonCancellable();
184         }
185         final HttpRoute route = endpoint.route;
186         final RequestConfig requestConfig = context.getRequestConfigOrDefault();
187         @SuppressWarnings("deprecation")
188         final Timeout connectTimeout = requestConfig.getConnectTimeout();
189         if (log.isDebugEnabled()) {
190             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
191         }
192         return Operations.cancellable(connPool.getSession(route, connectTimeout,
193             new FutureCallback<IOSession>() {
194 
195             @Override
196             public void completed(final IOSession ioSession) {
197                 sessionRef.set(new Endpoint(route, ioSession));
198                 reusable = true;
199                 if (log.isDebugEnabled()) {
200                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
201                 }
202                 callback.completed(InternalH2AsyncExecRuntime.this);
203             }
204 
205             @Override
206             public void failed(final Exception ex) {
207                 callback.failed(ex);
208             }
209 
210             @Override
211             public void cancelled() {
212                 callback.cancelled();
213             }
214 
215         }));
216 
217     }
218 
219     @Override
220     public void disconnectEndpoint() {
221         final Endpoint endpoint = sessionRef.get();
222         if (endpoint != null) {
223             endpoint.session.close(CloseMode.GRACEFUL);
224         }
225     }
226 
227     @Override
228     public void upgradeTls(final HttpClientContext context) {
229         throw new UnsupportedOperationException();
230     }
231 
232     @Override
233     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
234         throw new UnsupportedOperationException();
235     }
236 
237     @Override
238     public EndpointInfo getEndpointInfo() {
239         final Endpoint endpoint = sessionRef.get();
240         if (endpoint != null && endpoint.session.isOpen()) {
241             if (endpoint.session instanceof TransportSecurityLayer) {
242                 final TlsDetails tlsDetails = ((TransportSecurityLayer) endpoint.session).getTlsDetails();
243                 return new EndpointInfo(HttpVersion.HTTP_2, tlsDetails != null ? tlsDetails.getSSLSession() : null);
244             }
245         }
246         return null;
247     }
248 
249     @Override
250     public Cancellable execute(
251             final String id,
252             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
253         final ComplexCancellable complexCancellable = new ComplexCancellable();
254         final Endpoint endpoint = ensureValid();
255         final IOSession session = endpoint.session;
256         if (session.isOpen()) {
257             if (log.isDebugEnabled()) {
258                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
259             }
260             context.setProtocolVersion(HttpVersion.HTTP_2);
261             session.enqueue(
262                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
263                     Command.Priority.NORMAL);
264         } else {
265             final HttpRoute route = endpoint.route;
266             final RequestConfig requestConfig = context.getRequestConfigOrDefault();
267             @SuppressWarnings("deprecation")
268             final Timeout connectTimeout = requestConfig.getConnectTimeout();
269             connPool.getSession(route, connectTimeout, new FutureCallback<IOSession>() {
270 
271                 @Override
272                 public void completed(final IOSession ioSession) {
273                     sessionRef.set(new Endpoint(route, ioSession));
274                     reusable = true;
275                     if (log.isDebugEnabled()) {
276                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
277                     }
278                     context.setProtocolVersion(HttpVersion.HTTP_2);
279                     session.enqueue(
280                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
281                             Command.Priority.NORMAL);
282                 }
283 
284                 @Override
285                 public void failed(final Exception ex) {
286                     exchangeHandler.failed(ex);
287                 }
288 
289                 @Override
290                 public void cancelled() {
291                     exchangeHandler.failed(new InterruptedIOException());
292                 }
293 
294             });
295         }
296         return complexCancellable;
297     }
298 
299     @Override
300     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
301         throw new UnsupportedOperationException();
302     }
303 
304     @Override
305     public void markConnectionNonReusable() {
306         reusable = false;
307     }
308 
309     static class Endpoint implements Identifiable {
310 
311         final HttpRoute route;
312         final IOSession session;
313 
314         Endpoint(final HttpRoute route, final IOSession session) {
315             this.route = route;
316             this.session = session;
317         }
318 
319         @Override
320         public String getId() {
321             return session.getId();
322         }
323 
324     }
325 
326     @Override
327     public AsyncExecRuntime fork() {
328         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
329     }
330 
331 }