View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.net.Socket;
33  import java.nio.channels.Channel;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.ClosedSelectorException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.util.ArrayList;
40  import java.util.Date;
41  import java.util.List;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.params.NIOReactorPNames;
47  import org.apache.http.nio.reactor.IOEventDispatch;
48  import org.apache.http.nio.reactor.IOReactor;
49  import org.apache.http.nio.reactor.IOReactorException;
50  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
51  import org.apache.http.nio.reactor.IOReactorStatus;
52  import org.apache.http.params.BasicHttpParams;
53  import org.apache.http.params.CoreConnectionPNames;
54  import org.apache.http.params.HttpParams;
55  import org.apache.http.util.Args;
56  import org.apache.http.util.Asserts;
57  
58  /**
59   * Generic implementation of {@link IOReactor} that can run multiple
60   * {@link BaseIOReactor} instance in separate worker threads and distribute
61   * newly created I/O session equally across those I/O reactors for a more
62   * optimal resource utilization and a better I/O performance. Usually it is
63   * recommended to have one worker I/O reactor per physical CPU core.
64   * <p>
65   * <strong>Important note about exception handling</strong>
66   * <p>
67   * Protocol specific exceptions as well as those I/O exceptions thrown in the
68   * course of interaction with the session's channel are to be expected are to be
69   * dealt with by specific protocol handlers. These exceptions may result in
70   * termination of an individual session but should not affect the I/O reactor
71   * and all other active sessions. There are situations, however, when the I/O
72   * reactor itself encounters an internal problem such as an I/O exception in
73   * the underlying NIO classes or an unhandled runtime exception. Those types of
74   * exceptions are usually fatal and will cause the I/O reactor to shut down
75   * automatically.
76   * <p>
77   * There is a possibility to override this behavior and prevent I/O reactors
78   * from shutting down automatically in case of a runtime exception or an I/O
79   * exception in internal classes. This can be accomplished by providing a custom
80   * implementation of the {@link IOReactorExceptionHandler} interface.
81   * <p>
82   * If an I/O reactor is unable to automatically recover from an I/O or a runtime
83   * exception it will enter the shutdown mode. First off, it cancel all pending
84   * new session requests. Then it will attempt to close all active I/O sessions
85   * gracefully giving them some time to flush pending output data and terminate
86   * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
87   * remain active after the grace period. This is a fairly complex process, where
88   * many things can fail at the same time and many different exceptions can be
89   * thrown in the course of the shutdown process. The I/O reactor will record all
90   * exceptions thrown during the shutdown process, including the original one
91   * that actually caused the shutdown in the first place, in an audit log. One
92   * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
93   * thrown by the I/O reactor prior and in the course of the reactor shutdown
94   * and decide whether it is safe to restart the I/O reactor.
95   *
96   * @since 4.0
97   */
98  @SuppressWarnings("deprecation")
99  @ThreadSafe // public methods only
100 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
101 
102     protected volatile IOReactorStatus status;
103 
104     /**
105      * @deprecated (4.2)
106      */
107     @Deprecated
108     protected final HttpParams params;
109     protected final IOReactorConfig config;
110     protected final Selector selector;
111     protected final long selectTimeout;
112     protected final boolean interestOpsQueueing;
113 
114     private final int workerCount;
115     private final ThreadFactory threadFactory;
116     private final BaseIOReactor[] dispatchers;
117     private final Worker[] workers;
118     private final Thread[] threads;
119     private final Object statusLock;
120 
121     protected IOReactorExceptionHandler exceptionHandler;
122     protected List<ExceptionEvent> auditLog;
123 
124     private int currentWorker = 0;
125 
126     /**
127      * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
128      *
129      * @param config I/O reactor configuration.
130      * @param threadFactory the factory to create threads.
131      *   Can be <code>null</code>.
132      * @throws IOReactorException in case if a non-recoverable I/O error.
133      *
134      * @since 4.2
135      */
136     public AbstractMultiworkerIOReactor(
137             final IOReactorConfig config,
138             final ThreadFactory threadFactory) throws IOReactorException {
139         super();
140         this.config = config != null ? config : IOReactorConfig.DEFAULT;
141         this.params = new BasicHttpParams();
142         try {
143             this.selector = Selector.open();
144         } catch (final IOException ex) {
145             throw new IOReactorException("Failure opening selector", ex);
146         }
147         this.selectTimeout = this.config.getSelectInterval();
148         this.interestOpsQueueing = this.config.isInterestOpQueued();
149         this.statusLock = new Object();
150         if (threadFactory != null) {
151             this.threadFactory = threadFactory;
152         } else {
153             this.threadFactory = new DefaultThreadFactory();
154         }
155         this.workerCount = this.config.getIoThreadCount();
156         this.dispatchers = new BaseIOReactor[workerCount];
157         this.workers = new Worker[workerCount];
158         this.threads = new Thread[workerCount];
159         this.status = IOReactorStatus.INACTIVE;
160     }
161 
162     /**
163      * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
164      *
165      * @throws IOReactorException in case if a non-recoverable I/O error.
166      *
167      * @since 4.2
168      */
169     public AbstractMultiworkerIOReactor() throws IOReactorException {
170         this(null, null);
171     }
172 
173     @Deprecated
174     static IOReactorConfig convert(final int workerCount, final HttpParams params) {
175         Args.notNull(params, "HTTP parameters");
176         return IOReactorConfig.custom()
177             .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000))
178             .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500))
179             .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false))
180             .setIoThreadCount(workerCount)
181             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
182             .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0))
183             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
184             .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false))
185             .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false))
186             .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1))
187             .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true))
188             .build();
189     }
190 
191     /**
192      * Creates an instance of AbstractMultiworkerIOReactor.
193      *
194      * @param workerCount number of worker I/O reactors.
195      * @param threadFactory the factory to create threads.
196      *   Can be <code>null</code>.
197      * @param params HTTP parameters.
198      * @throws IOReactorException in case if a non-recoverable I/O error.
199      *
200      * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
201      */
202     @Deprecated
203     public AbstractMultiworkerIOReactor(
204             final int workerCount,
205             final ThreadFactory threadFactory,
206             final HttpParams params) throws IOReactorException {
207         this(convert(workerCount, params), threadFactory);
208     }
209 
210     public IOReactorStatus getStatus() {
211         return this.status;
212     }
213 
214     /**
215      * Returns the audit log containing exceptions thrown by the I/O reactor
216      * prior and in the course of the reactor shutdown.
217      *
218      * @return audit log.
219      */
220     public synchronized List<ExceptionEvent> getAuditLog() {
221         if (this.auditLog != null) {
222             return new ArrayList<ExceptionEvent>(this.auditLog);
223         } else {
224             return null;
225         }
226     }
227 
228     /**
229      * Adds the given {@link Throwable} object with the given time stamp
230      * to the audit log.
231      *
232      * @param ex the exception thrown by the I/O reactor.
233      * @param timestamp the time stamp of the exception. Can be
234      * <code>null</code> in which case the current date / time will be used.
235      */
236     protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
237         if (ex == null) {
238             return;
239         }
240         if (this.auditLog == null) {
241             this.auditLog = new ArrayList<ExceptionEvent>();
242         }
243         this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
244     }
245 
246     /**
247      * Adds the given {@link Throwable} object to the audit log.
248      *
249      * @param ex the exception thrown by the I/O reactor.
250      */
251     protected void addExceptionEvent(final Throwable ex) {
252         addExceptionEvent(ex, null);
253     }
254 
255     /**
256      * Sets exception handler for this I/O reactor.
257      *
258      * @param exceptionHandler the exception handler.
259      */
260     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
261         this.exceptionHandler = exceptionHandler;
262     }
263 
264     /**
265      * Triggered to process I/O events registered by the main {@link Selector}.
266      * <p>
267      * Super-classes can implement this method to react to the event.
268      *
269      * @param count event count.
270      * @throws IOReactorException in case if a non-recoverable I/O error.
271      */
272     protected abstract void processEvents(int count) throws IOReactorException;
273 
274     /**
275      * Triggered to cancel pending session requests.
276      * <p>
277      * Super-classes can implement this method to react to the event.
278      *
279      * @throws IOReactorException in case if a non-recoverable I/O error.
280      */
281     protected abstract void cancelRequests() throws IOReactorException;
282 
283     /**
284      * Activates the main I/O reactor as well as all worker I/O reactors.
285      * The I/O main reactor will start reacting to I/O events and triggering
286      * notification methods. The worker I/O reactor in their turn will start
287      * reacting to I/O events and dispatch I/O event notifications to the given
288      * {@link IOEventDispatch} interface.
289      * <p>
290      * This method will enter the infinite I/O select loop on
291      * the {@link Selector} instance associated with this I/O reactor and used
292      * to manage creation of new I/O channels. Once a new I/O channel has been
293      * created the processing of I/O events on that channel will be delegated
294      * to one of the worker I/O reactors.
295      * <p>
296      * The method will remain blocked unto the I/O reactor is shut down or the
297      * execution thread is interrupted.
298      *
299      * @see #processEvents(int)
300      * @see #cancelRequests()
301      *
302      * @throws InterruptedIOException if the dispatch thread is interrupted.
303      * @throws IOReactorException in case if a non-recoverable I/O error.
304      */
305     public void execute(
306             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
307         Args.notNull(eventDispatch, "Event dispatcher");
308         synchronized (this.statusLock) {
309             if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
310                 this.status = IOReactorStatus.SHUT_DOWN;
311                 this.statusLock.notifyAll();
312                 return;
313             }
314             Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
315                     "Illegal state %s", this.status);
316             this.status = IOReactorStatus.ACTIVE;
317             // Start I/O dispatchers
318             for (int i = 0; i < this.dispatchers.length; i++) {
319                 final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
320                 dispatcher.setExceptionHandler(exceptionHandler);
321                 this.dispatchers[i] = dispatcher;
322             }
323             for (int i = 0; i < this.workerCount; i++) {
324                 final BaseIOReactor dispatcher = this.dispatchers[i];
325                 this.workers[i] = new Worker(dispatcher, eventDispatch);
326                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
327             }
328         }
329         try {
330 
331             for (int i = 0; i < this.workerCount; i++) {
332                 if (this.status != IOReactorStatus.ACTIVE) {
333                     return;
334                 }
335                 this.threads[i].start();
336             }
337 
338             for (;;) {
339                 final int readyCount;
340                 try {
341                     readyCount = this.selector.select(this.selectTimeout);
342                 } catch (final InterruptedIOException ex) {
343                     throw ex;
344                 } catch (final IOException ex) {
345                     throw new IOReactorException("Unexpected selector failure", ex);
346                 }
347 
348                 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
349                     processEvents(readyCount);
350                 }
351 
352                 // Verify I/O dispatchers
353                 for (int i = 0; i < this.workerCount; i++) {
354                     final Worker worker = this.workers[i];
355                     final Exception ex = worker.getException();
356                     if (ex != null) {
357                         throw new IOReactorException(
358                                 "I/O dispatch worker terminated abnormally", ex);
359                     }
360                 }
361 
362                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
363                     break;
364                 }
365             }
366 
367         } catch (final ClosedSelectorException ex) {
368             addExceptionEvent(ex);
369         } catch (final IOReactorException ex) {
370             if (ex.getCause() != null) {
371                 addExceptionEvent(ex.getCause());
372             }
373             throw ex;
374         } finally {
375             doShutdown();
376             synchronized (this.statusLock) {
377                 this.status = IOReactorStatus.SHUT_DOWN;
378                 this.statusLock.notifyAll();
379             }
380         }
381     }
382 
383     /**
384      * Activates the shutdown sequence for this reactor. This method will cancel
385      * all pending session requests, close out all active I/O channels,
386      * make an attempt to terminate all worker I/O reactors gracefully,
387      * and finally force-terminate those I/O reactors that failed to
388      * terminate after the specified grace period.
389      *
390      * @throws InterruptedIOException if the shutdown sequence has been
391      *   interrupted.
392      */
393     protected void doShutdown() throws InterruptedIOException {
394         synchronized (this.statusLock) {
395             if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
396                 return;
397             }
398             this.status = IOReactorStatus.SHUTTING_DOWN;
399         }
400         try {
401             cancelRequests();
402         } catch (final IOReactorException ex) {
403             if (ex.getCause() != null) {
404                 addExceptionEvent(ex.getCause());
405             }
406         }
407         this.selector.wakeup();
408 
409         // Close out all channels
410         if (this.selector.isOpen()) {
411             for (final SelectionKey key : this.selector.keys()) {
412                 try {
413                     final Channel channel = key.channel();
414                     if (channel != null) {
415                         channel.close();
416                     }
417                 } catch (final IOException ex) {
418                     addExceptionEvent(ex);
419                 }
420             }
421             // Stop dispatching I/O events
422             try {
423                 this.selector.close();
424             } catch (final IOException ex) {
425                 addExceptionEvent(ex);
426             }
427         }
428 
429         // Attempt to shut down I/O dispatchers gracefully
430         for (int i = 0; i < this.workerCount; i++) {
431             final BaseIOReactor dispatcher = this.dispatchers[i];
432             dispatcher.gracefulShutdown();
433         }
434 
435         final long gracePeriod = this.config.getShutdownGracePeriod();
436 
437         try {
438             // Force shut down I/O dispatchers if they fail to terminate
439             // in time
440             for (int i = 0; i < this.workerCount; i++) {
441                 final BaseIOReactor dispatcher = this.dispatchers[i];
442                 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
443                     dispatcher.awaitShutdown(gracePeriod);
444                 }
445                 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
446                     try {
447                         dispatcher.hardShutdown();
448                     } catch (final IOReactorException ex) {
449                         if (ex.getCause() != null) {
450                             addExceptionEvent(ex.getCause());
451                         }
452                     }
453                 }
454             }
455             // Join worker threads
456             for (int i = 0; i < this.workerCount; i++) {
457                 final Thread t = this.threads[i];
458                 if (t != null) {
459                     t.join(gracePeriod);
460                 }
461             }
462         } catch (final InterruptedException ex) {
463             throw new InterruptedIOException(ex.getMessage());
464         }
465     }
466 
467     /**
468      * Assigns the given channel entry to one of the worker I/O reactors.
469      *
470      * @param entry the channel entry.
471      */
472     protected void addChannel(final ChannelEntry entry) {
473         // Distribute new channels among the workers
474         final int i = Math.abs(this.currentWorker++ % this.workerCount);
475         this.dispatchers[i].addChannel(entry);
476     }
477 
478     /**
479      * Registers the given channel with the main {@link Selector}.
480      *
481      * @param channel the channel.
482      * @param ops interest ops.
483      * @return  selection key.
484      * @throws ClosedChannelException if the channel has been already closed.
485      */
486     protected SelectionKey registerChannel(
487             final SelectableChannel channel, final int ops) throws ClosedChannelException {
488         return channel.register(this.selector, ops);
489     }
490 
491     /**
492      * Prepares the given {@link Socket} by resetting some of its properties.
493      *
494      * @param socket the socket
495      * @throws IOException in case of an I/O error.
496      */
497     protected void prepareSocket(final Socket socket) throws IOException {
498         socket.setTcpNoDelay(this.config.isTcpNoDelay());
499         socket.setKeepAlive(this.config.isSoKeepalive());
500         if (this.config.getSoTimeout() > 0) {
501             socket.setSoTimeout(this.config.getSoTimeout());
502         }
503         if (this.config.getSndBufSize() > 0) {
504             socket.setSendBufferSize(this.config.getSndBufSize());
505         }
506         if (this.config.getRcvBufSize() > 0) {
507             socket.setReceiveBufferSize(this.config.getRcvBufSize());
508         }
509         final int linger = this.config.getSoLinger();
510         if (linger >= 0) {
511             socket.setSoLinger(linger > 0, linger);
512         }
513     }
514 
515     /**
516      * Blocks for the given period of time in milliseconds awaiting
517      * the completion of the reactor shutdown. If the value of
518      * <code>timeout</code> is set to <code>0</code> this method blocks
519      * indefinitely.
520      *
521      * @param timeout the maximum wait time.
522      * @throws InterruptedException if interrupted.
523      */
524     protected void awaitShutdown(final long timeout) throws InterruptedException {
525         synchronized (this.statusLock) {
526             final long deadline = System.currentTimeMillis() + timeout;
527             long remaining = timeout;
528             while (this.status != IOReactorStatus.SHUT_DOWN) {
529                 this.statusLock.wait(remaining);
530                 if (timeout > 0) {
531                     remaining = deadline - System.currentTimeMillis();
532                     if (remaining <= 0) {
533                         break;
534                     }
535                 }
536             }
537         }
538     }
539 
540     public void shutdown() throws IOException {
541         shutdown(2000);
542     }
543 
544     public void shutdown(final long waitMs) throws IOException {
545         synchronized (this.statusLock) {
546             if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
547                 return;
548             }
549             if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
550                 this.status = IOReactorStatus.SHUT_DOWN;
551                 cancelRequests();
552                 this.selector.close();
553                 return;
554             }
555             this.status = IOReactorStatus.SHUTDOWN_REQUEST;
556         }
557         this.selector.wakeup();
558         try {
559             awaitShutdown(waitMs);
560         } catch (final InterruptedException ignore) {
561         }
562     }
563 
564     static void closeChannel(final Channel channel) {
565         try {
566             channel.close();
567         } catch (final IOException ignore) {
568         }
569     }
570 
571     static class Worker implements Runnable {
572 
573         final BaseIOReactor dispatcher;
574         final IOEventDispatch eventDispatch;
575 
576         private volatile Exception exception;
577 
578         public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
579             super();
580             this.dispatcher = dispatcher;
581             this.eventDispatch = eventDispatch;
582         }
583 
584         public void run() {
585             try {
586                 this.dispatcher.execute(this.eventDispatch);
587             } catch (final Exception ex) {
588                 this.exception = ex;
589             }
590         }
591 
592         public Exception getException() {
593             return this.exception;
594         }
595 
596     }
597 
598     static class DefaultThreadFactory implements ThreadFactory {
599 
600         private final static AtomicLong COUNT = new AtomicLong(1);
601 
602         public Thread newThread(final Runnable r) {
603             return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
604         }
605 
606     }
607 
608 }