1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.SocketAddress;
33 import java.util.concurrent.Future;
34
35 import org.apache.hc.client5.http.DnsResolver;
36 import org.apache.hc.client5.http.SchemePortResolver;
37 import org.apache.hc.client5.http.UnsupportedSchemeException;
38 import org.apache.hc.client5.http.config.TlsConfig;
39 import org.apache.hc.client5.http.impl.ConnPoolSupport;
40 import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
41 import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
42 import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
43 import org.apache.hc.client5.http.routing.RoutingSupport;
44 import org.apache.hc.core5.annotation.Internal;
45 import org.apache.hc.core5.concurrent.CallbackContribution;
46 import org.apache.hc.core5.concurrent.ComplexFuture;
47 import org.apache.hc.core5.concurrent.FutureCallback;
48 import org.apache.hc.core5.concurrent.FutureContribution;
49 import org.apache.hc.core5.http.HttpHost;
50 import org.apache.hc.core5.http.URIScheme;
51 import org.apache.hc.core5.http.config.Lookup;
52 import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
53 import org.apache.hc.core5.http.protocol.HttpContext;
54 import org.apache.hc.core5.net.NamedEndpoint;
55 import org.apache.hc.core5.reactor.ConnectionInitiator;
56 import org.apache.hc.core5.reactor.IOSession;
57 import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
58 import org.apache.hc.core5.util.Args;
59 import org.apache.hc.core5.util.Timeout;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 @Internal
64 public class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
65
66 private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncClientConnectionOperator.class);
67
68 private final SchemePortResolver schemePortResolver;
69 private final MultihomeIOSessionRequester sessionRequester;
70 private final Lookup<TlsStrategy> tlsStrategyLookup;
71
72
73
74
75
76
77
78
79
80 protected DefaultAsyncClientConnectionOperator(
81 final Lookup<TlsStrategy> tlsStrategyLookup,
82 final SchemePortResolver schemePortResolver,
83 final DnsResolver dnsResolver) {
84 this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup");
85 this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
86 this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver);
87 }
88
89 @Override
90 public Future<ManagedAsyncClientConnection> connect(
91 final ConnectionInitiator connectionInitiator,
92 final HttpHost host,
93 final SocketAddress localAddress,
94 final Timeout connectTimeout,
95 final Object attachment,
96 final FutureCallback<ManagedAsyncClientConnection> callback) {
97 return connect(connectionInitiator, host, null, localAddress, connectTimeout,
98 attachment, null, callback);
99 }
100
101 @Override
102 public Future<ManagedAsyncClientConnection> connect(
103 final ConnectionInitiator connectionInitiator,
104 final HttpHost endpointHost,
105 final NamedEndpoint endpointName,
106 final SocketAddress localAddress,
107 final Timeout connectTimeout,
108 final Object attachment,
109 final HttpContext context,
110 final FutureCallback<ManagedAsyncClientConnection> callback) {
111 Args.notNull(connectionInitiator, "Connection initiator");
112 Args.notNull(endpointHost, "Host");
113 final ComplexFuture<ManagedAsyncClientConnection> future = new ComplexFuture<>(callback);
114 final HttpHost remoteEndpoint = RoutingSupport.normalize(endpointHost, schemePortResolver);
115 final InetAddress remoteAddress = endpointHost.getAddress();
116 final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT;
117
118 onBeforeSocketConnect(context, endpointHost);
119 if (LOG.isDebugEnabled()) {
120 LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
121 }
122
123 final Future<IOSession> sessionFuture = sessionRequester.connect(
124 connectionInitiator,
125 remoteEndpoint,
126 remoteAddress != null ? new InetSocketAddress(remoteAddress, remoteEndpoint.getPort()) : null,
127 localAddress,
128 connectTimeout,
129 tlsConfig.getHttpVersionPolicy(),
130 new FutureCallback<IOSession>() {
131
132 @Override
133 public void completed(final IOSession session) {
134 final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
135 onAfterSocketConnect(context, endpointHost);
136 if (LOG.isDebugEnabled()) {
137 LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(connection), endpointHost,
138 connection.getLocalAddress(), connection.getRemoteAddress());
139 }
140 final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(endpointHost.getSchemeName()) : null;
141 if (tlsStrategy != null) {
142 try {
143 final Timeout socketTimeout = connection.getSocketTimeout();
144 final Timeout handshakeTimeout = tlsConfig.getHandshakeTimeout() != null ? tlsConfig.getHandshakeTimeout() : connectTimeout;
145 final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
146 onBeforeTlsHandshake(context, endpointHost);
147 if (LOG.isDebugEnabled()) {
148 LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
149 }
150 tlsStrategy.upgrade(
151 connection,
152 tlsName,
153 attachment,
154 handshakeTimeout,
155 new FutureContribution<TransportSecurityLayer>(future) {
156
157 @Override
158 public void completed(final TransportSecurityLayer transportSecurityLayer) {
159 connection.setSocketTimeout(socketTimeout);
160 future.completed(connection);
161 onAfterTlsHandshake(context, endpointHost);
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(connection), tlsName);
164 }
165 }
166
167 });
168 } catch (final Exception ex) {
169 future.failed(ex);
170 }
171 } else {
172 future.completed(connection);
173 }
174 }
175
176 @Override
177 public void failed(final Exception ex) {
178 future.failed(ex);
179 }
180
181 @Override
182 public void cancelled() {
183 future.cancel();
184 }
185
186 });
187 future.setDependency(sessionFuture);
188 return future;
189 }
190
191 @Override
192 public void upgrade(
193 final ManagedAsyncClientConnection connection,
194 final HttpHost host,
195 final Object attachment) {
196 upgrade(connection, host, null, attachment, null, null);
197 }
198
199 @Override
200 public void upgrade(
201 final ManagedAsyncClientConnection connection,
202 final HttpHost endpointHost,
203 final NamedEndpoint endpointName,
204 final Object attachment,
205 final HttpContext context,
206 final FutureCallback<ManagedAsyncClientConnection> callback) {
207 final String newProtocol = URIScheme.HTTP.same(endpointHost.getSchemeName()) ? URIScheme.HTTPS.id : endpointHost.getSchemeName();
208 final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(newProtocol) : null;
209 if (tlsStrategy != null) {
210 final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
211 if (LOG.isDebugEnabled()) {
212 LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
213 }
214 tlsStrategy.upgrade(
215 connection,
216 tlsName,
217 attachment,
218 null,
219 new CallbackContribution<TransportSecurityLayer>(callback) {
220
221 @Override
222 public void completed(final TransportSecurityLayer transportSecurityLayer) {
223 if (callback != null) {
224 callback.completed(connection);
225 }
226 }
227
228 });
229 } else {
230 callback.failed(new UnsupportedSchemeException(newProtocol + " protocol is not supported"));
231 }
232 }
233
234 protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
235 }
236
237 protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
238 }
239
240 protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
241 }
242
243 protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
244 }
245
246 }