View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.ExceptionLogger;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.HttpVersion;
42  import org.apache.http.ProtocolException;
43  import org.apache.http.ProtocolVersion;
44  import org.apache.http.annotation.Immutable;
45  import org.apache.http.nio.ContentDecoder;
46  import org.apache.http.nio.ContentEncoder;
47  import org.apache.http.nio.NHttpClientConnection;
48  import org.apache.http.nio.NHttpClientEventHandler;
49  import org.apache.http.nio.NHttpConnection;
50  import org.apache.http.protocol.HttpContext;
51  import org.apache.http.util.Args;
52  import org.apache.http.util.Asserts;
53  
54  /**
55   * <tt>HttpAsyncRequestExecutor</tt> is a fully asynchronous HTTP client side
56   * protocol handler based on the NIO (non-blocking) I/O model.
57   * <tt>HttpAsyncRequestExecutor</tt> translates individual events fired through
58   * the {@link NHttpClientEventHandler} interface into logically related HTTP
59   * message exchanges.
60   * <p/> The caller is expected to pass an instance of
61   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
62   * of HTTP message exchanges through the connection context using
63   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
64   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
65   * returns <code>true</code>. The {@link HttpAsyncRequester} utility class can
66   * be used to facilitate initiation of asynchronous HTTP request execution.
67   * <p/>
 * Individual <tt>HttpAsyncClientExchangeHandler</tt>s are expected to make use of
69   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
70   * headers for all outgoing messages and apply common, cross-cutting message
71   * transformations to all incoming and outgoing messages.
72   * <tt>HttpAsyncClientExchangeHandler</tt>s can delegate implementation of
73   * application specific content generation and processing to
74   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
75   *
76   * @see HttpAsyncClientExchangeHandler
77   *
78   * @since 4.2
79   */
80  @Immutable
81  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
82  
    /**
     * Wait-for-continue period used by the no-argument constructor.
     * NOTE(review): presumably milliseconds, since the value ends up in
     * <code>setSocketTimeout</code> - confirm.
     */
    public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
    /**
     * Name of the connection context attribute from which the current
     * {@link HttpAsyncClientExchangeHandler} is read.
     */
    public static final String HTTP_HANDLER = "http.nio.exchange-handler";

    // Socket timeout applied to the connection while the reply to an
    // expect-continue request is awaited (see requestReady).
    private final int waitForContinue;
    // Receives exceptions that cannot be passed to an exchange handler;
    // the primary constructor substitutes ExceptionLogger.NO_OP for null.
    private final ExceptionLogger exceptionLogger;
