View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionClosedException;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpStatus;
43  import org.apache.http.HttpVersion;
44  import org.apache.http.ProtocolException;
45  import org.apache.http.ProtocolVersion;
46  import org.apache.http.annotation.ThreadingBehavior;
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.nio.ContentDecoder;
49  import org.apache.http.nio.ContentEncoder;
50  import org.apache.http.nio.NHttpClientConnection;
51  import org.apache.http.nio.NHttpClientEventHandler;
52  import org.apache.http.nio.NHttpConnection;
53  import org.apache.http.protocol.HttpContext;
54  import org.apache.http.util.Args;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
59   * protocol handler based on the NIO (non-blocking) I/O model.
60   * {@code HttpAsyncRequestExecutor} translates individual events fired through
61   * the {@link NHttpClientEventHandler} interface into logically related HTTP
62   * message exchanges.
63   * <p> The caller is expected to pass an instance of
64   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
65   * of HTTP message exchanges through the connection context using
66   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
67   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
68   * returns {@code true}. The {@link HttpAsyncRequester} utility class can
69   * be used to facilitate initiation of asynchronous HTTP request execution.
70   * <p>
71   * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of
72   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
73   * headers for all outgoing messages and apply common, cross-cutting message
74   * transformations to all incoming and outgoing messages.
75   * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
76   * application specific content generation and processing to
77   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
78   *
79   * @see HttpAsyncClientExchangeHandler
80   *
81   * @since 4.2
82   */
83  @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
84  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
85  
    /**
     * Default wait-for-continue period; used by the no-argument constructor.
     */
    public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
    /**
     * Name of the connection context attribute under which the current
     * {@link HttpAsyncClientExchangeHandler} is looked up.
     */
    public static final String HTTP_HANDLER = "http.nio.exchange-handler";

    // Socket timeout applied to the connection while the acknowledgement of an
    // expect-continue request is awaited.
    private final int waitForContinue;
    // Receives exceptions that cannot be handed to an exchange handler; never null.
    private final ExceptionLogger exceptionLogger;
