View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.nio.reactor.IOReactor;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.IOSession;
49  import org.apache.http.util.Args;
50  import org.apache.http.util.Asserts;
51  
52  /**
53   * Generic implementation of {@link IOReactor} that can be used as a superclass
54   * for more specialized I/O reactors. It is based on a single {@link Selector}
55   * instance.
56   *
57   * @since 4.0
58   */
59  @ThreadSafe // public methods only
60  public abstract class AbstractIOReactor implements IOReactor {
61  
62      private volatile IOReactorStatus status;
63  
64      private final Object statusMutex;
65      private final long selectTimeout;
66      private final boolean interestOpsQueueing;
67      private final Selector selector;
68      private final Set<IOSession> sessions;
69      private final Queue<InterestOpEntry> interestOpsQueue;
70      private final Queue<IOSession> closedSessions;
71      private final Queue<ChannelEntry> newChannels;
72  
73      /**
74       * Creates new AbstractIOReactor instance.
75       *
76       * @param selectTimeout the select timeout.
77       * @throws IOReactorException in case if a non-recoverable I/O error.
78       */
79      public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
80          this(selectTimeout, false);
81      }
82  
83      /**
84       * Creates new AbstractIOReactor instance.
85       *
86       * @param selectTimeout the select timeout.
87       * @param interestOpsQueueing Ops queueing flag.
88       *
89       * @throws IOReactorException in case if a non-recoverable I/O error.
90       *
91       * @since 4.1
92       */
93      public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
94          super();
95          Args.positive(selectTimeout, "Select timeout");
96          this.selectTimeout = selectTimeout;
97          this.interestOpsQueueing = interestOpsQueueing;
98          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
99          this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
100         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
101         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
102         try {
103             this.selector = Selector.open();
104         } catch (final IOException ex) {
105             throw new IOReactorException("Failure opening selector", ex);
106         }
107         this.statusMutex = new Object();
108         this.status = IOReactorStatus.INACTIVE;
109     }
110 
111     /**
112      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
113      * <p>
114      * Super-classes can implement this method to react to the event.
115      *
116      * @param key the selection key.
117      */
118     protected abstract void acceptable(SelectionKey key);
119 
120     /**
121      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
122      * <p>
123      * Super-classes can implement this method to react to the event.
124      *
125      * @param key the selection key.
126      */
127     protected abstract void connectable(SelectionKey key);
128 
129     /**
130      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
131      * <p>
132      * Super-classes can implement this method to react to the event.
133      *
134      * @param key the selection key.
135      */
136     protected abstract void readable(SelectionKey key);
137 
138     /**
139      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
140      * <p>
141      * Super-classes can implement this method to react to the event.
142      *
143      * @param key the selection key.
144      */
145     protected abstract void writable(SelectionKey key);
146 
147     /**
148      * Triggered to validate keys currently registered with the selector. This
149      * method is called after each I/O select loop.
150      * <p>
151      * Super-classes can implement this method to run validity checks on
152      * active sessions and include additional processing that needs to be
153      * executed after each I/O select loop.
154      *
155      * @param keys all selection keys registered with the selector.
156      */
157     protected abstract void validate(Set<SelectionKey> keys);
158 
159     /**
160      * Triggered when new session has been created.
161      * <p>
162      * Super-classes can implement this method to react to the event.
163      *
164      * @param key the selection key.
165      * @param session new I/O session.
166      */
167     protected void sessionCreated(final SelectionKey key, final IOSession session) {
168     }
169 
170     /**
171      * Triggered when a session has been closed.
172      * <p>
173      * Super-classes can implement this method to react to the event.
174      *
175      * @param session closed I/O session.
176      */
177     protected void sessionClosed(final IOSession session) {
178     }
179 
180     /**
181      * Triggered when a session has timed out.
182      * <p>
183      * Super-classes can implement this method to react to the event.
184      *
185      * @param session timed out I/O session.
186      */
187     protected void sessionTimedOut(final IOSession session) {
188     }
189 
190     /**
191      * Obtains {@link IOSession} instance associated with the given selection
192      * key.
193      *
194      * @param key the selection key.
195      * @return I/O session.
196      */
197     protected IOSession getSession(final SelectionKey key) {
198         return (IOSession) key.attachment();
199     }
200 
201     @Override
202     public IOReactorStatus getStatus() {
203         return this.status;
204     }
205 
206     /**
207      * Returns {@code true} if interest Ops queueing is enabled, {@code false} otherwise.
208      *
209      * @since 4.1
210      */
211     public boolean getInterestOpsQueueing() {
212         return this.interestOpsQueueing;
213     }
214 
215     /**
216      * Adds new channel entry. The channel will be asynchronously registered
217      * with the selector.
218      *
219      * @param channelEntry the channel entry.
220      */
221     public void addChannel(final ChannelEntry channelEntry) {
222         Args.notNull(channelEntry, "Channel entry");
223         this.newChannels.add(channelEntry);
224         this.selector.wakeup();
225     }
226 
227     /**
228      * Activates the I/O reactor. The I/O reactor will start reacting to
229      * I/O events and triggering notification methods.
230      * <p>
231      * This method will enter the infinite I/O select loop on
232      * the {@link Selector} instance associated with this I/O reactor.
233      * <p>
234      * The method will remain blocked unto the I/O reactor is shut down or the
235      * execution thread is interrupted.
236      *
237      * @see #acceptable(SelectionKey)
238      * @see #connectable(SelectionKey)
239      * @see #readable(SelectionKey)
240      * @see #writable(SelectionKey)
241      * @see #timeoutCheck(SelectionKey, long)
242      * @see #validate(Set)
243      * @see #sessionCreated(SelectionKey, IOSession)
244      * @see #sessionClosed(IOSession)
245      *
246      * @throws InterruptedIOException if the dispatch thread is interrupted.
247      * @throws IOReactorException in case if a non-recoverable I/O error.
248      */
249     protected void execute() throws InterruptedIOException, IOReactorException {
250         this.status = IOReactorStatus.ACTIVE;
251 
252         try {
253             for (;;) {
254 
255                 final int readyCount;
256                 try {
257                     readyCount = this.selector.select(this.selectTimeout);
258                 } catch (final InterruptedIOException ex) {
259                     throw ex;
260                 } catch (final IOException ex) {
261                     throw new IOReactorException("Unexpected selector failure", ex);
262                 }
263 
264                 if (this.status == IOReactorStatus.SHUT_DOWN) {
265                     // Hard shut down. Exit select loop immediately
266                     break;
267                 }
268 
269                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
270                     // Graceful shutdown in process
271                     // Try to close things out nicely
272                     closeSessions();
273                     closeNewChannels();
274                 }
275 
276                 // Process selected I/O events
277                 if (readyCount > 0) {
278                     processEvents(this.selector.selectedKeys());
279                 }
280 
281                 // Validate active channels
282                 validate(this.selector.keys());
283 
284                 // Process closed sessions
285                 processClosedSessions();
286 
287                 // If active process new channels
288                 if (this.status == IOReactorStatus.ACTIVE) {
289                     processNewChannels();
290                 }
291 
292                 // Exit select loop if graceful shutdown has been completed
293                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
294                         && this.sessions.isEmpty()) {
295                     break;
296                 }
297 
298                 if (this.interestOpsQueueing) {
299                     // process all pending interestOps() operations
300                     processPendingInterestOps();
301                 }
302 
303             }
304 
305         } catch (final ClosedSelectorException ignore) {
306         } finally {
307             hardShutdown();
308             synchronized (this.statusMutex) {
309                 this.statusMutex.notifyAll();
310             }
311         }
312     }
313 
314     private void processEvents(final Set<SelectionKey> selectedKeys) {
315         for (final SelectionKey key : selectedKeys) {
316 
317             processEvent(key);
318 
319         }
320         selectedKeys.clear();
321     }
322 
323     /**
324      * Processes new event on the given selection key.
325      *
326      * @param key the selection key that triggered an event.
327      */
328     protected void processEvent(final SelectionKey key) {
329         final IOSessionImpl session = (IOSessionImpl) key.attachment();
330         try {
331             if (key.isAcceptable()) {
332                 acceptable(key);
333             }
334             if (key.isConnectable()) {
335                 connectable(key);
336             }
337             if (key.isReadable()) {
338                 session.resetLastRead();
339                 readable(key);
340             }
341             if (key.isWritable()) {
342                 session.resetLastWrite();
343                 writable(key);
344             }
345         } catch (final CancelledKeyException ex) {
346             queueClosedSession(session);
347             key.attach(null);
348         }
349     }
350 
351     /**
352      * Queues the given I/O session to be processed asynchronously as closed.
353      *
354      * @param session the closed I/O session.
355      */
356     protected void queueClosedSession(final IOSession session) {
357         if (session != null) {
358             this.closedSessions.add(session);
359         }
360     }
361 
362     private void processNewChannels() throws IOReactorException {
363         ChannelEntry entry;
364         while ((entry = this.newChannels.poll()) != null) {
365 
366             final SocketChannel channel;
367             final SelectionKey key;
368             try {
369                 channel = entry.getChannel();
370                 channel.configureBlocking(false);
371                 key = channel.register(this.selector, SelectionKey.OP_READ);
372             } catch (final ClosedChannelException ex) {
373                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
374                 if (sessionRequest != null) {
375                     sessionRequest.failed(ex);
376                 }
377                 return;
378 
379             } catch (final IOException ex) {
380                 throw new IOReactorException("Failure registering channel " +
381                         "with the selector", ex);
382             }
383 
384             final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
385 
386                 @Override
387                 public void sessionClosed(final IOSession session) {
388                     queueClosedSession(session);
389                 }
390 
391             };
392 
393             InterestOpsCallback interestOpsCallback = null;
394             if (this.interestOpsQueueing) {
395                 interestOpsCallback = new InterestOpsCallback() {
396 
397                     @Override
398                     public void addInterestOps(final InterestOpEntry entry) {
399                         queueInterestOps(entry);
400                     }
401 
402                 };
403             }
404 
405             final IOSession session;
406             try {
407                 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
408                 int timeout = 0;
409                 try {
410                     timeout = channel.socket().getSoTimeout();
411                 } catch (final IOException ex) {
412                     // Very unlikely to happen and is not fatal
413                     // as the protocol layer is expected to overwrite
414                     // this value anyways
415                 }
416 
417                 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
418                 session.setSocketTimeout(timeout);
419             } catch (final CancelledKeyException ex) {
420                 continue;
421             }
422             try {
423                 this.sessions.add(session);
424                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
425                 if (sessionRequest != null) {
426                     sessionRequest.completed(session);
427                 }
428                 key.attach(session);
429                 sessionCreated(key, session);
430             } catch (final CancelledKeyException ex) {
431                 queueClosedSession(session);
432                 key.attach(null);
433             }
434         }
435     }
436 
437     private void processClosedSessions() {
438         IOSession session;
439         while ((session = this.closedSessions.poll()) != null) {
440             if (this.sessions.remove(session)) {
441                 try {
442                     sessionClosed(session);
443                 } catch (final CancelledKeyException ex) {
444                     // ignore and move on
445                 }
446             }
447         }
448     }
449 
450     private void processPendingInterestOps() {
451         // validity check
452         if (!this.interestOpsQueueing) {
453             return;
454         }
455         InterestOpEntry entry;
456         while ((entry = this.interestOpsQueue.poll()) != null) {
457             // obtain the operation's details
458             final SelectionKey key = entry.getSelectionKey();
459             final int eventMask = entry.getEventMask();
460             if (key.isValid()) {
461                 key.interestOps(eventMask);
462             }
463         }
464     }
465 
466     private boolean queueInterestOps(final InterestOpEntry entry) {
467         // validity checks
468         Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
469         if (entry == null) {
470             return false;
471         }
472 
473         // add this operation to the interestOps() queue
474         this.interestOpsQueue.add(entry);
475 
476         return true;
477     }
478 
479     /**
480      * Triggered to verify whether the I/O session associated with the
481      * given selection key has not timed out.
482      * <p>
483      * Super-classes can implement this method to react to the event.
484      *
485      * @param key the selection key.
486      * @param now current time as long value.
487      */
488     protected void timeoutCheck(final SelectionKey key, final long now) {
489         final IOSessionImpl session = (IOSessionImpl) key.attachment();
490         if (session != null) {
491             final int timeout = session.getSocketTimeout();
492             if (timeout > 0) {
493                 if (session.getLastAccessTime() + timeout < now) {
494                     sessionTimedOut(session);
495                 }
496             }
497         }
498     }
499 
500     /**
501      * Closes out all I/O sessions maintained by this I/O reactor.
502      */
503     protected void closeSessions() {
504         synchronized (this.sessions) {
505             for (final IOSession session : this.sessions) {
506                 session.close();
507             }
508         }
509     }
510 
511     /**
512      * Closes out all new channels pending registration with the selector of
513      * this I/O reactor.
514      * @throws IOReactorException - not thrown currently
515      */
516     protected void closeNewChannels() throws IOReactorException {
517         ChannelEntry entry;
518         while ((entry = this.newChannels.poll()) != null) {
519             final SessionRequestImpl sessionRequest = entry.getSessionRequest();
520             if (sessionRequest != null) {
521                 sessionRequest.cancel();
522             }
523             final SocketChannel channel = entry.getChannel();
524             try {
525                 channel.close();
526             } catch (final IOException ignore) {
527             }
528         }
529     }
530 
531     /**
532      * Closes out all active channels registered with the selector of
533      * this I/O reactor.
534      * @throws IOReactorException - not thrown currently
535      */
536     protected void closeActiveChannels() throws IOReactorException {
537         try {
538             final Set<SelectionKey> keys = this.selector.keys();
539             for (final SelectionKey key : keys) {
540                 final IOSession session = getSession(key);
541                 if (session != null) {
542                     session.close();
543                 }
544             }
545             this.selector.close();
546         } catch (final IOException ignore) {
547         }
548     }
549 
550     /**
551      * Attempts graceful shutdown of this I/O reactor.
552      */
553     public void gracefulShutdown() {
554         synchronized (this.statusMutex) {
555             if (this.status != IOReactorStatus.ACTIVE) {
556                 // Already shutting down
557                 return;
558             }
559             this.status = IOReactorStatus.SHUTTING_DOWN;
560         }
561         this.selector.wakeup();
562     }
563 
564     /**
565      * Attempts force-shutdown of this I/O reactor.
566      */
567     public void hardShutdown() throws IOReactorException {
568         synchronized (this.statusMutex) {
569             if (this.status == IOReactorStatus.SHUT_DOWN) {
570                 // Already shut down
571                 return;
572             }
573             this.status = IOReactorStatus.SHUT_DOWN;
574         }
575 
576         closeNewChannels();
577         closeActiveChannels();
578         processClosedSessions();
579     }
580 
581     /**
582      * Blocks for the given period of time in milliseconds awaiting
583      * the completion of the reactor shutdown.
584      *
585      * @param timeout the maximum wait time.
586      * @throws InterruptedException if interrupted.
587      */
588     public void awaitShutdown(final long timeout) throws InterruptedException {
589         synchronized (this.statusMutex) {
590             final long deadline = System.currentTimeMillis() + timeout;
591             long remaining = timeout;
592             while (this.status != IOReactorStatus.SHUT_DOWN) {
593                 this.statusMutex.wait(remaining);
594                 if (timeout > 0) {
595                     remaining = deadline - System.currentTimeMillis();
596                     if (remaining <= 0) {
597                         break;
598                     }
599                 }
600             }
601         }
602     }
603 
604     @Override
605     public void shutdown(final long gracePeriod) throws IOReactorException {
606         if (this.status != IOReactorStatus.INACTIVE) {
607             gracefulShutdown();
608             try {
609                 awaitShutdown(gracePeriod);
610             } catch (final InterruptedException ignore) {
611             }
612         }
613         if (this.status != IOReactorStatus.SHUT_DOWN) {
614             hardShutdown();
615         }
616     }
617 
618     @Override
619     public void shutdown() throws IOReactorException {
620         shutdown(1000);
621     }
622 
623 }