1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.SocketOption;
35 import java.net.UnknownHostException;
36 import java.nio.channels.CancelledKeyException;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.SelectionKey;
39 import java.nio.channels.SocketChannel;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.atomic.AtomicBoolean;
45
46 import org.apache.hc.core5.concurrent.FutureCallback;
47 import org.apache.hc.core5.function.Callback;
48 import org.apache.hc.core5.function.Decorator;
49 import org.apache.hc.core5.io.CloseMode;
50 import org.apache.hc.core5.io.Closer;
51 import org.apache.hc.core5.io.SocketSupport;
52 import org.apache.hc.core5.net.NamedEndpoint;
53 import org.apache.hc.core5.util.Args;
54 import org.apache.hc.core5.util.Timeout;
55
56 class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
57
58 private static final int MAX_CHANNEL_REQUESTS = 10000;
59
60 private final IOEventHandlerFactory eventHandlerFactory;
61 private final IOReactorConfig reactorConfig;
62 private final Decorator<IOSession> ioSessionDecorator;
63 private final IOSessionListener sessionListener;
64 private final Callback<IOSession> sessionShutdownCallback;
65 private final Queue<InternalDataChannel> closedSessions;
66 private final Queue<ChannelEntry> channelQueue;
67 private final Queue<IOSessionRequest> requestQueue;
68 private final AtomicBoolean shutdownInitiated;
69 private final long selectTimeoutMillis;
70 private volatile long lastTimeoutCheckMillis;
71
72 SingleCoreIOReactor(
73 final Callback<Exception> exceptionCallback,
74 final IOEventHandlerFactory eventHandlerFactory,
75 final IOReactorConfig reactorConfig,
76 final Decorator<IOSession> ioSessionDecorator,
77 final IOSessionListener sessionListener,
78 final Callback<IOSession> sessionShutdownCallback) {
79 super(exceptionCallback);
80 this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
81 this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
82 this.ioSessionDecorator = ioSessionDecorator;
83 this.sessionListener = sessionListener;
84 this.sessionShutdownCallback = sessionShutdownCallback;
85 this.shutdownInitiated = new AtomicBoolean();
86 this.closedSessions = new ConcurrentLinkedQueue<>();
87 this.channelQueue = new ConcurrentLinkedQueue<>();
88 this.requestQueue = new ConcurrentLinkedQueue<>();
89 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
90 }
91
92 void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
93 if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
94 throw new IOReactorShutdownException("I/O reactor has been shut down");
95 }
96 this.channelQueue.add(entry);
97 this.selector.wakeup();
98 }
99
100 @Override
101 void doTerminate() {
102 closePendingChannels();
103 closePendingConnectionRequests();
104 processClosedSessions();
105 }
106
107 @Override
108 void doExecute() throws IOException {
109 while (!Thread.currentThread().isInterrupted()) {
110
111 final int readyCount = this.selector.select(this.selectTimeoutMillis);
112
113 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
114 if (this.shutdownInitiated.compareAndSet(false, true)) {
115 initiateSessionShutdown();
116 }
117 closePendingChannels();
118 }
119 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
120 break;
121 }
122
123
124 if (readyCount > 0) {
125 processEvents(this.selector.selectedKeys());
126 }
127
128 validateActiveChannels();
129
130
131 processClosedSessions();
132
133
134 if (getStatus() == IOReactorStatus.ACTIVE) {
135 processPendingChannels();
136 processPendingConnectionRequests();
137 }
138
139
140 if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
141 break;
142 }
143 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
144 break;
145 }
146 }
147 }
148
149 private void initiateSessionShutdown() {
150 if (this.sessionShutdownCallback != null) {
151 final Set<SelectionKey> keys = this.selector.keys();
152 for (final SelectionKey key : keys) {
153 final InternalChannel channel = (InternalChannel) key.attachment();
154 if (channel instanceof InternalDataChannel) {
155 this.sessionShutdownCallback.execute((InternalDataChannel) channel);
156 }
157 }
158 }
159 }
160
161 private void validateActiveChannels() {
162 final long currentTimeMillis = System.currentTimeMillis();
163 if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
164 this.lastTimeoutCheckMillis = currentTimeMillis;
165 for (final SelectionKey key : this.selector.keys()) {
166 checkTimeout(key, currentTimeMillis);
167 }
168 }
169 }
170
171 private void processEvents(final Set<SelectionKey> selectedKeys) {
172 for (final SelectionKey key : selectedKeys) {
173 final InternalChannel channel = (InternalChannel) key.attachment();
174 if (channel != null) {
175 try {
176 channel.handleIOEvent(key.readyOps());
177 } catch (final CancelledKeyException ex) {
178 channel.close(CloseMode.GRACEFUL);
179 }
180 }
181 }
182 selectedKeys.clear();
183 }
184
185 private void processPendingChannels() throws IOException {
186 ChannelEntry entry;
187 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
188 final SocketChannel socketChannel = entry.channel;
189 final Object attachment = entry.attachment;
190 try {
191 prepareSocket(socketChannel);
192 socketChannel.configureBlocking(false);
193 } catch (final IOException ex) {
194 logException(ex);
195 try {
196 socketChannel.close();
197 } catch (final IOException ex2) {
198 logException(ex2);
199 }
200 throw ex;
201 }
202 final SelectionKey key;
203 try {
204 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
205 } catch (final ClosedChannelException ex) {
206 return;
207 }
208 final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
209 final InternalDataChannel dataChannel = new InternalDataChannel(
210 ioSession,
211 null,
212 ioSessionDecorator,
213 sessionListener,
214 closedSessions);
215 dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
216 dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
217 key.attach(dataChannel);
218 dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
219 }
220 }
221
222 private void processClosedSessions() {
223 for (;;) {
224 final InternalDataChannel dataChannel = this.closedSessions.poll();
225 if (dataChannel == null) {
226 break;
227 }
228 try {
229 dataChannel.disconnected();
230 } catch (final CancelledKeyException ex) {
231
232 }
233 }
234 }
235
236 private void checkTimeout(final SelectionKey key, final long nowMillis) {
237 final InternalChannel channel = (InternalChannel) key.attachment();
238 if (channel != null) {
239 channel.checkTimeout(nowMillis);
240 }
241 }
242
243 @Override
244 public Future<IOSession> connect(
245 final NamedEndpoint remoteEndpoint,
246 final SocketAddress remoteAddress,
247 final SocketAddress localAddress,
248 final Timeout timeout,
249 final Object attachment,
250 final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
251 Args.notNull(remoteEndpoint, "Remote endpoint");
252 final IOSessionRequest sessionRequest = new IOSessionRequest(
253 remoteEndpoint,
254 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
255 localAddress,
256 timeout,
257 attachment,
258 callback);
259
260 this.requestQueue.add(sessionRequest);
261 this.selector.wakeup();
262
263 return sessionRequest;
264 }
265
266 private void prepareSocket(final SocketChannel socketChannel) throws IOException {
267 final Socket socket = socketChannel.socket();
268 socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
269 socket.setKeepAlive(this.reactorConfig.isSoKeepAlive());
270 if (this.reactorConfig.getSndBufSize() > 0) {
271 socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
272 }
273 if (this.reactorConfig.getRcvBufSize() > 0) {
274 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
275 }
276 if (this.reactorConfig.getTrafficClass() > 0) {
277 socket.setTrafficClass(this.reactorConfig.getTrafficClass());
278 }
279 final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
280 if (linger >= 0) {
281 socket.setSoLinger(true, linger);
282 }
283 if (this.reactorConfig.getTcpKeepIdle() > 0) {
284 setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPIDLE, this.reactorConfig.getTcpKeepIdle());
285 }
286 if (this.reactorConfig.getTcpKeepInterval() > 0) {
287 setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPINTERVAL, this.reactorConfig.getTcpKeepInterval());
288 }
289 if (this.reactorConfig.getTcpKeepInterval() > 0) {
290 setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPCOUNT, this.reactorConfig.getTcpKeepCount());
291 }
292 }
293
294
295
296
297 <T> void setExtendedSocketOption(final SocketChannel socketChannel,
298 final String optionName, final T value) throws IOException {
299 final SocketOption<T> socketOption = SocketSupport.getExtendedSocketOptionOrNull(optionName);
300 if (socketOption == null) {
301 throw new UnsupportedOperationException(optionName + " is not supported in the current jdk");
302 }
303 socketChannel.setOption(socketOption, value);
304 }
305
306 private void validateAddress(final SocketAddress address) throws UnknownHostException {
307 if (address instanceof InetSocketAddress) {
308 final InetSocketAddress endpoint = (InetSocketAddress) address;
309 if (endpoint.isUnresolved()) {
310 throw new UnknownHostException(endpoint.getHostName());
311 }
312 }
313 }
314
315 private void processPendingConnectionRequests() {
316 IOSessionRequest sessionRequest;
317 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
318 if (!sessionRequest.isCancelled()) {
319 final SocketChannel socketChannel;
320 try {
321 socketChannel = SocketChannel.open();
322 } catch (final IOException ex) {
323 sessionRequest.failed(ex);
324 return;
325 }
326 try {
327 processConnectionRequest(socketChannel, sessionRequest);
328 } catch (final IOException | RuntimeException ex) {
329 Closer.closeQuietly(socketChannel);
330 sessionRequest.failed(ex);
331 }
332 }
333 }
334 }
335
336 private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
337 socketChannel.configureBlocking(false);
338 prepareSocket(socketChannel);
339
340 validateAddress(sessionRequest.localAddress);
341 if (sessionRequest.localAddress != null) {
342 final Socket sock = socketChannel.socket();
343 sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
344 sock.bind(sessionRequest.localAddress);
345 }
346
347 final SocketAddress socksProxyAddress = reactorConfig.getSocksProxyAddress();
348 final SocketAddress remoteAddress = socksProxyAddress != null ? socksProxyAddress : sessionRequest.remoteAddress;
349
350
351
352 validateAddress(remoteAddress);
353 final boolean connected = socketChannel.connect(remoteAddress);
354 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
355 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
356 final InternalDataChannel dataChannel = new InternalDataChannel(
357 ioSession,
358 sessionRequest.remoteEndpoint,
359 ioSessionDecorator,
360 sessionListener,
361 closedSessions);
362 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
363 final InternalChannel connectChannel = new InternalConnectChannel(
364 key,
365 socketChannel,
366 sessionRequest,
367 dataChannel,
368 eventHandlerFactory,
369 reactorConfig);
370 if (connected) {
371 connectChannel.handleIOEvent(SelectionKey.OP_CONNECT);
372 } else {
373 key.attach(connectChannel);
374 sessionRequest.assign(connectChannel);
375 }
376 }
377
378 private void closePendingChannels() {
379 ChannelEntry entry;
380 while ((entry = this.channelQueue.poll()) != null) {
381 final SocketChannel socketChannel = entry.channel;
382 try {
383 socketChannel.close();
384 } catch (final IOException ex) {
385 logException(ex);
386 }
387 }
388 }
389
390 private void closePendingConnectionRequests() {
391 IOSessionRequest sessionRequest;
392 while ((sessionRequest = this.requestQueue.poll()) != null) {
393 sessionRequest.cancel();
394 }
395 }
396
397 }