View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.http.ConnectionReuseStrategy;
37  import org.apache.http.ExceptionLogger;
38  import org.apache.http.HttpEntity;
39  import org.apache.http.HttpEntityEnclosingRequest;
40  import org.apache.http.HttpException;
41  import org.apache.http.HttpRequest;
42  import org.apache.http.HttpResponse;
43  import org.apache.http.HttpResponseFactory;
44  import org.apache.http.HttpStatus;
45  import org.apache.http.HttpVersion;
46  import org.apache.http.MethodNotSupportedException;
47  import org.apache.http.ProtocolException;
48  import org.apache.http.UnsupportedHttpVersionException;
49  import org.apache.http.annotation.Immutable;
50  import org.apache.http.concurrent.Cancellable;
51  import org.apache.http.entity.ContentType;
52  import org.apache.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.http.impl.DefaultHttpResponseFactory;
54  import org.apache.http.nio.ContentDecoder;
55  import org.apache.http.nio.ContentEncoder;
56  import org.apache.http.nio.NHttpConnection;
57  import org.apache.http.nio.NHttpServerConnection;
58  import org.apache.http.nio.NHttpServerEventHandler;
59  import org.apache.http.nio.entity.NStringEntity;
60  import org.apache.http.nio.reactor.SessionBufferStatus;
61  import org.apache.http.params.HttpParams;
62  import org.apache.http.protocol.BasicHttpContext;
63  import org.apache.http.protocol.HttpContext;
64  import org.apache.http.protocol.HttpCoreContext;
65  import org.apache.http.protocol.HttpProcessor;
66  import org.apache.http.util.Args;
67  import org.apache.http.util.Asserts;
68  
69  /**
70   * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol
71   * handler based on the non-blocking (NIO) I/O model.
72   * {@code HttpAsyncService} translates individual events fired
73   * through the {@link NHttpServerEventHandler} interface into logically related
74   * HTTP message exchanges.
75   * <p>
76   * Upon receiving an incoming request {@code HttpAsyncService} verifies
77   * the message for compliance with the server expectations using
78   * {@link HttpAsyncExpectationVerifier}, if provided, and then
79   * {@link HttpAsyncRequestHandlerMapper} is used to map the request
80   * to a particular {@link HttpAsyncRequestHandler} intended to handle
81   * the request with the given URI. The protocol handler uses the selected
82   * {@link HttpAsyncRequestHandler} instance to process the incoming request
83   * and to generate an outgoing response.
84   * <p>
85   * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate
86   * mandatory protocol headers for all outgoing messages and apply common,
87   * cross-cutting message transformations to all incoming and outgoing messages,
88   * whereas individual {@link HttpAsyncRequestHandler}s are expected
89   * to implement application specific content generation and processing.
90   * <p>
91   * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
92   * immediately. They can defer transmission of an HTTP response back to
93   * the client without blocking the I/O thread by delegating the process of
94   * request handling to another service or a worker thread. HTTP response can
95   * be submitted at a later point of time once response content becomes
96   * available.
97   *
98   * @since 4.2
99   */
100 @SuppressWarnings("deprecation")
101 @Immutable // provided injected dependencies are immutable
102 public class HttpAsyncService implements NHttpServerEventHandler {
103 
    /** Connection context attribute under which the per-connection exchange {@code State} is stored. */
    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";

    // Collaborators supplied through the constructors; each is assigned exactly once.
    private final HttpProcessor httpProcessor;
    private final ConnectionReuseStrategy connStrategy;
    private final HttpResponseFactory responseFactory;
    private final HttpAsyncRequestHandlerMapper handlerMapper;
    private final HttpAsyncExpectationVerifier expectationVerifier;
    // Receives exceptions passed to log(Exception).
    private final ExceptionLogger exceptionLogger;
112 
113     /**
114      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
115      *
116      * @param httpProcessor HTTP protocol processor (required).
117      * @param connStrategy Connection re-use strategy (required).
118      * @param responseFactory HTTP response factory (required).
119      * @param handlerResolver Request handler resolver.
120      * @param expectationVerifier Request expectation verifier (optional).
121      * @param params HTTP parameters (required).
122      *
123      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
124      *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
125      *    HttpAsyncExpectationVerifier)}
126      */
127     @Deprecated
128     public HttpAsyncService(
129             final HttpProcessor httpProcessor,
130             final ConnectionReuseStrategy connStrategy,
131             final HttpResponseFactory responseFactory,
132             final HttpAsyncRequestHandlerResolver handlerResolver,
133             final HttpAsyncExpectationVerifier expectationVerifier,
134             final HttpParams params) {
135         this(httpProcessor,
136              connStrategy,
137              responseFactory,
138              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
139              expectationVerifier);
140     }
141 
142     /**
143      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
144      *
145      * @param httpProcessor HTTP protocol processor (required).
146      * @param connStrategy Connection re-use strategy (required).
147      * @param handlerResolver Request handler resolver.
148      * @param params HTTP parameters (required).
149      *
150      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
151      *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
152      *   HttpAsyncExpectationVerifier)}
153      */
154     @Deprecated
155     public HttpAsyncService(
156             final HttpProcessor httpProcessor,
157             final ConnectionReuseStrategy connStrategy,
158             final HttpAsyncRequestHandlerResolver handlerResolver,
159             final HttpParams params) {
160         this(httpProcessor,
161              connStrategy,
162              DefaultHttpResponseFactory.INSTANCE,
163              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
164              null);
165     }
166 
167     /**
168      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
169      *
170      * @param httpProcessor HTTP protocol processor.
171      * @param connStrategy Connection re-use strategy. If {@code null}
172      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
173      * @param responseFactory HTTP response factory. If {@code null}
174      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
175      * @param handlerMapper Request handler mapper.
176      * @param expectationVerifier Request expectation verifier. May be {@code null}.
177      *
178      * @since 4.3
179      */
180     public HttpAsyncService(
181             final HttpProcessor httpProcessor,
182             final ConnectionReuseStrategy connStrategy,
183             final HttpResponseFactory responseFactory,
184             final HttpAsyncRequestHandlerMapper handlerMapper,
185             final HttpAsyncExpectationVerifier expectationVerifier) {
186         this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
187     }
188 
189     /**
190      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
191      *
192      * @param httpProcessor HTTP protocol processor.
193      * @param connStrategy Connection re-use strategy. If {@code null}
194      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
195      * @param responseFactory HTTP response factory. If {@code null}
196      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
197      * @param handlerMapper Request handler mapper.
198      * @param expectationVerifier Request expectation verifier. May be {@code null}.
199      * @param exceptionLogger Exception logger. If {@code null}
200      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
201      *   logger will be only used to log I/O exception thrown while closing
202      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
203      *
204      * @since 4.4
205      */
206     public HttpAsyncService(
207             final HttpProcessor httpProcessor,
208             final ConnectionReuseStrategy connStrategy,
209             final HttpResponseFactory responseFactory,
210             final HttpAsyncRequestHandlerMapper handlerMapper,
211             final HttpAsyncExpectationVerifier expectationVerifier,
212             final ExceptionLogger exceptionLogger) {
213         super();
214         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
215         this.connStrategy = connStrategy != null ? connStrategy :
216                 DefaultConnectionReuseStrategy.INSTANCE;
217         this.responseFactory = responseFactory != null ? responseFactory :
218                 DefaultHttpResponseFactory.INSTANCE;
219         this.handlerMapper = handlerMapper;
220         this.expectationVerifier = expectationVerifier;
221         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
222     }
223 
224     /**
225      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
226      *
227      * @param httpProcessor HTTP protocol processor.
228      * @param handlerMapper Request handler mapper.
229      *
230      * @since 4.3
231      */
232     public HttpAsyncService(
233             final HttpProcessor httpProcessor,
234             final HttpAsyncRequestHandlerMapper handlerMapper) {
235         this(httpProcessor, null, null, handlerMapper, null);
236     }
237 
238     /**
239      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
240      *
241      * @param httpProcessor HTTP protocol processor.
242      * @param handlerMapper Request handler mapper.
243      * @param exceptionLogger Exception logger. If {@code null}
244      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
245      *   logger will be only used to log I/O exception thrown while closing
246      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
247      *
248      * @since 4.4
249      */
250     public HttpAsyncService(
251             final HttpProcessor httpProcessor,
252             final HttpAsyncRequestHandlerMapper handlerMapper,
253             final ExceptionLogger exceptionLogger) {
254         this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
255     }
256 
257     @Override
258     public void connected(final NHttpServerConnection conn) {
259         final State state = new State();
260         conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
261     }
262 
263     @Override
264     public void closed(final NHttpServerConnection conn) {
265         final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
266         if (state != null) {
267             state.setTerminated();
268             closeHandlers(state);
269             final Cancellable cancellable = state.getCancellable();
270             if (cancellable != null) {
271                 cancellable.cancel();
272             }
273         }
274     }
275 
276     @Override
277     public void exception(
278             final NHttpServerConnection conn, final Exception cause) {
279         final State state = getState(conn);
280         if (state == null) {
281             shutdownConnection(conn);
282             log(cause);
283             return;
284         }
285         state.setTerminated();
286         closeHandlers(state, cause);
287         final Cancellable cancellable = state.getCancellable();
288         if (cancellable != null) {
289             cancellable.cancel();
290         }
291         final Queue<PipelineEntry> pipeline = state.getPipeline();
292         if (!pipeline.isEmpty()
293                 || conn.isResponseSubmitted()
294                 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
295             // There is not much that we can do if a response
296             // has already been submitted or pipelining is being used.
297             shutdownConnection(conn);
298         } else {
299             try {
300                 final Incoming incoming = state.getIncoming();
301                 final HttpRequest request = incoming != null ? incoming.getRequest() : null;
302                 final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext();
303                 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
304                 final HttpResponse response = responseProducer.generateResponse();
305                 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
306                 state.setResponseState(MessageState.INIT);
307                 state.setOutgoing(outgoing);
308                 commitFinalResponse(conn, state);
309             } catch (final Exception ex) {
310                 shutdownConnection(conn);
311                 closeHandlers(state);
312                 if (ex instanceof RuntimeException) {
313                     throw (RuntimeException) ex;
314                 } else {
315                     log(ex);
316                 }
317             }
318         }
319     }
320 
321     @Override
322     public void requestReceived(
323             final NHttpServerConnection conn) throws IOException, HttpException {
324         final State state = getState(conn);
325         Asserts.notNull(state, "Connection state");
326         Asserts.check(state.getRequestState() == MessageState.READY,
327                 "Unexpected request state %s", state.getRequestState());
328 
329         final HttpRequest request = conn.getHttpRequest();
330         final HttpContext context = new BasicHttpContext();
331 
332         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
333         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
334         this.httpProcessor.process(request, context);
335 
336         final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
337         final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
338         consumer.requestReceived(request);
339 
340         final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
341         state.setIncoming(incoming);
342 
343         if (request instanceof HttpEntityEnclosingRequest) {
344 
345             // If 100-continue is expected make sure
346             // there is no pending response data, no pipelined requests or buffered input
347             if (((HttpEntityEnclosingRequest) request).expectContinue()
348                         && state.getResponseState() == MessageState.READY
349                         && state.getPipeline().isEmpty()
350                         && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
351 
352                 state.setRequestState(MessageState.ACK_EXPECTED);
353                 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
354                         HttpStatus.SC_CONTINUE, context);
355                 if (this.expectationVerifier != null) {
356                     conn.suspendInput();
357                     conn.suspendOutput();
358                     final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
359                             request, ack, state, conn, context);
360                     this.expectationVerifier.verify(httpAsyncExchange, context);
361                 } else {
362                     conn.submitResponse(ack);
363                     state.setRequestState(MessageState.BODY_STREAM);
364                 }
365             } else {
366                 state.setRequestState(MessageState.BODY_STREAM);
367             }
368         } else {
369             // No request content is expected. Process request right away
370             completeRequest(incoming, conn, state);
371         }
372     }
373 
374     @Override
375     public void inputReady(
376             final NHttpServerConnection conn,
377             final ContentDecoder decoder) throws IOException, HttpException {
378         final State state = getState(conn);
379         Asserts.notNull(state, "Connection state");
380         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
381                 "Unexpected request state %s", state.getRequestState());
382 
383         final Incoming incoming = state.getIncoming();
384         Asserts.notNull(incoming, "Incoming request");
385         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
386         consumer.consumeContent(decoder, conn);
387         if (decoder.isCompleted()) {
388             completeRequest(incoming, conn, state);
389         }
390     }
391 
392     @Override
393     public void responseReady(
394             final NHttpServerConnection conn) throws IOException, HttpException {
395         final State state = getState(conn);
396         Asserts.notNull(state, "Connection state");
397         Asserts.check(state.getResponseState() == MessageState.READY ||
398                         state.getResponseState() == MessageState.INIT,
399                 "Unexpected response state %s", state.getResponseState());
400 
401         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
402             final Outgoing outgoing = state.getOutgoing();
403             Asserts.notNull(outgoing, "Outgoing response");
404 
405             final HttpResponse response = outgoing.getResponse();
406             final int status = response.getStatusLine().getStatusCode();
407             if (status == 100) {
408                 final HttpContext context = outgoing.getContext();
409                 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
410                 try {
411                     // Make sure 100 response has no entity
412                     response.setEntity(null);
413                     conn.requestInput();
414                     state.setRequestState(MessageState.BODY_STREAM);
415                     state.setOutgoing(null);
416                     conn.submitResponse(response);
417                     responseProducer.responseCompleted(context);
418                 } finally {
419                     responseProducer.close();
420                 }
421             } else if (status >= 400) {
422                 conn.resetInput();
423                 state.setRequestState(MessageState.READY);
424                 commitFinalResponse(conn, state);
425             } else {
426                 throw new HttpException("Invalid response: " + response.getStatusLine());
427             }
428         } else {
429             if (state.getResponseState() == MessageState.READY) {
430                 final Queue<PipelineEntry> pipeline = state.getPipeline();
431                 final PipelineEntry pipelineEntry = pipeline.poll();
432                 if (pipelineEntry == null) {
433                     conn.suspendOutput();
434                     return;
435                 }
436                 state.setResponseState(MessageState.INIT);
437                 final Object result = pipelineEntry.getResult();
438                 final HttpRequest request = pipelineEntry.getRequest();
439                 final HttpContext context = pipelineEntry.getContext();
440                 if (result != null) {
441                     final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
442                             HttpStatus.SC_OK, context);
443                     final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
444                             request, response, state, conn, context);
445                     final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
446                     conn.suspendOutput();
447                     try {
448                         handler.handle(result, httpExchange, context);
449                     } catch (RuntimeException ex) {
450                         throw ex;
451                     } catch (Exception ex) {
452                         pipeline.add(new PipelineEntry(
453                             request,
454                             null,
455                             ex,
456                             handler,
457                             context));
458                         state.setResponseState(MessageState.READY);
459                         responseReady(conn);
460                         return;
461                     }
462                 } else {
463                     final Exception exception = pipelineEntry.getException();
464                     final HttpAsyncResponseProducer responseProducer = handleException(
465                             exception != null ? exception : new HttpException("Internal error processing request"),
466                             context);
467                     final HttpResponse error = responseProducer.generateResponse();
468                     state.setOutgoing(new Outgoing(request, error, responseProducer, context));
469                 }
470             }
471             if (state.getResponseState() == MessageState.INIT) {
472                 final Outgoing outgoing;
473                 synchronized (state) {
474                     outgoing = state.getOutgoing();
475                     if (outgoing == null) {
476                         conn.suspendOutput();
477                         return;
478                     }
479                 }
480                 final HttpResponse response = outgoing.getResponse();
481                 final int status = response.getStatusLine().getStatusCode();
482                 if (status >= 200) {
483                     commitFinalResponse(conn, state);
484                 } else {
485                     throw new HttpException("Invalid response: " + response.getStatusLine());
486                 }
487             }
488         }
489     }
490 
491     @Override
492     public void outputReady(
493             final NHttpServerConnection conn,
494             final ContentEncoder encoder) throws HttpException, IOException {
495         final State state = getState(conn);
496         Asserts.notNull(state, "Connection state");
497         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
498                 "Unexpected response state %s", state.getResponseState());
499 
500         final Outgoing outgoing = state.getOutgoing();
501         Asserts.notNull(outgoing, "Outgoing response");
502         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
503 
504         responseProducer.produceContent(encoder, conn);
505 
506         if (encoder.isCompleted()) {
507             completeResponse(outgoing, conn, state);
508         }
509     }
510 
511     @Override
512     public void endOfInput(final NHttpServerConnection conn) throws IOException {
513         // Closing connection in an orderly manner and
514         // waiting for output buffer to get flushed.
515         // Do not want to wait indefinitely, though, in case
516         // the opposite end is not reading
517         if (conn.getSocketTimeout() <= 0) {
518             conn.setSocketTimeout(1000);
519         }
520         conn.close();
521     }
522 
523     @Override
524     public void timeout(final NHttpServerConnection conn) throws IOException {
525         final State state = getState(conn);
526         if (state != null) {
527             closeHandlers(state, new SocketTimeoutException());
528         }
529         if (conn.getStatus() == NHttpConnection.ACTIVE) {
530             conn.close();
531             if (conn.getStatus() == NHttpConnection.CLOSING) {
532                 // Give the connection some grace time to
533                 // close itself nicely
534                 conn.setSocketTimeout(250);
535             }
536         } else {
537             conn.shutdown();
538         }
539     }
540 
541     private State getState(final NHttpConnection conn) {
542         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
543     }
544 
545     /**
546      * This method can be used to log I/O exception thrown while closing
547      * {@link java.io.Closeable} objects (such as
548      * {@link org.apache.http.HttpConnection}).
549      *
550      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
551      */
552     protected void log(final Exception ex) {
553         this.exceptionLogger.log(ex);
554     }
555 
556     private void shutdownConnection(final NHttpConnection conn) {
557         try {
558             conn.shutdown();
559         } catch (final IOException ex) {
560             log(ex);
561         }
562     }
563 
564     private void closeHandlers(final State state, final Exception ex) {
565         final HttpAsyncRequestConsumer<Object> consumer =
566                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
567         if (consumer != null) {
568             try {
569                 consumer.failed(ex);
570             } finally {
571                 try {
572                     consumer.close();
573                 } catch (final IOException ioex) {
574                     log(ioex);
575                 }
576             }
577         }
578         final HttpAsyncResponseProducer producer =
579                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
580         if (producer != null) {
581             try {
582                 producer.failed(ex);
583             } finally {
584                 try {
585                     producer.close();
586                 } catch (final IOException ioex) {
587                     log(ioex);
588                 }
589             }
590         }
591     }
592 
593     private void closeHandlers(final State state) {
594         final HttpAsyncRequestConsumer<Object> consumer =
595                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
596         if (consumer != null) {
597             try {
598                 consumer.close();
599             } catch (final IOException ioex) {
600                 log(ioex);
601             }
602         }
603         final HttpAsyncResponseProducer producer =
604                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
605         if (producer != null) {
606             try {
607                 producer.close();
608             } catch (final IOException ioex) {
609                 log(ioex);
610             }
611         }
612     }
613 
614     protected HttpAsyncResponseProducer handleException(
615             final Exception ex, final HttpContext context) {
616         final int code;
617         if (ex instanceof MethodNotSupportedException) {
618             code = HttpStatus.SC_NOT_IMPLEMENTED;
619         } else if (ex instanceof UnsupportedHttpVersionException) {
620             code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
621         } else if (ex instanceof ProtocolException) {
622             code = HttpStatus.SC_BAD_REQUEST;
623         } else {
624             code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
625         }
626         String message = ex.getMessage();
627         if (message == null) {
628             message = ex.toString();
629         }
630         final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
631                 code, context);
632         return new ErrorResponseProducer(response,
633                 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
634     }
635 
636     /**
637      * This method can be used to handle callback set up happened after
638      * response submission.
639      *
640      * @param cancellable Request cancellation callback.
641      * @param context Request context.
642      *
643      * @since 4.4
644      */
645     protected void handleAlreadySubmittedResponse(
646             final Cancellable cancellable, final HttpContext context) {
647         throw new IllegalStateException("Response already submitted");
648     }
649 
650     /**
651      * This method can be used to handle double response submission.
652      *
653      * @param responseProducer Response producer for second response.
654      * @param context Request context.
655      *
656      * @since 4.4
657      */
658     protected void handleAlreadySubmittedResponse(
659             final HttpAsyncResponseProducer responseProducer,
660             final HttpContext context) {
661         throw new IllegalStateException("Response already submitted");
662     }
663 
664     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
665         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
666             return false;
667         }
668         final int status = response.getStatusLine().getStatusCode();
669         return status >= HttpStatus.SC_OK
670             && status != HttpStatus.SC_NO_CONTENT
671             && status != HttpStatus.SC_NOT_MODIFIED
672             && status != HttpStatus.SC_RESET_CONTENT;
673     }
674 
675     private void completeRequest(
676             final Incoming incoming,
677             final NHttpServerConnection conn,
678             final State state) throws IOException, HttpException {
679         state.setRequestState(MessageState.READY);
680         state.setIncoming(null);
681 
682         final PipelineEntry pipelineEntry;
683         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
684         try {
685             final HttpContext context = incoming.getContext();
686             consumer.requestCompleted(context);
687             pipelineEntry = new PipelineEntry(
688                     incoming.getRequest(),
689                     consumer.getResult(),
690                     consumer.getException(),
691                     incoming.getHandler(),
692                     context);
693         } finally {
694             consumer.close();
695         }
696         final Queue<PipelineEntry> pipeline = state.getPipeline();
697         pipeline.add(pipelineEntry);
698         if (state.getResponseState() == MessageState.READY) {
699             conn.requestOutput();
700         }
701     }
702 
703     private void commitFinalResponse(
704             final NHttpServerConnection conn,
705             final State state) throws IOException, HttpException {
706         final Outgoing outgoing = state.getOutgoing();
707         Asserts.notNull(outgoing, "Outgoing response");
708         final HttpRequest request = outgoing.getRequest();
709         final HttpResponse response = outgoing.getResponse();
710         final HttpContext context = outgoing.getContext();
711 
712         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
713         this.httpProcessor.process(response, context);
714 
715         HttpEntity entity = response.getEntity();
716         if (entity != null && !canResponseHaveBody(request, response)) {
717             response.setEntity(null);
718             entity = null;
719         }
720 
721         conn.submitResponse(response);
722 
723         if (entity == null) {
724             completeResponse(outgoing, conn, state);
725         } else {
726             state.setResponseState(MessageState.BODY_STREAM);
727         }
728     }
729 
730     private void completeResponse(
731             final Outgoing outgoing,
732             final NHttpServerConnection conn,
733             final State state) throws IOException, HttpException {
734         final HttpContext context = outgoing.getContext();
735         final HttpResponse response = outgoing.getResponse();
736         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
737         try {
738             responseProducer.responseCompleted(context);
739             state.setOutgoing(null);
740             state.setCancellable(null);
741             state.setResponseState(MessageState.READY);
742         } finally {
743             responseProducer.close();
744         }
745         if (!this.connStrategy.keepAlive(response, context)) {
746             conn.close();
747         } else {
748             // Ready to process new request
749             final Queue<PipelineEntry> pipeline = state.getPipeline();
750             if (pipeline.isEmpty()) {
751                 conn.suspendOutput();
752             }
753             conn.requestInput();
754         }
755     }
756 
757     @SuppressWarnings("unchecked")
758     private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
759         HttpAsyncRequestHandler<Object> handler = null;
760         if (this.handlerMapper != null) {
761             handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
762         }
763         if (handler == null) {
764             handler = new NullRequestHandler();
765         }
766         return handler;
767     }
768 
769     static class Incoming {
770 
771         private final HttpRequest request;
772         private final HttpAsyncRequestHandler<Object> handler;
773         private final HttpAsyncRequestConsumer<Object> consumer;
774         private final HttpContext context;
775 
776         Incoming(
777                 final HttpRequest request,
778                 final HttpAsyncRequestHandler<Object> handler,
779                 final HttpAsyncRequestConsumer<Object> consumer,
780                 final HttpContext context) {
781             this.request = request;
782             this.handler = handler;
783             this.consumer = consumer;
784             this.context = context;
785         }
786 
787         public HttpRequest getRequest() {
788             return this.request;
789         }
790 
791         public HttpAsyncRequestHandler<Object> getHandler() {
792             return this.handler;
793         }
794 
795         public HttpAsyncRequestConsumer<Object> getConsumer() {
796             return this.consumer;
797         }
798 
799         public HttpContext getContext() {
800             return this.context;
801         }
802     }
803 
804     static class Outgoing {
805 
806         private final HttpRequest request;
807         private final HttpResponse response;
808         private final HttpAsyncResponseProducer producer;
809         private final HttpContext context;
810 
811         Outgoing(
812                 final HttpRequest request,
813                 final HttpResponse response,
814                 final HttpAsyncResponseProducer producer,
815                 final HttpContext context) {
816             this.request = request;
817             this.response = response;
818             this.producer = producer;
819             this.context = context;
820         }
821 
822         public HttpRequest getRequest() {
823             return this.request;
824         }
825 
826         public HttpResponse getResponse() {
827             return this.response;
828         }
829 
830         public HttpAsyncResponseProducer getProducer() {
831             return this.producer;
832         }
833 
834         public HttpContext getContext() {
835             return this.context;
836         }
837     }
838 
839     static class PipelineEntry {
840 
841         private final HttpRequest request;
842         private final Object result;
843         private final Exception exception;
844         private final HttpAsyncRequestHandler<Object> handler;
845         private final HttpContext context;
846 
847         PipelineEntry(
848                 final HttpRequest request,
849                 final Object result,
850                 final Exception exception,
851                 final HttpAsyncRequestHandler<Object> handler,
852                 final HttpContext context) {
853             this.request = request;
854             this.result = result;
855             this.exception = exception;
856             this.handler = handler;
857             this.context = context;
858         }
859 
860         public HttpRequest getRequest() {
861             return this.request;
862         }
863 
864         public Object getResult() {
865             return this.result;
866         }
867 
868         public Exception getException() {
869             return this.exception;
870         }
871 
872         public HttpAsyncRequestHandler<Object> getHandler() {
873             return this.handler;
874         }
875 
876         public HttpContext getContext() {
877             return this.context;
878         }
879 
880     }
881 
882     static class State {
883 
884         private final Queue<PipelineEntry> pipeline;
885         private volatile boolean terminated;
886         private volatile MessageState requestState;
887         private volatile MessageState responseState;
888         private volatile Incoming incoming;
889         private volatile Outgoing outgoing;
890         private volatile Cancellable cancellable;
891 
892         State() {
893             super();
894             this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
895             this.requestState = MessageState.READY;
896             this.responseState = MessageState.READY;
897         }
898 
899         public boolean isTerminated() {
900             return this.terminated;
901         }
902 
903         public void setTerminated() {
904             this.terminated = true;
905         }
906 
907         public MessageState getRequestState() {
908             return this.requestState;
909         }
910 
911         public void setRequestState(final MessageState state) {
912             this.requestState = state;
913         }
914 
915         public MessageState getResponseState() {
916             return this.responseState;
917         }
918 
919         public void setResponseState(final MessageState state) {
920             this.responseState = state;
921         }
922 
923         public Incoming getIncoming() {
924             return this.incoming;
925         }
926 
927         public void setIncoming(final Incoming incoming) {
928             this.incoming = incoming;
929         }
930 
931         public Outgoing getOutgoing() {
932             return this.outgoing;
933         }
934 
935         public void setOutgoing(final Outgoing outgoing) {
936             this.outgoing = outgoing;
937         }
938 
939         public Cancellable getCancellable() {
940             return this.cancellable;
941         }
942 
943         public void setCancellable(final Cancellable cancellable) {
944             this.cancellable = cancellable;
945         }
946 
947         public Queue<PipelineEntry> getPipeline() {
948             return this.pipeline;
949         }
950 
951         @Override
952         public String toString() {
953             final StringBuilder buf = new StringBuilder();
954             buf.append("[incoming ");
955             buf.append(this.requestState);
956             if (this.incoming != null) {
957                 buf.append(" ");
958                 buf.append(this.incoming.getRequest().getRequestLine());
959             }
960             buf.append("; outgoing ");
961             buf.append(this.responseState);
962             if (this.outgoing != null) {
963                 buf.append(" ");
964                 buf.append(this.outgoing.getResponse().getStatusLine());
965             }
966             buf.append("]");
967             return buf.toString();
968         }
969 
970     }
971 
972     class HttpAsyncExchangeImpl implements HttpAsyncExchange {
973 
974         private final AtomicBoolean completed = new AtomicBoolean();
975         private final HttpRequest request;
976         private final HttpResponse response;
977         private final State state;
978         private final NHttpServerConnection conn;
979         private final HttpContext context;
980 
981         public HttpAsyncExchangeImpl(
982                 final HttpRequest request,
983                 final HttpResponse response,
984                 final State state,
985                 final NHttpServerConnection conn,
986                 final HttpContext context) {
987             super();
988             this.request = request;
989             this.response = response;
990             this.state = state;
991             this.conn = conn;
992             this.context = context;
993         }
994 
995         @Override
996         public HttpRequest getRequest() {
997             return this.request;
998         }
999 
1000         @Override
1001         public HttpResponse getResponse() {
1002             return this.response;
1003         }
1004 
1005         @Override
1006         public void setCallback(final Cancellable cancellable) {
1007             if (this.completed.get()) {
1008                 handleAlreadySubmittedResponse(cancellable, context);
1009             } else if (this.state.isTerminated() && cancellable != null) {
1010                 cancellable.cancel();
1011             } else {
1012                 this.state.setCancellable(cancellable);
1013             }
1014         }
1015 
1016         @Override
1017         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1018             Args.notNull(responseProducer, "Response producer");
1019             if (this.completed.getAndSet(true)) {
1020                 handleAlreadySubmittedResponse(responseProducer, context);
1021             } else if (!this.state.isTerminated()) {
1022                 final HttpResponse response = responseProducer.generateResponse();
1023                 final Outgoing outgoing = new Outgoing(
1024                         this.request, response, responseProducer, this.context);
1025 
1026                 synchronized (this.state) {
1027                     this.state.setOutgoing(outgoing);
1028                     this.state.setCancellable(null);
1029                     this.conn.requestOutput();
1030                 }
1031 
1032             } else {
1033                 try {
1034                     responseProducer.close();
1035                 } catch (final IOException ex) {
1036                     log(ex);
1037                 }
1038             }
1039         }
1040 
1041         @Override
1042         public void submitResponse() {
1043             submitResponse(new BasicAsyncResponseProducer(this.response));
1044         }
1045 
1046         @Override
1047         public boolean isCompleted() {
1048             return this.completed.get();
1049         }
1050 
1051         @Override
1052         public void setTimeout(final int timeout) {
1053             this.conn.setSocketTimeout(timeout);
1054         }
1055 
1056         @Override
1057         public int getTimeout() {
1058             return this.conn.getSocketTimeout();
1059         }
1060 
1061     }
1062 
1063     /**
1064      * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1065      */
1066     @Deprecated
1067     private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1068 
1069         private final HttpAsyncRequestHandlerResolver resolver;
1070 
1071         public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1072             this.resolver = resolver;
1073         }
1074 
1075         @Override
1076         public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1077             return resolver.lookup(request.getRequestLine().getUri());
1078         }
1079 
1080     }
1081 
1082 }