View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionClosedException;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpStatus;
43  import org.apache.http.HttpVersion;
44  import org.apache.http.ProtocolException;
45  import org.apache.http.ProtocolVersion;
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.nio.ContentDecoder;
49  import org.apache.http.nio.ContentEncoder;
50  import org.apache.http.nio.NHttpClientConnection;
51  import org.apache.http.nio.NHttpClientEventHandler;
52  import org.apache.http.nio.NHttpConnection;
53  import org.apache.http.protocol.HttpContext;
54  import org.apache.http.util.Args;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
59   * protocol handler based on the NIO (non-blocking) I/O model.
60   * {@code HttpAsyncRequestExecutor} translates individual events fired through
61   * the {@link NHttpClientEventHandler} interface into logically related HTTP
62   * message exchanges.
63   * <p> The caller is expected to pass an instance of
64   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
65   * of HTTP message exchanges through the connection context using
66   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
67   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
68   * returns {@code true}. The {@link HttpAsyncRequester} utility class can
69   * be used to facilitate initiation of asynchronous HTTP request execution.
70   * <p>
71   * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of
72   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
73   * headers for all outgoing messages and apply common, cross-cutting message
74   * transformations to all incoming and outgoing messages.
75   * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
76   * application specific content generation and processing to
77   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
78   *
79   * @see HttpAsyncClientExchangeHandler
80   *
81   * @since 4.2
82   */
83  @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
84  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
85  
86      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
87      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
88  
89      private final int waitForContinue;
90      private final ExceptionLogger exceptionLogger;
91  
92      /**
93       * Creates new instance of {@code HttpAsyncRequestExecutor}.
94       * @param waitForContinue wait for continue time period.
95       * @param exceptionLogger Exception logger. If {@code null}
96       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
97       *   logger will be only used to log I/O exception thrown while closing
98       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
99       *
100      * @since 4.4
101      */
102     public HttpAsyncRequestExecutor(
103             final int waitForContinue,
104             final ExceptionLogger exceptionLogger) {
105         super();
106         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108     }
109 
110     /**
111      * Creates new instance of HttpAsyncRequestExecutor.
112      *
113      * @since 4.3
114      */
115     public HttpAsyncRequestExecutor(final int waitForContinue) {
116         this(waitForContinue, null);
117     }
118 
119     public HttpAsyncRequestExecutor() {
120         this(DEFAULT_WAIT_FOR_CONTINUE, null);
121     }
122 
123     @Override
124     public void connected(
125             final NHttpClientConnection conn,
126             final Object attachment) throws IOException, HttpException {
127         final State state = new State();
128         final HttpContext context = conn.getContext();
129         context.setAttribute(HTTP_EXCHANGE_STATE, state);
130         requestReady(conn);
131     }
132 
133     @Override
134     public void closed(final NHttpClientConnection conn) {
135         final State state = getState(conn);
136         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
137         if (state != null) {
138             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
139                 if (handler != null) {
140                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
141                 }
142             }
143         }
144         if (state == null || (handler != null && handler.isDone())) {
145             closeHandler(handler);
146         }
147     }
148 
149     @Override
150     public void exception(
151             final NHttpClientConnection conn, final Exception cause) {
152         shutdownConnection(conn);
153         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
154         if (handler != null) {
155             handler.failed(cause);
156         } else {
157             log(cause);
158         }
159     }
160 
161     @Override
162     public void requestReady(
163             final NHttpClientConnection conn) throws IOException, HttpException {
164         final State state = getState(conn);
165         Asserts.notNull(state, "Connection state");
166         Asserts.check(state.getRequestState() == MessageState.READY ||
167                         state.getRequestState() == MessageState.COMPLETED,
168                 "Unexpected request state %s", state.getRequestState());
169 
170         if (state.getRequestState() == MessageState.COMPLETED) {
171             conn.suspendOutput();
172             return;
173         }
174         final HttpContext context = conn.getContext();
175         final HttpAsyncClientExchangeHandler handler;
176         synchronized (context) {
177             handler = getHandler(conn);
178             if (handler == null || handler.isDone()) {
179                 conn.suspendOutput();
180                 return;
181             }
182         }
183         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
184 
185         final HttpRequest request = handler.generateRequest();
186         if (request == null) {
187             conn.suspendOutput();
188             return;
189         }
190         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
191         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
192             throw new ProtocolException(version + " cannot be used with request pipelining");
193         }
194         state.setRequest(request);
195         if (pipelined) {
196             state.getRequestQueue().add(request);
197         }
198         if (request instanceof HttpEntityEnclosingRequest) {
199             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
200             if (expectContinue && pipelined) {
201                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
202             }
203             conn.submitRequest(request);
204             if (expectContinue) {
205                 final int timeout = conn.getSocketTimeout();
206                 state.setTimeout(timeout);
207                 conn.setSocketTimeout(this.waitForContinue);
208                 state.setRequestState(MessageState.ACK_EXPECTED);
209             } else {
210                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
211                 if (entity != null) {
212                     state.setRequestState(MessageState.BODY_STREAM);
213                 } else {
214                     handler.requestCompleted();
215                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
216                 }
217             }
218         } else {
219             conn.submitRequest(request);
220             handler.requestCompleted();
221             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
222         }
223     }
224 
225     @Override
226     public void outputReady(
227             final NHttpClientConnection conn,
228             final ContentEncoder encoder) throws IOException, HttpException {
229         final State state = getState(conn);
230         Asserts.notNull(state, "Connection state");
231         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
232                         state.getRequestState() == MessageState.ACK_EXPECTED,
233                 "Unexpected request state %s", state.getRequestState());
234 
235         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
236         Asserts.notNull(handler, "Client exchange handler");
237         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
238             conn.suspendOutput();
239             return;
240         }
241         handler.produceContent(encoder, conn);
242         if (encoder.isCompleted()) {
243             handler.requestCompleted();
244             final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
245             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
246         }
247     }
248 
249     @Override
250     public void responseReceived(
251             final NHttpClientConnection conn) throws HttpException, IOException {
252         final State state = getState(conn);
253         Asserts.notNull(state, "Connection state");
254         Asserts.check(state.getResponseState() == MessageState.READY,
255                 "Unexpected request state %s", state.getResponseState());
256 
257         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
258         Asserts.notNull(handler, "Client exchange handler");
259 
260         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
261         final HttpRequest request;
262         if (pipelined) {
263             request = state.getRequestQueue().poll();
264             Asserts.notNull(request, "HTTP request");
265         } else {
266             request = state.getRequest();
267             if (request == null) {
268                 throw new HttpException("Out of sequence response");
269             }
270         }
271 
272         final HttpResponse response = conn.getHttpResponse();
273 
274         final int statusCode = response.getStatusLine().getStatusCode();
275         if (statusCode < HttpStatus.SC_OK) {
276             // 1xx intermediate response
277             if (statusCode != HttpStatus.SC_CONTINUE) {
278                 throw new ProtocolException(
279                         "Unexpected response: " + response.getStatusLine());
280             }
281             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
282                 final int timeout = state.getTimeout();
283                 conn.setSocketTimeout(timeout);
284                 conn.requestOutput();
285                 state.setRequestState(MessageState.BODY_STREAM);
286             }
287             return;
288         }
289         state.setResponse(response);
290         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
291             final int timeout = state.getTimeout();
292             conn.setSocketTimeout(timeout);
293             conn.resetOutput();
294             state.setRequestState(MessageState.COMPLETED);
295         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
296             // Early response
297             conn.resetOutput();
298             conn.suspendOutput();
299             state.setRequestState(MessageState.COMPLETED);
300             state.invalidate();
301         }
302 
303         if (canResponseHaveBody(request, response)) {
304             handler.responseReceived(response);
305             state.setResponseState(MessageState.BODY_STREAM);
306         } else {
307             response.setEntity(null);
308             handler.responseReceived(response);
309             conn.resetInput();
310             processResponse(conn, state, handler);
311         }
312     }
313 
314     @Override
315     public void inputReady(
316             final NHttpClientConnection conn,
317             final ContentDecoder decoder) throws IOException, HttpException {
318         final State state = getState(conn);
319         Asserts.notNull(state, "Connection state");
320         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
321                 "Unexpected request state %s", state.getResponseState());
322 
323         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
324         Asserts.notNull(handler, "Client exchange handler");
325         handler.consumeContent(decoder, conn);
326         if (decoder.isCompleted()) {
327             processResponse(conn, state, handler);
328         }
329     }
330 
331     @Override
332     public void endOfInput(final NHttpClientConnection conn) throws IOException {
333         final State state = getState(conn);
334         final HttpContext context = conn.getContext();
335         synchronized (context) {
336             if (state != null) {
337                 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
338                     state.invalidate();
339                 }
340                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
341                 if (handler != null) {
342                     if (state.isValid()) {
343                         handler.inputTerminated();
344                     } else {
345                         handler.failed(new ConnectionClosedException("Connection closed"));
346                     }
347                 }
348             }
349             // Closing connection in an orderly manner and
350             // waiting for output buffer to get flushed.
351             // Do not want to wait indefinitely, though, in case
352             // the opposite end is not reading
353             if (conn.getSocketTimeout() <= 0) {
354                 conn.setSocketTimeout(1000);
355             }
356             conn.close();
357         }
358     }
359 
360     @Override
361     public void timeout(
362             final NHttpClientConnection conn) throws IOException {
363         final State state = getState(conn);
364         if (state != null) {
365             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
366                 final int timeout = state.getTimeout();
367                 conn.setSocketTimeout(timeout);
368                 conn.requestOutput();
369                 state.setRequestState(MessageState.BODY_STREAM);
370                 state.setTimeout(0);
371                 return;
372             } else {
373                 state.invalidate();
374                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
375                 if (handler != null) {
376                     handler.failed(new SocketTimeoutException());
377                     handler.close();
378                 }
379             }
380         }
381         if (conn.getStatus() == NHttpConnection.ACTIVE) {
382             conn.close();
383             if (conn.getStatus() == NHttpConnection.CLOSING) {
384                 // Give the connection some grace time to
385                 // close itself nicely
386                 conn.setSocketTimeout(250);
387             }
388         } else {
389             conn.shutdown();
390         }
391     }
392 
393     /**
394      * This method can be used to log I/O exception thrown while closing
395      * {@link java.io.Closeable} objects (such as
396      * {@link org.apache.http.HttpConnection}}).
397      *
398      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
399      */
400     protected void log(final Exception ex) {
401         this.exceptionLogger.log(ex);
402     }
403 
404     private static State getState(final NHttpConnection conn) {
405         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
406     }
407 
408     private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
409         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
410     }
411 
412     private void shutdownConnection(final NHttpConnection conn) {
413         try {
414             conn.shutdown();
415         } catch (final IOException ex) {
416             log(ex);
417         }
418     }
419 
420     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
421         if (handler != null) {
422             try {
423                 handler.close();
424             } catch (final IOException ioex) {
425                 log(ioex);
426             }
427         }
428     }
429 
430     private void processResponse(
431             final NHttpClientConnection conn,
432             final State state,
433             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
434         if (!state.isValid()) {
435             conn.close();
436         }
437         handler.responseCompleted();
438 
439         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
440         if (!pipelined) {
441             state.setRequestState(MessageState.READY);
442             state.setRequest(null);
443         }
444         state.setResponseState(MessageState.READY);
445         state.setResponse(null);
446         if (!handler.isDone() && conn.isOpen()) {
447             conn.requestOutput();
448         }
449     }
450 
451     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
452 
453         final String method = request.getRequestLine().getMethod();
454         final int status = response.getStatusLine().getStatusCode();
455 
456         if (method.equalsIgnoreCase("HEAD")) {
457             return false;
458         }
459         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
460             return false;
461         }
462         return status >= HttpStatus.SC_OK
463             && status != HttpStatus.SC_NO_CONTENT
464             && status != HttpStatus.SC_NOT_MODIFIED
465             && status != HttpStatus.SC_RESET_CONTENT;
466     }
467 
468     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
469 
470     static class State {
471 
472         private final Queue<HttpRequest> requestQueue;
473         private volatile MessageState requestState;
474         private volatile MessageState responseState;
475         private volatile HttpRequest request;
476         private volatile HttpResponse response;
477         private volatile boolean valid;
478         private volatile int timeout;
479 
480         State() {
481             super();
482             this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
483             this.valid = true;
484             this.requestState = MessageState.READY;
485             this.responseState = MessageState.READY;
486         }
487 
488         public MessageState getRequestState() {
489             return this.requestState;
490         }
491 
492         public void setRequestState(final MessageState state) {
493             this.requestState = state;
494         }
495 
496         public MessageState getResponseState() {
497             return this.responseState;
498         }
499 
500         public void setResponseState(final MessageState state) {
501             this.responseState = state;
502         }
503 
504         public HttpRequest getRequest() {
505             return this.request;
506         }
507 
508         public void setRequest(final HttpRequest request) {
509             this.request = request;
510         }
511 
512         public HttpResponse getResponse() {
513             return this.response;
514         }
515 
516         public void setResponse(final HttpResponse response) {
517             this.response = response;
518         }
519 
520         public Queue<HttpRequest> getRequestQueue() {
521             return this.requestQueue;
522         }
523 
524         public int getTimeout() {
525             return this.timeout;
526         }
527 
528         public void setTimeout(final int timeout) {
529             this.timeout = timeout;
530         }
531 
532         public boolean isValid() {
533             return this.valid;
534         }
535 
536         public void invalidate() {
537             this.valid = false;
538         }
539 
540         @Override
541         public String toString() {
542             final StringBuilder buf = new StringBuilder();
543             buf.append("request state: ");
544             buf.append(this.requestState);
545             buf.append("; request: ");
546             if (this.request != null) {
547                 buf.append(this.request.getRequestLine());
548             }
549             buf.append("; response state: ");
550             buf.append(this.responseState);
551             buf.append("; response: ");
552             if (this.response != null) {
553                 buf.append(this.response.getStatusLine());
554             }
555             buf.append("; valid: ");
556             buf.append(this.valid);
557             buf.append(";");
558             return buf.toString();
559         }
560 
561     }
562 
563 }