88  
89      /**
90       * Creates new instance of <tt>HttpAsyncRequestExecutor</tt>.
91       * @param waitForContinue wait for continue time period.
92       * @param exceptionLogger Exception logger. If <code>null</code>
93       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
94       *   logger will be only used to log I/O exception thrown while closing
95       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
96       *
97       * @since 4.4
98       */
99      public HttpAsyncRequestExecutor(
100             final int waitForContinue,
101             final ExceptionLogger exceptionLogger) {
102         super();
103         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
104         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
105     }
106 
107     /**
108      * Creates new instance of HttpAsyncRequestExecutor.
109      *
110      * @since 4.3
111      */
112     public HttpAsyncRequestExecutor(final int waitForContinue) {
113         this(waitForContinue, null);
114     }
115 
116     public HttpAsyncRequestExecutor() {
117         this(DEFAULT_WAIT_FOR_CONTINUE, null);
118     }
119 
120     @Override
121     public void connected(
122             final NHttpClientConnection conn,
123             final Object attachment) throws IOException, HttpException {
124         final State state = new State();
125         final HttpContext context = conn.getContext();
126         context.setAttribute(HTTP_EXCHANGE_STATE, state);
127         requestReady(conn);
128     }
129 
130     @Override
131     public void closed(final NHttpClientConnection conn) {
132         final State state = getState(conn);
133         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
134         if (state != null) {
135             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
136                 if (handler != null) {
137                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
138                 }
139             }
140         }
141         if (state == null || (handler != null && handler.isDone())) {
142             closeHandler(handler);
143         }
144     }
145 
146     @Override
147     public void exception(
148             final NHttpClientConnection conn, final Exception cause) {
149         shutdownConnection(conn);
150         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
151         if (handler != null) {
152             handler.failed(cause);
153         } else {
154             log(cause);
155         }
156     }
157 
158     @Override
159     public void requestReady(
160             final NHttpClientConnection conn) throws IOException, HttpException {
161         final State state = getState(conn);
162         Asserts.notNull(state, "Connection state");
163         Asserts.check(state.getRequestState() == MessageState.READY ||
164                         state.getRequestState() == MessageState.COMPLETED,
165                 "Unexpected request state %s", state.getRequestState());
166 
167         if (state.getRequestState() != MessageState.READY) {
168             return;
169         }
170         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
171         if (handler == null || handler.isDone()) {
172             return;
173         }
174         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
175 
176         final HttpRequest request = handler.generateRequest();
177         if (request == null) {
178             return;
179         }
180         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
181         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
182             throw new ProtocolException(version + " cannot be used with request pipelining");
183         }
184         state.setRequest(request);
185 
186         if (request instanceof HttpEntityEnclosingRequest) {
187             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
188             if (expectContinue && pipelined) {
189                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
190             }
191             conn.submitRequest(request);
192             if (expectContinue) {
193                 final int timeout = conn.getSocketTimeout();
194                 state.setTimeout(timeout);
195                 conn.setSocketTimeout(this.waitForContinue);
196                 state.setRequestState(MessageState.ACK_EXPECTED);
197             } else {
198                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
199                 if (entity != null) {
200                     state.setRequestState(MessageState.BODY_STREAM);
201                 } else {
202                     handler.requestCompleted();
203                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
204                 }
205             }
206         } else {
207             conn.submitRequest(request);
208             handler.requestCompleted();
209             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
210         }
211     }
212 
213     @Override
214     public void outputReady(
215             final NHttpClientConnection conn,
216             final ContentEncoder encoder) throws IOException, HttpException {
217         final State state = getState(conn);
218         Asserts.notNull(state, "Connection state");
219         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
220                         state.getRequestState() == MessageState.ACK_EXPECTED,
221                 "Unexpected request state %s", state.getRequestState());
222 
223         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
224         Asserts.notNull(handler, "Client exchange handler");
225         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
226             conn.suspendOutput();
227             return;
228         }
229         handler.produceContent(encoder, conn);
230         if (encoder.isCompleted()) {
231             handler.requestCompleted();
232             final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
233             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
234         }
235     }
236 
237     @Override
238     public void responseReceived(
239             final NHttpClientConnection conn) throws HttpException, IOException {
240         final State state = getState(conn);
241         Asserts.notNull(state, "Connection state");
242         Asserts.check(state.getResponseState() == MessageState.READY,
243                 "Unexpected request state %s", state.getResponseState());
244 
245         final HttpRequest request = state.getRequest();
246         if (request == null) {
247             throw new HttpException("Out of sequence response");
248         }
249         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
250         Asserts.notNull(handler, "Client exchange handler");
251         final HttpResponse response = conn.getHttpResponse();
252 
253         final int statusCode = response.getStatusLine().getStatusCode();
254         if (statusCode < HttpStatus.SC_OK) {
255             // 1xx intermediate response
256             if (statusCode != HttpStatus.SC_CONTINUE) {
257                 throw new ProtocolException(
258                         "Unexpected response: " + response.getStatusLine());
259             }
260             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
261                 final int timeout = state.getTimeout();
262                 conn.setSocketTimeout(timeout);
263                 conn.requestOutput();
264                 state.setRequestState(MessageState.BODY_STREAM);
265             }
266             return;
267         }
268         state.setResponse(response);
269         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
270             final int timeout = state.getTimeout();
271             conn.setSocketTimeout(timeout);
272             conn.resetOutput();
273             state.setRequestState(MessageState.COMPLETED);
274         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
275             // Early response
276             conn.resetOutput();
277             conn.suspendOutput();
278             state.setRequestState(MessageState.COMPLETED);
279             state.invalidate();
280         }
281 
282         handler.responseReceived(response);
283 
284         state.setResponseState(MessageState.BODY_STREAM);
285         if (!canResponseHaveBody(request, response)) {
286             response.setEntity(null);
287             conn.resetInput();
288             processResponse(conn, state, handler);
289         }
290     }
291 
292     @Override
293     public void inputReady(
294             final NHttpClientConnection conn,
295             final ContentDecoder decoder) throws IOException, HttpException {
296         final State state = getState(conn);
297         Asserts.notNull(state, "Connection state");
298         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
299                 "Unexpected request state %s", state.getResponseState());
300 
301         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
302         Asserts.notNull(handler, "Client exchange handler");
303         handler.consumeContent(decoder, conn);
304         if (decoder.isCompleted()) {
305             processResponse(conn, state, handler);
306         }
307     }
308 
309     @Override
310     public void endOfInput(final NHttpClientConnection conn) throws IOException {
311         final State state = getState(conn);
312         if (state != null) {
313             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
314                 state.invalidate();
315             }
316             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
317             if (handler != null) {
318                 if (state.isValid()) {
319                     handler.inputTerminated();
320                 } else {
321                     handler.failed(new ConnectionClosedException("Connection closed"));
322                 }
323             }
324         }
325         // Closing connection in an orderly manner and
326         // waiting for output buffer to get flushed.
327         // Do not want to wait indefinitely, though, in case
328         // the opposite end is not reading
329         if (conn.getSocketTimeout() <= 0) {
330             conn.setSocketTimeout(1000);
331         }
332         conn.close();
333     }
334 
335     @Override
336     public void timeout(
337             final NHttpClientConnection conn) throws IOException {
338         final State state = getState(conn);
339         if (state != null) {
340             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
341                 final int timeout = state.getTimeout();
342                 conn.setSocketTimeout(timeout);
343                 conn.requestOutput();
344                 state.setRequestState(MessageState.BODY_STREAM);
345                 state.setTimeout(0);
346                 return;
347             } else {
348                 state.invalidate();
349                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
350                 if (handler != null) {
351                     handler.failed(new SocketTimeoutException());
352                     handler.close();
353                 }
354             }
355         }
356         if (conn.getStatus() == NHttpConnection.ACTIVE) {
357             conn.close();
358             if (conn.getStatus() == NHttpConnection.CLOSING) {
359                 // Give the connection some grace time to
360                 // close itself nicely
361                 conn.setSocketTimeout(250);
362             }
363         } else {
364             conn.shutdown();
365         }
366     }
367 
368     /**
369      * This method can be used to log I/O exception thrown while closing
370      * {@link java.io.Closeable} objects (such as
371      * {@link org.apache.http.HttpConnection}}).
372      *
373      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
374      */
375     protected void log(final Exception ex) {
376         this.exceptionLogger.log(ex);
377     }
378 
379     private State getState(final NHttpConnection conn) {
380         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
381     }
382 
383     private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
384         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
385     }
386 
387     private void shutdownConnection(final NHttpConnection conn) {
388         try {
389             conn.shutdown();
390         } catch (final IOException ex) {
391             log(ex);
392         }
393     }
394 
395     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
396         if (handler != null) {
397             try {
398                 handler.close();
399             } catch (final IOException ioex) {
400                 log(ioex);
401             }
402         }
403     }
404 
405     private void processResponse(
406             final NHttpClientConnection conn,
407             final State state,
408             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
409         if (!state.isValid()) {
410             conn.close();
411         }
412         handler.responseCompleted();
413 
414         final boolean pipelined = handler.getClass().getAnnotation(Pipelined.class) != null;
415         if (!pipelined) {
416             state.setRequestState(MessageState.READY);
417             state.setRequest(null);
418         }
419         state.setResponseState(MessageState.READY);
420         state.setResponse(null);
421         if (!handler.isDone() && conn.isOpen()) {
422             conn.requestOutput();
423         }
424     }
425 
426     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
427 
428         final String method = request.getRequestLine().getMethod();
429         final int status = response.getStatusLine().getStatusCode();
430 
431         if (method.equalsIgnoreCase("HEAD")) {
432             return false;
433         }
434         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
435             return false;
436         }
437         return status >= HttpStatus.SC_OK
438             && status != HttpStatus.SC_NO_CONTENT
439             && status != HttpStatus.SC_NOT_MODIFIED
440             && status != HttpStatus.SC_RESET_CONTENT;
441     }
442 
    /**
     * Name of the connection context attribute under which the
     * per-connection exchange {@link State} is stored.
     */
    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
