View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.impl.nio.reactor;
29  
30  import java.io.InterruptedIOException;
31  import java.nio.channels.CancelledKeyException;
32  import java.nio.channels.SelectionKey;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.Set;
36  
37  import org.apache.http.annotation.ThreadSafe;
38  import org.apache.http.nio.reactor.EventMask;
39  import org.apache.http.nio.reactor.IOEventDispatch;
40  import org.apache.http.nio.reactor.IOReactorException;
41  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
42  import org.apache.http.nio.reactor.IOSession;
43  import org.apache.http.util.Args;
44  
45  /**
46   * Default implementation of {@link AbstractIOReactor} that serves as a base
47   * for more advanced {@link org.apache.http.nio.reactor.IOReactor}
48   * implementations. This class adds support for the I/O event dispatching
49   * using {@link IOEventDispatch}, management of buffering sessions, and
50   * session timeout handling.
51   *
52   * @since 4.0
53   */
54  @ThreadSafe // public methods only
55  public class BaseIOReactor extends AbstractIOReactor {
56  
57      private final long timeoutCheckInterval;
58      private final Set<IOSession> bufferingSessions;
59  
60      private long lastTimeoutCheck;
61  
62      private IOReactorExceptionHandler exceptionHandler = null;
63      private IOEventDispatch eventDispatch = null;
64  
65      /**
66       * Creates new BaseIOReactor instance.
67       *
68       * @param selectTimeout the select timeout.
69       * @throws IOReactorException in case if a non-recoverable I/O error.
70       */
71      public BaseIOReactor(final long selectTimeout) throws IOReactorException {
72          this(selectTimeout, false);
73      }
74  
75      /**
76       * Creates new BaseIOReactor instance.
77       *
78       * @param selectTimeout the select timeout.
79       * @param interestOpsQueueing Ops queueing flag.
80       *
81       * @throws IOReactorException in case if a non-recoverable I/O error.
82       *
83       * @since 4.1
84       */
85      public BaseIOReactor(
86              final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
87          super(selectTimeout, interestOpsQueueing);
88          this.bufferingSessions = new HashSet<IOSession>();
89          this.timeoutCheckInterval = selectTimeout;
90          this.lastTimeoutCheck = System.currentTimeMillis();
91      }
92  
93      /**
94       * Activates the I/O reactor. The I/O reactor will start reacting to I/O
95       * events and dispatch I/O event notifications to the given
96       * {@link IOEventDispatch}.
97       *
98       * @throws InterruptedIOException if the dispatch thread is interrupted.
99       * @throws IOReactorException in case if a non-recoverable I/O error.
100      */
101     @Override
102     public void execute(
103             final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
104         Args.notNull(eventDispatch, "Event dispatcher");
105         this.eventDispatch = eventDispatch;
106         execute();
107     }
108 
109     /**
110      * Sets exception handler for this I/O reactor.
111      *
112      * @param exceptionHandler the exception handler.
113      */
114     public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
115         this.exceptionHandler = exceptionHandler;
116     }
117 
118     /**
119      * Handles the given {@link RuntimeException}. This method delegates
120      * handling of the exception to the {@link IOReactorExceptionHandler},
121      * if available.
122      *
123      * @param ex the runtime exception.
124      */
125     protected void handleRuntimeException(final RuntimeException ex) {
126         if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
127             throw ex;
128         }
129     }
130 
131     /**
132      * This I/O reactor implementation does not react to the
133      * {@link SelectionKey#OP_ACCEPT} event.
134      * <p>
135      * Super-classes can override this method to react to the event.
136      */
137     @Override
138     protected void acceptable(final SelectionKey key) {
139     }
140 
141     /**
142      * This I/O reactor implementation does not react to the
143      * {@link SelectionKey#OP_CONNECT} event.
144      * <p>
145      * Super-classes can override this method to react to the event.
146      */
147     @Override
148     protected void connectable(final SelectionKey key) {
149     }
150 
151     /**
152      * Processes {@link SelectionKey#OP_READ} event on the given selection key.
153      * This method dispatches the event notification to the
154      * {@link IOEventDispatch#inputReady(IOSession)} method.
155      */
156     @Override
157     protected void readable(final SelectionKey key) {
158         final IOSession session = getSession(key);
159         try {
160             // Try to gently feed more data to the event dispatcher
161             // if the session input buffer has not been fully exhausted
162             // (the choice of 5 iterations is purely arbitrary)
163             for (int i = 0; i < 5; i++) {
164                 this.eventDispatch.inputReady(session);
165                 if (!session.hasBufferedInput()
166                         || (session.getEventMask() & SelectionKey.OP_READ) == 0) {
167                     break;
168                 }
169             }
170             if (session.hasBufferedInput()) {
171                 this.bufferingSessions.add(session);
172             }
173         } catch (final CancelledKeyException ex) {
174             queueClosedSession(session);
175             key.attach(null);
176         } catch (final RuntimeException ex) {
177             handleRuntimeException(ex);
178         }
179     }
180 
181     /**
182      * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
183      * This method dispatches the event notification to the
184      * {@link IOEventDispatch#outputReady(IOSession)} method.
185      */
186     @Override
187     protected void writable(final SelectionKey key) {
188         final IOSession session = getSession(key);
189         try {
190             this.eventDispatch.outputReady(session);
191         } catch (final CancelledKeyException ex) {
192             queueClosedSession(session);
193             key.attach(null);
194         } catch (final RuntimeException ex) {
195             handleRuntimeException(ex);
196         }
197     }
198 
199     /**
200      * Verifies whether any of the sessions associated with the given selection
201      * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
202      * method.
203      * <p>
204      * This method will also invoke the
205      * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
206      * that have buffered input data.
207      */
208     @Override
209     protected void validate(final Set<SelectionKey> keys) {
210         final long currentTime = System.currentTimeMillis();
211         if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
212             this.lastTimeoutCheck = currentTime;
213             if (keys != null) {
214                 for (final SelectionKey key : keys) {
215                     timeoutCheck(key, currentTime);
216                 }
217             }
218         }
219         if (!this.bufferingSessions.isEmpty()) {
220             for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
221                 final IOSession session = it.next();
222                 if (!session.hasBufferedInput()) {
223                     it.remove();
224                     continue;
225                 }
226                 try {
227                     if ((session.getEventMask() & EventMask.READ) > 0) {
228                         this.eventDispatch.inputReady(session);
229                         if (!session.hasBufferedInput()) {
230                             it.remove();
231                         }
232                     }
233                 } catch (final CancelledKeyException ex) {
234                     it.remove();
235                     queueClosedSession(session);
236                 } catch (final RuntimeException ex) {
237                     handleRuntimeException(ex);
238                 }
239             }
240         }
241     }
242 
243     /**
244      * Processes newly created I/O session. This method dispatches the event
245      * notification to the {@link IOEventDispatch#connected(IOSession)} method.
246      */
247     @Override
248     protected void sessionCreated(final SelectionKey key, final IOSession session) {
249         try {
250             this.eventDispatch.connected(session);
251         } catch (final CancelledKeyException ex) {
252             queueClosedSession(session);
253         } catch (final RuntimeException ex) {
254             handleRuntimeException(ex);
255         }
256     }
257 
258     /**
259      * Processes timed out I/O session. This method dispatches the event
260      * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
261      */
262     @Override
263     protected void sessionTimedOut(final IOSession session) {
264         try {
265             this.eventDispatch.timeout(session);
266         } catch (final CancelledKeyException ex) {
267             queueClosedSession(session);
268         } catch (final RuntimeException ex) {
269             handleRuntimeException(ex);
270         }
271     }
272 
273     /**
274      * Processes closed I/O session. This method dispatches the event
275      * notification to the {@link IOEventDispatch#disconnected(IOSession)}
276      * method.
277      */
278     @Override
279     protected void sessionClosed(final IOSession session) {
280         try {
281             this.eventDispatch.disconnected(session);
282         } catch (final CancelledKeyException ex) {
283             // ignore
284         } catch (final RuntimeException ex) {
285             handleRuntimeException(ex);
286         }
287     }
288 
289 }