1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.net.SocketTimeoutException;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpConnection;
36 import org.apache.http.HttpEntity;
37 import org.apache.http.HttpEntityEnclosingRequest;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpRequest;
40 import org.apache.http.HttpResponse;
41 import org.apache.http.HttpResponseFactory;
42 import org.apache.http.HttpStatus;
43 import org.apache.http.HttpVersion;
44 import org.apache.http.MethodNotSupportedException;
45 import org.apache.http.ProtocolException;
46 import org.apache.http.UnsupportedHttpVersionException;
47 import org.apache.http.annotation.Immutable;
48 import org.apache.http.concurrent.Cancellable;
49 import org.apache.http.entity.ContentType;
50 import org.apache.http.impl.DefaultHttpResponseFactory;
51 import org.apache.http.nio.ContentDecoder;
52 import org.apache.http.nio.ContentEncoder;
53 import org.apache.http.nio.NHttpConnection;
54 import org.apache.http.nio.NHttpServerConnection;
55 import org.apache.http.nio.NHttpServerEventHandler;
56 import org.apache.http.nio.entity.NStringEntity;
57 import org.apache.http.params.DefaultedHttpParams;
58 import org.apache.http.params.HttpConnectionParams;
59 import org.apache.http.params.HttpParams;
60 import org.apache.http.protocol.BasicHttpContext;
61 import org.apache.http.protocol.ExecutionContext;
62 import org.apache.http.protocol.HttpContext;
63 import org.apache.http.protocol.HttpProcessor;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 @Immutable
103 public class HttpAsyncService implements NHttpServerEventHandler {
104
105 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
106
107 private final HttpProcessor httpProcessor;
108 private final ConnectionReuseStrategy connStrategy;
109 private final HttpResponseFactory responseFactory;
110 private final HttpAsyncRequestHandlerResolver handlerResolver;
111 private final HttpAsyncExpectationVerifier expectationVerifier;
112 private final HttpParams params;
113
114
115
116
117
118
119
120
121
122
123
124 public HttpAsyncService(
125 final HttpProcessor httpProcessor,
126 final ConnectionReuseStrategy connStrategy,
127 final HttpResponseFactory responseFactory,
128 final HttpAsyncRequestHandlerResolver handlerResolver,
129 final HttpAsyncExpectationVerifier expectationVerifier,
130 final HttpParams params) {
131 super();
132 if (httpProcessor == null) {
133 throw new IllegalArgumentException("HTTP processor may not be null.");
134 }
135 if (connStrategy == null) {
136 throw new IllegalArgumentException("Connection reuse strategy may not be null");
137 }
138 if (responseFactory == null) {
139 throw new IllegalArgumentException("Response factory may not be null");
140 }
141 if (params == null) {
142 throw new IllegalArgumentException("HTTP parameters may not be null");
143 }
144 this.httpProcessor = httpProcessor;
145 this.connStrategy = connStrategy;
146 this.responseFactory = responseFactory;
147 this.handlerResolver = handlerResolver;
148 this.expectationVerifier = expectationVerifier;
149 this.params = params;
150 }
151
152
153
154
155
156
157
158
159
160 public HttpAsyncService(
161 final HttpProcessor httpProcessor,
162 final ConnectionReuseStrategy connStrategy,
163 final HttpAsyncRequestHandlerResolver handlerResolver,
164 final HttpParams params) {
165 this(httpProcessor, connStrategy, new DefaultHttpResponseFactory(),
166 handlerResolver, null, params);
167 }
168
169 public void connected(final NHttpServerConnection conn) {
170 State state = new State();
171 conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
172 }
173
174 public void closed(final NHttpServerConnection conn) {
175 State state = getState(conn);
176 if (state != null) {
177 state.setTerminated();
178 closeHandlers(state);
179 Cancellable cancellable = state.getCancellable();
180 if (cancellable != null) {
181 cancellable.cancel();
182 }
183 state.reset();
184 }
185 }
186
187 public void exception(
188 final NHttpServerConnection conn, final Exception cause) {
189 State state = ensureNotNull(getState(conn));
190 if (state != null) {
191 state.setTerminated();
192 closeHandlers(state, cause);
193 Cancellable cancellable = state.getCancellable();
194 if (cancellable != null) {
195 cancellable.cancel();
196 }
197 if (cause instanceof HttpException) {
198 if (conn.isResponseSubmitted()
199 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
200
201
202 closeConnection(conn);
203 log(cause);
204 } else {
205 HttpContext context = state.getContext();
206 HttpAsyncResponseProducer responseProducer = handleException(
207 cause, context);
208 state.setResponseProducer(responseProducer);
209 try {
210 HttpResponse response = responseProducer.generateResponse();
211 state.setResponse(response);
212 commitFinalResponse(conn, state);
213 } catch (Exception ex) {
214 shutdownConnection(conn);
215 closeHandlers(state);
216 if (ex instanceof RuntimeException) {
217 throw (RuntimeException) ex;
218 } else {
219 log(ex);
220 }
221 }
222 }
223 } else {
224 shutdownConnection(conn);
225 }
226 } else {
227 shutdownConnection(conn);
228 log(cause);
229 }
230 }
231
232 public void requestReceived(
233 final NHttpServerConnection conn) throws IOException, HttpException {
234 State state = ensureNotNull(getState(conn));
235 if (state.getResponseState() != MessageState.READY) {
236 throw new ProtocolException("Out of sequence request message detected (pipelining is not supported)");
237 }
238 HttpRequest request = conn.getHttpRequest();
239 HttpContext context = state.getContext();
240 request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
241
242 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
243 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
244 this.httpProcessor.process(request, context);
245
246 state.setRequest(request);
247 HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
248 state.setRequestHandler(requestHandler);
249 HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
250 state.setRequestConsumer(consumer);
251
252 consumer.requestReceived(request);
253
254 if (request instanceof HttpEntityEnclosingRequest) {
255 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
256 state.setRequestState(MessageState.ACK_EXPECTED);
257 HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
258 HttpStatus.SC_CONTINUE, context);
259 if (this.expectationVerifier != null) {
260 conn.suspendInput();
261 HttpAsyncExchange httpex = new Exchange(
262 request, ack, state, conn);
263 this.expectationVerifier.verify(httpex, context);
264 } else {
265 conn.submitResponse(ack);
266 state.setRequestState(MessageState.BODY_STREAM);
267 }
268 } else {
269 state.setRequestState(MessageState.BODY_STREAM);
270 }
271 } else {
272
273
274 processRequest(conn, state);
275 }
276 }
277
278 public void inputReady(
279 final NHttpServerConnection conn,
280 final ContentDecoder decoder) throws IOException, HttpException {
281 State state = ensureNotNull(getState(conn));
282 HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
283 consumer.consumeContent(decoder, conn);
284 state.setRequestState(MessageState.BODY_STREAM);
285 if (decoder.isCompleted()) {
286 processRequest(conn, state);
287 }
288 }
289
290 public void responseReady(
291 final NHttpServerConnection conn) throws IOException, HttpException {
292 State state = ensureNotNull(getState(conn));
293 if (state.getResponse() != null) {
294 return;
295 }
296 HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
297 if (responseProducer == null) {
298 return;
299 }
300 HttpContext context = state.getContext();
301 HttpResponse response = responseProducer.generateResponse();
302 int status = response.getStatusLine().getStatusCode();
303 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
304 if (status == 100) {
305 try {
306
307 response.setEntity(null);
308 conn.requestInput();
309 state.setRequestState(MessageState.BODY_STREAM);
310 conn.submitResponse(response);
311 responseProducer.responseCompleted(context);
312 } finally {
313 state.setResponseProducer(null);
314 responseProducer.close();
315 }
316 } else if (status >= 400) {
317 conn.resetInput();
318 state.setRequestState(MessageState.COMPLETED);
319 state.setResponse(response);
320 commitFinalResponse(conn, state);
321 } else {
322 throw new HttpException("Invalid response: " + response.getStatusLine());
323 }
324 } else {
325 if (status >= 200) {
326 state.setResponse(response);
327 commitFinalResponse(conn, state);
328 } else {
329 throw new HttpException("Invalid response: " + response.getStatusLine());
330 }
331 }
332 }
333
334 public void outputReady(
335 final NHttpServerConnection conn,
336 final ContentEncoder encoder) throws IOException {
337 State state = ensureNotNull(getState(conn));
338 HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
339 HttpContext context = state.getContext();
340 HttpResponse response = state.getResponse();
341
342 responseProducer.produceContent(encoder, conn);
343 state.setResponseState(MessageState.BODY_STREAM);
344 if (encoder.isCompleted()) {
345 responseProducer.responseCompleted(context);
346 if (!this.connStrategy.keepAlive(response, context)) {
347 conn.close();
348 } else {
349 conn.requestInput();
350 }
351 closeHandlers(state);
352 state.reset();
353 conn.setSocketTimeout(HttpConnectionParams.getSoTimeout(this.params));
354 }
355 }
356
357 public void endOfInput(final NHttpServerConnection conn) throws IOException {
358
359
360
361
362 if (conn.getSocketTimeout() <= 0) {
363 conn.setSocketTimeout(1000);
364 }
365 conn.close();
366 }
367
368 public void timeout(final NHttpServerConnection conn) throws IOException {
369 State state = getState(conn);
370 if (state != null) {
371 closeHandlers(state, new SocketTimeoutException());
372 }
373 if (conn.getStatus() == NHttpConnection.ACTIVE) {
374 conn.close();
375 if (conn.getStatus() == NHttpConnection.CLOSING) {
376
377
378 conn.setSocketTimeout(250);
379 }
380 } else {
381 conn.shutdown();
382 }
383 }
384
385 private State getState(final NHttpConnection conn) {
386 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
387 }
388
389 private State ensureNotNull(final State state) {
390 if (state == null) {
391 throw new IllegalStateException("HTTP exchange state is null");
392 }
393 return state;
394 }
395
396 private HttpAsyncRequestConsumer<Object> ensureNotNull(final HttpAsyncRequestConsumer<Object> requestConsumer) {
397 if (requestConsumer == null) {
398 throw new IllegalStateException("Request consumer is null");
399 }
400 return requestConsumer;
401 }
402
403
404
405
406
407
408
409 protected void log(final Exception ex) {
410 }
411
412 private void closeConnection(final NHttpConnection conn) {
413 try {
414 conn.close();
415 } catch (IOException ex) {
416 log(ex);
417 }
418 }
419
420 private void shutdownConnection(final NHttpConnection conn) {
421 try {
422 conn.shutdown();
423 } catch (IOException ex) {
424 log(ex);
425 }
426 }
427
428 private void closeHandlers(final State state, final Exception ex) {
429 HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
430 if (consumer != null) {
431 try {
432 consumer.failed(ex);
433 } finally {
434 try {
435 consumer.close();
436 } catch (IOException ioex) {
437 log(ioex);
438 }
439 }
440 }
441 HttpAsyncResponseProducer producer = state.getResponseProducer();
442 if (producer != null) {
443 try {
444 producer.failed(ex);
445 } finally {
446 try {
447 producer.close();
448 } catch (IOException ioex) {
449 log(ioex);
450 }
451 }
452 }
453 }
454
455 private void closeHandlers(final State state) {
456 HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
457 if (consumer != null) {
458 try {
459 consumer.close();
460 } catch (IOException ioex) {
461 log(ioex);
462 }
463 }
464 HttpAsyncResponseProducer producer = state.getResponseProducer();
465 if (producer != null) {
466 try {
467 producer.close();
468 } catch (IOException ioex) {
469 log(ioex);
470 }
471 }
472 }
473
474 protected HttpAsyncResponseProducer handleException(
475 final Exception ex, final HttpContext context) {
476 int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
477 if (ex instanceof MethodNotSupportedException) {
478 code = HttpStatus.SC_NOT_IMPLEMENTED;
479 } else if (ex instanceof UnsupportedHttpVersionException) {
480 code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
481 } else if (ex instanceof ProtocolException) {
482 code = HttpStatus.SC_BAD_REQUEST;
483 }
484 String message = ex.getMessage();
485 if (message == null) {
486 message = ex.toString();
487 }
488 HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
489 code, context);
490 return new ErrorResponseProducer(response,
491 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
492 }
493
494 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
495 if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
496 return false;
497 }
498 int status = response.getStatusLine().getStatusCode();
499 return status >= HttpStatus.SC_OK
500 && status != HttpStatus.SC_NO_CONTENT
501 && status != HttpStatus.SC_NOT_MODIFIED
502 && status != HttpStatus.SC_RESET_CONTENT;
503 }
504
505 private void processRequest(
506 final NHttpServerConnection conn,
507 final State state) throws HttpException, IOException {
508 HttpAsyncRequestHandler<Object> handler = state.getRequestHandler();
509 HttpContext context = state.getContext();
510 HttpAsyncRequestConsumer<?> consumer = state.getRequestConsumer();
511 consumer.requestCompleted(context);
512 state.setRequestState(MessageState.COMPLETED);
513 state.setResponseState(MessageState.INIT);
514 Exception exception = consumer.getException();
515 if (exception != null) {
516 HttpAsyncResponseProducer responseProducer = handleException(exception, context);
517 state.setResponseProducer(responseProducer);
518 conn.requestOutput();
519 } else {
520 HttpRequest request = state.getRequest();
521 Object result = consumer.getResult();
522 HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
523 HttpStatus.SC_OK, context);
524 Exchange httpexchange = new Exchange(request, response, state, conn);
525 try {
526 handler.handle(result, httpexchange, context);
527 } catch (HttpException ex) {
528 HttpAsyncResponseProducer responseProducer = handleException(ex, context);
529 state.setResponseProducer(responseProducer);
530 conn.requestOutput();
531 }
532 }
533 }
534
535 private void commitFinalResponse(
536 final NHttpServerConnection conn,
537 final State state) throws IOException, HttpException {
538 HttpContext context = state.getContext();
539 HttpRequest request = state.getRequest();
540 HttpResponse response = state.getResponse();
541
542 response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
543 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
544 this.httpProcessor.process(response, context);
545
546 HttpEntity entity = response.getEntity();
547 if (entity != null && !canResponseHaveBody(request, response)) {
548 response.setEntity(null);
549 entity = null;
550 }
551
552 conn.submitResponse(response);
553
554 if (entity == null) {
555 HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
556 responseProducer.responseCompleted(context);
557 if (!this.connStrategy.keepAlive(response, context)) {
558 conn.close();
559 } else {
560
561 conn.requestInput();
562 }
563 closeHandlers(state);
564 state.reset();
565 conn.setSocketTimeout(HttpConnectionParams.getSoTimeout(this.params));
566 } else {
567 state.setResponseState(MessageState.BODY_STREAM);
568 }
569 }
570
571 @SuppressWarnings("unchecked")
572 private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
573 HttpAsyncRequestHandler<Object> handler = null;
574 if (this.handlerResolver != null) {
575 String requestURI = request.getRequestLine().getUri();
576 handler = (HttpAsyncRequestHandler<Object>) this.handlerResolver.lookup(requestURI);
577 }
578 if (handler == null) {
579 handler = new NullRequestHandler();
580 }
581 return handler;
582 }
583
584 static class State {
585
586 private final BasicHttpContext context;
587 private volatile boolean terminated;
588 private volatile HttpAsyncRequestHandler<Object> requestHandler;
589 private volatile MessageState requestState;
590 private volatile MessageState responseState;
591 private volatile HttpAsyncRequestConsumer<Object> requestConsumer;
592 private volatile HttpAsyncResponseProducer responseProducer;
593 private volatile HttpRequest request;
594 private volatile HttpResponse response;
595 private volatile Cancellable cancellable;
596
597 State() {
598 super();
599 this.context = new BasicHttpContext();
600 this.requestState = MessageState.READY;
601 this.responseState = MessageState.READY;
602 }
603
604 public HttpContext getContext() {
605 return this.context;
606 }
607
608 public boolean isTerminated() {
609 return this.terminated;
610 }
611
612 public void setTerminated() {
613 this.terminated = true;
614 }
615
616 public HttpAsyncRequestHandler<Object> getRequestHandler() {
617 return this.requestHandler;
618 }
619
620 public void setRequestHandler(final HttpAsyncRequestHandler<Object> requestHandler) {
621 this.requestHandler = requestHandler;
622 }
623
624 public MessageState getRequestState() {
625 return this.requestState;
626 }
627
628 public void setRequestState(final MessageState state) {
629 this.requestState = state;
630 }
631
632 public MessageState getResponseState() {
633 return this.responseState;
634 }
635
636 public void setResponseState(final MessageState state) {
637 this.responseState = state;
638 }
639
640 public HttpAsyncRequestConsumer<Object> getRequestConsumer() {
641 return this.requestConsumer;
642 }
643
644 public void setRequestConsumer(final HttpAsyncRequestConsumer<Object> requestConsumer) {
645 this.requestConsumer = requestConsumer;
646 }
647
648 public HttpAsyncResponseProducer getResponseProducer() {
649 return this.responseProducer;
650 }
651
652 public void setResponseProducer(final HttpAsyncResponseProducer responseProducer) {
653 this.responseProducer = responseProducer;
654 }
655
656 public HttpRequest getRequest() {
657 return this.request;
658 }
659
660 public void setRequest(final HttpRequest request) {
661 this.request = request;
662 }
663
664 public HttpResponse getResponse() {
665 return this.response;
666 }
667
668 public void setResponse(final HttpResponse response) {
669 this.response = response;
670 }
671
672 public Cancellable getCancellable() {
673 return this.cancellable;
674 }
675
676 public void setCancellable(final Cancellable cancellable) {
677 this.cancellable = cancellable;
678 }
679
680 public void reset() {
681 this.context.clear();
682 this.responseState = MessageState.READY;
683 this.requestState = MessageState.READY;
684 this.requestHandler = null;
685 this.requestConsumer = null;
686 this.responseProducer = null;
687 this.request = null;
688 this.response = null;
689 this.cancellable = null;
690 }
691
692 @Override
693 public String toString() {
694 StringBuilder buf = new StringBuilder();
695 buf.append("request state: ");
696 buf.append(this.requestState);
697 buf.append("; request: ");
698 if (this.request != null) {
699 buf.append(this.request.getRequestLine());
700 }
701 buf.append("; response state: ");
702 buf.append(this.responseState);
703 buf.append("; response: ");
704 if (this.response != null) {
705 buf.append(this.response.getStatusLine());
706 }
707 buf.append(";");
708 return buf.toString();
709 }
710
711 }
712
713 static class Exchange implements HttpAsyncExchange {
714
715 private final HttpRequest request;
716 private final HttpResponse response;
717 private final State state;
718 private final NHttpServerConnection conn;
719
720 private volatile boolean completed;
721
722 public Exchange(
723 final HttpRequest request,
724 final HttpResponse response,
725 final State state,
726 final NHttpServerConnection conn) {
727 super();
728 this.request = request;
729 this.response = response;
730 this.state = state;
731 this.conn = conn;
732 }
733
734 public HttpRequest getRequest() {
735 return this.request;
736 }
737
738 public HttpResponse getResponse() {
739 return this.response;
740 }
741
742 public void setCallback(final Cancellable cancellable) {
743 synchronized (this) {
744 if (this.completed) {
745 throw new IllegalStateException("Response already submitted");
746 }
747 if (this.state.isTerminated() && cancellable != null) {
748 cancellable.cancel();
749 } else {
750 this.state.setCancellable(cancellable);
751 this.conn.requestInput();
752 }
753 }
754 }
755
756 public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
757 if (responseProducer == null) {
758 throw new IllegalArgumentException("Response producer may not be null");
759 }
760 synchronized (this) {
761 if (this.completed) {
762 throw new IllegalStateException("Response already submitted");
763 }
764 this.completed = true;
765 if (!this.state.isTerminated()) {
766 this.state.setResponseProducer(responseProducer);
767 this.state.setCancellable(null);
768 this.conn.requestOutput();
769 } else {
770 try {
771 responseProducer.close();
772 } catch (IOException ex) {
773 }
774 }
775 }
776 }
777
778 public void submitResponse() {
779 submitResponse(new BasicAsyncResponseProducer(this.response));
780 }
781
782 public boolean isCompleted() {
783 return this.completed;
784 }
785
786 public void setTimeout(int timeout) {
787 this.conn.setSocketTimeout(timeout);
788 }
789
790 public int getTimeout() {
791 return this.conn.getSocketTimeout();
792 }
793
794 }
795
796 }