View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.net.SocketTimeoutException;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.HttpConnection;
37  import org.apache.http.HttpEntityEnclosingRequest;
38  import org.apache.http.HttpException;
39  import org.apache.http.HttpRequest;
40  import org.apache.http.HttpResponse;
41  import org.apache.http.HttpStatus;
42  import org.apache.http.ProtocolException;
43  import org.apache.http.annotation.Immutable;
44  import org.apache.http.nio.ContentDecoder;
45  import org.apache.http.nio.ContentEncoder;
46  import org.apache.http.nio.NHttpClientConnection;
47  import org.apache.http.nio.NHttpClientEventHandler;
48  import org.apache.http.nio.NHttpConnection;
49  import org.apache.http.params.CoreProtocolPNames;
50  import org.apache.http.params.HttpConnectionParams;
51  import org.apache.http.protocol.ExecutionContext;
52  import org.apache.http.protocol.HttpContext;
53  import org.apache.http.protocol.HttpProcessor;
54  
55  /**
56   * <tt>HttpAsyncRequestExecutor</tt> is a fully asynchronous HTTP client side
57   * protocol handler based on the NIO (non-blocking) I/O model.
58   * <tt>HttpAsyncRequestExecutor</tt> translates individual events fired through
59   * the {@link NHttpClientEventHandler} interface into logically related HTTP
60   * message exchanges.
61   * <p/>
62   * <tt>HttpAsyncRequestExecutor</tt> relies on {@link HttpProcessor}
63   * to generate mandatory protocol headers for all outgoing messages and apply
64   * common, cross-cutting message transformations to all incoming and outgoing
65   * messages, whereas individual {@link HttpAsyncRequestExecutionHandler}s
66   * are expected to implement application specific content generation and
67   * processing. The caller is expected to pass an instance of
68   * {@link HttpAsyncRequestExecutionHandler} to be used for the next series
69   * of HTTP message exchanges through the connection context using the
70   * {@link #HTTP_HANDLER} attribute. An HTTP exchange sequence is considered
71   * complete when the {@link HttpAsyncRequestExecutionHandler#isDone()} method
72   * returns <code>true</code>. The {@link HttpAsyncRequester} utility class can
73   * be used to facilitate initiation of asynchronous HTTP request execution.
74   * <p/>
75   * The following parameters can be used to customize the behavior of this
76   * class:
77   * <ul>
78   *  <li>{@link org.apache.http.params.CoreProtocolPNames#WAIT_FOR_CONTINUE}</li>
79   *  <li>{@link org.apache.http.params.CoreConnectionPNames#SO_TIMEOUT}</li>
80   * </ul>
81   *
82   * @see HttpAsyncRequestExecutionHandler
83   *
84   * @since 4.2
85   */
86  @Immutable
87  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
88  
89      public static final String HTTP_HANDLER = "http.nio.exchange-handler";
90  
91      public HttpAsyncRequestExecutor() {
92          super();
93      }
94  
95      public void connected(
96              final NHttpClientConnection conn,
97              final Object attachment) throws IOException, HttpException {
98          State state = new State();
99          HttpContext context = conn.getContext();
100         context.setAttribute(HTTP_EXCHANGE_STATE, state);
101         requestReady(conn);
102     }
103 
104     public void closed(final NHttpClientConnection conn) {
105         State state = getState(conn);
106         HttpAsyncRequestExecutionHandler<?> handler = getHandler(conn);
107         if (state == null || (handler != null && handler.isDone())) {
108             closeHandler(handler);
109         }
110         if (state != null) {
111             state.reset();
112         }
113     }
114 
115     public void exception(
116             final NHttpClientConnection conn, final Exception cause) {
117         shutdownConnection(conn);
118         HttpAsyncRequestExecutionHandler<?> handler = getHandler(conn);
119         if (handler != null) {
120             handler.failed(cause);
121         } else {
122             log(cause);
123         }
124     }
125 
126     public void requestReady(
127             final NHttpClientConnection conn) throws IOException, HttpException {
128         State state = ensureNotNull(getState(conn));
129         if (state.getRequestState() != MessageState.READY) {
130             return;
131         }
132         HttpAsyncRequestExecutionHandler<?> handler = getHandler(conn);
133         if (handler != null && handler.isDone()) {
134             closeHandler(handler);
135             state.reset();
136             handler = null;
137         }
138         if (handler == null) {
139             return;
140         }
141 
142         HttpContext context = handler.getContext();
143         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
144 
145         HttpRequest request = handler.generateRequest();
146         context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
147 
148         conn.setSocketTimeout(HttpConnectionParams.getSoTimeout(request.getParams()));
149 
150         HttpProcessor httppocessor = handler.getHttpProcessor();
151         httppocessor.process(request, context);
152 
153         state.setRequest(request);
154 
155         conn.submitRequest(request);
156 
157         if (request instanceof HttpEntityEnclosingRequest) {
158             if (((HttpEntityEnclosingRequest) request).expectContinue()) {
159                 int timeout = conn.getSocketTimeout();
160                 state.setTimeout(timeout);
161                 timeout = request.getParams().getIntParameter(
162                         CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
163                 conn.setSocketTimeout(timeout);
164                 state.setRequestState(MessageState.ACK_EXPECTED);
165             } else {
166                 state.setRequestState(MessageState.BODY_STREAM);
167             }
168         } else {
169             handler.requestCompleted(context);
170             state.setRequestState(MessageState.COMPLETED);
171         }
172     }
173 
174     public void outputReady(
175             final NHttpClientConnection conn,
176             final ContentEncoder encoder) throws IOException {
177         State state = ensureNotNull(getState(conn));
178         HttpAsyncRequestExecutionHandler<?> handler = ensureNotNull(getHandler(conn));
179         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
180             conn.suspendOutput();
181             return;
182         }
183         HttpContext context = handler.getContext();
184         handler.produceContent(encoder, conn);
185         state.setRequestState(MessageState.BODY_STREAM);
186         if (encoder.isCompleted()) {
187             handler.requestCompleted(context);
188             state.setRequestState(MessageState.COMPLETED);
189         }
190     }
191 
192     public void responseReceived(
193             final NHttpClientConnection conn) throws HttpException, IOException {
194         State state = ensureNotNull(getState(conn));
195         HttpAsyncRequestExecutionHandler<?> handler = ensureNotNull(getHandler(conn));
196         HttpResponse response = conn.getHttpResponse();
197         HttpRequest request = state.getRequest();
198 
199         int statusCode = response.getStatusLine().getStatusCode();
200         if (statusCode < HttpStatus.SC_OK) {
201             // 1xx intermediate response
202             if (statusCode != HttpStatus.SC_CONTINUE) {
203                 throw new ProtocolException(
204                         "Unexpected response: " + response.getStatusLine());
205             }
206             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
207                 int timeout = state.getTimeout();
208                 conn.setSocketTimeout(timeout);
209                 conn.requestOutput();
210                 state.setRequestState(MessageState.ACK);
211             }
212             return;
213         }
214         state.setResponse(response);
215         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
216             int timeout = state.getTimeout();
217             conn.setSocketTimeout(timeout);
218             conn.resetOutput();
219             state.setRequestState(MessageState.COMPLETED);
220         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
221             // Early response
222             conn.resetOutput();
223             conn.suspendOutput();
224             state.setRequestState(MessageState.COMPLETED);
225             state.invalidate();
226         }
227 
228         handler.responseReceived(response);
229 
230         HttpContext context = handler.getContext();
231         HttpProcessor httpprocessor = handler.getHttpProcessor();
232 
233         context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
234         httpprocessor.process(response, context);
235 
236         state.setResponseState(MessageState.BODY_STREAM);
237         if (!canResponseHaveBody(request, response)) {
238             response.setEntity(null);
239             conn.resetInput();
240             processResponse(conn, state, handler);
241         }
242     }
243 
244     public void inputReady(
245             final NHttpClientConnection conn,
246             final ContentDecoder decoder) throws IOException {
247         State state = ensureNotNull(getState(conn));
248         HttpAsyncRequestExecutionHandler<?> handler = ensureNotNull(getHandler(conn));
249         handler.consumeContent(decoder, conn);
250         state.setResponseState(MessageState.BODY_STREAM);
251         if (decoder.isCompleted()) {
252             processResponse(conn, state, handler);
253         }
254     }
255 
256     public void endOfInput(final NHttpClientConnection conn) throws IOException {
257         State state = getState(conn);
258         if (state != null) {
259             if (state.getRequestState().compareTo(MessageState.READY) != 0) {
260                 state.invalidate();
261                 HttpAsyncRequestExecutionHandler<?> handler = getHandler(conn);
262                 handler.failed(new ConnectionClosedException("Connection closed"));
263             }
264         }
265         // Closing connection in an orderly manner and
266         // waiting for output buffer to get flushed.
267         // Do not want to wait indefinitely, though, in case
268         // the opposite end is not reading
269         if (conn.getSocketTimeout() <= 0) {
270             conn.setSocketTimeout(1000);
271         }
272         conn.close();
273     }
274 
275     public void timeout(
276             final NHttpClientConnection conn) throws IOException {
277         State state = getState(conn);
278         if (state != null) {
279             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
280                 int timeout = state.getTimeout();
281                 conn.setSocketTimeout(timeout);
282                 conn.requestOutput();
283                 state.setRequestState(MessageState.BODY_STREAM);
284                 return;
285             } else {
286                 state.invalidate();
287                 HttpAsyncRequestExecutionHandler<?> handler = getHandler(conn);
288                 if (handler != null) {
289                     handler.failed(new SocketTimeoutException());
290                     handler.close();
291                 }
292             }
293         }
294         if (conn.getStatus() == NHttpConnection.ACTIVE) {
295             conn.close();
296             if (conn.getStatus() == NHttpConnection.CLOSING) {
297                 // Give the connection some grace time to
298                 // close itself nicely
299                 conn.setSocketTimeout(250);
300             }
301         } else {
302             conn.shutdown();
303         }
304     }
305 
306     /**
307      * This method can be used to log I/O exception thrown while closing {@link Closeable}
308      * objects (such as {@link HttpConnection}}).
309      *
310      * @param ex I/O exception thrown by {@link Closeable#close()}
311      */
312     protected void log(final Exception ex) {
313     }
314 
315     private State getState(final NHttpConnection conn) {
316         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
317     }
318 
319     private State ensureNotNull(final State state) {
320         if (state == null) {
321             throw new IllegalStateException("HTTP exchange state is null");
322         }
323         return state;
324     }
325 
326     private HttpAsyncRequestExecutionHandler<?> getHandler(final NHttpConnection conn) {
327         return (HttpAsyncRequestExecutionHandler<?>) conn.getContext().getAttribute(HTTP_HANDLER);
328     }
329 
330     private HttpAsyncRequestExecutionHandler<?> ensureNotNull(final HttpAsyncRequestExecutionHandler<?> handler) {
331         if (handler == null) {
332             throw new IllegalStateException("HTTP exchange handler is null");
333         }
334         return handler;
335     }
336 
337     private void shutdownConnection(final NHttpConnection conn) {
338         try {
339             conn.shutdown();
340         } catch (IOException ex) {
341             log(ex);
342         }
343     }
344 
345     private void closeHandler(final HttpAsyncRequestExecutionHandler<?> handler) {
346         if (handler != null) {
347             try {
348                 handler.close();
349             } catch (IOException ioex) {
350                 log(ioex);
351             }
352         }
353     }
354 
355     private void processResponse(
356             final NHttpClientConnection conn,
357             final State state,
358             final HttpAsyncRequestExecutionHandler<?> handler) throws IOException {
359         HttpContext context = handler.getContext();
360         if (state.isValid()) {
361             HttpRequest request = state.getRequest();
362             HttpResponse response = state.getResponse();
363             String method = request.getRequestLine().getMethod();
364             int status = response.getStatusLine().getStatusCode();
365             if (!(method.equalsIgnoreCase("CONNECT") && status < 300)) {
366                 ConnectionReuseStrategy connReuseStrategy = handler.getConnectionReuseStrategy();
367                 if (!connReuseStrategy.keepAlive(response, context)) {
368                     conn.close();
369                 }
370             }
371         } else {
372             conn.close();
373         }
374         handler.responseCompleted(context);
375         state.reset();
376         if (!handler.isDone()) {
377             conn.requestOutput();
378         }
379     }
380 
381     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
382 
383         String method = request.getRequestLine().getMethod();
384         int status = response.getStatusLine().getStatusCode();
385 
386         if (method.equalsIgnoreCase("HEAD")) {
387             return false;
388         }
389         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
390             return false;
391         }
392         return status >= HttpStatus.SC_OK
393             && status != HttpStatus.SC_NO_CONTENT
394             && status != HttpStatus.SC_NOT_MODIFIED
395             && status != HttpStatus.SC_RESET_CONTENT;
396     }
397 
398     static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
399 
400     static class State {
401 
402         private volatile MessageState requestState;
403         private volatile MessageState responseState;
404         private volatile HttpRequest request;
405         private volatile HttpResponse response;
406         private volatile boolean valid;
407         private volatile int timeout;
408 
409         State() {
410             super();
411             this.valid = true;
412             this.requestState = MessageState.READY;
413             this.responseState = MessageState.READY;
414         }
415 
416         public MessageState getRequestState() {
417             return this.requestState;
418         }
419 
420         public void setRequestState(final MessageState state) {
421             this.requestState = state;
422         }
423 
424         public MessageState getResponseState() {
425             return this.responseState;
426         }
427 
428         public void setResponseState(final MessageState state) {
429             this.responseState = state;
430         }
431 
432         public HttpRequest getRequest() {
433             return this.request;
434         }
435 
436         public void setRequest(final HttpRequest request) {
437             this.request = request;
438         }
439 
440         public HttpResponse getResponse() {
441             return this.response;
442         }
443 
444         public void setResponse(final HttpResponse response) {
445             this.response = response;
446         }
447 
448         public int getTimeout() {
449             return this.timeout;
450         }
451 
452         public void setTimeout(int timeout) {
453             this.timeout = timeout;
454         }
455 
456         public void reset() {
457             this.responseState = MessageState.READY;
458             this.requestState = MessageState.READY;
459             this.response = null;
460             this.request = null;
461             this.timeout = 0;
462         }
463 
464         public boolean isValid() {
465             return this.valid;
466         }
467 
468         public void invalidate() {
469             this.valid = false;
470         }
471 
472         @Override
473         public String toString() {
474             StringBuilder buf = new StringBuilder();
475             buf.append("request state: ");
476             buf.append(this.requestState);
477             buf.append("; request: ");
478             if (this.request != null) {
479                 buf.append(this.request.getRequestLine());
480             }
481             buf.append("; response state: ");
482             buf.append(this.responseState);
483             buf.append("; response: ");
484             if (this.response != null) {
485                 buf.append(this.response.getStatusLine());
486             }
487             buf.append("; valid: ");
488             buf.append(this.valid);
489             buf.append(";");
490             return buf.toString();
491         }
492 
493     }
494 
495 }