View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.HttpEntityEnclosingRequest;
35  import org.apache.http.HttpException;
36  import org.apache.http.HttpRequest;
37  import org.apache.http.HttpResponse;
38  import org.apache.http.HttpStatus;
39  import org.apache.http.ProtocolException;
40  import org.apache.http.annotation.Immutable;
41  import org.apache.http.nio.ContentDecoder;
42  import org.apache.http.nio.ContentEncoder;
43  import org.apache.http.nio.NHttpClientConnection;
44  import org.apache.http.nio.NHttpClientEventHandler;
45  import org.apache.http.nio.NHttpConnection;
46  import org.apache.http.protocol.HttpContext;
47  import org.apache.http.util.Args;
48  import org.apache.http.util.Asserts;
49  
50  /**
51   * <tt>HttpAsyncRequestExecutor</tt> is a fully asynchronous HTTP client side
52   * protocol handler based on the NIO (non-blocking) I/O model.
53   * <tt>HttpAsyncRequestExecutor</tt> translates individual events fired through
54   * the {@link NHttpClientEventHandler} interface into logically related HTTP
55   * message exchanges.
56   * <p/> The caller is expected to pass an instance of
57   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
58   * of HTTP message exchanges through the connection context using
59   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
60   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
61   * returns <code>true</code>. The {@link HttpAsyncRequester} utility class can
62   * be used to facilitate initiation of asynchronous HTTP request execution.
63   * <p/>
64   * Individual <tt>HttpAsyncClientExchangeHandler</tt> are expected to make use of
65   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
66   * headers for all outgoing messages and apply common, cross-cutting message
67   * transformations to all incoming and outgoing messages.
68   * <tt>HttpAsyncClientExchangeHandler</tt>s can delegate implementation of
69   * application specific content generation and processing to
70   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
71   *
72   * @see HttpAsyncClientExchangeHandler
73   *
74   * @since 4.2
75   */
76  @Immutable
77  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
78  
79      public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
80      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
81  
82      private final int waitForContinue;
83  
84      /**
85       * Creates new instance of HttpAsyncRequestExecutor.
86       *
87       * @since 4.3
88       */
89      public HttpAsyncRequestExecutor(final int waitForContinue) {
90          super();
91          this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
92      }
93  
94      public HttpAsyncRequestExecutor() {
95          this(DEFAULT_WAIT_FOR_CONTINUE);
96      }
97  
98      public void connected(
99              final NHttpClientConnection conn,
100             final Object attachment) throws IOException, HttpException {
101         final State state = new State();
102         final HttpContext context = conn.getContext();
103         context.setAttribute(HTTP_EXCHANGE_STATE, state);
104         requestReady(conn);
105     }
106 
107     public void closed(final NHttpClientConnection conn) {
108         final State state = getState(conn);
109         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
110         if (state == null || (handler != null && handler.isDone())) {
111             closeHandler(handler);
112         }
113         if (state != null) {
114             state.reset();
115         }
116     }
117 
118     public void exception(
119             final NHttpClientConnection conn, final Exception cause) {
120         shutdownConnection(conn);
121         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
122         if (handler != null) {
123             handler.failed(cause);
124         } else {
125             log(cause);
126         }
127     }
128 
129     public void requestReady(
130             final NHttpClientConnection conn) throws IOException, HttpException {
131         final State state = ensureNotNull(getState(conn));
132         if (state.getRequestState() != MessageState.READY) {
133             return;
134         }
135         HttpAsyncClientExchangeHandler handler = getHandler(conn);
136         if (handler != null && handler.isDone()) {
137             closeHandler(handler);
138             state.reset();
139             handler = null;
140         }
141         if (handler == null) {
142             return;
143         }
144 
145         final HttpRequest request = handler.generateRequest();
146         if (request == null) {
147             return;
148         }
149         state.setRequest(request);
150 
151         conn.submitRequest(request);
152 
153         if (request instanceof HttpEntityEnclosingRequest) {
154             if (((HttpEntityEnclosingRequest) request).expectContinue()) {
155                 final int timeout = conn.getSocketTimeout();
156                 state.setTimeout(timeout);
157                 conn.setSocketTimeout(this.waitForContinue);
158                 state.setRequestState(MessageState.ACK_EXPECTED);
159             } else {
160                 state.setRequestState(MessageState.BODY_STREAM);
161             }
162         } else {
163             handler.requestCompleted();
164             state.setRequestState(MessageState.COMPLETED);
165         }
166     }
167 
168     public void outputReady(
169             final NHttpClientConnection conn,
170             final ContentEncoder encoder) throws IOException, HttpException {
171         final State state = ensureNotNull(getState(conn));
172         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
173         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
174             conn.suspendOutput();
175             return;
176         }
177         handler.produceContent(encoder, conn);
178         state.setRequestState(MessageState.BODY_STREAM);
179         if (encoder.isCompleted()) {
180             handler.requestCompleted();
181             state.setRequestState(MessageState.COMPLETED);
182         }
183     }
184 
185     public void responseReceived(
186             final NHttpClientConnection conn) throws HttpException, IOException {
187         final State state = ensureNotNull(getState(conn));
188         final HttpRequest request = state.getRequest();
189         if (request == null) {
190             throw new HttpException("Out of sequence response");
191         }
192         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
193         final HttpResponse response = conn.getHttpResponse();
194 
195         final int statusCode = response.getStatusLine().getStatusCode();
196         if (statusCode < HttpStatus.SC_OK) {
197             // 1xx intermediate response
198             if (statusCode != HttpStatus.SC_CONTINUE) {
199                 throw new ProtocolException(
200                         "Unexpected response: " + response.getStatusLine());
201             }
202             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
203                 final int timeout = state.getTimeout();
204                 conn.setSocketTimeout(timeout);
205                 conn.requestOutput();
206                 state.setRequestState(MessageState.ACK);
207             }
208             return;
209         }
210         state.setResponse(response);
211         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
212             final int timeout = state.getTimeout();
213             conn.setSocketTimeout(timeout);
214             conn.resetOutput();
215             state.setRequestState(MessageState.COMPLETED);
216         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
217             // Early response
218             conn.resetOutput();
219             conn.suspendOutput();
220             state.setRequestState(MessageState.COMPLETED);
221             state.invalidate();
222         }
223 
224         handler.responseReceived(response);
225 
226         state.setResponseState(MessageState.BODY_STREAM);
227         if (!canResponseHaveBody(request, response)) {
228             response.setEntity(null);
229             conn.resetInput();
230             processResponse(conn, state, handler);
231         }
232     }
233 
234     public void inputReady(
235             final NHttpClientConnection conn,
236             final ContentDecoder decoder) throws IOException, HttpException {
237         final State state = ensureNotNull(getState(conn));
238         final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
239         handler.consumeContent(decoder, conn);
240         state.setResponseState(MessageState.BODY_STREAM);
241         if (decoder.isCompleted()) {
242             processResponse(conn, state, handler);
243         }
244     }
245 
246     public void endOfInput(final NHttpClientConnection conn) throws IOException {
247         final State state = getState(conn);
248         if (state != null) {
249             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
250                 state.invalidate();
251             }
252             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
253             if (handler != null) {
254                 if (state.isValid()) {
255                     handler.inputTerminated();
256                 } else {
257                     handler.failed(new ConnectionClosedException("Connection closed"));
258                 }
259             }
260         }
261         // Closing connection in an orderly manner and
262         // waiting for output buffer to get flushed.
263         // Do not want to wait indefinitely, though, in case
264         // the opposite end is not reading
265         if (conn.getSocketTimeout() <= 0) {
266             conn.setSocketTimeout(1000);
267         }
268         conn.close();
269     }
270 
271     public void timeout(
272             final NHttpClientConnection conn) throws IOException {
273         final State state = getState(conn);
274         if (state != null) {
275             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
276                 final int timeout = state.getTimeout();
277                 conn.setSocketTimeout(timeout);
278                 conn.requestOutput();
279                 state.setRequestState(MessageState.BODY_STREAM);
280                 return;
281             } else {
282                 state.invalidate();
283                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
284                 if (handler != null) {
285                     handler.failed(new SocketTimeoutException());
286                     handler.close();
287                 }
288             }
289         }
290         if (conn.getStatus() == NHttpConnection.ACTIVE) {
291             conn.close();
292             if (conn.getStatus() == NHttpConnection.CLOSING) {
293                 // Give the connection some grace time to
294                 // close itself nicely
295                 conn.setSocketTimeout(250);
296             }
297         } else {
298             conn.shutdown();
299         }
300     }
301 
302     /**
303      * This method can be used to log I/O exception thrown while closing
304      * {@link java.io.Closeable} objects (such as
305      * {@link org.apache.http.HttpConnection}}).
306      *
307      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
308      */
309     protected void log(final Exception ex) {
310     }
311 
312     private State getState(final NHttpConnection conn) {
313         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
314     }
315 
316     private State ensureNotNull(final State state) {
317         Asserts.notNull(state, "HTTP exchange state");
318         return state;
319     }
320 
321     private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
322         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
323     }
324 
325     private HttpAsyncClientExchangeHandler ensureNotNull(final HttpAsyncClientExchangeHandler handler) {
326         Asserts.notNull(handler, "HTTP exchange handler");
327         return handler;
328     }
329 
330     private void shutdownConnection(final NHttpConnection conn) {
331         try {
332             conn.shutdown();
333         } catch (final IOException ex) {
334             log(ex);
335         }
336     }
337 
338     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
339         if (handler != null) {
340             try {
341                 handler.close();
342             } catch (final IOException ioex) {
343                 log(ioex);
344             }
345         }
346     }
347 
348     private void processResponse(
349             final NHttpClientConnection conn,
350             final State state,
351             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
352         if (!state.isValid()) {
353             conn.close();
354         }
355         handler.responseCompleted();
356         state.reset();
357         if (!handler.isDone() && conn.isOpen()) {
358             conn.requestOutput();
359         }
360     }
361 
362     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
363 
364         final String method = request.getRequestLine().getMethod();
365         final int status = response.getStatusLine().getStatusCode();
366 
367         if (method.equalsIgnoreCase("HEAD")) {
368             return false;
369         }
370         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
371             return false;
372         }
373         return status >= HttpStatus.SC_OK
374             && status != HttpStatus.SC_NO_CONTENT
375             && status != HttpStatus.SC_NOT_MODIFIED
376             && status != HttpStatus.SC_RESET_CONTENT;
377     }
378 
379     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
380 
381     static class State {
382 
383         private volatile MessageState requestState;
384         private volatile MessageState responseState;
385         private volatile HttpRequest request;
386         private volatile HttpResponse response;
387         private volatile boolean valid;
388         private volatile int timeout;
389 
390         State() {
391             super();
392             this.valid = true;
393             this.requestState = MessageState.READY;
394             this.responseState = MessageState.READY;
395         }
396 
397         public MessageState getRequestState() {
398             return this.requestState;
399         }
400 
401         public void setRequestState(final MessageState state) {
402             this.requestState = state;
403         }
404 
405         public MessageState getResponseState() {
406             return this.responseState;
407         }
408 
409         public void setResponseState(final MessageState state) {
410             this.responseState = state;
411         }
412 
413         public HttpRequest getRequest() {
414             return this.request;
415         }
416 
417         public void setRequest(final HttpRequest request) {
418             this.request = request;
419         }
420 
421         public HttpResponse getResponse() {
422             return this.response;
423         }
424 
425         public void setResponse(final HttpResponse response) {
426             this.response = response;
427         }
428 
429         public int getTimeout() {
430             return this.timeout;
431         }
432 
433         public void setTimeout(final int timeout) {
434             this.timeout = timeout;
435         }
436 
437         public void reset() {
438             this.responseState = MessageState.READY;
439             this.requestState = MessageState.READY;
440             this.response = null;
441             this.request = null;
442             this.timeout = 0;
443         }
444 
445         public boolean isValid() {
446             return this.valid;
447         }
448 
449         public void invalidate() {
450             this.valid = false;
451         }
452 
453         @Override
454         public String toString() {
455             final StringBuilder buf = new StringBuilder();
456             buf.append("request state: ");
457             buf.append(this.requestState);
458             buf.append("; request: ");
459             if (this.request != null) {
460                 buf.append(this.request.getRequestLine());
461             }
462             buf.append("; response state: ");
463             buf.append(this.responseState);
464             buf.append("; response: ");
465             if (this.response != null) {
466                 buf.append(this.response.getStatusLine());
467             }
468             buf.append("; valid: ");
469             buf.append(this.valid);
470             buf.append(";");
471             return buf.toString();
472         }
473 
474     }
475 
476 }