View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.http.ConnectionReuseStrategy;
37  import org.apache.http.ExceptionLogger;
38  import org.apache.http.HttpEntity;
39  import org.apache.http.HttpEntityEnclosingRequest;
40  import org.apache.http.HttpException;
41  import org.apache.http.HttpRequest;
42  import org.apache.http.HttpResponse;
43  import org.apache.http.HttpResponseFactory;
44  import org.apache.http.HttpStatus;
45  import org.apache.http.HttpVersion;
46  import org.apache.http.MethodNotSupportedException;
47  import org.apache.http.ProtocolException;
48  import org.apache.http.UnsupportedHttpVersionException;
49  import org.apache.http.annotation.Contract;
50  import org.apache.http.annotation.ThreadingBehavior;
51  import org.apache.http.concurrent.Cancellable;
52  import org.apache.http.entity.ContentType;
53  import org.apache.http.impl.DefaultConnectionReuseStrategy;
54  import org.apache.http.impl.DefaultHttpResponseFactory;
55  import org.apache.http.nio.ContentDecoder;
56  import org.apache.http.nio.ContentEncoder;
57  import org.apache.http.nio.NHttpConnection;
58  import org.apache.http.nio.NHttpServerConnection;
59  import org.apache.http.nio.NHttpServerEventHandler;
60  import org.apache.http.nio.entity.NStringEntity;
61  import org.apache.http.nio.reactor.SessionBufferStatus;
62  import org.apache.http.params.HttpParams;
63  import org.apache.http.protocol.BasicHttpContext;
64  import org.apache.http.protocol.HttpContext;
65  import org.apache.http.protocol.HttpCoreContext;
66  import org.apache.http.protocol.HttpProcessor;
67  import org.apache.http.util.Args;
68  import org.apache.http.util.Asserts;
69  
70  /**
71   * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol
72   * handler based on the non-blocking (NIO) I/O model.
73   * {@code HttpAsyncServerProtocolHandler} translates individual events fired
74   * through the {@link NHttpServerEventHandler} interface into logically related
75   * HTTP message exchanges.
76   * <p>
77   * Upon receiving an incoming request {@code HttpAsyncService} verifies
78   * the message for compliance with the server expectations using
79   * {@link HttpAsyncExpectationVerifier}, if provided, and then
80   * {@link HttpAsyncRequestHandlerMapper} is used to map the request
81   * to a particular {@link HttpAsyncRequestHandler} intended to handle
82   * the request with the given URI. The protocol handler uses the selected
83   * {@link HttpAsyncRequestHandler} instance to process the incoming request
84   * and to generate an outgoing response.
85   * <p>
86   * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate
87   * mandatory protocol headers for all outgoing messages and apply common,
88   * cross-cutting message transformations to all incoming and outgoing messages,
89   * whereas individual {@link HttpAsyncRequestHandler}s are expected
90   * to implement application specific content generation and processing.
91   * <p>
92   * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
93   * immediately. They can defer transmission of an HTTP response back to
94   * the client without blocking the I/O thread by delegating the process of
95   * request handling to another service or a worker thread. HTTP response can
96   * be submitted as a later a later point of time once response content becomes
97   * available.
98   *
99   * @since 4.2
100  */
101 @SuppressWarnings("deprecation")
102 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
103 public class HttpAsyncService implements NHttpServerEventHandler {
104 
105     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
106 
107     private final HttpProcessor httpProcessor;
108     private final ConnectionReuseStrategy connectionStrategy;
109     private final HttpResponseFactory responseFactory;
110     private final HttpAsyncRequestHandlerMapper handlerMapper;
111     private final HttpAsyncExpectationVerifier expectationVerifier;
112     private final ExceptionLogger exceptionLogger;
113 
114     /**
115      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
116      *
117      * @param httpProcessor HTTP protocol processor (required).
118      * @param connStrategy Connection re-use strategy (required).
119      * @param responseFactory HTTP response factory (required).
120      * @param handlerResolver Request handler resolver.
121      * @param expectationVerifier Request expectation verifier (optional).
122      * @param params HTTP parameters (required).
123      *
124      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
125      *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
126      *    HttpAsyncExpectationVerifier)}
127      */
128     @Deprecated
129     public HttpAsyncService(
130             final HttpProcessor httpProcessor,
131             final ConnectionReuseStrategy connStrategy,
132             final HttpResponseFactory responseFactory,
133             final HttpAsyncRequestHandlerResolver handlerResolver,
134             final HttpAsyncExpectationVerifier expectationVerifier,
135             final HttpParams params) {
136         this(httpProcessor,
137              connStrategy,
138              responseFactory,
139              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
140              expectationVerifier);
141     }
142 
143     /**
144      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
145      *
146      * @param httpProcessor HTTP protocol processor (required).
147      * @param connStrategy Connection re-use strategy (required).
148      * @param handlerResolver Request handler resolver.
149      * @param params HTTP parameters (required).
150      *
151      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
152      *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
153      *   HttpAsyncExpectationVerifier)}
154      */
155     @Deprecated
156     public HttpAsyncService(
157             final HttpProcessor httpProcessor,
158             final ConnectionReuseStrategy connStrategy,
159             final HttpAsyncRequestHandlerResolver handlerResolver,
160             final HttpParams params) {
161         this(httpProcessor,
162              connStrategy,
163              DefaultHttpResponseFactory.INSTANCE,
164              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
165              null);
166     }
167 
168     /**
169      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
170      *
171      * @param httpProcessor HTTP protocol processor.
172      * @param connStrategy Connection re-use strategy. If {@code null}
173      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
174      * @param responseFactory HTTP response factory. If {@code null}
175      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
176      * @param handlerMapper Request handler mapper.
177      * @param expectationVerifier Request expectation verifier. May be {@code null}.
178      *
179      * @since 4.3
180      */
181     public HttpAsyncService(
182             final HttpProcessor httpProcessor,
183             final ConnectionReuseStrategy connStrategy,
184             final HttpResponseFactory responseFactory,
185             final HttpAsyncRequestHandlerMapper handlerMapper,
186             final HttpAsyncExpectationVerifier expectationVerifier) {
187         this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
188     }
189 
190     /**
191      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
192      *
193      * @param httpProcessor HTTP protocol processor.
194      * @param connStrategy Connection re-use strategy. If {@code null}
195      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
196      * @param responseFactory HTTP response factory. If {@code null}
197      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
198      * @param handlerMapper Request handler mapper.
199      * @param expectationVerifier Request expectation verifier. May be {@code null}.
200      * @param exceptionLogger Exception logger. If {@code null}
201      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
202      *   logger will be only used to log I/O exception thrown while closing
203      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
204      *
205      * @since 4.4
206      */
207     public HttpAsyncService(
208             final HttpProcessor httpProcessor,
209             final ConnectionReuseStrategy connStrategy,
210             final HttpResponseFactory responseFactory,
211             final HttpAsyncRequestHandlerMapper handlerMapper,
212             final HttpAsyncExpectationVerifier expectationVerifier,
213             final ExceptionLogger exceptionLogger) {
214         super();
215         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
216         this.connectionStrategy = connStrategy != null ? connStrategy :
217                 DefaultConnectionReuseStrategy.INSTANCE;
218         this.responseFactory = responseFactory != null ? responseFactory :
219                 DefaultHttpResponseFactory.INSTANCE;
220         this.handlerMapper = handlerMapper;
221         this.expectationVerifier = expectationVerifier;
222         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
223     }
224 
225     /**
226      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
227      *
228      * @param httpProcessor HTTP protocol processor.
229      * @param handlerMapper Request handler mapper.
230      *
231      * @since 4.3
232      */
233     public HttpAsyncService(
234             final HttpProcessor httpProcessor,
235             final HttpAsyncRequestHandlerMapper handlerMapper) {
236         this(httpProcessor, null, null, handlerMapper, null);
237     }
238 
239     /**
240      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
241      *
242      * @param httpProcessor HTTP protocol processor.
243      * @param handlerMapper Request handler mapper.
244      * @param exceptionLogger Exception logger. If {@code null}
245      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
246      *   logger will be only used to log I/O exception thrown while closing
247      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
248      *
249      * @since 4.4
250      */
251     public HttpAsyncService(
252             final HttpProcessor httpProcessor,
253             final HttpAsyncRequestHandlerMapper handlerMapper,
254             final ExceptionLogger exceptionLogger) {
255         this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
256     }
257 
258     @Override
259     public void connected(final NHttpServerConnection conn) {
260         final State state = new State();
261         conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
262     }
263 
264     @Override
265     public void closed(final NHttpServerConnection conn) {
266         final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
267         if (state != null) {
268             state.setTerminated();
269             closeHandlers(state);
270             final Cancellable cancellable = state.getCancellable();
271             if (cancellable != null) {
272                 cancellable.cancel();
273             }
274         }
275     }
276 
277     @Override
278     public void exception(
279             final NHttpServerConnection conn, final Exception cause) {
280         log(cause);
281         final State state = getState(conn);
282         if (state == null) {
283             shutdownConnection(conn);
284             return;
285         }
286         state.setTerminated();
287         closeHandlers(state, cause);
288         final Cancellable cancellable = state.getCancellable();
289         if (cancellable != null) {
290             cancellable.cancel();
291         }
292         final Queue<PipelineEntry> pipeline = state.getPipeline();
293         if (!pipeline.isEmpty()
294                 || conn.isResponseSubmitted()
295                 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
296             // There is not much that we can do if a response
297             // has already been submitted or pipelining is being used.
298             shutdownConnection(conn);
299         } else {
300             try {
301                 final Incoming incoming = state.getIncoming();
302                 final HttpRequest request = incoming != null ? incoming.getRequest() : null;
303                 final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext();
304                 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
305                 final HttpResponse response = responseProducer.generateResponse();
306                 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
307                 state.setResponseState(MessageState.INIT);
308                 state.setOutgoing(outgoing);
309                 commitFinalResponse(conn, state);
310             } catch (final Exception ex) {
311                 shutdownConnection(conn);
312                 closeHandlers(state);
313                 if (ex instanceof RuntimeException) {
314                     throw (RuntimeException) ex;
315                 }
316                 log(ex);
317             }
318         }
319     }
320 
321     @Override
322     public void requestReceived(
323             final NHttpServerConnection conn) throws IOException, HttpException {
324         final State state = getState(conn);
325         Asserts.notNull(state, "Connection state");
326         Asserts.check(state.getRequestState() == MessageState.READY,
327                 "Unexpected request state %s", state.getRequestState());
328 
329         final HttpRequest request = conn.getHttpRequest();
330         final HttpContext context = new BasicHttpContext();
331 
332         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
333         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
334         this.httpProcessor.process(request, context);
335 
336         final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
337         final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
338         consumer.requestReceived(request);
339 
340         final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
341         state.setIncoming(incoming);
342 
343         if (request instanceof HttpEntityEnclosingRequest) {
344 
345             // If 100-continue is expected make sure
346             // there is no pending response data, no pipelined requests or buffered input
347             if (((HttpEntityEnclosingRequest) request).expectContinue()
348                         && state.getResponseState() == MessageState.READY
349                         && state.getPipeline().isEmpty()
350                         && !(conn instanceof SessionBufferStatusrg/apache/http/nio/reactor/SessionBufferStatus.html#SessionBufferStatus">SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
351 
352                 state.setRequestState(MessageState.ACK_EXPECTED);
353                 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
354                         HttpStatus.SC_CONTINUE, context);
355                 if (this.expectationVerifier != null) {
356                     conn.suspendInput();
357                     conn.suspendOutput();
358                     final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
359                             request, ack, state, conn, context);
360                     this.expectationVerifier.verify(httpAsyncExchange, context);
361                 } else {
362                     conn.submitResponse(ack);
363                     state.setRequestState(MessageState.BODY_STREAM);
364                 }
365             } else {
366                 state.setRequestState(MessageState.BODY_STREAM);
367             }
368         } else {
369             // No request content is expected. Process request right away
370             completeRequest(incoming, conn, state);
371         }
372     }
373 
374     @Override
375     public void inputReady(
376             final NHttpServerConnection conn,
377             final ContentDecoder decoder) throws IOException, HttpException {
378         final State state = getState(conn);
379         Asserts.notNull(state, "Connection state");
380         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
381                 "Unexpected request state %s", state.getRequestState());
382 
383         final Incoming incoming = state.getIncoming();
384         Asserts.notNull(incoming, "Incoming request");
385         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
386         consumer.consumeContent(decoder, conn);
387         if (decoder.isCompleted()) {
388             completeRequest(incoming, conn, state);
389         }
390     }
391 
392     @Override
393     public void responseReady(
394             final NHttpServerConnection conn) throws IOException, HttpException {
395         final State state = getState(conn);
396         Asserts.notNull(state, "Connection state");
397         Asserts.check(state.getResponseState() == MessageState.READY ||
398                         state.getResponseState() == MessageState.INIT,
399                 "Unexpected response state %s", state.getResponseState());
400 
401         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
402             final Outgoing outgoing;
403             synchronized (state) {
404                 outgoing = state.getOutgoing();
405                 if (outgoing == null) {
406                     conn.suspendOutput();
407                     return;
408                 }
409             }
410             final HttpResponse response = outgoing.getResponse();
411             final int status = response.getStatusLine().getStatusCode();
412             if (status == 100) {
413                 final HttpContext context = outgoing.getContext();
414                 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
415                 try {
416                     // Make sure 100 response has no entity
417                     response.setEntity(null);
418                     conn.requestInput();
419                     state.setRequestState(MessageState.BODY_STREAM);
420                     state.setOutgoing(null);
421                     conn.submitResponse(response);
422                     responseProducer.responseCompleted(context);
423                 } finally {
424                     responseProducer.close();
425                 }
426             } else if (status >= 400) {
427                 conn.resetInput();
428                 state.setRequestState(MessageState.READY);
429                 commitFinalResponse(conn, state);
430             } else {
431                 throw new HttpException("Invalid response: " + response.getStatusLine());
432             }
433         } else {
434             if (state.getResponseState() == MessageState.READY) {
435                 final Queue<PipelineEntry> pipeline = state.getPipeline();
436                 final PipelineEntry pipelineEntry = pipeline.poll();
437                 if (pipelineEntry == null) {
438                     conn.suspendOutput();
439                     return;
440                 }
441                 state.setResponseState(MessageState.INIT);
442                 final Object result = pipelineEntry.getResult();
443                 final HttpRequest request = pipelineEntry.getRequest();
444                 final HttpContext context = pipelineEntry.getContext();
445                 if (result != null) {
446                     final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
447                             HttpStatus.SC_OK, context);
448                     final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
449                             request, response, state, conn, context);
450                     final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
451                     conn.suspendOutput();
452                     try {
453                         handler.handle(result, httpExchange, context);
454                     } catch (final RuntimeException ex) {
455                         throw ex;
456                     } catch (final Exception ex) {
457                         pipeline.add(new PipelineEntry(
458                             request,
459                             null,
460                             ex,
461                             handler,
462                             context));
463                         state.setResponseState(MessageState.READY);
464                         responseReady(conn);
465                         return;
466                     }
467                 } else {
468                     final Exception exception = pipelineEntry.getException();
469                     final HttpAsyncResponseProducer responseProducer = handleException(
470                             exception != null ? exception : new HttpException("Internal error processing request"),
471                             context);
472                     final HttpResponse error = responseProducer.generateResponse();
473                     state.setOutgoing(new Outgoing(request, error, responseProducer, context));
474                 }
475             }
476             if (state.getResponseState() == MessageState.INIT) {
477                 final Outgoing outgoing;
478                 synchronized (state) {
479                     outgoing = state.getOutgoing();
480                     if (outgoing == null) {
481                         conn.suspendOutput();
482                         return;
483                     }
484                 }
485                 final HttpResponse response = outgoing.getResponse();
486                 final int status = response.getStatusLine().getStatusCode();
487                 if (status >= 200) {
488                     commitFinalResponse(conn, state);
489                 } else {
490                     throw new HttpException("Invalid response: " + response.getStatusLine());
491                 }
492             }
493         }
494     }
495 
496     @Override
497     public void outputReady(
498             final NHttpServerConnection conn,
499             final ContentEncoder encoder) throws HttpException, IOException {
500         final State state = getState(conn);
501         Asserts.notNull(state, "Connection state");
502         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
503                 "Unexpected response state %s", state.getResponseState());
504 
505         final Outgoing outgoing = state.getOutgoing();
506         Asserts.notNull(outgoing, "Outgoing response");
507         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
508 
509         responseProducer.produceContent(encoder, conn);
510 
511         if (encoder.isCompleted()) {
512             completeResponse(outgoing, conn, state);
513         }
514     }
515 
516     @Override
517     public void endOfInput(final NHttpServerConnection conn) throws IOException {
518         // Closing connection in an orderly manner and
519         // waiting for output buffer to get flushed.
520         // Do not want to wait indefinitely, though, in case
521         // the opposite end is not reading
522         if (conn.getSocketTimeout() <= 0) {
523             conn.setSocketTimeout(1000);
524         }
525         conn.close();
526     }
527 
528     @Override
529     public void timeout(final NHttpServerConnection conn) throws IOException {
530         final State state = getState(conn);
531         if (state != null) {
532             exception(conn, new SocketTimeoutException(
533                     String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
534         }
535         if (conn.getStatus() == NHttpConnection.ACTIVE) {
536             conn.close();
537             if (conn.getStatus() == NHttpConnection.CLOSING) {
538                 // Give the connection some grace time to
539                 // close itself nicely
540                 conn.setSocketTimeout(250);
541             }
542         } else {
543             conn.shutdown();
544         }
545     }
546 
547     private State getState(final NHttpConnection conn) {
548         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
549     }
550 
551     /**
552      * This method can be used to log I/O exception thrown while closing
553      * {@link java.io.Closeable} objects (such as
554      * {@link org.apache.http.HttpConnection}).
555      *
556      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
557      */
558     protected void log(final Exception ex) {
559         this.exceptionLogger.log(ex);
560     }
561 
562     private void shutdownConnection(final NHttpConnection conn) {
563         try {
564             conn.shutdown();
565         } catch (final IOException ex) {
566             log(ex);
567         }
568     }
569 
570     private void closeHandlers(final State state, final Exception ex) {
571         final HttpAsyncRequestConsumer<Object> consumer =
572                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
573         if (consumer != null) {
574             try {
575                 consumer.failed(ex);
576             } finally {
577                 try {
578                     consumer.close();
579                 } catch (final IOException ioex) {
580                     log(ioex);
581                 }
582             }
583         }
584         final HttpAsyncResponseProducer producer =
585                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
586         if (producer != null) {
587             try {
588                 producer.failed(ex);
589             } finally {
590                 try {
591                     producer.close();
592                 } catch (final IOException ioex) {
593                     log(ioex);
594                 }
595             }
596         }
597     }
598 
599     private void closeHandlers(final State state) {
600         final HttpAsyncRequestConsumer<Object> consumer =
601                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
602         if (consumer != null) {
603             try {
604                 consumer.close();
605             } catch (final IOException ioex) {
606                 log(ioex);
607             }
608         }
609         final HttpAsyncResponseProducer producer =
610                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
611         if (producer != null) {
612             try {
613                 producer.close();
614             } catch (final IOException ioex) {
615                 log(ioex);
616             }
617         }
618     }
619 
620     protected HttpAsyncResponseProducer handleException(
621             final Exception ex, final HttpContext context) {
622         String message = ex.getMessage();
623         if (message == null) {
624             message = ex.toString();
625         }
626         final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
627                 toStatusCode(ex, context), context);
628         return new ErrorResponseProducer(response,
629                 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
630     }
631 
632     protected int toStatusCode(final Exception ex, final HttpContext context) {
633         final int code;
634         if (ex instanceof MethodNotSupportedException) {
635             code = HttpStatus.SC_NOT_IMPLEMENTED;
636         } else if (ex instanceof UnsupportedHttpVersionException) {
637             code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
638         } else if (ex instanceof ProtocolException) {
639             code = HttpStatus.SC_BAD_REQUEST;
640         } else if (ex instanceof SocketTimeoutException) {
641             code = HttpStatus.SC_GATEWAY_TIMEOUT;
642         } else {
643             code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
644         }
645         return code;
646     }
647 
648     /**
649      * This method can be used to handle callback set up happened after
650      * response submission.
651      *
652      * @param cancellable Request cancellation callback.
653      * @param context Request context.
654      *
655      * @since 4.4
656      */
657     protected void handleAlreadySubmittedResponse(
658             final Cancellable cancellable, final HttpContext context) {
659         throw new IllegalStateException("Response already submitted");
660     }
661 
662     /**
663      * This method can be used to handle double response submission.
664      *
665      * @param responseProducer Response producer for second response.
666      * @param context Request context.
667      *
668      * @since 4.4
669      */
670     protected void handleAlreadySubmittedResponse(
671             final HttpAsyncResponseProducer responseProducer,
672             final HttpContext context) {
673         throw new IllegalStateException("Response already submitted");
674     }
675 
676     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
677         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
678             return false;
679         }
680         final int status = response.getStatusLine().getStatusCode();
681         return status >= HttpStatus.SC_OK
682             && status != HttpStatus.SC_NO_CONTENT
683             && status != HttpStatus.SC_NOT_MODIFIED
684             && status != HttpStatus.SC_RESET_CONTENT;
685     }
686 
687     private void completeRequest(
688             final Incoming incoming,
689             final NHttpServerConnection conn,
690             final State state) throws IOException {
691         state.setRequestState(MessageState.READY);
692         state.setIncoming(null);
693 
694         final PipelineEntry pipelineEntry;
695         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
696         try {
697             final HttpContext context = incoming.getContext();
698             consumer.requestCompleted(context);
699             pipelineEntry = new PipelineEntry(
700                     incoming.getRequest(),
701                     consumer.getResult(),
702                     consumer.getException(),
703                     incoming.getHandler(),
704                     context);
705         } finally {
706             consumer.close();
707         }
708         final Queue<PipelineEntry> pipeline = state.getPipeline();
709         pipeline.add(pipelineEntry);
710         if (state.getResponseState() == MessageState.READY) {
711             conn.requestOutput();
712         }
713     }
714 
715     private void commitFinalResponse(
716             final NHttpServerConnection conn,
717             final State state) throws IOException, HttpException {
718         final Outgoing outgoing = state.getOutgoing();
719         Asserts.notNull(outgoing, "Outgoing response");
720         final HttpRequest request = outgoing.getRequest();
721         final HttpResponse response = outgoing.getResponse();
722         final HttpContext context = outgoing.getContext();
723 
724         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
725         this.httpProcessor.process(response, context);
726 
727         HttpEntity entity = response.getEntity();
728         if (entity != null && !canResponseHaveBody(request, response)) {
729             response.setEntity(null);
730             entity = null;
731         }
732 
733         conn.submitResponse(response);
734 
735         if (entity == null) {
736             completeResponse(outgoing, conn, state);
737         } else {
738             state.setResponseState(MessageState.BODY_STREAM);
739         }
740     }
741 
742     private void completeResponse(
743             final Outgoing outgoing,
744             final NHttpServerConnection conn,
745             final State state) throws IOException {
746         final HttpContext context = outgoing.getContext();
747         final HttpResponse response = outgoing.getResponse();
748         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
749         try {
750             responseProducer.responseCompleted(context);
751             state.setOutgoing(null);
752             state.setCancellable(null);
753             state.setResponseState(MessageState.READY);
754         } finally {
755             responseProducer.close();
756         }
757         if (!this.connectionStrategy.keepAlive(response, context)) {
758             conn.close();
759         } else {
760             conn.requestInput();
761         }
762     }
763 
764     @SuppressWarnings("unchecked")
765     private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
766         HttpAsyncRequestHandler<Object> handler = null;
767         if (this.handlerMapper != null) {
768             handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
769         }
770         if (handler == null) {
771             handler = NullRequestHandler.INSTANCE;
772         }
773         return handler;
774     }
775 
776     static class Incoming {
777 
778         private final HttpRequest request;
779         private final HttpAsyncRequestHandler<Object> handler;
780         private final HttpAsyncRequestConsumer<Object> consumer;
781         private final HttpContext context;
782 
783         Incoming(
784                 final HttpRequest request,
785                 final HttpAsyncRequestHandler<Object> handler,
786                 final HttpAsyncRequestConsumer<Object> consumer,
787                 final HttpContext context) {
788             this.request = request;
789             this.handler = handler;
790             this.consumer = consumer;
791             this.context = context;
792         }
793 
794         public HttpRequest getRequest() {
795             return this.request;
796         }
797 
798         public HttpAsyncRequestHandler<Object> getHandler() {
799             return this.handler;
800         }
801 
802         public HttpAsyncRequestConsumer<Object> getConsumer() {
803             return this.consumer;
804         }
805 
806         public HttpContext getContext() {
807             return this.context;
808         }
809     }
810 
811     static class Outgoing {
812 
813         private final HttpRequest request;
814         private final HttpResponse response;
815         private final HttpAsyncResponseProducer producer;
816         private final HttpContext context;
817 
818         Outgoing(
819                 final HttpRequest request,
820                 final HttpResponse response,
821                 final HttpAsyncResponseProducer producer,
822                 final HttpContext context) {
823             this.request = request;
824             this.response = response;
825             this.producer = producer;
826             this.context = context;
827         }
828 
829         public HttpRequest getRequest() {
830             return this.request;
831         }
832 
833         public HttpResponse getResponse() {
834             return this.response;
835         }
836 
837         public HttpAsyncResponseProducer getProducer() {
838             return this.producer;
839         }
840 
841         public HttpContext getContext() {
842             return this.context;
843         }
844     }
845 
846     static class PipelineEntry {
847 
848         private final HttpRequest request;
849         private final Object result;
850         private final Exception exception;
851         private final HttpAsyncRequestHandler<Object> handler;
852         private final HttpContext context;
853 
854         PipelineEntry(
855                 final HttpRequest request,
856                 final Object result,
857                 final Exception exception,
858                 final HttpAsyncRequestHandler<Object> handler,
859                 final HttpContext context) {
860             this.request = request;
861             this.result = result;
862             this.exception = exception;
863             this.handler = handler;
864             this.context = context;
865         }
866 
867         public HttpRequest getRequest() {
868             return this.request;
869         }
870 
871         public Object getResult() {
872             return this.result;
873         }
874 
875         public Exception getException() {
876             return this.exception;
877         }
878 
879         public HttpAsyncRequestHandler<Object> getHandler() {
880             return this.handler;
881         }
882 
883         public HttpContext getContext() {
884             return this.context;
885         }
886 
887     }
888 
889     static class State {
890 
891         private final Queue<PipelineEntry> pipeline;
892         private volatile boolean terminated;
893         private volatile MessageState requestState;
894         private volatile MessageState responseState;
895         private volatile Incoming incoming;
896         private volatile Outgoing outgoing;
897         private volatile Cancellable cancellable;
898 
899         State() {
900             super();
901             this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
902             this.requestState = MessageState.READY;
903             this.responseState = MessageState.READY;
904         }
905 
906         public boolean isTerminated() {
907             return this.terminated;
908         }
909 
910         public void setTerminated() {
911             this.terminated = true;
912         }
913 
914         public MessageState getRequestState() {
915             return this.requestState;
916         }
917 
918         public void setRequestState(final MessageState state) {
919             this.requestState = state;
920         }
921 
922         public MessageState getResponseState() {
923             return this.responseState;
924         }
925 
926         public void setResponseState(final MessageState state) {
927             this.responseState = state;
928         }
929 
930         public Incoming getIncoming() {
931             return this.incoming;
932         }
933 
934         public void setIncoming(final Incoming incoming) {
935             this.incoming = incoming;
936         }
937 
938         public Outgoing getOutgoing() {
939             return this.outgoing;
940         }
941 
942         public void setOutgoing(final Outgoing outgoing) {
943             this.outgoing = outgoing;
944         }
945 
946         public Cancellable getCancellable() {
947             return this.cancellable;
948         }
949 
950         public void setCancellable(final Cancellable cancellable) {
951             this.cancellable = cancellable;
952         }
953 
954         public Queue<PipelineEntry> getPipeline() {
955             return this.pipeline;
956         }
957 
958         @Override
959         public String toString() {
960             final StringBuilder buf = new StringBuilder();
961             buf.append("[incoming ");
962             buf.append(this.requestState);
963             if (this.incoming != null) {
964                 buf.append(" ");
965                 buf.append(this.incoming.getRequest().getRequestLine());
966             }
967             buf.append("; outgoing ");
968             buf.append(this.responseState);
969             if (this.outgoing != null) {
970                 buf.append(" ");
971                 buf.append(this.outgoing.getResponse().getStatusLine());
972             }
973             buf.append("]");
974             return buf.toString();
975         }
976 
977     }
978 
979     class HttpAsyncExchangeImpl implements HttpAsyncExchange {
980 
981         private final AtomicBoolean completed = new AtomicBoolean();
982         private final HttpRequest request;
983         private final HttpResponse response;
984         private final State state;
985         private final NHttpServerConnection conn;
986         private final HttpContext context;
987 
988         public HttpAsyncExchangeImpl(
989                 final HttpRequest request,
990                 final HttpResponse response,
991                 final State state,
992                 final NHttpServerConnection conn,
993                 final HttpContext context) {
994             super();
995             this.request = request;
996             this.response = response;
997             this.state = state;
998             this.conn = conn;
999             this.context = context;
1000         }
1001 
1002         @Override
1003         public HttpRequest getRequest() {
1004             return this.request;
1005         }
1006 
1007         @Override
1008         public HttpResponse getResponse() {
1009             return this.response;
1010         }
1011 
1012         @Override
1013         public void setCallback(final Cancellable cancellable) {
1014             if (this.completed.get()) {
1015                 handleAlreadySubmittedResponse(cancellable, context);
1016             } else if (this.state.isTerminated() && cancellable != null) {
1017                 cancellable.cancel();
1018             } else {
1019                 this.state.setCancellable(cancellable);
1020             }
1021         }
1022 
1023         @Override
1024         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1025             Args.notNull(responseProducer, "Response producer");
1026             if (this.completed.getAndSet(true)) {
1027                 handleAlreadySubmittedResponse(responseProducer, context);
1028             } else if (!this.state.isTerminated()) {
1029                 final HttpResponse response = responseProducer.generateResponse();
1030                 final Outgoing outgoing = new Outgoing(
1031                         this.request, response, responseProducer, this.context);
1032 
1033                 synchronized (this.state) {
1034                     this.state.setOutgoing(outgoing);
1035                     this.state.setCancellable(null);
1036                     this.conn.requestOutput();
1037                 }
1038 
1039             } else {
1040                 try {
1041                     responseProducer.close();
1042                 } catch (final IOException ex) {
1043                     log(ex);
1044                 }
1045             }
1046         }
1047 
1048         @Override
1049         public void submitResponse() {
1050             submitResponse(new BasicAsyncResponseProducer(this.response));
1051         }
1052 
1053         @Override
1054         public boolean isCompleted() {
1055             return this.completed.get();
1056         }
1057 
1058         @Override
1059         public void setTimeout(final int timeout) {
1060             this.conn.setSocketTimeout(timeout);
1061         }
1062 
1063         @Override
1064         public int getTimeout() {
1065             return this.conn.getSocketTimeout();
1066         }
1067 
1068     }
1069 
1070     /**
1071      * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1072      *
1073      * @deprecated Do not use.
1074      */
1075     @Deprecated
1076     private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1077 
1078         private final HttpAsyncRequestHandlerResolver resolver;
1079 
1080         public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1081             this.resolver = resolver;
1082         }
1083 
1084         @Override
1085         public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1086             return resolver.lookup(request.getRequestLine().getUri());
1087         }
1088 
1089     }
1090 
1091     /**
1092      * Gets the HttpResponseFactory for this service.
1093      *
1094      * @return  the HttpResponseFactory for this service.
1095      * @since 4.4.8
1096      */
1097     public HttpResponseFactory getResponseFactory() {
1098       return responseFactory;
1099     }
1100 
1101     /**
1102      * Gets the HttpProcessor for this service.
1103      *
1104      * @return the HttpProcessor for this service.
1105      * @since 4.4.9
1106      */
1107     public HttpProcessor getHttpProcessor() {
1108         return httpProcessor;
1109     }
1110 
1111     /**
1112      * Gets the ConnectionReuseStrategy for this service.
1113      *
1114      * @return the ConnectionReuseStrategy for this service.
1115      * @since 4.4.9
1116      */
1117     public ConnectionReuseStrategy getConnectionStrategy() {
1118         return connectionStrategy;
1119     }
1120 
1121     /**
1122      * Gets the HttpAsyncRequestHandlerMapper for this service.
1123      *
1124      * @return the HttpAsyncRequestHandlerMapper for this service.
1125      * @since 4.4.9
1126      */
1127     public HttpAsyncRequestHandlerMapper getHandlerMapper() {
1128         return handlerMapper;
1129     }
1130 
1131     /**
1132      * Gets the HttpAsyncExpectationVerifier for this service.
1133      *
1134      * @return the HttpAsyncExpectationVerifier for this service.
1135      * @since 4.4.9
1136      */
1137     public HttpAsyncExpectationVerifier getExpectationVerifier() {
1138         return expectationVerifier;
1139     }
1140 
1141     /**
1142      * Gets the ExceptionLogger for this service.
1143      *
1144      * @return the ExceptionLogger for this service.
1145      * @since 4.4.9
1146      */
1147     public ExceptionLogger getExceptionLogger() {
1148         return exceptionLogger;
1149     }
1150 
1151 }