1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  package org.apache.hc.client5.http.impl.nio;
29  
30  import java.net.InetAddress;
31  import java.net.InetSocketAddress;
32  import java.net.SocketAddress;
33  import java.util.concurrent.Future;
34  
35  import org.apache.hc.client5.http.DnsResolver;
36  import org.apache.hc.client5.http.SchemePortResolver;
37  import org.apache.hc.client5.http.UnsupportedSchemeException;
38  import org.apache.hc.client5.http.config.TlsConfig;
39  import org.apache.hc.client5.http.impl.ConnPoolSupport;
40  import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
41  import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
42  import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
43  import org.apache.hc.client5.http.routing.RoutingSupport;
44  import org.apache.hc.core5.annotation.Internal;
45  import org.apache.hc.core5.concurrent.CallbackContribution;
46  import org.apache.hc.core5.concurrent.ComplexFuture;
47  import org.apache.hc.core5.concurrent.FutureCallback;
48  import org.apache.hc.core5.concurrent.FutureContribution;
49  import org.apache.hc.core5.http.HttpHost;
50  import org.apache.hc.core5.http.URIScheme;
51  import org.apache.hc.core5.http.config.Lookup;
52  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
53  import org.apache.hc.core5.http.protocol.HttpContext;
54  import org.apache.hc.core5.net.NamedEndpoint;
55  import org.apache.hc.core5.reactor.ConnectionInitiator;
56  import org.apache.hc.core5.reactor.IOSession;
57  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
58  import org.apache.hc.core5.util.Args;
59  import org.apache.hc.core5.util.Timeout;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  @Internal
64  public class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
65  
66      private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncClientConnectionOperator.class);
67  
68      private final SchemePortResolver schemePortResolver;
69      private final MultihomeIOSessionRequester sessionRequester;
70      private final Lookup<TlsStrategy> tlsStrategyLookup;
71  
72      DefaultAsyncClientConnectionOperator(
73              final Lookup<TlsStrategy> tlsStrategyLookup,
74              final SchemePortResolver schemePortResolver,
75              final DnsResolver dnsResolver) {
76          this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup");
77          this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
78          this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver);
79      }
80  
81      @Override
82      public Future<ManagedAsyncClientConnection> connect(
83              final ConnectionInitiator connectionInitiator,
84              final HttpHost host,
85              final SocketAddress localAddress,
86              final Timeout connectTimeout,
87              final Object attachment,
88              final FutureCallback<ManagedAsyncClientConnection> callback) {
89          return connect(connectionInitiator, host, null, localAddress, connectTimeout,
90              attachment, null, callback);
91      }
92  
93      @Override
94      public Future<ManagedAsyncClientConnection> connect(
95              final ConnectionInitiator connectionInitiator,
96              final HttpHost endpointHost,
97              final NamedEndpoint endpointName,
98              final SocketAddress localAddress,
99              final Timeout connectTimeout,
100             final Object attachment,
101             final HttpContext context,
102             final FutureCallback<ManagedAsyncClientConnection> callback) {
103         Args.notNull(connectionInitiator, "Connection initiator");
104         Args.notNull(endpointHost, "Host");
105         final ComplexFuture<ManagedAsyncClientConnection> future = new ComplexFuture<>(callback);
106         final HttpHost remoteEndpoint = RoutingSupport.normalize(endpointHost, schemePortResolver);
107         final InetAddress remoteAddress = endpointHost.getAddress();
108         final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT;
109 
110         onBeforeSocketConnect(context, endpointHost);
111         if (LOG.isDebugEnabled()) {
112             LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
113         }
114 
115         final Future<IOSession> sessionFuture = sessionRequester.connect(
116                 connectionInitiator,
117                 remoteEndpoint,
118                 remoteAddress != null ? new InetSocketAddress(remoteAddress, remoteEndpoint.getPort()) : null,
119                 localAddress,
120                 connectTimeout,
121                 tlsConfig.getHttpVersionPolicy(),
122                 new FutureCallback<IOSession>() {
123 
124                     @Override
125                     public void completed(final IOSession session) {
126                         final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
127                         onAfterSocketConnect(context, endpointHost);
128                         if (LOG.isDebugEnabled()) {
129                             LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(connection), endpointHost,
130                                     connection.getLocalAddress(), connection.getRemoteAddress());
131                         }
132                         final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(endpointHost.getSchemeName()) : null;
133                         if (tlsStrategy != null) {
134                             try {
135                                 final Timeout socketTimeout = connection.getSocketTimeout();
136                                 final Timeout handshakeTimeout = tlsConfig.getHandshakeTimeout();
137                                 final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
138                                 onBeforeTlsHandshake(context, endpointHost);
139                                 if (LOG.isDebugEnabled()) {
140                                     LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
141                                 }
142                                 tlsStrategy.upgrade(
143                                         connection,
144                                         tlsName,
145                                         attachment,
146                                         handshakeTimeout != null ? handshakeTimeout : connectTimeout,
147                                         new FutureContribution<TransportSecurityLayer>(future) {
148 
149                                             @Override
150                                             public void completed(final TransportSecurityLayer transportSecurityLayer) {
151                                                 connection.setSocketTimeout(socketTimeout);
152                                                 future.completed(connection);
153                                                 onAfterTlsHandshake(context, endpointHost);
154                                                 if (LOG.isDebugEnabled()) {
155                                                     LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(connection), tlsName);
156                                                 }
157                                             }
158 
159                                         });
160                             } catch (final Exception ex) {
161                                 future.failed(ex);
162                             }
163                         } else {
164                             future.completed(connection);
165                         }
166                     }
167 
168                     @Override
169                     public void failed(final Exception ex) {
170                         future.failed(ex);
171                     }
172 
173                     @Override
174                     public void cancelled() {
175                         future.cancel();
176                     }
177 
178                 });
179         future.setDependency(sessionFuture);
180         return future;
181     }
182 
183     @Override
184     public void upgrade(
185             final ManagedAsyncClientConnection connection,
186             final HttpHost host,
187             final Object attachment) {
188         upgrade(connection, host, null, attachment, null, null);
189     }
190 
191     @Override
192     public void upgrade(
193             final ManagedAsyncClientConnection connection,
194             final HttpHost endpointHost,
195             final NamedEndpoint endpointName,
196             final Object attachment,
197             final HttpContext context,
198             final FutureCallback<ManagedAsyncClientConnection> callback) {
199         final String newProtocol = URIScheme.HTTP.same(endpointHost.getSchemeName()) ? URIScheme.HTTPS.id : endpointHost.getSchemeName();
200         final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(newProtocol) : null;
201         if (tlsStrategy != null) {
202             final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
203             if (LOG.isDebugEnabled()) {
204                 LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
205             }
206             tlsStrategy.upgrade(
207                     connection,
208                     tlsName,
209                     attachment,
210                     null,
211                     new CallbackContribution<TransportSecurityLayer>(callback) {
212 
213                         @Override
214                         public void completed(final TransportSecurityLayer transportSecurityLayer) {
215                             if (callback != null) {
216                                 callback.completed(connection);
217                             }
218                         }
219 
220                     });
221         } else {
222             callback.failed(new UnsupportedSchemeException(newProtocol + " protocol is not supported"));
223         }
224     }
225 
226     protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
227     }
228 
229     protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
230     }
231 
232     protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
233     }
234 
235     protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
236     }
237 
238 }