View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.net.SocketTimeoutException;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.HttpConnection;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.ProtocolException;
42  import org.apache.http.annotation.Immutable;
43  import org.apache.http.nio.ContentDecoder;
44  import org.apache.http.nio.ContentEncoder;
45  import org.apache.http.nio.NHttpClientConnection;
46  import org.apache.http.nio.NHttpClientEventHandler;
47  import org.apache.http.nio.NHttpConnection;
48  import org.apache.http.protocol.HttpContext;
49  import org.apache.http.protocol.HttpProcessor;
50  import org.apache.http.util.Args;
51  import org.apache.http.util.Asserts;
52  
53  /**
54   * <tt>HttpAsyncRequestExecutor</tt> is a fully asynchronous HTTP client side
55   * protocol handler based on the NIO (non-blocking) I/O model.
56   * <tt>HttpAsyncRequestExecutor</tt> translates individual events fired through
57   * the {@link NHttpClientEventHandler} interface into logically related HTTP
58   * message exchanges.
59   * <p/> The caller is expected to pass an instance of
60   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
61   * of HTTP message exchanges through the connection context using
62   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
63   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
64   * returns <code>true</code>. The {@link HttpAsyncRequester} utility class can
65   * be used to facilitate initiation of asynchronous HTTP request execution.
66   * <p/>
67   * Individual <tt>HttpAsyncClientExchangeHandler</tt> are expected to make use of
68   * a {@link HttpProcessor} to generate mandatory protocol headers for all outgoing
69   * messages and apply common, cross-cutting message transformations to all incoming
70   * and outgoing messages. <tt>HttpAsyncClientExchangeHandler</tt>s can delegate
71   * implementation of application specific content generation and processing to
72   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
73   *
74   * @see HttpAsyncClientExchangeHandler
75   *
76   * @since 4.2
77   */
78  @Immutable
79  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
80  
81      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
82      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
83  
84      private final int waitForContinue;
85  
86      /**
87       * Creates new instance of HttpAsyncRequestExecutor.
88       *
89       * @since 4.3
90       */
91      public HttpAsyncRequestExecutor(final int waitForContinue) {
92          super();
93          this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
94      }
95  
96      public HttpAsyncRequestExecutor() {
97          this(DEFAULT_WAIT_FOR_CONTINUE);
98      }
99  
100     public void connected(
101             final NHttpClientConnection conn,
102             final Object attachment) throws IOException, HttpException {
103         final State state = new State();
104         final HttpContext context = conn.getContext();
105         context.setAttribute(HTTP_EXCHANGE_STATE, state);
106         requestReady(conn);
107     }
108 
109     public void closed(final NHttpClientConnection conn) {
110         final State state = getState(conn);
111         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
112         if (state == null || (handler != null && handler.isDone())) {
113             closeHandler(handler);
114         }
115         if (state != null) {
116             state.reset();
117         }
118     }
119 
120     public void exception(
121             final NHttpClientConnection conn, final Exception cause) {
122         shutdownConnection(conn);
123         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
124         if (handler != null) {
125             handler.failed(cause);
126         } else {
127             log(cause);
128         }
129     }
130 
131     public void requestReady(
132             final NHttpClientConnection conn) throws IOException, HttpException {
133         final State state = ensureNotNull(getState(conn));
134         if (state.getRequestState() != MessageState.READY) {
135             return;
136         }
137         HttpAsyncClientExchangeHandler handler = getHandler(conn);
138         if (handler != null && handler.isDone()) {
139             closeHandler(handler);
140             state.reset();
141             handler = null;
142         }
143         if (handler == null) {
144             return;
145         }
146 
147         final HttpRequest request = handler.generateRequest();
148         state.setRequest(request);
149 
150         conn.submitRequest(request);
151 
152         if (request instanceof HttpEntityEnclosingRequest) {
153             if (((HttpEntityEnclosingRequest) request).expectContinue()) {
154                 final int timeout = conn.getSocketTimeout();
155                 state.setTimeout(timeout);
156                 conn.setSocketTimeout(this.waitForContinue);
157                 state.setRequestState(MessageState.ACK_EXPECTED);
158             } else {
159                 state.setRequestState(MessageState.BODY_STREAM);
160             }
161         } else {
162             handler.requestCompleted();
163             state.setRequestState(MessageState.COMPLETED);
164         }
165     }
166 
167     public void outputReady(
168             final NHttpClientConnection conn,
169             final ContentEncoder encoder) throws IOException, HttpException {
170         final State state = ensureNotNull(getState(conn));
171         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
172         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
173             conn.suspendOutput();
174             return;
175         }
176         handler.produceContent(encoder, conn);
177         state.setRequestState(MessageState.BODY_STREAM);
178         if (encoder.isCompleted()) {
179             handler.requestCompleted();
180             state.setRequestState(MessageState.COMPLETED);
181         }
182     }
183 
184     public void responseReceived(
185             final NHttpClientConnection conn) throws HttpException, IOException {
186         final State state = ensureNotNull(getState(conn));
187         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
188         final HttpResponse response = conn.getHttpResponse();
189         final HttpRequest request = state.getRequest();
190 
191         final int statusCode = response.getStatusLine().getStatusCode();
192         if (statusCode < HttpStatus.SC_OK) {
193             // 1xx intermediate response
194             if (statusCode != HttpStatus.SC_CONTINUE) {
195                 throw new ProtocolException(
196                         "Unexpected response: " + response.getStatusLine());
197             }
198             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
199                 final int timeout = state.getTimeout();
200                 conn.setSocketTimeout(timeout);
201                 conn.requestOutput();
202                 state.setRequestState(MessageState.ACK);
203             }
204             return;
205         }
206         state.setResponse(response);
207         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
208             final int timeout = state.getTimeout();
209             conn.setSocketTimeout(timeout);
210             conn.resetOutput();
211             state.setRequestState(MessageState.COMPLETED);
212         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
213             // Early response
214             conn.resetOutput();
215             conn.suspendOutput();
216             state.setRequestState(MessageState.COMPLETED);
217             state.invalidate();
218         }
219 
220         handler.responseReceived(response);
221 
222         state.setResponseState(MessageState.BODY_STREAM);
223         if (!canResponseHaveBody(request, response)) {
224             response.setEntity(null);
225             conn.resetInput();
226             processResponse(conn, state, handler);
227         }
228     }
229 
230     public void inputReady(
231             final NHttpClientConnection conn,
232             final ContentDecoder decoder) throws IOException, HttpException {
233         final State state = ensureNotNull(getState(conn));
234         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
235         handler.consumeContent(decoder, conn);
236         state.setResponseState(MessageState.BODY_STREAM);
237         if (decoder.isCompleted()) {
238             processResponse(conn, state, handler);
239         }
240     }
241 
242     public void endOfInput(final NHttpClientConnection conn) throws IOException {
243         final State state = getState(conn);
244         if (state != null) {
245             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
246                 state.invalidate();
247             }
248             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
249             if (handler != null) {
250                 if (state.isValid()) {
251                     handler.inputTerminated();
252                 } else {
253                     handler.failed(new ConnectionClosedException("Connection closed"));
254                 }
255             }
256         }
257         // Closing connection in an orderly manner and
258         // waiting for output buffer to get flushed.
259         // Do not want to wait indefinitely, though, in case
260         // the opposite end is not reading
261         if (conn.getSocketTimeout() <= 0) {
262             conn.setSocketTimeout(1000);
263         }
264         conn.close();
265     }
266 
267     public void timeout(
268             final NHttpClientConnection conn) throws IOException {
269         final State state = getState(conn);
270         if (state != null) {
271             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
272                 final int timeout = state.getTimeout();
273                 conn.setSocketTimeout(timeout);
274                 conn.requestOutput();
275                 state.setRequestState(MessageState.BODY_STREAM);
276                 return;
277             } else {
278                 state.invalidate();
279                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
280                 if (handler != null) {
281                     handler.failed(new SocketTimeoutException());
282                     handler.close();
283                 }
284             }
285         }
286         if (conn.getStatus() == NHttpConnection.ACTIVE) {
287             conn.close();
288             if (conn.getStatus() == NHttpConnection.CLOSING) {
289                 // Give the connection some grace time to
290                 // close itself nicely
291                 conn.setSocketTimeout(250);
292             }
293         } else {
294             conn.shutdown();
295         }
296     }
297 
298     /**
299      * This method can be used to log I/O exception thrown while closing {@link Closeable}
300      * objects (such as {@link HttpConnection}}).
301      *
302      * @param ex I/O exception thrown by {@link Closeable#close()}
303      */
304     protected void log(final Exception ex) {
305     }
306 
307     private State getState(final NHttpConnection conn) {
308         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
309     }
310 
311     private State ensureNotNull(final State state) {
312         Asserts.notNull(state, "HTTP exchange state");
313         return state;
314     }
315 
316     private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
317         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
318     }
319 
320     private HttpAsyncClientExchangeHandler ensureNotNull(final HttpAsyncClientExchangeHandler handler) {
321         Asserts.notNull(handler, "HTTP exchange handler");
322         return handler;
323     }
324 
325     private void shutdownConnection(final NHttpConnection conn) {
326         try {
327             conn.shutdown();
328         } catch (final IOException ex) {
329             log(ex);
330         }
331     }
332 
333     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
334         if (handler != null) {
335             try {
336                 handler.close();
337             } catch (final IOException ioex) {
338                 log(ioex);
339             }
340         }
341     }
342 
343     private void processResponse(
344             final NHttpClientConnection conn,
345             final State state,
346             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
347         if (!state.isValid()) {
348             conn.close();
349         }
350         handler.responseCompleted();
351         state.reset();
352         if (!handler.isDone()) {
353             conn.requestOutput();
354         }
355     }
356 
357     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
358 
359         final String method = request.getRequestLine().getMethod();
360         final int status = response.getStatusLine().getStatusCode();
361 
362         if (method.equalsIgnoreCase("HEAD")) {
363             return false;
364         }
365         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
366             return false;
367         }
368         return status >= HttpStatus.SC_OK
369             && status != HttpStatus.SC_NO_CONTENT
370             && status != HttpStatus.SC_NOT_MODIFIED
371             && status != HttpStatus.SC_RESET_CONTENT;
372     }
373 
374     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
375 
376     static class State {
377 
378         private volatile MessageState requestState;
379         private volatile MessageState responseState;
380         private volatile HttpRequest request;
381         private volatile HttpResponse response;
382         private volatile boolean valid;
383         private volatile int timeout;
384 
385         State() {
386             super();
387             this.valid = true;
388             this.requestState = MessageState.READY;
389             this.responseState = MessageState.READY;
390         }
391 
392         public MessageState getRequestState() {
393             return this.requestState;
394         }
395 
396         public void setRequestState(final MessageState state) {
397             this.requestState = state;
398         }
399 
400         public MessageState getResponseState() {
401             return this.responseState;
402         }
403 
404         public void setResponseState(final MessageState state) {
405             this.responseState = state;
406         }
407 
408         public HttpRequest getRequest() {
409             return this.request;
410         }
411 
412         public void setRequest(final HttpRequest request) {
413             this.request = request;
414         }
415 
416         public HttpResponse getResponse() {
417             return this.response;
418         }
419 
420         public void setResponse(final HttpResponse response) {
421             this.response = response;
422         }
423 
424         public int getTimeout() {
425             return this.timeout;
426         }
427 
428         public void setTimeout(final int timeout) {
429             this.timeout = timeout;
430         }
431 
432         public void reset() {
433             this.responseState = MessageState.READY;
434             this.requestState = MessageState.READY;
435             this.response = null;
436             this.request = null;
437             this.timeout = 0;
438         }
439 
440         public boolean isValid() {
441             return this.valid;
442         }
443 
444         public void invalidate() {
445             this.valid = false;
446         }
447 
448         @Override
449         public String toString() {
450             final StringBuilder buf = new StringBuilder();
451             buf.append("request state: ");
452             buf.append(this.requestState);
453             buf.append("; request: ");
454             if (this.request != null) {
455                 buf.append(this.request.getRequestLine());
456             }
457             buf.append("; response state: ");
458             buf.append(this.responseState);
459             buf.append("; response: ");
460             if (this.response != null) {
461                 buf.append(this.response.getStatusLine());
462             }
463             buf.append("; valid: ");
464             buf.append(this.valid);
465             buf.append(";");
466             return buf.toString();
467         }
468 
469     }
470 
471 }