View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.concurrent.Future;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpHost;
38  import org.apache.http.annotation.Immutable;
39  import org.apache.http.concurrent.BasicFuture;
40  import org.apache.http.concurrent.FutureCallback;
41  import org.apache.http.impl.DefaultConnectionReuseStrategy;
42  import org.apache.http.nio.NHttpClientConnection;
43  import org.apache.http.params.HttpParams;
44  import org.apache.http.pool.ConnPool;
45  import org.apache.http.pool.PoolEntry;
46  import org.apache.http.protocol.BasicHttpContext;
47  import org.apache.http.protocol.HttpContext;
48  import org.apache.http.protocol.HttpProcessor;
49  import org.apache.http.util.Args;
50  
51  /**
52   * <tt>HttpAsyncRequester</tt> is a utility class that can be used
53   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
54   * of asynchronous HTTP requests.
55   *
56   * @see HttpAsyncRequestExecutor
57   *
58   * @since 4.2
59   */
60  @SuppressWarnings("deprecation")
61  @Immutable
62  public class HttpAsyncRequester {
63  
64      private final HttpProcessor httpprocessor;
65      private final ConnectionReuseStrategy connReuseStrategy;
66      private final ExceptionLogger exceptionLogger;
67  
68      /**
69       * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
70       *   ConnectionReuseStrategy)}
71       */
72      @Deprecated
73      public HttpAsyncRequester(
74              final HttpProcessor httpprocessor,
75              final ConnectionReuseStrategy reuseStrategy,
76              final HttpParams params) {
77          this(httpprocessor, reuseStrategy);
78      }
79  
80      /**
81       * Creates new instance of <tt>HttpAsyncRequester<tt/>.
82       * @param httpProcessor HTTP protocol processor.
83       * @param connStrategy Connection re-use strategy. If <code>null</code>
84       *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
85       * @param exceptionLogger Exception logger. If <code>null</code>
86       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
87       *   logger will be only used to log I/O exception thrown while closing
88       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
89       *
90       * @since 4.4
91       */
92      public HttpAsyncRequester(
93              final HttpProcessor httpprocessor,
94              final ConnectionReuseStrategy connReuseStrategy,
95              final ExceptionLogger exceptionLogger) {
96          super();
97          this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");
98          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
99                  DefaultConnectionReuseStrategy.INSTANCE;
100         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
101     }
102 
103     /**
104      * Creates new instance of HttpAsyncRequester.
105      *
106      * @since 4.3
107      */
108     public HttpAsyncRequester(
109             final HttpProcessor httpprocessor,
110             final ConnectionReuseStrategy connReuseStrategy) {
111         this(httpprocessor, connReuseStrategy, (ExceptionLogger) null);
112     }
113 
114     /**
115      * Creates new instance of HttpAsyncRequester.
116      *
117      * @since 4.3
118      */
119     public HttpAsyncRequester(final HttpProcessor httpprocessor) {
120         this(httpprocessor, null);
121     }
122 
123     /**
124      * Initiates asynchronous HTTP request execution.
125      *
126      * @param <T> the result type of request execution.
127      * @param requestProducer request producer.
128      * @param responseConsumer response consumer.
129      * @param conn underlying HTTP connection.
130      * @param context HTTP context
131      * @param callback future callback.
132      * @return future representing pending completion of the operation.
133      */
134     public <T> Future<T> execute(
135             final HttpAsyncRequestProducer requestProducer,
136             final HttpAsyncResponseConsumer<T> responseConsumer,
137             final NHttpClientConnection conn,
138             final HttpContext context,
139             final FutureCallback<T> callback) {
140         Args.notNull(requestProducer, "HTTP request producer");
141         Args.notNull(responseConsumer, "HTTP response consumer");
142         Args.notNull(conn, "HTTP connection");
143         Args.notNull(context, "HTTP context");
144         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
145                 requestProducer, responseConsumer, callback, context, conn,
146                 this.httpprocessor, this.connReuseStrategy);
147         initExection(handler, conn);
148         return handler.getFuture();
149     }
150 
151     private void initExection(
152             final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
153         conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
154         if (!conn.isOpen()) {
155             handler.failed(new ConnectionClosedException("Connection closed"));
156             try {
157                 handler.close();
158             } catch (final IOException ex) {
159                 log(ex);
160             }
161         } else {
162             conn.requestOutput();
163         }
164     }
165 
166     /**
167      * Initiates asynchronous HTTP request execution.
168      *
169      * @param <T> the result type of request execution.
170      * @param requestProducer request producer.
171      * @param responseConsumer response consumer.
172      * @param conn underlying HTTP connection.
173      * @param context HTTP context
174      * @return future representing pending completion of the operation.
175      */
176     public <T> Future<T> execute(
177             final HttpAsyncRequestProducer requestProducer,
178             final HttpAsyncResponseConsumer<T> responseConsumer,
179             final NHttpClientConnection conn,
180             final HttpContext context) {
181         return execute(requestProducer, responseConsumer, conn, context, null);
182     }
183 
184     /**
185      * Initiates asynchronous HTTP request execution.
186      *
187      * @param <T> the result type of request execution.
188      * @param requestProducer request producer.
189      * @param responseConsumer response consumer.
190      * @param conn underlying HTTP connection.
191      * @return future representing pending completion of the operation.
192      */
193     public <T> Future<T> execute(
194             final HttpAsyncRequestProducer requestProducer,
195             final HttpAsyncResponseConsumer<T> responseConsumer,
196             final NHttpClientConnection conn) {
197         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
198     }
199 
200     /**
201      * Initiates asynchronous HTTP request execution.
202      *
203      * @param <T> the result type of request execution.
204      * @param <E> the connection pool entry type.
205      * @param requestProducer request producer.
206      * @param responseConsumer response consumer.
207      * @param connPool pool of persistent reusable connections.
208      * @param context HTTP context
209      * @param callback future callback.
210      * @return future representing pending completion of the operation.
211      */
212     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
213             final HttpAsyncRequestProducer requestProducer,
214             final HttpAsyncResponseConsumer<T> responseConsumer,
215             final ConnPool<HttpHost, E> connPool,
216             final HttpContext context,
217             final FutureCallback<T> callback) {
218         Args.notNull(requestProducer, "HTTP request producer");
219         Args.notNull(responseConsumer, "HTTP response consumer");
220         Args.notNull(connPool, "HTTP connection pool");
221         Args.notNull(context, "HTTP context");
222         final BasicFuture<T> future = new BasicFuture<T>(callback);
223         final HttpHost target = requestProducer.getTarget();
224         connPool.lease(target, null, new ConnRequestCallback<T, E>(
225                 future, requestProducer, responseConsumer, connPool, context));
226         return future;
227     }
228 
229     /**
230      * Initiates asynchronous pipelined HTTP request execution.
231      *
232      * @param <T> the result type of request execution.
233      * @param <E> the connection pool entry type.
234      * @param target target host.
235      * @param requestProducers list of request producers.
236      * @param responseConsumers list of response consumers.
237      * @param connPool pool of persistent reusable connections.
238      * @param context HTTP context
239      * @param callback future callback.
240      * @return future representing pending completion of the operation.
241      *
242      * @since 4.4
243      */
244     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
245             final HttpHost target,
246             final List<? extends HttpAsyncRequestProducer> requestProducers,
247             final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
248             final ConnPool<HttpHost, E> connPool,
249             final HttpContext context,
250             final FutureCallback<List<T>> callback) {
251         Args.notNull(target, "HTTP target");
252         Args.notEmpty(requestProducers, "Request producer list");
253         Args.notEmpty(responseConsumers, "Response consumer list");
254         Args.notNull(connPool, "HTTP connection pool");
255         Args.notNull(context, "HTTP context");
256         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
257         connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>(
258                 future, requestProducers, responseConsumers, connPool, context));
259         return future;
260     }
261 
262     /**
263      * Initiates asynchronous HTTP request execution. This method automatically releases
264      * the given pool entry once request execution is completed (successfully or unsuccessfully).
265      *
266      * @param <T> the result type of request execution.
267      * @param <E> the connection pool entry type.
268      * @param requestProducer request producer.
269      * @param responseConsumer response consumer.
270      * @param poolEntry leased pool entry. It will be automatically released
271      *   back to the pool when execution is completed.
272      * @param connPool pool of persistent reusable connections.
273      * @param context HTTP context
274      * @param callback future callback.
275      * @return future representing pending completion of the operation.
276      *
277      * @since 4.3
278      */
279     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
280             final HttpAsyncRequestProducer requestProducer,
281             final HttpAsyncResponseConsumer<T> responseConsumer,
282             final E poolEntry,
283             final ConnPool<HttpHost, E> connPool,
284             final HttpContext context,
285             final FutureCallback<T> callback) {
286         Args.notNull(requestProducer, "HTTP request producer");
287         Args.notNull(responseConsumer, "HTTP response consumer");
288         Args.notNull(connPool, "HTTP connection pool");
289         Args.notNull(poolEntry, "Pool entry");
290         Args.notNull(context, "HTTP context");
291         final BasicFuture<T> future = new BasicFuture<T>(callback);
292         final NHttpClientConnection conn = poolEntry.getConnection();
293         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
294                 requestProducer, responseConsumer,
295                 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
296                 context, conn,
297                 this.httpprocessor, this.connReuseStrategy);
298         initExection(handler, conn);
299         return future;
300     }
301 
302     /**
303      * Initiates asynchronous pipelined HTTP request execution. This method automatically releases
304      * the given pool entry once request execution is completed (successfully or unsuccessfully).
305      *
306      * @param <T> the result type of request execution.
307      * @param <E> the connection pool entry type.
308      * @param requestProducers list of request producers.
309      * @param responseConsumer list of response consumers.
310      * @param poolEntry leased pool entry. It will be automatically released
311      *   back to the pool when execution is completed.
312      * @param connPool pool of persistent reusable connections.
313      * @param context HTTP context
314      * @param callback future callback.
315      * @return future representing pending completion of the operation.
316      *
317      * @since 4.4
318      */
319     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
320             final List<HttpAsyncRequestProducer> requestProducers,
321             final List<HttpAsyncResponseConsumer<T>> responseConsumers,
322             final E poolEntry,
323             final ConnPool<HttpHost, E> connPool,
324             final HttpContext context,
325             final FutureCallback<List<T>> callback) {
326         Args.notEmpty(requestProducers, "Request producer list");
327         Args.notEmpty(responseConsumers, "Response consumer list");
328         Args.notNull(connPool, "HTTP connection pool");
329         Args.notNull(poolEntry, "Pool entry");
330         Args.notNull(context, "HTTP context");
331         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
332         final NHttpClientConnection conn = poolEntry.getConnection();
333         final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
334                 requestProducers, responseConsumers,
335                 new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool),
336                 context, conn,
337                 this.httpprocessor, this.connReuseStrategy);
338         initExection(handler, conn);
339         return future;
340     }
341 
342     /**
343      * Initiates asynchronous HTTP request execution.
344      *
345      * @param <T> the result type of request execution.
346      * @param <E> the connection pool entry type.
347      * @param requestProducer request producer.
348      * @param responseConsumer response consumer.
349      * @param connPool pool of persistent reusable connections.
350      * @param context HTTP context
351      * @return future representing pending completion of the operation.
352      */
353     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
354             final HttpAsyncRequestProducer requestProducer,
355             final HttpAsyncResponseConsumer<T> responseConsumer,
356             final ConnPool<HttpHost, E> connPool,
357             final HttpContext context) {
358         return execute(requestProducer, responseConsumer, connPool, context, null);
359     }
360 
361     /**
362      * Initiates asynchronous HTTP request execution.
363      *
364      * @param <T> the result type of request execution.
365      * @param <E> the connection pool entry type.
366      * @param requestProducer request producer.
367      * @param responseConsumer response consumer.
368      * @param connPool pool of persistent reusable connections.
369      * @return future representing pending completion of the operation.
370      */
371     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
372             final HttpAsyncRequestProducer requestProducer,
373             final HttpAsyncResponseConsumer<T> responseConsumer,
374             final ConnPool<HttpHost, E> connPool) {
375         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
376     }
377 
378     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
379 
380         private final BasicFuture<T> requestFuture;
381         private final HttpAsyncRequestProducer requestProducer;
382         private final HttpAsyncResponseConsumer<T> responseConsumer;
383         private final ConnPool<HttpHost, E> connPool;
384         private final HttpContext context;
385 
386         ConnRequestCallback(
387                 final BasicFuture<T> requestFuture,
388                 final HttpAsyncRequestProducer requestProducer,
389                 final HttpAsyncResponseConsumer<T> responseConsumer,
390                 final ConnPool<HttpHost, E> connPool,
391                 final HttpContext context) {
392             super();
393             this.requestFuture = requestFuture;
394             this.requestProducer = requestProducer;
395             this.responseConsumer = responseConsumer;
396             this.connPool = connPool;
397             this.context = context;
398         }
399 
400         @Override
401         public void completed(final E result) {
402             if (this.requestFuture.isDone()) {
403                 this.connPool.release(result, true);
404                 return;
405             }
406             final NHttpClientConnection conn = result.getConnection();
407             final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
408                     this.requestProducer, this.responseConsumer,
409                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
410                     this.context, conn, httpprocessor, connReuseStrategy);
411             initExection(handler, conn);
412         }
413 
414         @Override
415         public void failed(final Exception ex) {
416             try {
417                 try {
418                     this.responseConsumer.failed(ex);
419                 } finally {
420                     releaseResources();
421                 }
422             } finally {
423                 this.requestFuture.failed(ex);
424             }
425         }
426 
427         @Override
428         public void cancelled() {
429             try {
430                 try {
431                     this.responseConsumer.cancel();
432                 } finally {
433                     releaseResources();
434                 }
435             } finally {
436                 this.requestFuture.cancel(true);
437             }
438         }
439 
440         public void releaseResources() {
441             close(requestProducer);
442             close(responseConsumer);
443         }
444 
445     }
446 
447     class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
448 
449         private final BasicFuture<List<T>> requestFuture;
450         private final List<? extends HttpAsyncRequestProducer> requestProducers;
451         private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers;
452         private final ConnPool<HttpHost, E> connPool;
453         private final HttpContext context;
454 
455         ConnPipelinedRequestCallback(
456                 final BasicFuture<List<T>> requestFuture,
457                 final List<? extends HttpAsyncRequestProducer> requestProducers,
458                 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
459                 final ConnPool<HttpHost, E> connPool,
460                 final HttpContext context) {
461             super();
462             this.requestFuture = requestFuture;
463             this.requestProducers = requestProducers;
464             this.responseConsumers = responseConsumers;
465             this.connPool = connPool;
466             this.context = context;
467         }
468 
469         @Override
470         public void completed(final E result) {
471             if (this.requestFuture.isDone()) {
472                 this.connPool.release(result, true);
473                 return;
474             }
475             final NHttpClientConnection conn = result.getConnection();
476             final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
477                     this.requestProducers, this.responseConsumers,
478                     new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool),
479                     this.context, conn, httpprocessor, connReuseStrategy);
480             initExection(handler, conn);
481         }
482 
483         @Override
484         public void failed(final Exception ex) {
485             try {
486                 try {
487                     for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
488                         responseConsumer.failed(ex);
489                     }
490                 } finally {
491                     releaseResources();
492                 }
493             } finally {
494                 this.requestFuture.failed(ex);
495             }
496         }
497 
498         @Override
499         public void cancelled() {
500             try {
501                 try {
502                     for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
503                         responseConsumer.cancel();
504                     }
505                 } finally {
506                     releaseResources();
507                 }
508             } finally {
509                 this.requestFuture.cancel(true);
510             }
511         }
512 
513         public void releaseResources() {
514             for (HttpAsyncRequestProducer requestProducer: this.requestProducers) {
515                 close(requestProducer);
516             }
517             for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
518                 close(responseConsumer);
519             }
520         }
521 
522     }
523 
524     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
525                                                implements FutureCallback<T> {
526 
527         private final BasicFuture<T> future;
528         private final E poolEntry;
529         private final ConnPool<HttpHost, E> connPool;
530 
531         RequestExecutionCallback(
532                 final BasicFuture<T> future,
533                 final E poolEntry,
534                 final ConnPool<HttpHost, E> connPool) {
535             super();
536             this.future = future;
537             this.poolEntry = poolEntry;
538             this.connPool = connPool;
539         }
540 
541         @Override
542         public void completed(final T result) {
543             try {
544                 this.connPool.release(this.poolEntry, true);
545             } finally {
546                 this.future.completed(result);
547             }
548         }
549 
550         @Override
551         public void failed(final Exception ex) {
552             try {
553                 this.connPool.release(this.poolEntry, false);
554             } finally {
555                 this.future.failed(ex);
556             }
557         }
558 
559         @Override
560         public void cancelled() {
561             try {
562                 this.connPool.release(this.poolEntry, false);
563             } finally {
564                 this.future.cancel(true);
565             }
566         }
567 
568     }
569 
570     /**
571      * This method can be used to log I/O exception thrown while closing
572      * {@link java.io.Closeable} objects (such as
573      * {@link org.apache.http.HttpConnection}}).
574      *
575      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
576      */
577     protected void log(final Exception ex) {
578         this.exceptionLogger.log(ex);
579     }
580 
581     private void close(final Closeable closeable) {
582         try {
583             closeable.close();
584         } catch (IOException ex) {
585             log(ex);
586         }
587     }
588 
589 }