1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.InterruptedIOException;
31 import java.nio.channels.CancelledKeyException;
32 import java.nio.channels.SelectionKey;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.Set;
36
37 import org.apache.http.annotation.ThreadSafe;
38 import org.apache.http.nio.reactor.EventMask;
39 import org.apache.http.nio.reactor.IOEventDispatch;
40 import org.apache.http.nio.reactor.IOReactor;
41 import org.apache.http.nio.reactor.IOReactorException;
42 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
43 import org.apache.http.nio.reactor.IOSession;
44
45 /**
46 * Default implementation of {@link AbstractIOReactor} that serves as a base
47 * for more advanced {@link IOReactor} implementations. This class adds
48 * support for the I/O event dispatching using {@link IOEventDispatch},
49 * management of buffering sessions, and session timeout handling.
50 *
51 * @since 4.0
52 */
53 @ThreadSafe // public methods only
54 public class BaseIOReactor extends AbstractIOReactor {
55
56 private final long timeoutCheckInterval;
57 private final Set<IOSession> bufferingSessions;
58
59 private long lastTimeoutCheck;
60
61 private IOReactorExceptionHandler exceptionHandler = null;
62 private IOEventDispatch eventDispatch = null;
63
64 /**
65 * Creates new BaseIOReactor instance.
66 *
67 * @param selectTimeout the select timeout.
68 * @throws IOReactorException in case if a non-recoverable I/O error.
69 */
70 public BaseIOReactor(long selectTimeout) throws IOReactorException {
71 this(selectTimeout, false);
72 }
73
74 /**
75 * Creates new BaseIOReactor instance.
76 *
77 * @param selectTimeout the select timeout.
78 * @param interestOpsQueueing Ops queueing flag.
79 *
80 * @throws IOReactorException in case if a non-recoverable I/O error.
81 *
82 * @since 4.1
83 */
84 public BaseIOReactor(
85 long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
86 super(selectTimeout, interestOpsQueueing);
87 this.bufferingSessions = new HashSet<IOSession>();
88 this.timeoutCheckInterval = selectTimeout;
89 this.lastTimeoutCheck = System.currentTimeMillis();
90 }
91
92 /**
93 * Activates the I/O reactor. The I/O reactor will start reacting to I/O
94 * events and dispatch I/O event notifications to the given
95 * {@link IOEventDispatch}.
96 *
97 * @throws InterruptedIOException if the dispatch thread is interrupted.
98 * @throws IOReactorException in case if a non-recoverable I/O error.
99 */
100 public void execute(
101 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
102 if (eventDispatch == null) {
103 throw new IllegalArgumentException("Event dispatcher may not be null");
104 }
105 this.eventDispatch = eventDispatch;
106 execute();
107 }
108
109 /**
110 * Sets exception handler for this I/O reactor.
111 *
112 * @param exceptionHandler the exception handler.
113 */
114 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
115 this.exceptionHandler = exceptionHandler;
116 }
117
118 /**
119 * Handles the given {@link RuntimeException}. This method delegates
120 * handling of the exception to the {@link IOReactorExceptionHandler},
121 * if available.
122 *
123 * @param ex the runtime exception.
124 */
125 protected void handleRuntimeException(final RuntimeException ex) {
126 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
127 throw ex;
128 }
129 }
130
131 /**
132 * This I/O reactor implementation does not react to the
133 * {@link SelectionKey#OP_ACCEPT} event.
134 * <p>
135 * Super-classes can override this method to react to the event.
136 */
137 @Override
138 protected void acceptable(final SelectionKey key) {
139 }
140
141 /**
142 * This I/O reactor implementation does not react to the
143 * {@link SelectionKey#OP_CONNECT} event.
144 * <p>
145 * Super-classes can override this method to react to the event.
146 */
147 @Override
148 protected void connectable(final SelectionKey key) {
149 }
150
151 /**
152 * Processes {@link SelectionKey#OP_READ} event on the given selection key.
153 * This method dispatches the event notification to the
154 * {@link IOEventDispatch#inputReady(IOSession)} method.
155 */
156 @Override
157 protected void readable(final SelectionKey key) {
158 IOSession session = getSession(key);
159 try {
160 this.eventDispatch.inputReady(session);
161 if (session.hasBufferedInput()) {
162 this.bufferingSessions.add(session);
163 }
164 } catch (CancelledKeyException ex) {
165 queueClosedSession(session);
166 key.attach(null);
167 } catch (RuntimeException ex) {
168 handleRuntimeException(ex);
169 }
170 }
171
172 /**
173 * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
174 * This method dispatches the event notification to the
175 * {@link IOEventDispatch#outputReady(IOSession)} method.
176 */
177 @Override
178 protected void writable(final SelectionKey key) {
179 IOSession session = getSession(key);
180 try {
181 this.eventDispatch.outputReady(session);
182 } catch (CancelledKeyException ex) {
183 queueClosedSession(session);
184 key.attach(null);
185 } catch (RuntimeException ex) {
186 handleRuntimeException(ex);
187 }
188 }
189
190 /**
191 * Verifies whether any of the sessions associated with the given selection
192 * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
193 * method.
194 * <p>
195 * This method will also invoke the
196 * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
197 * that have buffered input data.
198 */
199 @Override
200 protected void validate(final Set<SelectionKey> keys) {
201 long currentTime = System.currentTimeMillis();
202 if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
203 this.lastTimeoutCheck = currentTime;
204 if (keys != null) {
205 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext();) {
206 SelectionKey key = it.next();
207 timeoutCheck(key, currentTime);
208 }
209 }
210 }
211 if (!this.bufferingSessions.isEmpty()) {
212 for (Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
213 IOSession session = it.next();
214 if (!session.hasBufferedInput()) {
215 it.remove();
216 continue;
217 }
218 try {
219 if ((session.getEventMask() & EventMask.READ) > 0) {
220 this.eventDispatch.inputReady(session);
221 if (!session.hasBufferedInput()) {
222 it.remove();
223 }
224 }
225 } catch (CancelledKeyException ex) {
226 it.remove();
227 queueClosedSession(session);
228 } catch (RuntimeException ex) {
229 handleRuntimeException(ex);
230 }
231 }
232 }
233 }
234
235 /**
236 * Processes newly created I/O session. This method dispatches the event
237 * notification to the {@link IOEventDispatch#connected(IOSession)} method.
238 */
239 @Override
240 protected void sessionCreated(final SelectionKey key, final IOSession session) {
241 try {
242 this.eventDispatch.connected(session);
243 } catch (CancelledKeyException ex) {
244 queueClosedSession(session);
245 } catch (RuntimeException ex) {
246 handleRuntimeException(ex);
247 }
248 }
249
250 /**
251 * Processes timed out I/O session. This method dispatches the event
252 * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
253 */
254 @Override
255 protected void sessionTimedOut(final IOSession session) {
256 try {
257 this.eventDispatch.timeout(session);
258 } catch (CancelledKeyException ex) {
259 queueClosedSession(session);
260 } catch (RuntimeException ex) {
261 handleRuntimeException(ex);
262 }
263 }
264
265 /**
266 * Processes closed I/O session. This method dispatches the event
267 * notification to the {@link IOEventDispatch#disconnected(IOSession)}
268 * method.
269 */
270 @Override
271 protected void sessionClosed(final IOSession session) {
272 try {
273 this.eventDispatch.disconnected(session);
274 } catch (CancelledKeyException ex) {
275 // ignore
276 } catch (RuntimeException ex) {
277 handleRuntimeException(ex);
278 }
279 }
280
281 }