View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  
44  import org.apache.http.annotation.ThreadSafe;
45  import org.apache.http.nio.reactor.IOReactor;
46  import org.apache.http.nio.reactor.IOReactorException;
47  import org.apache.http.nio.reactor.IOReactorStatus;
48  import org.apache.http.nio.reactor.IOSession;
49  import org.apache.http.util.Args;
50  import org.apache.http.util.Asserts;
51  
52  /**
53   * Generic implementation of {@link IOReactor} that can used as a subclass
54   * for more specialized I/O reactors. It is based on a single {@link Selector}
55   * instance.
56   *
57   * @since 4.0
58   */
59  @ThreadSafe // public methods only
60  public abstract class AbstractIOReactor implements IOReactor {
61  
62      private volatile IOReactorStatus status;
63  
64      private final Object statusMutex;
65      private final long selectTimeout;
66      private final boolean interestOpsQueueing;
67      private final Selector selector;
68      private final Set<IOSession> sessions;
69      private final Queue<InterestOpEntry> interestOpsQueue;
70      private final Queue<IOSession> closedSessions;
71      private final Queue<ChannelEntry> newChannels;
72  
73      /**
74       * Creates new AbstractIOReactor instance.
75       *
76       * @param selectTimeout the select timeout.
77       * @throws IOReactorException in case if a non-recoverable I/O error.
78       */
79      public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
80          this(selectTimeout, false);
81      }
82  
83      /**
84       * Creates new AbstractIOReactor instance.
85       *
86       * @param selectTimeout the select timeout.
87       * @param interestOpsQueueing Ops queueing flag.
88       *
89       * @throws IOReactorException in case if a non-recoverable I/O error.
90       *
91       * @since 4.1
92       */
93      public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
94          super();
95          Args.positive(selectTimeout, "Select timeout");
96          this.selectTimeout = selectTimeout;
97          this.interestOpsQueueing = interestOpsQueueing;
98          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
99          this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
100         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
101         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
102         try {
103             this.selector = Selector.open();
104         } catch (final IOException ex) {
105             throw new IOReactorException("Failure opening selector", ex);
106         }
107         this.statusMutex = new Object();
108         this.status = IOReactorStatus.INACTIVE;
109     }
110 
111     /**
112      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
113      * <p>
114      * Super-classes can implement this method to react to the event.
115      *
116      * @param key the selection key.
117      */
118     protected abstract void acceptable(SelectionKey key);
119 
120     /**
121      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
122      * <p>
123      * Super-classes can implement this method to react to the event.
124      *
125      * @param key the selection key.
126      */
127     protected abstract void connectable(SelectionKey key);
128 
129     /**
130      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
131      * <p>
132      * Super-classes can implement this method to react to the event.
133      *
134      * @param key the selection key.
135      */
136     protected abstract void readable(SelectionKey key);
137 
138     /**
139      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
140      * <p>
141      * Super-classes can implement this method to react to the event.
142      *
143      * @param key the selection key.
144      */
145     protected abstract void writable(SelectionKey key);
146 
147     /**
148      * Triggered to validate keys currently registered with the selector. This
149      * method is called after each I/O select loop.
150      * <p>
151      * Super-classes can implement this method to run validity checks on
152      * active sessions and include additional processing that needs to be
153      * executed after each I/O select loop.
154      *
155      * @param keys all selection keys registered with the selector.
156      */
157     protected abstract void validate(Set<SelectionKey> keys);
158 
159     /**
160      * Triggered when new session has been created.
161      * <p>
162      * Super-classes can implement this method to react to the event.
163      *
164      * @param key the selection key.
165      * @param session new I/O session.
166      */
167     protected void sessionCreated(final SelectionKey key, final IOSession session) {
168     }
169 
170     /**
171      * Triggered when a session has been closed.
172      * <p>
173      * Super-classes can implement this method to react to the event.
174      *
175      * @param session closed I/O session.
176      */
177     protected void sessionClosed(final IOSession session) {
178     }
179 
180     /**
181      * Triggered when a session has timed out.
182      * <p>
183      * Super-classes can implement this method to react to the event.
184      *
185      * @param session timed out I/O session.
186      */
187     protected void sessionTimedOut(final IOSession session) {
188     }
189 
190     /**
191      * Obtains {@link IOSession} instance associated with the given selection
192      * key.
193      *
194      * @param key the selection key.
195      * @return I/O session.
196      */
197     protected IOSession getSession(final SelectionKey key) {
198         return (IOSession) key.attachment();
199     }
200 
201     public IOReactorStatus getStatus() {
202         return this.status;
203     }
204 
205     /**
206      * Returns <code>true</code> if interest Ops queueing is enabled, <code>false</code> otherwise.
207      *
208      * @since 4.1
209      */
210     public boolean getInterestOpsQueueing() {
211         return this.interestOpsQueueing;
212     }
213 
214     /**
215      * Adds new channel entry. The channel will be asynchronously registered
216      * with the selector.
217      *
218      * @param channelEntry the channel entry.
219      */
220     public void addChannel(final ChannelEntry channelEntry) {
221         Args.notNull(channelEntry, "Channel entry");
222         this.newChannels.add(channelEntry);
223         this.selector.wakeup();
224     }
225 
226     /**
227      * Activates the I/O reactor. The I/O reactor will start reacting to
228      * I/O events and triggering notification methods.
229      * <p>
230      * This method will enter the infinite I/O select loop on
231      * the {@link Selector} instance associated with this I/O reactor.
232      * <p>
233      * The method will remain blocked unto the I/O reactor is shut down or the
234      * execution thread is interrupted.
235      *
236      * @see #acceptable(SelectionKey)
237      * @see #connectable(SelectionKey)
238      * @see #readable(SelectionKey)
239      * @see #writable(SelectionKey)
240      * @see #timeoutCheck(SelectionKey, long)
241      * @see #validate(Set)
242      * @see #sessionCreated(SelectionKey, IOSession)
243      * @see #sessionClosed(IOSession)
244      *
245      * @throws InterruptedIOException if the dispatch thread is interrupted.
246      * @throws IOReactorException in case if a non-recoverable I/O error.
247      */
248     protected void execute() throws InterruptedIOException, IOReactorException {
249         this.status = IOReactorStatus.ACTIVE;
250 
251         try {
252             for (;;) {
253 
254                 final int readyCount;
255                 try {
256                     readyCount = this.selector.select(this.selectTimeout);
257                 } catch (final InterruptedIOException ex) {
258                     throw ex;
259                 } catch (final IOException ex) {
260                     throw new IOReactorException("Unexpected selector failure", ex);
261                 }
262 
263                 if (this.status == IOReactorStatus.SHUT_DOWN) {
264                     // Hard shut down. Exit select loop immediately
265                     break;
266                 }
267 
268                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
269                     // Graceful shutdown in process
270                     // Try to close things out nicely
271                     closeSessions();
272                     closeNewChannels();
273                 }
274 
275                 // Process selected I/O events
276                 if (readyCount > 0) {
277                     processEvents(this.selector.selectedKeys());
278                 }
279 
280                 // Validate active channels
281                 validate(this.selector.keys());
282 
283                 // Process closed sessions
284                 processClosedSessions();
285 
286                 // If active process new channels
287                 if (this.status == IOReactorStatus.ACTIVE) {
288                     processNewChannels();
289                 }
290 
291                 // Exit select loop if graceful shutdown has been completed
292                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
293                         && this.sessions.isEmpty()) {
294                     break;
295                 }
296 
297                 if (this.interestOpsQueueing) {
298                     // process all pending interestOps() operations
299                     processPendingInterestOps();
300                 }
301 
302             }
303 
304         } catch (final ClosedSelectorException ignore) {
305         } finally {
306             hardShutdown();
307             synchronized (this.statusMutex) {
308                 this.statusMutex.notifyAll();
309             }
310         }
311     }
312 
313     private void processEvents(final Set<SelectionKey> selectedKeys) {
314         for (final SelectionKey key : selectedKeys) {
315 
316             processEvent(key);
317 
318         }
319         selectedKeys.clear();
320     }
321 
322     /**
323      * Processes new event on the given selection key.
324      *
325      * @param key the selection key that triggered an event.
326      */
327     protected void processEvent(final SelectionKey key) {
328         final IOSessionImpl session = (IOSessionImpl) key.attachment();
329         try {
330             if (key.isAcceptable()) {
331                 acceptable(key);
332             }
333             if (key.isConnectable()) {
334                 connectable(key);
335             }
336             if (key.isReadable()) {
337                 session.resetLastRead();
338                 readable(key);
339             }
340             if (key.isWritable()) {
341                 session.resetLastWrite();
342                 writable(key);
343             }
344         } catch (final CancelledKeyException ex) {
345             queueClosedSession(session);
346             key.attach(null);
347         }
348     }
349 
350     /**
351      * Queues the given I/O session to be processed asynchronously as closed.
352      *
353      * @param session the closed I/O session.
354      */
355     protected void queueClosedSession(final IOSession session) {
356         if (session != null) {
357             this.closedSessions.add(session);
358         }
359     }
360 
361     private void processNewChannels() throws IOReactorException {
362         ChannelEntry entry;
363         while ((entry = this.newChannels.poll()) != null) {
364 
365             final SocketChannel channel;
366             final SelectionKey key;
367             try {
368                 channel = entry.getChannel();
369                 channel.configureBlocking(false);
370                 key = channel.register(this.selector, SelectionKey.OP_READ);
371             } catch (final ClosedChannelException ex) {
372                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
373                 if (sessionRequest != null) {
374                     sessionRequest.failed(ex);
375                 }
376                 return;
377 
378             } catch (final IOException ex) {
379                 throw new IOReactorException("Failure registering channel " +
380                         "with the selector", ex);
381             }
382 
383             final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
384 
385                 public void sessionClosed(final IOSession session) {
386                     queueClosedSession(session);
387                 }
388 
389             };
390 
391             InterestOpsCallback interestOpsCallback = null;
392             if (this.interestOpsQueueing) {
393                 interestOpsCallback = new InterestOpsCallback() {
394 
395                     public void addInterestOps(final InterestOpEntry entry) {
396                         queueInterestOps(entry);
397                     }
398 
399                 };
400             }
401 
402             final IOSession session;
403             try {
404                 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
405                 int timeout = 0;
406                 try {
407                     timeout = channel.socket().getSoTimeout();
408                 } catch (final IOException ex) {
409                     // Very unlikely to happen and is not fatal
410                     // as the protocol layer is expected to overwrite
411                     // this value anyways
412                 }
413 
414                 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
415                 session.setSocketTimeout(timeout);
416             } catch (final CancelledKeyException ex) {
417                 continue;
418             }
419             try {
420                 this.sessions.add(session);
421                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
422                 if (sessionRequest != null) {
423                     sessionRequest.completed(session);
424                 }
425                 key.attach(session);
426                 sessionCreated(key, session);
427             } catch (final CancelledKeyException ex) {
428                 queueClosedSession(session);
429                 key.attach(null);
430             }
431         }
432     }
433 
434     private void processClosedSessions() {
435         IOSession session;
436         while ((session = this.closedSessions.poll()) != null) {
437             if (this.sessions.remove(session)) {
438                 try {
439                     sessionClosed(session);
440                 } catch (final CancelledKeyException ex) {
441                     // ignore and move on
442                 }
443             }
444         }
445     }
446 
447     private void processPendingInterestOps() {
448         // validity check
449         if (!this.interestOpsQueueing) {
450             return;
451         }
452         InterestOpEntry entry;
453         while ((entry = this.interestOpsQueue.poll()) != null) {
454             // obtain the operation's details
455             final SelectionKey key = entry.getSelectionKey();
456             final int eventMask = entry.getEventMask();
457             if (key.isValid()) {
458                 key.interestOps(eventMask);
459             }
460         }
461     }
462 
463     private boolean queueInterestOps(final InterestOpEntry entry) {
464         // validity checks
465         Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
466         if (entry == null) {
467             return false;
468         }
469 
470         // add this operation to the interestOps() queue
471         this.interestOpsQueue.add(entry);
472 
473         return true;
474     }
475 
476     /**
477      * Triggered to verify whether the I/O session associated with the
478      * given selection key has not timed out.
479      * <p>
480      * Super-classes can implement this method to react to the event.
481      *
482      * @param key the selection key.
483      * @param now current time as long value.
484      */
485     protected void timeoutCheck(final SelectionKey key, final long now) {
486         final IOSessionImpl session = (IOSessionImpl) key.attachment();
487         if (session != null) {
488             final int timeout = session.getSocketTimeout();
489             if (timeout > 0) {
490                 if (session.getLastAccessTime() + timeout < now) {
491                     sessionTimedOut(session);
492                 }
493             }
494         }
495     }
496 
497     /**
498      * Closes out all I/O sessions maintained by this I/O reactor.
499      */
500     protected void closeSessions() {
501         synchronized (this.sessions) {
502             for (final IOSession session : this.sessions) {
503                 session.close();
504             }
505         }
506     }
507 
508     /**
509      * Closes out all new channels pending registration with the selector of
510      * this I/O reactor.
511      * @throws IOReactorException - not thrown currently
512      */
513     protected void closeNewChannels() throws IOReactorException {
514         ChannelEntry entry;
515         while ((entry = this.newChannels.poll()) != null) {
516             final SessionRequestImpl sessionRequest = entry.getSessionRequest();
517             if (sessionRequest != null) {
518                 sessionRequest.cancel();
519             }
520             final SocketChannel channel = entry.getChannel();
521             try {
522                 channel.close();
523             } catch (final IOException ignore) {
524             }
525         }
526     }
527 
528     /**
529      * Closes out all active channels registered with the selector of
530      * this I/O reactor.
531      * @throws IOReactorException - not thrown currently
532      */
533     protected void closeActiveChannels() throws IOReactorException {
534         try {
535             final Set<SelectionKey> keys = this.selector.keys();
536             for (final SelectionKey key : keys) {
537                 final IOSession session = getSession(key);
538                 if (session != null) {
539                     session.close();
540                 }
541             }
542             this.selector.close();
543         } catch (final IOException ignore) {
544         }
545     }
546 
547     /**
548      * Attempts graceful shutdown of this I/O reactor.
549      */
550     public void gracefulShutdown() {
551         synchronized (this.statusMutex) {
552             if (this.status != IOReactorStatus.ACTIVE) {
553                 // Already shutting down
554                 return;
555             }
556             this.status = IOReactorStatus.SHUTTING_DOWN;
557         }
558         this.selector.wakeup();
559     }
560 
561     /**
562      * Attempts force-shutdown of this I/O reactor.
563      */
564     public void hardShutdown() throws IOReactorException {
565         synchronized (this.statusMutex) {
566             if (this.status == IOReactorStatus.SHUT_DOWN) {
567                 // Already shut down
568                 return;
569             }
570             this.status = IOReactorStatus.SHUT_DOWN;
571         }
572 
573         closeNewChannels();
574         closeActiveChannels();
575         processClosedSessions();
576     }
577 
578     /**
579      * Blocks for the given period of time in milliseconds awaiting
580      * the completion of the reactor shutdown.
581      *
582      * @param timeout the maximum wait time.
583      * @throws InterruptedException if interrupted.
584      */
585     public void awaitShutdown(final long timeout) throws InterruptedException {
586         synchronized (this.statusMutex) {
587             final long deadline = System.currentTimeMillis() + timeout;
588             long remaining = timeout;
589             while (this.status != IOReactorStatus.SHUT_DOWN) {
590                 this.statusMutex.wait(remaining);
591                 if (timeout > 0) {
592                     remaining = deadline - System.currentTimeMillis();
593                     if (remaining <= 0) {
594                         break;
595                     }
596                 }
597             }
598         }
599     }
600 
601     public void shutdown(final long gracePeriod) throws IOReactorException {
602         if (this.status != IOReactorStatus.INACTIVE) {
603             gracefulShutdown();
604             try {
605                 awaitShutdown(gracePeriod);
606             } catch (final InterruptedException ignore) {
607             }
608         }
609         if (this.status != IOReactorStatus.SHUT_DOWN) {
610             hardShutdown();
611         }
612     }
613 
614     public void shutdown() throws IOReactorException {
615         shutdown(1000);
616     }
617 
618 }