View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionClosedException;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpStatus;
43  import org.apache.http.HttpVersion;
44  import org.apache.http.ProtocolException;
45  import org.apache.http.ProtocolVersion;
46  import org.apache.http.annotation.Contract;
47  import org.apache.http.annotation.ThreadingBehavior;
48  import org.apache.http.nio.ContentDecoder;
49  import org.apache.http.nio.ContentEncoder;
50  import org.apache.http.nio.NHttpClientConnection;
51  import org.apache.http.nio.NHttpClientEventHandler;
52  import org.apache.http.nio.NHttpConnection;
53  import org.apache.http.protocol.HttpContext;
54  import org.apache.http.util.Args;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
59   * protocol handler based on the NIO (non-blocking) I/O model.
60   * {@code HttpAsyncRequestExecutor} translates individual events fired through
61   * the {@link NHttpClientEventHandler} interface into logically related HTTP
62   * message exchanges.
63   * <p> The caller is expected to pass an instance of
64   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
65   * of HTTP message exchanges through the connection context using
66   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
67   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
68   * returns {@code true}. The {@link HttpAsyncRequester} utility class can
69   * be used to facilitate initiation of asynchronous HTTP request execution.
70   * <p>
71   * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of
72   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
73   * headers for all outgoing messages and apply common, cross-cutting message
74   * transformations to all incoming and outgoing messages.
75   * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
76   * application specific content generation and processing to
77   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
78   *
79   * @see HttpAsyncClientExchangeHandler
80   *
81   * @since 4.2
82   */
83  @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
84  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
85  
86      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
87      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
88  
89      private final int waitForContinue;
90      private final ExceptionLogger exceptionLogger;
91  
92      /**
93       * Creates new instance of {@code HttpAsyncRequestExecutor}.
94       * @param waitForContinue wait for continue time period.
95       * @param exceptionLogger Exception logger. If {@code null}
96       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
97       *   logger will be only used to log I/O exception thrown while closing
98       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
99       *
100      * @since 4.4
101      */
102     public HttpAsyncRequestExecutor(
103             final int waitForContinue,
104             final ExceptionLogger exceptionLogger) {
105         super();
106         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108     }
109 
110     /**
111      * Creates new instance of HttpAsyncRequestExecutor.
112      *
113      * @since 4.3
114      */
115     public HttpAsyncRequestExecutor(final int waitForContinue) {
116         this(waitForContinue, null);
117     }
118 
119     public HttpAsyncRequestExecutor() {
120         this(DEFAULT_WAIT_FOR_CONTINUE, null);
121     }
122 
123     @Override
124     public void connected(
125             final NHttpClientConnection conn,
126             final Object attachment) throws IOException, HttpException {
127         final State state = new State();
128         final HttpContext context = conn.getContext();
129         context.setAttribute(HTTP_EXCHANGE_STATE, state);
130         requestReady(conn);
131     }
132 
133     @Override
134     public void closed(final NHttpClientConnection conn) {
135         final State state = getState(conn);
136         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
137         if (state != null) {
138             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
139                 if (handler != null) {
140                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
141                 }
142             }
143         }
144         if (state == null || (handler != null && handler.isDone())) {
145             closeHandler(handler);
146         }
147     }
148 
149     @Override
150     public void exception(
151             final NHttpClientConnection conn, final Exception cause) {
152         shutdownConnection(conn);
153         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
154         if (handler != null) {
155             handler.failed(cause);
156         } else {
157             log(cause);
158         }
159     }
160 
161     @Override
162     public void requestReady(
163             final NHttpClientConnection conn) throws IOException, HttpException {
164         final State state = getState(conn);
165         Asserts.notNull(state, "Connection state");
166         Asserts.check(state.getRequestState() == MessageState.READY ||
167                         state.getRequestState() == MessageState.COMPLETED,
168                 "Unexpected request state %s", state.getRequestState());
169 
170         if (state.getRequestState() == MessageState.COMPLETED) {
171             conn.suspendOutput();
172             return;
173         }
174         final HttpContext context = conn.getContext();
175         final HttpAsyncClientExchangeHandler handler;
176         synchronized (context) {
177             handler = getHandler(conn);
178             if (handler == null || handler.isDone()) {
179                 conn.suspendOutput();
180                 return;
181             }
182         }
183         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
184 
185         final HttpRequest request = handler.generateRequest();
186         if (request == null) {
187             conn.suspendOutput();
188             return;
189         }
190         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
191         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
192             throw new ProtocolException(version + " cannot be used with request pipelining");
193         }
194         state.setRequest(request);
195         if (pipelined) {
196             state.getRequestQueue().add(request);
197         }
198         if (request instanceof HttpEntityEnclosingRequest) {
199             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
200             if (expectContinue && pipelined) {
201                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
202             }
203             conn.submitRequest(request);
204             if (expectContinue) {
205                 final int timeout = conn.getSocketTimeout();
206                 state.setTimeout(timeout);
207                 conn.setSocketTimeout(this.waitForContinue);
208                 state.setRequestState(MessageState.ACK_EXPECTED);
209             } else {
210                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
211                 if (entity != null) {
212                     state.setRequestState(MessageState.BODY_STREAM);
213                 } else {
214                     handler.requestCompleted();
215                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
216                 }
217             }
218         } else {
219             conn.submitRequest(request);
220             handler.requestCompleted();
221             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
222         }
223     }
224 
225     @Override
226     public void outputReady(
227             final NHttpClientConnection conn,
228             final ContentEncoder encoder) throws IOException, HttpException {
229         final State state = getState(conn);
230         Asserts.notNull(state, "Connection state");
231         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
232                         state.getRequestState() == MessageState.ACK_EXPECTED,
233                 "Unexpected request state %s", state.getRequestState());
234 
235         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
236         Asserts.notNull(handler, "Client exchange handler");
237         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
238             conn.suspendOutput();
239             return;
240         }
241         handler.produceContent(encoder, conn);
242         if (encoder.isCompleted()) {
243             handler.requestCompleted();
244             final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
245             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
246         }
247     }
248 
249     @Override
250     public void responseReceived(
251             final NHttpClientConnection conn) throws HttpException, IOException {
252         final State state = getState(conn);
253         Asserts.notNull(state, "Connection state");
254         Asserts.check(state.getResponseState() == MessageState.READY,
255                 "Unexpected request state %s", state.getResponseState());
256 
257         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
258         Asserts.notNull(handler, "Client exchange handler");
259 
260         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
261         final HttpRequest request;
262         if (pipelined) {
263             request = state.getRequestQueue().poll();
264             Asserts.notNull(request, "HTTP request");
265         } else {
266             request = state.getRequest();
267             if (request == null) {
268                 throw new HttpException("Out of sequence response");
269             }
270         }
271 
272         final HttpResponse response = conn.getHttpResponse();
273 
274         final int statusCode = response.getStatusLine().getStatusCode();
275         if (statusCode < HttpStatus.SC_CONTINUE) {
276             throw new ProtocolException("Invalid response: " + response.getStatusLine());
277         }
278         if (statusCode < HttpStatus.SC_OK) {
279             // 1xx intermediate response
280             if (statusCode != HttpStatus.SC_CONTINUE) {
281                 throw new ProtocolException(
282                         "Unexpected response: " + response.getStatusLine());
283             }
284             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
285                 final int timeout = state.getTimeout();
286                 conn.setSocketTimeout(timeout);
287                 conn.requestOutput();
288                 state.setRequestState(MessageState.BODY_STREAM);
289             }
290             return;
291         }
292         state.setResponse(response);
293         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
294             final int timeout = state.getTimeout();
295             conn.setSocketTimeout(timeout);
296             conn.resetOutput();
297             state.setRequestState(MessageState.COMPLETED);
298         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
299             // Early response
300             if (statusCode >= 400) {
301                 conn.resetOutput();
302                 conn.suspendOutput();
303                 state.setRequestState(MessageState.COMPLETED);
304                 state.invalidate();
305             }
306         }
307 
308         if (canResponseHaveBody(request, response)) {
309             handler.responseReceived(response);
310             state.setResponseState(MessageState.BODY_STREAM);
311         } else {
312             response.setEntity(null);
313             handler.responseReceived(response);
314             conn.resetInput();
315             processResponse(conn, state, handler);
316         }
317     }
318 
319     @Override
320     public void inputReady(
321             final NHttpClientConnection conn,
322             final ContentDecoder decoder) throws IOException, HttpException {
323         final State state = getState(conn);
324         Asserts.notNull(state, "Connection state");
325         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
326                 "Unexpected request state %s", state.getResponseState());
327 
328         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
329         Asserts.notNull(handler, "Client exchange handler");
330         handler.consumeContent(decoder, conn);
331         if (decoder.isCompleted()) {
332             processResponse(conn, state, handler);
333         }
334     }
335 
336     @Override
337     public void endOfInput(final NHttpClientConnection conn) throws IOException {
338         final State state = getState(conn);
339         final HttpContext context = conn.getContext();
340         synchronized (context) {
341             if (state != null) {
342                 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
343                     state.invalidate();
344                 }
345                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
346                 if (handler != null) {
347                     if (state.isValid()) {
348                         handler.inputTerminated();
349                     } else {
350                         handler.failed(new ConnectionClosedException("Connection closed"));
351                     }
352                 }
353             }
354             // Closing connection in an orderly manner and
355             // waiting for output buffer to get flushed.
356             // Do not want to wait indefinitely, though, in case
357             // the opposite end is not reading
358             if (conn.getSocketTimeout() <= 0) {
359                 conn.setSocketTimeout(1000);
360             }
361             conn.close();
362         }
363     }
364 
365     @Override
366     public void timeout(
367             final NHttpClientConnection conn) throws IOException {
368         final State state = getState(conn);
369         if (state != null) {
370             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
371                 final int timeout = state.getTimeout();
372                 conn.setSocketTimeout(timeout);
373                 conn.requestOutput();
374                 state.setRequestState(MessageState.BODY_STREAM);
375                 state.setTimeout(0);
376                 return;
377             }
378             state.invalidate();
379             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
380             if (handler != null) {
381                 handler.failed(new SocketTimeoutException(
382                         String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
383                 handler.close();
384             }
385         }
386         if (conn.getStatus() == NHttpConnection.ACTIVE) {
387             conn.close();
388             if (conn.getStatus() == NHttpConnection.CLOSING) {
389                 // Give the connection some grace time to
390                 // close itself nicely
391                 conn.setSocketTimeout(250);
392             }
393         } else {
394             conn.shutdown();
395         }
396     }
397 
398     /**
399      * This method can be used to log I/O exception thrown while closing
400      * {@link java.io.Closeable} objects (such as
401      * {@link org.apache.http.HttpConnection}}).
402      *
403      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
404      */
405     protected void log(final Exception ex) {
406         this.exceptionLogger.log(ex);
407     }
408 
409     private static State getState(final NHttpConnection conn) {
410         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
411     }
412 
413     private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
414         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
415     }
416 
417     private void shutdownConnection(final NHttpConnection conn) {
418         try {
419             conn.shutdown();
420         } catch (final IOException ex) {
421             log(ex);
422         }
423     }
424 
425     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
426         if (handler != null) {
427             try {
428                 handler.close();
429             } catch (final IOException ioex) {
430                 log(ioex);
431             }
432         }
433     }
434 
435     private void processResponse(
436             final NHttpClientConnection conn,
437             final State state,
438             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
439         if (!state.isValid()) {
440             conn.close();
441         }
442         handler.responseCompleted();
443 
444         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
445         if (!pipelined) {
446             state.setRequestState(MessageState.READY);
447             state.setRequest(null);
448         }
449         state.setResponseState(MessageState.READY);
450         state.setResponse(null);
451         if (!handler.isDone() && conn.isOpen()) {
452             conn.requestOutput();
453         }
454     }
455 
456     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
457 
458         final String method = request.getRequestLine().getMethod();
459         final int status = response.getStatusLine().getStatusCode();
460 
461         if (method.equalsIgnoreCase("HEAD")) {
462             return false;
463         }
464         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
465             return false;
466         }
467         return status >= HttpStatus.SC_OK
468             && status != HttpStatus.SC_NO_CONTENT
469             && status != HttpStatus.SC_NOT_MODIFIED
470             && status != HttpStatus.SC_RESET_CONTENT;
471     }
472 
473     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
474 
475     static class State {
476 
477         private final Queue<HttpRequest> requestQueue;
478         private volatile MessageState requestState;
479         private volatile MessageState responseState;
480         private volatile HttpRequest request;
481         private volatile HttpResponse response;
482         private volatile boolean valid;
483         private volatile int timeout;
484 
485         State() {
486             super();
487             this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
488             this.valid = true;
489             this.requestState = MessageState.READY;
490             this.responseState = MessageState.READY;
491         }
492 
493         public MessageState getRequestState() {
494             return this.requestState;
495         }
496 
497         public void setRequestState(final MessageState state) {
498             this.requestState = state;
499         }
500 
501         public MessageState getResponseState() {
502             return this.responseState;
503         }
504 
505         public void setResponseState(final MessageState state) {
506             this.responseState = state;
507         }
508 
509         public HttpRequest getRequest() {
510             return this.request;
511         }
512 
513         public void setRequest(final HttpRequest request) {
514             this.request = request;
515         }
516 
517         public HttpResponse getResponse() {
518             return this.response;
519         }
520 
521         public void setResponse(final HttpResponse response) {
522             this.response = response;
523         }
524 
525         public Queue<HttpRequest> getRequestQueue() {
526             return this.requestQueue;
527         }
528 
529         public int getTimeout() {
530             return this.timeout;
531         }
532 
533         public void setTimeout(final int timeout) {
534             this.timeout = timeout;
535         }
536 
537         public boolean isValid() {
538             return this.valid;
539         }
540 
541         public void invalidate() {
542             this.valid = false;
543         }
544 
545         @Override
546         public String toString() {
547             final StringBuilder buf = new StringBuilder();
548             buf.append("request state: ");
549             buf.append(this.requestState);
550             buf.append("; request: ");
551             if (this.request != null) {
552                 buf.append(this.request.getRequestLine());
553             }
554             buf.append("; response state: ");
555             buf.append(this.responseState);
556             buf.append("; response: ");
557             if (this.response != null) {
558                 buf.append(this.response.getStatusLine());
559             }
560             buf.append("; valid: ");
561             buf.append(this.valid);
562             buf.append(";");
563             return buf.toString();
564         }
565 
566     }
567 
568 }