1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.net.Socket;
33 import java.nio.channels.Channel;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.ClosedSelectorException;
36 import java.nio.channels.SelectableChannel;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.util.ArrayList;
40 import java.util.Date;
41 import java.util.List;
42 import java.util.concurrent.ThreadFactory;
43
44 import org.apache.http.annotation.ThreadSafe;
45 import org.apache.http.nio.params.NIOReactorParams;
46 import org.apache.http.nio.reactor.IOEventDispatch;
47 import org.apache.http.nio.reactor.IOReactor;
48 import org.apache.http.nio.reactor.IOReactorException;
49 import org.apache.http.nio.reactor.IOReactorExceptionHandler;
50 import org.apache.http.nio.reactor.IOReactorStatus;
51 import org.apache.http.params.BasicHttpParams;
52 import org.apache.http.params.HttpConnectionParams;
53 import org.apache.http.params.HttpParams;
54 import org.apache.http.util.Args;
55 import org.apache.http.util.Asserts;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @SuppressWarnings("deprecation")
98 @ThreadSafe
99 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
100
101 protected volatile IOReactorStatus status;
102
103
104
105
106 @Deprecated
107 protected final HttpParams params;
108 protected final IOReactorConfig config;
109 protected final Selector selector;
110 protected final long selectTimeout;
111 protected final boolean interestOpsQueueing;
112
113 private final int workerCount;
114 private final ThreadFactory threadFactory;
115 private final BaseIOReactor[] dispatchers;
116 private final Worker[] workers;
117 private final Thread[] threads;
118 private final Object statusLock;
119
120 protected IOReactorExceptionHandler exceptionHandler;
121 protected List<ExceptionEvent> auditLog;
122
123 private int currentWorker = 0;
124
125
126
127
128
129
130
131
132
133
134
135 public AbstractMultiworkerIOReactor(
136 final IOReactorConfig config,
137 final ThreadFactory threadFactory) throws IOReactorException {
138 super();
139 this.config = config != null ? config : IOReactorConfig.DEFAULT;
140 this.params = new BasicHttpParams();
141 try {
142 this.selector = Selector.open();
143 } catch (final IOException ex) {
144 throw new IOReactorException("Failure opening selector", ex);
145 }
146 this.selectTimeout = this.config.getSelectInterval();
147 this.interestOpsQueueing = this.config.isInterestOpQueued();
148 this.statusLock = new Object();
149 if (threadFactory != null) {
150 this.threadFactory = threadFactory;
151 } else {
152 this.threadFactory = new DefaultThreadFactory();
153 }
154 this.workerCount = this.config.getIoThreadCount();
155 this.dispatchers = new BaseIOReactor[workerCount];
156 this.workers = new Worker[workerCount];
157 this.threads = new Thread[workerCount];
158 this.status = IOReactorStatus.INACTIVE;
159 }
160
161
162
163
164
165
166
167
/**
 * Creates a multi-worker I/O reactor with the default configuration and the
 * default thread factory.
 *
 * @throws IOReactorException if the selector cannot be opened.
 */
public AbstractMultiworkerIOReactor() throws IOReactorException {
    // Both arguments are optional; null selects the respective default.
    this((IOReactorConfig) null, (ThreadFactory) null);
}
171
172 static IOReactorConfig convert(final int workerCount, final HttpParams params) {
173 Args.notNull(params, "HTTP parameters");
174 final IOReactorConfig config = IOReactorConfig.custom()
175 .setSelectInterval(NIOReactorParams.getSelectInterval(params))
176 .setShutdownGracePeriod(NIOReactorParams.getGracePeriod(params))
177 .setInterestOpQueued(NIOReactorParams.getInterestOpsQueueing(params))
178 .setIoThreadCount(workerCount)
179 .setTcpNoDelay(HttpConnectionParams.getTcpNoDelay(params))
180 .setSoTimeout(HttpConnectionParams.getSoTimeout(params))
181 .setSoLinger(HttpConnectionParams.getLinger(params))
182 .setSoKeepAlive(HttpConnectionParams.getSoKeepalive(params))
183 .setConnectTimeout(HttpConnectionParams.getConnectionTimeout(params))
184 .setSoReuseAddress(HttpConnectionParams.getSoReuseaddr(params))
185 .build();
186
187 return config;
188 }
189
190
191
192
193
194
195
196
197
198
199
200
/**
 * Creates a multi-worker I/O reactor from legacy {@link HttpParams}.
 * <p>
 * The parameters are converted with {@link #convert(int, HttpParams)} and the
 * result is passed to the configuration-based constructor.
 *
 * @param workerCount number of worker I/O reactors.
 * @param threadFactory factory used to create the worker threads; may be
 *   {@code null}.
 * @param params legacy HTTP parameters; must not be {@code null}.
 * @throws IOReactorException if the selector cannot be opened.
 *
 * @deprecated use
 *   {@link #AbstractMultiworkerIOReactor(IOReactorConfig, ThreadFactory)}
 */
