View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.InterruptedIOException;
31  import java.nio.channels.CancelledKeyException;
32  import java.nio.channels.SelectionKey;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Set;
36  
37  import org.apache.http.nio.reactor.EventMask;
38  import org.apache.http.nio.reactor.IOEventDispatch;
39  import org.apache.http.nio.reactor.IOReactorException;
40  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
41  import org.apache.http.nio.reactor.IOSession;
42  import org.apache.http.util.Args;
43  
44  /**
45   * Default implementation of {@link AbstractIOReactor} that serves as a base
46   * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
47   * implementations. This class adds support for the I/O event dispatching
48   * using {@link IOEventDispatch}, management of buffering sessions, and
49   * session timeout handling.
50   *
51   * @since 4.0
52   */
53  public class BaseIOReactor extends AbstractIOReactor {
54  
55      private final long timeoutCheckInterval;
56      private final Set<IOSession> bufferingSessions;
57  
58      private long lastTimeoutCheck;
59  
60      private IOReactorExceptionHandler exceptionHandler = null;
61      private IOEventDispatch eventDispatch = null;
62  
63      /**
64       * Creates new BaseIOReactor instance.
65       *
66       * @param selectTimeout the select timeout.
67       * @throws IOReactorException in case if a non-recoverable I/O error.
68       */
69      public BaseIOReactor(final long selectTimeout) throws IOReactorException {
70          this(selectTimeout, false);
71      }
72  
73      /**
74       * Creates new BaseIOReactor instance.
75       *
76       * @param selectTimeout the select timeout.
77       * @param interestOpsQueueing Ops queueing flag.
78       *
79       * @throws IOReactorException in case if a non-recoverable I/O error.
80       *
81       * @since 4.1
82       */
83      public BaseIOReactor(
84              final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
85          super(selectTimeout, interestOpsQueueing);
86          this.bufferingSessions = new HashSet<IOSession>();
87          this.timeoutCheckInterval = selectTimeout;
88          this.lastTimeoutCheck = System.currentTimeMillis();
89      }
90  
91      /**
92       * Activates the I/O reactor. The I/O reactor will start reacting to I/O
93       * events and dispatch I/O event notifications to the given
94       * {@link IOEventDispatch}.
95       *
96       * @throws InterruptedIOException if the dispatch thread is interrupted.
97       * @throws IOReactorException in case if a non-recoverable I/O error.
98       */
99      @Override
100     public void execute(
101             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102         Args.notNull(eventDispatch, "Event dispatcher");
103         this.eventDispatch = eventDispatch;
104         execute();
105     }
106 
107     /**
108      * Sets exception handler for this I/O reactor.
109      *
110      * @param exceptionHandler the exception handler.
111      */
112     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
113         this.exceptionHandler = exceptionHandler;
114     }
115 
116     /**
117      * Handles the given {@link RuntimeException}. This method delegates
118      * handling of the exception to the {@link IOReactorExceptionHandler},
119      * if available.
120      *
121      * @param ex the runtime exception.
122      */
123     protected void handleRuntimeException(final RuntimeException ex) {
124         if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
125             throw ex;
126         }
127     }
128 
129     /**
130      * This I/O reactor implementation does not react to the
131      * {@link SelectionKey#OP_ACCEPT} event.
132      * <p>
133      * Super-classes can override this method to react to the event.
134      */
135     @Override
136     protected void acceptable(final SelectionKey key) {
137     }
138 
139     /**
140      * This I/O reactor implementation does not react to the
141      * {@link SelectionKey#OP_CONNECT} event.
142      * <p>
143      * Super-classes can override this method to react to the event.
144      */
145     @Override
146     protected void connectable(final SelectionKey key) {
147     }
148 
149     /**
150      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
151      * This method dispatches the event notification to the
152      * {@link IOEventDispatch#inputReady(IOSession)} method.
153      */
154     @Override
155     protected void readable(final SelectionKey key) {
156         final IOSession session = getSession(key);
157         try {
158             // Try to gently feed more data to the event dispatcher
159             // if the session input buffer has not been fully exhausted
160             // (the choice of 5 iterations is purely arbitrary)
161             for (int i = 0; i < 5; i++) {
162                 this.eventDispatch.inputReady(session);
163                 if (!session.hasBufferedInput()
164                         || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
165                     break;
166                 }
167             }
168             if (session.hasBufferedInput()) {
169                 this.bufferingSessions.add(session);
170             }
171         } catch (final CancelledKeyException ex) {
172             queueClosedSession(session);
173             key.attach(null);
174         } catch (final RuntimeException ex) {
175             handleRuntimeException(ex);
176         }
177     }
178 
179     /**
180      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
181      * This method dispatches the event notification to the
182      * {@link IOEventDispatch#outputReady(IOSession)} method.
183      */
184     @Override
185     protected void writable(final SelectionKey key) {
186         final IOSession session = getSession(key);
187         try {
188             this.eventDispatch.outputReady(session);
189         } catch (final CancelledKeyException ex) {
190             queueClosedSession(session);
191             key.attach(null);
192         } catch (final RuntimeException ex) {
193             handleRuntimeException(ex);
194         }
195     }
196 
197     /**
198      * Verifies whether any of the sessions associated with the given selection
199      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
200      * method.
201      * <p>
202      * This method will also invoke the
203      * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
204      * that have buffered input data.
205      */
206     @Override
207     protected void validate(final Set<SelectionKey> keys) {
208         final long currentTime = System.currentTimeMillis();
209         if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
210             this.lastTimeoutCheck = currentTime;
211             if (keys != null) {
212                 for (final SelectionKey key : keys) {
213                     timeoutCheck(key, currentTime);
214                 }
215             }
216         }
217         if (!this.bufferingSessions.isEmpty()) {
218             for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
219                 final IOSession session = it.next();
220                 if (!session.hasBufferedInput()) {
221                     it.remove();
222                     continue;
223                 }
224                 try {
225                     if ((session.getEventMask() & EventMask.READ) > 0) {
226                         this.eventDispatch.inputReady(session);
227                         if (!session.hasBufferedInput()) {
228                             it.remove();
229                         }
230                     }
231                 } catch (final CancelledKeyException ex) {
232                     it.remove();
233                     queueClosedSession(session);
234                 } catch (final RuntimeException ex) {
235                     handleRuntimeException(ex);
236                 }
237             }
238         }
239     }
240 
241     /**
242      * Processes newly created I/O session. This method dispatches the event
243      * notification to the {@link IOEventDispatch#connected(IOSession)} method.
244      */
245     @Override
246     protected void sessionCreated(final SelectionKey key, final IOSession session) {
247         try {
248             this.eventDispatch.connected(session);
249         } catch (final CancelledKeyException ex) {
250             queueClosedSession(session);
251         } catch (final RuntimeException ex) {
252             handleRuntimeException(ex);
253         }
254     }
255 
256     /**
257      * Processes timed out I/O session. This method dispatches the event
258      * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
259      */
260     @Override
261     protected void sessionTimedOut(final IOSession session) {
262         try {
263             this.eventDispatch.timeout(session);
264         } catch (final CancelledKeyException ex) {
265             queueClosedSession(session);
266         } catch (final RuntimeException ex) {
267             handleRuntimeException(ex);
268         }
269     }
270 
271     /**
272      * Processes closed I/O session. This method dispatches the event
273      * notification to the {@link IOEventDispatch#disconnected(IOSession)}
274      * method.
275      */
276     @Override
277     protected void sessionClosed(final IOSession session) {
278         try {
279             this.eventDispatch.disconnected(session);
280         } catch (final CancelledKeyException ex) {
281             // ignore
282         } catch (final RuntimeException ex) {
283             handleRuntimeException(ex);
284         }
285     }
286 
287 }