91  
92      /**
93       * Creates new instance of {@code HttpAsyncRequestExecutor}.
94       * @param waitForContinue wait for continue time period.
95       * @param exceptionLogger Exception logger. If {@code null}
96       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
97       *   logger will be only used to log I/O exception thrown while closing
98       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
99       *
100      * @since 4.4
101      */
102     public HttpAsyncRequestExecutor(
103             final int waitForContinue,
104             final ExceptionLogger exceptionLogger) {
105         super();
106         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108     }
109 
110     /**
111      * Creates new instance of HttpAsyncRequestExecutor.
112      *
113      * @since 4.3
114      */
115     public HttpAsyncRequestExecutor(final int waitForContinue) {
116         this(waitForContinue, null);
117     }
118 
119     public HttpAsyncRequestExecutor() {
120         this(DEFAULT_WAIT_FOR_CONTINUE, null);
121     }
122 
123     @Override
124     public void connected(
125             final NHttpClientConnection conn,
126             final Object attachment) throws IOException, HttpException {
127         final State state = new State();
128         final HttpContext context = conn.getContext();
129         context.setAttribute(HTTP_EXCHANGE_STATE, state);
130         requestReady(conn);
131     }
132 
133     @Override
134     public void closed(final NHttpClientConnection conn) {
135         final State state = getState(conn);
136         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
137         if (state != null) {
138             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
139                 if (handler != null) {
140                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
141                 }
142             }
143         }
144         if (state == null || (handler != null && handler.isDone())) {
145             closeHandler(handler);
146         }
147     }
148 
149     @Override
150     public void exception(
151             final NHttpClientConnection conn, final Exception cause) {
152         shutdownConnection(conn);
153         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
154         if (handler != null) {
155             handler.failed(cause);
156         } else {
157             log(cause);
158         }
159     }
160 
161     @Override
162     public void requestReady(
163             final NHttpClientConnection conn) throws IOException, HttpException {
164         final State state = getState(conn);
165         Asserts.notNull(state, "Connection state");
166         Asserts.check(state.getRequestState() == MessageState.READY ||
167                         state.getRequestState() == MessageState.COMPLETED,
168                 "Unexpected request state %s", state.getRequestState());
169 
170         if (state.getRequestState() == MessageState.COMPLETED) {
171             conn.suspendOutput();
172             return;
173         }
174         final HttpContext context = conn.getContext();
175         final HttpAsyncClientExchangeHandler handler;
176         synchronized (context) {
177             handler = getHandler(conn);
178             if (handler == null || handler.isDone()) {
179                 conn.suspendOutput();
180                 return;
181             }
182         }
183         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
184 
185         final HttpRequest request = handler.generateRequest();
186         if (request == null) {
187             conn.suspendOutput();
188             return;
189         }
190         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
191         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
192             throw new ProtocolException(version + " cannot be used with request pipelining");
193         }
194         state.setRequest(request);
195         if (pipelined) {
196             state.getRequestQueue().add(request);
197         }
198         if (request instanceof HttpEntityEnclosingRequest) {
199             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
200             if (expectContinue && pipelined) {
201                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
202             }
203             conn.submitRequest(request);
204             if (expectContinue) {
205                 final int timeout = conn.getSocketTimeout();
206                 state.setTimeout(timeout);
207                 conn.setSocketTimeout(this.waitForContinue);
208                 state.setRequestState(MessageState.ACK_EXPECTED);
209             } else {
210                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
211                 if (entity != null) {
212                     state.setRequestState(MessageState.BODY_STREAM);
213                 } else {
214                     handler.requestCompleted();
215                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
216                 }
217             }
218         } else {
219             conn.submitRequest(request);
220             handler.requestCompleted();
221             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
222         }
223     }
224 
225     @Override
226     public void outputReady(
227             final NHttpClientConnection conn,
228             final ContentEncoder encoder) throws IOException, HttpException {
229         final State state = getState(conn);
230         Asserts.notNull(state, "Connection state");
231         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
232                         state.getRequestState() == MessageState.ACK_EXPECTED,
233                 "Unexpected request state %s", state.getRequestState());
234 
235         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
236         Asserts.notNull(handler, "Client exchange handler");
237         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
238             conn.suspendOutput();
239             return;
240         }
241         handler.produceContent(encoder, conn);
242         if (encoder.isCompleted()) {
243             handler.requestCompleted();
244             final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
245             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
246         }
247     }
248 
249     @Override
250     public void responseReceived(
251             final NHttpClientConnection conn) throws HttpException, IOException {
252         final State state = getState(conn);
253         Asserts.notNull(state, "Connection state");
254         Asserts.check(state.getResponseState() == MessageState.READY,
255                 "Unexpected request state %s", state.getResponseState());
256 
257         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
258         Asserts.notNull(handler, "Client exchange handler");
259 
260         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
261         final HttpRequest request;
262         if (pipelined) {
263             request = state.getRequestQueue().poll();
264             Asserts.notNull(request, "HTTP request");
265         } else {
266             request = state.getRequest();
267             if (request == null) {
268                 throw new HttpException("Out of sequence response");
269             }
270         }
271 
272         final HttpResponse response = conn.getHttpResponse();
273 
274         final int statusCode = response.getStatusLine().getStatusCode();
275         if (statusCode < HttpStatus.SC_OK) {
276             // 1xx intermediate response
277             if (statusCode != HttpStatus.SC_CONTINUE) {
278                 throw new ProtocolException(
279                         "Unexpected response: " + response.getStatusLine());
280             }
281             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
282                 final int timeout = state.getTimeout();
283                 conn.setSocketTimeout(timeout);
284                 conn.requestOutput();
285                 state.setRequestState(MessageState.BODY_STREAM);
286             }
287             return;
288         }
289         state.setResponse(response);
290         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
291             final int timeout = state.getTimeout();
292             conn.setSocketTimeout(timeout);
293             conn.resetOutput();
294             state.setRequestState(MessageState.COMPLETED);
295         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
296             // Early response
297             conn.resetOutput();
298             conn.suspendOutput();
299             state.setRequestState(MessageState.COMPLETED);
300             state.invalidate();
301         }
302 
303         handler.responseReceived(response);
304 
305         state.setResponseState(MessageState.BODY_STREAM);
306         if (!canResponseHaveBody(request, response)) {
307             response.setEntity(null);
308             conn.resetInput();
309             processResponse(conn, state, handler);
310         }
311     }
312 
313     @Override
314     public void inputReady(
315             final NHttpClientConnection conn,
316             final ContentDecoder decoder) throws IOException, HttpException {
317         final State state = getState(conn);
318         Asserts.notNull(state, "Connection state");
319         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
320                 "Unexpected request state %s", state.getResponseState());
321 
322         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
323         Asserts.notNull(handler, "Client exchange handler");
324         handler.consumeContent(decoder, conn);
325         if (decoder.isCompleted()) {
326             processResponse(conn, state, handler);
327         }
328     }
329 
330     @Override
331     public void endOfInput(final NHttpClientConnection conn) throws IOException {
332         final State state = getState(conn);
333         final HttpContext context = conn.getContext();
334         synchronized (context) {
335             if (state != null) {
336                 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
337                     state.invalidate();
338                 }
339                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
340                 if (handler != null) {
341                     if (state.isValid()) {
342                         handler.inputTerminated();
343                     } else {
344                         handler.failed(new ConnectionClosedException("Connection closed"));
345                     }
346                 }
347             }
348             // Closing connection in an orderly manner and
349             // waiting for output buffer to get flushed.
350             // Do not want to wait indefinitely, though, in case
351             // the opposite end is not reading
352             if (conn.getSocketTimeout() <= 0) {
353                 conn.setSocketTimeout(1000);
354             }
355             conn.close();
356         }
357     }
358 
359     @Override
360     public void timeout(
361             final NHttpClientConnection conn) throws IOException {
362         final State state = getState(conn);
363         if (state != null) {
364             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
365                 final int timeout = state.getTimeout();
366                 conn.setSocketTimeout(timeout);
367                 conn.requestOutput();
368                 state.setRequestState(MessageState.BODY_STREAM);
369                 state.setTimeout(0);
370                 return;
371             } else {
372                 state.invalidate();
373                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
374                 if (handler != null) {
375                     handler.failed(new SocketTimeoutException());
376                     handler.close();
377                 }
378             }
379         }
380         if (conn.getStatus() == NHttpConnection.ACTIVE) {
381             conn.close();
382             if (conn.getStatus() == NHttpConnection.CLOSING) {
383                 // Give the connection some grace time to
384                 // close itself nicely
385                 conn.setSocketTimeout(250);
386             }
387         } else {
388             conn.shutdown();
389         }
390     }
391 
392     /**
393      * This method can be used to log I/O exception thrown while closing
394      * {@link java.io.Closeable} objects (such as
395      * {@link org.apache.http.HttpConnection}}).
396      *
397      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
398      */
399     protected void log(final Exception ex) {
400         this.exceptionLogger.log(ex);
401     }
402 
403     private static State getState(final NHttpConnection conn) {
404         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
405     }
406 
407     private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
408         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
409     }
410 
411     private void shutdownConnection(final NHttpConnection conn) {
412         try {
413             conn.shutdown();
414         } catch (final IOException ex) {
415             log(ex);
416         }
417     }
418 
419     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
420         if (handler != null) {
421             try {
422                 handler.close();
423             } catch (final IOException ioex) {
424                 log(ioex);
425             }
426         }
427     }
428 
429     private void processResponse(
430             final NHttpClientConnection conn,
431             final State state,
432             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
433         if (!state.isValid()) {
434             conn.close();
435         }
436         handler.responseCompleted();
437 
438         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
439         if (!pipelined) {
440             state.setRequestState(MessageState.READY);
441             state.setRequest(null);
442         }
443         state.setResponseState(MessageState.READY);
444         state.setResponse(null);
445         if (!handler.isDone() && conn.isOpen()) {
446             conn.requestOutput();
447         }
448     }
449 
450     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
451 
452         final String method = request.getRequestLine().getMethod();
453         final int status = response.getStatusLine().getStatusCode();
454 
455         if (method.equalsIgnoreCase("HEAD")) {
456             return false;
457         }
458         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
459             return false;
460         }
461         return status >= HttpStatus.SC_OK
462             && status != HttpStatus.SC_NO_CONTENT
463             && status != HttpStatus.SC_NOT_MODIFIED
464             && status != HttpStatus.SC_RESET_CONTENT;
465     }
466 
    // Name of the connection context attribute that holds the per-connection State.
    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
