View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.protocol;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.List;
32  import java.util.concurrent.Future;
33  
34  import org.apache.http.ConnectionClosedException;
35  import org.apache.http.ConnectionReuseStrategy;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpHost;
38  import org.apache.http.annotation.Immutable;
39  import org.apache.http.concurrent.BasicFuture;
40  import org.apache.http.concurrent.FutureCallback;
41  import org.apache.http.impl.DefaultConnectionReuseStrategy;
42  import org.apache.http.nio.NHttpClientConnection;
43  import org.apache.http.params.HttpParams;
44  import org.apache.http.pool.ConnPool;
45  import org.apache.http.pool.PoolEntry;
46  import org.apache.http.protocol.BasicHttpContext;
47  import org.apache.http.protocol.HttpContext;
48  import org.apache.http.protocol.HttpProcessor;
49  import org.apache.http.util.Args;
50  
51  /**
52   * {@code HttpAsyncRequester} is a utility class that can be used
53   * in conjunction with {@link HttpAsyncRequestExecutor} to initiate execution
54   * of asynchronous HTTP requests.
55   *
56   * @see HttpAsyncRequestExecutor
57   *
58   * @since 4.2
59   */
60  @SuppressWarnings("deprecation")
61  @Immutable
62  public class HttpAsyncRequester {
63  
64      private final HttpProcessor httpprocessor;
65      private final ConnectionReuseStrategy connReuseStrategy;
66      private final ExceptionLogger exceptionLogger;
67  
68      /**
69       * @deprecated (4.3) use {@link HttpAsyncRequester#HttpAsyncRequester(HttpProcessor,
70       *   ConnectionReuseStrategy)}
71       */
72      @Deprecated
73      public HttpAsyncRequester(
74              final HttpProcessor httpprocessor,
75              final ConnectionReuseStrategy reuseStrategy,
76              final HttpParams params) {
77          this(httpprocessor, reuseStrategy);
78      }
79  
80      /**
81       * Creates new instance of {@code HttpAsyncRequester}.
82       * @param httpprocessor HTTP protocol processor.
83       * @param connReuseStrategy Connection re-use strategy. If {@code null}
84       *   {@link DefaultConnectionReuseStrategy#INSTANCE} will be used.
85       * @param exceptionLogger Exception logger. If {@code null}
86       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
87       *   logger will be only used to log I/O exception thrown while closing
88       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
89       *
90       * @since 4.4
91       */
92      public HttpAsyncRequester(
93              final HttpProcessor httpprocessor,
94              final ConnectionReuseStrategy connReuseStrategy,
95              final ExceptionLogger exceptionLogger) {
96          super();
97          this.httpprocessor = Args.notNull(httpprocessor, "HTTP processor");
98          this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
99                  DefaultConnectionReuseStrategy.INSTANCE;
100         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
101     }
102 
103     /**
104      * Creates new instance of HttpAsyncRequester.
105      *
106      * @since 4.3
107      */
108     public HttpAsyncRequester(
109             final HttpProcessor httpprocessor,
110             final ConnectionReuseStrategy connReuseStrategy) {
111         this(httpprocessor, connReuseStrategy, (ExceptionLogger) null);
112     }
113 
114     /**
115      * Creates new instance of HttpAsyncRequester.
116      *
117      * @since 4.3
118      */
119     public HttpAsyncRequester(final HttpProcessor httpprocessor) {
120         this(httpprocessor, null);
121     }
122 
123     /**
124      * Initiates asynchronous HTTP request execution.
125      *
126      * @param <T> the result type of request execution.
127      * @param requestProducer request producer.
128      * @param responseConsumer response consumer.
129      * @param conn underlying HTTP connection.
130      * @param context HTTP context
131      * @param callback future callback.
132      * @return future representing pending completion of the operation.
133      */
134     public <T> Future<T> execute(
135             final HttpAsyncRequestProducer requestProducer,
136             final HttpAsyncResponseConsumer<T> responseConsumer,
137             final NHttpClientConnection conn,
138             final HttpContext context,
139             final FutureCallback<T> callback) {
140         Args.notNull(requestProducer, "HTTP request producer");
141         Args.notNull(responseConsumer, "HTTP response consumer");
142         Args.notNull(conn, "HTTP connection");
143         Args.notNull(context, "HTTP context");
144         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
145                 requestProducer, responseConsumer, callback, context, conn,
146                 this.httpprocessor, this.connReuseStrategy);
147         initExecution(handler, conn);
148         return handler.getFuture();
149     }
150 
151     private void initExecution(
152             final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
153 
154         final HttpContext context = conn.getContext();
155         synchronized (context) {
156             context.setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
157             if (!conn.isOpen()) {
158                 handler.failed(new ConnectionClosedException("Connection closed"));
159             } else {
160                 conn.requestOutput();
161             }
162         }
163         if (handler.isDone()) {
164             try {
165                 handler.close();
166             } catch (final IOException ex) {
167                 log(ex);
168             }
169         }
170     }
171 
172     /**
173      * Initiates asynchronous HTTP request execution.
174      *
175      * @param <T> the result type of request execution.
176      * @param requestProducer request producer.
177      * @param responseConsumer response consumer.
178      * @param conn underlying HTTP connection.
179      * @param context HTTP context
180      * @return future representing pending completion of the operation.
181      */
182     public <T> Future<T> execute(
183             final HttpAsyncRequestProducer requestProducer,
184             final HttpAsyncResponseConsumer<T> responseConsumer,
185             final NHttpClientConnection conn,
186             final HttpContext context) {
187         return execute(requestProducer, responseConsumer, conn, context, null);
188     }
189 
190     /**
191      * Initiates asynchronous HTTP request execution.
192      *
193      * @param <T> the result type of request execution.
194      * @param requestProducer request producer.
195      * @param responseConsumer response consumer.
196      * @param conn underlying HTTP connection.
197      * @return future representing pending completion of the operation.
198      */
199     public <T> Future<T> execute(
200             final HttpAsyncRequestProducer requestProducer,
201             final HttpAsyncResponseConsumer<T> responseConsumer,
202             final NHttpClientConnection conn) {
203         return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
204     }
205 
206     /**
207      * Initiates asynchronous HTTP request execution.
208      *
209      * @param <T> the result type of request execution.
210      * @param <E> the connection pool entry type.
211      * @param requestProducer request producer.
212      * @param responseConsumer response consumer.
213      * @param connPool pool of persistent reusable connections.
214      * @param context HTTP context
215      * @param callback future callback.
216      * @return future representing pending completion of the operation.
217      */
218     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
219             final HttpAsyncRequestProducer requestProducer,
220             final HttpAsyncResponseConsumer<T> responseConsumer,
221             final ConnPool<HttpHost, E> connPool,
222             final HttpContext context,
223             final FutureCallback<T> callback) {
224         Args.notNull(requestProducer, "HTTP request producer");
225         Args.notNull(responseConsumer, "HTTP response consumer");
226         Args.notNull(connPool, "HTTP connection pool");
227         Args.notNull(context, "HTTP context");
228         final BasicFuture<T> future = new BasicFuture<T>(callback);
229         final HttpHost target = requestProducer.getTarget();
230         connPool.lease(target, null, new ConnRequestCallback<T, E>(
231                 future, requestProducer, responseConsumer, connPool, context));
232         return future;
233     }
234 
235     /**
236      * Initiates asynchronous pipelined HTTP request execution.
237      *
238      * @param <T> the result type of request execution.
239      * @param <E> the connection pool entry type.
240      * @param target target host.
241      * @param requestProducers list of request producers.
242      * @param responseConsumers list of response consumers.
243      * @param connPool pool of persistent reusable connections.
244      * @param context HTTP context
245      * @param callback future callback.
246      * @return future representing pending completion of the operation.
247      *
248      * @since 4.4
249      */
250     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
251             final HttpHost target,
252             final List<? extends HttpAsyncRequestProducer> requestProducers,
253             final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
254             final ConnPool<HttpHost, E> connPool,
255             final HttpContext context,
256             final FutureCallback<List<T>> callback) {
257         Args.notNull(target, "HTTP target");
258         Args.notEmpty(requestProducers, "Request producer list");
259         Args.notEmpty(responseConsumers, "Response consumer list");
260         Args.notNull(connPool, "HTTP connection pool");
261         Args.notNull(context, "HTTP context");
262         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
263         connPool.lease(target, null, new ConnPipelinedRequestCallback<T, E>(
264                 future, requestProducers, responseConsumers, connPool, context));
265         return future;
266     }
267 
268     /**
269      * Initiates asynchronous HTTP request execution. This method automatically releases
270      * the given pool entry once request execution is completed (successfully or unsuccessfully).
271      *
272      * @param <T> the result type of request execution.
273      * @param <E> the connection pool entry type.
274      * @param requestProducer request producer.
275      * @param responseConsumer response consumer.
276      * @param poolEntry leased pool entry. It will be automatically released
277      *   back to the pool when execution is completed.
278      * @param connPool pool of persistent reusable connections.
279      * @param context HTTP context
280      * @param callback future callback.
281      * @return future representing pending completion of the operation.
282      *
283      * @since 4.3
284      */
285     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
286             final HttpAsyncRequestProducer requestProducer,
287             final HttpAsyncResponseConsumer<T> responseConsumer,
288             final E poolEntry,
289             final ConnPool<HttpHost, E> connPool,
290             final HttpContext context,
291             final FutureCallback<T> callback) {
292         Args.notNull(requestProducer, "HTTP request producer");
293         Args.notNull(responseConsumer, "HTTP response consumer");
294         Args.notNull(connPool, "HTTP connection pool");
295         Args.notNull(poolEntry, "Pool entry");
296         Args.notNull(context, "HTTP context");
297         final BasicFuture<T> future = new BasicFuture<T>(callback);
298         final NHttpClientConnection conn = poolEntry.getConnection();
299         final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
300                 requestProducer, responseConsumer,
301                 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
302                 context, conn,
303                 this.httpprocessor, this.connReuseStrategy);
304         initExecution(handler, conn);
305         return future;
306     }
307 
308     /**
309      * Initiates asynchronous pipelined HTTP request execution. This method automatically releases
310      * the given pool entry once request execution is completed (successfully or unsuccessfully).
311      *
312      * @param <T> the result type of request execution.
313      * @param <E> the connection pool entry type.
314      * @param requestProducers list of request producers.
315      * @param responseConsumers list of response consumers.
316      * @param poolEntry leased pool entry. It will be automatically released
317      *   back to the pool when execution is completed.
318      * @param connPool pool of persistent reusable connections.
319      * @param context HTTP context
320      * @param callback future callback.
321      * @return future representing pending completion of the operation.
322      *
323      * @since 4.4
324      */
325     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<List<T>> executePipelined(
326             final List<HttpAsyncRequestProducer> requestProducers,
327             final List<HttpAsyncResponseConsumer<T>> responseConsumers,
328             final E poolEntry,
329             final ConnPool<HttpHost, E> connPool,
330             final HttpContext context,
331             final FutureCallback<List<T>> callback) {
332         Args.notEmpty(requestProducers, "Request producer list");
333         Args.notEmpty(responseConsumers, "Response consumer list");
334         Args.notNull(connPool, "HTTP connection pool");
335         Args.notNull(poolEntry, "Pool entry");
336         Args.notNull(context, "HTTP context");
337         final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
338         final NHttpClientConnection conn = poolEntry.getConnection();
339         final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
340                 requestProducers, responseConsumers,
341                 new RequestExecutionCallback<List<T>, E>(future, poolEntry, connPool),
342                 context, conn,
343                 this.httpprocessor, this.connReuseStrategy);
344         initExecution(handler, conn);
345         return future;
346     }
347 
348     /**
349      * Initiates asynchronous HTTP request execution.
350      *
351      * @param <T> the result type of request execution.
352      * @param <E> the connection pool entry type.
353      * @param requestProducer request producer.
354      * @param responseConsumer response consumer.
355      * @param connPool pool of persistent reusable connections.
356      * @param context HTTP context
357      * @return future representing pending completion of the operation.
358      */
359     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
360             final HttpAsyncRequestProducer requestProducer,
361             final HttpAsyncResponseConsumer<T> responseConsumer,
362             final ConnPool<HttpHost, E> connPool,
363             final HttpContext context) {
364         return execute(requestProducer, responseConsumer, connPool, context, null);
365     }
366 
367     /**
368      * Initiates asynchronous HTTP request execution.
369      *
370      * @param <T> the result type of request execution.
371      * @param <E> the connection pool entry type.
372      * @param requestProducer request producer.
373      * @param responseConsumer response consumer.
374      * @param connPool pool of persistent reusable connections.
375      * @return future representing pending completion of the operation.
376      */
377     public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
378             final HttpAsyncRequestProducer requestProducer,
379             final HttpAsyncResponseConsumer<T> responseConsumer,
380             final ConnPool<HttpHost, E> connPool) {
381         return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
382     }
383 
384     class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
385 
386         private final BasicFuture<T> requestFuture;
387         private final HttpAsyncRequestProducer requestProducer;
388         private final HttpAsyncResponseConsumer<T> responseConsumer;
389         private final ConnPool<HttpHost, E> connPool;
390         private final HttpContext context;
391 
392         ConnRequestCallback(
393                 final BasicFuture<T> requestFuture,
394                 final HttpAsyncRequestProducer requestProducer,
395                 final HttpAsyncResponseConsumer<T> responseConsumer,
396                 final ConnPool<HttpHost, E> connPool,
397                 final HttpContext context) {
398             super();
399             this.requestFuture = requestFuture;
400             this.requestProducer = requestProducer;
401             this.responseConsumer = responseConsumer;
402             this.connPool = connPool;
403             this.context = context;
404         }
405 
406         @Override
407         public void completed(final E result) {
408             if (this.requestFuture.isDone()) {
409                 this.connPool.release(result, true);
410                 return;
411             }
412             final NHttpClientConnection conn = result.getConnection();
413             final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
414                     this.requestProducer, this.responseConsumer,
415                     new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
416                     this.context, conn, httpprocessor, connReuseStrategy);
417             initExecution(handler, conn);
418         }
419 
420         @Override
421         public void failed(final Exception ex) {
422             try {
423                 try {
424                     this.responseConsumer.failed(ex);
425                 } finally {
426                     releaseResources();
427                 }
428             } finally {
429                 this.requestFuture.failed(ex);
430             }
431         }
432 
433         @Override
434         public void cancelled() {
435             try {
436                 try {
437                     this.responseConsumer.cancel();
438                 } finally {
439                     releaseResources();
440                 }
441             } finally {
442                 this.requestFuture.cancel(true);
443             }
444         }
445 
446         public void releaseResources() {
447             close(requestProducer);
448             close(responseConsumer);
449         }
450 
451     }
452 
453     class ConnPipelinedRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
454 
455         private final BasicFuture<List<T>> requestFuture;
456         private final List<? extends HttpAsyncRequestProducer> requestProducers;
457         private final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers;
458         private final ConnPool<HttpHost, E> connPool;
459         private final HttpContext context;
460 
461         ConnPipelinedRequestCallback(
462                 final BasicFuture<List<T>> requestFuture,
463                 final List<? extends HttpAsyncRequestProducer> requestProducers,
464                 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
465                 final ConnPool<HttpHost, E> connPool,
466                 final HttpContext context) {
467             super();
468             this.requestFuture = requestFuture;
469             this.requestProducers = requestProducers;
470             this.responseConsumers = responseConsumers;
471             this.connPool = connPool;
472             this.context = context;
473         }
474 
475         @Override
476         public void completed(final E result) {
477             if (this.requestFuture.isDone()) {
478                 this.connPool.release(result, true);
479                 return;
480             }
481             final NHttpClientConnection conn = result.getConnection();
482             final PipeliningClientExchangeHandler<T> handler = new PipeliningClientExchangeHandler<T>(
483                     this.requestProducers, this.responseConsumers,
484                     new RequestExecutionCallback<List<T>, E>(this.requestFuture, result, this.connPool),
485                     this.context, conn, httpprocessor, connReuseStrategy);
486             initExecution(handler, conn);
487         }
488 
489         @Override
490         public void failed(final Exception ex) {
491             try {
492                 try {
493                     for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
494                         responseConsumer.failed(ex);
495                     }
496                 } finally {
497                     releaseResources();
498                 }
499             } finally {
500                 this.requestFuture.failed(ex);
501             }
502         }
503 
504         @Override
505         public void cancelled() {
506             try {
507                 try {
508                     for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
509                         responseConsumer.cancel();
510                     }
511                 } finally {
512                     releaseResources();
513                 }
514             } finally {
515                 this.requestFuture.cancel(true);
516             }
517         }
518 
519         public void releaseResources() {
520             for (HttpAsyncRequestProducer requestProducer: this.requestProducers) {
521                 close(requestProducer);
522             }
523             for (HttpAsyncResponseConsumer<T> responseConsumer: this.responseConsumers) {
524                 close(responseConsumer);
525             }
526         }
527 
528     }
529 
530     class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
531                                                implements FutureCallback<T> {
532 
533         private final BasicFuture<T> future;
534         private final E poolEntry;
535         private final ConnPool<HttpHost, E> connPool;
536 
537         RequestExecutionCallback(
538                 final BasicFuture<T> future,
539                 final E poolEntry,
540                 final ConnPool<HttpHost, E> connPool) {
541             super();
542             this.future = future;
543             this.poolEntry = poolEntry;
544             this.connPool = connPool;
545         }
546 
547         @Override
548         public void completed(final T result) {
549             try {
550                 this.connPool.release(this.poolEntry, true);
551             } finally {
552                 this.future.completed(result);
553             }
554         }
555 
556         @Override
557         public void failed(final Exception ex) {
558             try {
559                 this.connPool.release(this.poolEntry, false);
560             } finally {
561                 this.future.failed(ex);
562             }
563         }
564 
565         @Override
566         public void cancelled() {
567             try {
568                 this.connPool.release(this.poolEntry, false);
569             } finally {
570                 this.future.cancel(true);
571             }
572         }
573 
574     }
575 
576     /**
577      * This method can be used to log I/O exception thrown while closing
578      * {@link java.io.Closeable} objects (such as
579      * {@link org.apache.http.HttpConnection}}).
580      *
581      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
582      */
583     protected void log(final Exception ex) {
584         this.exceptionLogger.log(ex);
585     }
586 
587     private void close(final Closeable closeable) {
588         try {
589             closeable.close();
590         } catch (IOException ex) {
591             log(ex);
592         }
593     }
594 
595 }