View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.Socket;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
45  
46  import org.apache.http.annotation.ThreadSafe;
47  import org.apache.http.nio.params.NIOReactorParams;
48  import org.apache.http.nio.reactor.IOEventDispatch;
49  import org.apache.http.nio.reactor.IOReactor;
50  import org.apache.http.nio.reactor.IOReactorException;
51  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
52  import org.apache.http.nio.reactor.IOReactorStatus;
53  import org.apache.http.params.BasicHttpParams;
54  import org.apache.http.params.HttpConnectionParams;
55  import org.apache.http.params.HttpParams;
56  
57  /**
58   * Generic implementation of {@link IOReactor} that can run multiple
59   * {@link BaseIOReactor} instance in separate worker threads and distribute
60   * newly created I/O session equally across those I/O reactors for a more
61   * optimal resource utilization and a better I/O performance. Usually it is
62   * recommended to have one worker I/O reactor per physical CPU core.
63   * <p>
64   * <strong>Important note about exception handling</strong>
65   * <p>
66   * Protocol specific exceptions as well as those I/O exceptions thrown in the
67   * course of interaction with the session's channel are to be expected and are to be
68   * dealt with by specific protocol handlers. These exceptions may result in
69   * termination of an individual session but should not affect the I/O reactor
70   * and all other active sessions. There are situations, however, when the I/O
71   * reactor itself encounters an internal problem such as an I/O exception in
72   * the underlying NIO classes or an unhandled runtime exception. Those types of
73   * exceptions are usually fatal and will cause the I/O reactor to shut down
74   * automatically.
75   * <p>
76   * There is a possibility to override this behavior and prevent I/O reactors
77   * from shutting down automatically in case of a runtime exception or an I/O
78   * exception in internal classes. This can be accomplished by providing a custom
79   * implementation of the {@link IOReactorExceptionHandler} interface.
80   * <p>
81   * If an I/O reactor is unable to automatically recover from an I/O or a runtime
82   * exception it will enter the shutdown mode. First off, it cancels all pending
83   * new session requests. Then it will attempt to close all active I/O sessions
84   * gracefully giving them some time to flush pending output data and terminate
85   * cleanly. Lastly, it will forcibly shut down those I/O sessions that still
86   * remain active after the grace period. This is a fairly complex process, where
87   * many things can fail at the same time and many different exceptions can be
88   * thrown in the course of the shutdown process. The I/O reactor will record all
89   * exceptions thrown during the shutdown process, including the original one
90   * that actually caused the shutdown in the first place, in an audit log. One
91   * can obtain the audit log using {@link #getAuditLog()}, examine exceptions
92   * thrown by the I/O reactor prior and in the course of the reactor shutdown
93   * and decide whether it is safe to restart the I/O reactor.
94   * <p>
95   * The following parameters can be used to customize the behavior of this
96   * class:
97   * <ul>
98   *  <li>{@link org.apache.http.params.CoreConnectionPNames#TCP_NODELAY}</li>
99   *  <li>{@link org.apache.http.params.CoreConnectionPNames#SO_TIMEOUT}</li>
100  *  <li>{@link org.apache.http.params.CoreConnectionPNames#SO_LINGER}</li>
101  *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#SELECT_INTERVAL}</li>
102  *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#GRACE_PERIOD}</li>
103  *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#INTEREST_OPS_QUEUEING}</li>
104  * </ul>
105  *
106  * @since 4.0
107  */
108 @SuppressWarnings("deprecation")
109 @ThreadSafe // public methods only
110 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
111 
112     protected volatile IOReactorStatus status;
113 
114     /**
115      * @deprecated (4.2)
116      */
117     @Deprecated
118     protected final HttpParams params;
119     protected final IOReactorConfig config;
120     protected final Selector selector;
121     protected final long selectTimeout;
122     protected final boolean interestOpsQueueing;
123 
124     private final int workerCount;
125     private final ThreadFactory threadFactory;
126     private final BaseIOReactor[] dispatchers;
127     private final Worker[] workers;
128     private final Thread[] threads;
129     private final Object statusLock;
130 
131     protected IOReactorExceptionHandler exceptionHandler;
132     protected List<ExceptionEvent> auditLog;
133 
134     private int currentWorker = 0;
135 
136     /**
137      * Creates an instance of AbstractMultiworkerIOReactor with the given configuration.
138      *
139      * @param config I/O reactor configuration.
140      * @param threadFactory the factory to create threads.
141      *   Can be <code>null</code>.
142      * @throws IOReactorException in case if a non-recoverable I/O error.
143      *
144      * @since 4.2
145      */
146     public AbstractMultiworkerIOReactor(
147             final IOReactorConfig config,
148             final ThreadFactory threadFactory) throws IOReactorException {
149         super();
150         if (config != null) {
151             try {
152                 this.config = config.clone();
153             } catch (CloneNotSupportedException ex) {
154                 throw new IOReactorException("Unable to clone configuration");
155             }
156         } else {
157             this.config = new IOReactorConfig();
158         }
159         this.params = new BasicHttpParams();
160         try {
161             this.selector = Selector.open();
162         } catch (IOException ex) {
163             throw new IOReactorException("Failure opening selector", ex);
164         }
165         this.selectTimeout = this.config.getSelectInterval();
166         this.interestOpsQueueing = this.config.isInterestOpQueued();
167         this.statusLock = new Object();
168         if (threadFactory != null) {
169             this.threadFactory = threadFactory;
170         } else {
171             this.threadFactory = new DefaultThreadFactory();
172         }
173         this.workerCount = this.config.getIoThreadCount();
174         this.dispatchers = new BaseIOReactor[workerCount];
175         this.workers = new Worker[workerCount];
176         this.threads = new Thread[workerCount];
177         this.status = IOReactorStatus.INACTIVE;
178     }
179 
180     /**
181      * Creates an instance of AbstractMultiworkerIOReactor with default configuration.
182      *
183      * @throws IOReactorException in case if a non-recoverable I/O error.
184      *
185      * @since 4.2
186      */
187     public AbstractMultiworkerIOReactor() throws IOReactorException {
188         this(null, null);
189     }
190 
191     static IOReactorConfig convert(int workerCount, final HttpParams params) {
192         if (params == null) {
193             throw new IllegalArgumentException("HTTP parameters may not be null");
194         }
195         IOReactorConfig config = new IOReactorConfig();
196         config.setSelectInterval(NIOReactorParams.getSelectInterval(params));
197         config.setShutdownGracePeriod(NIOReactorParams.getGracePeriod(params));
198         config.setInterestOpQueued(NIOReactorParams.getInterestOpsQueueing(params));
199         config.setIoThreadCount(workerCount);
200         config.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(params));
201         config.setSoTimeout(HttpConnectionParams.getSoTimeout(params));
202         config.setSoLinger(HttpConnectionParams.getLinger(params));
203         config.setSoKeepalive(HttpConnectionParams.getSoKeepalive(params));
204         config.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(params));
205         config.setSoReuseAddress(HttpConnectionParams.getSoReuseaddr(params));
206 
207         return config;
208     }
209 
210     /**
211      * Creates an instance of AbstractMultiworkerIOReactor.
212      *
213      * @param workerCount number of worker I/O reactors.
214      * @param threadFactory the factory to create threads.
215      *   Can be <code>null</code>.
216      * @param params HTTP parameters.
217      * @throws IOReactorException in case if a non-recoverable I/O error.
218      *
219      * @deprecated (4.2) use {@link AbstractMultiworkerIOReactor#AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
220      */
221     @Deprecated
222     public AbstractMultiworkerIOReactor(
223             int workerCount,
224             final ThreadFactory threadFactory,
225             final HttpParams params) throws IOReactorException {
226         this(convert(workerCount, params), threadFactory);
227     }
228 
229     public IOReactorStatus getStatus() {
230         return this.status;
231     }
232 
233     /**
234      * Returns the audit log containing exceptions thrown by the I/O reactor
235      * prior and in the course of the reactor shutdown.
236      *
237      * @return audit log.
238      */
239     public synchronized List<ExceptionEvent> getAuditLog() {
240         if (this.auditLog != null) {
241             return new ArrayList<ExceptionEvent>(this.auditLog);
242         } else {
243             return null;
244         }
245     }
246 
247     /**
248      * Adds the given {@link Throwable} object with the given time stamp
249      * to the audit log.
250      *
251      * @param ex the exception thrown by the I/O reactor.
252      * @param timestamp the time stamp of the exception. Can be
253      * <code>null</code> in which case the current date / time will be used.
254      */
255     protected synchronized void addExceptionEvent(final Throwable ex, Date timestamp) {
256         if (ex == null) {
257             return;
258         }
259         if (timestamp == null) {
260             timestamp = new Date();
261         }
262         if (this.auditLog == null) {
263             this.auditLog = new ArrayList<ExceptionEvent>();
264         }
265         this.auditLog.add(new ExceptionEvent(ex, timestamp));
266     }
267 
268     /**
269      * Adds the given {@link Throwable} object to the audit log.
270      *
271      * @param ex the exception thrown by the I/O reactor.
272      */
273     protected void addExceptionEvent(final Throwable ex) {
274         addExceptionEvent(ex, null);
275     }
276 
277     /**
278      * Sets exception handler for this I/O reactor.
279      *
280      * @param exceptionHandler the exception handler.
281      */
282     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
283         this.exceptionHandler = exceptionHandler;
284     }
285 
286     /**
287      * Triggered to process I/O events registered by the main {@link Selector}.
288      * <p>
289      * Sub-classes can implement this method to react to the event.
290      *
291      * @param count event count.
292      * @throws IOReactorException in case of a non-recoverable I/O error.
293      */
294     protected abstract void processEvents(int count) throws IOReactorException;
295 
296     /**
297      * Triggered to cancel pending session requests.
298      * <p>
299      * Sub-classes can implement this method to react to the event.
300      *
301      * @throws IOReactorException in case of a non-recoverable I/O error.
302      */
303     protected abstract void cancelRequests() throws IOReactorException;
304 
305     /**
306      * Activates the main I/O reactor as well as all worker I/O reactors.
307      * The I/O main reactor will start reacting to I/O events and triggering
308      * notification methods. The worker I/O reactor in their turn will start
309      * reacting to I/O events and dispatch I/O event notifications to the given
310      * {@link IOEventDispatch} interface.
311      * <p>
312      * This method will enter the infinite I/O select loop on
313      * the {@link Selector} instance associated with this I/O reactor and used
314      * to manage creation of new I/O channels. Once a new I/O channel has been
315      * created the processing of I/O events on that channel will be delegated
316      * to one of the worker I/O reactors.
317      * <p>
318      * The method will remain blocked unto the I/O reactor is shut down or the
319      * execution thread is interrupted.
320      *
321      * @see #processEvents(int)
322      * @see #cancelRequests()
323      *
324      * @throws InterruptedIOException if the dispatch thread is interrupted.
325      * @throws IOReactorException in case if a non-recoverable I/O error.
326      */
327     public void execute(
328             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
329         if (eventDispatch == null) {
330             throw new IllegalArgumentException("Event dispatcher may not be null");
331         }
332         synchronized (this.statusLock) {
333             if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
334                 this.status = IOReactorStatus.SHUT_DOWN;
335                 this.statusLock.notifyAll();
336                 return;
337             }
338             if (this.status.compareTo(IOReactorStatus.INACTIVE) != 0) {
339                 throw new IllegalStateException("Illegal state: " + this.status);
340             }
341             this.status = IOReactorStatus.ACTIVE;
342             // Start I/O dispatchers
343             for (int i = 0; i < this.dispatchers.length; i++) {
344                 BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
345                 dispatcher.setExceptionHandler(exceptionHandler);
346                 this.dispatchers[i] = dispatcher;
347             }
348             for (int i = 0; i < this.workerCount; i++) {
349                 BaseIOReactor dispatcher = this.dispatchers[i];
350                 this.workers[i] = new Worker(dispatcher, eventDispatch);
351                 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
352             }
353         }
354         try {
355 
356             for (int i = 0; i < this.workerCount; i++) {
357                 if (this.status != IOReactorStatus.ACTIVE) {
358                     return;
359                 }
360                 this.threads[i].start();
361             }
362 
363             for (;;) {
364                 int readyCount;
365                 try {
366                     readyCount = this.selector.select(this.selectTimeout);
367                 } catch (InterruptedIOException ex) {
368                     throw ex;
369                 } catch (IOException ex) {
370                     throw new IOReactorException("Unexpected selector failure", ex);
371                 }
372 
373                 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
374                     processEvents(readyCount);
375                 }
376 
377                 // Verify I/O dispatchers
378                 for (int i = 0; i < this.workerCount; i++) {
379                     Worker worker = this.workers[i];
380                     Exception ex = worker.getException();
381                     if (ex != null) {
382                         throw new IOReactorException(
383                                 "I/O dispatch worker terminated abnormally", ex);
384                     }
385                 }
386 
387                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
388                     break;
389                 }
390             }
391 
392         } catch (ClosedSelectorException ex) {
393             addExceptionEvent(ex);
394         } catch (IOReactorException ex) {
395             if (ex.getCause() != null) {
396                 addExceptionEvent(ex.getCause());
397             }
398             throw ex;
399         } finally {
400             doShutdown();
401             synchronized (this.statusLock) {
402                 this.status = IOReactorStatus.SHUT_DOWN;
403                 this.statusLock.notifyAll();
404             }
405         }
406     }
407 
408     /**
409      * Activates the shutdown sequence for this reactor. This method will cancel
410      * all pending session requests, close out all active I/O channels,
411      * make an attempt to terminate all worker I/O reactors gracefully,
412      * and finally force-terminate those I/O reactors that failed to
413      * terminate after the specified grace period.
414      *
415      * @throws InterruptedIOException if the shutdown sequence has been
416      *   interrupted.
417      */
418     protected void doShutdown() throws InterruptedIOException {
419         synchronized (this.statusLock) {
420             if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
421                 return;
422             }
423             this.status = IOReactorStatus.SHUTTING_DOWN;
424         }
425         try {
426             cancelRequests();
427         } catch (IOReactorException ex) {
428             if (ex.getCause() != null) {
429                 addExceptionEvent(ex.getCause());
430             }
431         }
432         this.selector.wakeup();
433 
434         // Close out all channels
435         if (this.selector.isOpen()) {
436             Set<SelectionKey> keys = this.selector.keys();
437             for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
438                 try {
439                     SelectionKey key = it.next();
440                     Channel channel = key.channel();
441                     if (channel != null) {
442                         channel.close();
443                     }
444                 } catch (IOException ex) {
445                     addExceptionEvent(ex);
446                 }
447             }
448             // Stop dispatching I/O events
449             try {
450                 this.selector.close();
451             } catch (IOException ex) {
452                 addExceptionEvent(ex);
453             }
454         }
455 
456         // Attempt to shut down I/O dispatchers gracefully
457         for (int i = 0; i < this.workerCount; i++) {
458             BaseIOReactor dispatcher = this.dispatchers[i];
459             dispatcher.gracefulShutdown();
460         }
461 
462         long gracePeriod = this.config.getShutdownGracePeriod();
463 
464         try {
465             // Force shut down I/O dispatchers if they fail to terminate
466             // in time
467             for (int i = 0; i < this.workerCount; i++) {
468                 BaseIOReactor dispatcher = this.dispatchers[i];
469                 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
470                     dispatcher.awaitShutdown(gracePeriod);
471                 }
472                 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
473                     try {
474                         dispatcher.hardShutdown();
475                     } catch (IOReactorException ex) {
476                         if (ex.getCause() != null) {
477                             addExceptionEvent(ex.getCause());
478                         }
479                     }
480                 }
481             }
482             // Join worker threads
483             for (int i = 0; i < this.workerCount; i++) {
484                 Thread t = this.threads[i];
485                 if (t != null) {
486                     t.join(gracePeriod);
487                 }
488             }
489         } catch (InterruptedException ex) {
490             throw new InterruptedIOException(ex.getMessage());
491         }
492     }
493 
494     /**
495      * Assigns the given channel entry to one of the worker I/O reactors.
496      *
497      * @param entry the channel entry.
498      */
499     protected void addChannel(final ChannelEntry entry) {
500         // Distribute new channels among the workers
501         int i = Math.abs(this.currentWorker++ % this.workerCount);
502         this.dispatchers[i].addChannel(entry);
503     }
504 
505     /**
506      * Registers the given channel with the main {@link Selector}.
507      *
508      * @param channel the channel.
509      * @param ops interest ops.
510      * @return  selection key.
511      * @throws ClosedChannelException if the channel has been already closed.
512      */
513     protected SelectionKey registerChannel(
514             final SelectableChannel channel, int ops) throws ClosedChannelException {
515         return channel.register(this.selector, ops);
516     }
517 
518     /**
519      * Prepares the given {@link Socket} by resetting some of its properties.
520      *
521      * @param socket the socket
522      * @throws IOException in case of an I/O error.
523      */
524     protected void prepareSocket(final Socket socket) throws IOException {
525         socket.setTcpNoDelay(this.config.isTcpNoDelay());
526         socket.setKeepAlive(this.config.isSoKeepalive());
527         socket.setReuseAddress(this.config.isSoReuseAddress());
528         if (this.config.getSoTimeout() > 0) {
529             socket.setSoTimeout(this.config.getSoTimeout());
530         }
531         if (this.config.getSndBufSize() > 0) {
532             socket.setSendBufferSize(this.config.getSndBufSize());
533         }
534         if (this.config.getRcvBufSize() > 0) {
535             socket.setReceiveBufferSize(this.config.getRcvBufSize());
536         }
537         int linger = this.config.getSoLinger();
538         if (linger >= 0) {
539             socket.setSoLinger(linger > 0, linger);
540         }
541     }
542 
543     /**
544      * Blocks for the given period of time in milliseconds awaiting
545      * the completion of the reactor shutdown. If the value of
546      * <code>timeout</code> is set to <code>0</code> this method blocks
547      * indefinitely.
548      *
549      * @param timeout the maximum wait time.
550      * @throws InterruptedException if interrupted.
551      */
552     protected void awaitShutdown(long timeout) throws InterruptedException {
553         synchronized (this.statusLock) {
554             long deadline = System.currentTimeMillis() + timeout;
555             long remaining = timeout;
556             while (this.status != IOReactorStatus.SHUT_DOWN) {
557                 this.statusLock.wait(remaining);
558                 if (timeout > 0) {
559                     remaining = deadline - System.currentTimeMillis();
560                     if (remaining <= 0) {
561                         break;
562                     }
563                 }
564             }
565         }
566     }
567 
568     public void shutdown() throws IOException {
569         shutdown(2000);
570     }
571 
572     public void shutdown(long waitMs) throws IOException {
573         synchronized (this.statusLock) {
574             if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
575                 return;
576             }
577             if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
578                 this.status = IOReactorStatus.SHUT_DOWN;
579                 cancelRequests();
580                 return;
581             }
582             this.status = IOReactorStatus.SHUTDOWN_REQUEST;
583         }
584         this.selector.wakeup();
585         try {
586             awaitShutdown(waitMs);
587         } catch (InterruptedException ignore) {
588         }
589     }
590 
591     static void closeChannel(final Channel channel) {
592         try {
593             channel.close();
594         } catch (IOException ignore) {
595         }
596     }
597 
598     static class Worker implements Runnable {
599 
600         final BaseIOReactor dispatcher;
601         final IOEventDispatch eventDispatch;
602 
603         private volatile Exception exception;
604 
605         public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
606             super();
607             this.dispatcher = dispatcher;
608             this.eventDispatch = eventDispatch;
609         }
610 
611         public void run() {
612             try {
613                 this.dispatcher.execute(this.eventDispatch);
614             } catch (Exception ex) {
615                 this.exception = ex;
616             }
617         }
618 
619         public Exception getException() {
620             return this.exception;
621         }
622 
623     }
624 
625     static class DefaultThreadFactory implements ThreadFactory {
626 
627         private static volatile int COUNT = 0;
628 
629         public Thread newThread(final Runnable r) {
630             return new Thread(r, "I/O dispatcher " + (++COUNT));
631         }
632 
633     }
634 
635 }