View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.InterruptedIOException;
31  import java.nio.channels.CancelledKeyException;
32  import java.nio.channels.SelectionKey;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Set;
36  
37  import org.apache.http.annotation.ThreadSafe;
38  import org.apache.http.nio.reactor.EventMask;
39  import org.apache.http.nio.reactor.IOEventDispatch;
40  import org.apache.http.nio.reactor.IOReactor;
41  import org.apache.http.nio.reactor.IOReactorException;
42  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
43  import org.apache.http.nio.reactor.IOSession;
44  
45  /**
46   * Default implementation of {@link AbstractIOReactor} that serves as a base
47   * for more advanced {@link IOReactor} implementations. This class adds
48   * support for the I/O event dispatching using {@link IOEventDispatch},
49   * management of buffering sessions, and session timeout handling.
50   *
51   * @since 4.0
52   */
53  @ThreadSafe // public methods only
54  public class BaseIOReactor extends AbstractIOReactor {
55  
56      private final long timeoutCheckInterval;
57      private final Set<IOSession> bufferingSessions;
58  
59      private long lastTimeoutCheck;
60  
61      private IOReactorExceptionHandler exceptionHandler = null;
62      private IOEventDispatch eventDispatch = null;
63  
64      /**
65       * Creates new BaseIOReactor instance.
66       *
67       * @param selectTimeout the select timeout.
68       * @throws IOReactorException in case if a non-recoverable I/O error.
69       */
70      public BaseIOReactor(long selectTimeout) throws IOReactorException {
71          this(selectTimeout, false);
72      }
73  
74      /**
75       * Creates new BaseIOReactor instance.
76       *
77       * @param selectTimeout the select timeout.
78       * @param interestOpsQueueing Ops queueing flag.
79       *
80       * @throws IOReactorException in case if a non-recoverable I/O error.
81       *
82       * @since 4.1
83       */
84      public BaseIOReactor(
85              long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
86          super(selectTimeout, interestOpsQueueing);
87          this.bufferingSessions = new HashSet<IOSession>();
88          this.timeoutCheckInterval = selectTimeout;
89          this.lastTimeoutCheck = System.currentTimeMillis();
90      }
91  
92      /**
93       * Activates the I/O reactor. The I/O reactor will start reacting to I/O
94       * events and dispatch I/O event notifications to the given
95       * {@link IOEventDispatch}.
96       *
97       * @throws InterruptedIOException if the dispatch thread is interrupted.
98       * @throws IOReactorException in case if a non-recoverable I/O error.
99       */
100     public void execute(
101             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102         if (eventDispatch == null) {
103             throw new IllegalArgumentException("Event dispatcher may not be null");
104         }
105         this.eventDispatch = eventDispatch;
106         execute();
107     }
108 
109     /**
110      * Sets exception handler for this I/O reactor.
111      *
112      * @param exceptionHandler the exception handler.
113      */
114     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
115         this.exceptionHandler = exceptionHandler;
116     }
117 
118     /**
119      * Handles the given {@link RuntimeException}. This method delegates
120      * handling of the exception to the {@link IOReactorExceptionHandler},
121      * if available.
122      *
123      * @param ex the runtime exception.
124      */
125     protected void handleRuntimeException(final RuntimeException ex) {
126         if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
127             throw ex;
128         }
129     }
130 
131     /**
132      * This I/O reactor implementation does not react to the
133      * {@link SelectionKey#OP_ACCEPT} event.
134      * <p>
135      * Super-classes can override this method to react to the event.
136      */
137     @Override
138     protected void acceptable(final SelectionKey key) {
139     }
140 
141     /**
142      * This I/O reactor implementation does not react to the
143      * {@link SelectionKey#OP_CONNECT} event.
144      * <p>
145      * Super-classes can override this method to react to the event.
146      */
147     @Override
148     protected void connectable(final SelectionKey key) {
149     }
150 
151     /**
152      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
153      * This method dispatches the event notification to the
154      * {@link IOEventDispatch#inputReady(IOSession)} method.
155      */
156     @Override
157     protected void readable(final SelectionKey key) {
158         IOSession session = getSession(key);
159         try {
160             this.eventDispatch.inputReady(session);
161             if (session.hasBufferedInput()) {
162                 this.bufferingSessions.add(session);
163             }
164         } catch (CancelledKeyException ex) {
165             queueClosedSession(session);
166             key.attach(null);
167         } catch (RuntimeException ex) {
168             handleRuntimeException(ex);
169         }
170     }
171 
172     /**
173      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
174      * This method dispatches the event notification to the
175      * {@link IOEventDispatch#outputReady(IOSession)} method.
176      */
177     @Override
178     protected void writable(final SelectionKey key) {
179         IOSession session = getSession(key);
180         try {
181             this.eventDispatch.outputReady(session);
182         } catch (CancelledKeyException ex) {
183             queueClosedSession(session);
184             key.attach(null);
185         } catch (RuntimeException ex) {
186             handleRuntimeException(ex);
187         }
188     }
189 
190     /**
191      * Verifies whether any of the sessions associated with the given selection
192      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
193      * method.
194      * <p>
195      * This method will also invoke the
196      * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
197      * that have buffered input data.
198      */
199     @Override
200     protected void validate(final Set<SelectionKey> keys) {
201         long currentTime = System.currentTimeMillis();
202         if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
203             this.lastTimeoutCheck = currentTime;
204             if (keys != null) {
205                 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
206                     SelectionKey key = it.next();
207                     timeoutCheck(key, currentTime);
208                 }
209             }
210         }
211         if (!this.bufferingSessions.isEmpty()) {
212             for (Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
213                 IOSession session = it.next();
214                 if (!session.hasBufferedInput()) {
215                     it.remove();
216                     continue;
217                 }
218                 try {
219                     if ((session.getEventMask() & EventMask.READ) > 0) {
220                         this.eventDispatch.inputReady(session);
221                         if (!session.hasBufferedInput()) {
222                             it.remove();
223                         }
224                     }
225                 } catch (CancelledKeyException ex) {
226                     it.remove();
227                     queueClosedSession(session);
228                 } catch (RuntimeException ex) {
229                     handleRuntimeException(ex);
230                 }
231             }
232         }
233     }
234 
235     /**
236      * Processes newly created I/O session. This method dispatches the event
237      * notification to the {@link IOEventDispatch#connected(IOSession)} method.
238      */
239     @Override
240     protected void sessionCreated(final SelectionKey key, final IOSession session) {
241         try {
242             this.eventDispatch.connected(session);
243         } catch (CancelledKeyException ex) {
244             queueClosedSession(session);
245         } catch (RuntimeException ex) {
246             handleRuntimeException(ex);
247         }
248     }
249 
250     /**
251      * Processes timed out I/O session. This method dispatches the event
252      * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
253      */
254     @Override
255     protected void sessionTimedOut(final IOSession session) {
256         try {
257             this.eventDispatch.timeout(session);
258         } catch (CancelledKeyException ex) {
259             queueClosedSession(session);
260         } catch (RuntimeException ex) {
261             handleRuntimeException(ex);
262         }
263     }
264 
265     /**
266      * Processes closed I/O session. This method dispatches the event
267      * notification to the {@link IOEventDispatch#disconnected(IOSession)}
268      * method.
269      */
270     @Override
271     protected void sessionClosed(final IOSession session) {
272         try {
273             this.eventDispatch.disconnected(session);
274         } catch (CancelledKeyException ex) {
275             // ignore
276         } catch (RuntimeException ex) {
277             handleRuntimeException(ex);
278         }
279     }
280 
281 }