1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.channels.CancelledKeyException;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ClosedSelectorException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.Selector;
37 import java.nio.channels.SocketChannel;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43
44 import org.apache.http.annotation.ThreadSafe;
45 import org.apache.http.nio.reactor.IOReactor;
46 import org.apache.http.nio.reactor.IOReactorException;
47 import org.apache.http.nio.reactor.IOReactorStatus;
48 import org.apache.http.nio.reactor.IOSession;
49 import org.apache.http.util.Args;
50 import org.apache.http.util.Asserts;
51
52
53
54
55
56
57
58
59 @ThreadSafe
60 public abstract class AbstractIOReactor implements IOReactor {
61
62 private volatile IOReactorStatus status;
63
64 private final Object statusMutex;
65 private final long selectTimeout;
66 private final boolean interestOpsQueueing;
67 private final Selector selector;
68 private final Set<IOSession> sessions;
69 private final Queue<InterestOpEntry> interestOpsQueue;
70 private final Queue<IOSession> closedSessions;
71 private final Queue<ChannelEntry> newChannels;
72
73
74
75
76
77
78
79 public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
80 this(selectTimeout, false);
81 }
82
83
84
85
86
87
88
89
90
91
92
93 public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
94 super();
95 Args.positive(selectTimeout, "Select timeout");
96 this.selectTimeout = selectTimeout;
97 this.interestOpsQueueing = interestOpsQueueing;
98 this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
99 this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
100 this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
101 this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
102 try {
103 this.selector = Selector.open();
104 } catch (final IOException ex) {
105 throw new IOReactorException("Failure opening selector", ex);
106 }
107 this.statusMutex = new Object();
108 this.status = IOReactorStatus.INACTIVE;
109 }
110
111
112
113
114
115
116
117
118 protected abstract void acceptable(SelectionKey key);
119
120
121
122
123
124
125
126
127 protected abstract void connectable(SelectionKey key);
128
129
130
131
132
133
134
135
136 protected abstract void readable(SelectionKey key);
137
138
139
140
141
142
143
144
145 protected abstract void writable(SelectionKey key);
146
147
148
149
150
151
152
153
154
155
156
157 protected abstract void validate(Set<SelectionKey> keys);
158
159
160
161
162
163
164
165
166
167 protected void sessionCreated(final SelectionKey key, final IOSession session) {
168 }
169
170
171
172
173
174
175
176
177 protected void sessionClosed(final IOSession session) {
178 }
179
180
181
182
183
184
185
186
187 protected void sessionTimedOut(final IOSession session) {
188 }
189
190
191
192
193
194
195
196
197 protected IOSession getSession(final SelectionKey key) {
198 return (IOSession) key.attachment();
199 }
200
201 public IOReactorStatus getStatus() {
202 return this.status;
203 }
204
205
206
207
208
209
210 public boolean getInterestOpsQueueing() {
211 return this.interestOpsQueueing;
212 }
213
214
215
216
217
218
219
220 public void addChannel(final ChannelEntry channelEntry) {
221 Args.notNull(channelEntry, "Channel entry");
222 this.newChannels.add(channelEntry);
223 this.selector.wakeup();
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 protected void execute() throws InterruptedIOException, IOReactorException {
249 this.status = IOReactorStatus.ACTIVE;
250
251 try {
252 for (;;) {
253
254 int readyCount;
255 try {
256 readyCount = this.selector.select(this.selectTimeout);
257 } catch (final InterruptedIOException ex) {
258 throw ex;
259 } catch (final IOException ex) {
260 throw new IOReactorException("Unexpected selector failure", ex);
261 }
262
263 if (this.status == IOReactorStatus.SHUT_DOWN) {
264
265 break;
266 }
267
268 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
269
270
271 closeSessions();
272 closeNewChannels();
273 }
274
275
276 if (readyCount > 0) {
277 processEvents(this.selector.selectedKeys());
278 }
279
280
281 validate(this.selector.keys());
282
283
284 processClosedSessions();
285
286
287 if (this.status == IOReactorStatus.ACTIVE) {
288 processNewChannels();
289 }
290
291
292 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
293 && this.sessions.isEmpty()) {
294 break;
295 }
296
297 if (this.interestOpsQueueing) {
298
299 processPendingInterestOps();
300 }
301
302 }
303
304 } catch (final ClosedSelectorException ex) {
305 } finally {
306 hardShutdown();
307 synchronized (this.statusMutex) {
308 this.statusMutex.notifyAll();
309 }
310 }
311 }
312
313 private void processEvents(final Set<SelectionKey> selectedKeys) {
314 for (final SelectionKey key : selectedKeys) {
315
316 processEvent(key);
317
318 }
319 selectedKeys.clear();
320 }
321
322
323
324
325
326
327 protected void processEvent(final SelectionKey key) {
328 final IOSessionImpl session = (IOSessionImpl) key.attachment();
329 try {
330 if (key.isAcceptable()) {
331 acceptable(key);
332 }
333 if (key.isConnectable()) {
334 connectable(key);
335 }
336 if (key.isReadable()) {
337 session.resetLastRead();
338 readable(key);
339 }
340 if (key.isWritable()) {
341 session.resetLastWrite();
342 writable(key);
343 }
344 } catch (final CancelledKeyException ex) {
345 queueClosedSession(session);
346 key.attach(null);
347 }
348 }
349
350
351
352
353
354
355 protected void queueClosedSession(final IOSession session) {
356 if (session != null) {
357 this.closedSessions.add(session);
358 }
359 }
360
361 private void processNewChannels() throws IOReactorException {
362 ChannelEntry entry;
363 while ((entry = this.newChannels.poll()) != null) {
364
365 SocketChannel channel;
366 SelectionKey key;
367 try {
368 channel = entry.getChannel();
369 channel.configureBlocking(false);
370 key = channel.register(this.selector, SelectionKey.OP_READ);
371 } catch (final ClosedChannelException ex) {
372 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
373 if (sessionRequest != null) {
374 sessionRequest.failed(ex);
375 }
376 return;
377
378 } catch (final IOException ex) {
379 throw new IOReactorException("Failure registering channel " +
380 "with the selector", ex);
381 }
382
383 final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
384
385 public void sessionClosed(final IOSession session) {
386 queueClosedSession(session);
387 }
388
389 };
390
391 InterestOpsCallback interestOpsCallback = null;
392 if (this.interestOpsQueueing) {
393 interestOpsCallback = new InterestOpsCallback() {
394
395 public void addInterestOps(final InterestOpEntry entry) {
396 queueInterestOps(entry);
397 }
398
399 };
400 }
401
402 final IOSession session;
403 try {
404 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
405 int timeout = 0;
406 try {
407 timeout = channel.socket().getSoTimeout();
408 } catch (final IOException ex) {
409
410
411
412 }
413
414 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
415 session.setSocketTimeout(timeout);
416 } catch (final CancelledKeyException ex) {
417 continue;
418 }
419 try {
420 this.sessions.add(session);
421 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
422 if (sessionRequest != null) {
423 sessionRequest.completed(session);
424 }
425 key.attach(session);
426 sessionCreated(key, session);
427 } catch (final CancelledKeyException ex) {
428 queueClosedSession(session);
429 key.attach(null);
430 }
431 }
432 }
433
434 private void processClosedSessions() {
435 IOSession session;
436 while ((session = this.closedSessions.poll()) != null) {
437 if (this.sessions.remove(session)) {
438 try {
439 sessionClosed(session);
440 } catch (final CancelledKeyException ex) {
441
442 }
443 }
444 }
445 }
446
447 private void processPendingInterestOps() {
448
449 if (!this.interestOpsQueueing) {
450 return;
451 }
452 InterestOpEntry entry;
453 while ((entry = this.interestOpsQueue.poll()) != null) {
454
455 final SelectionKey key = entry.getSelectionKey();
456 final int eventMask = entry.getEventMask();
457 if (key.isValid()) {
458 key.interestOps(eventMask);
459 }
460 }
461 }
462
463 private boolean queueInterestOps(final InterestOpEntry entry) {
464
465 Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
466 if (entry == null) {
467 return false;
468 }
469
470
471 this.interestOpsQueue.add(entry);
472
473 return true;
474 }
475
476
477
478
479
480
481
482
483
484
485 protected void timeoutCheck(final SelectionKey key, final long now) {
486 final IOSessionImpl session = (IOSessionImpl) key.attachment();
487 if (session != null) {
488 final int timeout = session.getSocketTimeout();
489 if (timeout > 0) {
490 if (session.getLastAccessTime() + timeout < now) {
491 sessionTimedOut(session);
492 }
493 }
494 }
495 }
496
497
498
499
500 protected void closeSessions() {
501 synchronized (this.sessions) {
502 for (final IOSession session : this.sessions) {
503 session.close();
504 }
505 }
506 }
507
508
509
510
511
512
513 protected void closeNewChannels() throws IOReactorException {
514 ChannelEntry entry;
515 while ((entry = this.newChannels.poll()) != null) {
516 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
517 if (sessionRequest != null) {
518 sessionRequest.cancel();
519 }
520 final SocketChannel channel = entry.getChannel();
521 try {
522 channel.close();
523 } catch (final IOException ignore) {
524 }
525 }
526 }
527
528
529
530
531
532
533 protected void closeActiveChannels() throws IOReactorException {
534 try {
535 final Set<SelectionKey> keys = this.selector.keys();
536 for (final SelectionKey key : keys) {
537 final IOSession session = getSession(key);
538 if (session != null) {
539 session.close();
540 }
541 }
542 this.selector.close();
543 } catch (final IOException ignore) {
544 }
545 }
546
547
548
549
550 public void gracefulShutdown() {
551 synchronized (this.statusMutex) {
552 if (this.status != IOReactorStatus.ACTIVE) {
553
554 return;
555 }
556 this.status = IOReactorStatus.SHUTTING_DOWN;
557 }
558 this.selector.wakeup();
559 }
560
561
562
563
564 public void hardShutdown() throws IOReactorException {
565 synchronized (this.statusMutex) {
566 if (this.status == IOReactorStatus.SHUT_DOWN) {
567
568 return;
569 }
570 this.status = IOReactorStatus.SHUT_DOWN;
571 }
572
573 closeNewChannels();
574 closeActiveChannels();
575 processClosedSessions();
576 }
577
578
579
580
581
582
583
584
585 public void awaitShutdown(final long timeout) throws InterruptedException {
586 synchronized (this.statusMutex) {
587 final long deadline = System.currentTimeMillis() + timeout;
588 long remaining = timeout;
589 while (this.status != IOReactorStatus.SHUT_DOWN) {
590 this.statusMutex.wait(remaining);
591 if (timeout > 0) {
592 remaining = deadline - System.currentTimeMillis();
593 if (remaining <= 0) {
594 break;
595 }
596 }
597 }
598 }
599 }
600
601 public void shutdown(final long gracePeriod) throws IOReactorException {
602 if (this.status != IOReactorStatus.INACTIVE) {
603 gracefulShutdown();
604 try {
605 awaitShutdown(gracePeriod);
606 } catch (final InterruptedException ignore) {
607 }
608 }
609 if (this.status != IOReactorStatus.SHUT_DOWN) {
610 hardShutdown();
611 }
612 }
613
614 public void shutdown() throws IOReactorException {
615 shutdown(1000);
616 }
617
618 }