1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.net.SocketTimeoutException;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpConnection;
36 import org.apache.http.HttpEntity;
37 import org.apache.http.HttpEntityEnclosingRequest;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpRequest;
40 import org.apache.http.HttpResponse;
41 import org.apache.http.HttpResponseFactory;
42 import org.apache.http.HttpStatus;
43 import org.apache.http.HttpVersion;
44 import org.apache.http.MethodNotSupportedException;
45 import org.apache.http.ProtocolException;
46 import org.apache.http.UnsupportedHttpVersionException;
47 import org.apache.http.annotation.Immutable;
48 import org.apache.http.concurrent.Cancellable;
49 import org.apache.http.entity.ContentType;
50 import org.apache.http.impl.DefaultConnectionReuseStrategy;
51 import org.apache.http.impl.DefaultHttpResponseFactory;
52 import org.apache.http.nio.ContentDecoder;
53 import org.apache.http.nio.ContentEncoder;
54 import org.apache.http.nio.NHttpConnection;
55 import org.apache.http.nio.NHttpServerConnection;
56 import org.apache.http.nio.NHttpServerEventHandler;
57 import org.apache.http.nio.entity.NStringEntity;
58 import org.apache.http.params.HttpParams;
59 import org.apache.http.protocol.BasicHttpContext;
60 import org.apache.http.protocol.ExecutionContext;
61 import org.apache.http.protocol.HttpContext;
62 import org.apache.http.protocol.HttpProcessor;
63 import org.apache.http.util.Args;
64 import org.apache.http.util.Asserts;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 @SuppressWarnings("deprecation")
98 @Immutable
99 public class HttpAsyncService implements NHttpServerEventHandler {
100
101 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
102
103 private final HttpProcessor httpProcessor;
104 private final ConnectionReuseStrategy connStrategy;
105 private final HttpResponseFactory responseFactory;
106 private final HttpAsyncRequestHandlerMapper handlerMapper;
107 private final HttpAsyncExpectationVerifier expectationVerifier;
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 @Deprecated
124 public HttpAsyncService(
125 final HttpProcessor httpProcessor,
126 final ConnectionReuseStrategy connStrategy,
127 final HttpResponseFactory responseFactory,
128 final HttpAsyncRequestHandlerResolver handlerResolver,
129 final HttpAsyncExpectationVerifier expectationVerifier,
130 final HttpParams params) {
131 this(httpProcessor,
132 connStrategy,
133 responseFactory,
134 new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
135 expectationVerifier);
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150 @Deprecated
151 public HttpAsyncService(
152 final HttpProcessor httpProcessor,
153 final ConnectionReuseStrategy connStrategy,
154 final HttpAsyncRequestHandlerResolver handlerResolver,
155 final HttpParams params) {
156 this(httpProcessor,
157 connStrategy,
158 DefaultHttpResponseFactory.INSTANCE,
159 new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
160 null);
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public HttpAsyncService(
177 final HttpProcessor httpProcessor,
178 final ConnectionReuseStrategy connStrategy,
179 final HttpResponseFactory responseFactory,
180 final HttpAsyncRequestHandlerMapper handlerMapper,
181 final HttpAsyncExpectationVerifier expectationVerifier) {
182 super();
183 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
184 this.connStrategy = connStrategy != null ? connStrategy :
185 DefaultConnectionReuseStrategy.INSTANCE;
186 this.responseFactory = responseFactory != null ? responseFactory :
187 DefaultHttpResponseFactory.INSTANCE;
188 this.handlerMapper = handlerMapper;
189 this.expectationVerifier = expectationVerifier;
190 }
191
192
193
194
195
196
197
198
199
200 public HttpAsyncService(
201 final HttpProcessor httpProcessor,
202 final HttpAsyncRequestHandlerMapper handlerMapper) {
203 this(httpProcessor, null, null, handlerMapper, null);
204 }
205
206 public void connected(final NHttpServerConnection conn) {
207 final State state = new State();
208 conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
209 }
210
211 public void closed(final NHttpServerConnection conn) {
212 final State state = getState(conn);
213 if (state != null) {
214 state.setTerminated();
215 closeHandlers(state);
216 final Cancellable cancellable = state.getCancellable();
217 if (cancellable != null) {
218 cancellable.cancel();
219 }
220 state.reset();
221 }
222 }
223
224 public void exception(
225 final NHttpServerConnection conn, final Exception cause) {
226 final State state = ensureNotNull(getState(conn));
227 if (state != null) {
228 state.setTerminated();
229 closeHandlers(state, cause);
230 final Cancellable cancellable = state.getCancellable();
231 if (cancellable != null) {
232 cancellable.cancel();
233 }
234 if (cause instanceof HttpException) {
235 if (conn.isResponseSubmitted()
236 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
237
238
239 closeConnection(conn);
240 log(cause);
241 } else {
242 final HttpContext context = state.getContext();
243 final HttpAsyncResponseProducer responseProducer = handleException(
244 cause, context);
245 state.setResponseProducer(responseProducer);
246 try {
247 final HttpResponse response = responseProducer.generateResponse();
248 state.setResponse(response);
249 commitFinalResponse(conn, state);
250 } catch (final Exception ex) {
251 shutdownConnection(conn);
252 closeHandlers(state);
253 if (ex instanceof RuntimeException) {
254 throw (RuntimeException) ex;
255 } else {
256 log(ex);
257 }
258 }
259 }
260 } else {
261 shutdownConnection(conn);
262 }
263 } else {
264 shutdownConnection(conn);
265 log(cause);
266 }
267 }
268
269 public void requestReceived(
270 final NHttpServerConnection conn) throws IOException, HttpException {
271 final State state = ensureNotNull(getState(conn));
272 if (state.getResponseState() != MessageState.READY) {
273 throw new ProtocolException("Out of sequence request message detected (pipelining is not supported)");
274 }
275 final HttpRequest request = conn.getHttpRequest();
276 final HttpContext context = state.getContext();
277
278 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
279 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
280 this.httpProcessor.process(request, context);
281
282 state.setRequest(request);
283 final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
284 state.setRequestHandler(requestHandler);
285 final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
286 state.setRequestConsumer(consumer);
287
288 consumer.requestReceived(request);
289
290 if (request instanceof HttpEntityEnclosingRequest) {
291 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
292 state.setRequestState(MessageState.ACK_EXPECTED);
293 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
294 HttpStatus.SC_CONTINUE, context);
295 if (this.expectationVerifier != null) {
296 conn.suspendInput();
297 final HttpAsyncExchange httpex = new Exchange(
298 request, ack, state, conn);
299 this.expectationVerifier.verify(httpex, context);
300 } else {
301 conn.submitResponse(ack);
302 state.setRequestState(MessageState.BODY_STREAM);
303 }
304 } else {
305 state.setRequestState(MessageState.BODY_STREAM);
306 }
307 } else {
308
309
310 processRequest(conn, state);
311 }
312 }
313
314 public void inputReady(
315 final NHttpServerConnection conn,
316 final ContentDecoder decoder) throws IOException, HttpException {
317 final State state = ensureNotNull(getState(conn));
318 final HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
319 consumer.consumeContent(decoder, conn);
320 state.setRequestState(MessageState.BODY_STREAM);
321 if (decoder.isCompleted()) {
322 processRequest(conn, state);
323 }
324 }
325
326 public void responseReady(
327 final NHttpServerConnection conn) throws IOException, HttpException {
328 final State state = ensureNotNull(getState(conn));
329 if (state.getResponse() != null) {
330 return;
331 }
332 final HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
333 if (responseProducer == null) {
334 return;
335 }
336 final HttpContext context = state.getContext();
337 final HttpResponse response = responseProducer.generateResponse();
338 final int status = response.getStatusLine().getStatusCode();
339 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
340 if (status == 100) {
341 try {
342
343 response.setEntity(null);
344 conn.requestInput();
345 state.setRequestState(MessageState.BODY_STREAM);
346 conn.submitResponse(response);
347 responseProducer.responseCompleted(context);
348 } finally {
349 state.setResponseProducer(null);
350 responseProducer.close();
351 }
352 } else if (status >= 400) {
353 conn.resetInput();
354 state.setRequestState(MessageState.COMPLETED);
355 state.setResponse(response);
356 commitFinalResponse(conn, state);
357 } else {
358 throw new HttpException("Invalid response: " + response.getStatusLine());
359 }
360 } else {
361 if (status >= 200) {
362 state.setResponse(response);
363 commitFinalResponse(conn, state);
364 } else {
365 throw new HttpException("Invalid response: " + response.getStatusLine());
366 }
367 }
368 }
369
370 public void outputReady(
371 final NHttpServerConnection conn,
372 final ContentEncoder encoder) throws IOException {
373 final State state = ensureNotNull(getState(conn));
374 final HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
375 final HttpContext context = state.getContext();
376 final HttpResponse response = state.getResponse();
377
378 responseProducer.produceContent(encoder, conn);
379 state.setResponseState(MessageState.BODY_STREAM);
380 if (encoder.isCompleted()) {
381 responseProducer.responseCompleted(context);
382 if (!this.connStrategy.keepAlive(response, context)) {
383 conn.close();
384 } else {
385 conn.requestInput();
386 }
387 closeHandlers(state);
388 state.reset();
389 }
390 }
391
392 public void endOfInput(final NHttpServerConnection conn) throws IOException {
393
394
395
396
397 if (conn.getSocketTimeout() <= 0) {
398 conn.setSocketTimeout(1000);
399 }
400 conn.close();
401 }
402
403 public void timeout(final NHttpServerConnection conn) throws IOException {
404 final State state = getState(conn);
405 if (state != null) {
406 closeHandlers(state, new SocketTimeoutException());
407 }
408 if (conn.getStatus() == NHttpConnection.ACTIVE) {
409 conn.close();
410 if (conn.getStatus() == NHttpConnection.CLOSING) {
411
412
413 conn.setSocketTimeout(250);
414 }
415 } else {
416 conn.shutdown();
417 }
418 }
419
420 private State getState(final NHttpConnection conn) {
421 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
422 }
423
424 private State ensureNotNull(final State state) {
425 Asserts.notNull(state, "HTTP exchange state");
426 return state;
427 }
428
429 private HttpAsyncRequestConsumer<Object> ensureNotNull(final HttpAsyncRequestConsumer<Object> requestConsumer) {
430 Asserts.notNull(requestConsumer, "Request consumer");
431 return requestConsumer;
432 }
433
434
435
436
437
438
439
440 protected void log(final Exception ex) {
441 }
442
443 private void closeConnection(final NHttpConnection conn) {
444 try {
445 conn.close();
446 } catch (final IOException ex) {
447 log(ex);
448 }
449 }
450
451 private void shutdownConnection(final NHttpConnection conn) {
452 try {
453 conn.shutdown();
454 } catch (final IOException ex) {
455 log(ex);
456 }
457 }
458
459 private void closeHandlers(final State state, final Exception ex) {
460 final HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
461 if (consumer != null) {
462 try {
463 consumer.failed(ex);
464 } finally {
465 try {
466 consumer.close();
467 } catch (final IOException ioex) {
468 log(ioex);
469 }
470 }
471 }
472 final HttpAsyncResponseProducer producer = state.getResponseProducer();
473 if (producer != null) {
474 try {
475 producer.failed(ex);
476 } finally {
477 try {
478 producer.close();
479 } catch (final IOException ioex) {
480 log(ioex);
481 }
482 }
483 }
484 }
485
486 private void closeHandlers(final State state) {
487 final HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
488 if (consumer != null) {
489 try {
490 consumer.close();
491 } catch (final IOException ioex) {
492 log(ioex);
493 }
494 }
495 final HttpAsyncResponseProducer producer = state.getResponseProducer();
496 if (producer != null) {
497 try {
498 producer.close();
499 } catch (final IOException ioex) {
500 log(ioex);
501 }
502 }
503 }
504
505 protected HttpAsyncResponseProducer handleException(
506 final Exception ex, final HttpContext context) {
507 int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
508 if (ex instanceof MethodNotSupportedException) {
509 code = HttpStatus.SC_NOT_IMPLEMENTED;
510 } else if (ex instanceof UnsupportedHttpVersionException) {
511 code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
512 } else if (ex instanceof ProtocolException) {
513 code = HttpStatus.SC_BAD_REQUEST;
514 }
515 String message = ex.getMessage();
516 if (message == null) {
517 message = ex.toString();
518 }
519 final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
520 code, context);
521 return new ErrorResponseProducer(response,
522 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
523 }
524
525 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
526 if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
527 return false;
528 }
529 final int status = response.getStatusLine().getStatusCode();
530 return status >= HttpStatus.SC_OK
531 && status != HttpStatus.SC_NO_CONTENT
532 && status != HttpStatus.SC_NOT_MODIFIED
533 && status != HttpStatus.SC_RESET_CONTENT;
534 }
535
536 private void processRequest(
537 final NHttpServerConnection conn,
538 final State state) throws HttpException, IOException {
539 final HttpAsyncRequestHandler<Object> handler = state.getRequestHandler();
540 final HttpContext context = state.getContext();
541 final HttpAsyncRequestConsumer<?> consumer = state.getRequestConsumer();
542 consumer.requestCompleted(context);
543 state.setRequestState(MessageState.COMPLETED);
544 state.setResponseState(MessageState.INIT);
545 final Exception exception = consumer.getException();
546 if (exception != null) {
547 final HttpAsyncResponseProducer responseProducer = handleException(exception, context);
548 state.setResponseProducer(responseProducer);
549 conn.requestOutput();
550 } else {
551 final HttpRequest request = state.getRequest();
552 final Object result = consumer.getResult();
553 final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
554 HttpStatus.SC_OK, context);
555 final Exchange httpexchange = new Exchange(request, response, state, conn);
556 try {
557 handler.handle(result, httpexchange, context);
558 } catch (final HttpException ex) {
559 final HttpAsyncResponseProducer responseProducer = handleException(ex, context);
560 state.setResponseProducer(responseProducer);
561 conn.requestOutput();
562 }
563 }
564 }
565
566 private void commitFinalResponse(
567 final NHttpServerConnection conn,
568 final State state) throws IOException, HttpException {
569 final HttpContext context = state.getContext();
570 final HttpRequest request = state.getRequest();
571 final HttpResponse response = state.getResponse();
572
573 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
574 this.httpProcessor.process(response, context);
575
576 HttpEntity entity = response.getEntity();
577 if (entity != null && !canResponseHaveBody(request, response)) {
578 response.setEntity(null);
579 entity = null;
580 }
581
582 conn.submitResponse(response);
583
584 if (entity == null) {
585 final HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
586 responseProducer.responseCompleted(context);
587 if (!this.connStrategy.keepAlive(response, context)) {
588 conn.close();
589 } else {
590
591 conn.requestInput();
592 }
593 closeHandlers(state);
594 state.reset();
595 } else {
596 state.setResponseState(MessageState.BODY_STREAM);
597 }
598 }
599
600 @SuppressWarnings("unchecked")
601 private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
602 HttpAsyncRequestHandler<Object> handler = null;
603 if (this.handlerMapper != null) {
604 handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
605 }
606 if (handler == null) {
607 handler = new NullRequestHandler();
608 }
609 return handler;
610 }
611
612 static class State {
613
614 private final BasicHttpContext context;
615 private volatile boolean terminated;
616 private volatile HttpAsyncRequestHandler<Object> requestHandler;
617 private volatile MessageState requestState;
618 private volatile MessageState responseState;
619 private volatile HttpAsyncRequestConsumer<Object> requestConsumer;
620 private volatile HttpAsyncResponseProducer responseProducer;
621 private volatile HttpRequest request;
622 private volatile HttpResponse response;
623 private volatile Cancellable cancellable;
624
625 State() {
626 super();
627 this.context = new BasicHttpContext();
628 this.requestState = MessageState.READY;
629 this.responseState = MessageState.READY;
630 }
631
632 public HttpContext getContext() {
633 return this.context;
634 }
635
636 public boolean isTerminated() {
637 return this.terminated;
638 }
639
640 public void setTerminated() {
641 this.terminated = true;
642 }
643
644 public HttpAsyncRequestHandler<Object> getRequestHandler() {
645 return this.requestHandler;
646 }
647
648 public void setRequestHandler(final HttpAsyncRequestHandler<Object> requestHandler) {
649 this.requestHandler = requestHandler;
650 }
651
652 public MessageState getRequestState() {
653 return this.requestState;
654 }
655
656 public void setRequestState(final MessageState state) {
657 this.requestState = state;
658 }
659
660 public MessageState getResponseState() {
661 return this.responseState;
662 }
663
664 public void setResponseState(final MessageState state) {
665 this.responseState = state;
666 }
667
668 public HttpAsyncRequestConsumer<Object> getRequestConsumer() {
669 return this.requestConsumer;
670 }
671
672 public void setRequestConsumer(final HttpAsyncRequestConsumer<Object> requestConsumer) {
673 this.requestConsumer = requestConsumer;
674 }
675
676 public HttpAsyncResponseProducer getResponseProducer() {
677 return this.responseProducer;
678 }
679
680 public void setResponseProducer(final HttpAsyncResponseProducer responseProducer) {
681 this.responseProducer = responseProducer;
682 }
683
684 public HttpRequest getRequest() {
685 return this.request;
686 }
687
688 public void setRequest(final HttpRequest request) {
689 this.request = request;
690 }
691
692 public HttpResponse getResponse() {
693 return this.response;
694 }
695
696 public void setResponse(final HttpResponse response) {
697 this.response = response;
698 }
699
700 public Cancellable getCancellable() {
701 return this.cancellable;
702 }
703
704 public void setCancellable(final Cancellable cancellable) {
705 this.cancellable = cancellable;
706 }
707
708 public void reset() {
709 this.context.clear();
710 this.responseState = MessageState.READY;
711 this.requestState = MessageState.READY;
712 this.requestHandler = null;
713 this.requestConsumer = null;
714 this.responseProducer = null;
715 this.request = null;
716 this.response = null;
717 this.cancellable = null;
718 }
719
720 @Override
721 public String toString() {
722 final StringBuilder buf = new StringBuilder();
723 buf.append("request state: ");
724 buf.append(this.requestState);
725 buf.append("; request: ");
726 if (this.request != null) {
727 buf.append(this.request.getRequestLine());
728 }
729 buf.append("; response state: ");
730 buf.append(this.responseState);
731 buf.append("; response: ");
732 if (this.response != null) {
733 buf.append(this.response.getStatusLine());
734 }
735 buf.append(";");
736 return buf.toString();
737 }
738
739 }
740
741 static class Exchange implements HttpAsyncExchange {
742
743 private final HttpRequest request;
744 private final HttpResponse response;
745 private final State state;
746 private final NHttpServerConnection conn;
747
748 private volatile boolean completed;
749
750 public Exchange(
751 final HttpRequest request,
752 final HttpResponse response,
753 final State state,
754 final NHttpServerConnection conn) {
755 super();
756 this.request = request;
757 this.response = response;
758 this.state = state;
759 this.conn = conn;
760 }
761
762 public HttpRequest getRequest() {
763 return this.request;
764 }
765
766 public HttpResponse getResponse() {
767 return this.response;
768 }
769
770 public void setCallback(final Cancellable cancellable) {
771 synchronized (this) {
772 Asserts.check(!this.completed, "Response already submitted");
773 if (this.state.isTerminated() && cancellable != null) {
774 cancellable.cancel();
775 } else {
776 this.state.setCancellable(cancellable);
777 this.conn.requestInput();
778 }
779 }
780 }
781
782 public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
783 Args.notNull(responseProducer, "Response producer");
784 synchronized (this) {
785 Asserts.check(!this.completed, "Response already submitted");
786 this.completed = true;
787 if (!this.state.isTerminated()) {
788 this.state.setResponseProducer(responseProducer);
789 this.state.setCancellable(null);
790 this.conn.requestOutput();
791 } else {
792 try {
793 responseProducer.close();
794 } catch (final IOException ex) {
795 }
796 }
797 }
798 }
799
800 public void submitResponse() {
801 submitResponse(new BasicAsyncResponseProducer(this.response));
802 }
803
804 public boolean isCompleted() {
805 return this.completed;
806 }
807
808 public void setTimeout(final int timeout) {
809 this.conn.setSocketTimeout(timeout);
810 }
811
812 public int getTimeout() {
813 return this.conn.getSocketTimeout();
814 }
815
816 }
817
818
819
820
821 @Deprecated
822 private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
823
824 private final HttpAsyncRequestHandlerResolver resolver;
825
826 public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
827 this.resolver = resolver;
828 }
829
830 public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
831 return resolver.lookup(request.getRequestLine().getUri());
832 }
833
834 }
835
836 }