View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.atomic.AtomicBoolean;
35  
36  import org.apache.http.ConnectionReuseStrategy;
37  import org.apache.http.ExceptionLogger;
38  import org.apache.http.HttpEntity;
39  import org.apache.http.HttpEntityEnclosingRequest;
40  import org.apache.http.HttpException;
41  import org.apache.http.HttpRequest;
42  import org.apache.http.HttpResponse;
43  import org.apache.http.HttpResponseFactory;
44  import org.apache.http.HttpStatus;
45  import org.apache.http.HttpVersion;
46  import org.apache.http.MethodNotSupportedException;
47  import org.apache.http.ProtocolException;
48  import org.apache.http.UnsupportedHttpVersionException;
49  import org.apache.http.annotation.ThreadingBehavior;
50  import org.apache.http.annotation.Contract;
51  import org.apache.http.concurrent.Cancellable;
52  import org.apache.http.entity.ContentType;
53  import org.apache.http.impl.DefaultConnectionReuseStrategy;
54  import org.apache.http.impl.DefaultHttpResponseFactory;
55  import org.apache.http.nio.ContentDecoder;
56  import org.apache.http.nio.ContentEncoder;
57  import org.apache.http.nio.NHttpConnection;
58  import org.apache.http.nio.NHttpServerConnection;
59  import org.apache.http.nio.NHttpServerEventHandler;
60  import org.apache.http.nio.entity.NStringEntity;
61  import org.apache.http.nio.reactor.SessionBufferStatus;
62  import org.apache.http.params.HttpParams;
63  import org.apache.http.protocol.BasicHttpContext;
64  import org.apache.http.protocol.HttpContext;
65  import org.apache.http.protocol.HttpCoreContext;
66  import org.apache.http.protocol.HttpProcessor;
67  import org.apache.http.util.Args;
68  import org.apache.http.util.Asserts;
69  
70  /**
71   * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol
72   * handler based on the non-blocking (NIO) I/O model.
73   * {@code HttpAsyncService} translates individual events fired
74   * through the {@link NHttpServerEventHandler} interface into logically related
75   * HTTP message exchanges.
76   * <p>
77   * Upon receiving an incoming request {@code HttpAsyncService} verifies
78   * the message for compliance with the server expectations using
79   * {@link HttpAsyncExpectationVerifier}, if provided, and then
80   * {@link HttpAsyncRequestHandlerMapper} is used to map the request
81   * to a particular {@link HttpAsyncRequestHandler} intended to handle
82   * the request with the given URI. The protocol handler uses the selected
83   * {@link HttpAsyncRequestHandler} instance to process the incoming request
84   * and to generate an outgoing response.
85   * <p>
86   * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate
87   * mandatory protocol headers for all outgoing messages and apply common,
88   * cross-cutting message transformations to all incoming and outgoing messages,
89   * whereas individual {@link HttpAsyncRequestHandler}s are expected
90   * to implement application specific content generation and processing.
91   * <p>
92   * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
93   * immediately. They can defer transmission of an HTTP response back to
94   * the client without blocking the I/O thread by delegating the process of
95   * request handling to another service or a worker thread. HTTP response can
96   * be submitted at a later point in time once response content becomes
97   * available.
98   *
99   * @since 4.2
100  */
101 @SuppressWarnings("deprecation")
102 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
103 public class HttpAsyncService implements NHttpServerEventHandler {
104 
105     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
106 
107     private final HttpProcessor httpProcessor;
108     private final ConnectionReuseStrategy connStrategy;
109     private final HttpResponseFactory responseFactory;
110     private final HttpAsyncRequestHandlerMapper handlerMapper;
111     private final HttpAsyncExpectationVerifier expectationVerifier;
112     private final ExceptionLogger exceptionLogger;
113 
114     /**
115      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
116      *
117      * @param httpProcessor HTTP protocol processor (required).
118      * @param connStrategy Connection re-use strategy (required).
119      * @param responseFactory HTTP response factory (required).
120      * @param handlerResolver Request handler resolver.
121      * @param expectationVerifier Request expectation verifier (optional).
122      * @param params HTTP parameters (required).
123      *
124      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
125      *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
126      *    HttpAsyncExpectationVerifier)}
127      */
128     @Deprecated
129     public HttpAsyncService(
130             final HttpProcessor httpProcessor,
131             final ConnectionReuseStrategy connStrategy,
132             final HttpResponseFactory responseFactory,
133             final HttpAsyncRequestHandlerResolver handlerResolver,
134             final HttpAsyncExpectationVerifier expectationVerifier,
135             final HttpParams params) {
136         this(httpProcessor,
137              connStrategy,
138              responseFactory,
139              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
140              expectationVerifier);
141     }
142 
143     /**
144      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
145      *
146      * @param httpProcessor HTTP protocol processor (required).
147      * @param connStrategy Connection re-use strategy (required).
148      * @param handlerResolver Request handler resolver.
149      * @param params HTTP parameters (required).
150      *
151      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
152      *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
153      *   HttpAsyncExpectationVerifier)}
154      */
155     @Deprecated
156     public HttpAsyncService(
157             final HttpProcessor httpProcessor,
158             final ConnectionReuseStrategy connStrategy,
159             final HttpAsyncRequestHandlerResolver handlerResolver,
160             final HttpParams params) {
161         this(httpProcessor,
162              connStrategy,
163              DefaultHttpResponseFactory.INSTANCE,
164              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
165              null);
166     }
167 
168     /**
169      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
170      *
171      * @param httpProcessor HTTP protocol processor.
172      * @param connStrategy Connection re-use strategy. If {@code null}
173      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
174      * @param responseFactory HTTP response factory. If {@code null}
175      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
176      * @param handlerMapper Request handler mapper.
177      * @param expectationVerifier Request expectation verifier. May be {@code null}.
178      *
179      * @since 4.3
180      */
181     public HttpAsyncService(
182             final HttpProcessor httpProcessor,
183             final ConnectionReuseStrategy connStrategy,
184             final HttpResponseFactory responseFactory,
185             final HttpAsyncRequestHandlerMapper handlerMapper,
186             final HttpAsyncExpectationVerifier expectationVerifier) {
187         this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
188     }
189 
190     /**
191      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
192      *
193      * @param httpProcessor HTTP protocol processor.
194      * @param connStrategy Connection re-use strategy. If {@code null}
195      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
196      * @param responseFactory HTTP response factory. If {@code null}
197      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
198      * @param handlerMapper Request handler mapper.
199      * @param expectationVerifier Request expectation verifier. May be {@code null}.
200      * @param exceptionLogger Exception logger. If {@code null}
201      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
202      *   logger will be only used to log I/O exception thrown while closing
203      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
204      *
205      * @since 4.4
206      */
207     public HttpAsyncService(
208             final HttpProcessor httpProcessor,
209             final ConnectionReuseStrategy connStrategy,
210             final HttpResponseFactory responseFactory,
211             final HttpAsyncRequestHandlerMapper handlerMapper,
212             final HttpAsyncExpectationVerifier expectationVerifier,
213             final ExceptionLogger exceptionLogger) {
214         super();
215         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
216         this.connStrategy = connStrategy != null ? connStrategy :
217                 DefaultConnectionReuseStrategy.INSTANCE;
218         this.responseFactory = responseFactory != null ? responseFactory :
219                 DefaultHttpResponseFactory.INSTANCE;
220         this.handlerMapper = handlerMapper;
221         this.expectationVerifier = expectationVerifier;
222         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
223     }
224 
225     /**
226      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
227      *
228      * @param httpProcessor HTTP protocol processor.
229      * @param handlerMapper Request handler mapper.
230      *
231      * @since 4.3
232      */
233     public HttpAsyncService(
234             final HttpProcessor httpProcessor,
235             final HttpAsyncRequestHandlerMapper handlerMapper) {
236         this(httpProcessor, null, null, handlerMapper, null);
237     }
238 
239     /**
240      * Creates new instance of {@code HttpAsyncServerProtocolHandler}.
241      *
242      * @param httpProcessor HTTP protocol processor.
243      * @param handlerMapper Request handler mapper.
244      * @param exceptionLogger Exception logger. If {@code null}
245      *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
246      *   logger will be only used to log I/O exception thrown while closing
247      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
248      *
249      * @since 4.4
250      */
251     public HttpAsyncService(
252             final HttpProcessor httpProcessor,
253             final HttpAsyncRequestHandlerMapper handlerMapper,
254             final ExceptionLogger exceptionLogger) {
255         this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
256     }
257 
258     @Override
259     public void connected(final NHttpServerConnection conn) {
260         final State state = new State();
261         conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
262     }
263 
264     @Override
265     public void closed(final NHttpServerConnection conn) {
266         final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
267         if (state != null) {
268             state.setTerminated();
269             closeHandlers(state);
270             final Cancellable cancellable = state.getCancellable();
271             if (cancellable != null) {
272                 cancellable.cancel();
273             }
274         }
275     }
276 
277     @Override
278     public void exception(
279             final NHttpServerConnection conn, final Exception cause) {
280         final State state = getState(conn);
281         if (state == null) {
282             shutdownConnection(conn);
283             log(cause);
284             return;
285         }
286         state.setTerminated();
287         closeHandlers(state, cause);
288         final Cancellable cancellable = state.getCancellable();
289         if (cancellable != null) {
290             cancellable.cancel();
291         }
292         final Queue<PipelineEntry> pipeline = state.getPipeline();
293         if (!pipeline.isEmpty()
294                 || conn.isResponseSubmitted()
295                 || state.getResponseState().compareTo(MessageState.INIT) > 0) {
296             // There is not much that we can do if a response
297             // has already been submitted or pipelining is being used.
298             shutdownConnection(conn);
299         } else {
300             try {
301                 final Incoming incoming = state.getIncoming();
302                 final HttpRequest request = incoming != null ? incoming.getRequest() : null;
303                 final HttpContext context = incoming != null ? incoming.getContext() : new BasicHttpContext();
304                 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
305                 final HttpResponse response = responseProducer.generateResponse();
306                 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
307                 state.setResponseState(MessageState.INIT);
308                 state.setOutgoing(outgoing);
309                 commitFinalResponse(conn, state);
310             } catch (final Exception ex) {
311                 shutdownConnection(conn);
312                 closeHandlers(state);
313                 if (ex instanceof RuntimeException) {
314                     throw (RuntimeException) ex;
315                 } else {
316                     log(ex);
317                 }
318             }
319         }
320     }
321 
322     @Override
323     public void requestReceived(
324             final NHttpServerConnection conn) throws IOException, HttpException {
325         final State state = getState(conn);
326         Asserts.notNull(state, "Connection state");
327         Asserts.check(state.getRequestState() == MessageState.READY,
328                 "Unexpected request state %s", state.getRequestState());
329 
330         final HttpRequest request = conn.getHttpRequest();
331         final HttpContext context = new BasicHttpContext();
332 
333         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
334         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
335         this.httpProcessor.process(request, context);
336 
337         final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
338         final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
339         consumer.requestReceived(request);
340 
341         final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
342         state.setIncoming(incoming);
343 
344         if (request instanceof HttpEntityEnclosingRequest) {
345 
346             // If 100-continue is expected make sure
347             // there is no pending response data, no pipelined requests or buffered input
348             if (((HttpEntityEnclosingRequest) request).expectContinue()
349                         && state.getResponseState() == MessageState.READY
350                         && state.getPipeline().isEmpty()
351                         && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
352 
353                 state.setRequestState(MessageState.ACK_EXPECTED);
354                 final HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
355                         HttpStatus.SC_CONTINUE, context);
356                 if (this.expectationVerifier != null) {
357                     conn.suspendInput();
358                     conn.suspendOutput();
359                     final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
360                             request, ack, state, conn, context);
361                     this.expectationVerifier.verify(httpAsyncExchange, context);
362                 } else {
363                     conn.submitResponse(ack);
364                     state.setRequestState(MessageState.BODY_STREAM);
365                 }
366             } else {
367                 state.setRequestState(MessageState.BODY_STREAM);
368             }
369         } else {
370             // No request content is expected. Process request right away
371             completeRequest(incoming, conn, state);
372         }
373     }
374 
375     @Override
376     public void inputReady(
377             final NHttpServerConnection conn,
378             final ContentDecoder decoder) throws IOException, HttpException {
379         final State state = getState(conn);
380         Asserts.notNull(state, "Connection state");
381         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
382                 "Unexpected request state %s", state.getRequestState());
383 
384         final Incoming incoming = state.getIncoming();
385         Asserts.notNull(incoming, "Incoming request");
386         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
387         consumer.consumeContent(decoder, conn);
388         if (decoder.isCompleted()) {
389             completeRequest(incoming, conn, state);
390         }
391     }
392 
393     @Override
394     public void responseReady(
395             final NHttpServerConnection conn) throws IOException, HttpException {
396         final State state = getState(conn);
397         Asserts.notNull(state, "Connection state");
398         Asserts.check(state.getResponseState() == MessageState.READY ||
399                         state.getResponseState() == MessageState.INIT,
400                 "Unexpected response state %s", state.getResponseState());
401 
402         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
403             final Outgoing outgoing = state.getOutgoing();
404             Asserts.notNull(outgoing, "Outgoing response");
405 
406             final HttpResponse response = outgoing.getResponse();
407             final int status = response.getStatusLine().getStatusCode();
408             if (status == 100) {
409                 final HttpContext context = outgoing.getContext();
410                 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
411                 try {
412                     // Make sure 100 response has no entity
413                     response.setEntity(null);
414                     conn.requestInput();
415                     state.setRequestState(MessageState.BODY_STREAM);
416                     state.setOutgoing(null);
417                     conn.submitResponse(response);
418                     responseProducer.responseCompleted(context);
419                 } finally {
420                     responseProducer.close();
421                 }
422             } else if (status >= 400) {
423                 conn.resetInput();
424                 state.setRequestState(MessageState.READY);
425                 commitFinalResponse(conn, state);
426             } else {
427                 throw new HttpException("Invalid response: " + response.getStatusLine());
428             }
429         } else {
430             if (state.getResponseState() == MessageState.READY) {
431                 final Queue<PipelineEntry> pipeline = state.getPipeline();
432                 final PipelineEntry pipelineEntry = pipeline.poll();
433                 if (pipelineEntry == null) {
434                     conn.suspendOutput();
435                     return;
436                 }
437                 state.setResponseState(MessageState.INIT);
438                 final Object result = pipelineEntry.getResult();
439                 final HttpRequest request = pipelineEntry.getRequest();
440                 final HttpContext context = pipelineEntry.getContext();
441                 if (result != null) {
442                     final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
443                             HttpStatus.SC_OK, context);
444                     final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
445                             request, response, state, conn, context);
446                     final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
447                     conn.suspendOutput();
448                     try {
449                         handler.handle(result, httpExchange, context);
450                     } catch (RuntimeException ex) {
451                         throw ex;
452                     } catch (Exception ex) {
453                         pipeline.add(new PipelineEntry(
454                             request,
455                             null,
456                             ex,
457                             handler,
458                             context));
459                         state.setResponseState(MessageState.READY);
460                         responseReady(conn);
461                         return;
462                     }
463                 } else {
464                     final Exception exception = pipelineEntry.getException();
465                     final HttpAsyncResponseProducer responseProducer = handleException(
466                             exception != null ? exception : new HttpException("Internal error processing request"),
467                             context);
468                     final HttpResponse error = responseProducer.generateResponse();
469                     state.setOutgoing(new Outgoing(request, error, responseProducer, context));
470                 }
471             }
472             if (state.getResponseState() == MessageState.INIT) {
473                 final Outgoing outgoing;
474                 synchronized (state) {
475                     outgoing = state.getOutgoing();
476                     if (outgoing == null) {
477                         conn.suspendOutput();
478                         return;
479                     }
480                 }
481                 final HttpResponse response = outgoing.getResponse();
482                 final int status = response.getStatusLine().getStatusCode();
483                 if (status >= 200) {
484                     commitFinalResponse(conn, state);
485                 } else {
486                     throw new HttpException("Invalid response: " + response.getStatusLine());
487                 }
488             }
489         }
490     }
491 
492     @Override
493     public void outputReady(
494             final NHttpServerConnection conn,
495             final ContentEncoder encoder) throws HttpException, IOException {
496         final State state = getState(conn);
497         Asserts.notNull(state, "Connection state");
498         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
499                 "Unexpected response state %s", state.getResponseState());
500 
501         final Outgoing outgoing = state.getOutgoing();
502         Asserts.notNull(outgoing, "Outgoing response");
503         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
504 
505         responseProducer.produceContent(encoder, conn);
506 
507         if (encoder.isCompleted()) {
508             completeResponse(outgoing, conn, state);
509         }
510     }
511 
512     @Override
513     public void endOfInput(final NHttpServerConnection conn) throws IOException {
514         // Closing connection in an orderly manner and
515         // waiting for output buffer to get flushed.
516         // Do not want to wait indefinitely, though, in case
517         // the opposite end is not reading
518         if (conn.getSocketTimeout() <= 0) {
519             conn.setSocketTimeout(1000);
520         }
521         conn.close();
522     }
523 
524     @Override
525     public void timeout(final NHttpServerConnection conn) throws IOException {
526         final State state = getState(conn);
527         if (state != null) {
528             closeHandlers(state, new SocketTimeoutException());
529         }
530         if (conn.getStatus() == NHttpConnection.ACTIVE) {
531             conn.close();
532             if (conn.getStatus() == NHttpConnection.CLOSING) {
533                 // Give the connection some grace time to
534                 // close itself nicely
535                 conn.setSocketTimeout(250);
536             }
537         } else {
538             conn.shutdown();
539         }
540     }
541 
542     private State getState(final NHttpConnection conn) {
543         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
544     }
545 
546     /**
547      * This method can be used to log I/O exception thrown while closing
548      * {@link java.io.Closeable} objects (such as
549      * {@link org.apache.http.HttpConnection}).
550      *
551      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
552      */
553     protected void log(final Exception ex) {
554         this.exceptionLogger.log(ex);
555     }
556 
557     private void shutdownConnection(final NHttpConnection conn) {
558         try {
559             conn.shutdown();
560         } catch (final IOException ex) {
561             log(ex);
562         }
563     }
564 
565     private void closeHandlers(final State state, final Exception ex) {
566         final HttpAsyncRequestConsumer<Object> consumer =
567                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
568         if (consumer != null) {
569             try {
570                 consumer.failed(ex);
571             } finally {
572                 try {
573                     consumer.close();
574                 } catch (final IOException ioex) {
575                     log(ioex);
576                 }
577             }
578         }
579         final HttpAsyncResponseProducer producer =
580                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
581         if (producer != null) {
582             try {
583                 producer.failed(ex);
584             } finally {
585                 try {
586                     producer.close();
587                 } catch (final IOException ioex) {
588                     log(ioex);
589                 }
590             }
591         }
592     }
593 
594     private void closeHandlers(final State state) {
595         final HttpAsyncRequestConsumer<Object> consumer =
596                 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
597         if (consumer != null) {
598             try {
599                 consumer.close();
600             } catch (final IOException ioex) {
601                 log(ioex);
602             }
603         }
604         final HttpAsyncResponseProducer producer =
605                 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
606         if (producer != null) {
607             try {
608                 producer.close();
609             } catch (final IOException ioex) {
610                 log(ioex);
611             }
612         }
613     }
614 
615     protected HttpAsyncResponseProducer handleException(
616             final Exception ex, final HttpContext context) {
617         final int code;
618         if (ex instanceof MethodNotSupportedException) {
619             code = HttpStatus.SC_NOT_IMPLEMENTED;
620         } else if (ex instanceof UnsupportedHttpVersionException) {
621             code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
622         } else if (ex instanceof ProtocolException) {
623             code = HttpStatus.SC_BAD_REQUEST;
624         } else {
625             code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
626         }
627         String message = ex.getMessage();
628         if (message == null) {
629             message = ex.toString();
630         }
631         final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
632                 code, context);
633         return new ErrorResponseProducer(response,
634                 new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
635     }
636 
637     /**
638      * This method can be used to handle callback set up happened after
639      * response submission.
640      *
641      * @param cancellable Request cancellation callback.
642      * @param context Request context.
643      *
644      * @since 4.4
645      */
646     protected void handleAlreadySubmittedResponse(
647             final Cancellable cancellable, final HttpContext context) {
648         throw new IllegalStateException("Response already submitted");
649     }
650 
651     /**
652      * This method can be used to handle double response submission.
653      *
654      * @param responseProducer Response producer for second response.
655      * @param context Request context.
656      *
657      * @since 4.4
658      */
659     protected void handleAlreadySubmittedResponse(
660             final HttpAsyncResponseProducer responseProducer,
661             final HttpContext context) {
662         throw new IllegalStateException("Response already submitted");
663     }
664 
665     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
666         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
667             return false;
668         }
669         final int status = response.getStatusLine().getStatusCode();
670         return status >= HttpStatus.SC_OK
671             && status != HttpStatus.SC_NO_CONTENT
672             && status != HttpStatus.SC_NOT_MODIFIED
673             && status != HttpStatus.SC_RESET_CONTENT;
674     }
675 
676     private void completeRequest(
677             final Incoming incoming,
678             final NHttpServerConnection conn,
679             final State state) throws IOException, HttpException {
680         state.setRequestState(MessageState.READY);
681         state.setIncoming(null);
682 
683         final PipelineEntry pipelineEntry;
684         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
685         try {
686             final HttpContext context = incoming.getContext();
687             consumer.requestCompleted(context);
688             pipelineEntry = new PipelineEntry(
689                     incoming.getRequest(),
690                     consumer.getResult(),
691                     consumer.getException(),
692                     incoming.getHandler(),
693                     context);
694         } finally {
695             consumer.close();
696         }
697         final Queue<PipelineEntry> pipeline = state.getPipeline();
698         pipeline.add(pipelineEntry);
699         if (state.getResponseState() == MessageState.READY) {
700             conn.requestOutput();
701         }
702     }
703 
704     private void commitFinalResponse(
705             final NHttpServerConnection conn,
706             final State state) throws IOException, HttpException {
707         final Outgoing outgoing = state.getOutgoing();
708         Asserts.notNull(outgoing, "Outgoing response");
709         final HttpRequest request = outgoing.getRequest();
710         final HttpResponse response = outgoing.getResponse();
711         final HttpContext context = outgoing.getContext();
712 
713         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
714         this.httpProcessor.process(response, context);
715 
716         HttpEntity entity = response.getEntity();
717         if (entity != null && !canResponseHaveBody(request, response)) {
718             response.setEntity(null);
719             entity = null;
720         }
721 
722         conn.submitResponse(response);
723 
724         if (entity == null) {
725             completeResponse(outgoing, conn, state);
726         } else {
727             state.setResponseState(MessageState.BODY_STREAM);
728         }
729     }
730 
731     private void completeResponse(
732             final Outgoing outgoing,
733             final NHttpServerConnection conn,
734             final State state) throws IOException, HttpException {
735         final HttpContext context = outgoing.getContext();
736         final HttpResponse response = outgoing.getResponse();
737         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
738         try {
739             responseProducer.responseCompleted(context);
740             state.setOutgoing(null);
741             state.setCancellable(null);
742             state.setResponseState(MessageState.READY);
743         } finally {
744             responseProducer.close();
745         }
746         if (!this.connStrategy.keepAlive(response, context)) {
747             conn.close();
748         } else {
749             conn.requestInput();
750         }
751     }
752 
753     @SuppressWarnings("unchecked")
754     private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
755         HttpAsyncRequestHandler<Object> handler = null;
756         if (this.handlerMapper != null) {
757             handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
758         }
759         if (handler == null) {
760             handler = new NullRequestHandler();
761         }
762         return handler;
763     }
764 
765     static class Incoming {
766 
767         private final HttpRequest request;
768         private final HttpAsyncRequestHandler<Object> handler;
769         private final HttpAsyncRequestConsumer<Object> consumer;
770         private final HttpContext context;
771 
772         Incoming(
773                 final HttpRequest request,
774                 final HttpAsyncRequestHandler<Object> handler,
775                 final HttpAsyncRequestConsumer<Object> consumer,
776                 final HttpContext context) {
777             this.request = request;
778             this.handler = handler;
779             this.consumer = consumer;
780             this.context = context;
781         }
782 
783         public HttpRequest getRequest() {
784             return this.request;
785         }
786 
787         public HttpAsyncRequestHandler<Object> getHandler() {
788             return this.handler;
789         }
790 
791         public HttpAsyncRequestConsumer<Object> getConsumer() {
792             return this.consumer;
793         }
794 
795         public HttpContext getContext() {
796             return this.context;
797         }
798     }
799 
800     static class Outgoing {
801 
802         private final HttpRequest request;
803         private final HttpResponse response;
804         private final HttpAsyncResponseProducer producer;
805         private final HttpContext context;
806 
807         Outgoing(
808                 final HttpRequest request,
809                 final HttpResponse response,
810                 final HttpAsyncResponseProducer producer,
811                 final HttpContext context) {
812             this.request = request;
813             this.response = response;
814             this.producer = producer;
815             this.context = context;
816         }
817 
818         public HttpRequest getRequest() {
819             return this.request;
820         }
821 
822         public HttpResponse getResponse() {
823             return this.response;
824         }
825 
826         public HttpAsyncResponseProducer getProducer() {
827             return this.producer;
828         }
829 
830         public HttpContext getContext() {
831             return this.context;
832         }
833     }
834 
835     static class PipelineEntry {
836 
837         private final HttpRequest request;
838         private final Object result;
839         private final Exception exception;
840         private final HttpAsyncRequestHandler<Object> handler;
841         private final HttpContext context;
842 
843         PipelineEntry(
844                 final HttpRequest request,
845                 final Object result,
846                 final Exception exception,
847                 final HttpAsyncRequestHandler<Object> handler,
848                 final HttpContext context) {
849             this.request = request;
850             this.result = result;
851             this.exception = exception;
852             this.handler = handler;
853             this.context = context;
854         }
855 
856         public HttpRequest getRequest() {
857             return this.request;
858         }
859 
860         public Object getResult() {
861             return this.result;
862         }
863 
864         public Exception getException() {
865             return this.exception;
866         }
867 
868         public HttpAsyncRequestHandler<Object> getHandler() {
869             return this.handler;
870         }
871 
872         public HttpContext getContext() {
873             return this.context;
874         }
875 
876     }
877 
878     static class State {
879 
880         private final Queue<PipelineEntry> pipeline;
881         private volatile boolean terminated;
882         private volatile MessageState requestState;
883         private volatile MessageState responseState;
884         private volatile Incoming incoming;
885         private volatile Outgoing outgoing;
886         private volatile Cancellable cancellable;
887 
888         State() {
889             super();
890             this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
891             this.requestState = MessageState.READY;
892             this.responseState = MessageState.READY;
893         }
894 
895         public boolean isTerminated() {
896             return this.terminated;
897         }
898 
899         public void setTerminated() {
900             this.terminated = true;
901         }
902 
903         public MessageState getRequestState() {
904             return this.requestState;
905         }
906 
907         public void setRequestState(final MessageState state) {
908             this.requestState = state;
909         }
910 
911         public MessageState getResponseState() {
912             return this.responseState;
913         }
914 
915         public void setResponseState(final MessageState state) {
916             this.responseState = state;
917         }
918 
919         public Incoming getIncoming() {
920             return this.incoming;
921         }
922 
923         public void setIncoming(final Incoming incoming) {
924             this.incoming = incoming;
925         }
926 
927         public Outgoing getOutgoing() {
928             return this.outgoing;
929         }
930 
931         public void setOutgoing(final Outgoing outgoing) {
932             this.outgoing = outgoing;
933         }
934 
935         public Cancellable getCancellable() {
936             return this.cancellable;
937         }
938 
939         public void setCancellable(final Cancellable cancellable) {
940             this.cancellable = cancellable;
941         }
942 
943         public Queue<PipelineEntry> getPipeline() {
944             return this.pipeline;
945         }
946 
947         @Override
948         public String toString() {
949             final StringBuilder buf = new StringBuilder();
950             buf.append("[incoming ");
951             buf.append(this.requestState);
952             if (this.incoming != null) {
953                 buf.append(" ");
954                 buf.append(this.incoming.getRequest().getRequestLine());
955             }
956             buf.append("; outgoing ");
957             buf.append(this.responseState);
958             if (this.outgoing != null) {
959                 buf.append(" ");
960                 buf.append(this.outgoing.getResponse().getStatusLine());
961             }
962             buf.append("]");
963             return buf.toString();
964         }
965 
966     }
967 
968     class HttpAsyncExchangeImpl implements HttpAsyncExchange {
969 
970         private final AtomicBoolean completed = new AtomicBoolean();
971         private final HttpRequest request;
972         private final HttpResponse response;
973         private final State state;
974         private final NHttpServerConnection conn;
975         private final HttpContext context;
976 
977         public HttpAsyncExchangeImpl(
978                 final HttpRequest request,
979                 final HttpResponse response,
980                 final State state,
981                 final NHttpServerConnection conn,
982                 final HttpContext context) {
983             super();
984             this.request = request;
985             this.response = response;
986             this.state = state;
987             this.conn = conn;
988             this.context = context;
989         }
990 
991         @Override
992         public HttpRequest getRequest() {
993             return this.request;
994         }
995 
996         @Override
997         public HttpResponse getResponse() {
998             return this.response;
999         }
1000 
1001         @Override
1002         public void setCallback(final Cancellable cancellable) {
1003             if (this.completed.get()) {
1004                 handleAlreadySubmittedResponse(cancellable, context);
1005             } else if (this.state.isTerminated() && cancellable != null) {
1006                 cancellable.cancel();
1007             } else {
1008                 this.state.setCancellable(cancellable);
1009             }
1010         }
1011 
1012         @Override
1013         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1014             Args.notNull(responseProducer, "Response producer");
1015             if (this.completed.getAndSet(true)) {
1016                 handleAlreadySubmittedResponse(responseProducer, context);
1017             } else if (!this.state.isTerminated()) {
1018                 final HttpResponse response = responseProducer.generateResponse();
1019                 final Outgoing outgoing = new Outgoing(
1020                         this.request, response, responseProducer, this.context);
1021 
1022                 synchronized (this.state) {
1023                     this.state.setOutgoing(outgoing);
1024                     this.state.setCancellable(null);
1025                     this.conn.requestOutput();
1026                 }
1027 
1028             } else {
1029                 try {
1030                     responseProducer.close();
1031                 } catch (final IOException ex) {
1032                     log(ex);
1033                 }
1034             }
1035         }
1036 
1037         @Override
1038         public void submitResponse() {
1039             submitResponse(new BasicAsyncResponseProducer(this.response));
1040         }
1041 
1042         @Override
1043         public boolean isCompleted() {
1044             return this.completed.get();
1045         }
1046 
1047         @Override
1048         public void setTimeout(final int timeout) {
1049             this.conn.setSocketTimeout(timeout);
1050         }
1051 
1052         @Override
1053         public int getTimeout() {
1054             return this.conn.getSocketTimeout();
1055         }
1056 
1057     }
1058 
1059     /**
1060      * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1061      */
1062     @Deprecated
1063     private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1064 
1065         private final HttpAsyncRequestHandlerResolver resolver;
1066 
1067         public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1068             this.resolver = resolver;
1069         }
1070 
1071         @Override
1072         public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1073             return resolver.lookup(request.getRequestLine().getUri());
1074         }
1075 
1076     }
1077 
1078 }