@Deprecated
public AbstractMultiworkerIOReactor(
        final int workerCount,
        final ThreadFactory threadFactory,
        final HttpParams params) throws IOReactorException {
    this(convert(workerCount, params), threadFactory);
}
208
209 public IOReactorStatus getStatus() {
210 return this.status;
211 }
212
213
214
215
216
217
218
219 public synchronized List<ExceptionEvent> getAuditLog() {
220 if (this.auditLog != null) {
221 return new ArrayList<ExceptionEvent>(this.auditLog);
222 } else {
223 return null;
224 }
225 }
226
227
228
229
230
231
232
233
234
235 protected synchronized void addExceptionEvent(final Throwable ex, Date timestamp) {
236 if (ex == null) {
237 return;
238 }
239 if (timestamp == null) {
240 timestamp = new Date();
241 }
242 if (this.auditLog == null) {
243 this.auditLog = new ArrayList<ExceptionEvent>();
244 }
245 this.auditLog.add(new ExceptionEvent(ex, timestamp));
246 }
247
248
249
250
251
252
253 protected void addExceptionEvent(final Throwable ex) {
254 addExceptionEvent(ex, null);
255 }
256
257
258
259
260
261
262 public void setExceptionHandler(final IOReactorExceptionHandler exceptionHandler) {
263 this.exceptionHandler = exceptionHandler;
264 }
265
266
267
268
269
270
271
272
273
/**
 * Hook invoked by {@link #execute(IOEventDispatch)} on the main reactor
 * thread after every selector wake-up, for as long as the reactor status is
 * {@link IOReactorStatus#ACTIVE}.
 *
 * @param count the value returned by the preceding {@code Selector.select}
 *   call.
 * @throws IOReactorException rethrown by {@code execute}, which then shuts
 *   the reactor down.
 */
274 protected abstract void processEvents(int count) throws IOReactorException;
275
276
277
278
279
280
281
282
/**
 * Hook invoked once when the reactor goes down: from {@link #doShutdown()}
 * after the status switches to {@link IOReactorStatus#SHUTTING_DOWN}, and
 * from {@link #shutdown(long)} when the reactor was never started.
 * NOTE(review): presumably cancels requests still pending in the subclass;
 * confirm against the implementations.
 *
 * @throws IOReactorException in case of an I/O reactor error.
 */
