View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketException;
32  import java.net.SocketTimeoutException;
33  import java.util.Queue;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  
37  import org.apache.http.ConnectionReuseStrategy;
38  import org.apache.http.ExceptionLogger;
39  import org.apache.http.HttpEntity;
40  import org.apache.http.HttpEntityEnclosingRequest;
41  import org.apache.http.HttpException;
42  import org.apache.http.HttpRequest;
43  import org.apache.http.HttpResponse;
44  import org.apache.http.HttpResponseFactory;
45  import org.apache.http.HttpStatus;
46  import org.apache.http.HttpVersion;
47  import org.apache.http.MethodNotSupportedException;
48  import org.apache.http.ProtocolException;
49  import org.apache.http.UnsupportedHttpVersionException;
50  import org.apache.http.annotation.Contract;
51  import org.apache.http.annotation.ThreadingBehavior;
52  import org.apache.http.concurrent.Cancellable;
53  import org.apache.http.entity.ContentType;
54  import org.apache.http.impl.DefaultConnectionReuseStrategy;
55  import org.apache.http.impl.DefaultHttpResponseFactory;
56  import org.apache.http.nio.ContentDecoder;
57  import org.apache.http.nio.ContentEncoder;
58  import org.apache.http.nio.NHttpConnection;
59  import org.apache.http.nio.NHttpServerConnection;
60  import org.apache.http.nio.NHttpServerEventHandler;
61  import org.apache.http.nio.entity.NStringEntity;
62  import org.apache.http.nio.reactor.SessionBufferStatus;
63  import org.apache.http.params.HttpParams;
64  import org.apache.http.protocol.BasicHttpContext;
65  import org.apache.http.protocol.HttpContext;
66  import org.apache.http.protocol.HttpCoreContext;
67  import org.apache.http.protocol.HttpProcessor;
68  import org.apache.http.util.Args;
69  import org.apache.http.util.Asserts;
70  
71  /**
72   * {@code HttpAsyncService} is a fully asynchronous HTTP server side protocol
73   * handler based on the non-blocking (NIO) I/O model.
74   * {@code HttpAsyncService} translates individual events fired
75   * through the {@link NHttpServerEventHandler} interface into logically related
76   * HTTP message exchanges.
77   * <p>
78   * Upon receiving an incoming request {@code HttpAsyncService} verifies
79   * the message for compliance with the server expectations using
80   * {@link HttpAsyncExpectationVerifier}, if provided, and then
81   * {@link HttpAsyncRequestHandlerMapper} is used to map the request
82   * to a particular {@link HttpAsyncRequestHandler} intended to handle
83   * the request with the given URI. The protocol handler uses the selected
84   * {@link HttpAsyncRequestHandler} instance to process the incoming request
85   * and to generate an outgoing response.
86   * <p>
87   * {@code HttpAsyncService} relies on {@link HttpProcessor} to generate
88   * mandatory protocol headers for all outgoing messages and apply common,
89   * cross-cutting message transformations to all incoming and outgoing messages,
90   * whereas individual {@link HttpAsyncRequestHandler}s are expected
91   * to implement application specific content generation and processing.
92   * <p>
93   * Individual {@link HttpAsyncRequestHandler}s do not have to submit a response
94   * immediately. They can defer transmission of an HTTP response back to
95   * the client without blocking the I/O thread by delegating the process of
96   * request handling to another service or a worker thread. HTTP response can
97   * be submitted at a later point of time once response content becomes
98   * available.
99   *
100  * @since 4.2
101  */
102 @SuppressWarnings("deprecation")
103 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
104 public class HttpAsyncService implements NHttpServerEventHandler {
105 
106     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
107 
108     private final HttpProcessor httpProcessor;
109     private final ConnectionReuseStrategy connectionStrategy;
110     private final HttpResponseFactory responseFactory;
111     private final HttpAsyncRequestHandlerMapper handlerMapper;
112     private final HttpAsyncExpectationVerifier expectationVerifier;
113     private final ExceptionLogger exceptionLogger;
114 
115     /**
116      * Creates new instance of {@code HttpAsyncService}.
         * <p>
         * Delegates to the {@link HttpAsyncRequestHandlerMapper} based constructor,
         * wrapping {@code handlerResolver} in a
         * {@code HttpAsyncRequestHandlerResolverAdapter}.
117      *
118      * @param httpProcessor HTTP protocol processor (required).
119      * @param connStrategy Connection re-use strategy. If {@code null}
         *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
120      * @param responseFactory HTTP response factory. If {@code null}
         *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
121      * @param handlerResolver Request handler resolver.
122      * @param expectationVerifier Request expectation verifier (optional).
123      * @param params HTTP parameters. Not referenced by this constructor.
124      *
125      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
126      *  ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
127      *    HttpAsyncExpectationVerifier)}
128      */
129     @Deprecated
130     public HttpAsyncService(
131             final HttpProcessor httpProcessor,
132             final ConnectionReuseStrategy connStrategy,
133             final HttpResponseFactory responseFactory,
134             final HttpAsyncRequestHandlerResolver handlerResolver,
135             final HttpAsyncExpectationVerifier expectationVerifier,
136             final HttpParams params) {
137         this(httpProcessor,
138              connStrategy,
139              responseFactory,
140              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
141              expectationVerifier);
142     }
143 
144     /**
145      * Creates new instance of {@code HttpAsyncService} that uses
         * {@link DefaultHttpResponseFactory#INSTANCE} and no expectation verifier.
         * <p>
         * {@code handlerResolver} is wrapped in a
         * {@code HttpAsyncRequestHandlerResolverAdapter}.
146      *
147      * @param httpProcessor HTTP protocol processor (required).
148      * @param connStrategy Connection re-use strategy. If {@code null}
         *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
149      * @param handlerResolver Request handler resolver.
150      * @param params HTTP parameters. Not referenced by this constructor.
151      *
152      * @deprecated (4.3) use {@link HttpAsyncService#HttpAsyncService(HttpProcessor,
153      *   ConnectionReuseStrategy, HttpResponseFactory, HttpAsyncRequestHandlerMapper,
154      *   HttpAsyncExpectationVerifier)}
155      */
156     @Deprecated
157     public HttpAsyncService(
158             final HttpProcessor httpProcessor,
159             final ConnectionReuseStrategy connStrategy,
160             final HttpAsyncRequestHandlerResolver handlerResolver,
161             final HttpParams params) {
162         this(httpProcessor,
163              connStrategy,
164              DefaultHttpResponseFactory.INSTANCE,
165              new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
166              null);
167     }
168 
169     /**
170      * Creates new instance of {@code HttpAsyncService}.
         * <p>
         * No exception logger is supplied by this constructor, so
         * {@link ExceptionLogger#NO_OP} is used.
171      *
172      * @param httpProcessor HTTP protocol processor. Must not be {@code null}.
173      * @param connStrategy Connection re-use strategy. If {@code null}
174      *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
175      * @param responseFactory HTTP response factory. If {@code null}
176      *   {@link DefaultHttpResponseFactory#INSTANCE} will be used.
177      * @param handlerMapper Request handler mapper.
178      * @param expectationVerifier Request expectation verifier. May be {@code null}.
179      *
180      * @since 4.3
181      */
182     public HttpAsyncService(
183             final HttpProcessor httpProcessor,
184             final ConnectionReuseStrategy connStrategy,
185             final HttpResponseFactory responseFactory,
186             final HttpAsyncRequestHandlerMapper handlerMapper,
187             final HttpAsyncExpectationVerifier expectationVerifier) {
188         this(httpProcessor, connStrategy, responseFactory, handlerMapper, expectationVerifier, null);
189     }
190 
191     /**
192      * Creates new instance of {@code HttpAsyncService}.
193      *
194      * @param httpProcessor HTTP protocol processor. Must not be {@code null}.
195      * @param connStrategy Connection re-use strategy. Falls back to
196      *   {@link DefaultConnectionReuseStrategy#INSTANCE} when {@code null}.
197      * @param responseFactory HTTP response factory. Falls back to
198      *   {@link DefaultHttpResponseFactory#INSTANCE} when {@code null}.
199      * @param handlerMapper Request handler mapper.
200      * @param expectationVerifier Request expectation verifier. May be {@code null}.
201      * @param exceptionLogger Exception logger. Falls back to
202      *   {@link ExceptionLogger#NO_OP} when {@code null}. The logger receives
203      *   the exceptions passed to {@link #log(Exception)}, such as I/O exceptions
204      *   thrown while closing {@link java.io.Closeable} objects.
205      *
206      * @since 4.4
207      */
208     public HttpAsyncService(
209             final HttpProcessor httpProcessor,
210             final ConnectionReuseStrategy connStrategy,
211             final HttpResponseFactory responseFactory,
212             final HttpAsyncRequestHandlerMapper handlerMapper,
213             final HttpAsyncExpectationVerifier expectationVerifier,
214             final ExceptionLogger exceptionLogger) {
215         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
216         this.handlerMapper = handlerMapper;
217         this.expectationVerifier = expectationVerifier;
            // Optional collaborators fall back to the library defaults.
218         this.connectionStrategy = connStrategy == null
219                 ? DefaultConnectionReuseStrategy.INSTANCE : connStrategy;
220         this.responseFactory = responseFactory == null
221                 ? DefaultHttpResponseFactory.INSTANCE : responseFactory;
222         this.exceptionLogger = exceptionLogger == null ? ExceptionLogger.NO_OP : exceptionLogger;
223     }
225 
226     /**
227      * Creates new instance of {@code HttpAsyncService} with the default
         * connection re-use strategy and response factory, no expectation verifier
         * and no exception logger.
228      *
229      * @param httpProcessor HTTP protocol processor. Must not be {@code null}.
230      * @param handlerMapper Request handler mapper.
231      *
232      * @since 4.3
233      */
234     public HttpAsyncService(
235             final HttpProcessor httpProcessor,
236             final HttpAsyncRequestHandlerMapper handlerMapper) {
237         this(httpProcessor, null, null, handlerMapper, null);
238     }
239 
240     /**
241      * Creates new instance of {@code HttpAsyncService} with the default
         * connection re-use strategy and response factory and no expectation verifier.
242      *
243      * @param httpProcessor HTTP protocol processor. Must not be {@code null}.
244      * @param handlerMapper Request handler mapper.
245      * @param exceptionLogger Exception logger. If {@code null}
246      *   {@link ExceptionLogger#NO_OP} will be used. The logger receives the
247      *   exceptions passed to {@link #log(Exception)}, such as I/O exceptions thrown while closing
248      *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
249      *
250      * @since 4.4
251      */
252     public HttpAsyncService(
253             final HttpProcessor httpProcessor,
254             final HttpAsyncRequestHandlerMapper handlerMapper,
255             final ExceptionLogger exceptionLogger) {
256         this(httpProcessor, null, null, handlerMapper, null, exceptionLogger);
257     }
258 
259     @Override
260     public void connected(final NHttpServerConnection conn) {
            // Every new connection gets its own exchange state, stored in the
            // connection context under HTTP_EXCHANGE_STATE.
261         final State freshState = new State();
262         conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, freshState);
263     }
264 
265     @Override
266     public void closed(final NHttpServerConnection conn) {
            // Detach the exchange state from the connection context; nothing to
            // clean up when none was attached.
267         final State exchange = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
268         if (exchange == null) {
269             return;
270         }
271         exchange.setTerminated();
272         closeHandlers(exchange);
            // Cancel whatever operation is still registered with the exchange.
273         final Cancellable pending = exchange.getCancellable();
274         if (pending != null) {
275             pending.cancel();
            }
276     }
277 
278     @Override
279     public void exception(
280             final NHttpServerConnection conn, final Exception cause) {
            // Handles an exception reported for the connection: the cause is logged,
            // the active consumer / producer are failed and closed, and then either
            // the connection is terminated or an error response is committed.
281         log(cause);
282         final State state = getState(conn);
283         if (state == null) {
                // No exchange state attached to this connection: just drop it.
284             shutdownConnection(conn);
285             return;
286         }
287         state.setTerminated();
            // Notifies consumer / producer of the failure and closes them.
288         closeHandlers(state, cause);
289         try {
290             final Cancellable cancellable = state.getCancellable();
291             if (cancellable != null) {
292                 cancellable.cancel();
293             }
                // Note: the second test matches IOException itself only, not its subclasses.
294             if (cause instanceof SocketException || cause.getClass() == IOException.class) {
295                 // Transport layer is likely unreliable.
296                 conn.shutdown();
297                 return;
298             }
299             if (cause instanceof SocketTimeoutException) {
300                 // Connection timed out due to inactivity.
301                 conn.close();
302                 return;
303             }
304 
305             if (conn.isResponseSubmitted() || state.getResponseState().compareTo(MessageState.INIT) > 0) {
306                 // There is not much that we can do if a response has already been submitted.
307                 conn.close();
308                 return;
309             }
                // Find the request the error response should answer: the request
                // currently held by the connection, then the one being consumed,
                // then the head of the pipeline (which is removed by poll()).
310             HttpRequest request = conn.getHttpRequest();
311             if (request == null) {
312                 final Incoming incoming = state.getIncoming();
313                 if (incoming != null) {
314                     request = incoming.getRequest();
315                 }
316             }
317             if (request == null) {
318                 final Queue<PipelineEntry> pipeline = state.getPipeline();
319                 final PipelineEntry pipelineEntry = pipeline.poll();
320                 if (pipelineEntry != null) {
321                     request = pipelineEntry.getRequest();
322                 }
323             }
324             if (request != null) {
                    // Discard the remaining request input and answer with an error
                    // response derived from the cause.
325                 conn.resetInput();
326                 final HttpCoreContext context = HttpCoreContext.create();
327                 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
328                 final HttpResponse response = responseProducer.generateResponse();
329                 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
330                 state.setResponseState(MessageState.INIT);
331                 state.setOutgoing(outgoing);
332                 commitFinalResponse(conn, state);
333                 return;
334             }
                // No request to respond to.
335             conn.close();
336         } catch (final Exception ex) {
                // Recovery itself failed: shut the connection down and release the
                // handlers. Runtime exceptions are rethrown, others are only logged.
337             shutdownConnection(conn);
338             closeHandlers(state);
339             if (ex instanceof RuntimeException) {
340                 throw (RuntimeException) ex;
341             }
342             log(ex);
343         }
344     }
345 
346     protected HttpResponse createHttpResponse(final int status, final HttpContext context) {
347         return this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1, status, context);
348     }
349 
350     @Override
351     public void requestReceived(
352             final NHttpServerConnection conn) throws IOException, HttpException {
            // Entry point for a newly received request head. Runs the protocol
            // interceptors, selects the request handler and its consumer, and then
            // either waits for the request body (optionally after a 100-continue
            // handshake) or completes the request right away.
353         final State state = getState(conn);
354         Asserts.notNull(state, "Connection state");
355         Asserts.check(state.getRequestState() == MessageState.READY,
356                 "Unexpected request state %s", state.getRequestState());
357 
358         final HttpRequest request = conn.getHttpRequest();
359         final HttpContext context = new BasicHttpContext();
360 
361         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
362         context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
363         this.httpProcessor.process(request, context);
364 
365         final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
366         final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
367         consumer.requestReceived(request);
368 
369         final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
370         state.setIncoming(incoming);
371 
372         if (request instanceof HttpEntityEnclosingRequest) {
373 
374             // If 100-continue is expected make sure
375             // there is no pending response data, no pipelined requests or buffered input
376             if (((HttpEntityEnclosingRequest) request).expectContinue()
377                         && state.getResponseState() == MessageState.READY
378                         && state.getPipeline().isEmpty()
379                         && !(conn instanceof SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
380 
381                 state.setRequestState(MessageState.ACK_EXPECTED);
382                 final HttpResponse ack = createHttpResponse(HttpStatus.SC_CONTINUE, context);
383                 if (this.expectationVerifier != null) {
                        // Hold all I/O on the connection while the verifier decides;
                        // the verifier is handed an exchange pre-populated with the
                        // 100 (Continue) response.
384                     conn.suspendInput();
385                     conn.suspendOutput();
386                     final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
387                             request, ack, state, conn, context);
388                     this.expectationVerifier.verify(httpAsyncExchange, context);
389                 } else {
                        // No verifier configured: acknowledge immediately and start
                        // streaming the request body.
390                     conn.submitResponse(ack);
391                     state.setRequestState(MessageState.BODY_STREAM);
392                 }
393             } else {
394                 state.setRequestState(MessageState.BODY_STREAM);
395             }
396         } else {
397             // No request content is expected. Process request right away
398             completeRequest(incoming, conn, state);
399         }
400     }
401 
402     @Override
403     public void inputReady(
404             final NHttpServerConnection conn,
405             final ContentDecoder decoder) throws IOException, HttpException {
            // Request content is only expected while the request body is streamed.
406         final State exchange = getState(conn);
407         Asserts.notNull(exchange, "Connection state");
408         final boolean streaming = exchange.getRequestState() == MessageState.BODY_STREAM;
409         Asserts.check(streaming, "Unexpected request state %s", exchange.getRequestState());
410 
411         final Incoming pending = exchange.getIncoming();
412         Asserts.notNull(pending, "Incoming request");
            // Hand the available content to the consumer of the current request.
413         pending.getConsumer().consumeContent(decoder, conn);
414         if (!decoder.isCompleted()) {
415             return;
416         }
            // Entire body received: finish the request.
417         completeRequest(pending, conn, exchange);
418     }
419 
420     @Override
421     public void responseReady(
422             final NHttpServerConnection conn) throws IOException, HttpException {
            // Called when the connection can accept a response head. Depending on
            // the exchange state this either resolves a pending 100-continue
            // handshake, starts handling the next pipelined request, or commits a
            // final response that has already been produced.
423         final State state = getState(conn);
424         Asserts.notNull(state, "Connection state");
425         Asserts.check(state.getResponseState() == MessageState.READY ||
426                         state.getResponseState() == MessageState.INIT,
427                 "Unexpected response state %s", state.getResponseState());
428 
429         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
                // Expectation verification in progress: wait until an outgoing
                // response has been stored in the state.
430             final Outgoing outgoing;
431             synchronized (state) {
432                 outgoing = state.getOutgoing();
433                 if (outgoing == null) {
434                     conn.suspendOutput();
435                     return;
436                 }
437             }
438             final HttpResponse response = outgoing.getResponse();
439             final int status = response.getStatusLine().getStatusCode();
440             if (status == 100) {
                    // Expectation met: resume reading the request body and send
                    // the interim response. The producer is always closed.
441                 final HttpContext context = outgoing.getContext();
442                 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
443                 try {
444                     // Make sure 100 response has no entity
445                     response.setEntity(null);
446                     conn.requestInput();
447                     state.setRequestState(MessageState.BODY_STREAM);
448                     state.setOutgoing(null);
449                     conn.submitResponse(response);
450                     responseProducer.responseCompleted(context);
451                 } finally {
452                     responseProducer.close();
453                 }
454             } else if (status >= 400) {
                    // Expectation rejected: discard the request input and send the
                    // error as the final response.
455                 conn.resetInput();
456                 state.setRequestState(MessageState.READY);
457                 commitFinalResponse(conn, state);
458             } else {
                    // Any other status is not a valid answer to an expectation.
459                 throw new HttpException("Invalid response: " + response.getStatusLine());
460             }
461         } else {
462             if (state.getResponseState() == MessageState.READY) {
                    // Start handling the next fully received request, if any.
463                 final Queue<PipelineEntry> pipeline = state.getPipeline();
464                 final PipelineEntry pipelineEntry = pipeline.poll();
465                 if (pipelineEntry == null) {
466                     conn.suspendOutput();
467                     return;
468                 }
469                 state.setResponseState(MessageState.INIT);
470                 final Object result = pipelineEntry.getResult();
471                 final HttpRequest request = pipelineEntry.getRequest();
472                 final HttpContext context = pipelineEntry.getContext();
473                 final HttpResponse response = createHttpResponse(HttpStatus.SC_OK, context);
474                 final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
475                         request, response, state, conn, context);
476                 if (result != null) {
                        // The consumer produced a result: pass it to the handler with
                        // output suspended.
477                     final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
478                     conn.suspendOutput();
479                     try {
480                         handler.handle(result, httpExchange, context);
481                     } catch (final RuntimeException ex) {
482                         throw ex;
483                     } catch (final Exception ex) {
                            // Checked handler failure: answer with an error response
                            // unless the exchange is already completed, in which case
                            // the failure is logged and the connection closed.
484                         if (!httpExchange.isCompleted()) {
485                             httpExchange.submitResponse(handleException(ex, context));
486                         } else {
487                             log(ex);
488                             conn.close();
489                         }
490                         return;
491                     }
492                 } else {
                        // No result: report the consumer's exception, or a generic
                        // error when none was recorded.
493                     final Exception exception = pipelineEntry.getException();
494                     final HttpAsyncResponseProducer responseProducer = handleException(
495                             exception != null ? exception : new HttpException("Internal error processing request"),
496                             context);
497                     httpExchange.submitResponse(responseProducer);
498                 }
499             }
500             if (state.getResponseState() == MessageState.INIT) {
                    // A response may already be available; commit it if so,
                    // otherwise wait with output suspended.
501                 final Outgoing outgoing;
502                 synchronized (state) {
503                     outgoing = state.getOutgoing();
504                     if (outgoing == null) {
505                         conn.suspendOutput();
506                         return;
507                     }
508                 }
509                 final HttpResponse response = outgoing.getResponse();
510                 final int status = response.getStatusLine().getStatusCode();
511                 if (status >= 200) {
512                     commitFinalResponse(conn, state);
513                 } else {
                        // Status codes below 200 are rejected as final responses.
514                     throw new HttpException("Invalid response: " + response.getStatusLine());
515                 }
516             }
517         }
518     }
519 
520     @Override
521     public void outputReady(
522             final NHttpServerConnection conn,
523             final ContentEncoder encoder) throws HttpException, IOException {
            // Response content can only be written while the response body is streamed.
524         final State exchange = getState(conn);
525         Asserts.notNull(exchange, "Connection state");
526         final boolean streaming = exchange.getResponseState() == MessageState.BODY_STREAM;
527         Asserts.check(streaming, "Unexpected response state %s", exchange.getResponseState());
528 
529         final Outgoing pending = exchange.getOutgoing();
530         Asserts.notNull(pending, "Outgoing response");
            // Let the producer of the current response write the next chunk of content.
531         pending.getProducer().produceContent(encoder, conn);
532         if (!encoder.isCompleted()) {
533             return;
534         }
            // Entire body written: finish the response.
535         completeResponse(pending, conn, exchange);
536     }
539 
540     @Override
541     public void endOfInput(final NHttpServerConnection conn) throws IOException {
542         // The peer has finished sending: close gracefully so that pending output
543         // can be flushed, but bound the wait in case the opposite end is not reading.
544         final int currentTimeout = conn.getSocketTimeout();
545         if (!(currentTimeout > 0)) {
546             conn.setSocketTimeout(1000);
547         }
548         conn.close();
549     }
551 
552     @Override
553     public void timeout(final NHttpServerConnection conn) throws IOException {
554         final State exchange = getState(conn);
555         if (exchange != null) {
                // Fail the active consumer / producer with a descriptive timeout error.
556             final String reason = String.format(
557                     "%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn);
558             closeHandlers(exchange, new SocketTimeoutException(reason));
559         }
560         if (conn.getStatus() != NHttpConnection.ACTIVE) {
                // Connection is no longer active: terminate it right away.
561             conn.shutdown();
562             return;
563         }
564         conn.close();
565         if (conn.getStatus() == NHttpConnection.CLOSING) {
                // Give the connection some grace time to close itself nicely.
566             conn.setSocketTimeout(250);
567         }
568     }
570 
571     private State getState(final NHttpConnection conn) {
572         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
573     }
574 
575     /**
576      * Logs the given exception using the configured {@link ExceptionLogger}.
         * <p>
         * This class calls it for I/O exceptions thrown while closing
577      * {@link java.io.Closeable} objects (such as
578      * {@link org.apache.http.HttpConnection}), for every exception reported
         * through {@link #exception(NHttpServerConnection, Exception)} and for
         * request handler failures raised after the exchange has been completed.
579      *
580      * @param ex the exception to log.
581      */
582     protected void log(final Exception ex) {
583         this.exceptionLogger.log(ex);
584     }
585 
586     private void shutdownConnection(final NHttpConnection conn) {
            // Best-effort shutdown: an I/O failure is logged, not propagated.
587         try {
588             conn.shutdown();
589         } catch (final IOException shutdownFailure) {
590             log(shutdownFailure);
591         }
592     }
593 
594     private void closeHandlers(final State state, final Exception ex) {
            // Report the failure to the request consumer, if there is one, and
            // close it afterwards even when the failure callback throws.
595         final HttpAsyncRequestConsumer<Object> requestConsumer =
596                 state.getIncoming() == null ? null : state.getIncoming().getConsumer();
597         if (requestConsumer != null) {
598             try {
599                 requestConsumer.failed(ex);
600             } finally {
601                 try {
602                     requestConsumer.close();
603                 } catch (final IOException closeFailure) {
604                     log(closeFailure);
605                 }
606             }
607         }
            // Same treatment for the response producer.
608         final HttpAsyncResponseProducer responseProducer =
609                 state.getOutgoing() == null ? null : state.getOutgoing().getProducer();
610         if (responseProducer != null) {
611             try {
612                 responseProducer.failed(ex);
613             } finally {
614                 try {
615                     responseProducer.close();
616                 } catch (final IOException closeFailure) {
617                     log(closeFailure);
618                 }
619             }
620         }
621     }
622 
623     private void closeHandlers(final State state) {
            // Close the request consumer, if there is one; close failures are only logged.
624         final HttpAsyncRequestConsumer<Object> requestConsumer =
625                 state.getIncoming() == null ? null : state.getIncoming().getConsumer();
626         if (requestConsumer != null) {
627             try {
628                 requestConsumer.close();
629             } catch (final IOException closeFailure) {
630                 log(closeFailure);
631             }
632         }
            // Same treatment for the response producer.
633         final HttpAsyncResponseProducer responseProducer =
634                 state.getOutgoing() == null ? null : state.getOutgoing().getProducer();
635         if (responseProducer != null) {
636             try {
637                 responseProducer.close();
638             } catch (final IOException closeFailure) {
639                 log(closeFailure);
640             }
641         }
642     }
643 
644     protected HttpAsyncResponseProducer handleException(
645             final Exception ex, final HttpContext context) {
            // Builds a plain-text error response: the body is the exception message
            // (or its toString() when there is none), the status comes from
            // toStatusCode().
646         final String detail = ex.getMessage();
647         final String body = detail != null ? detail : ex.toString();
648         final int status = toStatusCode(ex, context);
649         final HttpResponse errorResponse = createHttpResponse(status, context);
650         final NStringEntity entity = new NStringEntity(body, ContentType.DEFAULT_TEXT);
651         return new ErrorResponseProducer(errorResponse, entity, false);
652     }
653 
654     protected int toStatusCode(final Exception ex, final HttpContext context) {
            // Maps known protocol failures to their HTTP status code; everything
            // else is reported as an internal server error. The context is not
            // consulted by this implementation.
655         if (ex instanceof MethodNotSupportedException) {
656             return HttpStatus.SC_NOT_IMPLEMENTED;
657         }
658         if (ex instanceof UnsupportedHttpVersionException) {
659             return HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
660         }
661         if (ex instanceof ProtocolException) {
662             return HttpStatus.SC_BAD_REQUEST;
663         }
664         return HttpStatus.SC_INTERNAL_SERVER_ERROR;
665     }
667 
668     /**
669      * This method can be used to handle callback set up happened after
670      * response submission.
         * <p>
         * This implementation always throws {@link IllegalStateException}.
671      *
672      * @param cancellable Request cancellation callback.
673      * @param context Request context.
674      *
675      * @since 4.4
676      */
677     protected void handleAlreadySubmittedResponse(
678             final Cancellable cancellable, final HttpContext context) {
679         throw new IllegalStateException("Response already submitted");
680     }
681 
682     /**
683      * This method can be used to handle double response submission.
         * <p>
         * This implementation always throws {@link IllegalStateException}.
684      *
685      * @param responseProducer Response producer for second response.
686      * @param context Request context.
687      *
688      * @since 4.4
689      */
690     protected void handleAlreadySubmittedResponse(
691             final HttpAsyncResponseProducer responseProducer,
692             final HttpContext context) {
693         throw new IllegalStateException("Response already submitted");
694     }
695 
696     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
697         if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
698             return false;
699         }
700         final int status = response.getStatusLine().getStatusCode();
701         return status >= HttpStatus.SC_OK
702             && status != HttpStatus.SC_NO_CONTENT
703             && status != HttpStatus.SC_NOT_MODIFIED
704             && status != HttpStatus.SC_RESET_CONTENT;
705     }
706 
707     private void completeRequest(
708             final Incoming incoming,
709             final NHttpServerConnection conn,
710             final State state) throws IOException {
711         state.setRequestState(MessageState.READY);
712         state.setIncoming(null);
713 
714         final PipelineEntry pipelineEntry;
715         final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
716         try {
717             final HttpContext context = incoming.getContext();
718             consumer.requestCompleted(context);
719             pipelineEntry = new PipelineEntry(
720                     incoming.getRequest(),
721                     consumer.getResult(),
722                     consumer.getException(),
723                     incoming.getHandler(),
724                     context);
725         } finally {
726             consumer.close();
727         }
728         final Queue<PipelineEntry> pipeline = state.getPipeline();
729         pipeline.add(pipelineEntry);
730         if (state.getResponseState() == MessageState.READY) {
731             conn.requestOutput();
732         }
733     }
734 
735     private void commitFinalResponse(
736             final NHttpServerConnection conn,
737             final State state) throws IOException, HttpException {
738         final Outgoing outgoing = state.getOutgoing();
739         Asserts.notNull(outgoing, "Outgoing response");
740         final HttpRequest request = outgoing.getRequest();
741         final HttpResponse response = outgoing.getResponse();
742         final HttpContext context = outgoing.getContext();
743 
744         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
745         this.httpProcessor.process(response, context);
746 
747         HttpEntity entity = response.getEntity();
748         if (entity != null && !canResponseHaveBody(request, response)) {
749             response.setEntity(null);
750             entity = null;
751         }
752 
753         conn.submitResponse(response);
754 
755         if (entity == null) {
756             completeResponse(outgoing, conn, state);
757         } else {
758             state.setResponseState(MessageState.BODY_STREAM);
759         }
760     }
761 
762     private void completeResponse(
763             final Outgoing outgoing,
764             final NHttpServerConnection conn,
765             final State state) throws IOException {
766         final HttpContext context = outgoing.getContext();
767         final HttpResponse response = outgoing.getResponse();
768         final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
769         try {
770             responseProducer.responseCompleted(context);
771             state.setOutgoing(null);
772             state.setCancellable(null);
773             state.setResponseState(MessageState.READY);
774         } finally {
775             responseProducer.close();
776         }
777         if (!this.connectionStrategy.keepAlive(response, context)) {
778             conn.close();
779         } else {
780             conn.requestInput();
781         }
782     }
783 
784     @SuppressWarnings("unchecked")
785     private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
786         HttpAsyncRequestHandler<Object> handler = null;
787         if (this.handlerMapper != null) {
788             handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
789         }
790         if (handler == null) {
791             handler = NullRequestHandler.INSTANCE;
792         }
793         return handler;
794     }
795 
796     static class Incoming {
797 
798         private final HttpRequest request;
799         private final HttpAsyncRequestHandler<Object> handler;
800         private final HttpAsyncRequestConsumer<Object> consumer;
801         private final HttpContext context;
802 
803         Incoming(
804                 final HttpRequest request,
805                 final HttpAsyncRequestHandler<Object> handler,
806                 final HttpAsyncRequestConsumer<Object> consumer,
807                 final HttpContext context) {
808             this.request = request;
809             this.handler = handler;
810             this.consumer = consumer;
811             this.context = context;
812         }
813 
814         public HttpRequest getRequest() {
815             return this.request;
816         }
817 
818         public HttpAsyncRequestHandler<Object> getHandler() {
819             return this.handler;
820         }
821 
822         public HttpAsyncRequestConsumer<Object> getConsumer() {
823             return this.consumer;
824         }
825 
826         public HttpContext getContext() {
827             return this.context;
828         }
829     }
830 
831     static class Outgoing {
832 
833         private final HttpRequest request;
834         private final HttpResponse response;
835         private final HttpAsyncResponseProducer producer;
836         private final HttpContext context;
837 
838         Outgoing(
839                 final HttpRequest request,
840                 final HttpResponse response,
841                 final HttpAsyncResponseProducer producer,
842                 final HttpContext context) {
843             this.request = request;
844             this.response = response;
845             this.producer = producer;
846             this.context = context;
847         }
848 
849         public HttpRequest getRequest() {
850             return this.request;
851         }
852 
853         public HttpResponse getResponse() {
854             return this.response;
855         }
856 
857         public HttpAsyncResponseProducer getProducer() {
858             return this.producer;
859         }
860 
861         public HttpContext getContext() {
862             return this.context;
863         }
864     }
865 
866     static class PipelineEntry {
867 
868         private final HttpRequest request;
869         private final Object result;
870         private final Exception exception;
871         private final HttpAsyncRequestHandler<Object> handler;
872         private final HttpContext context;
873 
874         PipelineEntry(
875                 final HttpRequest request,
876                 final Object result,
877                 final Exception exception,
878                 final HttpAsyncRequestHandler<Object> handler,
879                 final HttpContext context) {
880             this.request = request;
881             this.result = result;
882             this.exception = exception;
883             this.handler = handler;
884             this.context = context;
885         }
886 
887         public HttpRequest getRequest() {
888             return this.request;
889         }
890 
891         public Object getResult() {
892             return this.result;
893         }
894 
895         public Exception getException() {
896             return this.exception;
897         }
898 
899         public HttpAsyncRequestHandler<Object> getHandler() {
900             return this.handler;
901         }
902 
903         public HttpContext getContext() {
904             return this.context;
905         }
906 
907     }
908 
909     static class State {
910 
911         private final Queue<PipelineEntry> pipeline;
912         private volatile boolean terminated;
913         private volatile MessageState requestState;
914         private volatile MessageState responseState;
915         private volatile Incoming incoming;
916         private volatile Outgoing outgoing;
917         private volatile Cancellable cancellable;
918 
919         State() {
920             super();
921             this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
922             this.requestState = MessageState.READY;
923             this.responseState = MessageState.READY;
924         }
925 
926         public boolean isTerminated() {
927             return this.terminated;
928         }
929 
930         public void setTerminated() {
931             this.terminated = true;
932         }
933 
934         public MessageState getRequestState() {
935             return this.requestState;
936         }
937 
938         public void setRequestState(final MessageState state) {
939             this.requestState = state;
940         }
941 
942         public MessageState getResponseState() {
943             return this.responseState;
944         }
945 
946         public void setResponseState(final MessageState state) {
947             this.responseState = state;
948         }
949 
950         public Incoming getIncoming() {
951             return this.incoming;
952         }
953 
954         public void setIncoming(final Incoming incoming) {
955             this.incoming = incoming;
956         }
957 
958         public Outgoing getOutgoing() {
959             return this.outgoing;
960         }
961 
962         public void setOutgoing(final Outgoing outgoing) {
963             this.outgoing = outgoing;
964         }
965 
966         public Cancellable getCancellable() {
967             return this.cancellable;
968         }
969 
970         public void setCancellable(final Cancellable cancellable) {
971             this.cancellable = cancellable;
972         }
973 
974         public Queue<PipelineEntry> getPipeline() {
975             return this.pipeline;
976         }
977 
978         @Override
979         public String toString() {
980             final StringBuilder buf = new StringBuilder();
981             buf.append("[incoming ");
982             buf.append(this.requestState);
983             if (this.incoming != null) {
984                 buf.append(" ");
985                 buf.append(this.incoming.getRequest().getRequestLine());
986             }
987             buf.append("; outgoing ");
988             buf.append(this.responseState);
989             if (this.outgoing != null) {
990                 buf.append(" ");
991                 buf.append(this.outgoing.getResponse().getStatusLine());
992             }
993             buf.append("]");
994             return buf.toString();
995         }
996 
997     }
998 
999     class HttpAsyncExchangeImpl implements HttpAsyncExchange {
1000 
1001         private final AtomicBoolean completed = new AtomicBoolean();
1002         private final HttpRequest request;
1003         private final HttpResponse response;
1004         private final State state;
1005         private final NHttpServerConnection conn;
1006         private final HttpContext context;
1007 
1008         public HttpAsyncExchangeImpl(
1009                 final HttpRequest request,
1010                 final HttpResponse response,
1011                 final State state,
1012                 final NHttpServerConnection conn,
1013                 final HttpContext context) {
1014             super();
1015             this.request = request;
1016             this.response = response;
1017             this.state = state;
1018             this.conn = conn;
1019             this.context = context;
1020         }
1021 
1022         @Override
1023         public HttpRequest getRequest() {
1024             return this.request;
1025         }
1026 
1027         @Override
1028         public HttpResponse getResponse() {
1029             return this.response;
1030         }
1031 
1032         @Override
1033         public void setCallback(final Cancellable cancellable) {
1034             if (this.completed.get()) {
1035                 handleAlreadySubmittedResponse(cancellable, context);
1036             } else if (this.state.isTerminated() && cancellable != null) {
1037                 cancellable.cancel();
1038             } else {
1039                 this.state.setCancellable(cancellable);
1040             }
1041         }
1042 
1043         @Override
1044         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1045             Args.notNull(responseProducer, "Response producer");
1046             if (this.completed.getAndSet(true)) {
1047                 handleAlreadySubmittedResponse(responseProducer, context);
1048             } else if (!this.state.isTerminated()) {
1049                 final HttpResponse response = responseProducer.generateResponse();
1050                 final Outgoing outgoing = new Outgoing(
1051                         this.request, response, responseProducer, this.context);
1052 
1053                 synchronized (this.state) {
1054                     this.state.setOutgoing(outgoing);
1055                     this.state.setCancellable(null);
1056                     this.conn.requestOutput();
1057                 }
1058 
1059             } else {
1060                 try {
1061                     responseProducer.close();
1062                 } catch (final IOException ex) {
1063                     log(ex);
1064                 }
1065             }
1066         }
1067 
1068         @Override
1069         public void submitResponse() {
1070             submitResponse(new BasicAsyncResponseProducer(this.response));
1071         }
1072 
1073         @Override
1074         public boolean isCompleted() {
1075             return this.completed.get();
1076         }
1077 
1078         @Override
1079         public void setTimeout(final int timeout) {
1080             this.conn.setSocketTimeout(timeout);
1081         }
1082 
1083         @Override
1084         public int getTimeout() {
1085             return this.conn.getSocketTimeout();
1086         }
1087 
1088     }
1089 
1090     /**
1091      * Adaptor class to transition from HttpAsyncRequestHandlerResolver to HttpAsyncRequestHandlerMapper.
1092      *
1093      * @deprecated Do not use.
1094      */
1095     @Deprecated
1096     private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1097 
1098         private final HttpAsyncRequestHandlerResolver resolver;
1099 
1100         public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1101             this.resolver = resolver;
1102         }
1103 
1104         @Override
1105         public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1106             return resolver.lookup(request.getRequestLine().getUri());
1107         }
1108 
1109     }
1110 
1111     /**
1112      * Gets the HttpResponseFactory for this service.
1113      *
1114      * @return  the HttpResponseFactory for this service.
1115      * @since 4.4.8
1116      */
1117     public HttpResponseFactory getResponseFactory() {
1118       return responseFactory;
1119     }
1120 
1121     /**
1122      * Gets the HttpProcessor for this service.
1123      *
1124      * @return the HttpProcessor for this service.
1125      * @since 4.4.9
1126      */
1127     public HttpProcessor getHttpProcessor() {
1128         return httpProcessor;
1129     }
1130 
1131     /**
1132      * Gets the ConnectionReuseStrategy for this service.
1133      *
1134      * @return the ConnectionReuseStrategy for this service.
1135      * @since 4.4.9
1136      */
1137     public ConnectionReuseStrategy getConnectionStrategy() {
1138         return connectionStrategy;
1139     }
1140 
1141     /**
1142      * Gets the HttpAsyncRequestHandlerMapper for this service.
1143      *
1144      * @return the HttpAsyncRequestHandlerMapper for this service.
1145      * @since 4.4.9
1146      */
1147     public HttpAsyncRequestHandlerMapper getHandlerMapper() {
1148         return handlerMapper;
1149     }
1150 
1151     /**
1152      * Gets the HttpAsyncExpectationVerifier for this service.
1153      *
1154      * @return the HttpAsyncExpectationVerifier for this service.
1155      * @since 4.4.9
1156      */
1157     public HttpAsyncExpectationVerifier getExpectationVerifier() {
1158         return expectationVerifier;
1159     }
1160 
1161     /**
1162      * Gets the ExceptionLogger for this service.
1163      *
1164      * @return the ExceptionLogger for this service.
1165      * @since 4.4.9
1166      */
1167     public ExceptionLogger getExceptionLogger() {
1168         return exceptionLogger;
1169     }
1170 
1171 }