View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.IOException;
31  import java.io.InterruptedIOException;
32  import java.nio.channels.CancelledKeyException;
33  import java.nio.channels.ClosedChannelException;
34  import java.nio.channels.ClosedSelectorException;
35  import java.nio.channels.SelectionKey;
36  import java.nio.channels.Selector;
37  import java.nio.channels.SocketChannel;
38  import java.util.Collections;
39  import java.util.HashSet;
40  import java.util.Iterator;
41  import java.util.Queue;
42  import java.util.Set;
43  import java.util.concurrent.ConcurrentLinkedQueue;
44  
45  import org.apache.http.annotation.ThreadSafe;
46  import org.apache.http.nio.reactor.IOReactor;
47  import org.apache.http.nio.reactor.IOReactorException;
48  import org.apache.http.nio.reactor.IOReactorStatus;
49  import org.apache.http.nio.reactor.IOSession;
50  
51  /**
52   * Generic implementation of {@link IOReactor} that can used as a subclass
53   * for more specialized I/O reactors. It is based on a single {@link Selector}
54   * instance.
55   *
56   * @since 4.0
57   */
58  @ThreadSafe // public methods only
59  public abstract class AbstractIOReactor implements IOReactor {
60  
61      private volatile IOReactorStatus status;
62  
63      private final Object statusMutex;
64      private final long selectTimeout;
65      private final boolean interestOpsQueueing;
66      private final Selector selector;
67      private final Set<IOSession> sessions;
68      private final Queue<InterestOpEntry> interestOpsQueue;
69      private final Queue<IOSession> closedSessions;
70      private final Queue<ChannelEntry> newChannels;
71  
72      /**
73       * Creates new AbstractIOReactor instance.
74       *
75       * @param selectTimeout the select timeout.
76       * @throws IOReactorException in case if a non-recoverable I/O error.
77       */
78      public AbstractIOReactor(long selectTimeout) throws IOReactorException {
79          this(selectTimeout, false);
80      }
81  
82      /**
83       * Creates new AbstractIOReactor instance.
84       *
85       * @param selectTimeout the select timeout.
86       * @param interestOpsQueueing Ops queueing flag.
87       *
88       * @throws IOReactorException in case if a non-recoverable I/O error.
89       *
90       * @since 4.1
91       */
92      public AbstractIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
93          super();
94          if (selectTimeout <= 0) {
95              throw new IllegalArgumentException("Select timeout may not be negative or zero");
96          }
97          this.selectTimeout = selectTimeout;
98          this.interestOpsQueueing = interestOpsQueueing;
99          this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
100         this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
101         this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
102         this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
103         try {
104             this.selector = Selector.open();
105         } catch (IOException ex) {
106             throw new IOReactorException("Failure opening selector", ex);
107         }
108         this.statusMutex = new Object();
109         this.status = IOReactorStatus.INACTIVE;
110     }
111 
112     /**
113      * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness.
114      * <p>
115      * Super-classes can implement this method to react to the event.
116      *
117      * @param key the selection key.
118      */
119     protected abstract void acceptable(SelectionKey key);
120 
121     /**
122      * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness.
123      * <p>
124      * Super-classes can implement this method to react to the event.
125      *
126      * @param key the selection key.
127      */
128     protected abstract void connectable(SelectionKey key);
129 
130     /**
131      * Triggered when the key signals {@link SelectionKey#OP_READ} readiness.
132      * <p>
133      * Super-classes can implement this method to react to the event.
134      *
135      * @param key the selection key.
136      */
137     protected abstract void readable(SelectionKey key);
138 
139     /**
140      * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness.
141      * <p>
142      * Super-classes can implement this method to react to the event.
143      *
144      * @param key the selection key.
145      */
146     protected abstract void writable(SelectionKey key);
147 
148     /**
149      * Triggered to validate keys currently registered with the selector. This
150      * method is called after each I/O select loop.
151      * <p>
152      * Super-classes can implement this method to run validity checks on
153      * active sessions and include additional processing that needs to be
154      * executed after each I/O select loop.
155      *
156      * @param keys all selection keys registered with the selector.
157      */
158     protected abstract void validate(Set<SelectionKey> keys);
159 
160     /**
161      * Triggered when new session has been created.
162      * <p>
163      * Super-classes can implement this method to react to the event.
164      *
165      * @param key the selection key.
166      * @param session new I/O session.
167      */
168     protected void sessionCreated(final SelectionKey key, final IOSession session) {
169     }
170 
171     /**
172      * Triggered when a session has been closed.
173      * <p>
174      * Super-classes can implement this method to react to the event.
175      *
176      * @param session closed I/O session.
177      */
178     protected void sessionClosed(final IOSession session) {
179     }
180 
181     /**
182      * Triggered when a session has timed out.
183      * <p>
184      * Super-classes can implement this method to react to the event.
185      *
186      * @param session timed out I/O session.
187      */
188     protected void sessionTimedOut(final IOSession session) {
189     }
190 
191     /**
192      * Obtains {@link IOSession} instance associated with the given selection
193      * key.
194      *
195      * @param key the selection key.
196      * @return I/O session.
197      */
198     protected IOSession getSession(final SelectionKey key) {
199         return (IOSession) key.attachment();
200     }
201 
202     public IOReactorStatus getStatus() {
203         return this.status;
204     }
205 
206     /**
207      * Returns <code>true</code> if interest Ops queueing is enabled, <code>false</code> otherwise.
208      *
209      * @since 4.1
210      */
211     public boolean getInterestOpsQueueing() {
212         return this.interestOpsQueueing;
213     }
214 
215     /**
216      * Adds new channel entry. The channel will be asynchronously registered
217      * with the selector.
218      *
219      * @param channelEntry the channel entry.
220      */
221     public void addChannel(final ChannelEntry channelEntry) {
222         if (channelEntry == null) {
223             throw new IllegalArgumentException("Channel entry may not be null");
224         }
225         this.newChannels.add(channelEntry);
226         this.selector.wakeup();
227     }
228 
229     /**
230      * Activates the I/O reactor. The I/O reactor will start reacting to
231      * I/O events and triggering notification methods.
232      * <p>
233      * This method will enter the infinite I/O select loop on
234      * the {@link Selector} instance associated with this I/O reactor.
235      * <p>
236      * The method will remain blocked unto the I/O reactor is shut down or the
237      * execution thread is interrupted.
238      *
239      * @see #acceptable(SelectionKey)
240      * @see #connectable(SelectionKey)
241      * @see #readable(SelectionKey)
242      * @see #writable(SelectionKey)
243      * @see #timeoutCheck(SelectionKey, long)
244      * @see #validate(Set)
245      * @see #sessionCreated(SelectionKey, IOSession)
246      * @see #sessionClosed(IOSession)
247      *
248      * @throws InterruptedIOException if the dispatch thread is interrupted.
249      * @throws IOReactorException in case if a non-recoverable I/O error.
250      */
251     protected void execute() throws InterruptedIOException, IOReactorException {
252         this.status = IOReactorStatus.ACTIVE;
253 
254         try {
255             for (;;) {
256 
257                 int readyCount;
258                 try {
259                     readyCount = this.selector.select(this.selectTimeout);
260                 } catch (InterruptedIOException ex) {
261                     throw ex;
262                 } catch (IOException ex) {
263                     throw new IOReactorException("Unexpected selector failure", ex);
264                 }
265 
266                 if (this.status == IOReactorStatus.SHUT_DOWN) {
267                     // Hard shut down. Exit select loop immediately
268                     break;
269                 }
270 
271                 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
272                     // Graceful shutdown in process
273                     // Try to close things out nicely
274                     closeSessions();
275                     closeNewChannels();
276                 }
277 
278                 // Process selected I/O events
279                 if (readyCount > 0) {
280                     processEvents(this.selector.selectedKeys());
281                 }
282 
283                 // Validate active channels
284                 validate(this.selector.keys());
285 
286                 // Process closed sessions
287                 processClosedSessions();
288 
289                 // If active process new channels
290                 if (this.status == IOReactorStatus.ACTIVE) {
291                     processNewChannels();
292                 }
293 
294                 // Exit select loop if graceful shutdown has been completed
295                 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
296                         && this.sessions.isEmpty()) {
297                     break;
298                 }
299 
300                 if (this.interestOpsQueueing) {
301                     // process all pending interestOps() operations
302                     processPendingInterestOps();
303                 }
304 
305             }
306 
307         } catch (ClosedSelectorException ex) {
308         } finally {
309             hardShutdown();
310             synchronized (this.statusMutex) {
311                 this.statusMutex.notifyAll();
312             }
313         }
314     }
315 
316     private void processEvents(final Set<SelectionKey> selectedKeys) {
317         for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
318 
319             SelectionKey key = it.next();
320             processEvent(key);
321 
322         }
323         selectedKeys.clear();
324     }
325 
326     /**
327      * Processes new event on the given selection key.
328      *
329      * @param key the selection key that triggered an event.
330      */
331     protected void processEvent(final SelectionKey key) {
332         IOSessionImpl session = (IOSessionImpl) key.attachment();
333         try {
334             if (key.isAcceptable()) {
335                 acceptable(key);
336             }
337             if (key.isConnectable()) {
338                 connectable(key);
339             }
340             if (key.isReadable()) {
341                 session.resetLastRead();
342                 readable(key);
343             }
344             if (key.isWritable()) {
345                 session.resetLastWrite();
346                 writable(key);
347             }
348         } catch (CancelledKeyException ex) {
349             queueClosedSession(session);
350             key.attach(null);
351         }
352     }
353 
354     /**
355      * Queues the given I/O session to be processed asynchronously as closed.
356      *
357      * @param session the closed I/O session.
358      */
359     protected void queueClosedSession(final IOSession session) {
360         if (session != null) {
361             this.closedSessions.add(session);
362         }
363     }
364 
365     private void processNewChannels() throws IOReactorException {
366         ChannelEntry entry;
367         while ((entry = this.newChannels.poll()) != null) {
368 
369             SocketChannel channel;
370             SelectionKey key;
371             try {
372                 channel = entry.getChannel();
373                 channel.configureBlocking(false);
374                 key = channel.register(this.selector, SelectionKey.OP_READ);
375             } catch (ClosedChannelException ex) {
376                 SessionRequestImpl sessionRequest = entry.getSessionRequest();
377                 if (sessionRequest != null) {
378                     sessionRequest.failed(ex);
379                 }
380                 return;
381 
382             } catch (IOException ex) {
383                 throw new IOReactorException("Failure registering channel " +
384                         "with the selector", ex);
385             }
386 
387             SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
388 
389                 public void sessionClosed(IOSession session) {
390                     queueClosedSession(session);
391                 }
392 
393             };
394 
395             InterestOpsCallback interestOpsCallback = null;
396             if (this.interestOpsQueueing) {
397                 interestOpsCallback = new InterestOpsCallback() {
398 
399                     public void addInterestOps(final InterestOpEntry entry) {
400                         queueInterestOps(entry);
401                     }
402 
403                 };
404             }
405 
406             IOSession session;
407             try {
408                 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
409                 int timeout = 0;
410                 try {
411                     timeout = channel.socket().getSoTimeout();
412                 } catch (final IOException ex) {
413                     // Very unlikely to happen and is not fatal
414                     // as the protocol layer is expected to overwrite
415                     // this value anyways
416                 }
417 
418                 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
419                 session.setSocketTimeout(timeout);
420             } catch (final CancelledKeyException ex) {
421                 continue;
422             }
423             try {
424                 this.sessions.add(session);
425                 SessionRequestImpl sessionRequest = entry.getSessionRequest();
426                 if (sessionRequest != null) {
427                     sessionRequest.completed(session);
428                 }
429                 key.attach(session);
430                 sessionCreated(key, session);
431             } catch (CancelledKeyException ex) {
432                 queueClosedSession(session);
433                 key.attach(null);
434             }
435         }
436     }
437 
438     private void processClosedSessions() {
439         IOSession session;
440         while ((session = this.closedSessions.poll()) != null) {
441             if (this.sessions.remove(session)) {
442                 try {
443                     sessionClosed(session);
444                 } catch (CancelledKeyException ex) {
445                     // ignore and move on
446                 }
447             }
448         }
449     }
450 
451     private void processPendingInterestOps() {
452         // validity check
453         if (!this.interestOpsQueueing) {
454             return;
455         }
456         InterestOpEntry entry;
457         while ((entry = this.interestOpsQueue.poll()) != null) {
458             // obtain the operation's details
459             SelectionKey key = entry.getSelectionKey();
460             int eventMask = entry.getEventMask();
461             if (key.isValid()) {
462                 key.interestOps(eventMask);
463             }
464         }
465     }
466 
467     private boolean queueInterestOps(final InterestOpEntry entry) {
468         // validity checks
469         if (!this.interestOpsQueueing) {
470             throw new IllegalStateException("Interest ops queueing not enabled");
471         }
472         if (entry == null) {
473             return false;
474         }
475 
476         // add this operation to the interestOps() queue
477         this.interestOpsQueue.add(entry);
478 
479         return true;
480     }
481 
482     /**
483      * Triggered to verify whether the I/O session associated with the
484      * given selection key has not timed out.
485      * <p>
486      * Super-classes can implement this method to react to the event.
487      *
488      * @param key the selection key.
489      * @param now current time as long value.
490      */
491     protected void timeoutCheck(final SelectionKey key, long now) {
492         IOSessionImpl session = (IOSessionImpl) key.attachment();
493         if (session != null) {
494             int timeout = session.getSocketTimeout();
495             if (timeout > 0) {
496                 if (session.getLastAccessTime() + timeout < now) {
497                     sessionTimedOut(session);
498                 }
499             }
500         }
501     }
502 
503     /**
504      * Closes out all I/O sessions maintained by this I/O reactor.
505      */
506     protected void closeSessions() {
507         synchronized (this.sessions) {
508             for (Iterator<IOSession> it = this.sessions.iterator(); it.hasNext(); ) {
509                 IOSession session = it.next();
510                 session.close();
511             }
512         }
513     }
514 
515     /**
516      * Closes out all new channels pending registration with the selector of
517      * this I/O reactor.
518      * @throws IOReactorException - not thrown currently
519      */
520     protected void closeNewChannels() throws IOReactorException {
521         ChannelEntry entry;
522         while ((entry = this.newChannels.poll()) != null) {
523             SessionRequestImpl sessionRequest = entry.getSessionRequest();
524             if (sessionRequest != null) {
525                 sessionRequest.cancel();
526             }
527             SocketChannel channel = entry.getChannel();
528             try {
529                 channel.close();
530             } catch (IOException ignore) {
531             }
532         }
533     }
534 
535     /**
536      * Closes out all active channels registered with the selector of
537      * this I/O reactor.
538      * @throws IOReactorException - not thrown currently
539      */
540     protected void closeActiveChannels() throws IOReactorException {
541         try {
542             Set<SelectionKey> keys = this.selector.keys();
543             for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
544                 SelectionKey key = it.next();
545                 IOSession session = getSession(key);
546                 if (session != null) {
547                     session.close();
548                 }
549             }
550             this.selector.close();
551         } catch (IOException ignore) {
552         }
553     }
554 
555     /**
556      * Attempts graceful shutdown of this I/O reactor.
557      */
558     public void gracefulShutdown() {
559         synchronized (this.statusMutex) {
560             if (this.status != IOReactorStatus.ACTIVE) {
561                 // Already shutting down
562                 return;
563             }
564             this.status = IOReactorStatus.SHUTTING_DOWN;
565         }
566         this.selector.wakeup();
567     }
568 
569     /**
570      * Attempts force-shutdown of this I/O reactor.
571      */
572     public void hardShutdown() throws IOReactorException {
573         synchronized (this.statusMutex) {
574             if (this.status == IOReactorStatus.SHUT_DOWN) {
575                 // Already shut down
576                 return;
577             }
578             this.status = IOReactorStatus.SHUT_DOWN;
579         }
580 
581         closeNewChannels();
582         closeActiveChannels();
583         processClosedSessions();
584     }
585 
586     /**
587      * Blocks for the given period of time in milliseconds awaiting
588      * the completion of the reactor shutdown.
589      *
590      * @param timeout the maximum wait time.
591      * @throws InterruptedException if interrupted.
592      */
593     public void awaitShutdown(long timeout) throws InterruptedException {
594         synchronized (this.statusMutex) {
595             long deadline = System.currentTimeMillis() + timeout;
596             long remaining = timeout;
597             while (this.status != IOReactorStatus.SHUT_DOWN) {
598                 this.statusMutex.wait(remaining);
599                 if (timeout > 0) {
600                     remaining = deadline - System.currentTimeMillis();
601                     if (remaining <= 0) {
602                         break;
603                     }
604                 }
605             }
606         }
607     }
608 
609     public void shutdown(long gracePeriod) throws IOReactorException {
610         if (this.status != IOReactorStatus.INACTIVE) {
611             gracefulShutdown();
612             try {
613                 awaitShutdown(gracePeriod);
614             } catch (InterruptedException ignore) {
615             }
616         }
617         if (this.status != IOReactorStatus.SHUT_DOWN) {
618             hardShutdown();
619         }
620     }
621 
622     public void shutdown() throws IOReactorException {
623         shutdown(1000);
624     }
625 
626 }