View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.function.Decorator;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.net.NamedEndpoint;
50  import org.apache.hc.core5.util.Args;
51  import org.apache.hc.core5.io.Closer;
52  import org.apache.hc.core5.util.TimeValue;
53  
54  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
55  
56      private static final int MAX_CHANNEL_REQUESTS = 10000;
57  
58      private final IOEventHandlerFactory eventHandlerFactory;
59      private final IOReactorConfig reactorConfig;
60      private final Decorator<IOSession> ioSessionDecorator;
61      private final IOSessionListener sessionListener;
62      private final Callback<IOSession> sessionShutdownCallback;
63      private final Queue<InternalDataChannel> closedSessions;
64      private final Queue<SocketChannel> channelQueue;
65      private final Queue<IOSessionRequest> requestQueue;
66      private final AtomicBoolean shutdownInitiated;
67  
68      private volatile long lastTimeoutCheckMillis;
69  
70      SingleCoreIOReactor(
71              final Queue<ExceptionEvent> auditLog,
72              final IOEventHandlerFactory eventHandlerFactory,
73              final IOReactorConfig reactorConfig,
74              final Decorator<IOSession> ioSessionDecorator,
75              final IOSessionListener sessionListener,
76              final Callback<IOSession> sessionShutdownCallback) {
77          super(auditLog);
78          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
79          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
80          this.ioSessionDecorator = ioSessionDecorator;
81          this.sessionListener = sessionListener;
82          this.sessionShutdownCallback = sessionShutdownCallback;
83          this.shutdownInitiated = new AtomicBoolean(false);
84          this.closedSessions = new ConcurrentLinkedQueue<>();
85          this.channelQueue = new ConcurrentLinkedQueue<>();
86          this.requestQueue = new ConcurrentLinkedQueue<>();
87      }
88  
89      void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
90          Args.notNull(socketChannel, "SocketChannel");
91          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
92              throw new IOReactorShutdownException("I/O reactor has been shut down");
93          }
94          this.channelQueue.add(socketChannel);
95          this.selector.wakeup();
96      }
97  
98      @Override
99      void doTerminate() {
100         closePendingChannels();
101         closePendingConnectionRequests();
102         processClosedSessions();
103     }
104 
105     @Override
106     void doExecute() throws IOException {
107         final long selectTimeoutMillis = this.reactorConfig.getSelectIntervalMillis();
108         while (!Thread.currentThread().isInterrupted()) {
109 
110             final int readyCount = this.selector.select(selectTimeoutMillis);
111 
112             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
113                 if (this.shutdownInitiated.compareAndSet(false, true)) {
114                     initiateSessionShutdown();
115                 }
116                 closePendingChannels();
117             }
118             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
119                 break;
120             }
121 
122             // Process selected I/O events
123             if (readyCount > 0) {
124                 processEvents(this.selector.selectedKeys());
125             }
126 
127             validateActiveChannels();
128 
129             // Process closed sessions
130             processClosedSessions();
131 
132             // If active process new channels
133             if (getStatus().compareTo(IOReactorStatus.ACTIVE) == 0) {
134                 processPendingChannels();
135                 processPendingConnectionRequests();
136             }
137 
138             // Exit select loop if graceful shutdown has been completed
139             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0 && this.selector.keys().isEmpty()) {
140                 break;
141             }
142             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
143                 break;
144             }
145         }
146     }
147 
148     private void initiateSessionShutdown() {
149         if (this.sessionShutdownCallback != null) {
150             final Set<SelectionKey> keys = this.selector.keys();
151             for (final SelectionKey key : keys) {
152                 final InternalChannel channel = (InternalChannel) key.attachment();
153                 if (channel instanceof InternalDataChannel) {
154                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
155                 }
156             }
157         }
158     }
159 
160     private void validateActiveChannels() {
161         final long currentTimeMillis = System.currentTimeMillis();
162         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.reactorConfig.getSelectIntervalMillis()) {
163             this.lastTimeoutCheckMillis = currentTimeMillis;
164             for (final SelectionKey key : this.selector.keys()) {
165                 checkTimeout(key, currentTimeMillis);
166             }
167         }
168     }
169 
170     private void processEvents(final Set<SelectionKey> selectedKeys) {
171         for (final SelectionKey key : selectedKeys) {
172             final InternalChannel channel = (InternalChannel) key.attachment();
173             try {
174                 channel.handleIOEvent(key.readyOps());
175             } catch (final CancelledKeyException ex) {
176                 channel.close(CloseMode.GRACEFUL);
177             }
178         }
179         selectedKeys.clear();
180     }
181 
182     private void processPendingChannels() throws IOException {
183         SocketChannel socketChannel;
184         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
185             try {
186                 prepareSocket(socketChannel.socket());
187                 socketChannel.configureBlocking(false);
188             } catch (final IOException ex) {
189                 addExceptionEvent(ex);
190                 try {
191                     socketChannel.close();
192                 } catch (final IOException ex2) {
193                     addExceptionEvent(ex2);
194                 }
195                 throw ex;
196             }
197             final SelectionKey key;
198             try {
199                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
200             } catch (final ClosedChannelException ex) {
201                 return;
202             }
203             IOSession ioSession = new IOSessionImpl(key, socketChannel);
204             if (ioSessionDecorator != null) {
205                 ioSession = ioSessionDecorator.decorate(ioSession);
206             }
207             final InternalDataChannel dataChannel = new InternalDataChannel(ioSession, null, sessionListener, closedSessions);
208             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
209             dataChannel.setSocketTimeoutMillis(this.reactorConfig.getSoTimeout().toMillisIntBound());
210             key.attach(dataChannel);
211             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
212         }
213     }
214 
215     private void processClosedSessions() {
216         for (;;) {
217             final InternalDataChannel dataChannel = this.closedSessions.poll();
218             if (dataChannel == null) {
219                 break;
220             }
221             try {
222                 dataChannel.disconnected();
223             } catch (final CancelledKeyException ex) {
224                 // ignore and move on
225             }
226         }
227     }
228 
229     private void checkTimeout(final SelectionKey key, final long nowMillis) {
230         final InternalChannel channel = (InternalChannel) key.attachment();
231         if (channel != null) {
232             channel.checkTimeout(nowMillis);
233         }
234     }
235 
236     @Override
237     public Future<IOSession> connect(
238             final NamedEndpoint remoteEndpoint,
239             final SocketAddress remoteAddress,
240             final SocketAddress localAddress,
241             final TimeValue timeout,
242             final Object attachment,
243             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
244         Args.notNull(remoteEndpoint, "Remote endpoint");
245         final IOSessionRequest sessionRequest = new IOSessionRequest(
246                 remoteEndpoint,
247                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
248                 localAddress,
249                 timeout,
250                 attachment,
251                 callback);
252 
253         this.requestQueue.add(sessionRequest);
254         this.selector.wakeup();
255 
256         return sessionRequest;
257     }
258 
259     private void prepareSocket(final Socket socket) throws IOException {
260         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
261         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
262         if (this.reactorConfig.getSndBufSize() > 0) {
263             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
264         }
265         if (this.reactorConfig.getRcvBufSize() > 0) {
266             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
267         }
268         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
269         if (linger >= 0) {
270             socket.setSoLinger(true, linger);
271         }
272     }
273 
274     private void validateAddress(final SocketAddress address) throws UnknownHostException {
275         if (address instanceof InetSocketAddress) {
276             final InetSocketAddress endpoint = (InetSocketAddress) address;
277             if (endpoint.isUnresolved()) {
278                 throw new UnknownHostException(endpoint.getHostName());
279             }
280         }
281     }
282 
283     private void processPendingConnectionRequests() {
284         IOSessionRequest sessionRequest;
285         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
286             if (!sessionRequest.isCancelled()) {
287                 final SocketChannel socketChannel;
288                 try {
289                     socketChannel = SocketChannel.open();
290                 } catch (final IOException ex) {
291                     sessionRequest.failed(ex);
292                     return;
293                 }
294                 try {
295                     processConnectionRequest(socketChannel, sessionRequest);
296                 } catch (final IOException | SecurityException ex) {
297                     Closer.closeQuietly(socketChannel);
298                     sessionRequest.failed(ex);
299                 }
300             }
301         }
302     }
303 
304     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
305         validateAddress(sessionRequest.localAddress);
306         validateAddress(sessionRequest.remoteAddress);
307 
308         socketChannel.configureBlocking(false);
309         prepareSocket(socketChannel.socket());
310 
311         if (sessionRequest.localAddress != null) {
312             final Socket sock = socketChannel.socket();
313             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
314             sock.bind(sessionRequest.localAddress);
315         }
316         final boolean connected = socketChannel.connect(sessionRequest.remoteAddress);
317         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
318         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
319 
320             @Override
321             public InternalDataChannel create(
322                     final SelectionKey key,
323                     final SocketChannel socketChannel,
324                     final NamedEndpoint namedEndpoint,
325                     final Object attachment) {
326                 IOSession ioSession = new IOSessionImpl(key, socketChannel);
327                 if (ioSessionDecorator != null) {
328                     ioSession = ioSessionDecorator.decorate(ioSession);
329                 }
330                 final InternalDataChannel dataChannel = new InternalDataChannel(ioSession, namedEndpoint, sessionListener, closedSessions);
331                 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
332                 dataChannel.setSocketTimeoutMillis(reactorConfig.getSoTimeout().toMillisIntBound());
333                 return dataChannel;
334             }
335 
336         });
337         if (connected) {
338             channel.handleIOEvent(SelectionKey.OP_CONNECT);
339         } else {
340             key.attach(channel);
341             sessionRequest.assign(channel);
342         }
343     }
344 
345     private void closePendingChannels() {
346         SocketChannel socketChannel;
347         while ((socketChannel = this.channelQueue.poll()) != null) {
348             try {
349                 socketChannel.close();
350             } catch (final IOException ex) {
351                 addExceptionEvent(ex);
352             }
353         }
354     }
355 
356     private void closePendingConnectionRequests() {
357         IOSessionRequest sessionRequest;
358         while ((sessionRequest = this.requestQueue.poll()) != null) {
359             sessionRequest.cancel();
360         }
361     }
362 
363 }