1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.util.List;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.hc.client5.http.ConnectExceptionSupport;
40 import org.apache.hc.client5.http.DnsResolver;
41 import org.apache.hc.client5.http.SystemDefaultDnsResolver;
42 import org.apache.hc.core5.concurrent.ComplexFuture;
43 import org.apache.hc.core5.concurrent.FutureCallback;
44 import org.apache.hc.core5.net.NamedEndpoint;
45 import org.apache.hc.core5.reactor.ConnectionInitiator;
46 import org.apache.hc.core5.reactor.IOSession;
47 import org.apache.hc.core5.util.Timeout;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 final class MultihomeIOSessionRequester {
52
53 private static final Logger LOG = LoggerFactory.getLogger(MultihomeIOSessionRequester.class);
54 private final DnsResolver dnsResolver;
55
56 MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
57 this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
58 }
59
60 public Future<IOSession> connect(
61 final ConnectionInitiator connectionInitiator,
62 final NamedEndpoint remoteEndpoint,
63 final SocketAddress remoteAddress,
64 final SocketAddress localAddress,
65 final Timeout connectTimeout,
66 final Object attachment,
67 final FutureCallback<IOSession> callback) {
68
69 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
70 if (remoteAddress != null) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("{}:{} connecting {} to {} ({})",
73 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
74 }
75 final Future<IOSession> sessionFuture = connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, new FutureCallback<IOSession>() {
76 @Override
77 public void completed(final IOSession session) {
78 future.completed(session);
79 }
80
81 @Override
82 public void failed(final Exception cause) {
83 if (LOG.isDebugEnabled()) {
84 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
85 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
86 }
87 if (cause instanceof IOException) {
88 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint,
89 (remoteAddress instanceof InetSocketAddress) ?
90 new InetAddress[] { ((InetSocketAddress) remoteAddress).getAddress() } :
91 new InetAddress[] {}));
92 } else {
93 future.failed(cause);
94 }
95 }
96
97 @Override
98 public void cancelled() {
99 future.cancel();
100 }
101
102 });
103 future.setDependency(sessionFuture);
104 return future;
105 }
106
107 if (LOG.isDebugEnabled()) {
108 LOG.debug("{} resolving remote address", remoteEndpoint.getHostName());
109 }
110
111 final List<InetSocketAddress> remoteAddresses;
112 try {
113 remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName(), remoteEndpoint.getPort());
114 if (remoteAddresses == null || remoteAddresses.isEmpty()) {
115 throw new UnknownHostException(remoteEndpoint.getHostName());
116 }
117 } catch (final UnknownHostException ex) {
118 future.failed(ex);
119 return future;
120 }
121
122 if (LOG.isDebugEnabled()) {
123 LOG.debug("{} resolved to {}", remoteEndpoint.getHostName(), remoteAddresses);
124 }
125
126 final Runnable runnable = new Runnable() {
127
128 private final AtomicInteger attempt = new AtomicInteger(0);
129
130 void executeNext() {
131 final int index = attempt.getAndIncrement();
132 final InetSocketAddress remoteAddress = remoteAddresses.get(index);
133
134 if (LOG.isDebugEnabled()) {
135 LOG.debug("{}:{} connecting {}->{} ({})",
136 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, connectTimeout);
137 }
138
139 final Future<IOSession> sessionFuture = connectionInitiator.connect(
140 remoteEndpoint,
141 remoteAddress,
142 localAddress,
143 connectTimeout,
144 attachment,
145 new FutureCallback<IOSession>() {
146
147 @Override
148 public void completed(final IOSession session) {
149 if (LOG.isDebugEnabled()) {
150 LOG.debug("{}:{} connected {}->{} as {}",
151 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), localAddress, remoteAddress, session.getId());
152 }
153 future.completed(session);
154 }
155
156 @Override
157 public void failed(final Exception cause) {
158 if (attempt.get() >= remoteAddresses.size()) {
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("{}:{} connection to {} failed ({}); terminating operation",
161 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
162 }
163 if (cause instanceof IOException) {
164 final InetAddress[] addresses = remoteAddresses.stream()
165 .filter(addr -> addr instanceof InetSocketAddress)
166 .map(addr -> ((InetSocketAddress) addr).getAddress())
167 .toArray(InetAddress[]::new);
168 future.failed(ConnectExceptionSupport.enhance((IOException) cause, remoteEndpoint, addresses));
169 } else {
170 future.failed(cause);
171 }
172 } else {
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("{}:{} connection to {} failed ({}); retrying connection to the next address",
175 remoteEndpoint.getHostName(), remoteEndpoint.getPort(), remoteAddress, cause.getClass());
176 }
177 executeNext();
178 }
179 }
180
181 @Override
182 public void cancelled() {
183 future.cancel();
184 }
185
186 });
187 future.setDependency(sessionFuture);
188 }
189
190 @Override
191 public void run() {
192 executeNext();
193 }
194
195 };
196 runnable.run();
197 return future;
198 }
199
200 public Future<IOSession> connect(
201 final ConnectionInitiator connectionInitiator,
202 final NamedEndpoint remoteEndpoint,
203 final SocketAddress localAddress,
204 final Timeout connectTimeout,
205 final Object attachment,
206 final FutureCallback<IOSession> callback) {
207 return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
208 }
209
210 }