View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Queue;
41  import java.util.Set;
42  import java.util.concurrent.ConcurrentLinkedQueue;
43  
44  import org.apache.http.nio.reactor.IOReactor;
45  import org.apache.http.nio.reactor.IOReactorException;
46  import org.apache.http.nio.reactor.IOReactorStatus;
47  import org.apache.http.nio.reactor.IOSession;
48  import org.apache.http.util.Args;
49  import org.apache.http.util.Asserts;
50  
51  /**
52   * Generic implementation of {@link IOReactor} that can used as a subclass
53   * for more specialized I/O reactors. It is based on a single {@link Selector}
54   * instance.
55   *
56   * @since 4.0
57   */
58  public abstract class AbstractIOReactor implements IOReactor {
59  
60      private volatile IOReactorStatus status;
61  
62      private final Object statusMutex;
63      private final long selectTimeout;
64      private final boolean interestOpsQueueing;
65      private final Selector selector;
66      private final Set<IOSession> sessions;
67      private final Queue<InterestOpEntry> interestOpsQueue;
68      private final Queue<IOSession> closedSessions;
69      private final Queue<ChannelEntry> newChannels;
70  
71      /**
72       * Creates new AbstractIOReactor instance.
73       *
74       * @param selectTimeout the select timeout.
75       * @throws IOReactorException in case if a non-recoverable I/O error.
76       */
77      public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
78          this(selectTimeout, false);
79      }
80  
81      /**
82       * Creates new AbstractIOReactor instance.
83       *
84       * @param selectTimeout the select timeout.
85       * @param interestOpsQueueing Ops queueing flag.
86       *
87       * @throws IOReactorException in case if a non-recoverable I/O error.
88       *
89       * @since 4.1
90       */
91      public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
92          super();
93          Args.positive(selectTimeout, "Select timeout");
94          this.selectTimeout = selectTimeout;
95          this.interestOpsQueueing = interestOpsQueueing;
96          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
97          this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
98          this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
99          this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
100         try {
101             this.selector = Selector.open();
102         } catch (final IOException ex) {
103             throw new IOReactorException("Failure opening selector", ex);
104         }
105         this.statusMutex = new Object();
106         this.status = IOReactorStatus.INACTIVE;
107     }
108 
109     /**
110      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
111      * <p>
112      * Super-classes can implement this method to react to the event.
113      *
114      * @param key the selection key.
115      */
116     protected abstract void acceptable(SelectionKey key);
117 
118     /**
119      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
120      * <p>
121      * Super-classes can implement this method to react to the event.
122      *
123      * @param key the selection key.
124      */
125     protected abstract void connectable(SelectionKey key);
126 
127     /**
128      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
129      * <p>
130      * Super-classes can implement this method to react to the event.
131      *
132      * @param key the selection key.
133      */
134     protected abstract void readable(SelectionKey key);
135 
136     /**
137      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
138      * <p>
139      * Super-classes can implement this method to react to the event.
140      *
141      * @param key the selection key.
142      */
143     protected abstract void writable(SelectionKey key);
144 
145     /**
146      * Triggered to validate keys currently registered with the selector. This
147      * method is called after each I/O select loop.
148      * <p>
149      * Super-classes can implement this method to run validity checks on
150      * active sessions and include additional processing that needs to be
151      * executed after each I/O select loop.
152      *
153      * @param keys all selection keys registered with the selector.
154      */
155     protected abstract void validate(Set<SelectionKey> keys);
156 
157     /**
158      * Triggered when new session has been created.
159      * <p>
160      * Super-classes can implement this method to react to the event.
161      *
162      * @param key the selection key.
163      * @param session new I/O session.
164      */
165     protected void sessionCreated(final SelectionKey key, final IOSession session) {
166     }
167 
168     /**
169      * Triggered when a session has been closed.
170      * <p>
171      * Super-classes can implement this method to react to the event.
172      *
173      * @param session closed I/O session.
174      */
175     protected void sessionClosed(final IOSession session) {
176     }
177 
178     /**
179      * Triggered when a session has timed out.
180      * <p>
181      * Super-classes can implement this method to react to the event.
182      *
183      * @param session timed out I/O session.
184      */
185     protected void sessionTimedOut(final IOSession session) {
186     }
187 
188     /**
189      * Obtains {@link IOSession} instance associated with the given selection
190      * key.
191      *
192      * @param key the selection key.
193      * @return I/O session.
194      */
195     protected IOSession getSession(final SelectionKey key) {
196         return (IOSession) key.attachment();
197     }
198 
199     @Override
200     public IOReactorStatus getStatus() {
201         return this.status;
202     }
203 
204     /**
205      * Returns {@code true} if interest Ops queueing is enabled, {@code false} otherwise.
206      *
207      * @since 4.1
208      */
209     public boolean getInterestOpsQueueing() {
210         return this.interestOpsQueueing;
211     }
212 
213     /**
214      * Adds new channel entry. The channel will be asynchronously registered
215      * with the selector.
216      *
217      * @param channelEntry the channel entry.
218      */
219     public void addChannel(final ChannelEntry channelEntry) {
220         Args.notNull(channelEntry, "Channel entry");
221         this.newChannels.add(channelEntry);
222         this.selector.wakeup();
223     }
224 
225     /**
226      * Activates the I/O reactor. The I/O reactor will start reacting to
227      * I/O events and triggering notification methods.
228      * <p>
229      * This method will enter the infinite I/O select loop on
230      * the {@link Selector} instance associated with this I/O reactor.
231      * <p>
232      * The method will remain blocked unto the I/O reactor is shut down or the
233      * execution thread is interrupted.
234      *
235      * @see #acceptable(SelectionKey)
236      * @see #connectable(SelectionKey)
237      * @see #readable(SelectionKey)
238      * @see #writable(SelectionKey)
239      * @see #timeoutCheck(SelectionKey, long)
240      * @see #validate(Set)
241      * @see #sessionCreated(SelectionKey, IOSession)
242      * @see #sessionClosed(IOSession)
243      *
244      * @throws InterruptedIOException if the dispatch thread is interrupted.
245      * @throws IOReactorException in case if a non-recoverable I/O error.
246      */
247     protected void execute() throws InterruptedIOException, IOReactorException {
248         this.status = IOReactorStatus.ACTIVE;
249 
250         try {
251             for (;;) {
252 
253                 final int readyCount;
254                 try {
255                     readyCount = this.selector.select(this.selectTimeout);
256                 } catch (final InterruptedIOException ex) {
257                     throw ex;
258                 } catch (final IOException ex) {
259                     throw new IOReactorException("Unexpected selector failure", ex);
260                 }
261 
262                 if (this.status == IOReactorStatus.SHUT_DOWN) {
263                     // Hard shut down. Exit select loop immediately
264                     break;
265                 }
266 
267                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
268                     // Graceful shutdown in process
269                     // Try to close things out nicely
270                     closeSessions();
271                     closeNewChannels();
272                 }
273 
274                 // Process selected I/O events
275                 if (readyCount > 0) {
276                     processEvents(this.selector.selectedKeys());
277                 }
278 
279                 // Validate active channels
280                 validate(this.selector.keys());
281 
282                 // Process closed sessions
283                 processClosedSessions();
284 
285                 // If active process new channels
286                 if (this.status == IOReactorStatus.ACTIVE) {
287                     processNewChannels();
288                 }
289 
290                 // Exit select loop if graceful shutdown has been completed
291                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
292                         && this.sessions.isEmpty()) {
293                     break;
294                 }
295 
296                 if (this.interestOpsQueueing) {
297                     // process all pending interestOps() operations
298                     processPendingInterestOps();
299                 }
300 
301             }
302 
303         } catch (final ClosedSelectorException ignore) {
304         } finally {
305             hardShutdown();
306             synchronized (this.statusMutex) {
307                 this.statusMutex.notifyAll();
308             }
309         }
310     }
311 
312     private void processEvents(final Set<SelectionKey> selectedKeys) {
313         for (final SelectionKey key : selectedKeys) {
314 
315             processEvent(key);
316 
317         }
318         selectedKeys.clear();
319     }
320 
321     /**
322      * Processes new event on the given selection key.
323      *
324      * @param key the selection key that triggered an event.
325      */
326     protected void processEvent(final SelectionKey key) {
327         final IOSessionImpl session = (IOSessionImpl) key.attachment();
328         try {
329             if (key.isAcceptable()) {
330                 acceptable(key);
331             }
332             if (key.isConnectable()) {
333                 connectable(key);
334             }
335             if (key.isReadable()) {
336                 session.resetLastRead();
337                 readable(key);
338             }
339             if (key.isWritable()) {
340                 session.resetLastWrite();
341                 writable(key);
342             }
343         } catch (final CancelledKeyException ex) {
344             queueClosedSession(session);
345             key.attach(null);
346         }
347     }
348 
349     /**
350      * Queues the given I/O session to be processed asynchronously as closed.
351      *
352      * @param session the closed I/O session.
353      */
354     protected void queueClosedSession(final IOSession session) {
355         if (session != null) {
356             this.closedSessions.add(session);
357         }
358     }
359 
360     private void processNewChannels() throws IOReactorException {
361         ChannelEntry entry;
362         while ((entry = this.newChannels.poll()) != null) {
363 
364             final SocketChannel channel;
365             final SelectionKey key;
366             try {
367                 channel = entry.getChannel();
368                 channel.configureBlocking(false);
369                 key = channel.register(this.selector, SelectionKey.OP_READ);
370             } catch (final ClosedChannelException ex) {
371                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
372                 if (sessionRequest != null) {
373                     sessionRequest.failed(ex);
374                 }
375                 return;
376 
377             } catch (final IOException ex) {
378                 throw new IOReactorException("Failure registering channel " +
379                         "with the selector", ex);
380             }
381 
382             final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
383 
384                 @Override
385                 public void sessionClosed(final IOSession session) {
386                     queueClosedSession(session);
387                 }
388 
389             };
390 
391             InterestOpsCallback interestOpsCallback = null;
392             if (this.interestOpsQueueing) {
393                 interestOpsCallback = new InterestOpsCallback() {
394 
395                     @Override
396                     public void addInterestOps(final InterestOpEntry entry) {
397                         queueInterestOps(entry);
398                     }
399 
400                 };
401             }
402 
403             final IOSession session;
404             try {
405                 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
406                 int timeout = 0;
407                 try {
408                     timeout = channel.socket().getSoTimeout();
409                 } catch (final IOException ex) {
410                     // Very unlikely to happen and is not fatal
411                     // as the protocol layer is expected to overwrite
412                     // this value anyways
413                 }
414 
415                 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
416                 session.setSocketTimeout(timeout);
417             } catch (final CancelledKeyException ex) {
418                 continue;
419             }
420             try {
421                 this.sessions.add(session);
422                 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
423                 if (sessionRequest != null) {
424                     sessionRequest.completed(session);
425                 }
426                 key.attach(session);
427                 sessionCreated(key, session);
428             } catch (final CancelledKeyException ex) {
429                 queueClosedSession(session);
430                 key.attach(null);
431             }
432         }
433     }
434 
435     private void processClosedSessions() {
436         IOSession session;
437         while ((session = this.closedSessions.poll()) != null) {
438             if (this.sessions.remove(session)) {
439                 try {
440                     sessionClosed(session);
441                 } catch (final CancelledKeyException ex) {
442                     // ignore and move on
443                 }
444             }
445         }
446     }
447 
448     private void processPendingInterestOps() {
449         // validity check
450         if (!this.interestOpsQueueing) {
451             return;
452         }
453         InterestOpEntry entry;
454         while ((entry = this.interestOpsQueue.poll()) != null) {
455             // obtain the operation's details
456             final SelectionKey key = entry.getSelectionKey();
457             final int eventMask = entry.getEventMask();
458             if (key.isValid()) {
459                 key.interestOps(eventMask);
460             }
461         }
462     }
463 
464     private boolean queueInterestOps(final InterestOpEntry entry) {
465         // validity checks
466         Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
467         if (entry == null) {
468             return false;
469         }
470 
471         // add this operation to the interestOps() queue
472         this.interestOpsQueue.add(entry);
473 
474         return true;
475     }
476 
477     /**
478      * Triggered to verify whether the I/O session associated with the
479      * given selection key has not timed out.
480      * <p>
481      * Super-classes can implement this method to react to the event.
482      *
483      * @param key the selection key.
484      * @param now current time as long value.
485      */
486     protected void timeoutCheck(final SelectionKey key, final long now) {
487         final IOSessionImpl session = (IOSessionImpl) key.attachment();
488         if (session != null) {
489             final int timeout = session.getSocketTimeout();
490             if (timeout > 0) {
491                 if (session.getLastAccessTime() + timeout < now) {
492                     sessionTimedOut(session);
493                 }
494             }
495         }
496     }
497 
498     /**
499      * Closes out all I/O sessions maintained by this I/O reactor.
500      */
501     protected void closeSessions() {
502         synchronized (this.sessions) {
503             for (final IOSession session : this.sessions) {
504                 session.close();
505             }
506         }
507     }
508 
509     /**
510      * Closes out all new channels pending registration with the selector of
511      * this I/O reactor.
512      * @throws IOReactorException - not thrown currently
513      */
514     protected void closeNewChannels() throws IOReactorException {
515         ChannelEntry entry;
516         while ((entry = this.newChannels.poll()) != null) {
517             final SessionRequestImpl sessionRequest = entry.getSessionRequest();
518             if (sessionRequest != null) {
519                 sessionRequest.cancel();
520             }
521             final SocketChannel channel = entry.getChannel();
522             try {
523                 channel.close();
524             } catch (final IOException ignore) {
525             }
526         }
527     }
528 
529     /**
530      * Closes out all active channels registered with the selector of
531      * this I/O reactor.
532      * @throws IOReactorException - not thrown currently
533      */
534     protected void closeActiveChannels() throws IOReactorException {
535         try {
536             final Set<SelectionKey> keys = this.selector.keys();
537             for (final SelectionKey key : keys) {
538                 final IOSession session = getSession(key);
539                 if (session != null) {
540                     session.close();
541                 }
542             }
543             this.selector.close();
544         } catch (final IOException ignore) {
545         }
546     }
547 
548     /**
549      * Attempts graceful shutdown of this I/O reactor.
550      */
551     public void gracefulShutdown() {
552         synchronized (this.statusMutex) {
553             if (this.status != IOReactorStatus.ACTIVE) {
554                 // Already shutting down
555                 return;
556             }
557             this.status = IOReactorStatus.SHUTTING_DOWN;
558         }
559         this.selector.wakeup();
560     }
561 
562     /**
563      * Attempts force-shutdown of this I/O reactor.
564      */
565     public void hardShutdown() throws IOReactorException {
566         synchronized (this.statusMutex) {
567             if (this.status == IOReactorStatus.SHUT_DOWN) {
568                 // Already shut down
569                 return;
570             }
571             this.status = IOReactorStatus.SHUT_DOWN;
572         }
573 
574         closeNewChannels();
575         closeActiveChannels();
576         processClosedSessions();
577     }
578 
579     /**
580      * Blocks for the given period of time in milliseconds awaiting
581      * the completion of the reactor shutdown.
582      *
583      * @param timeout the maximum wait time.
584      * @throws InterruptedException if interrupted.
585      */
586     public void awaitShutdown(final long timeout) throws InterruptedException {
587         synchronized (this.statusMutex) {
588             final long deadline = System.currentTimeMillis() + timeout;
589             long remaining = timeout;
590             while (this.status != IOReactorStatus.SHUT_DOWN) {
591                 this.statusMutex.wait(remaining);
592                 if (timeout > 0) {
593                     remaining = deadline - System.currentTimeMillis();
594                     if (remaining <= 0) {
595                         break;
596                     }
597                 }
598             }
599         }
600     }
601 
602     @Override
603     public void shutdown(final long gracePeriod) throws IOReactorException {
604         if (this.status != IOReactorStatus.INACTIVE) {
605             gracefulShutdown();
606             try {
607                 awaitShutdown(gracePeriod);
608             } catch (final InterruptedException ignore) {
609             }
610         }
611         if (this.status != IOReactorStatus.SHUT_DOWN) {
612             hardShutdown();
613         }
614     }
615 
616     @Override
617     public void shutdown() throws IOReactorException {
618         shutdown(1000);
619     }
620 
621 }