View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.InterruptedIOException;
31  import java.nio.channels.CancelledKeyException;
32  import java.nio.channels.SelectionKey;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Set;
36  
37  import org.apache.http.annotation.ThreadSafe;
38  import org.apache.http.nio.reactor.EventMask;
39  import org.apache.http.nio.reactor.IOEventDispatch;
40  import org.apache.http.nio.reactor.IOReactorException;
41  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
42  import org.apache.http.nio.reactor.IOSession;
43  import org.apache.http.util.Args;
44  
45  /**
46   * Default implementation of {@link AbstractIOReactor} that serves as a base
47   * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
48   * implementations. This class adds support for the I/O event dispatching
49   * using {@link IOEventDispatch}, management of buffering sessions, and
50   * session timeout handling.
51   *
52   * @since 4.0
53   */
54  @ThreadSafe // public methods only
55  public class BaseIOReactor extends AbstractIOReactor {
56  
57      private final long timeoutCheckInterval;
58      private final Set<IOSession> bufferingSessions;
59  
60      private long lastTimeoutCheck;
61  
62      private IOReactorExceptionHandler exceptionHandler = null;
63      private IOEventDispatch eventDispatch = null;
64  
65      /**
66       * Creates new BaseIOReactor instance.
67       *
68       * @param selectTimeout the select timeout.
69       * @throws IOReactorException in case if a non-recoverable I/O error.
70       */
71      public BaseIOReactor(final long selectTimeout) throws IOReactorException {
72          this(selectTimeout, false);
73      }
74  
75      /**
76       * Creates new BaseIOReactor instance.
77       *
78       * @param selectTimeout the select timeout.
79       * @param interestOpsQueueing Ops queueing flag.
80       *
81       * @throws IOReactorException in case if a non-recoverable I/O error.
82       *
83       * @since 4.1
84       */
85      public BaseIOReactor(
86              final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
87          super(selectTimeout, interestOpsQueueing);
88          this.bufferingSessions = new HashSet<IOSession>();
89          this.timeoutCheckInterval = selectTimeout;
90          this.lastTimeoutCheck = System.currentTimeMillis();
91      }
92  
93      /**
94       * Activates the I/O reactor. The I/O reactor will start reacting to I/O
95       * events and dispatch I/O event notifications to the given
96       * {@link IOEventDispatch}.
97       *
98       * @throws InterruptedIOException if the dispatch thread is interrupted.
99       * @throws IOReactorException in case if a non-recoverable I/O error.
100      */
101     public void execute(
102             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
103         Args.notNull(eventDispatch, "Event dispatcher");
104         this.eventDispatch = eventDispatch;
105         execute();
106     }
107 
108     /**
109      * Sets exception handler for this I/O reactor.
110      *
111      * @param exceptionHandler the exception handler.
112      */
113     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
114         this.exceptionHandler = exceptionHandler;
115     }
116 
117     /**
118      * Handles the given {@link RuntimeException}. This method delegates
119      * handling of the exception to the {@link IOReactorExceptionHandler},
120      * if available.
121      *
122      * @param ex the runtime exception.
123      */
124     protected void handleRuntimeException(final RuntimeException ex) {
125         if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
126             throw ex;
127         }
128     }
129 
130     /**
131      * This I/O reactor implementation does not react to the
132      * {@link SelectionKey#OP_ACCEPT} event.
133      * <p>
134      * Super-classes can override this method to react to the event.
135      */
136     @Override
137     protected void acceptable(final SelectionKey key) {
138     }
139 
140     /**
141      * This I/O reactor implementation does not react to the
142      * {@link SelectionKey#OP_CONNECT} event.
143      * <p>
144      * Super-classes can override this method to react to the event.
145      */
146     @Override
147     protected void connectable(final SelectionKey key) {
148     }
149 
150     /**
151      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
152      * This method dispatches the event notification to the
153      * {@link IOEventDispatch#inputReady(IOSession)} method.
154      */
155     @Override
156     protected void readable(final SelectionKey key) {
157         final IOSession session = getSession(key);
158         try {
159             this.eventDispatch.inputReady(session);
160             if (session.hasBufferedInput()) {
161                 this.bufferingSessions.add(session);
162             }
163         } catch (final CancelledKeyException ex) {
164             queueClosedSession(session);
165             key.attach(null);
166         } catch (final RuntimeException ex) {
167             handleRuntimeException(ex);
168         }
169     }
170 
171     /**
172      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
173      * This method dispatches the event notification to the
174      * {@link IOEventDispatch#outputReady(IOSession)} method.
175      */
176     @Override
177     protected void writable(final SelectionKey key) {
178         final IOSession session = getSession(key);
179         try {
180             this.eventDispatch.outputReady(session);
181         } catch (final CancelledKeyException ex) {
182             queueClosedSession(session);
183             key.attach(null);
184         } catch (final RuntimeException ex) {
185             handleRuntimeException(ex);
186         }
187     }
188 
189     /**
190      * Verifies whether any of the sessions associated with the given selection
191      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
192      * method.
193      * <p>
194      * This method will also invoke the
195      * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
196      * that have buffered input data.
197      */
198     @Override
199     protected void validate(final Set<SelectionKey> keys) {
200         final long currentTime = System.currentTimeMillis();
201         if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
202             this.lastTimeoutCheck = currentTime;
203             if (keys != null) {
204                 for (final SelectionKey key : keys) {
205                     timeoutCheck(key, currentTime);
206                 }
207             }
208         }
209         if (!this.bufferingSessions.isEmpty()) {
210             for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
211                 final IOSession session = it.next();
212                 if (!session.hasBufferedInput()) {
213                     it.remove();
214                     continue;
215                 }
216                 try {
217                     if ((session.getEventMask() & EventMask.READ) > 0) {
218                         this.eventDispatch.inputReady(session);
219                         if (!session.hasBufferedInput()) {
220                             it.remove();
221                         }
222                     }
223                 } catch (final CancelledKeyException ex) {
224                     it.remove();
225                     queueClosedSession(session);
226                 } catch (final RuntimeException ex) {
227                     handleRuntimeException(ex);
228                 }
229             }
230         }
231     }
232 
233     /**
234      * Processes newly created I/O session. This method dispatches the event
235      * notification to the {@link IOEventDispatch#connected(IOSession)} method.
236      */
237     @Override
238     protected void sessionCreated(final SelectionKey key, final IOSession session) {
239         try {
240             this.eventDispatch.connected(session);
241         } catch (final CancelledKeyException ex) {
242             queueClosedSession(session);
243         } catch (final RuntimeException ex) {
244             handleRuntimeException(ex);
245         }
246     }
247 
248     /**
249      * Processes timed out I/O session. This method dispatches the event
250      * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
251      */
252     @Override
253     protected void sessionTimedOut(final IOSession session) {
254         try {
255             this.eventDispatch.timeout(session);
256         } catch (final CancelledKeyException ex) {
257             queueClosedSession(session);
258         } catch (final RuntimeException ex) {
259             handleRuntimeException(ex);
260         }
261     }
262 
263     /**
264      * Processes closed I/O session. This method dispatches the event
265      * notification to the {@link IOEventDispatch#disconnected(IOSession)}
266      * method.
267      */
268     @Override
269     protected void sessionClosed(final IOSession session) {
270         try {
271             this.eventDispatch.disconnected(session);
272         } catch (final CancelledKeyException ex) {
273             // ignore
274         } catch (final RuntimeException ex) {
275             handleRuntimeException(ex);
276         }
277     }
278 
279 }