1 /*
2 * ====================================================================
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
19 * ====================================================================
20 *
21 * This software consists of voluntary contributions made by many
22 * individuals on behalf of the Apache Software Foundation. For more
23 * information on the Apache Software Foundation, please see
24 * <http://www.apache.org/>.
25 *
26 */
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.InterruptedIOException;
31 import java.nio.channels.CancelledKeyException;
32 import java.nio.channels.SelectionKey;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.Set;
36
37 import org.apache.http.annotation.ThreadSafe;
38 import org.apache.http.nio.reactor.EventMask;
39 import org.apache.http.nio.reactor.IOEventDispatch;
40 import org.apache.http.nio.reactor.IOReactor;
41 import org.apache.http.nio.reactor.IOReactorException;
42 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
43 import org.apache.http.nio.reactor.IOSession;
44 import org.apache.http.util.Args;
45
46 /**
47 * Default implementation of {@link AbstractIOReactor} that serves as a base
48 * for more advanced {@link IOReactor} implementations. This class adds
49 * support for the I/O event dispatching using {@link IOEventDispatch},
50 * management of buffering sessions, and session timeout handling.
51 *
52 * @since 4.0
53 */
54 @ThreadSafe // public methods only
55 public class BaseIOReactor extends AbstractIOReactor {
56
57 private final long timeoutCheckInterval;
58 private final Set<IOSession> bufferingSessions;
59
60 private long lastTimeoutCheck;
61
62 private IOReactorExceptionHandler exceptionHandler = null;
63 private IOEventDispatch eventDispatch = null;
64
65 /**
66 * Creates new BaseIOReactor instance.
67 *
68 * @param selectTimeout the select timeout.
69 * @throws IOReactorException in case if a non-recoverable I/O error.
70 */
71 public BaseIOReactor(final long selectTimeout) throws IOReactorException {
72 this(selectTimeout, false);
73 }
74
75 /**
76 * Creates new BaseIOReactor instance.
77 *
78 * @param selectTimeout the select timeout.
79 * @param interestOpsQueueing Ops queueing flag.
80 *
81 * @throws IOReactorException in case if a non-recoverable I/O error.
82 *
83 * @since 4.1
84 */
85 public BaseIOReactor(
86 final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
87 super(selectTimeout, interestOpsQueueing);
88 this.bufferingSessions = new HashSet<IOSession>();
89 this.timeoutCheckInterval = selectTimeout;
90 this.lastTimeoutCheck = System.currentTimeMillis();
91 }
92
93 /**
94 * Activates the I/O reactor. The I/O reactor will start reacting to I/O
95 * events and dispatch I/O event notifications to the given
96 * {@link IOEventDispatch}.
97 *
98 * @throws InterruptedIOException if the dispatch thread is interrupted.
99 * @throws IOReactorException in case if a non-recoverable I/O error.
100 */
101 public void execute(
102 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
103 Args.notNull(eventDispatch, "Event dispatcher");
104 this.eventDispatch = eventDispatch;
105 execute();
106 }
107
108 /**
109 * Sets exception handler for this I/O reactor.
110 *
111 * @param exceptionHandler the exception handler.
112 */
113 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
114 this.exceptionHandler = exceptionHandler;
115 }
116
117 /**
118 * Handles the given {@link RuntimeException}. This method delegates
119 * handling of the exception to the {@link IOReactorExceptionHandler},
120 * if available.
121 *
122 * @param ex the runtime exception.
123 */
124 protected void handleRuntimeException(final RuntimeException ex) {
125 if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
126 throw ex;
127 }
128 }
129
130 /**
131 * This I/O reactor implementation does not react to the
132 * {@link SelectionKey#OP_ACCEPT} event.
133 * <p>
134 * Super-classes can override this method to react to the event.
135 */
136 @Override
137 protected void acceptable(final SelectionKey key) {
138 }
139
140 /**
141 * This I/O reactor implementation does not react to the
142 * {@link SelectionKey#OP_CONNECT} event.
143 * <p>
144 * Super-classes can override this method to react to the event.
145 */
146 @Override
147 protected void connectable(final SelectionKey key) {
148 }
149
150 /**
151 * Processes {@link SelectionKey#OP_READ} event on the given selection key.
152 * This method dispatches the event notification to the
153 * {@link IOEventDispatch#inputReady(IOSession)} method.
154 */
155 @Override
156 protected void readable(final SelectionKey key) {
157 final IOSession session = getSession(key);
158 try {
159 this.eventDispatch.inputReady(session);
160 if (session.hasBufferedInput()) {
161 this.bufferingSessions.add(session);
162 }
163 } catch (final CancelledKeyException ex) {
164 queueClosedSession(session);
165 key.attach(null);
166 } catch (final RuntimeException ex) {
167 handleRuntimeException(ex);
168 }
169 }
170
171 /**
172 * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
173 * This method dispatches the event notification to the
174 * {@link IOEventDispatch#outputReady(IOSession)} method.
175 */
176 @Override
177 protected void writable(final SelectionKey key) {
178 final IOSession session = getSession(key);
179 try {
180 this.eventDispatch.outputReady(session);
181 } catch (final CancelledKeyException ex) {
182 queueClosedSession(session);
183 key.attach(null);
184 } catch (final RuntimeException ex) {
185 handleRuntimeException(ex);
186 }
187 }
188
189 /**
190 * Verifies whether any of the sessions associated with the given selection
191 * keys timed out by invoking the {@link #timeoutCheck(SelectionKey, long)}
192 * method.
193 * <p>
194 * This method will also invoke the
195 * {@link IOEventDispatch#inputReady(IOSession)} method on all sessions
196 * that have buffered input data.
197 */
198 @Override
199 protected void validate(final Set<SelectionKey> keys) {
200 final long currentTime = System.currentTimeMillis();
201 if( (currentTime - this.lastTimeoutCheck) >= this.timeoutCheckInterval) {
202 this.lastTimeoutCheck = currentTime;
203 if (keys != null) {
204 for (final SelectionKey key : keys) {
205 timeoutCheck(key, currentTime);
206 }
207 }
208 }
209 if (!this.bufferingSessions.isEmpty()) {
210 for (final Iterator<IOSession> it = this.bufferingSessions.iterator(); it.hasNext(); ) {
211 final IOSession session = it.next();
212 if (!session.hasBufferedInput()) {
213 it.remove();
214 continue;
215 }
216 try {
217 if ((session.getEventMask() & EventMask.READ) > 0) {
218 this.eventDispatch.inputReady(session);
219 if (!session.hasBufferedInput()) {
220 it.remove();
221 }
222 }
223 } catch (final CancelledKeyException ex) {
224 it.remove();
225 queueClosedSession(session);
226 } catch (final RuntimeException ex) {
227 handleRuntimeException(ex);
228 }
229 }
230 }
231 }
232
233 /**
234 * Processes newly created I/O session. This method dispatches the event
235 * notification to the {@link IOEventDispatch#connected(IOSession)} method.
236 */
237 @Override
238 protected void sessionCreated(final SelectionKey key, final IOSession session) {
239 try {
240 this.eventDispatch.connected(session);
241 } catch (final CancelledKeyException ex) {
242 queueClosedSession(session);
243 } catch (final RuntimeException ex) {
244 handleRuntimeException(ex);
245 }
246 }
247
248 /**
249 * Processes timed out I/O session. This method dispatches the event
250 * notification to the {@link IOEventDispatch#timeout(IOSession)} method.
251 */
252 @Override
253 protected void sessionTimedOut(final IOSession session) {
254 try {
255 this.eventDispatch.timeout(session);
256 } catch (final CancelledKeyException ex) {
257 queueClosedSession(session);
258 } catch (final RuntimeException ex) {
259 handleRuntimeException(ex);
260 }
261 }
262
263 /**
264 * Processes closed I/O session. This method dispatches the event
265 * notification to the {@link IOEventDispatch#disconnected(IOSession)}
266 * method.
267 */
268 @Override
269 protected void sessionClosed(final IOSession session) {
270 try {
271 this.eventDispatch.disconnected(session);
272 } catch (final CancelledKeyException ex) {
273 // ignore
274 } catch (final RuntimeException ex) {
275 handleRuntimeException(ex);
276 }
277 }
278
279 }