1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.channels.CancelledKeyException;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ClosedSelectorException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.Selector;
37 import java.nio.channels.SocketChannel;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Iterator;
41 import java.util.Queue;
42 import java.util.Set;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44
45 import org.apache.http.annotation.ThreadSafe;
46 import org.apache.http.nio.reactor.IOReactor;
47 import org.apache.http.nio.reactor.IOReactorException;
48 import org.apache.http.nio.reactor.IOReactorStatus;
49 import org.apache.http.nio.reactor.IOSession;
50
51
52
53
54
55
56
57
58 @ThreadSafe
59 public abstract class AbstractIOReactor implements IOReactor {
60
/** Current lifecycle state; volatile because the select loop reads it without holding {@code statusMutex}. */
61 private volatile IOReactorStatus status;
62
/** Monitor guarding status transitions in the shutdown methods and used for wait/notify in awaitShutdown. */
63 private final Object statusMutex;
/** Timeout passed to {@code Selector.select(long)} on every pass of the select loop; always positive. */
64 private final long selectTimeout;
/** When true, interest-op changes are queued and applied by the reactor thread in the select loop. */
65 private final boolean interestOpsQueueing;
/** Selector all session channels are registered with; opened in the constructor. */
66 private final Selector selector;
/** Sessions currently managed by this reactor (synchronized set). */
67 private final Set<IOSession> sessions;
/** Pending interest-op changes; only filled when interest ops queueing is enabled. */
68 private final Queue<InterestOpEntry> interestOpsQueue;
/** Sessions reported as closed, waiting to be removed from {@code sessions} by the reactor thread. */
69 private final Queue<IOSession> closedSessions;
/** Channels handed over via addChannel, waiting to be registered with the selector. */
70 private final Queue<ChannelEntry> newChannels;
71
72
73
74
75
76
77
/**
 * Creates a reactor with the given select timeout and interest ops queueing disabled.
 * Delegates to the two-argument constructor with {@code false}.
 *
 * @param selectTimeout the timeout used for each select operation; must be positive.
 * @throws IOReactorException if the selector cannot be opened.
 */
78 public AbstractIOReactor(long selectTimeout) throws IOReactorException {
79 this(selectTimeout, false);
80 }
81
82
83
84
85
86
87
88
89
90
91
92 public AbstractIOReactor(long selectTimeout, boolean interestOpsQueueing) throws IOReactorException {
93 super();
94 if (selectTimeout <= 0) {
95 throw new IllegalArgumentException("Select timeout may not be negative or zero");
96 }
97 this.selectTimeout = selectTimeout;
98 this.interestOpsQueueing = interestOpsQueueing;
99 this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
100 this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
101 this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
102 this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
103 try {
104 this.selector = Selector.open();
105 } catch (IOException ex) {
106 throw new IOReactorException("Failure opening selector", ex);
107 }
108 this.statusMutex = new Object();
109 this.status = IOReactorStatus.INACTIVE;
110 }
111
112
113
114
115
116
117
118
/**
 * Invoked by {@code processEvent} when the given key reports
 * {@code isAcceptable()}.
 *
 * @param key the selection key that triggered the event.
 */
119 protected abstract void acceptable(SelectionKey key);
120
121
122
123
124
125
126
127
/**
 * Invoked by {@code processEvent} when the given key reports
 * {@code isConnectable()}.
 *
 * @param key the selection key that triggered the event.
 */
128 protected abstract void connectable(SelectionKey key);
129
130
131
132
133
134
135
136
/**
 * Invoked by {@code processEvent} when the given key reports
 * {@code isReadable()}, after the attached session's last-read time
 * has been reset.
 *
 * @param key the selection key that triggered the event.
 */
137 protected abstract void readable(SelectionKey key);
138
139
140
141
142
143
144
145
/**
 * Invoked by {@code processEvent} when the given key reports
 * {@code isWritable()}, after the attached session's last-write time
 * has been reset.
 *
 * @param key the selection key that triggered the event.
 */
146 protected abstract void writable(SelectionKey key);
147
148
149
150
151
152
153
154
155
156
157
/**
 * Invoked once per pass of the select loop in {@code execute} with the
 * selector's full key set, after ready events have been processed and
 * before closed sessions are processed.
 * <p>
 * NOTE(review): presumably the place where implementations call
 * {@code timeoutCheck} for each key; confirm against subclasses.
 *
 * @param keys all selection keys currently registered with the selector.
 */
158 protected abstract void validate(Set<SelectionKey> keys);
159
160
161
162
163
164
165
166
167
/**
 * Hook invoked by {@code processNewChannels} after a new session has been
 * created for a registered channel and attached to its selection key.
 * The default implementation does nothing.
 *
 * @param key the selection key of the newly registered channel.
 * @param session the session created for that key.
 */
168 protected void sessionCreated(final SelectionKey key, final IOSession session) {
169 }
170
171
172
173
174
175
176
177
/**
 * Hook invoked by {@code processClosedSessions} for each closed session
 * that was removed from this reactor's session set.
 * The default implementation does nothing.
 *
 * @param session the session that has been closed.
 */
178 protected void sessionClosed(final IOSession session) {
179 }
180
181
182
183
184
185
186
187
/**
 * Hook invoked by {@code timeoutCheck} when a session with a positive
 * socket timeout has had no access for longer than that timeout.
 * The default implementation does nothing.
 *
 * @param session the session that timed out.
 */
188 protected void sessionTimedOut(final IOSession session) {
189 }
190
191
192
193
194
195
196
197
198 protected IOSession getSession(final SelectionKey key) {
199 return (IOSession) key.attachment();
200 }
201
202 public IOReactorStatus getStatus() {
203 return this.status;
204 }
205
206
207
208
209
210
211 public boolean getInterestOpsQueueing() {
212 return this.interestOpsQueueing;
213 }
214
215
216
217
218
219
220
221 public void addChannel(final ChannelEntry channelEntry) {
222 if (channelEntry == null) {
223 throw new IllegalArgumentException("Channel entry may not be null");
224 }
225 this.newChannels.add(channelEntry);
226 this.selector.wakeup();
227 }
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 protected void execute() throws InterruptedIOException, IOReactorException {
252 this.status = IOReactorStatus.ACTIVE;
253
254 try {
255 for (;;) {
256
257 int readyCount;
258 try {
259 readyCount = this.selector.select(this.selectTimeout);
260 } catch (InterruptedIOException ex) {
261 throw ex;
262 } catch (IOException ex) {
263 throw new IOReactorException("Unexpected selector failure", ex);
264 }
265
266 if (this.status == IOReactorStatus.SHUT_DOWN) {
267
268 break;
269 }
270
271 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
272
273
274 closeSessions();
275 closeNewChannels();
276 }
277
278
279 if (readyCount > 0) {
280 processEvents(this.selector.selectedKeys());
281 }
282
283
284 validate(this.selector.keys());
285
286
287 processClosedSessions();
288
289
290 if (this.status == IOReactorStatus.ACTIVE) {
291 processNewChannels();
292 }
293
294
295 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
296 && this.sessions.isEmpty()) {
297 break;
298 }
299
300 if (this.interestOpsQueueing) {
301
302 processPendingInterestOps();
303 }
304
305 }
306
307 } catch (ClosedSelectorException ex) {
308 } finally {
309 hardShutdown();
310 synchronized (this.statusMutex) {
311 this.statusMutex.notifyAll();
312 }
313 }
314 }
315
316 private void processEvents(final Set<SelectionKey> selectedKeys) {
317 for (Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext(); ) {
318
319 SelectionKey key = it.next();
320 processEvent(key);
321
322 }
323 selectedKeys.clear();
324 }
325
326
327
328
329
330
331 protected void processEvent(final SelectionKey key) {
332 IOSessionImpl session = (IOSessionImpl) key.attachment();
333 try {
334 if (key.isAcceptable()) {
335 acceptable(key);
336 }
337 if (key.isConnectable()) {
338 connectable(key);
339 }
340 if (key.isReadable()) {
341 session.resetLastRead();
342 readable(key);
343 }
344 if (key.isWritable()) {
345 session.resetLastWrite();
346 writable(key);
347 }
348 } catch (CancelledKeyException ex) {
349 queueClosedSession(session);
350 key.attach(null);
351 }
352 }
353
354
355
356
357
358
359 protected void queueClosedSession(final IOSession session) {
360 if (session != null) {
361 this.closedSessions.add(session);
362 }
363 }
364
365 private void processNewChannels() throws IOReactorException {
366 ChannelEntry entry;
367 while ((entry = this.newChannels.poll()) != null) {
368
369 SocketChannel channel;
370 SelectionKey key;
371 try {
372 channel = entry.getChannel();
373 channel.configureBlocking(false);
374 key = channel.register(this.selector, SelectionKey.OP_READ);
375 } catch (ClosedChannelException ex) {
376 SessionRequestImpl sessionRequest = entry.getSessionRequest();
377 if (sessionRequest != null) {
378 sessionRequest.failed(ex);
379 }
380 return;
381
382 } catch (IOException ex) {
383 throw new IOReactorException("Failure registering channel " +
384 "with the selector", ex);
385 }
386
387 SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
388
389 public void sessionClosed(IOSession session) {
390 queueClosedSession(session);
391 }
392
393 };
394
395 InterestOpsCallback interestOpsCallback = null;
396 if (this.interestOpsQueueing) {
397 interestOpsCallback = new InterestOpsCallback() {
398
399 public void addInterestOps(final InterestOpEntry entry) {
400 queueInterestOps(entry);
401 }
402
403 };
404 }
405
406 IOSession session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
407
408 int timeout = 0;
409 try {
410 timeout = channel.socket().getSoTimeout();
411 } catch (IOException ex) {
412
413
414
415 }
416
417 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
418 session.setSocketTimeout(timeout);
419 this.sessions.add(session);
420
421 try {
422 SessionRequestImpl sessionRequest = entry.getSessionRequest();
423 if (sessionRequest != null) {
424 sessionRequest.completed(session);
425 }
426 key.attach(session);
427 sessionCreated(key, session);
428 } catch (CancelledKeyException ex) {
429 queueClosedSession(session);
430 key.attach(null);
431 }
432 }
433 }
434
435 private void processClosedSessions() {
436 IOSession session;
437 while ((session = this.closedSessions.poll()) != null) {
438 if (this.sessions.remove(session)) {
439 try {
440 sessionClosed(session);
441 } catch (CancelledKeyException ex) {
442
443 }
444 }
445 }
446 }
447
448 private void processPendingInterestOps() {
449
450 if (!this.interestOpsQueueing) {
451 return;
452 }
453 InterestOpEntry entry;
454 while ((entry = this.interestOpsQueue.poll()) != null) {
455
456 SelectionKey key = entry.getSelectionKey();
457 int eventMask = entry.getEventMask();
458 if (key.isValid()) {
459 key.interestOps(eventMask);
460 }
461 }
462 }
463
464 private boolean queueInterestOps(final InterestOpEntry entry) {
465
466 if (!this.interestOpsQueueing) {
467 throw new IllegalStateException("Interest ops queueing not enabled");
468 }
469 if (entry == null) {
470 return false;
471 }
472
473
474 this.interestOpsQueue.add(entry);
475
476 return true;
477 }
478
479
480
481
482
483
484
485
486
487
488 protected void timeoutCheck(final SelectionKey key, long now) {
489 IOSessionImpl session = (IOSessionImpl) key.attachment();
490 if (session != null) {
491 int timeout = session.getSocketTimeout();
492 if (timeout > 0) {
493 if (session.getLastAccessTime() + timeout < now) {
494 sessionTimedOut(session);
495 }
496 }
497 }
498 }
499
500
501
502
503 protected void closeSessions() {
504 synchronized (this.sessions) {
505 for (Iterator<IOSession> it = this.sessions.iterator(); it.hasNext(); ) {
506 IOSession session = it.next();
507 session.close();
508 }
509 }
510 }
511
512
513
514
515
516
517 protected void closeNewChannels() throws IOReactorException {
518 ChannelEntry entry;
519 while ((entry = this.newChannels.poll()) != null) {
520 SessionRequestImpl sessionRequest = entry.getSessionRequest();
521 if (sessionRequest != null) {
522 sessionRequest.cancel();
523 }
524 SocketChannel channel = entry.getChannel();
525 try {
526 channel.close();
527 } catch (IOException ignore) {
528 }
529 }
530 }
531
532
533
534
535
536
537 protected void closeActiveChannels() throws IOReactorException {
538 try {
539 Set<SelectionKey> keys = this.selector.keys();
540 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
541 SelectionKey key = it.next();
542 IOSession session = getSession(key);
543 if (session != null) {
544 session.close();
545 }
546 }
547 this.selector.close();
548 } catch (IOException ignore) {
549 }
550 }
551
552
553
554
555 public void gracefulShutdown() {
556 synchronized (this.statusMutex) {
557 if (this.status != IOReactorStatus.ACTIVE) {
558
559 return;
560 }
561 this.status = IOReactorStatus.SHUTTING_DOWN;
562 }
563 this.selector.wakeup();
564 }
565
566
567
568
569 public void hardShutdown() throws IOReactorException {
570 synchronized (this.statusMutex) {
571 if (this.status == IOReactorStatus.SHUT_DOWN) {
572
573 return;
574 }
575 this.status = IOReactorStatus.SHUT_DOWN;
576 }
577
578 closeNewChannels();
579 closeActiveChannels();
580 processClosedSessions();
581 }
582
583
584
585
586
587
588
589
590 public void awaitShutdown(long timeout) throws InterruptedException {
591 synchronized (this.statusMutex) {
592 long deadline = System.currentTimeMillis() + timeout;
593 long remaining = timeout;
594 while (this.status != IOReactorStatus.SHUT_DOWN) {
595 this.statusMutex.wait(remaining);
596 if (timeout > 0) {
597 remaining = deadline - System.currentTimeMillis();
598 if (remaining <= 0) {
599 break;
600 }
601 }
602 }
603 }
604 }
605
606 public void shutdown(long gracePeriod) throws IOReactorException {
607 if (this.status != IOReactorStatus.INACTIVE) {
608 gracefulShutdown();
609 try {
610 awaitShutdown(gracePeriod);
611 } catch (InterruptedException ignore) {
612 }
613 }
614 if (this.status != IOReactorStatus.SHUT_DOWN) {
615 hardShutdown();
616 }
617 }
618
619 public void shutdown() throws IOReactorException {
620 shutdown(1000);
621 }
622
623 }