View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.concurrent.Future;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpConnection;
36  import org.apache.http.HttpHost;
37  import org.apache.http.annotation.Immutable;
38  import org.apache.http.concurrent.BasicFuture;
39  import org.apache.http.concurrent.FutureCallback;
40  import org.apache.http.nio.NHttpClientConnection;
41  import org.apache.http.params.HttpParams;
42  import org.apache.http.pool.ConnPool;
43  import org.apache.http.pool.PoolEntry;
44  import org.apache.http.protocol.BasicHttpContext;
45  import org.apache.http.protocol.HttpContext;
46  import org.apache.http.protocol.HttpProcessor;
47  
48  /**
49   * <tt>HttpAsyncRequester</tt> is a utility class that can be used
50   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
51   * of asynchronous HTTP requests.
52   *
53   * @see HttpAsyncRequestExecutor
54   *
55   * @since 4.2
56   */
57  @Immutable
58  public class HttpAsyncRequester {
59  
60      private final HttpProcessor httppocessor;
61      private final ConnectionReuseStrategy reuseStrategy;
62      private final HttpParams params;
63  
64      public HttpAsyncRequester(
65              final HttpProcessor httppocessor,
66              final ConnectionReuseStrategy reuseStrategy,
67              final HttpParams params) {
68          super();
69          this.httppocessor = httppocessor;
70          this.reuseStrategy = reuseStrategy;
71          this.params = params;
72      }
73  
74      /**
75       * Initiates asynchronous HTTP request execution.
76       *
77       * @param <T> the result type of request execution.
78       * @param requestProducer request producer callback.
79       * @param responseConsumer response consumer callaback.
80       * @param conn underlying HTTP connection.
81       * @param context HTTP context
82       * @param callback future callback.
83       * @return future representing pending completion of the operation.
84       */
85      public <T> Future<T> execute(
86              final HttpAsyncRequestProducer requestProducer,
87              final HttpAsyncResponseConsumer<T> responseConsumer,
88              final NHttpClientConnection conn,
89              final HttpContext context,
90              final FutureCallback<T> callback) {
91          if (requestProducer == null) {
92              throw new IllegalArgumentException("HTTP request producer may not be null");
93          }
94          if (responseConsumer == null) {
95              throw new IllegalArgumentException("HTTP response consumer may not be null");
96          }
97          if (conn == null) {
98              throw new IllegalArgumentException("HTTP connection may not be null");
99          }
100         if (context == null) {
101             throw new IllegalArgumentException("HTTP context may not be null");
102         }
103         BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
104                 requestProducer, responseConsumer, callback, context,
105                 this.httppocessor, this.reuseStrategy, this.params);
106         doExecute(handler, conn);
107         return handler.getFuture();
108     }
109 
110     private <T> void doExecute(
111             final HttpAsyncRequestExecutionHandler<T> handler, final NHttpClientConnection conn) {
112         conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
113         if (!conn.isOpen()) {
114             handler.failed(new ConnectionClosedException("Connection closed"));
115             try {
116                 handler.close();
117             } catch (IOException ex) {
118                 log(ex);
119             }
120         } else {
121             conn.requestOutput();
122         }
123     }
124 
125     /**
126      * Initiates asynchronous HTTP request execution.
127      *
128      * @param <T> the result type of request execution.
129      * @param requestProducer request producer callback.
130      * @param responseConsumer response consumer callaback.
131      * @param conn underlying HTTP connection.
132      * @param context HTTP context
133      * @return future representing pending completion of the operation.
134      */
135     public <T> Future<T> execute(
136             final HttpAsyncRequestProducer requestProducer,
137             final HttpAsyncResponseConsumer<T> responseConsumer,
138             final NHttpClientConnection conn,
139             final HttpContext context) {
140         return execute(requestProducer, responseConsumer, conn, context, null);
141     }
142 
143     /**
144      * Initiates asynchronous HTTP request execution.
145      *
146      * @param <T> the result type of request execution.
147      * @param requestProducer request producer callback.
148      * @param responseConsumer response consumer callaback.
149      * @param conn underlying HTTP connection.
150      * @return future representing pending completion of the operation.
151      */
152     public <T> Future<T> execute(
153             final HttpAsyncRequestProducer requestProducer,
154             final HttpAsyncResponseConsumer<T> responseConsumer,
155             final NHttpClientConnection conn) {
156         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
157     }
158 
159     /**
160      * Initiates asynchronous HTTP request execution.
161      *
162      * @param <T> the result type of request execution.
163      * @param <E> the connection pool entry type.
164      * @param requestProducer request producer callback.
165      * @param responseConsumer response consumer callaback.
166      * @param connPool pool of persistent reusable connections.
167      * @param context HTTP context
168      * @param callback future callback.
169      * @return future representing pending completion of the operation.
170      */
171     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
172             final HttpAsyncRequestProducer requestProducer,
173             final HttpAsyncResponseConsumer<T> responseConsumer,
174             final ConnPool<HttpHost, E> connPool,
175             final HttpContext context,
176             final FutureCallback<T> callback) {
177         if (requestProducer == null) {
178             throw new IllegalArgumentException("HTTP request producer may not be null");
179         }
180         if (responseConsumer == null) {
181             throw new IllegalArgumentException("HTTP response consumer may not be null");
182         }
183         if (connPool == null) {
184             throw new IllegalArgumentException("HTTP connection pool may not be null");
185         }
186         if (context == null) {
187             throw new IllegalArgumentException("HTTP context may not be null");
188         }
189         BasicFuture<T> future = new BasicFuture<T>(callback);
190         HttpHost target = requestProducer.getTarget();
191         connPool.lease(target, null, new ConnRequestCallback<T, E>(
192                 future, requestProducer, responseConsumer, connPool, context));
193         return future;
194     }
195 
196     /**
197      * Initiates asynchronous HTTP request execution.
198      *
199      * @param <T> the result type of request execution.
200      * @param <E> the connection pool entry type.
201      * @param requestProducer request producer callback.
202      * @param responseConsumer response consumer callaback.
203      * @param connPool pool of persistent reusable connections.
204      * @param context HTTP context
205      * @return future representing pending completion of the operation.
206      */
207     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
208             final HttpAsyncRequestProducer requestProducer,
209             final HttpAsyncResponseConsumer<T> responseConsumer,
210             final ConnPool<HttpHost, E> connPool,
211             final HttpContext context) {
212         return execute(requestProducer, responseConsumer, connPool, context, null);
213     }
214 
215     /**
216      * Initiates asynchronous HTTP request execution.
217      *
218      * @param <T> the result type of request execution.
219      * @param <E> the connection pool entry type.
220      * @param requestProducer request producer callback.
221      * @param responseConsumer response consumer callaback.
222      * @param connPool pool of persistent reusable connections.
223      * @return future representing pending completion of the operation.
224      */
225     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
226             final HttpAsyncRequestProducer requestProducer,
227             final HttpAsyncResponseConsumer<T> responseConsumer,
228             final ConnPool<HttpHost, E> connPool) {
229         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
230     }
231 
232     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
233 
234         private final BasicFuture<T> requestFuture;
235         private final HttpAsyncRequestProducer requestProducer;
236         private final HttpAsyncResponseConsumer<T> responseConsumer;
237         private final ConnPool<HttpHost, E> connPool;
238         private final HttpContext context;
239 
240         ConnRequestCallback(
241                 final BasicFuture<T> requestFuture,
242                 final HttpAsyncRequestProducer requestProducer,
243                 final HttpAsyncResponseConsumer<T> responseConsumer,
244                 final ConnPool<HttpHost, E> connPool,
245                 final HttpContext context) {
246             super();
247             this.requestFuture = requestFuture;
248             this.requestProducer = requestProducer;
249             this.responseConsumer = responseConsumer;
250             this.connPool = connPool;
251             this.context = context;
252         }
253 
254         public void completed(final E result) {
255             if (this.requestFuture.isDone()) {
256                 this.connPool.release(result, true);
257                 return;
258             }
259             NHttpClientConnection conn = result.getConnection();
260             BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
261                     this.requestProducer, this.responseConsumer,
262                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
263                     this.context, httppocessor, reuseStrategy, params);
264             doExecute(handler, conn);
265         }
266 
267         public void failed(final Exception ex) {
268             try {
269                 try {
270                     this.responseConsumer.failed(ex);
271                 } finally {
272                     releaseResources();
273                 }
274             } finally {
275                 this.requestFuture.failed(ex);
276             }
277         }
278 
279         public void cancelled() {
280             try {
281                 try {
282                     this.responseConsumer.cancel();
283                 } finally {
284                     releaseResources();
285                 }
286             } finally {
287                 this.requestFuture.cancel(true);
288             }
289         }
290 
291         public void releaseResources() {
292             try {
293                 this.requestProducer.close();
294             } catch (IOException ioex) {
295                 log(ioex);
296             }
297             try {
298                 this.responseConsumer.close();
299             } catch (IOException ioex) {
300                 log(ioex);
301             }
302         }
303 
304     }
305 
306     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
307                                                implements FutureCallback<T> {
308 
309         private final BasicFuture<T> future;
310         private final E poolEntry;
311         private final ConnPool<HttpHost, E> connPool;
312 
313         RequestExecutionCallback(
314                 final BasicFuture<T> future,
315                 final E poolEntry,
316                 final ConnPool<HttpHost, E> connPool) {
317             super();
318             this.future = future;
319             this.poolEntry = poolEntry;
320             this.connPool = connPool;
321         }
322 
323         public void completed(final T result) {
324             try {
325                 this.connPool.release(this.poolEntry, true);
326             } finally {
327                 this.future.completed(result);
328             }
329         }
330 
331         public void failed(final Exception ex) {
332             try {
333                 this.connPool.release(this.poolEntry, false);
334             } finally {
335                 this.future.failed(ex);
336             }
337         }
338 
339         public void cancelled() {
340             try {
341                 this.connPool.release(this.poolEntry, false);
342             } finally {
343                 this.future.cancel(true);
344             }
345         }
346 
347     }
348 
349     /**
350      * This method can be used to log I/O exception thrown while closing {@link Closeable}
351      * objects (such as {@link HttpConnection}}).
352      *
353      * @param ex I/O exception thrown by {@link Closeable#close()}
354      */
355     protected void log(Exception ex) {
356     }
357 
358 }