View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.net.Socket;
33  import java.nio.channels.Channel;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.ClosedSelectorException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.util.ArrayList;
40  import java.util.Date;
41  import java.util.List;
42  import java.util.concurrent.ThreadFactory;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.params.NIOReactorPNames;
47  import org.apache.http.nio.reactor.IOEventDispatch;
48  import org.apache.http.nio.reactor.IOReactor;
49  import org.apache.http.nio.reactor.IOReactorException;
50  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
51  import org.apache.http.nio.reactor.IOReactorStatus;
52  import org.apache.http.params.BasicHttpParams;
53  import org.apache.http.params.CoreConnectionPNames;
54  import org.apache.http.params.HttpParams;
55  import org.apache.http.util.Args;
56  import org.apache.http.util.Asserts;
57  
58  /**
59   * Generic implementation of {@link IOReactor} that can run multiple
60   * {@link BaseIOReactor} instances in separate worker threads and distribute
61   * newly created I/O sessions equally across those I/O reactors for a more
62   * optimal resource utilization and a better I/O performance. Usually it is
63   * recommended to have one worker I/O reactor per physical CPU core.
64   * <p>
65   * <strong>Important note about exception handling</strong>
66   * <p>
67   * Protocol specific exceptions as well as those I/O exceptions thrown in the
68   * course of interaction with the session's channel are to be expected and are to be
69   * dealt with by specific protocol handlers. These exceptions may result in
70   * termination of an individual session but should not affect the I/O reactor
71   * and all other active sessions. There are situations, however, when the I/O
72   * reactor itself encounters an internal problem such as an I/O exception in
73   * the underlying NIO classes or an unhandled runtime exception. Those types of
74   * exceptions are usually fatal and will cause the I/O reactor to shut down
75   * automatically.
76   * <p>
77   * There is a possibility to override this behavior and prevent I/O reactors
78   * from shutting down automatically in case of a runtime exception or an I/O
79   * exception in internal classes. This can be accomplished by providing a custom
80   * implementation of the {@link IOReactorExceptionHandler} interface.
81   * <p>
82   * If an I/O reactor is unable to automatically recover from an I/O or a runtime
83   * exception it will enter the shutdown mode. First off, it will cancel all pending
84   * new session requests. Then it will attempt to close all active I/O sessions
85   * gracefully giving them some time to flush pending output data and terminate
86   * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
87   * remain active after the grace period. This is a fairly complex process, where
88   * many things can fail at the same time and many different exceptions can be
89   * thrown in the course of the shutdown process. The I/O reactor will record all
90   * exceptions thrown during the shutdown process, including the original one
91   * that actually caused the shutdown in the first place, in an audit log. One
92   * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
93   * thrown by the I/O reactor prior to and in the course of the reactor shutdown
94   * and decide whether it is safe to restart the I/O reactor.
95   *
96   * @since 4.0
97   */
98  @SuppressWarnings("deprecation")
99  @ThreadSafe // public methods only
100 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
101 
102     protected volatile IOReactorStatus status;
103 
104     /**
105      * @deprecated (4.2)
106      */
107     @Deprecated
108     protected final HttpParams params;
109     protected final IOReactorConfig config;
110     protected final Selector selector;
111     protected final long selectTimeout;
112     protected final boolean interestOpsQueueing;
113 
114     private final int workerCount;
115     private final ThreadFactory threadFactory;
116     private final BaseIOReactor[] dispatchers;
117     private final Worker[] workers;
118     private final Thread[] threads;
119     private final Object statusLock;
120 
121     //TODO: make final
122     protected IOReactorExceptionHandler exceptionHandler;
123     protected List<ExceptionEvent> auditLog;
124 
125     private int currentWorker = 0;
126 
127     /**
128      * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
129      *
130      * @param config I/O reactor configuration.
131      * @param threadFactory the factory to create threads.
132      *   Can be {@code null}.
133      * @throws IOReactorException in case if a non-recoverable I/O error.
134      *
135      * @since 4.2
136      */
137     public AbstractMultiworkerIOReactor(
138             final IOReactorConfig config,
139             final ThreadFactory threadFactory) throws IOReactorException {
140         super();
141         this.config = config != null ? config : IOReactorConfig.DEFAULT;
142         this.params = new BasicHttpParams();
143         try {
144             this.selector = Selector.open();
145         } catch (final IOException ex) {
146             throw new IOReactorException("Failure opening selector", ex);
147         }
148         this.selectTimeout = this.config.getSelectInterval();
149         this.interestOpsQueueing = this.config.isInterestOpQueued();
150         this.statusLock = new Object();
151         if (threadFactory != null) {
152             this.threadFactory = threadFactory;
153         } else {
154             this.threadFactory = new DefaultThreadFactory();
155         }
156         this.auditLog = new ArrayList<ExceptionEvent>();
157         this.workerCount = this.config.getIoThreadCount();
158         this.dispatchers = new BaseIOReactor[workerCount];
159         this.workers = new Worker[workerCount];
160         this.threads = new Thread[workerCount];
161         this.status = IOReactorStatus.INACTIVE;
162     }
163 
164     /**
165      * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
166      *
167      * @throws IOReactorException in case if a non-recoverable I/O error.
168      *
169      * @since 4.2
170      */
171     public AbstractMultiworkerIOReactor() throws IOReactorException {
172         this(null, null);
173     }
174 
175     @Deprecated
176     static IOReactorConfig convert(final int workerCount, final HttpParams params) {
177         Args.notNull(params, "HTTP parameters");
178         return IOReactorConfig.custom()
179             .setSelectInterval(params.getLongParameter(NIOReactorPNames.SELECT_INTERVAL, 1000))
180             .setShutdownGracePeriod(params.getLongParameter(NIOReactorPNames.GRACE_PERIOD, 500))
181             .setInterestOpQueued(params.getBooleanParameter(NIOReactorPNames.SELECT_INTERVAL, false))
182             .setIoThreadCount(workerCount)
183             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
184             .setConnectTimeout(params.getIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 0))
185             .setSoTimeout(params.getIntParameter(CoreConnectionPNames.SO_TIMEOUT, 0))
186             .setSoReuseAddress(params.getBooleanParameter(CoreConnectionPNames.SO_REUSEADDR, false))
187             .setSoKeepAlive(params.getBooleanParameter(CoreConnectionPNames.SO_KEEPALIVE, false))
188             .setSoLinger(params.getIntParameter(CoreConnectionPNames.SO_LINGER, -1))
189             .setTcpNoDelay(params.getBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true))
190             .build();
191     }
192 
193     /**
194      * Creates an instance of AbstractMultiworkerIOReactor.
195      *
196      * @param workerCount number of worker I/O reactors.
197      * @param threadFactory the factory to create threads.
198      *   Can be {@code null}.
199      * @param params HTTP parameters.
200      * @throws IOReactorException in case if a non-recoverable I/O error.
201      *
202      * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
203      */
204     @Deprecated
205     public AbstractMultiworkerIOReactor(
206             final int workerCount,
207             final ThreadFactory threadFactory,
208             final HttpParams params) throws IOReactorException {
209         this(convert(workerCount, params), threadFactory);
210     }
211 
212     @Override
213     public IOReactorStatus getStatus() {
214         return this.status;
215     }
216 
217     /**
218      * Returns the audit log containing exceptions thrown by the I/O reactor
219      * prior and in the course of the reactor shutdown.
220      *
221      * @return audit log.
222      */
223     public List<ExceptionEvent> getAuditLog() {
224         synchronized (this.auditLog) {
225             return new ArrayList<ExceptionEvent>(this.auditLog);
226         }
227     }
228 
229     /**
230      * Adds the given {@link Throwable} object with the given time stamp
231      * to the audit log.
232      *
233      * @param ex the exception thrown by the I/O reactor.
234      * @param timestamp the time stamp of the exception. Can be
235      * {@code null} in which case the current date / time will be used.
236      */
237     protected synchronized void addExceptionEvent(final Throwable ex, final Date timestamp) {
238         if (ex == null) {
239             return;
240         }
241         synchronized (this.auditLog) {
242             this.auditLog.add(new ExceptionEvent(ex, timestamp != null ? timestamp : new Date()));
243         }
244     }
245 
246     /**
247      * Adds the given {@link Throwable} object to the audit log.
248      *
249      * @param ex the exception thrown by the I/O reactor.
250      */
251     protected void addExceptionEvent(final Throwable ex) {
252         addExceptionEvent(ex, null);
253     }
254 
255     /**
256      * Sets exception handler for this I/O reactor.
257      *
258      * @param exceptionHandler the exception handler.
259      */
260     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
261         this.exceptionHandler = exceptionHandler;
262     }
263 
264     /**
265      * Triggered to process I/O events registered by the main {@link Selector}.
266      * <p>
267      * Subclasses implement this method to react to the event.
268      *
269      * @param count event count.
270      * @throws IOReactorException in case if a non-recoverable I/O error.
271      */
272     protected abstract void processEvents(int count) throws IOReactorException;
273 
274     /**
275      * Triggered to cancel pending session requests.
276      * <p>
277      * Subclasses implement this method to react to the event.
278      *
279      * @throws IOReactorException in case if a non-recoverable I/O error.
280      */
281     protected abstract void cancelRequests() throws IOReactorException;
282 
283     /**
284      * Activates the main I/O reactor as well as all worker I/O reactors.
285      * The I/O main reactor will start reacting to I/O events and triggering
286      * notification methods. The worker I/O reactor in their turn will start
287      * reacting to I/O events and dispatch I/O event notifications to the given
288      * {@link IOEventDispatch} interface.
289      * <p>
290      * This method will enter the infinite I/O select loop on
291      * the {@link Selector} instance associated with this I/O reactor and used
292      * to manage creation of new I/O channels. Once a new I/O channel has been
293      * created the processing of I/O events on that channel will be delegated
294      * to one of the worker I/O reactors.
295      * <p>
296      * The method will remain blocked unto the I/O reactor is shut down or the
297      * execution thread is interrupted.
298      *
299      * @see #processEvents(int)
300      * @see #cancelRequests()
301      *
302      * @throws InterruptedIOException if the dispatch thread is interrupted.
303      * @throws IOReactorException in case if a non-recoverable I/O error.
304      */
305     @Override
306     public void execute(
307             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
308         Args.notNull(eventDispatch, "Event dispatcher");
309         synchronized (this.statusLock) {
310             if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
311                 this.status = IOReactorStatus.SHUT_DOWN;
312                 this.statusLock.notifyAll();
313                 return;
314             }
315             Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
316                     "Illegal state %s", this.status);
317             this.status = IOReactorStatus.ACTIVE;
318             // Start I/O dispatchers
319             for (int i = 0; i < this.dispatchers.length; i++) {
320                 final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
321                 dispatcher.setExceptionHandler(exceptionHandler);
322                 this.dispatchers[i] = dispatcher;
323             }
324             for (int i = 0; i < this.workerCount; i++) {
325                 final BaseIOReactor dispatcher = this.dispatchers[i];
326                 this.workers[i] = new Worker(dispatcher, eventDispatch);
327                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
328             }
329         }
330         try {
331 
332             for (int i = 0; i < this.workerCount; i++) {
333                 if (this.status != IOReactorStatus.ACTIVE) {
334                     return;
335                 }
336                 this.threads[i].start();
337             }
338 
339             for (;;) {
340                 final int readyCount;
341                 try {
342                     readyCount = this.selector.select(this.selectTimeout);
343                 } catch (final InterruptedIOException ex) {
344                     throw ex;
345                 } catch (final IOException ex) {
346                     throw new IOReactorException("Unexpected selector failure", ex);
347                 }
348 
349                 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
350                     processEvents(readyCount);
351                 }
352 
353                 // Verify I/O dispatchers
354                 for (int i = 0; i < this.workerCount; i++) {
355                     final Worker worker = this.workers[i];
356                     final Exception ex = worker.getException();
357                     if (ex != null) {
358                         throw new IOReactorException(
359                                 "I/O dispatch worker terminated abnormally", ex);
360                     }
361                 }
362 
363                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
364                     break;
365                 }
366             }
367 
368         } catch (final ClosedSelectorException ex) {
369             addExceptionEvent(ex);
370         } catch (final IOReactorException ex) {
371             if (ex.getCause() != null) {
372                 addExceptionEvent(ex.getCause());
373             }
374             throw ex;
375         } finally {
376             doShutdown();
377             synchronized (this.statusLock) {
378                 this.status = IOReactorStatus.SHUT_DOWN;
379                 this.statusLock.notifyAll();
380             }
381         }
382     }
383 
384     /**
385      * Activates the shutdown sequence for this reactor. This method will cancel
386      * all pending session requests, close out all active I/O channels,
387      * make an attempt to terminate all worker I/O reactors gracefully,
388      * and finally force-terminate those I/O reactors that failed to
389      * terminate after the specified grace period.
390      *
391      * @throws InterruptedIOException if the shutdown sequence has been
392      *   interrupted.
393      */
394     protected void doShutdown() throws InterruptedIOException {
395         synchronized (this.statusLock) {
396             if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
397                 return;
398             }
399             this.status = IOReactorStatus.SHUTTING_DOWN;
400         }
401         try {
402             cancelRequests();
403         } catch (final IOReactorException ex) {
404             if (ex.getCause() != null) {
405                 addExceptionEvent(ex.getCause());
406             }
407         }
408         this.selector.wakeup();
409 
410         // Close out all channels
411         if (this.selector.isOpen()) {
412             for (final SelectionKey key : this.selector.keys()) {
413                 try {
414                     final Channel channel = key.channel();
415                     if (channel != null) {
416                         channel.close();
417                     }
418                 } catch (final IOException ex) {
419                     addExceptionEvent(ex);
420                 }
421             }
422             // Stop dispatching I/O events
423             try {
424                 this.selector.close();
425             } catch (final IOException ex) {
426                 addExceptionEvent(ex);
427             }
428         }
429 
430         // Attempt to shut down I/O dispatchers gracefully
431         for (int i = 0; i < this.workerCount; i++) {
432             final BaseIOReactor dispatcher = this.dispatchers[i];
433             dispatcher.gracefulShutdown();
434         }
435 
436         final long gracePeriod = this.config.getShutdownGracePeriod();
437 
438         try {
439             // Force shut down I/O dispatchers if they fail to terminate
440             // in time
441             for (int i = 0; i < this.workerCount; i++) {
442                 final BaseIOReactor dispatcher = this.dispatchers[i];
443                 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
444                     dispatcher.awaitShutdown(gracePeriod);
445                 }
446                 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
447                     try {
448                         dispatcher.hardShutdown();
449                     } catch (final IOReactorException ex) {
450                         if (ex.getCause() != null) {
451                             addExceptionEvent(ex.getCause());
452                         }
453                     }
454                 }
455             }
456             // Join worker threads
457             for (int i = 0; i < this.workerCount; i++) {
458                 final Thread t = this.threads[i];
459                 if (t != null) {
460                     t.join(gracePeriod);
461                 }
462             }
463         } catch (final InterruptedException ex) {
464             throw new InterruptedIOException(ex.getMessage());
465         }
466     }
467 
468     /**
469      * Assigns the given channel entry to one of the worker I/O reactors.
470      *
471      * @param entry the channel entry.
472      */
473     protected void addChannel(final ChannelEntry entry) {
474         // Distribute new channels among the workers
475         final int i = Math.abs(this.currentWorker++ % this.workerCount);
476         this.dispatchers[i].addChannel(entry);
477     }
478 
479     /**
480      * Registers the given channel with the main {@link Selector}.
481      *
482      * @param channel the channel.
483      * @param ops interest ops.
484      * @return  selection key.
485      * @throws ClosedChannelException if the channel has been already closed.
486      */
487     protected SelectionKey registerChannel(
488             final SelectableChannel channel, final int ops) throws ClosedChannelException {
489         return channel.register(this.selector, ops);
490     }
491 
492     /**
493      * Prepares the given {@link Socket} by resetting some of its properties.
494      *
495      * @param socket the socket
496      * @throws IOException in case of an I/O error.
497      */
498     protected void prepareSocket(final Socket socket) throws IOException {
499         socket.setTcpNoDelay(this.config.isTcpNoDelay());
500         socket.setKeepAlive(this.config.isSoKeepalive());
501         if (this.config.getSoTimeout() > 0) {
502             socket.setSoTimeout(this.config.getSoTimeout());
503         }
504         if (this.config.getSndBufSize() > 0) {
505             socket.setSendBufferSize(this.config.getSndBufSize());
506         }
507         if (this.config.getRcvBufSize() > 0) {
508             socket.setReceiveBufferSize(this.config.getRcvBufSize());
509         }
510         final int linger = this.config.getSoLinger();
511         if (linger >= 0) {
512             socket.setSoLinger(true, linger);
513         }
514     }
515 
516     /**
517      * Blocks for the given period of time in milliseconds awaiting
518      * the completion of the reactor shutdown. If the value of
519      * {@code timeout} is set to {@code 0} this method blocks
520      * indefinitely.
521      *
522      * @param timeout the maximum wait time.
523      * @throws InterruptedException if interrupted.
524      */
525     protected void awaitShutdown(final long timeout) throws InterruptedException {
526         synchronized (this.statusLock) {
527             final long deadline = System.currentTimeMillis() + timeout;
528             long remaining = timeout;
529             while (this.status != IOReactorStatus.SHUT_DOWN) {
530                 this.statusLock.wait(remaining);
531                 if (timeout > 0) {
532                     remaining = deadline - System.currentTimeMillis();
533                     if (remaining <= 0) {
534                         break;
535                     }
536                 }
537             }
538         }
539     }
540 
541     @Override
542     public void shutdown() throws IOException {
543         shutdown(2000);
544     }
545 
546     @Override
547     public void shutdown(final long waitMs) throws IOException {
548         synchronized (this.statusLock) {
549             if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
550                 return;
551             }
552             if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
553                 this.status = IOReactorStatus.SHUT_DOWN;
554                 cancelRequests();
555                 this.selector.close();
556                 return;
557             }
558             this.status = IOReactorStatus.SHUTDOWN_REQUEST;
559         }
560         this.selector.wakeup();
561         try {
562             awaitShutdown(waitMs);
563         } catch (final InterruptedException ignore) {
564         }
565     }
566 
567     static void closeChannel(final Channel channel) {
568         try {
569             channel.close();
570         } catch (final IOException ignore) {
571         }
572     }
573 
574     static class Worker implements Runnable {
575 
576         final BaseIOReactor dispatcher;
577         final IOEventDispatch eventDispatch;
578 
579         private volatile Exception exception;
580 
581         public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
582             super();
583             this.dispatcher = dispatcher;
584             this.eventDispatch = eventDispatch;
585         }
586 
587         @Override
588         public void run() {
589             try {
590                 this.dispatcher.execute(this.eventDispatch);
591             } catch (final Exception ex) {
592                 this.exception = ex;
593             }
594         }
595 
596         public Exception getException() {
597             return this.exception;
598         }
599 
600     }
601 
602     static class DefaultThreadFactory implements ThreadFactory {
603 
604         private final static AtomicLong COUNT = new AtomicLong(1);
605 
606         @Override
607         public Thread newThread(final Runnable r) {
608             return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
609         }
610 
611     }
612 
613 }