View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionClosedException;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpStatus;
43  import org.apache.http.HttpVersion;
44  import org.apache.http.ProtocolException;
45  import org.apache.http.ProtocolVersion;
46  import org.apache.http.annotation.Immutable;
47  import org.apache.http.nio.ContentDecoder;
48  import org.apache.http.nio.ContentEncoder;
49  import org.apache.http.nio.NHttpClientConnection;
50  import org.apache.http.nio.NHttpClientEventHandler;
51  import org.apache.http.nio.NHttpConnection;
52  import org.apache.http.protocol.HttpContext;
53  import org.apache.http.util.Args;
54  import org.apache.http.util.Asserts;
55  
56  /**
57   * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
58   * protocol handler based on the NIO (non-blocking) I/O model.
59   * {@code HttpAsyncRequestExecutor} translates individual events fired through
60   * the {@link NHttpClientEventHandler} interface into logically related HTTP
61   * message exchanges.
62   * <p> The caller is expected to pass an instance of
63   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
64   * of HTTP message exchanges through the connection context using
65   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
66   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
67   * returns {@code true}. The {@link HttpAsyncRequester} utility class can
68   * be used to facilitate initiation of asynchronous HTTP request execution.
69   * <p>
70   * Individual {@code HttpAsyncClientExchangeHandler} are expected to make use of
71   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
72   * headers for all outgoing messages and apply common, cross-cutting message
73   * transformations to all incoming and outgoing messages.
74   * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
75   * application specific content generation and processing to
76   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
77   *
78   * @see HttpAsyncClientExchangeHandler
79   *
80   * @since 4.2
81   */
82  @Immutable
83  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
84  
85      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000; // wait-for-continue period used by the no-arg constructor (presumably milliseconds, as it is passed to setSocketTimeout -- confirm)
86      public static final String HTTP_HANDLER = "http.nio.exchange-handler"; // connection context attribute name under which the current HttpAsyncClientExchangeHandler is looked up
87  
88      private final int waitForContinue; // socket timeout applied while the request state is ACK_EXPECTED
89      private final ExceptionLogger exceptionLogger; // receives exceptions passed to log(Exception); never null after construction
90  
91      /**
92       * Creates new instance of {@code HttpAsyncRequestExecutor}.
93       * @param waitForContinue wait for continue time period.
94       * @param exceptionLogger Exception logger. If {@code null}
95       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
96       *   logger will be only used to log I/O exception thrown while closing
97       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
98       *
99       * @since 4.4
100      */
101     public HttpAsyncRequestExecutor(
102             final int waitForContinue,
103             final ExceptionLogger exceptionLogger) {
104         super();
105         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
106         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
107     }
108 
109     /**
110      * Creates new instance of HttpAsyncRequestExecutor.
111      *
112      * @since 4.3
113      */
114     public HttpAsyncRequestExecutor(final int waitForContinue) {
115         this(waitForContinue, null);
116     }
117 
118     public HttpAsyncRequestExecutor() {
119         this(DEFAULT_WAIT_FOR_CONTINUE, null);
120     }
121 
122     @Override
123     public void connected(
124             final NHttpClientConnection conn,
125             final Object attachment) throws IOException, HttpException {
126         final State state = new State();
127         final HttpContext context = conn.getContext();
128         context.setAttribute(HTTP_EXCHANGE_STATE, state);
129         requestReady(conn);
130     }
131 
132     @Override
133     public void closed(final NHttpClientConnection conn) {
134         final State state = getState(conn);
135         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
136         if (state != null) {
137             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
138                 if (handler != null) {
139                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
140                 }
141             }
142         }
143         if (state == null || (handler != null && handler.isDone())) {
144             closeHandler(handler);
145         }
146     }
147 
148     @Override
149     public void exception(
150             final NHttpClientConnection conn, final Exception cause) {
151         shutdownConnection(conn);
152         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
153         if (handler != null) {
154             handler.failed(cause);
155         } else {
156             log(cause);
157         }
158     }
159 
160     @Override
161     public void requestReady(
162             final NHttpClientConnection conn) throws IOException, HttpException {
163         final State state = getState(conn);
164         Asserts.notNull(state, "Connection state");
165         Asserts.check(state.getRequestState() == MessageState.READY ||
166                         state.getRequestState() == MessageState.COMPLETED,
167                 "Unexpected request state %s", state.getRequestState());
168 
169         if (state.getRequestState() == MessageState.COMPLETED) {
170             conn.suspendOutput();
171             return;
172         }
173         final HttpContext context = conn.getContext();
174         final HttpAsyncClientExchangeHandler handler;
175         synchronized (context) {
176             handler = getHandler(conn);
177             if (handler == null || handler.isDone()) {
178                 conn.suspendOutput();
179                 return;
180             }
181         }
182         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
183 
184         final HttpRequest request = handler.generateRequest();
185         if (request == null) {
186             conn.suspendOutput();
187             return;
188         }
189         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
190         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
191             throw new ProtocolException(version + " cannot be used with request pipelining");
192         }
193         state.setRequest(request);
194         if (pipelined) {
195             state.getRequestQueue().add(request);
196         }
197         if (request instanceof HttpEntityEnclosingRequest) {
198             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
199             if (expectContinue && pipelined) {
200                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
201             }
202             conn.submitRequest(request);
203             if (expectContinue) {
204                 final int timeout = conn.getSocketTimeout();
205                 state.setTimeout(timeout);
206                 conn.setSocketTimeout(this.waitForContinue);
207                 state.setRequestState(MessageState.ACK_EXPECTED);
208             } else {
209                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
210                 if (entity != null) {
211                     state.setRequestState(MessageState.BODY_STREAM);
212                 } else {
213                     handler.requestCompleted();
214                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
215                 }
216             }
217         } else {
218             conn.submitRequest(request);
219             handler.requestCompleted();
220             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
221         }
222     }
223 
224     @Override
225     public void outputReady(
226             final NHttpClientConnection conn,
227             final ContentEncoder encoder) throws IOException, HttpException {
228         final State state = getState(conn);
229         Asserts.notNull(state, "Connection state");
230         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
231                         state.getRequestState() == MessageState.ACK_EXPECTED,
232                 "Unexpected request state %s", state.getRequestState());
233 
234         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
235         Asserts.notNull(handler, "Client exchange handler");
236         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
237             conn.suspendOutput();
238             return;
239         }
240         handler.produceContent(encoder, conn);
241         if (encoder.isCompleted()) {
242             handler.requestCompleted();
243             final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
244             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
245         }
246     }
247 
248     @Override
249     public void responseReceived(
250             final NHttpClientConnection conn) throws HttpException, IOException {
251         final State state = getState(conn);
252         Asserts.notNull(state, "Connection state");
253         Asserts.check(state.getResponseState() == MessageState.READY,
254                 "Unexpected request state %s", state.getResponseState());
255 
256         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
257         Asserts.notNull(handler, "Client exchange handler");
258 
259         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
260         final HttpRequest request;
261         if (pipelined) {
262             request = state.getRequestQueue().poll();
263             Asserts.notNull(request, "HTTP request");
264         } else {
265             request = state.getRequest();
266             if (request == null) {
267                 throw new HttpException("Out of sequence response");
268             }
269         }
270 
271         final HttpResponse response = conn.getHttpResponse();
272 
273         final int statusCode = response.getStatusLine().getStatusCode();
274         if (statusCode < HttpStatus.SC_OK) {
275             // 1xx intermediate response
276             if (statusCode != HttpStatus.SC_CONTINUE) {
277                 throw new ProtocolException(
278                         "Unexpected response: " + response.getStatusLine());
279             }
280             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
281                 final int timeout = state.getTimeout();
282                 conn.setSocketTimeout(timeout);
283                 conn.requestOutput();
284                 state.setRequestState(MessageState.BODY_STREAM);
285             }
286             return;
287         }
288         state.setResponse(response);
289         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
290             final int timeout = state.getTimeout();
291             conn.setSocketTimeout(timeout);
292             conn.resetOutput();
293             state.setRequestState(MessageState.COMPLETED);
294         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
295             // Early response
296             conn.resetOutput();
297             conn.suspendOutput();
298             state.setRequestState(MessageState.COMPLETED);
299             state.invalidate();
300         }
301 
302         handler.responseReceived(response);
303 
304         state.setResponseState(MessageState.BODY_STREAM);
305         if (!canResponseHaveBody(request, response)) {
306             response.setEntity(null);
307             conn.resetInput();
308             processResponse(conn, state, handler);
309         }
310     }
311 
312     @Override
313     public void inputReady(
314             final NHttpClientConnection conn,
315             final ContentDecoder decoder) throws IOException, HttpException {
316         final State state = getState(conn);
317         Asserts.notNull(state, "Connection state");
318         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
319                 "Unexpected request state %s", state.getResponseState());
320 
321         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
322         Asserts.notNull(handler, "Client exchange handler");
323         handler.consumeContent(decoder, conn);
324         if (decoder.isCompleted()) {
325             processResponse(conn, state, handler);
326         }
327     }
328 
329     @Override
330     public void endOfInput(final NHttpClientConnection conn) throws IOException {
331         final State state = getState(conn);
332         if (state != null) {
333             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
334                 state.invalidate();
335             }
336             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
337             if (handler != null) {
338                 if (state.isValid()) {
339                     handler.inputTerminated();
340                 } else {
341                     handler.failed(new ConnectionClosedException("Connection closed"));
342                 }
343             }
344         }
345         // Closing connection in an orderly manner and
346         // waiting for output buffer to get flushed.
347         // Do not want to wait indefinitely, though, in case
348         // the opposite end is not reading
349         if (conn.getSocketTimeout() <= 0) {
350             conn.setSocketTimeout(1000);
351         }
352         conn.close();
353     }
354 
355     @Override
356     public void timeout(
357             final NHttpClientConnection conn) throws IOException {
358         final State state = getState(conn);
359         if (state != null) {
360             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
361                 final int timeout = state.getTimeout();
362                 conn.setSocketTimeout(timeout);
363                 conn.requestOutput();
364                 state.setRequestState(MessageState.BODY_STREAM);
365                 state.setTimeout(0);
366                 return;
367             } else {
368                 state.invalidate();
369                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
370                 if (handler != null) {
371                     handler.failed(new SocketTimeoutException());
372                     handler.close();
373                 }
374             }
375         }
376         if (conn.getStatus() == NHttpConnection.ACTIVE) {
377             conn.close();
378             if (conn.getStatus() == NHttpConnection.CLOSING) {
379                 // Give the connection some grace time to
380                 // close itself nicely
381                 conn.setSocketTimeout(250);
382             }
383         } else {
384             conn.shutdown();
385         }
386     }
387 
388     /**
389      * This method can be used to log I/O exception thrown while closing
390      * {@link java.io.Closeable} objects (such as
391      * {@link org.apache.http.HttpConnection}}).
392      *
393      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
394      */
395     protected void log(final Exception ex) {
396         this.exceptionLogger.log(ex);
397     }
398 
399     private State getState(final NHttpConnection conn) {
400         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
401     }
402 
403     private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
404         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
405     }
406 
407     private void shutdownConnection(final NHttpConnection conn) {
408         try {
409             conn.shutdown();
410         } catch (final IOException ex) {
411             log(ex);
412         }
413     }
414 
415     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
416         if (handler != null) {
417             try {
418                 handler.close();
419             } catch (final IOException ioex) {
420                 log(ioex);
421             }
422         }
423     }
424 
425     private void processResponse(
426             final NHttpClientConnection conn,
427             final State state,
428             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
429         if (!state.isValid()) {
430             conn.close();
431         }
432         handler.responseCompleted();
433 
434         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
435         if (!pipelined) {
436             state.setRequestState(MessageState.READY);
437             state.setRequest(null);
438         }
439         state.setResponseState(MessageState.READY);
440         state.setResponse(null);
441         if (!handler.isDone() && conn.isOpen()) {
442             conn.requestOutput();
443         }
444     }
445 
446     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
447 
448         final String method = request.getRequestLine().getMethod();
449         final int status = response.getStatusLine().getStatusCode();
450 
451         if (method.equalsIgnoreCase("HEAD")) {
452             return false;
453         }
454         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
455             return false;
456         }
457         return status >= HttpStatus.SC_OK
458             && status != HttpStatus.SC_NO_CONTENT
459             && status != HttpStatus.SC_NOT_MODIFIED
460             && status != HttpStatus.SC_RESET_CONTENT;
461     }
462 
463     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state"; // connection context attribute name under which connected() stores the State that getState() reads back
464 
465     static class State {
466 
467         private final Queue<HttpRequest> requestQueue;
468         private volatile MessageState requestState;
469         private volatile MessageState responseState;
470         private volatile HttpRequest request;
471         private volatile HttpResponse response;
472         private volatile boolean valid;
473         private volatile int timeout;
474 
475         State() {
476             super();
477             this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
478             this.valid = true;
479             this.requestState = MessageState.READY;
480             this.responseState = MessageState.READY;
481         }
482 
483         public MessageState getRequestState() {
484             return this.requestState;
485         }
486 
487         public void setRequestState(final MessageState state) {
488             this.requestState = state;
489         }
490 
491         public MessageState getResponseState() {
492             return this.responseState;
493         }
494 
495         public void setResponseState(final MessageState state) {
496             this.responseState = state;
497         }
498 
499         public HttpRequest getRequest() {
500             return this.request;
501         }
502 
503         public void setRequest(final HttpRequest request) {
504             this.request = request;
505         }
506 
507         public HttpResponse getResponse() {
508             return this.response;
509         }
510 
511         public void setResponse(final HttpResponse response) {
512             this.response = response;
513         }
514 
515         public Queue<HttpRequest> getRequestQueue() {
516             return this.requestQueue;
517         }
518 
519         public int getTimeout() {
520             return this.timeout;
521         }
522 
523         public void setTimeout(final int timeout) {
524             this.timeout = timeout;
525         }
526 
527         public boolean isValid() {
528             return this.valid;
529         }
530 
531         public void invalidate() {
532             this.valid = false;
533         }
534 
535         @Override
536         public String toString() {
537             final StringBuilder buf = new StringBuilder();
538             buf.append("request state: ");
539             buf.append(this.requestState);
540             buf.append("; request: ");
541             if (this.request != null) {
542                 buf.append(this.request.getRequestLine());
543             }
544             buf.append("; response state: ");
545             buf.append(this.responseState);
546             buf.append("; response: ");
547             if (this.response != null) {
548                 buf.append(this.response.getStatusLine());
549             }
550             buf.append("; valid: ");
551             buf.append(this.valid);
552             buf.append(";");
553             return buf.toString();
554         }
555 
556     }
557 
558 }