444 
445     static class State {
446 
447         private volatile MessageState requestState;
448         private volatile MessageState responseState;
449         private volatile HttpRequest request;
450         private volatile HttpResponse response;
451         private volatile boolean valid;
452         private volatile int timeout;
453 
454         State() {
455             super();
456             this.valid = true;
457             this.requestState = MessageState.READY;
458             this.responseState = MessageState.READY;
459         }
460 
461         public MessageState getRequestState() {
462             return this.requestState;
463         }
464 
465         public void setRequestState(final MessageState state) {
466             this.requestState = state;
467         }
468 
469         public MessageState getResponseState() {
470             return this.responseState;
471         }
472 
473         public void setResponseState(final MessageState state) {
474             this.responseState = state;
475         }
476 
477         public HttpRequest getRequest() {
478             return this.request;
479         }
480 
481         public void setRequest(final HttpRequest request) {
482             this.request = request;
483         }
484 
485         public HttpResponse getResponse() {
486             return this.response;
487         }
488 
489         public void setResponse(final HttpResponse response) {
490             this.response = response;
491         }
492 
493         public int getTimeout() {
494             return this.timeout;
495         }
496 
497         public void setTimeout(final int timeout) {
498             this.timeout = timeout;
499         }
500 
501         public boolean isValid() {
502             return this.valid;
503         }
504 
505         public void invalidate() {
506             this.valid = false;
507         }
508 
509         @Override
510         public String toString() {
511             final StringBuilder buf = new StringBuilder();
512             buf.append("request state: ");
513             buf.append(this.requestState);
514             buf.append("; request: ");
515             if (this.request != null) {
516                 buf.append(this.request.getRequestLine());
517             }
518             buf.append("; response state: ");
519             buf.append(this.responseState);
520             buf.append("; response: ");
521             if (this.response != null) {
522                 buf.append(this.response.getStatusLine());
523             }
524             buf.append("; valid: ");
525             buf.append(this.valid);
526             buf.append(";");
527             return buf.toString();
528         }
529 
530     }
531 
532 }