1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.net.Socket;
33 import java.nio.channels.Channel;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.ClosedSelectorException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.util.ArrayList;
40 import java.util.Date;
41 import java.util.Iterator;
42 import java.util.List;
43 import java.util.Set;
44 import java.util.concurrent.ThreadFactory;
45
46 import org.apache.http.annotation.ThreadSafe;
47 import org.apache.http.nio.params.NIOReactorParams;
48 import org.apache.http.nio.reactor.IOEventDispatch;
49 import org.apache.http.nio.reactor.IOReactor;
50 import org.apache.http.nio.reactor.IOReactorException;
51 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
52 import org.apache.http.nio.reactor.IOReactorStatus;
53 import org.apache.http.params.BasicHttpParams;
54 import org.apache.http.params.HttpConnectionParams;
55 import org.apache.http.params.HttpParams;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 @SuppressWarnings("deprecation")
109 @ThreadSafe
110 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
111
112 protected volatile IOReactorStatus status;
113
114
115
116
117 @Deprecated
118 protected final HttpParams params;
119 protected final IOReactorConfig config;
120 protected final Selector selector;
121 protected final long selectTimeout;
122 protected final boolean interestOpsQueueing;
123
124 private final int workerCount;
125 private final ThreadFactory threadFactory;
126 private final BaseIOReactor[] dispatchers;
127 private final Worker[] workers;
128 private final Thread[] threads;
129 private final Object statusLock;
130
131 protected IOReactorExceptionHandler exceptionHandler;
132 protected List<ExceptionEvent> auditLog;
133
134 private int currentWorker = 0;
135
136
137
138
139
140
141
142
143
144
145
146 public AbstractMultiworkerIOReactor(
147 final IOReactorConfig config,
148 final ThreadFactory threadFactory) throws IOReactorException {
149 super();
150 if (config != null) {
151 try {
152 this.config = config.clone();
153 } catch (CloneNotSupportedException ex) {
154 throw new IOReactorException("Unable to clone configuration");
155 }
156 } else {
157 this.config = new IOReactorConfig();
158 }
159 this.params = new BasicHttpParams();
160 try {
161 this.selector = Selector.open();
162 } catch (IOException ex) {
163 throw new IOReactorException("Failure opening selector", ex);
164 }
165 this.selectTimeout = this.config.getSelectInterval();
166 this.interestOpsQueueing = this.config.isInterestOpQueued();
167 this.statusLock = new Object();
168 if (threadFactory != null) {
169 this.threadFactory = threadFactory;
170 } else {
171 this.threadFactory = new DefaultThreadFactory();
172 }
173 this.workerCount = this.config.getIoThreadCount();
174 this.dispatchers = new BaseIOReactor[workerCount];
175 this.workers = new Worker[workerCount];
176 this.threads = new Thread[workerCount];
177 this.status = IOReactorStatus.INACTIVE;
178 }
179
180
181
182
183
184
185
186
187 public AbstractMultiworkerIOReactor() throws IOReactorException {
188 this(null, null);
189 }
190
191 static IOReactorConfig convert(int workerCount, final HttpParams params) {
192 if (params == null) {
193 throw new IllegalArgumentException("HTTP parameters may not be null");
194 }
195 IOReactorConfig config = new IOReactorConfig();
196 config.setSelectInterval(NIOReactorParams.getSelectInterval(params));
197 config.setShutdownGracePeriod(NIOReactorParams.getGracePeriod(params));
198 config.setInterestOpQueued(NIOReactorParams.getInterestOpsQueueing(params));
199 config.setIoThreadCount(workerCount);
200 config.setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(params));
201 config.setSoTimeout(HttpConnectionParams.getSoTimeout(params));
202 config.setSoLinger(HttpConnectionParams.getLinger(params));
203 config.setSoKeepalive(HttpConnectionParams.getSoKeepalive(params));
204 config.setConnectTimeout(HttpConnectionParams.getConnectionTimeout(params));
205 config.setSoReuseAddress(HttpConnectionParams.getSoReuseaddr(params));
206
207 return config;
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221 @Deprecated
222 public AbstractMultiworkerIOReactor(
223 int workerCount,
224 final ThreadFactory threadFactory,
225 final HttpParams params) throws IOReactorException {
226 this(convert(workerCount, params), threadFactory);
227 }
228
229 public IOReactorStatus getStatus() {
230 return this.status;
231 }
232
233
234
235
236
237
238
239 public synchronized List<ExceptionEvent> getAuditLog() {
240 if (this.auditLog != null) {
241 return new ArrayList<ExceptionEvent>(this.auditLog);
242 } else {
243 return null;
244 }
245 }
246
247
248
249
250
251
252
253
254
255 protected synchronized void addExceptionEvent(final Throwable ex, Date timestamp) {
256 if (ex == null) {
257 return;
258 }
259 if (timestamp == null) {
260 timestamp = new Date();
261 }
262 if (this.auditLog == null) {
263 this.auditLog = new ArrayList<ExceptionEvent>();
264 }
265 this.auditLog.add(new ExceptionEvent(ex, timestamp));
266 }
267
268
269
270
271
272
273 protected void addExceptionEvent(final Throwable ex) {
274 addExceptionEvent(ex, null);
275 }
276
277
278
279
280
281
282 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
283 this.exceptionHandler = exceptionHandler;
284 }
285
286
287
288
289
290
291
292
293
294 protected abstract void processEvents(int count) throws IOReactorException;
295
296
297
298
299
300
301
302
303 protected abstract void cancelRequests() throws IOReactorException;
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327 public void execute(
328 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
329 if (eventDispatch == null) {
330 throw new IllegalArgumentException("Event dispatcher may not be null");
331 }
332 synchronized (this.statusLock) {
333 if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
334 this.status = IOReactorStatus.SHUT_DOWN;
335 this.statusLock.notifyAll();
336 return;
337 }
338 if (this.status.compareTo(IOReactorStatus.INACTIVE) != 0) {
339 throw new IllegalStateException("Illegal state: " + this.status);
340 }
341 this.status = IOReactorStatus.ACTIVE;
342
343 for (int i = 0; i < this.dispatchers.length; i++) {
344 BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
345 dispatcher.setExceptionHandler(exceptionHandler);
346 this.dispatchers[i] = dispatcher;
347 }
348 for (int i = 0; i < this.workerCount; i++) {
349 BaseIOReactor dispatcher = this.dispatchers[i];
350 this.workers[i] = new Worker(dispatcher, eventDispatch);
351 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
352 }
353 }
354 try {
355
356 for (int i = 0; i < this.workerCount; i++) {
357 if (this.status != IOReactorStatus.ACTIVE) {
358 return;
359 }
360 this.threads[i].start();
361 }
362
363 for (;;) {
364 int readyCount;
365 try {
366 readyCount = this.selector.select(this.selectTimeout);
367 } catch (InterruptedIOException ex) {
368 throw ex;
369 } catch (IOException ex) {
370 throw new IOReactorException("Unexpected selector failure", ex);
371 }
372
373 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
374 processEvents(readyCount);
375 }
376
377
378 for (int i = 0; i < this.workerCount; i++) {
379 Worker worker = this.workers[i];
380 Exception ex = worker.getException();
381 if (ex != null) {
382 throw new IOReactorException(
383 "I/O dispatch worker terminated abnormally", ex);
384 }
385 }
386
387 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
388 break;
389 }
390 }
391
392 } catch (ClosedSelectorException ex) {
393 addExceptionEvent(ex);
394 } catch (IOReactorException ex) {
395 if (ex.getCause() != null) {
396 addExceptionEvent(ex.getCause());
397 }
398 throw ex;
399 } finally {
400 doShutdown();
401 synchronized (this.statusLock) {
402 this.status = IOReactorStatus.SHUT_DOWN;
403 this.statusLock.notifyAll();
404 }
405 }
406 }
407
408
409
410
411
412
413
414
415
416
417
418 protected void doShutdown() throws InterruptedIOException {
419 synchronized (this.statusLock) {
420 if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
421 return;
422 }
423 this.status = IOReactorStatus.SHUTTING_DOWN;
424 }
425 try {
426 cancelRequests();
427 } catch (IOReactorException ex) {
428 if (ex.getCause() != null) {
429 addExceptionEvent(ex.getCause());
430 }
431 }
432 this.selector.wakeup();
433
434
435 if (this.selector.isOpen()) {
436 Set<SelectionKey> keys = this.selector.keys();
437 for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
438 try {
439 SelectionKey key = it.next();
440 Channel channel = key.channel();
441 if (channel != null) {
442 channel.close();
443 }
444 } catch (IOException ex) {
445 addExceptionEvent(ex);
446 }
447 }
448
449 try {
450 this.selector.close();
451 } catch (IOException ex) {
452 addExceptionEvent(ex);
453 }
454 }
455
456
457 for (int i = 0; i < this.workerCount; i++) {
458 BaseIOReactor dispatcher = this.dispatchers[i];
459 dispatcher.gracefulShutdown();
460 }
461
462 long gracePeriod = this.config.getShutdownGracePeriod();
463
464 try {
465
466
467 for (int i = 0; i < this.workerCount; i++) {
468 BaseIOReactor dispatcher = this.dispatchers[i];
469 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
470 dispatcher.awaitShutdown(gracePeriod);
471 }
472 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
473 try {
474 dispatcher.hardShutdown();
475 } catch (IOReactorException ex) {
476 if (ex.getCause() != null) {
477 addExceptionEvent(ex.getCause());
478 }
479 }
480 }
481 }
482
483 for (int i = 0; i < this.workerCount; i++) {
484 Thread t = this.threads[i];
485 if (t != null) {
486 t.join(gracePeriod);
487 }
488 }
489 } catch (InterruptedException ex) {
490 throw new InterruptedIOException(ex.getMessage());
491 }
492 }
493
494
495
496
497
498
499 protected void addChannel(final ChannelEntry entry) {
500
501 int i = Math.abs(this.currentWorker++ % this.workerCount);
502 this.dispatchers[i].addChannel(entry);
503 }
504
505
506
507
508
509
510
511
512
513 protected SelectionKey registerChannel(
514 final SelectableChannel channel, int ops) throws ClosedChannelException {
515 return channel.register(this.selector, ops);
516 }
517
518
519
520
521
522
523
524 protected void prepareSocket(final Socket socket) throws IOException {
525 socket.setTcpNoDelay(this.config.isTcpNoDelay());
526 socket.setKeepAlive(this.config.isSoKeepalive());
527 socket.setReuseAddress(this.config.isSoReuseAddress());
528 if (this.config.getSoTimeout() > 0) {
529 socket.setSoTimeout(this.config.getSoTimeout());
530 }
531 if (this.config.getSndBufSize() > 0) {
532 socket.setSendBufferSize(this.config.getSndBufSize());
533 }
534 if (this.config.getRcvBufSize() > 0) {
535 socket.setReceiveBufferSize(this.config.getRcvBufSize());
536 }
537 int linger = this.config.getSoLinger();
538 if (linger >= 0) {
539 socket.setSoLinger(linger > 0, linger);
540 }
541 }
542
543
544
545
546
547
548
549
550
551
552 protected void awaitShutdown(long timeout) throws InterruptedException {
553 synchronized (this.statusLock) {
554 long deadline = System.currentTimeMillis() + timeout;
555 long remaining = timeout;
556 while (this.status != IOReactorStatus.SHUT_DOWN) {
557 this.statusLock.wait(remaining);
558 if (timeout > 0) {
559 remaining = deadline - System.currentTimeMillis();
560 if (remaining <= 0) {
561 break;
562 }
563 }
564 }
565 }
566 }
567
568 public void shutdown() throws IOException {
569 shutdown(2000);
570 }
571
572 public void shutdown(long waitMs) throws IOException {
573 synchronized (this.statusLock) {
574 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
575 return;
576 }
577 if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
578 this.status = IOReactorStatus.SHUT_DOWN;
579 cancelRequests();
580 return;
581 }
582 this.status = IOReactorStatus.SHUTDOWN_REQUEST;
583 }
584 this.selector.wakeup();
585 try {
586 awaitShutdown(waitMs);
587 } catch (InterruptedException ignore) {
588 }
589 }
590
591 static void closeChannel(final Channel channel) {
592 try {
593 channel.close();
594 } catch (IOException ignore) {
595 }
596 }
597
598 static class Worker implements Runnable {
599
600 final BaseIOReactor dispatcher;
601 final IOEventDispatch eventDispatch;
602
603 private volatile Exception exception;
604
605 public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
606 super();
607 this.dispatcher = dispatcher;
608 this.eventDispatch = eventDispatch;
609 }
610
611 public void run() {
612 try {
613 this.dispatcher.execute(this.eventDispatch);
614 } catch (Exception ex) {
615 this.exception = ex;
616 }
617 }
618
619 public Exception getException() {
620 return this.exception;
621 }
622
623 }
624
625 static class DefaultThreadFactory implements ThreadFactory {
626
627 private static volatile int COUNT = 0;
628
629 public Thread newThread(final Runnable r) {
630 return new Thread(r, "I/O dispatcher " + (++COUNT));
631 }
632
633 }
634
635 }