View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.HttpEntityEnclosingRequest;
35  import org.apache.http.HttpException;
36  import org.apache.http.HttpRequest;
37  import org.apache.http.HttpResponse;
38  import org.apache.http.HttpStatus;
39  import org.apache.http.ProtocolException;
40  import org.apache.http.annotation.Immutable;
41  import org.apache.http.nio.ContentDecoder;
42  import org.apache.http.nio.ContentEncoder;
43  import org.apache.http.nio.NHttpClientConnection;
44  import org.apache.http.nio.NHttpClientEventHandler;
45  import org.apache.http.nio.NHttpConnection;
46  import org.apache.http.protocol.HttpContext;
47  import org.apache.http.util.Args;
48  import org.apache.http.util.Asserts;
49  
50  /**
51   * <tt>HttpAsyncRequestExecutor</tt> is a fully asynchronous HTTP client side
52   * protocol handler based on the NIO (non-blocking) I/O model.
53   * <tt>HttpAsyncRequestExecutor</tt> translates individual events fired through
54   * the {@link NHttpClientEventHandler} interface into logically related HTTP
55   * message exchanges.
56   * <p/> The caller is expected to pass an instance of
57   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
58   * of HTTP message exchanges through the connection context using
59   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
60   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
61   * returns <code>true</code>. The {@link HttpAsyncRequester} utility class can
62   * be used to facilitate initiation of asynchronous HTTP request execution.
63   * <p/>
64   * Individual <tt>HttpAsyncClientExchangeHandler</tt> are expected to make use of
65   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
66   * headers for all outgoing messages and apply common, cross-cutting message
67   * transformations to all incoming and outgoing messages.
68   * <tt>HttpAsyncClientExchangeHandler</tt>s can delegate implementation of
69   * application specific content generation and processing to
70   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
71   *
72   * @see HttpAsyncClientExchangeHandler
73   *
74   * @since 4.2
75   */
76  @Immutable
77  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
78  
79      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
80      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
81  
82      private final int waitForContinue;
83  
84      /**
85       * Creates new instance of HttpAsyncRequestExecutor.
86       *
87       * @since 4.3
88       */
89      public HttpAsyncRequestExecutor(final int waitForContinue) {
90          super();
91          this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
92      }
93  
94      public HttpAsyncRequestExecutor() {
95          this(DEFAULT_WAIT_FOR_CONTINUE);
96      }
97  
98      public void connected(
99              final NHttpClientConnection conn,
100             final Object attachment) throws IOException, HttpException {
101         final State state = new State();
102         final HttpContext context = conn.getContext();
103         context.setAttribute(HTTP_EXCHANGE_STATE, state);
104         requestReady(conn);
105     }
106 
107     public void closed(final NHttpClientConnection conn) {
108         final State state = getState(conn);
109         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
110         if (state != null) {
111             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
112                 if (handler != null) {
113                     handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
114                 }
115             }
116         }
117         if (state == null || (handler != null && handler.isDone())) {
118             closeHandler(handler);
119         }
120     }
121 
122     public void exception(
123             final NHttpClientConnection conn, final Exception cause) {
124         shutdownConnection(conn);
125         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
126         if (handler != null) {
127             handler.failed(cause);
128         } else {
129             log(cause);
130         }
131     }
132 
133     public void requestReady(
134             final NHttpClientConnection conn) throws IOException, HttpException {
135         final State state = ensureNotNull(getState(conn));
136         if (state.getRequestState() != MessageState.READY) {
137             return;
138         }
139         HttpAsyncClientExchangeHandler handler = getHandler(conn);
140         if (handler != null && handler.isDone()) {
141             closeHandler(handler);
142             state.reset();
143             handler = null;
144         }
145         if (handler == null) {
146             return;
147         }
148 
149         final HttpRequest request = handler.generateRequest();
150         if (request == null) {
151             return;
152         }
153         state.setRequest(request);
154 
155         conn.submitRequest(request);
156 
157         if (request instanceof HttpEntityEnclosingRequest) {
158             if (((HttpEntityEnclosingRequest) request).expectContinue()) {
159                 final int timeout = conn.getSocketTimeout();
160                 state.setTimeout(timeout);
161                 conn.setSocketTimeout(this.waitForContinue);
162                 state.setRequestState(MessageState.ACK_EXPECTED);
163             } else {
164                 state.setRequestState(MessageState.BODY_STREAM);
165             }
166         } else {
167             handler.requestCompleted();
168             state.setRequestState(MessageState.COMPLETED);
169         }
170     }
171 
172     public void outputReady(
173             final NHttpClientConnection conn,
174             final ContentEncoder encoder) throws IOException, HttpException {
175         final State state = ensureNotNull(getState(conn));
176         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
177         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
178             conn.suspendOutput();
179             return;
180         }
181         handler.produceContent(encoder, conn);
182         state.setRequestState(MessageState.BODY_STREAM);
183         if (encoder.isCompleted()) {
184             handler.requestCompleted();
185             state.setRequestState(MessageState.COMPLETED);
186         }
187     }
188 
189     public void responseReceived(
190             final NHttpClientConnection conn) throws HttpException, IOException {
191         final State state = ensureNotNull(getState(conn));
192         final HttpRequest request = state.getRequest();
193         if (request == null) {
194             throw new HttpException("Out of sequence response");
195         }
196         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
197         final HttpResponse response = conn.getHttpResponse();
198 
199         final int statusCode = response.getStatusLine().getStatusCode();
200         if (statusCode < HttpStatus.SC_OK) {
201             // 1xx intermediate response
202             if (statusCode != HttpStatus.SC_CONTINUE) {
203                 throw new ProtocolException(
204                         "Unexpected response: " + response.getStatusLine());
205             }
206             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
207                 final int timeout = state.getTimeout();
208                 conn.setSocketTimeout(timeout);
209                 conn.requestOutput();
210                 state.setRequestState(MessageState.ACK);
211             }
212             return;
213         }
214         state.setResponse(response);
215         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
216             final int timeout = state.getTimeout();
217             conn.setSocketTimeout(timeout);
218             conn.resetOutput();
219             state.setRequestState(MessageState.COMPLETED);
220         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
221             // Early response
222             conn.resetOutput();
223             conn.suspendOutput();
224             state.setRequestState(MessageState.COMPLETED);
225             state.invalidate();
226         }
227 
228         handler.responseReceived(response);
229 
230         state.setResponseState(MessageState.BODY_STREAM);
231         if (!canResponseHaveBody(request, response)) {
232             response.setEntity(null);
233             conn.resetInput();
234             processResponse(conn, state, handler);
235         }
236     }
237 
238     public void inputReady(
239             final NHttpClientConnection conn,
240             final ContentDecoder decoder) throws IOException, HttpException {
241         final State state = ensureNotNull(getState(conn));
242         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
243         handler.consumeContent(decoder, conn);
244         state.setResponseState(MessageState.BODY_STREAM);
245         if (decoder.isCompleted()) {
246             processResponse(conn, state, handler);
247         }
248     }
249 
250     public void endOfInput(final NHttpClientConnection conn) throws IOException {
251         final State state = getState(conn);
252         if (state != null) {
253             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
254                 state.invalidate();
255             }
256             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
257             if (handler != null) {
258                 if (state.isValid()) {
259                     handler.inputTerminated();
260                 } else {
261                     handler.failed(new ConnectionClosedException("Connection closed"));
262                 }
263             }
264         }
265         // Closing connection in an orderly manner and
266         // waiting for output buffer to get flushed.
267         // Do not want to wait indefinitely, though, in case
268         // the opposite end is not reading
269         if (conn.getSocketTimeout() <= 0) {
270             conn.setSocketTimeout(1000);
271         }
272         conn.close();
273     }
274 
275     public void timeout(
276             final NHttpClientConnection conn) throws IOException {
277         final State state = getState(conn);
278         if (state != null) {
279             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
280                 final int timeout = state.getTimeout();
281                 conn.setSocketTimeout(timeout);
282                 conn.requestOutput();
283                 state.setRequestState(MessageState.BODY_STREAM);
284                 return;
285             } else {
286                 state.invalidate();
287                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
288                 if (handler != null) {
289                     handler.failed(new SocketTimeoutException());
290                     handler.close();
291                 }
292             }
293         }
294         if (conn.getStatus() == NHttpConnection.ACTIVE) {
295             conn.close();
296             if (conn.getStatus() == NHttpConnection.CLOSING) {
297                 // Give the connection some grace time to
298                 // close itself nicely
299                 conn.setSocketTimeout(250);
300             }
301         } else {
302             conn.shutdown();
303         }
304     }
305 
306     /**
307      * This method can be used to log I/O exception thrown while closing
308      * {@link java.io.Closeable} objects (such as
309      * {@link org.apache.http.HttpConnection}}).
310      *
311      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
312      */
313     protected void log(final Exception ex) {
314     }
315 
316     private State getState(final NHttpConnection conn) {
317         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
318     }
319 
320     private State ensureNotNull(final State state) {
321         Asserts.notNull(state, "HTTP exchange state");
322         return state;
323     }
324 
325     private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
326         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
327     }
328 
329     private HttpAsyncClientExchangeHandler ensureNotNull(final HttpAsyncClientExchangeHandler handler) {
330         Asserts.notNull(handler, "HTTP exchange handler");
331         return handler;
332     }
333 
334     private void shutdownConnection(final NHttpConnection conn) {
335         try {
336             conn.shutdown();
337         } catch (final IOException ex) {
338             log(ex);
339         }
340     }
341 
342     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
343         if (handler != null) {
344             try {
345                 handler.close();
346             } catch (final IOException ioex) {
347                 log(ioex);
348             }
349         }
350     }
351 
352     private void processResponse(
353             final NHttpClientConnection conn,
354             final State state,
355             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
356         if (!state.isValid()) {
357             conn.close();
358         }
359         handler.responseCompleted();
360         state.reset();
361         if (!handler.isDone() && conn.isOpen()) {
362             conn.requestOutput();
363         }
364     }
365 
366     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
367 
368         final String method = request.getRequestLine().getMethod();
369         final int status = response.getStatusLine().getStatusCode();
370 
371         if (method.equalsIgnoreCase("HEAD")) {
372             return false;
373         }
374         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
375             return false;
376         }
377         return status >= HttpStatus.SC_OK
378             && status != HttpStatus.SC_NO_CONTENT
379             && status != HttpStatus.SC_NOT_MODIFIED
380             && status != HttpStatus.SC_RESET_CONTENT;
381     }
382 
383     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
384 
385     static class State {
386 
387         private volatile MessageState requestState;
388         private volatile MessageState responseState;
389         private volatile HttpRequest request;
390         private volatile HttpResponse response;
391         private volatile boolean valid;
392         private volatile int timeout;
393 
394         State() {
395             super();
396             this.valid = true;
397             this.requestState = MessageState.READY;
398             this.responseState = MessageState.READY;
399         }
400 
401         public MessageState getRequestState() {
402             return this.requestState;
403         }
404 
405         public void setRequestState(final MessageState state) {
406             this.requestState = state;
407         }
408 
409         public MessageState getResponseState() {
410             return this.responseState;
411         }
412 
413         public void setResponseState(final MessageState state) {
414             this.responseState = state;
415         }
416 
417         public HttpRequest getRequest() {
418             return this.request;
419         }
420 
421         public void setRequest(final HttpRequest request) {
422             this.request = request;
423         }
424 
425         public HttpResponse getResponse() {
426             return this.response;
427         }
428 
429         public void setResponse(final HttpResponse response) {
430             this.response = response;
431         }
432 
433         public int getTimeout() {
434             return this.timeout;
435         }
436 
437         public void setTimeout(final int timeout) {
438             this.timeout = timeout;
439         }
440 
441         public void reset() {
442             this.responseState = MessageState.READY;
443             this.requestState = MessageState.READY;
444             this.response = null;
445             this.request = null;
446             this.timeout = 0;
447         }
448 
449         public boolean isValid() {
450             return this.valid;
451         }
452 
453         public void invalidate() {
454             this.valid = false;
455         }
456 
457         @Override
458         public String toString() {
459             final StringBuilder buf = new StringBuilder();
460             buf.append("request state: ");
461             buf.append(this.requestState);
462             buf.append("; request: ");
463             if (this.request != null) {
464                 buf.append(this.request.getRequestLine());
465             }
466             buf.append("; response state: ");
467             buf.append(this.responseState);
468             buf.append("; response: ");
469             if (this.response != null) {
470                 buf.append(this.response.getStatusLine());
471             }
472             buf.append("; valid: ");
473             buf.append(this.valid);
474             buf.append(";");
475             return buf.toString();
476         }
477 
478     }
479 
480 }