View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.Queue;
42  import java.util.Set;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.reactor.IOReactor;
47  import org.apache.http.nio.reactor.IOReactorException;
48  import org.apache.http.nio.reactor.IOReactorStatus;
49  import org.apache.http.nio.reactor.IOSession;
50  
51  /**
52   * Generic implementation of {@link IOReactor} that can used as a subclass
53   * for more specialized I/O reactors. It is based on a single {@link Selector}
54   * instance.
55   *
56   * @since 4.0
57   */
58  @ThreadSafe // public methods only
59  public abstract class AbstractIOReactor implements IOReactor {
60  
61      private volatile IOReactorStatus status;
62  
63      private final Object statusMutex;
64      private final long selectTimeout;
65      private final boolean interestOpsQueueing;
66      private final Selector selector;
67      private final Set<IOSession> sessions;
68      private final Queue<InterestOpEntry> interestOpsQueue;
69      private final Queue<IOSession> closedSessions;
70      private final Queue<ChannelEntry> newChannels;
71  
72      /**
73       * Creates new AbstractIOReactor instance.
74       *
75       * @param selectTimeout the select timeout.
76       * @throws IOReactorException in case if a non-recoverable I/O error.
77       */
78      public AbstractIOReactor(long selectTimeout) throws IOReactorException {
79          this(selectTimeout, false);
80      }
81  
82      /**
83       * Creates new AbstractIOReactor instance.
84       *
85       * @param selectTimeout the select timeout.
86       * @param interestOpsQueueing Ops queueing flag.
87       *
88       * @throws IOReactorException in case if a non-recoverable I/O error.
89       *
90       * @since 4.1
91       */
92      public AbstractIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
93          super();
94          if (selectTimeout <= 0) {
95              throw new IllegalArgumentException("Select timeout may not be negative or zero");
96          }
97          this.selectTimeout = selectTimeout;
98          this.interestOpsQueueing = interestOpsQueueing;
99          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
100         this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
101         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
102         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
103         try {
104             this.selector = Selector.open();
105         } catch (IOException ex) {
106             throw new IOReactorException("Failure opening selector", ex);
107         }
108         this.statusMutex = new Object();
109         this.status = IOReactorStatus.INACTIVE;
110     }
111 
112     /**
113      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
114      * <p>
115      * Super-classes can implement this method to react to the event.
116      *
117      * @param key the selection key.
118      */
119     protected abstract void acceptable(SelectionKey key);
120 
121     /**
122      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
123      * <p>
124      * Super-classes can implement this method to react to the event.
125      *
126      * @param key the selection key.
127      */
128     protected abstract void connectable(SelectionKey key);
129 
130     /**
131      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
132      * <p>
133      * Super-classes can implement this method to react to the event.
134      *
135      * @param key the selection key.
136      */
137     protected abstract void readable(SelectionKey key);
138 
139     /**
140      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
141      * <p>
142      * Super-classes can implement this method to react to the event.
143      *
144      * @param key the selection key.
145      */
146     protected abstract void writable(SelectionKey key);
147 
148     /**
149      * Triggered to validate keys currently registered with the selector. This
150      * method is called after each I/O select loop.
151      * <p>
152      * Super-classes can implement this method to run validity checks on
153      * active sessions and include additional processing that needs to be
154      * executed after each I/O select loop.
155      *
156      * @param keys all selection keys registered with the selector.
157      */
158     protected abstract void validate(Set<SelectionKey> keys);
159 
160     /**
161      * Triggered when new session has been created.
162      * <p>
163      * Super-classes can implement this method to react to the event.
164      *
165      * @param key the selection key.
166      * @param session new I/O session.
167      */
168     protected void sessionCreated(final SelectionKey key, final IOSession session) {
169     }
170 
171     /**
172      * Triggered when a session has been closed.
173      * <p>
174      * Super-classes can implement this method to react to the event.
175      *
176      * @param session closed I/O session.
177      */
178     protected void sessionClosed(final IOSession session) {
179     }
180 
181     /**
182      * Triggered when a session has timed out.
183      * <p>
184      * Super-classes can implement this method to react to the event.
185      *
186      * @param session timed out I/O session.
187      */
188     protected void sessionTimedOut(final IOSession session) {
189     }
190 
191     /**
192      * Obtains {@link IOSession} instance associated with the given selection
193      * key.
194      *
195      * @param key the selection key.
196      * @return I/O session.
197      */
198     protected IOSession getSession(final SelectionKey key) {
199         return (IOSession) key.attachment();
200     }
201 
202     public IOReactorStatus getStatus() {
203         return this.status;
204     }
205 
206     /**
207      * Returns <code>true</code> if interest Ops queueing is enabled, <code>false</code> otherwise.
208      *
209      * @since 4.1
210      */
211     public boolean getInterestOpsQueueing() {
212         return this.interestOpsQueueing;
213     }
214 
215     /**
216      * Adds new channel entry. The channel will be asynchronously registered
217      * with the selector.
218      *
219      * @param channelEntry the channel entry.
220      */
221     public void addChannel(final ChannelEntry channelEntry) {
222         if (channelEntry == null) {
223             throw new IllegalArgumentException("Channel entry may not be null");
224         }
225         this.newChannels.add(channelEntry);
226         this.selector.wakeup();
227     }
228 
229     /**
230      * Activates the I/O reactor. The I/O reactor will start reacting to
231      * I/O events and triggering notification methods.
232      * <p>
233      * This method will enter the infinite I/O select loop on
234      * the {@link Selector} instance associated with this I/O reactor.
235      * <p>
236      * The method will remain blocked unto the I/O reactor is shut down or the
237      * execution thread is interrupted.
238      *
239      * @see #acceptable(SelectionKey)
240      * @see #connectable(SelectionKey)
241      * @see #readable(SelectionKey)
242      * @see #writable(SelectionKey)
243      * @see #timeoutCheck(SelectionKey, long)
244      * @see #validate(Set)
245      * @see #sessionCreated(SelectionKey, IOSession)
246      * @see #sessionClosed(IOSession)
247      *
248      * @throws InterruptedIOException if the dispatch thread is interrupted.
249      * @throws IOReactorException in case if a non-recoverable I/O error.
250      */
251     protected void execute() throws InterruptedIOException, IOReactorException {
252         this.status = IOReactorStatus.ACTIVE;
253 
254         try {
255             for (;;) {
256 
257                 int readyCount;
258                 try {
259                     readyCount = this.selector.select(this.selectTimeout);
260                 } catch (InterruptedIOException ex) {
261                     throw ex;
262                 } catch (IOException ex) {
263                     throw new IOReactorException("Unexpected selector failure", ex);
264                 }
265 
266                 if (this.status == IOReactorStatus.SHUT_DOWN) {
267                     // Hard shut down. Exit select loop immediately
268                     break;
269                 }
270 
271                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
272                     // Graceful shutdown in process
273                     // Try to close things out nicely
274                     closeSessions();
275                     closeNewChannels();
276                 }
277 
278                 // Process selected I/O events
279                 if (readyCount > 0) {
280                     processEvents(this.selector.selectedKeys());
281                 }
282 
283                 // Validate active channels
284                 validate(this.selector.keys());
285 
286                 // Process closed sessions
287                 processClosedSessions();
288 
289                 // If active process new channels
290                 if (this.status == IOReactorStatus.ACTIVE) {
291                     processNewChannels();
292                 }
293 
294                 // Exit select loop if graceful shutdown has been completed
295                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
296                         && this.sessions.isEmpty()) {
297                     break;
298                 }
299 
300                 if (this.interestOpsQueueing) {
301                     // process all pending interestOps() operations
302                     processPendingInterestOps();
303                 }
304 
305             }
306 
307         } catch (ClosedSelectorException ex) {
308         } finally {
309             hardShutdown();
310             synchronized (this.statusMutex) {
311                 this.statusMutex.notifyAll();
312             }
313         }
314     }
315 
316     private void processEvents(final Set<SelectionKey> selectedKeys) {
317         for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
318 
319             SelectionKey key = it.next();
320             processEvent(key);
321 
322         }
323         selectedKeys.clear();
324     }
325 
326     /**
327      * Processes new event on the given selection key.
328      *
329      * @param key the selection key that triggered an event.
330      */
331     protected void processEvent(final SelectionKey key) {
332         IOSessionImpl session = (IOSessionImpl) key.attachment();
333         try {
334             if (key.isAcceptable()) {
335                 acceptable(key);
336             }
337             if (key.isConnectable()) {
338                 connectable(key);
339             }
340             if (key.isReadable()) {
341                 session.resetLastRead();
342                 readable(key);
343             }
344             if (key.isWritable()) {
345                 session.resetLastWrite();
346                 writable(key);
347             }
348         } catch (CancelledKeyException ex) {
349             queueClosedSession(session);
350             key.attach(null);
351         }
352     }
353 
354     /**
355      * Queues the given I/O session to be processed asynchronously as closed.
356      *
357      * @param session the closed I/O session.
358      */
359     protected void queueClosedSession(final IOSession session) {
360         if (session != null) {
361             this.closedSessions.add(session);
362         }
363     }
364 
365     private void processNewChannels() throws IOReactorException {
366         ChannelEntry entry;
367         while ((entry = this.newChannels.poll()) != null) {
368 
369             SocketChannel channel;
370             SelectionKey key;
371             try {
372                 channel = entry.getChannel();
373                 channel.configureBlocking(false);
374                 key = channel.register(this.selector, SelectionKey.OP_READ);
375             } catch (ClosedChannelException ex) {
376                 SessionRequestImpl sessionRequest = entry.getSessionRequest();
377                 if (sessionRequest != null) {
378                     sessionRequest.failed(ex);
379                 }
380                 return;
381 
382             } catch (IOException ex) {
383                 throw new IOReactorException("Failure registering channel " +
384                         "with the selector", ex);
385             }
386 
387             SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
388 
389                 public void sessionClosed(IOSession session) {
390                     queueClosedSession(session);
391                 }
392 
393             };
394 
395             InterestOpsCallback interestOpsCallback = null;
396             if (this.interestOpsQueueing) {
397                 interestOpsCallback = new InterestOpsCallback() {
398 
399                     public void addInterestOps(final InterestOpEntry entry) {
400                         queueInterestOps(entry);
401                     }
402 
403                 };
404             }
405 
406             IOSession session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
407 
408             int timeout = 0;
409             try {
410                 timeout = channel.socket().getSoTimeout();
411             } catch (IOException ex) {
412                 // Very unlikely to happen and is not fatal
413                 // as the protocol layer is expected to overwrite
414                 // this value anyways
415             }
416 
417             session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
418             session.setSocketTimeout(timeout);
419             this.sessions.add(session);
420 
421             try {
422                 SessionRequestImpl sessionRequest = entry.getSessionRequest();
423                 if (sessionRequest != null) {
424                     sessionRequest.completed(session);
425                 }
426                 key.attach(session);
427                 sessionCreated(key, session);
428             } catch (CancelledKeyException ex) {
429                 queueClosedSession(session);
430                 key.attach(null);
431             }
432         }
433     }
434 
435     private void processClosedSessions() {
436         IOSession session;
437         while ((session = this.closedSessions.poll()) != null) {
438             if (this.sessions.remove(session)) {
439                 try {
440                     sessionClosed(session);
441                 } catch (CancelledKeyException ex) {
442                     // ignore and move on
443                 }
444             }
445         }
446     }
447 
448     private void processPendingInterestOps() {
449         // validity check
450         if (!this.interestOpsQueueing) {
451             return;
452         }
453         InterestOpEntry entry;
454         while ((entry = this.interestOpsQueue.poll()) != null) {
455             // obtain the operation's details
456             SelectionKey key = entry.getSelectionKey();
457             int eventMask = entry.getEventMask();
458             if (key.isValid()) {
459                 key.interestOps(eventMask);
460             }
461         }
462     }
463 
464     private boolean queueInterestOps(final InterestOpEntry entry) {
465         // validity checks
466         if (!this.interestOpsQueueing) {
467             throw new IllegalStateException("Interest ops queueing not enabled");
468         }
469         if (entry == null) {
470             return false;
471         }
472 
473         // add this operation to the interestOps() queue
474         this.interestOpsQueue.add(entry);
475 
476         return true;
477     }
478 
479     /**
480      * Triggered to verify whether the I/O session associated with the
481      * given selection key has not timed out.
482      * <p>
483      * Super-classes can implement this method to react to the event.
484      *
485      * @param key the selection key.
486      * @param now current time as long value.
487      */
488     protected void timeoutCheck(final SelectionKey key, long now) {
489         IOSessionImpl session = (IOSessionImpl) key.attachment();
490         if (session != null) {
491             int timeout = session.getSocketTimeout();
492             if (timeout > 0) {
493                 if (session.getLastAccessTime() + timeout < now) {
494                     sessionTimedOut(session);
495                 }
496             }
497         }
498     }
499 
500     /**
501      * Closes out all I/O sessions maintained by this I/O reactor.
502      */
503     protected void closeSessions() {
504         synchronized (this.sessions) {
505             for (Iterator<IOSession> it = this.sessions.iterator(); it.hasNext(); ) {
506                 IOSession session = it.next();
507                 session.close();
508             }
509         }
510     }
511 
512     /**
513      * Closes out all new channels pending registration with the selector of
514      * this I/O reactor.
515      * @throws IOReactorException - not thrown currently
516      */
517     protected void closeNewChannels() throws IOReactorException {
518         ChannelEntry entry;
519         while ((entry = this.newChannels.poll()) != null) {
520             SessionRequestImpl sessionRequest = entry.getSessionRequest();
521             if (sessionRequest != null) {
522                 sessionRequest.cancel();
523             }
524             SocketChannel channel = entry.getChannel();
525             try {
526                 channel.close();
527             } catch (IOException ignore) {
528             }
529         }
530     }
531 
532     /**
533      * Closes out all active channels registered with the selector of
534      * this I/O reactor.
535      * @throws IOReactorException - not thrown currently
536      */
537     protected void closeActiveChannels() throws IOReactorException {
538         try {
539             Set<SelectionKey> keys = this.selector.keys();
540             for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
541                 SelectionKey key = it.next();
542                 IOSession session = getSession(key);
543                 if (session != null) {
544                     session.close();
545                 }
546             }
547             this.selector.close();
548         } catch (IOException ignore) {
549         }
550     }
551 
552     /**
553      * Attempts graceful shutdown of this I/O reactor.
554      */
555     public void gracefulShutdown() {
556         synchronized (this.statusMutex) {
557             if (this.status != IOReactorStatus.ACTIVE) {
558                 // Already shutting down
559                 return;
560             }
561             this.status = IOReactorStatus.SHUTTING_DOWN;
562         }
563         this.selector.wakeup();
564     }
565 
566     /**
567      * Attempts force-shutdown of this I/O reactor.
568      */
569     public void hardShutdown() throws IOReactorException {
570         synchronized (this.statusMutex) {
571             if (this.status == IOReactorStatus.SHUT_DOWN) {
572                 // Already shut down
573                 return;
574             }
575             this.status = IOReactorStatus.SHUT_DOWN;
576         }
577 
578         closeNewChannels();
579         closeActiveChannels();
580         processClosedSessions();
581     }
582 
583     /**
584      * Blocks for the given period of time in milliseconds awaiting
585      * the completion of the reactor shutdown.
586      *
587      * @param timeout the maximum wait time.
588      * @throws InterruptedException if interrupted.
589      */
590     public void awaitShutdown(long timeout) throws InterruptedException {
591         synchronized (this.statusMutex) {
592             long deadline = System.currentTimeMillis() + timeout;
593             long remaining = timeout;
594             while (this.status != IOReactorStatus.SHUT_DOWN) {
595                 this.statusMutex.wait(remaining);
596                 if (timeout > 0) {
597                     remaining = deadline - System.currentTimeMillis();
598                     if (remaining <= 0) {
599                         break;
600                     }
601                 }
602             }
603         }
604     }
605 
606     public void shutdown(long gracePeriod) throws IOReactorException {
607         if (this.status != IOReactorStatus.INACTIVE) {
608             gracefulShutdown();
609             try {
610                 awaitShutdown(gracePeriod);
611             } catch (InterruptedException ignore) {
612             }
613         }
614         if (this.status != IOReactorStatus.SHUT_DOWN) {
615             hardShutdown();
616         }
617     }
618 
619     public void shutdown() throws IOReactorException {
620         shutdown(1000);
621     }
622 
623 }