View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.security.AccessController;
40  import java.security.PrivilegedActionException;
41  import java.security.PrivilegedExceptionAction;
42  import java.util.Queue;
43  import java.util.Set;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.function.Decorator;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.Closer;
53  import org.apache.hc.core5.net.NamedEndpoint;
54  import org.apache.hc.core5.util.Args;
55  import org.apache.hc.core5.util.Asserts;
56  import org.apache.hc.core5.util.Timeout;
57  
58  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59  
60      private static final int MAX_CHANNEL_REQUESTS = 10000;
61  
62      private final IOEventHandlerFactory eventHandlerFactory;
63      private final IOReactorConfig reactorConfig;
64      private final Decorator<IOSession> ioSessionDecorator;
65      private final IOSessionListener sessionListener;
66      private final Callback<IOSession> sessionShutdownCallback;
67      private final Queue<InternalDataChannel> closedSessions;
68      private final Queue<ChannelEntry> channelQueue;
69      private final Queue<IOSessionRequest> requestQueue;
70      private final AtomicBoolean shutdownInitiated;
71      private final long selectTimeoutMillis;
72      private volatile long lastTimeoutCheckMillis;
73  
74      SingleCoreIOReactor(
75              final Callback<Exception> exceptionCallback,
76              final IOEventHandlerFactory eventHandlerFactory,
77              final IOReactorConfig reactorConfig,
78              final Decorator<IOSession> ioSessionDecorator,
79              final IOSessionListener sessionListener,
80              final Callback<IOSession> sessionShutdownCallback) {
81          super(exceptionCallback);
82          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
83          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
84          this.ioSessionDecorator = ioSessionDecorator;
85          this.sessionListener = sessionListener;
86          this.sessionShutdownCallback = sessionShutdownCallback;
87          this.shutdownInitiated = new AtomicBoolean(false);
88          this.closedSessions = new ConcurrentLinkedQueue<>();
89          this.channelQueue = new ConcurrentLinkedQueue<>();
90          this.requestQueue = new ConcurrentLinkedQueue<>();
91          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
92      }
93  
94      void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
95          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
96              throw new IOReactorShutdownException("I/O reactor has been shut down");
97          }
98          this.channelQueue.add(entry);
99          this.selector.wakeup();
100     }
101 
102     @Override
103     void doTerminate() {
104         closePendingChannels();
105         closePendingConnectionRequests();
106         processClosedSessions();
107     }
108 
109     @Override
110     void doExecute() throws IOException {
111         while (!Thread.currentThread().isInterrupted()) {
112 
113             final int readyCount = this.selector.select(this.selectTimeoutMillis);
114 
115             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
116                 if (this.shutdownInitiated.compareAndSet(false, true)) {
117                     initiateSessionShutdown();
118                 }
119                 closePendingChannels();
120             }
121             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
122                 break;
123             }
124 
125             // Process selected I/O events
126             if (readyCount > 0) {
127                 processEvents(this.selector.selectedKeys());
128             }
129 
130             validateActiveChannels();
131 
132             // Process closed sessions
133             processClosedSessions();
134 
135             // If active process new channels
136             if (getStatus() == IOReactorStatus.ACTIVE) {
137                 processPendingChannels();
138                 processPendingConnectionRequests();
139             }
140 
141             // Exit select loop if graceful shutdown has been completed
142             if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
143                 break;
144             }
145             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
146                 break;
147             }
148         }
149     }
150 
151     private void initiateSessionShutdown() {
152         if (this.sessionShutdownCallback != null) {
153             final Set<SelectionKey> keys = this.selector.keys();
154             for (final SelectionKey key : keys) {
155                 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
156                 if (channel instanceof InternalDataChannel) {
157                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
158                 }
159             }
160         }
161     }
162 
163     private void validateActiveChannels() {
164         final long currentTimeMillis = System.currentTimeMillis();
165         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
166             this.lastTimeoutCheckMillis = currentTimeMillis;
167             for (final SelectionKey key : this.selector.keys()) {
168                 checkTimeout(key, currentTimeMillis);
169             }
170         }
171     }
172 
173     private void processEvents(final Set<SelectionKey> selectedKeys) {
174         for (final SelectionKey key : selectedKeys) {
175             final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
176             if (channel != null) {
177                 try {
178                     channel.handleIOEvent(key.readyOps());
179                 } catch (final CancelledKeyException ex) {
180                     channel.close(CloseMode.GRACEFUL);
181                 }
182             }
183         }
184         selectedKeys.clear();
185     }
186 
187     private void processPendingChannels() throws IOException {
188         ChannelEntry entry;
189         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
190             final SocketChannel socketChannel = entry.channel;
191             final Object attachment = entry.attachment;
192             try {
193                 prepareSocket(socketChannel.socket());
194                 socketChannel.configureBlocking(false);
195             } catch (final IOException ex) {
196                 logException(ex);
197                 try {
198                     socketChannel.close();
199                 } catch (final IOException ex2) {
200                     logException(ex2);
201                 }
202                 throw ex;
203             }
204             final SelectionKey key;
205             try {
206                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
207             } catch (final ClosedChannelException ex) {
208                 return;
209             }
210             final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
211             final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
212                     ioSession,
213                     null,
214                     ioSessionDecorator,
215                     sessionListener,
216                     closedSessions);
217             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
218             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
219             key.attach(dataChannel);
220             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
221         }
222     }
223 
224     private void processClosedSessions() {
225         for (;;) {
226             final InternalDataChannel dataChannel = this.closedSessions.poll();
227             if (dataChannel == null) {
228                 break;
229             }
230             try {
231                 dataChannel.disconnected();
232             } catch (final CancelledKeyException ex) {
233                 // ignore and move on
234             }
235         }
236     }
237 
238     private void checkTimeout(final SelectionKey key, final long nowMillis) {
239         final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
240         if (channel != null) {
241             channel.checkTimeout(nowMillis);
242         }
243     }
244 
245     @Override
246     public Future<IOSession> connect(
247             final NamedEndpoint remoteEndpoint,
248             final SocketAddress remoteAddress,
249             final SocketAddress localAddress,
250             final Timeout timeout,
251             final Object attachment,
252             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
253         Args.notNull(remoteEndpoint, "Remote endpoint");
254         final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
255                 remoteEndpoint,
256                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
257                 localAddress,
258                 timeout,
259                 attachment,
260                 callback);
261 
262         this.requestQueue.add(sessionRequest);
263         this.selector.wakeup();
264 
265         return sessionRequest;
266     }
267 
268     private void prepareSocket(final Socket socket) throws IOException {
269         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
270         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
271         if (this.reactorConfig.getSndBufSize() > 0) {
272             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
273         }
274         if (this.reactorConfig.getRcvBufSize() > 0) {
275             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
276         }
277         if (this.reactorConfig.getTrafficClass() > 0) {
278             socket.setTrafficClass(this.reactorConfig.getTrafficClass());
279         }
280         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
281         if (linger >= 0) {
282             socket.setSoLinger(true, linger);
283         }
284     }
285 
286     private void validateAddress(final SocketAddress address) throws UnknownHostException {
287         if (address instanceof InetSocketAddress) {
288             final InetSocketAddress endpoint = (InetSocketAddress) address;
289             if (endpoint.isUnresolved()) {
290                 throw new UnknownHostException(endpoint.getHostName());
291             }
292         }
293     }
294 
295     private void processPendingConnectionRequests() {
296         IOSessionRequest sessionRequest;
297         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
298             if (!sessionRequest.isCancelled()) {
299                 final SocketChannel socketChannel;
300                 try {
301                     socketChannel = SocketChannel.open();
302                 } catch (final IOException ex) {
303                     sessionRequest.failed(ex);
304                     return;
305                 }
306                 try {
307                     processConnectionRequest(socketChannel, sessionRequest);
308                 } catch (final IOException | SecurityException ex) {
309                     Closer.closeQuietly(socketChannel);
310                     sessionRequest.failed(ex);
311                 }
312             }
313         }
314     }
315 
316     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
317         validateAddress(sessionRequest.localAddress);
318         validateAddress(sessionRequest.remoteAddress);
319 
320         socketChannel.configureBlocking(false);
321         prepareSocket(socketChannel.socket());
322 
323         if (sessionRequest.localAddress != null) {
324             final Socket sock = socketChannel.socket();
325             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
326             sock.bind(sessionRequest.localAddress);
327         }
328 
329         final SocketAddress targetAddress;
330         final IOEventHandlerFactory eventHandlerFactory;
331         if (this.reactorConfig.getSocksProxyAddress() != null) {
332             targetAddress = this.reactorConfig.getSocksProxyAddress();
333             eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
334                     sessionRequest.remoteAddress,
335                     this.reactorConfig.getSocksProxyUsername(),
336                     this.reactorConfig.getSocksProxyPassword(),
337                     this.eventHandlerFactory);
338         } else {
339             targetAddress = sessionRequest.remoteAddress;
340             eventHandlerFactory = this.eventHandlerFactory;
341         }
342 
343         // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
344         // only to this library
345         final boolean connected;
346         try {
347             connected = AccessController.doPrivileged(
348                         new PrivilegedExceptionAction<Boolean>() {
349                             @Override
350                             public Boolean run() throws IOException {
351                                 return socketChannel.connect(targetAddress);
352                             }
353                         });
354         } catch (final PrivilegedActionException e) {
355             Asserts.check(e.getCause() instanceof  IOException,
356                     "method contract violation only checked exceptions are wrapped: " + e.getCause());
357             // only checked exceptions are wrapped - error and RTExceptions are rethrown by doPrivileged
358             throw (IOException) e.getCause();
359         }
360 
361 
362         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
363         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
364 
365             @Override
366             public InternalDataChannel create(
367                     final SelectionKey key,
368                     final SocketChannel socketChannel,
369                     final NamedEndpoint namedEndpoint,
370                     final Object attachment) {
371                 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
372                 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
373                         ioSession,
374                         namedEndpoint,
375                         ioSessionDecorator,
376                         sessionListener,
377                         closedSessions);
378                 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
379                 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
380                 return dataChannel;
381             }
382 
383         });
384         if (connected) {
385             channel.handleIOEvent(SelectionKey.OP_CONNECT);
386         } else {
387             key.attach(channel);
388             sessionRequest.assign(channel);
389         }
390     }
391 
392     private void closePendingChannels() {
393         ChannelEntry entry;
394         while ((entry = this.channelQueue.poll()) != null) {
395             final SocketChannel socketChannel = entry.channel;
396             try {
397                 socketChannel.close();
398             } catch (final IOException ex) {
399                 logException(ex);
400             }
401         }
402     }
403 
404     private void closePendingConnectionRequests() {
405         IOSessionRequest sessionRequest;
406         while ((sessionRequest = this.requestQueue.poll()) != null) {
407             sessionRequest.cancel();
408         }
409     }
410 
411 }