1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.protocol;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.concurrent.Future;
32
33 import org.apache.http.ConnectionClosedException;
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpConnection;
36 import org.apache.http.HttpHost;
37 import org.apache.http.annotation.Immutable;
38 import org.apache.http.concurrent.BasicFuture;
39 import org.apache.http.concurrent.FutureCallback;
40 import org.apache.http.impl.DefaultConnectionReuseStrategy;
41 import org.apache.http.nio.NHttpClientConnection;
42 import org.apache.http.params.HttpParams;
43 import org.apache.http.pool.ConnPool;
44 import org.apache.http.pool.PoolEntry;
45 import org.apache.http.protocol.BasicHttpContext;
46 import org.apache.http.protocol.HttpContext;
47 import org.apache.http.protocol.HttpProcessor;
48 import org.apache.http.util.Args;
49
50
51
52
53
54
55
56
57
58
59 @SuppressWarnings("deprecation")
60 @Immutable
61 public class HttpAsyncRequester {
62
63 private final HttpProcessor httppocessor;
64 private final ConnectionReuseStrategy connReuseStrategy;
65
66
67
68
69
70 @Deprecated
71 public HttpAsyncRequester(
72 final HttpProcessor httppocessor,
73 final ConnectionReuseStrategy reuseStrategy,
74 final HttpParams params) {
75 this(httppocessor, reuseStrategy);
76 }
77
78
79
80
81
82
83 public HttpAsyncRequester(
84 final HttpProcessor httppocessor,
85 final ConnectionReuseStrategy connReuseStrategy) {
86 super();
87 this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
88 this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
89 DefaultConnectionReuseStrategy.INSTANCE;
90 }
91
92
93
94
95
96
97 public HttpAsyncRequester(final HttpProcessor httppocessor) {
98 this(httppocessor, null);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112 public <T> Future<T> execute(
113 final HttpAsyncRequestProducer requestProducer,
114 final HttpAsyncResponseConsumer<T> responseConsumer,
115 final NHttpClientConnection conn,
116 final HttpContext context,
117 final FutureCallback<T> callback) {
118 Args.notNull(requestProducer, "HTTP request producer");
119 Args.notNull(responseConsumer, "HTTP response consumer");
120 Args.notNull(conn, "HTTP connection");
121 Args.notNull(context, "HTTP context");
122 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
123 requestProducer, responseConsumer, callback, context, conn,
124 this.httppocessor, this.connReuseStrategy);
125 initExection(handler, conn);
126 return handler.getFuture();
127 }
128
129 private void initExection(
130 final HttpAsyncClientExchangeHandler handler, final NHttpClientConnection conn) {
131 conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
132 if (!conn.isOpen()) {
133 handler.failed(new ConnectionClosedException("Connection closed"));
134 try {
135 handler.close();
136 } catch (final IOException ex) {
137 log(ex);
138 }
139 } else {
140 conn.requestOutput();
141 }
142 }
143
144
145
146
147
148
149
150
151
152
153
154 public <T> Future<T> execute(
155 final HttpAsyncRequestProducer requestProducer,
156 final HttpAsyncResponseConsumer<T> responseConsumer,
157 final NHttpClientConnection conn,
158 final HttpContext context) {
159 return execute(requestProducer, responseConsumer, conn, context, null);
160 }
161
162
163
164
165
166
167
168
169
170
171 public <T> Future<T> execute(
172 final HttpAsyncRequestProducer requestProducer,
173 final HttpAsyncResponseConsumer<T> responseConsumer,
174 final NHttpClientConnection conn) {
175 return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
176 }
177
178
179
180
181
182
183
184
185
186
187
188
189
190 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
191 final HttpAsyncRequestProducer requestProducer,
192 final HttpAsyncResponseConsumer<T> responseConsumer,
193 final ConnPool<HttpHost, E> connPool,
194 final HttpContext context,
195 final FutureCallback<T> callback) {
196 Args.notNull(requestProducer, "HTTP request producer");
197 Args.notNull(responseConsumer, "HTTP response consumer");
198 Args.notNull(connPool, "HTTP connection pool");
199 Args.notNull(context, "HTTP context");
200 final BasicFuture<T> future = new BasicFuture<T>(callback);
201 final HttpHost target = requestProducer.getTarget();
202 connPool.lease(target, null, new ConnRequestCallback<T, E>(
203 future, requestProducer, responseConsumer, connPool, context));
204 return future;
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
225 final HttpAsyncRequestProducer requestProducer,
226 final HttpAsyncResponseConsumer<T> responseConsumer,
227 final E poolEntry,
228 final ConnPool<HttpHost, E> connPool,
229 final HttpContext context,
230 final FutureCallback<T> callback) {
231 Args.notNull(requestProducer, "HTTP request producer");
232 Args.notNull(responseConsumer, "HTTP response consumer");
233 Args.notNull(connPool, "HTTP connection pool");
234 Args.notNull(poolEntry, "Pool entry");
235 Args.notNull(context, "HTTP context");
236 final BasicFuture<T> future = new BasicFuture<T>(callback);
237 final NHttpClientConnection conn = poolEntry.getConnection();
238 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
239 requestProducer, responseConsumer,
240 new RequestExecutionCallback<T, E>(future, poolEntry, connPool),
241 context, conn,
242 this.httppocessor, this.connReuseStrategy);
243 initExection(handler, conn);
244 return future;
245 }
246
247
248
249
250
251
252
253
254
255
256
257
258 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
259 final HttpAsyncRequestProducer requestProducer,
260 final HttpAsyncResponseConsumer<T> responseConsumer,
261 final ConnPool<HttpHost, E> connPool,
262 final HttpContext context) {
263 return execute(requestProducer, responseConsumer, connPool, context, null);
264 }
265
266
267
268
269
270
271
272
273
274
275
276 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
277 final HttpAsyncRequestProducer requestProducer,
278 final HttpAsyncResponseConsumer<T> responseConsumer,
279 final ConnPool<HttpHost, E> connPool) {
280 return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
281 }
282
283 class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
284
285 private final BasicFuture<T> requestFuture;
286 private final HttpAsyncRequestProducer requestProducer;
287 private final HttpAsyncResponseConsumer<T> responseConsumer;
288 private final ConnPool<HttpHost, E> connPool;
289 private final HttpContext context;
290
291 ConnRequestCallback(
292 final BasicFuture<T> requestFuture,
293 final HttpAsyncRequestProducer requestProducer,
294 final HttpAsyncResponseConsumer<T> responseConsumer,
295 final ConnPool<HttpHost, E> connPool,
296 final HttpContext context) {
297 super();
298 this.requestFuture = requestFuture;
299 this.requestProducer = requestProducer;
300 this.responseConsumer = responseConsumer;
301 this.connPool = connPool;
302 this.context = context;
303 }
304
305 public void completed(final E result) {
306 if (this.requestFuture.isDone()) {
307 this.connPool.release(result, true);
308 return;
309 }
310 final NHttpClientConnection conn = result.getConnection();
311 final BasicAsyncClientExchangeHandler<T> handler = new BasicAsyncClientExchangeHandler<T>(
312 this.requestProducer, this.responseConsumer,
313 new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
314 this.context, conn, httppocessor, connReuseStrategy);
315 initExection(handler, conn);
316 }
317
318 public void failed(final Exception ex) {
319 try {
320 try {
321 this.responseConsumer.failed(ex);
322 } finally {
323 releaseResources();
324 }
325 } finally {
326 this.requestFuture.failed(ex);
327 }
328 }
329
330 public void cancelled() {
331 try {
332 try {
333 this.responseConsumer.cancel();
334 } finally {
335 releaseResources();
336 }
337 } finally {
338 this.requestFuture.cancel(true);
339 }
340 }
341
342 public void releaseResources() {
343 try {
344 this.requestProducer.close();
345 } catch (final IOException ioex) {
346 log(ioex);
347 }
348 try {
349 this.responseConsumer.close();
350 } catch (final IOException ioex) {
351 log(ioex);
352 }
353 }
354
355 }
356
357 class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
358 implements FutureCallback<T> {
359
360 private final BasicFuture<T> future;
361 private final E poolEntry;
362 private final ConnPool<HttpHost, E> connPool;
363
364 RequestExecutionCallback(
365 final BasicFuture<T> future,
366 final E poolEntry,
367 final ConnPool<HttpHost, E> connPool) {
368 super();
369 this.future = future;
370 this.poolEntry = poolEntry;
371 this.connPool = connPool;
372 }
373
374 public void completed(final T result) {
375 try {
376 this.connPool.release(this.poolEntry, true);
377 } finally {
378 this.future.completed(result);
379 }
380 }
381
382 public void failed(final Exception ex) {
383 try {
384 this.connPool.release(this.poolEntry, false);
385 } finally {
386 this.future.failed(ex);
387 }
388 }
389
390 public void cancelled() {
391 try {
392 this.connPool.release(this.poolEntry, false);
393 } finally {
394 this.future.cancel(true);
395 }
396 }
397
398 }
399
400
401
402
403
404
405
406 protected void log(final Exception ex) {
407 }
408
409 }