View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.IOException;
30  import java.util.concurrent.Future;
31  
32  import org.apache.http.ConnectionClosedException;
33  import org.apache.http.ConnectionReuseStrategy;
34  import org.apache.http.HttpHost;
35  import org.apache.http.annotation.Immutable;
36  import org.apache.http.concurrent.BasicFuture;
37  import org.apache.http.concurrent.FutureCallback;
38  import org.apache.http.impl.DefaultConnectionReuseStrategy;
39  import org.apache.http.nio.NHttpClientConnection;
40  import org.apache.http.params.HttpParams;
41  import org.apache.http.pool.ConnPool;
42  import org.apache.http.pool.PoolEntry;
43  import org.apache.http.protocol.BasicHttpContext;
44  import org.apache.http.protocol.HttpContext;
45  import org.apache.http.protocol.HttpProcessor;
46  import org.apache.http.util.Args;
47  
48  /**
49   * <tt>HttpAsyncRequester</tt> is a utility class that can be used
50   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
51   * of asynchronous HTTP requests.
52   *
53   * @see HttpAsyncRequestExecutor
54   *
55   * @since 4.2
56   */
57  @SuppressWarnings("deprecation")
58  @Immutable
59  public class HttpAsyncRequester {
60  
61      private final HttpProcessor httpprocessor;
62      private final ConnectionReuseStrategy connReuseStrategy;
63  
64      /**
65       * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
66       *   ConnectionReuseStrategy)}
67       */
68      @Deprecated
69      public HttpAsyncRequester(
70              final HttpProcessor httpprocessor,
71              final ConnectionReuseStrategy reuseStrategy,
72              final HttpParams params) {
73          this(httpprocessor, reuseStrategy);
74      }
75  
76      /**
77       * Creates new instance of HttpAsyncRequester.
78       *
79       * @since 4.3
80       */
81      public HttpAsyncRequester(
82              final HttpProcessor httpprocessor,
83              final ConnectionReuseStrategy connReuseStrategy) {
84          super();
85          this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");
86          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
87              DefaultConnectionReuseStrategy.INSTANCE;
88      }
89  
90      /**
91       * Creates new instance of HttpAsyncRequester.
92       *
93       * @since 4.3
94       */
95      public HttpAsyncRequester(final HttpProcessor httpprocessor) {
96          this(httpprocessor, null);
97      }
98  
99      /**
100      * Initiates asynchronous HTTP request execution.
101      *
102      * @param <T> the result type of request execution.
103      * @param requestProducer request producer callback.
104      * @param responseConsumer response consumer callaback.
105      * @param conn underlying HTTP connection.
106      * @param context HTTP context
107      * @param callback future callback.
108      * @return future representing pending completion of the operation.
109      */
110     public <T> Future<T> execute(
111             final HttpAsyncRequestProducer requestProducer,
112             final HttpAsyncResponseConsumer<T> responseConsumer,
113             final NHttpClientConnection conn,
114             final HttpContext context,
115             final FutureCallback<T> callback) {
116         Args.notNull(requestProducer, "HTTP request producer");
117         Args.notNull(responseConsumer, "HTTP response consumer");
118         Args.notNull(conn, "HTTP connection");
119         Args.notNull(context, "HTTP context");
120         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
121                 requestProducer, responseConsumer, callback, context, conn,
122                 this.httpprocessor, this.connReuseStrategy);
123         initExection(handler, conn);
124         return handler.getFuture();
125     }
126 
127     private void initExection(
128             final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
129         conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
130         if (!conn.isOpen()) {
131             handler.failed(new ConnectionClosedException("Connection closed"));
132             try {
133                 handler.close();
134             } catch (final IOException ex) {
135                 log(ex);
136             }
137         } else {
138             conn.requestOutput();
139         }
140     }
141 
142     /**
143      * Initiates asynchronous HTTP request execution.
144      *
145      * @param <T> the result type of request execution.
146      * @param requestProducer request producer callback.
147      * @param responseConsumer response consumer callaback.
148      * @param conn underlying HTTP connection.
149      * @param context HTTP context
150      * @return future representing pending completion of the operation.
151      */
152     public <T> Future<T> execute(
153             final HttpAsyncRequestProducer requestProducer,
154             final HttpAsyncResponseConsumer<T> responseConsumer,
155             final NHttpClientConnection conn,
156             final HttpContext context) {
157         return execute(requestProducer, responseConsumer, conn, context, null);
158     }
159 
160     /**
161      * Initiates asynchronous HTTP request execution.
162      *
163      * @param <T> the result type of request execution.
164      * @param requestProducer request producer callback.
165      * @param responseConsumer response consumer callaback.
166      * @param conn underlying HTTP connection.
167      * @return future representing pending completion of the operation.
168      */
169     public <T> Future<T> execute(
170             final HttpAsyncRequestProducer requestProducer,
171             final HttpAsyncResponseConsumer<T> responseConsumer,
172             final NHttpClientConnection conn) {
173         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
174     }
175 
176     /**
177      * Initiates asynchronous HTTP request execution.
178      *
179      * @param <T> the result type of request execution.
180      * @param <E> the connection pool entry type.
181      * @param requestProducer request producer callback.
182      * @param responseConsumer response consumer callaback.
183      * @param connPool pool of persistent reusable connections.
184      * @param context HTTP context
185      * @param callback future callback.
186      * @return future representing pending completion of the operation.
187      */
188     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
189             final HttpAsyncRequestProducer requestProducer,
190             final HttpAsyncResponseConsumer<T> responseConsumer,
191             final ConnPool<HttpHost, E> connPool,
192             final HttpContext context,
193             final FutureCallback<T> callback) {
194         Args.notNull(requestProducer, "HTTP request producer");
195         Args.notNull(responseConsumer, "HTTP response consumer");
196         Args.notNull(connPool, "HTTP connection pool");
197         Args.notNull(context, "HTTP context");
198         final BasicFuture<T> future = new BasicFuture<T>(callback);
199         final HttpHost target = requestProducer.getTarget();
200         connPool.lease(target, null, new ConnRequestCallback<T, E>(
201                 future, requestProducer, responseConsumer, connPool, context));
202         return future;
203     }
204 
205     /**
206      * Initiates asynchronous HTTP request execution. This method automatically releases
207      * the given pool entry once request execution is completed (successfully or unsuccessfully).
208      *
209      * @param <T> the result type of request execution.
210      * @param <E> the connection pool entry type.
211      * @param requestProducer request producer callback.
212      * @param responseConsumer response consumer callaback.
213      * @param poolEntry leased pool entry. It will be automatically released
214      *   back to the pool when execution is completed.
215      * @param connPool pool of persistent reusable connections.
216      * @param context HTTP context
217      * @param callback future callback.
218      * @return future representing pending completion of the operation.
219      *
220      * @since 4.3
221      */
222     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
223             final HttpAsyncRequestProducer requestProducer,
224             final HttpAsyncResponseConsumer<T> responseConsumer,
225             final E poolEntry,
226             final ConnPool<HttpHost, E> connPool,
227             final HttpContext context,
228             final FutureCallback<T> callback) {
229         Args.notNull(requestProducer, "HTTP request producer");
230         Args.notNull(responseConsumer, "HTTP response consumer");
231         Args.notNull(connPool, "HTTP connection pool");
232         Args.notNull(poolEntry, "Pool entry");
233         Args.notNull(context, "HTTP context");
234         final BasicFuture<T> future = new BasicFuture<T>(callback);
235         final NHttpClientConnection conn = poolEntry.getConnection();
236         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
237                 requestProducer, responseConsumer,
238                 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
239                 context, conn,
240                 this.httpprocessor, this.connReuseStrategy);
241         initExection(handler, conn);
242         return future;
243     }
244 
245     /**
246      * Initiates asynchronous HTTP request execution.
247      *
248      * @param <T> the result type of request execution.
249      * @param <E> the connection pool entry type.
250      * @param requestProducer request producer callback.
251      * @param responseConsumer response consumer callaback.
252      * @param connPool pool of persistent reusable connections.
253      * @param context HTTP context
254      * @return future representing pending completion of the operation.
255      */
256     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
257             final HttpAsyncRequestProducer requestProducer,
258             final HttpAsyncResponseConsumer<T> responseConsumer,
259             final ConnPool<HttpHost, E> connPool,
260             final HttpContext context) {
261         return execute(requestProducer, responseConsumer, connPool, context, null);
262     }
263 
264     /**
265      * Initiates asynchronous HTTP request execution.
266      *
267      * @param <T> the result type of request execution.
268      * @param <E> the connection pool entry type.
269      * @param requestProducer request producer callback.
270      * @param responseConsumer response consumer callaback.
271      * @param connPool pool of persistent reusable connections.
272      * @return future representing pending completion of the operation.
273      */
274     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
275             final HttpAsyncRequestProducer requestProducer,
276             final HttpAsyncResponseConsumer<T> responseConsumer,
277             final ConnPool<HttpHost, E> connPool) {
278         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
279     }
280 
281     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
282 
283         private final BasicFuture<T> requestFuture;
284         private final HttpAsyncRequestProducer requestProducer;
285         private final HttpAsyncResponseConsumer<T> responseConsumer;
286         private final ConnPool<HttpHost, E> connPool;
287         private final HttpContext context;
288 
289         ConnRequestCallback(
290                 final BasicFuture<T> requestFuture,
291                 final HttpAsyncRequestProducer requestProducer,
292                 final HttpAsyncResponseConsumer<T> responseConsumer,
293                 final ConnPool<HttpHost, E> connPool,
294                 final HttpContext context) {
295             super();
296             this.requestFuture = requestFuture;
297             this.requestProducer = requestProducer;
298             this.responseConsumer = responseConsumer;
299             this.connPool = connPool;
300             this.context = context;
301         }
302 
303         public void completed(final E result) {
304             if (this.requestFuture.isDone()) {
305                 this.connPool.release(result, true);
306                 return;
307             }
308             final NHttpClientConnection conn = result.getConnection();
309             final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
310                     this.requestProducer, this.responseConsumer,
311                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
312                     this.context, conn, httpprocessor, connReuseStrategy);
313             initExection(handler, conn);
314         }
315 
316         public void failed(final Exception ex) {
317             try {
318                 try {
319                     this.responseConsumer.failed(ex);
320                 } finally {
321                     releaseResources();
322                 }
323             } finally {
324                 this.requestFuture.failed(ex);
325             }
326         }
327 
328         public void cancelled() {
329             try {
330                 try {
331                     this.responseConsumer.cancel();
332                 } finally {
333                     releaseResources();
334                 }
335             } finally {
336                 this.requestFuture.cancel(true);
337             }
338         }
339 
340         public void releaseResources() {
341             try {
342                 this.requestProducer.close();
343             } catch (final IOException ioex) {
344                 log(ioex);
345             }
346             try {
347                 this.responseConsumer.close();
348             } catch (final IOException ioex) {
349                 log(ioex);
350             }
351         }
352 
353     }
354 
355     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
356                                                implements FutureCallback<T> {
357 
358         private final BasicFuture<T> future;
359         private final E poolEntry;
360         private final ConnPool<HttpHost, E> connPool;
361 
362         RequestExecutionCallback(
363                 final BasicFuture<T> future,
364                 final E poolEntry,
365                 final ConnPool<HttpHost, E> connPool) {
366             super();
367             this.future = future;
368             this.poolEntry = poolEntry;
369             this.connPool = connPool;
370         }
371 
372         public void completed(final T result) {
373             try {
374                 this.connPool.release(this.poolEntry, true);
375             } finally {
376                 this.future.completed(result);
377             }
378         }
379 
380         public void failed(final Exception ex) {
381             try {
382                 this.connPool.release(this.poolEntry, false);
383             } finally {
384                 this.future.failed(ex);
385             }
386         }
387 
388         public void cancelled() {
389             try {
390                 this.connPool.release(this.poolEntry, false);
391             } finally {
392                 this.future.cancel(true);
393             }
394         }
395 
396     }
397 
398     /**
399      * This method can be used to log I/O exception thrown while closing
400      * {@link java.io.Closeable} objects (such as
401      * {@link org.apache.http.HttpConnection}}).
402      *
403      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
404      */
405     protected void log(final Exception ex) {
406     }
407 
408 }