View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.SocketOption;
35  import java.net.UnknownHostException;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.SelectionKey;
39  import java.nio.channels.SocketChannel;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  import java.util.concurrent.Future;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  
46  import org.apache.hc.core5.concurrent.FutureCallback;
47  import org.apache.hc.core5.function.Callback;
48  import org.apache.hc.core5.function.Decorator;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.io.Closer;
51  import org.apache.hc.core5.io.SocketSupport;
52  import org.apache.hc.core5.net.NamedEndpoint;
53  import org.apache.hc.core5.util.Args;
54  import org.apache.hc.core5.util.Timeout;
55  
56  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
57  
58      private static final int MAX_CHANNEL_REQUESTS = 10000;
59  
60      private final IOEventHandlerFactory eventHandlerFactory;
61      private final IOReactorConfig reactorConfig;
62      private final Decorator<IOSession> ioSessionDecorator;
63      private final IOSessionListener sessionListener;
64      private final Callback<IOSession> sessionShutdownCallback;
65      private final Queue<InternalDataChannel> closedSessions;
66      private final Queue<ChannelEntry> channelQueue;
67      private final Queue<IOSessionRequest> requestQueue;
68      private final AtomicBoolean shutdownInitiated;
69      private final long selectTimeoutMillis;
70      private volatile long lastTimeoutCheckMillis;
71  
72      SingleCoreIOReactor(
73              final Callback<Exception> exceptionCallback,
74              final IOEventHandlerFactory eventHandlerFactory,
75              final IOReactorConfig reactorConfig,
76              final Decorator<IOSession> ioSessionDecorator,
77              final IOSessionListener sessionListener,
78              final Callback<IOSession> sessionShutdownCallback) {
79          super(exceptionCallback);
80          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
81          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
82          this.ioSessionDecorator = ioSessionDecorator;
83          this.sessionListener = sessionListener;
84          this.sessionShutdownCallback = sessionShutdownCallback;
85          this.shutdownInitiated = new AtomicBoolean();
86          this.closedSessions = new ConcurrentLinkedQueue<>();
87          this.channelQueue = new ConcurrentLinkedQueue<>();
88          this.requestQueue = new ConcurrentLinkedQueue<>();
89          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
90      }
91  
92      void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
93          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
94              throw new IOReactorShutdownException("I/O reactor has been shut down");
95          }
96          this.channelQueue.add(entry);
97          this.selector.wakeup();
98      }
99  
100     @Override
101     void doTerminate() {
102         closePendingChannels();
103         closePendingConnectionRequests();
104         processClosedSessions();
105     }
106 
107     @Override
108     void doExecute() throws IOException {
109         while (!Thread.currentThread().isInterrupted()) {
110 
111             final int readyCount = this.selector.select(this.selectTimeoutMillis);
112 
113             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
114                 if (this.shutdownInitiated.compareAndSet(false, true)) {
115                     initiateSessionShutdown();
116                 }
117                 closePendingChannels();
118             }
119             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
120                 break;
121             }
122 
123             // Process selected I/O events
124             if (readyCount > 0) {
125                 processEvents(this.selector.selectedKeys());
126             }
127 
128             validateActiveChannels();
129 
130             // Process closed sessions
131             processClosedSessions();
132 
133             // If active process new channels
134             if (getStatus() == IOReactorStatus.ACTIVE) {
135                 processPendingChannels();
136                 processPendingConnectionRequests();
137             }
138 
139             // Exit select loop if graceful shutdown has been completed
140             if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
141                 break;
142             }
143             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
144                 break;
145             }
146         }
147     }
148 
149     private void initiateSessionShutdown() {
150         if (this.sessionShutdownCallback != null) {
151             final Set<SelectionKey> keys = this.selector.keys();
152             for (final SelectionKey key : keys) {
153                 final InternalChannel channel = (InternalChannel) key.attachment();
154                 if (channel instanceof InternalDataChannel) {
155                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
156                 }
157             }
158         }
159     }
160 
161     private void validateActiveChannels() {
162         final long currentTimeMillis = System.currentTimeMillis();
163         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
164             this.lastTimeoutCheckMillis = currentTimeMillis;
165             for (final SelectionKey key : this.selector.keys()) {
166                 checkTimeout(key, currentTimeMillis);
167             }
168         }
169     }
170 
171     private void processEvents(final Set<SelectionKey> selectedKeys) {
172         for (final SelectionKey key : selectedKeys) {
173             final InternalChannel channel = (InternalChannel) key.attachment();
174             if (channel != null) {
175                 try {
176                     channel.handleIOEvent(key.readyOps());
177                 } catch (final CancelledKeyException ex) {
178                     channel.close(CloseMode.GRACEFUL);
179                 }
180             }
181         }
182         selectedKeys.clear();
183     }
184 
185     private void processPendingChannels() throws IOException {
186         ChannelEntry entry;
187         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
188             final SocketChannel socketChannel = entry.channel;
189             final Object attachment = entry.attachment;
190             try {
191                 prepareSocket(socketChannel);
192                 socketChannel.configureBlocking(false);
193             } catch (final IOException ex) {
194                 logException(ex);
195                 try {
196                     socketChannel.close();
197                 } catch (final IOException ex2) {
198                     logException(ex2);
199                 }
200                 throw ex;
201             }
202             final SelectionKey key;
203             try {
204                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
205             } catch (final ClosedChannelException ex) {
206                 return;
207             }
208             final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
209             final InternalDataChannel dataChannel = new InternalDataChannel(
210                     ioSession,
211                     null,
212                     ioSessionDecorator,
213                     sessionListener,
214                     closedSessions);
215             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
216             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
217             key.attach(dataChannel);
218             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
219         }
220     }
221 
222     private void processClosedSessions() {
223         for (;;) {
224             final InternalDataChannel dataChannel = this.closedSessions.poll();
225             if (dataChannel == null) {
226                 break;
227             }
228             try {
229                 dataChannel.disconnected();
230             } catch (final CancelledKeyException ex) {
231                 // ignore and move on
232             }
233         }
234     }
235 
236     private void checkTimeout(final SelectionKey key, final long nowMillis) {
237         final InternalChannel channel = (InternalChannel) key.attachment();
238         if (channel != null) {
239             channel.checkTimeout(nowMillis);
240         }
241     }
242 
243     @Override
244     public Future<IOSession> connect(
245             final NamedEndpoint remoteEndpoint,
246             final SocketAddress remoteAddress,
247             final SocketAddress localAddress,
248             final Timeout timeout,
249             final Object attachment,
250             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
251         Args.notNull(remoteEndpoint, "Remote endpoint");
252         final IOSessionRequest sessionRequest = new IOSessionRequest(
253                 remoteEndpoint,
254                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
255                 localAddress,
256                 timeout,
257                 attachment,
258                 callback);
259 
260         this.requestQueue.add(sessionRequest);
261         this.selector.wakeup();
262 
263         return sessionRequest;
264     }
265 
266     private void prepareSocket(final SocketChannel socketChannel) throws IOException {
267         final Socket socket = socketChannel.socket();
268         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
269         socket.setKeepAlive(this.reactorConfig.isSoKeepAlive());
270         if (this.reactorConfig.getSndBufSize() > 0) {
271             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
272         }
273         if (this.reactorConfig.getRcvBufSize() > 0) {
274             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
275         }
276         if (this.reactorConfig.getTrafficClass() > 0) {
277             socket.setTrafficClass(this.reactorConfig.getTrafficClass());
278         }
279         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
280         if (linger >= 0) {
281             socket.setSoLinger(true, linger);
282         }
283         if (this.reactorConfig.getTcpKeepIdle() > 0) {
284             setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPIDLE, this.reactorConfig.getTcpKeepIdle());
285         }
286         if (this.reactorConfig.getTcpKeepInterval() > 0) {
287             setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPINTERVAL, this.reactorConfig.getTcpKeepInterval());
288         }
289         if (this.reactorConfig.getTcpKeepInterval() > 0) {
290             setExtendedSocketOption(socketChannel, SocketSupport.TCP_KEEPCOUNT, this.reactorConfig.getTcpKeepCount());
291         }
292     }
293 
294     /**
295      * @since 5.3
296      */
297     <T> void setExtendedSocketOption(final SocketChannel socketChannel,
298                                      final String optionName, final T value) throws IOException {
299         final SocketOption<T> socketOption = SocketSupport.getExtendedSocketOptionOrNull(optionName);
300         if (socketOption == null) {
301             throw new UnsupportedOperationException(optionName + " is not supported in the current jdk");
302         }
303         socketChannel.setOption(socketOption, value);
304     }
305 
306     private void validateAddress(final SocketAddress address) throws UnknownHostException {
307         if (address instanceof InetSocketAddress) {
308             final InetSocketAddress endpoint = (InetSocketAddress) address;
309             if (endpoint.isUnresolved()) {
310                 throw new UnknownHostException(endpoint.getHostName());
311             }
312         }
313     }
314 
315     private void processPendingConnectionRequests() {
316         IOSessionRequest sessionRequest;
317         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
318             if (!sessionRequest.isCancelled()) {
319                 final SocketChannel socketChannel;
320                 try {
321                     socketChannel = SocketChannel.open();
322                 } catch (final IOException ex) {
323                     sessionRequest.failed(ex);
324                     return;
325                 }
326                 try {
327                     processConnectionRequest(socketChannel, sessionRequest);
328                 } catch (final IOException | RuntimeException ex) {
329                     Closer.closeQuietly(socketChannel);
330                     sessionRequest.failed(ex);
331                 }
332             }
333         }
334     }
335 
336     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
337         socketChannel.configureBlocking(false);
338         prepareSocket(socketChannel);
339 
340         validateAddress(sessionRequest.localAddress);
341         if (sessionRequest.localAddress != null) {
342             final Socket sock = socketChannel.socket();
343             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
344             sock.bind(sessionRequest.localAddress);
345         }
346 
347         final SocketAddress socksProxyAddress = reactorConfig.getSocksProxyAddress();
348         final SocketAddress remoteAddress = socksProxyAddress != null ? socksProxyAddress : sessionRequest.remoteAddress;
349 
350         // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
351         // only to this library
352         validateAddress(remoteAddress);
353         final boolean connected = socketChannel.connect(remoteAddress);
354         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
355         final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
356         final InternalDataChannel dataChannel = new InternalDataChannel(
357                 ioSession,
358                 sessionRequest.remoteEndpoint,
359                 ioSessionDecorator,
360                 sessionListener,
361                 closedSessions);
362         dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
363         final InternalChannel connectChannel = new InternalConnectChannel(
364                 key,
365                 socketChannel,
366                 sessionRequest,
367                 dataChannel,
368                 eventHandlerFactory,
369                 reactorConfig);
370         if (connected) {
371             connectChannel.handleIOEvent(SelectionKey.OP_CONNECT);
372         } else {
373             key.attach(connectChannel);
374             sessionRequest.assign(connectChannel);
375         }
376     }
377 
378     private void closePendingChannels() {
379         ChannelEntry entry;
380         while ((entry = this.channelQueue.poll()) != null) {
381             final SocketChannel socketChannel = entry.channel;
382             try {
383                 socketChannel.close();
384             } catch (final IOException ex) {
385                 logException(ex);
386             }
387         }
388     }
389 
390     private void closePendingConnectionRequests() {
391         IOSessionRequest sessionRequest;
392         while ((sessionRequest = this.requestQueue.poll()) != null) {
393             sessionRequest.cancel();
394         }
395     }
396 
397 }