View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.function.Decorator;
48  import org.apache.hc.core5.io.ShutdownType;
49  import org.apache.hc.core5.net.NamedEndpoint;
50  import org.apache.hc.core5.util.Args;
51  import org.apache.hc.core5.util.TimeValue;
52  
53  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
54  
55      private static final int MAX_CHANNEL_REQUESTS = 10000;
56  
57      private final IOEventHandlerFactory eventHandlerFactory;
58      private final IOReactorConfig reactorConfig;
59      private final Decorator<IOSession> ioSessionDecorator;
60      private final IOSessionListener sessionListener;
61      private final Callback<IOSession> sessionShutdownCallback;
62      private final Queue<InternalDataChannel> closedSessions;
63      private final Queue<SocketChannel> channelQueue;
64      private final Queue<IOSessionRequest> requestQueue;
65      private final AtomicBoolean shutdownInitiated;
66  
67      private volatile long lastTimeoutCheck;
68  
69      SingleCoreIOReactor(
70              final Queue<ExceptionEvent> auditLog,
71              final IOEventHandlerFactory eventHandlerFactory,
72              final IOReactorConfig reactorConfig,
73              final Decorator<IOSession> ioSessionDecorator,
74              final IOSessionListener sessionListener,
75              final Callback<IOSession> sessionShutdownCallback) {
76          super(auditLog);
77          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
78          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
79          this.ioSessionDecorator = ioSessionDecorator;
80          this.sessionListener = sessionListener;
81          this.sessionShutdownCallback = sessionShutdownCallback;
82          this.shutdownInitiated = new AtomicBoolean(false);
83          this.closedSessions = new ConcurrentLinkedQueue<>();
84          this.channelQueue = new ConcurrentLinkedQueue<>();
85          this.requestQueue = new ConcurrentLinkedQueue<>();
86      }
87  
88      void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
89          Args.notNull(socketChannel, "SocketChannel");
90          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
91              throw new IOReactorShutdownException("I/O reactor has been shut down");
92          }
93          this.channelQueue.add(socketChannel);
94          this.selector.wakeup();
95      }
96  
97      @Override
98      void doTerminate() {
99          closePendingChannels();
100         closePendingConnectionRequests();
101         processClosedSessions();
102     }
103 
104     @Override
105     void doExecute() throws IOException {
106         final long selectTimeout = this.reactorConfig.getSelectInterval();
107         while (!Thread.currentThread().isInterrupted()) {
108 
109             final int readyCount = this.selector.select(selectTimeout);
110 
111             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
112                 if (this.shutdownInitiated.compareAndSet(false, true)) {
113                     initiateSessionShutdown();
114                 }
115                 closePendingChannels();
116             }
117             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
118                 break;
119             }
120 
121             // Process selected I/O events
122             if (readyCount > 0) {
123                 processEvents(this.selector.selectedKeys());
124             }
125 
126             validateActiveChannels();
127 
128             // Process closed sessions
129             processClosedSessions();
130 
131             // If active process new channels
132             if (getStatus().compareTo(IOReactorStatus.ACTIVE) == 0) {
133                 processPendingChannels();
134                 processPendingConnectionRequests();
135             }
136 
137             // Exit select loop if graceful shutdown has been completed
138             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0 && this.selector.keys().isEmpty()) {
139                 break;
140             }
141             if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
142                 break;
143             }
144         }
145     }
146 
147     private void initiateSessionShutdown() {
148         if (this.sessionShutdownCallback != null) {
149             final Set<SelectionKey> keys = this.selector.keys();
150             for (final SelectionKey key : keys) {
151                 final InternalChannel channel = (InternalChannel) key.attachment();
152                 if (channel instanceof InternalDataChannel) {
153                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
154                 }
155             }
156         }
157     }
158 
159     private void validateActiveChannels() {
160         final long currentTime = System.currentTimeMillis();
161         if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval()) {
162             this.lastTimeoutCheck = currentTime;
163             for (final SelectionKey key : this.selector.keys()) {
164                 timeoutCheck(key, currentTime);
165             }
166         }
167     }
168 
169     private void processEvents(final Set<SelectionKey> selectedKeys) {
170         for (final SelectionKey key : selectedKeys) {
171             final InternalChannel channel = (InternalChannel) key.attachment();
172             try {
173                 channel.handleIOEvent(key.readyOps());
174             } catch (final CancelledKeyException ex) {
175                 channel.shutdown(ShutdownType.GRACEFUL);
176             }
177         }
178         selectedKeys.clear();
179     }
180 
181     private void processPendingChannels() throws IOException {
182         SocketChannel socketChannel;
183         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
184             try {
185                 prepareSocket(socketChannel.socket());
186                 socketChannel.configureBlocking(false);
187             } catch (final IOException ex) {
188                 addExceptionEvent(ex);
189                 try {
190                     socketChannel.close();
191                 } catch (final IOException ex2) {
192                     addExceptionEvent(ex2);
193                 }
194                 throw ex;
195             }
196             final SelectionKey key;
197             try {
198                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
199             } catch (final ClosedChannelException ex) {
200                 return;
201             }
202             IOSession ioSession = new IOSessionImpl(key, socketChannel);
203             if (ioSessionDecorator != null) {
204                 ioSession = ioSessionDecorator.decorate(ioSession);
205             }
206             final InternalDataChannel dataChannel = new InternalDataChannel(ioSession, null, sessionListener, closedSessions);
207             dataChannel.setHandler(this.eventHandlerFactory.createHandler(dataChannel, null));
208             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout().toMillisIntBound());
209             key.attach(dataChannel);
210             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
211         }
212     }
213 
214     private void processClosedSessions() {
215         for (;;) {
216             final InternalDataChannel dataChannel = this.closedSessions.poll();
217             if (dataChannel == null) {
218                 break;
219             }
220             try {
221                 dataChannel.disconnected();
222             } catch (final CancelledKeyException ex) {
223                 // ignore and move on
224             }
225         }
226     }
227 
228     private void timeoutCheck(final SelectionKey key, final long now) {
229         final InternalChannel channel = (InternalChannel) key.attachment();
230         if (channel != null) {
231             channel.checkTimeout(now);
232         }
233     }
234 
235     @Override
236     public Future<IOSession> connect(
237             final NamedEndpoint remoteEndpoint,
238             final SocketAddress remoteAddress,
239             final SocketAddress localAddress,
240             final TimeValue timeout,
241             final Object attachment,
242             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
243         Args.notNull(remoteEndpoint, "Remote endpoint");
244         final IOSessionRequest sessionRequest = new IOSessionRequest(
245                 remoteEndpoint,
246                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
247                 localAddress,
248                 timeout,
249                 attachment,
250                 callback);
251 
252         this.requestQueue.add(sessionRequest);
253         this.selector.wakeup();
254 
255         return sessionRequest;
256     }
257 
258     private void prepareSocket(final Socket socket) throws IOException {
259         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
260         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
261         if (this.reactorConfig.getSndBufSize() > 0) {
262             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
263         }
264         if (this.reactorConfig.getRcvBufSize() > 0) {
265             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
266         }
267         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
268         if (linger >= 0) {
269             socket.setSoLinger(true, linger);
270         }
271     }
272 
273     private void validateAddress(final SocketAddress address) throws UnknownHostException {
274         if (address instanceof InetSocketAddress) {
275             final InetSocketAddress endpoint = (InetSocketAddress) address;
276             if (endpoint.isUnresolved()) {
277                 throw new UnknownHostException(endpoint.getHostName());
278             }
279         }
280     }
281 
282     private void processPendingConnectionRequests() {
283         IOSessionRequest sessionRequest;
284         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
285             if (!sessionRequest.isCancelled()) {
286                 final SocketChannel socketChannel;
287                 try {
288                     socketChannel = SocketChannel.open();
289                 } catch (final IOException ex) {
290                     sessionRequest.failed(ex);
291                     return;
292                 }
293                 try {
294                     processConnectionRequest(socketChannel, sessionRequest);
295                 } catch (final IOException ex) {
296                     try {
297                         socketChannel.close();
298                     } catch (final IOException ignore) {
299                     }
300                     sessionRequest.failed(ex);
301                 }
302             }
303         }
304     }
305 
306     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
307         validateAddress(sessionRequest.localAddress);
308         validateAddress(sessionRequest.remoteAddress);
309 
310         socketChannel.configureBlocking(false);
311         prepareSocket(socketChannel.socket());
312 
313         if (sessionRequest.localAddress != null) {
314             final Socket sock = socketChannel.socket();
315             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
316             sock.bind(sessionRequest.localAddress);
317         }
318         final boolean connected = socketChannel.connect(sessionRequest.remoteAddress);
319         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
320         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
321 
322             @Override
323             public InternalDataChannel create(
324                     final SelectionKey key,
325                     final SocketChannel socketChannel,
326                     final NamedEndpoint namedEndpoint,
327                     final Object attachment) {
328                 IOSession ioSession = new IOSessionImpl(key, socketChannel);
329                 if (ioSessionDecorator != null) {
330                     ioSession = ioSessionDecorator.decorate(ioSession);
331                 }
332                 final InternalDataChannel dataChannel = new InternalDataChannel(ioSession, namedEndpoint, sessionListener, closedSessions);
333                 dataChannel.setHandler(eventHandlerFactory.createHandler(dataChannel, attachment));
334                 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout().toMillisIntBound());
335                 return dataChannel;
336             }
337 
338         });
339         if (connected) {
340             channel.handleIOEvent(SelectionKey.OP_CONNECT);
341         } else {
342             key.attach(channel);
343             sessionRequest.assign(channel);
344         }
345     }
346 
347     private void closePendingChannels() {
348         SocketChannel socketChannel;
349         while ((socketChannel = this.channelQueue.poll()) != null) {
350             try {
351                 socketChannel.close();
352             } catch (final IOException ex) {
353                 addExceptionEvent(ex);
354             }
355         }
356     }
357 
358     private void closePendingConnectionRequests() {
359         IOSessionRequest sessionRequest;
360         while ((sessionRequest = this.requestQueue.poll()) != null) {
361             sessionRequest.cancel();
362         }
363     }
364 
365 }