View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.function.Decorator;
48  import org.apache.hc.core5.io.Closer;
49  import org.apache.hc.core5.net.NamedEndpoint;
50  import org.apache.hc.core5.util.Args;
51  import org.apache.hc.core5.util.Timeout;
52  
53  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
54  
55      private static final int MAX_CHANNEL_REQUESTS = 10000;
56  
57      private final IOEventHandlerFactory eventHandlerFactory;
58      private final IOReactorConfig reactorConfig;
59      private final Decorator<ProtocolIOSession> ioSessionDecorator;
60      private final IOSessionListener sessionListener;
61      private final Callback<IOSession> sessionShutdownCallback;
62      private final Queue<InternalDataChannel> closedSessions;
63      private final Queue<SocketChannel> channelQueue;
64      private final Queue<IOSessionRequest> requestQueue;
65      private final AtomicBoolean shutdownInitiated;
66      private final long selectTimeoutMillis;
67      private volatile long lastTimeoutCheckMillis;
68  
69      SingleCoreIOReactor(
70              final Callback<Exception> exceptionCallback,
71              final IOEventHandlerFactory eventHandlerFactory,
72              final IOReactorConfig reactorConfig,
73              final Decorator<ProtocolIOSession> ioSessionDecorator,
74              final IOSessionListener sessionListener,
75              final Callback<IOSession> sessionShutdownCallback) {
76          super(exceptionCallback);
77          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
78          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
79          this.ioSessionDecorator = ioSessionDecorator;
80          this.sessionListener = sessionListener;
81          this.sessionShutdownCallback = sessionShutdownCallback;
82          this.shutdownInitiated = new AtomicBoolean(false);
83          this.closedSessions = new ConcurrentLinkedQueue<>();
84          this.channelQueue = new ConcurrentLinkedQueue<>();
85          this.requestQueue = new ConcurrentLinkedQueue<>();
86          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMillis();
87      }
88  
89      void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
90          Args.notNull(socketChannel, "SocketChannel");
91          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
92              throw new IOReactorShutdownException("I/O reactor has been shut down");
93          }
94          this.channelQueue.add(socketChannel);
95          this.selector.wakeup();
96      }
97  
98      @Override
99      void doTerminate() {
100         closePendingChannels();
101         closePendingConnectionRequests();
102         processClosedSessions();
103     }
104 
105     @Override
106     void doExecute() throws IOException {
107         while (!Thread.currentThread().isInterrupted()) {
108 
109             final int readyCount = this.selector.select(this.selectTimeoutMillis);
110 
111             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
112                 if (this.shutdownInitiated.compareAndSet(false, true)) {
113                     initiateSessionShutdown();
114                 }
115                 closePendingChannels();
116             }
117             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
118                 break;
119             }
120 
121             // Process selected I/O events
122             if (readyCount > 0) {
123                 processEvents(this.selector.selectedKeys());
124             }
125 
126             validateActiveChannels();
127 
128             // Process closed sessions
129             processClosedSessions();
130 
131             // If active process new channels
132             if (getStatus().compareTo(IOReactorStatus.ACTIVE) == 0) {
133                 processPendingChannels();
134                 processPendingConnectionRequests();
135             }
136 
137             // Exit select loop if graceful shutdown has been completed
138             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0 && this.selector.keys().isEmpty()) {
139                 break;
140             }
141             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
142                 break;
143             }
144         }
145     }
146 
147     private void initiateSessionShutdown() {
148         if (this.sessionShutdownCallback != null) {
149             final Set<SelectionKey> keys = this.selector.keys();
150             for (final SelectionKey key : keys) {
151                 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
152                 if (channel instanceof InternalDataChannel) {
153                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
154                 }
155             }
156         }
157     }
158 
159     private void validateActiveChannels() {
160         final long currentTimeMillis = System.currentTimeMillis();
161         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
162             this.lastTimeoutCheckMillis = currentTimeMillis;
163             for (final SelectionKey key : this.selector.keys()) {
164                 checkTimeout(key, currentTimeMillis);
165             }
166         }
167     }
168 
169     private void processEvents(final Set<SelectionKey> selectedKeys) {
170         for (final SelectionKey key : selectedKeys) {
171             final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
172             if (channel != null) {
173                 channel.handleIOEvent(key.readyOps());
174             }
175         }
176         selectedKeys.clear();
177     }
178 
179     private void processPendingChannels() throws IOException {
180         SocketChannel socketChannel;
181         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
182             try {
183                 prepareSocket(socketChannel.socket());
184                 socketChannel.configureBlocking(false);
185             } catch (final IOException ex) {
186                 logException(ex);
187                 try {
188                     socketChannel.close();
189                 } catch (final IOException ex2) {
190                     logException(ex2);
191                 }
192                 throw ex;
193             }
194             final SelectionKey key;
195             try {
196                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
197             } catch (final ClosedChannelException ex) {
198                 return;
199             }
200             final IOSession ioSession = new IOSessionImpl(key, socketChannel);
201             final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(ioSession, null, sessionListener, closedSessions);
202             final ProtocolIOSession protocolSession = ioSessionDecorator != null ? ioSessionDecorator.decorate(dataChannel) : dataChannel;
203             dataChannel.upgrade(this.eventHandlerFactory.createHandler(protocolSession, null));
204             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
205             key.attach(dataChannel);
206             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
207         }
208     }
209 
210     private void processClosedSessions() {
211         for (;;) {
212             final InternalDataChannel dataChannel = this.closedSessions.poll();
213             if (dataChannel == null) {
214                 break;
215             }
216             try {
217                 dataChannel.disconnected();
218             } catch (final CancelledKeyException ex) {
219                 // ignore and move on
220             }
221         }
222     }
223 
224     private void checkTimeout(final SelectionKey key, final long nowMillis) {
225         final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
226         if (channel != null) {
227             channel.checkTimeout(nowMillis);
228         }
229     }
230 
231     @Override
232     public Future<IOSession> connect(
233             final NamedEndpoint remoteEndpoint,
234             final SocketAddress remoteAddress,
235             final SocketAddress localAddress,
236             final Timeout timeout,
237             final Object attachment,
238             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
239         Args.notNull(remoteEndpoint, "Remote endpoint");
240         final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
241                 remoteEndpoint,
242                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
243                 localAddress,
244                 timeout,
245                 attachment,
246                 callback);
247 
248         this.requestQueue.add(sessionRequest);
249         this.selector.wakeup();
250 
251         return sessionRequest;
252     }
253 
254     private void prepareSocket(final Socket socket) throws IOException {
255         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
256         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
257         if (this.reactorConfig.getSndBufSize() > 0) {
258             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
259         }
260         if (this.reactorConfig.getRcvBufSize() > 0) {
261             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
262         }
263         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
264         if (linger >= 0) {
265             socket.setSoLinger(true, linger);
266         }
267     }
268 
269     private void validateAddress(final SocketAddress address) throws UnknownHostException {
270         if (address instanceof InetSocketAddress) {
271             final InetSocketAddress endpoint = (InetSocketAddress) address;
272             if (endpoint.isUnresolved()) {
273                 throw new UnknownHostException(endpoint.getHostName());
274             }
275         }
276     }
277 
278     private void processPendingConnectionRequests() {
279         IOSessionRequest sessionRequest;
280         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
281             if (!sessionRequest.isCancelled()) {
282                 final SocketChannel socketChannel;
283                 try {
284                     socketChannel = SocketChannel.open();
285                 } catch (final IOException ex) {
286                     sessionRequest.failed(ex);
287                     return;
288                 }
289                 try {
290                     processConnectionRequest(socketChannel, sessionRequest);
291                 } catch (final IOException | SecurityException ex) {
292                     Closer.closeQuietly(socketChannel);
293                     sessionRequest.failed(ex);
294                 }
295             }
296         }
297     }
298 
299     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
300         validateAddress(sessionRequest.localAddress);
301         validateAddress(sessionRequest.remoteAddress);
302 
303         socketChannel.configureBlocking(false);
304         prepareSocket(socketChannel.socket());
305 
306         if (sessionRequest.localAddress != null) {
307             final Socket sock = socketChannel.socket();
308             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
309             sock.bind(sessionRequest.localAddress);
310         }
311 
312         final SocketAddress targetAddress;
313         final IOEventHandlerFactory eventHandlerFactory;
314         if (this.reactorConfig.getSocksProxyAddress() != null) {
315             targetAddress = this.reactorConfig.getSocksProxyAddress();
316             eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
317                     sessionRequest.remoteAddress,
318                     this.reactorConfig.getSocksProxyUsername(),
319                     this.reactorConfig.getSocksProxyPassword(),
320                     this.eventHandlerFactory);
321         } else {
322             targetAddress = sessionRequest.remoteAddress;
323             eventHandlerFactory = this.eventHandlerFactory;
324         }
325         final boolean connected = socketChannel.connect(targetAddress);
326         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
327         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
328 
329             @Override
330             public InternalDataChannel create(
331                     final SelectionKey key,
332                     final SocketChannel socketChannel,
333                     final NamedEndpoint namedEndpoint,
334                     final Object attachment) {
335                 final IOSession ioSession = new IOSessionImpl(key, socketChannel);
336                 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(ioSession, namedEndpoint, sessionListener, closedSessions);
337                 final ProtocolIOSession protocolSession = ioSessionDecorator != null ? ioSessionDecorator.decorate(dataChannel) : dataChannel;
338                 dataChannel.upgrade(eventHandlerFactory.createHandler(protocolSession, attachment));
339                 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
340                 return dataChannel;
341             }
342 
343         });
344         if (connected) {
345             channel.handleIOEvent(SelectionKey.OP_CONNECT);
346         } else {
347             key.attach(channel);
348             sessionRequest.assign(channel);
349         }
350     }
351 
352     private void closePendingChannels() {
353         SocketChannel socketChannel;
354         while ((socketChannel = this.channelQueue.poll()) != null) {
355             try {
356                 socketChannel.close();
357             } catch (final IOException ex) {
358                 logException(ex);
359             }
360         }
361     }
362 
363     private void closePendingConnectionRequests() {
364         IOSessionRequest sessionRequest;
365         while ((sessionRequest = this.requestQueue.poll()) != null) {
366             sessionRequest.cancel();
367         }
368     }
369 
370 }