View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpResponseFactory;
43  import org.apache.http.HttpStatus;
44  import org.apache.http.HttpVersion;
45  import org.apache.http.MethodNotSupportedException;
46  import org.apache.http.ProtocolException;
47  import org.apache.http.UnsupportedHttpVersionException;
48  import org.apache.http.annotation.Immutable;
49  import org.apache.http.concurrent.Cancellable;
50  import org.apache.http.entity.ContentType;
51  import org.apache.http.impl.DefaultConnectionReuseStrategy;
52  import org.apache.http.impl.DefaultHttpResponseFactory;
53  import org.apache.http.nio.ContentDecoder;
54  import org.apache.http.nio.ContentEncoder;
55  import org.apache.http.nio.NHttpConnection;
56  import org.apache.http.nio.NHttpServerConnection;
57  import org.apache.http.nio.NHttpServerEventHandler;
58  import org.apache.http.nio.entity.NStringEntity;
59  import org.apache.http.nio.reactor.SessionBufferStatus;
60  import org.apache.http.params.HttpParams;
61  import org.apache.http.protocol.BasicHttpContext;
62  import org.apache.http.protocol.HttpContext;
63  import org.apache.http.protocol.HttpCoreContext;
64  import org.apache.http.protocol.HttpProcessor;
65  import org.apache.http.util.Args;
66  import org.apache.http.util.Asserts;
67  
68  /**
69   * <tt>HttpAsyncService</tt> is a fully asynchronous HTTP server side protocol
70   * handler based on the non-blocking (NIO) I/O model.
71   * <tt>HttpAsyncService</tt> translates individual events fired
72   * through the {@link NHttpServerEventHandler} interface into logically related
73   * HTTP message exchanges.
74   * <p/>
75   * Upon receiving an incoming request <tt>HttpAsyncService</tt> verifies
76   * the message for compliance with the server expectations using
77   * {@link HttpAsyncExpectationVerifier}, if provided, and then
78   * {@link HttpAsyncRequestHandlerMapper} is used to map the request
79   * to a particular {@link HttpAsyncRequestHandler} intended to handle
80   * the request with the given URI. The protocol handler uses the selected
81   * {@link HttpAsyncRequestHandler} instance to process the incoming request
82   * and to generate an outgoing response.
83   * <p/>
84   * <tt>HttpAsyncService</tt> relies on {@link HttpProcessor} to generate
85   * mandatory protocol headers for all outgoing messages and apply common,
86   * cross-cutting message transformations to all incoming and outgoing messages,
87   * whereas individual {@link HttpAsyncRequestHandler}s are expected
88   * to implement application specific content generation and processing.
89   * <p/>
90   * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
91   * immediately. They can defer transmission of an HTTP response back to
92   * the client without blocking the I/O thread by delegating the process of
93   * request handling to another service or a worker thread. HTTP response can
94   * be submitted at a later point in time once response content becomes
95   * available.
96   *
97   * @since 4.2
98   */
99  @SuppressWarnings("deprecation")
100 @Immutable // provided injected dependencies are immutable
101 public class HttpAsyncService implements NHttpServerEventHandler {
102 
    /**
     * Name of the connection context attribute under which the
     * per-connection exchange state object is stored.
     */
    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";

    // Collaborators assigned once by the primary constructor.
    private final HttpProcessor httpProcessor;
    private final ConnectionReuseStrategy connStrategy;
    private final HttpResponseFactory responseFactory;
    // May be null; request handler lookup is skipped in that case.
    private final HttpAsyncRequestHandlerMapper handlerMapper;
    // May be null; requestReceived then acknowledges 'Expect: 100-continue' directly.
    private final HttpAsyncExpectationVerifier expectationVerifier;
    private final ExceptionLogger exceptionLogger;
111 
112     /**
113      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
114      *
115      * @param httpProcessor HTTP protocol processor (required).
116      * @param connStrategy Connection re-use strategy (required).
117      * @param responseFactory HTTP response factory (required).
118      * @param handlerResolver Request handler resolver.
119      * @param expectationVerifier Request expectation verifier (optional).
120      * @param params HTTP parameters (required).
121      *
122      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
123      *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
124      *    HttpAsyncExpectationVerifier)}
125      */
126     @Deprecated
127     public HttpAsyncService(
128             final HttpProcessor httpProcessor,
129             final ConnectionReuseStrategy connStrategy,
130             final HttpResponseFactory responseFactory,
131             final HttpAsyncRequestHandlerResolver handlerResolver,
132             final HttpAsyncExpectationVerifier expectationVerifier,
133             final HttpParams params) {
134         this(httpProcessor,
135              connStrategy,
136              responseFactory,
137              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
138              expectationVerifier);
139     }
140 
141     /**
142      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
143      *
144      * @param httpProcessor HTTP protocol processor (required).
145      * @param connStrategy Connection re-use strategy (required).
146      * @param handlerResolver Request handler resolver.
147      * @param params HTTP parameters (required).
148      *
149      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
150      *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
151      *   HttpAsyncExpectationVerifier)}
152      */
153     @Deprecated
154     public HttpAsyncService(
155             final HttpProcessor httpProcessor,
156             final ConnectionReuseStrategy connStrategy,
157             final HttpAsyncRequestHandlerResolver handlerResolver,
158             final HttpParams params) {
159         this(httpProcessor,
160              connStrategy,
161              DefaultHttpResponseFactory.INSTANCE,
162              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
163              null);
164     }
165 
166     /**
167      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
168      *
169      * @param httpProcessor HTTP protocol processor.
170      * @param connStrategy Connection re-use strategy. If <code>null</code>
171      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
172      * @param responseFactory HTTP response factory. If <code>null</code>
173      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
174      * @param handlerMapper Request handler mapper.
175      * @param expectationVerifier Request expectation verifier. May be <code>null</code>.
176      *
177      * @since 4.3
178      */
179     public HttpAsyncService(
180             final HttpProcessor httpProcessor,
181             final ConnectionReuseStrategy connStrategy,
182             final HttpResponseFactory responseFactory,
183             final HttpAsyncRequestHandlerMapper handlerMapper,
184             final HttpAsyncExpectationVerifier expectationVerifier) {
185         this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
186     }
187 
188     /**
189      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
190      *
191      * @param httpProcessor HTTP protocol processor.
192      * @param connStrategy Connection re-use strategy. If <code>null</code>
193      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
194      * @param responseFactory HTTP response factory. If <code>null</code>
195      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
196      * @param handlerMapper Request handler mapper.
197      * @param expectationVerifier Request expectation verifier. May be <code>null</code>.
198      * @param exceptionLogger Exception logger. If <code>null</code>
199      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
200      *   logger will be only used to log I/O exception thrown while closing
201      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
202      *
203      * @since 4.4
204      */
205     public HttpAsyncService(
206             final HttpProcessor httpProcessor,
207             final ConnectionReuseStrategy connStrategy,
208             final HttpResponseFactory responseFactory,
209             final HttpAsyncRequestHandlerMapper handlerMapper,
210             final HttpAsyncExpectationVerifier expectationVerifier,
211             final ExceptionLogger exceptionLogger) {
212         super();
213         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
214         this.connStrategy = connStrategy != null ? connStrategy :
215                 DefaultConnectionReuseStrategy.INSTANCE;
216         this.responseFactory = responseFactory != null ? responseFactory :
217                 DefaultHttpResponseFactory.INSTANCE;
218         this.handlerMapper = handlerMapper;
219         this.expectationVerifier = expectationVerifier;
220         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
221     }
222 
223     /**
224      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
225      *
226      * @param httpProcessor HTTP protocol processor.
227      * @param handlerMapper Request handler mapper.
228      *
229      * @since 4.3
230      */
231     public HttpAsyncService(
232             final HttpProcessor httpProcessor,
233             final HttpAsyncRequestHandlerMapper handlerMapper) {
234         this(httpProcessor, null, null, handlerMapper, null);
235     }
236 
237     /**
238      * Creates new instance of <tt>HttpAsyncServerProtocolHandler</tt>.
239      *
240      * @param httpProcessor HTTP protocol processor.
241      * @param handlerMapper Request handler mapper.
242      * @param exceptionLogger Exception logger. If <code>null</code>
243      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
244      *   logger will be only used to log I/O exception thrown while closing
245      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
246      *
247      * @since 4.4
248      */
249     public HttpAsyncService(
250             final HttpProcessor httpProcessor,
251             final HttpAsyncRequestHandlerMapper handlerMapper,
252             final ExceptionLogger exceptionLogger) {
253         this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
254     }
255 
256     @Override
257     public void connected(final NHttpServerConnection conn) {
258         final State state = new State();
259         conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
260     }
261 
262     @Override
263     public void closed(final NHttpServerConnection conn) {
264         final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
265         if (state != null) {
266             state.setTerminated();
267             closeHandlers(state);
268             final Cancellable cancellable = state.getCancellable();
269             if (cancellable != null) {
270                 cancellable.cancel();
271             }
272         }
273     }
274 
275     @Override
276     public void exception(
277             final NHttpServerConnection conn, final Exception cause) {
278         final State state = getState(conn);
279         if (state == null) {
280             shutdownConnection(conn);
281             log(cause);
282             return;
283         }
284         state.setTerminated();
285         closeHandlers(state, cause);
286         final Cancellable cancellable = state.getCancellable();
287         if (cancellable != null) {
288             cancellable.cancel();
289         }
290         final Queue<PipelineEntry> pipeline = state.getPipeline();
291         if (!pipeline.isEmpty()
292                 || conn.isResponseSubmitted()
293                 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
294             // There is not much that we can do if a response
295             // has already been submitted or pipelining is being used.
296             shutdownConnection(conn);
297         } else {
298             try {
299                 final Incoming incoming = state.getIncoming();
300                 final HttpRequest request = incoming != null ? incoming.getRequest() : null;
301                 final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext();
302                 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
303                 final HttpResponse response = responseProducer.generateResponse();
304                 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
305                 state.setResponseState(MessageState.INIT);
306                 state.setOutgoing(outgoing);
307                 commitFinalResponse(conn, state);
308             } catch (final Exception ex) {
309                 shutdownConnection(conn);
310                 closeHandlers(state);
311                 if (ex instanceof RuntimeException) {
312                     throw (RuntimeException) ex;
313                 } else {
314                     log(ex);
315                 }
316             }
317         }
318     }
319 
320     @Override
321     public void requestReceived(
322             final NHttpServerConnection conn) throws IOException, HttpException {
323         final State state = getState(conn);
324         Asserts.notNull(state, "Connection state");
325         Asserts.check(state.getRequestState() == MessageState.READY,
326                 "Unexpected request state %s", state.getRequestState());
327 
328         final HttpRequest request = conn.getHttpRequest();
329         final HttpContext context = new BasicHttpContext();
330 
331         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
332         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
333         this.httpProcessor.process(request, context);
334 
335         final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
336         final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
337         consumer.requestReceived(request);
338 
339         final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
340         state.setIncoming(incoming);
341 
342         if (request instanceof HttpEntityEnclosingRequest) {
343 
344             // If 100-continue is expected make sure
345             // there is no pending response data, no pipelined requests or buffered input
346             if (((HttpEntityEnclosingRequest) request).expectContinue()
347                         && state.getResponseState() == MessageState.READY
348                         && state.getPipeline().isEmpty()
349                         && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
350 
351                 state.setRequestState(MessageState.ACK_EXPECTED);
352                 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
353                         HttpStatus.SC_CONTINUE, context);
354                 if (this.expectationVerifier != null) {
355                     conn.suspendInput();
356                     conn.suspendOutput();
357                     final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
358                             request, ack, state, conn, context);
359                     this.expectationVerifier.verify(httpAsyncExchange, context);
360                 } else {
361                     conn.submitResponse(ack);
362                     state.setRequestState(MessageState.BODY_STREAM);
363                 }
364             } else {
365                 state.setRequestState(MessageState.BODY_STREAM);
366             }
367         } else {
368             // No request content is expected. Process request right away
369             completeRequest(incoming, conn, state);
370         }
371     }
372 
373     @Override
374     public void inputReady(
375             final NHttpServerConnection conn,
376             final ContentDecoder decoder) throws IOException, HttpException {
377         final State state = getState(conn);
378         Asserts.notNull(state, "Connection state");
379         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
380                 "Unexpected request state %s", state.getRequestState());
381 
382         final Incoming incoming = state.getIncoming();
383         Asserts.notNull(incoming, "Incoming request");
384         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
385         consumer.consumeContent(decoder, conn);
386         if (decoder.isCompleted()) {
387             completeRequest(incoming, conn, state);
388         }
389     }
390 
391     @Override
392     public void responseReady(
393             final NHttpServerConnection conn) throws IOException, HttpException {
394         final State state = getState(conn);
395         Asserts.notNull(state, "Connection state");
396         Asserts.check(state.getResponseState() == MessageState.READY ||
397                         state.getResponseState() == MessageState.INIT,
398                 "Unexpected response state %s", state.getResponseState());
399 
400         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
401             final Outgoing outgoing = state.getOutgoing();
402             Asserts.notNull(outgoing, "Outgoing response");
403 
404             final HttpResponse response = outgoing.getResponse();
405             final int status = response.getStatusLine().getStatusCode();
406             if (status == 100) {
407                 final HttpContext context = outgoing.getContext();
408                 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
409                 try {
410                     // Make sure 100 response has no entity
411                     response.setEntity(null);
412                     conn.requestInput();
413                     state.setRequestState(MessageState.BODY_STREAM);
414                     state.setOutgoing(null);
415                     conn.submitResponse(response);
416                     responseProducer.responseCompleted(context);
417                 } finally {
418                     responseProducer.close();
419                 }
420             } else if (status >= 400) {
421                 conn.resetInput();
422                 state.setRequestState(MessageState.READY);
423                 commitFinalResponse(conn, state);
424             } else {
425                 throw new HttpException("Invalid response: " + response.getStatusLine());
426             }
427         } else {
428             if (state.getResponseState() == MessageState.READY) {
429                 final Queue<PipelineEntry> pipeline = state.getPipeline();
430                 final PipelineEntry pipelineEntry = pipeline.poll();
431                 if (pipelineEntry == null) {
432                     conn.suspendOutput();
433                     return;
434                 }
435                 state.setResponseState(MessageState.INIT);
436                 final Object result = pipelineEntry.getResult();
437                 final HttpRequest request = pipelineEntry.getRequest();
438                 final HttpContext context = pipelineEntry.getContext();
439                 if (result != null) {
440                     final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
441                             HttpStatus.SC_OK, context);
442                     final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
443                             request, response, state, conn, context);
444                     final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
445                     conn.suspendOutput();
446                     handler.handle(result, httpExchange, context);
447                 } else {
448                     final Exception exception = pipelineEntry.getException();
449                     final HttpAsyncResponseProducer responseProducer = handleException(
450                             exception != null ? exception : new HttpException("Internal error processing request"),
451                             context);
452                     final HttpResponse error = responseProducer.generateResponse();
453                     state.setOutgoing(new Outgoing(request, error, responseProducer, context));
454                 }
455             }
456             if (state.getResponseState() == MessageState.INIT) {
457                 final Outgoing outgoing;
458                 synchronized (state) {
459                     outgoing = state.getOutgoing();
460                     if (outgoing == null) {
461                         conn.suspendOutput();
462                         return;
463                     }
464                 }
465                 final HttpResponse response = outgoing.getResponse();
466                 final int status = response.getStatusLine().getStatusCode();
467                 if (status >= 200) {
468                     commitFinalResponse(conn, state);
469                 } else {
470                     throw new HttpException("Invalid response: " + response.getStatusLine());
471                 }
472             }
473         }
474     }
475 
476     @Override
477     public void outputReady(
478             final NHttpServerConnection conn,
479             final ContentEncoder encoder) throws HttpException, IOException {
480         final State state = getState(conn);
481         Asserts.notNull(state, "Connection state");
482         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
483                 "Unexpected response state %s", state.getResponseState());
484 
485         final Outgoing outgoing = state.getOutgoing();
486         Asserts.notNull(outgoing, "Outgoing response");
487         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
488 
489         responseProducer.produceContent(encoder, conn);
490 
491         if (encoder.isCompleted()) {
492             completeResponse(outgoing, conn, state);
493         }
494     }
495 
496     @Override
497     public void endOfInput(final NHttpServerConnection conn) throws IOException {
498         // Closing connection in an orderly manner and
499         // waiting for output buffer to get flushed.
500         // Do not want to wait indefinitely, though, in case
501         // the opposite end is not reading
502         if (conn.getSocketTimeout() <= 0) {
503             conn.setSocketTimeout(1000);
504         }
505         conn.close();
506     }
507 
508     @Override
509     public void timeout(final NHttpServerConnection conn) throws IOException {
510         final State state = getState(conn);
511         if (state != null) {
512             closeHandlers(state, new SocketTimeoutException());
513         }
514         if (conn.getStatus() == NHttpConnection.ACTIVE) {
515             conn.close();
516             if (conn.getStatus() == NHttpConnection.CLOSING) {
517                 // Give the connection some grace time to
518                 // close itself nicely
519                 conn.setSocketTimeout(250);
520             }
521         } else {
522             conn.shutdown();
523         }
524     }
525 
526     private State getState(final NHttpConnection conn) {
527         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
528     }
529 
530     /**
531      * This method can be used to log I/O exception thrown while closing
532      * {@link java.io.Closeable} objects (such as
533      * {@link org.apache.http.HttpConnection}).
534      *
535      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
536      */
537     protected void log(final Exception ex) {
538         this.exceptionLogger.log(ex);
539     }
540 
541     private void closeConnection(final NHttpConnection conn) {
542         try {
543             conn.close();
544         } catch (final IOException ex) {
545             log(ex);
546         }
547     }
548 
549     private void shutdownConnection(final NHttpConnection conn) {
550         try {
551             conn.shutdown();
552         } catch (final IOException ex) {
553             log(ex);
554         }
555     }
556 
557     private void closeHandlers(final State state, final Exception ex) {
558         final HttpAsyncRequestConsumer<Object> consumer =
559                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
560         if (consumer != null) {
561             try {
562                 consumer.failed(ex);
563             } finally {
564                 try {
565                     consumer.close();
566                 } catch (final IOException ioex) {
567                     log(ioex);
568                 }
569             }
570         }
571         final HttpAsyncResponseProducer producer =
572                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
573         if (producer != null) {
574             try {
575                 producer.failed(ex);
576             } finally {
577                 try {
578                     producer.close();
579                 } catch (final IOException ioex) {
580                     log(ioex);
581                 }
582             }
583         }
584     }
585 
586     private void closeHandlers(final State state) {
587         final HttpAsyncRequestConsumer<Object> consumer =
588                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
589         if (consumer != null) {
590             try {
591                 consumer.close();
592             } catch (final IOException ioex) {
593                 log(ioex);
594             }
595         }
596         final HttpAsyncResponseProducer producer =
597                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
598         if (producer != null) {
599             try {
600                 producer.close();
601             } catch (final IOException ioex) {
602                 log(ioex);
603             }
604         }
605     }
606 
607     protected HttpAsyncResponseProducer handleException(
608             final Exception ex, final HttpContext context) {
609         final int code;
610         if (ex instanceof MethodNotSupportedException) {
611             code = HttpStatus.SC_NOT_IMPLEMENTED;
612         } else if (ex instanceof UnsupportedHttpVersionException) {
613             code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
614         } else if (ex instanceof ProtocolException) {
615             code = HttpStatus.SC_BAD_REQUEST;
616         } else {
617             code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
618         }
619         String message = ex.getMessage();
620         if (message == null) {
621             message = ex.toString();
622         }
623         final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
624                 code, context);
625         return new ErrorResponseProducer(response,
626                 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
627     }
628 
629     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
630         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
631             return false;
632         }
633         final int status = response.getStatusLine().getStatusCode();
634         return status >= HttpStatus.SC_OK
635             && status != HttpStatus.SC_NO_CONTENT
636             && status != HttpStatus.SC_NOT_MODIFIED
637             && status != HttpStatus.SC_RESET_CONTENT;
638     }
639 
640     private void completeRequest(
641             final Incoming incoming,
642             final NHttpServerConnection conn,
643             final State state) throws IOException, HttpException {
644         state.setRequestState(MessageState.READY);
645         state.setIncoming(null);
646 
647         final PipelineEntry pipelineEntry;
648         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
649         try {
650             final HttpContext context = incoming.getContext();
651             consumer.requestCompleted(context);
652             pipelineEntry = new PipelineEntry(
653                     incoming.getRequest(),
654                     consumer.getResult(),
655                     consumer.getException(),
656                     incoming.getHandler(),
657                     context);
658         } finally {
659             consumer.close();
660         }
661         final Queue<PipelineEntry> pipeline = state.getPipeline();
662         pipeline.add(pipelineEntry);
663         if (state.getResponseState() == MessageState.READY) {
664             conn.requestOutput();
665         }
666     }
667 
668     private void commitFinalResponse(
669             final NHttpServerConnection conn,
670             final State state) throws IOException, HttpException {
671         final Outgoing outgoing = state.getOutgoing();
672         Asserts.notNull(outgoing, "Outgoing response");
673         final HttpRequest request = outgoing.getRequest();
674         final HttpResponse response = outgoing.getResponse();
675         final HttpContext context = outgoing.getContext();
676 
677         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
678         this.httpProcessor.process(response, context);
679 
680         HttpEntity entity = response.getEntity();
681         if (entity != null && !canResponseHaveBody(request, response)) {
682             response.setEntity(null);
683             entity = null;
684         }
685 
686         conn.submitResponse(response);
687 
688         if (entity == null) {
689             completeResponse(outgoing, conn, state);
690         } else {
691             state.setResponseState(MessageState.BODY_STREAM);
692         }
693     }
694 
695     private void completeResponse(
696             final Outgoing outgoing,
697             final NHttpServerConnection conn,
698             final State state) throws IOException, HttpException {
699         final HttpContext context = outgoing.getContext();
700         final HttpResponse response = outgoing.getResponse();
701         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
702         try {
703             responseProducer.responseCompleted(context);
704             state.setOutgoing(null);
705             state.setCancellable(null);
706             state.setResponseState(MessageState.READY);
707         } finally {
708             responseProducer.close();
709         }
710         if (!this.connStrategy.keepAlive(response, context)) {
711             conn.close();
712         } else {
713             // Ready to process new request
714             final Queue<PipelineEntry> pipeline = state.getPipeline();
715             if (pipeline.isEmpty()) {
716                 conn.suspendOutput();
717             }
718             conn.requestInput();
719         }
720     }
721 
722     @SuppressWarnings("unchecked")
723     private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
724         HttpAsyncRequestHandler<Object> handler = null;
725         if (this.handlerMapper != null) {
726             handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
727         }
728         if (handler == null) {
729             handler = new NullRequestHandler();
730         }
731         return handler;
732     }
733 
734     static class Incoming {
735 
736         private final HttpRequest request;
737         private final HttpAsyncRequestHandler<Object> handler;
738         private final HttpAsyncRequestConsumer<Object> consumer;
739         private final HttpContext context;
740 
741         Incoming(
742                 final HttpRequest request,
743                 final HttpAsyncRequestHandler<Object> handler,
744                 final HttpAsyncRequestConsumer<Object> consumer,
745                 final HttpContext context) {
746             this.request = request;
747             this.handler = handler;
748             this.consumer = consumer;
749             this.context = context;
750         }
751 
752         public HttpRequest getRequest() {
753             return this.request;
754         }
755 
756         public HttpAsyncRequestHandler<Object> getHandler() {
757             return this.handler;
758         }
759 
760         public HttpAsyncRequestConsumer<Object> getConsumer() {
761             return this.consumer;
762         }
763 
764         public HttpContext getContext() {
765             return this.context;
766         }
767     }
768 
769     static class Outgoing {
770 
771         private final HttpRequest request;
772         private final HttpResponse response;
773         private final HttpAsyncResponseProducer producer;
774         private final HttpContext context;
775 
776         Outgoing(
777                 final HttpRequest request,
778                 final HttpResponse response,
779                 final HttpAsyncResponseProducer producer,
780                 final HttpContext context) {
781             this.request = request;
782             this.response = response;
783             this.producer = producer;
784             this.context = context;
785         }
786 
787         public HttpRequest getRequest() {
788             return this.request;
789         }
790 
791         public HttpResponse getResponse() {
792             return this.response;
793         }
794 
795         public HttpAsyncResponseProducer getProducer() {
796             return this.producer;
797         }
798 
799         public HttpContext getContext() {
800             return this.context;
801         }
802     }
803 
804     static class PipelineEntry {
805 
806         private final HttpRequest request;
807         private final Object result;
808         private final Exception exception;
809         private final HttpAsyncRequestHandler<Object> handler;
810         private final HttpContext context;
811 
812         PipelineEntry(
813                 final HttpRequest request,
814                 final Object result,
815                 final Exception exception,
816                 final HttpAsyncRequestHandler<Object> handler,
817                 final HttpContext context) {
818             this.request = request;
819             this.result = result;
820             this.exception = exception;
821             this.handler = handler;
822             this.context = context;
823         }
824 
825         public HttpRequest getRequest() {
826             return this.request;
827         }
828 
829         public Object getResult() {
830             return this.result;
831         }
832 
833         public Exception getException() {
834             return this.exception;
835         }
836 
837         public HttpAsyncRequestHandler<Object> getHandler() {
838             return this.handler;
839         }
840 
841         public HttpContext getContext() {
842             return this.context;
843         }
844 
845     }
846 
847     static class State {
848 
849         private final Queue<PipelineEntry> pipeline;
850         private volatile boolean terminated;
851         private volatile MessageState requestState;
852         private volatile MessageState responseState;
853         private volatile Incoming incoming;
854         private volatile Outgoing outgoing;
855         private volatile Cancellable cancellable;
856 
857         State() {
858             super();
859             this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
860             this.requestState = MessageState.READY;
861             this.responseState = MessageState.READY;
862         }
863 
864         public boolean isTerminated() {
865             return this.terminated;
866         }
867 
868         public void setTerminated() {
869             this.terminated = true;
870         }
871 
872         public MessageState getRequestState() {
873             return this.requestState;
874         }
875 
876         public void setRequestState(final MessageState state) {
877             this.requestState = state;
878         }
879 
880         public MessageState getResponseState() {
881             return this.responseState;
882         }
883 
884         public void setResponseState(final MessageState state) {
885             this.responseState = state;
886         }
887 
888         public Incoming getIncoming() {
889             return this.incoming;
890         }
891 
892         public void setIncoming(final Incoming incoming) {
893             this.incoming = incoming;
894         }
895 
896         public Outgoing getOutgoing() {
897             return this.outgoing;
898         }
899 
900         public void setOutgoing(final Outgoing outgoing) {
901             this.outgoing = outgoing;
902         }
903 
904         public Cancellable getCancellable() {
905             return this.cancellable;
906         }
907 
908         public void setCancellable(final Cancellable cancellable) {
909             this.cancellable = cancellable;
910         }
911 
912         public Queue<PipelineEntry> getPipeline() {
913             return this.pipeline;
914         }
915 
916         @Override
917         public String toString() {
918             final StringBuilder buf = new StringBuilder();
919             buf.append("[incoming ");
920             buf.append(this.requestState);
921             if (this.incoming != null) {
922                 buf.append(" ");
923                 buf.append(this.incoming.getRequest().getRequestLine());
924             }
925             buf.append("; outgoing ");
926             buf.append(this.responseState);
927             if (this.outgoing != null) {
928                 buf.append(" ");
929                 buf.append(this.outgoing.getResponse().getStatusLine());
930             }
931             buf.append("]");
932             return buf.toString();
933         }
934 
935     }
936 
937     static class HttpAsyncExchangeImpl implements HttpAsyncExchange {
938 
939         private final HttpRequest request;
940         private final HttpResponse response;
941         private final State state;
942         private final NHttpServerConnection conn;
943         private final HttpContext context;
944 
945         private volatile boolean completed;
946 
947         public HttpAsyncExchangeImpl(
948                 final HttpRequest request,
949                 final HttpResponse response,
950                 final State state,
951                 final NHttpServerConnection conn,
952                 final HttpContext context) {
953             super();
954             this.request = request;
955             this.response = response;
956             this.state = state;
957             this.conn = conn;
958             this.context = context;
959         }
960 
961         @Override
962         public HttpRequest getRequest() {
963             return this.request;
964         }
965 
966         @Override
967         public HttpResponse getResponse() {
968             return this.response;
969         }
970 
971         @Override
972         public void setCallback(final Cancellable cancellable) {
973             Asserts.check(!this.completed, "Response already submitted");
974             if (this.state.isTerminated() && cancellable != null) {
975                 cancellable.cancel();
976             } else {
977                 this.state.setCancellable(cancellable);
978             }
979         }
980 
981         @Override
982         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
983             Args.notNull(responseProducer, "Response producer");
984             Asserts.check(!this.completed, "Response already submitted");
985             this.completed = true;
986             if (!this.state.isTerminated()) {
987                 final HttpResponse response = responseProducer.generateResponse();
988                 final Outgoing outgoing = new Outgoing(
989                         this.request, response, responseProducer, this.context);
990 
991                 synchronized (this.state) {
992                     this.state.setOutgoing(outgoing);
993                     this.state.setCancellable(null);
994                     this.conn.requestOutput();
995                 }
996 
997             } else {
998                 try {
999                     responseProducer.close();
1000                 } catch (final IOException ex) {
1001                 }
1002             }
1003         }
1004 
1005         @Override
1006         public void submitResponse() {
1007             submitResponse(new BasicAsyncResponseProducer(this.response));
1008         }
1009 
1010         @Override
1011         public boolean isCompleted() {
1012             return this.completed;
1013         }
1014 
1015         @Override
1016         public void setTimeout(final int timeout) {
1017             this.conn.setSocketTimeout(timeout);
1018         }
1019 
1020         @Override
1021         public int getTimeout() {
1022             return this.conn.getSocketTimeout();
1023         }
1024 
1025     }
1026 
1027     /**
1028      * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1029      */
1030     @Deprecated
1031     private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1032 
1033         private final HttpAsyncRequestHandlerResolver resolver;
1034 
1035         public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1036             this.resolver = resolver;
1037         }
1038 
1039         @Override
1040         public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1041             return resolver.lookup(request.getRequestLine().getUri());
1042         }
1043 
1044     }
1045 
1046 }