View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.concurrent.Future;
32  
33  import org.apache.http.ConnectionClosedException;
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpConnection;
36  import org.apache.http.HttpHost;
37  import org.apache.http.annotation.Immutable;
38  import org.apache.http.concurrent.BasicFuture;
39  import org.apache.http.concurrent.FutureCallback;
40  import org.apache.http.impl.DefaultConnectionReuseStrategy;
41  import org.apache.http.nio.NHttpClientConnection;
42  import org.apache.http.params.HttpParams;
43  import org.apache.http.pool.ConnPool;
44  import org.apache.http.pool.PoolEntry;
45  import org.apache.http.protocol.BasicHttpContext;
46  import org.apache.http.protocol.HttpContext;
47  import org.apache.http.protocol.HttpProcessor;
48  import org.apache.http.util.Args;
49  
50  /**
51   * <tt>HttpAsyncRequester</tt> is a utility class that can be used
52   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
53   * of asynchronous HTTP requests.
54   *
55   * @see HttpAsyncRequestExecutor
56   *
57   * @since 4.2
58   */
59  @SuppressWarnings("deprecation")
60  @Immutable
61  public class HttpAsyncRequester {
62  
63      private final HttpProcessor httppocessor;
64      private final ConnectionReuseStrategy connReuseStrategy;
65  
66      /**
67       * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
68       *   ConnectionReuseStrategy)}
69       */
70      @Deprecated
71      public HttpAsyncRequester(
72              final HttpProcessor httppocessor,
73              final ConnectionReuseStrategy reuseStrategy,
74              final HttpParams params) {
75          this(httppocessor, reuseStrategy);
76      }
77  
78      /**
79       * Creates new instance of HttpAsyncRequester.
80       *
81       * @since 4.3
82       */
83      public HttpAsyncRequester(
84              final HttpProcessor httppocessor,
85              final ConnectionReuseStrategy connReuseStrategy) {
86          super();
87          this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
88          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
89              DefaultConnectionReuseStrategy.INSTANCE;
90      }
91  
92      /**
93       * Creates new instance of HttpAsyncRequester.
94       *
95       * @since 4.3
96       */
97      public HttpAsyncRequester(final HttpProcessor httppocessor) {
98          this(httppocessor, null);
99      }
100 
101     /**
102      * Initiates asynchronous HTTP request execution.
103      *
104      * @param <T> the result type of request execution.
105      * @param requestProducer request producer callback.
106      * @param responseConsumer response consumer callaback.
107      * @param conn underlying HTTP connection.
108      * @param context HTTP context
109      * @param callback future callback.
110      * @return future representing pending completion of the operation.
111      */
112     public <T> Future<T> execute(
113             final HttpAsyncRequestProducer requestProducer,
114             final HttpAsyncResponseConsumer<T> responseConsumer,
115             final NHttpClientConnection conn,
116             final HttpContext context,
117             final FutureCallback<T> callback) {
118         Args.notNull(requestProducer, "HTTP request producer");
119         Args.notNull(responseConsumer, "HTTP response consumer");
120         Args.notNull(conn, "HTTP connection");
121         Args.notNull(context, "HTTP context");
122         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
123                 requestProducer, responseConsumer, callback, context, conn,
124                 this.httppocessor, this.connReuseStrategy);
125         initExection(handler, conn);
126         return handler.getFuture();
127     }
128 
129     private void initExection(
130             final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
131         conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
132         if (!conn.isOpen()) {
133             handler.failed(new ConnectionClosedException("Connection closed"));
134             try {
135                 handler.close();
136             } catch (final IOException ex) {
137                 log(ex);
138             }
139         } else {
140             conn.requestOutput();
141         }
142     }
143 
144     /**
145      * Initiates asynchronous HTTP request execution.
146      *
147      * @param <T> the result type of request execution.
148      * @param requestProducer request producer callback.
149      * @param responseConsumer response consumer callaback.
150      * @param conn underlying HTTP connection.
151      * @param context HTTP context
152      * @return future representing pending completion of the operation.
153      */
154     public <T> Future<T> execute(
155             final HttpAsyncRequestProducer requestProducer,
156             final HttpAsyncResponseConsumer<T> responseConsumer,
157             final NHttpClientConnection conn,
158             final HttpContext context) {
159         return execute(requestProducer, responseConsumer, conn, context, null);
160     }
161 
162     /**
163      * Initiates asynchronous HTTP request execution.
164      *
165      * @param <T> the result type of request execution.
166      * @param requestProducer request producer callback.
167      * @param responseConsumer response consumer callaback.
168      * @param conn underlying HTTP connection.
169      * @return future representing pending completion of the operation.
170      */
171     public <T> Future<T> execute(
172             final HttpAsyncRequestProducer requestProducer,
173             final HttpAsyncResponseConsumer<T> responseConsumer,
174             final NHttpClientConnection conn) {
175         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
176     }
177 
178     /**
179      * Initiates asynchronous HTTP request execution.
180      *
181      * @param <T> the result type of request execution.
182      * @param <E> the connection pool entry type.
183      * @param requestProducer request producer callback.
184      * @param responseConsumer response consumer callaback.
185      * @param connPool pool of persistent reusable connections.
186      * @param context HTTP context
187      * @param callback future callback.
188      * @return future representing pending completion of the operation.
189      */
190     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
191             final HttpAsyncRequestProducer requestProducer,
192             final HttpAsyncResponseConsumer<T> responseConsumer,
193             final ConnPool<HttpHost, E> connPool,
194             final HttpContext context,
195             final FutureCallback<T> callback) {
196         Args.notNull(requestProducer, "HTTP request producer");
197         Args.notNull(responseConsumer, "HTTP response consumer");
198         Args.notNull(connPool, "HTTP connection pool");
199         Args.notNull(context, "HTTP context");
200         final BasicFuture<T> future = new BasicFuture<T>(callback);
201         final HttpHost target = requestProducer.getTarget();
202         connPool.lease(target, null, new ConnRequestCallback<T, E>(
203                 future, requestProducer, responseConsumer, connPool, context));
204         return future;
205     }
206 
207     /**
208      * Initiates asynchronous HTTP request execution. This method automatically releases
209      * the given pool entry once request execution is completed (successfully or unsuccessfully).
210      *
211      * @param <T> the result type of request execution.
212      * @param <E> the connection pool entry type.
213      * @param requestProducer request producer callback.
214      * @param responseConsumer response consumer callaback.
215      * @param poolEntry leased pool entry. It will be automatically released
216      *   back to the pool when execution is completed.
217      * @param connPool pool of persistent reusable connections.
218      * @param context HTTP context
219      * @param callback future callback.
220      * @return future representing pending completion of the operation.
221      *
222      * @since 4.3
223      */
224     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
225             final HttpAsyncRequestProducer requestProducer,
226             final HttpAsyncResponseConsumer<T> responseConsumer,
227             final E poolEntry,
228             final ConnPool<HttpHost, E> connPool,
229             final HttpContext context,
230             final FutureCallback<T> callback) {
231         Args.notNull(requestProducer, "HTTP request producer");
232         Args.notNull(responseConsumer, "HTTP response consumer");
233         Args.notNull(connPool, "HTTP connection pool");
234         Args.notNull(poolEntry, "Pool entry");
235         Args.notNull(context, "HTTP context");
236         final BasicFuture<T> future = new BasicFuture<T>(callback);
237         final NHttpClientConnection conn = poolEntry.getConnection();
238         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
239                 requestProducer, responseConsumer,
240                 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
241                 context, conn,
242                 this.httppocessor, this.connReuseStrategy);
243         initExection(handler, conn);
244         return future;
245     }
246 
247     /**
248      * Initiates asynchronous HTTP request execution.
249      *
250      * @param <T> the result type of request execution.
251      * @param <E> the connection pool entry type.
252      * @param requestProducer request producer callback.
253      * @param responseConsumer response consumer callaback.
254      * @param connPool pool of persistent reusable connections.
255      * @param context HTTP context
256      * @return future representing pending completion of the operation.
257      */
258     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
259             final HttpAsyncRequestProducer requestProducer,
260             final HttpAsyncResponseConsumer<T> responseConsumer,
261             final ConnPool<HttpHost, E> connPool,
262             final HttpContext context) {
263         return execute(requestProducer, responseConsumer, connPool, context, null);
264     }
265 
266     /**
267      * Initiates asynchronous HTTP request execution.
268      *
269      * @param <T> the result type of request execution.
270      * @param <E> the connection pool entry type.
271      * @param requestProducer request producer callback.
272      * @param responseConsumer response consumer callaback.
273      * @param connPool pool of persistent reusable connections.
274      * @return future representing pending completion of the operation.
275      */
276     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
277             final HttpAsyncRequestProducer requestProducer,
278             final HttpAsyncResponseConsumer<T> responseConsumer,
279             final ConnPool<HttpHost, E> connPool) {
280         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
281     }
282 
283     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
284 
285         private final BasicFuture<T> requestFuture;
286         private final HttpAsyncRequestProducer requestProducer;
287         private final HttpAsyncResponseConsumer<T> responseConsumer;
288         private final ConnPool<HttpHost, E> connPool;
289         private final HttpContext context;
290 
291         ConnRequestCallback(
292                 final BasicFuture<T> requestFuture,
293                 final HttpAsyncRequestProducer requestProducer,
294                 final HttpAsyncResponseConsumer<T> responseConsumer,
295                 final ConnPool<HttpHost, E> connPool,
296                 final HttpContext context) {
297             super();
298             this.requestFuture = requestFuture;
299             this.requestProducer = requestProducer;
300             this.responseConsumer = responseConsumer;
301             this.connPool = connPool;
302             this.context = context;
303         }
304 
305         public void completed(final E result) {
306             if (this.requestFuture.isDone()) {
307                 this.connPool.release(result, true);
308                 return;
309             }
310             final NHttpClientConnection conn = result.getConnection();
311             final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
312                     this.requestProducer, this.responseConsumer,
313                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
314                     this.context, conn, httppocessor, connReuseStrategy);
315             initExection(handler, conn);
316         }
317 
318         public void failed(final Exception ex) {
319             try {
320                 try {
321                     this.responseConsumer.failed(ex);
322                 } finally {
323                     releaseResources();
324                 }
325             } finally {
326                 this.requestFuture.failed(ex);
327             }
328         }
329 
330         public void cancelled() {
331             try {
332                 try {
333                     this.responseConsumer.cancel();
334                 } finally {
335                     releaseResources();
336                 }
337             } finally {
338                 this.requestFuture.cancel(true);
339             }
340         }
341 
342         public void releaseResources() {
343             try {
344                 this.requestProducer.close();
345             } catch (final IOException ioex) {
346                 log(ioex);
347             }
348             try {
349                 this.responseConsumer.close();
350             } catch (final IOException ioex) {
351                 log(ioex);
352             }
353         }
354 
355     }
356 
357     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
358                                                implements FutureCallback<T> {
359 
360         private final BasicFuture<T> future;
361         private final E poolEntry;
362         private final ConnPool<HttpHost, E> connPool;
363 
364         RequestExecutionCallback(
365                 final BasicFuture<T> future,
366                 final E poolEntry,
367                 final ConnPool<HttpHost, E> connPool) {
368             super();
369             this.future = future;
370             this.poolEntry = poolEntry;
371             this.connPool = connPool;
372         }
373 
374         public void completed(final T result) {
375             try {
376                 this.connPool.release(this.poolEntry, true);
377             } finally {
378                 this.future.completed(result);
379             }
380         }
381 
382         public void failed(final Exception ex) {
383             try {
384                 this.connPool.release(this.poolEntry, false);
385             } finally {
386                 this.future.failed(ex);
387             }
388         }
389 
390         public void cancelled() {
391             try {
392                 this.connPool.release(this.poolEntry, false);
393             } finally {
394                 this.future.cancel(true);
395             }
396         }
397 
398     }
399 
400     /**
401      * This method can be used to log I/O exception thrown while closing {@link Closeable}
402      * objects (such as {@link HttpConnection}}).
403      *
404      * @param ex I/O exception thrown by {@link Closeable#close()}
405      */
406     protected void log(final Exception ex) {
407     }
408 
409 }