468 
469     static class State {
470 
471         private final Queue<HttpRequest> requestQueue;
472         private volatile MessageState requestState;
473         private volatile MessageState responseState;
474         private volatile HttpRequest request;
475         private volatile HttpResponse response;
476         private volatile boolean valid;
477         private volatile int timeout;
478 
479         State() {
480             super();
481             this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
482             this.valid = true;
483             this.requestState = MessageState.READY;
484             this.responseState = MessageState.READY;
485         }
486 
487         public MessageState getRequestState() {
488             return this.requestState;
489         }
490 
491         public void setRequestState(final MessageState state) {
492             this.requestState = state;
493         }
494 
495         public MessageState getResponseState() {
496             return this.responseState;
497         }
498 
499         public void setResponseState(final MessageState state) {
500             this.responseState = state;
501         }
502 
503         public HttpRequest getRequest() {
504             return this.request;
505         }
506 
507         public void setRequest(final HttpRequest request) {
508             this.request = request;
509         }
510 
511         public HttpResponse getResponse() {
512             return this.response;
513         }
514 
515         public void setResponse(final HttpResponse response) {
516             this.response = response;
517         }
518 
519         public Queue<HttpRequest> getRequestQueue() {
520             return this.requestQueue;
521         }
522 
523         public int getTimeout() {
524             return this.timeout;
525         }
526 
527         public void setTimeout(final int timeout) {
528             this.timeout = timeout;
529         }
530 
531         public boolean isValid() {
532             return this.valid;
533         }
534 
535         public void invalidate() {
536             this.valid = false;
537         }
538 
539         @Override
540         public String toString() {
541             final StringBuilder buf = new StringBuilder();
542             buf.append("request state: ");
543             buf.append(this.requestState);
544             buf.append("; request: ");
545             if (this.request != null) {
546                 buf.append(this.request.getRequestLine());
547             }
548             buf.append("; response state: ");
549             buf.append(this.responseState);
550             buf.append("; response: ");
551             if (this.response != null) {
552                 buf.append(this.response.getStatusLine());
553             }
554             buf.append("; valid: ");
555             buf.append(this.valid);
556             buf.append(";");
557             return buf.toString();
558         }
559 
560     }
561 
562 }