283 protected abstract void cancelRequests() throws IOReactorException;
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307 public void execute(
308 final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
309 Args.notNull(eventDispatch, "Event dispatcher");
310 synchronized (this.statusLock) {
311 if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
312 this.status = IOReactorStatus.SHUT_DOWN;
313 this.statusLock.notifyAll();
314 return;
315 }
316 Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
317 "Illegal state %s", this.status);
318 this.status = IOReactorStatus.ACTIVE;
319
320 for (int i = 0; i < this.dispatchers.length; i++) {
321 final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
322 dispatcher.setExceptionHandler(exceptionHandler);
323 this.dispatchers[i] = dispatcher;
324 }
325 for (int i = 0; i < this.workerCount; i++) {
326 final BaseIOReactor dispatcher = this.dispatchers[i];
327 this.workers[i] = new Worker(dispatcher, eventDispatch);
328 this.threads[i] = this.threadFactory.newThread(this.workers[i]);
329 }
330 }
331 try {
332
333 for (int i = 0; i < this.workerCount; i++) {
334 if (this.status != IOReactorStatus.ACTIVE) {
335 return;
336 }
337 this.threads[i].start();
338 }
339
340 for (;;) {
341 int readyCount;
342 try {
343 readyCount = this.selector.select(this.selectTimeout);
344 } catch (final InterruptedIOException ex) {
345 throw ex;
346 } catch (final IOException ex) {
347 throw new IOReactorException("Unexpected selector failure", ex);
348 }
349
350 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
351 processEvents(readyCount);
352 }
353
354
355 for (int i = 0; i < this.workerCount; i++) {
356 final Worker worker = this.workers[i];
357 final Exception ex = worker.getException();
358 if (ex != null) {
359 throw new IOReactorException(
360 "I/O dispatch worker terminated abnormally", ex);
361 }
362 }
363
364 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
365 break;
366 }
367 }
368
369 } catch (final ClosedSelectorException ex) {
370 addExceptionEvent(ex);
371 } catch (final IOReactorException ex) {
372 if (ex.getCause() != null) {
373 addExceptionEvent(ex.getCause());
374 }
375 throw ex;
376 } finally {
377 doShutdown();
378 synchronized (this.statusLock) {
379 this.status = IOReactorStatus.SHUT_DOWN;
380 this.statusLock.notifyAll();
381 }
382 }
383 }
384
385
386
387
388
389
390
391
392
393
394
395 protected void doShutdown() throws InterruptedIOException {
396 synchronized (this.statusLock) {
397 if (this.status.compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
398 return;
399 }
400 this.status = IOReactorStatus.SHUTTING_DOWN;
401 }
402 try {
403 cancelRequests();
404 } catch (final IOReactorException ex) {
405 if (ex.getCause() != null) {
406 addExceptionEvent(ex.getCause());
407 }
408 }
409 this.selector.wakeup();
410
411
412 if (this.selector.isOpen()) {
413 for (final SelectionKey key : this.selector.keys()) {
414 try {
415 final Channel channel = key.channel();
416 if (channel != null) {
417 channel.close();
418 }
419 } catch (final IOException ex) {
420 addExceptionEvent(ex);
421 }
422 }
423
424 try {
425 this.selector.close();
426 } catch (final IOException ex) {
427 addExceptionEvent(ex);
428 }
429 }
430
431
432 for (int i = 0; i < this.workerCount; i++) {
433 final BaseIOReactor dispatcher = this.dispatchers[i];
434 dispatcher.gracefulShutdown();
435 }
436
437 final long gracePeriod = this.config.getShutdownGracePeriod();
438
439 try {
440
441
442 for (int i = 0; i < this.workerCount; i++) {
443 final BaseIOReactor dispatcher = this.dispatchers[i];
444 if (dispatcher.getStatus() != IOReactorStatus.INACTIVE) {
445 dispatcher.awaitShutdown(gracePeriod);
446 }
447 if (dispatcher.getStatus() != IOReactorStatus.SHUT_DOWN) {
448 try {
449 dispatcher.hardShutdown();
450 } catch (final IOReactorException ex) {
451 if (ex.getCause() != null) {
452 addExceptionEvent(ex.getCause());
453 }
454 }
455 }
456 }
457
458 for (int i = 0; i < this.workerCount; i++) {
459 final Thread t = this.threads[i];
460 if (t != null) {
461 t.join(gracePeriod);
462 }
463 }
464 } catch (final InterruptedException ex) {
465 throw new InterruptedIOException(ex.getMessage());
466 }
467 }
468
469
470
471
472
473
474 protected void addChannel(final ChannelEntry entry) {
475
476 final int i = Math.abs(this.currentWorker++ % this.workerCount);
477 this.dispatchers[i].addChannel(entry);
478 }
479
480
481
482
483
484
485
486
487
488 protected SelectionKey registerChannel(
489 final SelectableChannel channel, final int ops) throws ClosedChannelException {
490 return channel.register(this.selector, ops);
491 }
492
493
494
495
496
497
498
499 protected void prepareSocket(final Socket socket) throws IOException {
500 socket.setTcpNoDelay(this.config.isTcpNoDelay());
501 socket.setKeepAlive(this.config.isSoKeepalive());
502 socket.setReuseAddress(this.config.isSoReuseAddress());
503 if (this.config.getSoTimeout() > 0) {
504 socket.setSoTimeout(this.config.getSoTimeout());
505 }
506 if (this.config.getSndBufSize() > 0) {
507 socket.setSendBufferSize(this.config.getSndBufSize());
508 }
509 if (this.config.getRcvBufSize() > 0) {
510 socket.setReceiveBufferSize(this.config.getRcvBufSize());
511 }
512 final int linger = this.config.getSoLinger();
513 if (linger >= 0) {
514 socket.setSoLinger(linger > 0, linger);
515 }
516 }
517
518
519
520
521
522
523
524
525
526
527 protected void awaitShutdown(final long timeout) throws InterruptedException {
528 synchronized (this.statusLock) {
529 final long deadline = System.currentTimeMillis() + timeout;
530 long remaining = timeout;
531 while (this.status != IOReactorStatus.SHUT_DOWN) {
532 this.statusLock.wait(remaining);
533 if (timeout > 0) {
534 remaining = deadline - System.currentTimeMillis();
535 if (remaining <= 0) {
536 break;
537 }
538 }
539 }
540 }
541 }
542
543 public void shutdown() throws IOException {
544 shutdown(2000);
545 }
546
547 public void shutdown(final long waitMs) throws IOException {
548 synchronized (this.statusLock) {
549 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
550 return;
551 }
552 if (this.status.compareTo(IOReactorStatus.INACTIVE) == 0) {
553 this.status = IOReactorStatus.SHUT_DOWN;
554 cancelRequests();
555 return;
556 }
557 this.status = IOReactorStatus.SHUTDOWN_REQUEST;
558 }
559 this.selector.wakeup();
560 try {
561 awaitShutdown(waitMs);
562 } catch (final InterruptedException ignore) {
563 }
564 }
565
566 static void closeChannel(final Channel channel) {
567 try {
568 channel.close();
569 } catch (final IOException ignore) {
570 }
571 }
572
573 static class Worker implements Runnable {
574
575 final BaseIOReactor dispatcher;
576 final IOEventDispatch eventDispatch;
577
578 private volatile Exception exception;
579
580 public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) {
581 super();
582 this.dispatcher = dispatcher;
583 this.eventDispatch = eventDispatch;
584 }
585
586 public void run() {
587 try {
588 this.dispatcher.execute(this.eventDispatch);
589 } catch (final Exception ex) {
590 this.exception = ex;
591 }
592 }
593
594 public Exception getException() {
595 return this.exception;
596 }
597
598 }
599
600 static class DefaultThreadFactory implements ThreadFactory {
601
602 private static volatile int COUNT = 0;
603
604 public Thread newThread(final Runnable r) {
605 return new Thread(r, "I/O dispatcher " + (++COUNT));
606 }
607
608 }
609
610 }