1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.protocol;
28
29 import java.io.Closeable;
30 import java.io.IOException;
31 import java.util.concurrent.Future;
32
33 import org.apache.http.ConnectionClosedException;
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpConnection;
36 import org.apache.http.HttpHost;
37 import org.apache.http.annotation.Immutable;
38 import org.apache.http.concurrent.BasicFuture;
39 import org.apache.http.concurrent.FutureCallback;
40 import org.apache.http.nio.NHttpClientConnection;
41 import org.apache.http.params.HttpParams;
42 import org.apache.http.pool.ConnPool;
43 import org.apache.http.pool.PoolEntry;
44 import org.apache.http.protocol.BasicHttpContext;
45 import org.apache.http.protocol.HttpContext;
46 import org.apache.http.protocol.HttpProcessor;
47
48
49
50
51
52
53
54
55
56
57 @Immutable
58 public class HttpAsyncRequester {
59
60 private final HttpProcessor httppocessor;
61 private final ConnectionReuseStrategy reuseStrategy;
62 private final HttpParams params;
63
64 public HttpAsyncRequester(
65 final HttpProcessor httppocessor,
66 final ConnectionReuseStrategy reuseStrategy,
67 final HttpParams params) {
68 super();
69 this.httppocessor = httppocessor;
70 this.reuseStrategy = reuseStrategy;
71 this.params = params;
72 }
73
74
75
76
77
78
79
80
81
82
83
84
85 public <T> Future<T> execute(
86 final HttpAsyncRequestProducer requestProducer,
87 final HttpAsyncResponseConsumer<T> responseConsumer,
88 final NHttpClientConnection conn,
89 final HttpContext context,
90 final FutureCallback<T> callback) {
91 if (requestProducer == null) {
92 throw new IllegalArgumentException("HTTP request producer may not be null");
93 }
94 if (responseConsumer == null) {
95 throw new IllegalArgumentException("HTTP response consumer may not be null");
96 }
97 if (conn == null) {
98 throw new IllegalArgumentException("HTTP connection may not be null");
99 }
100 if (context == null) {
101 throw new IllegalArgumentException("HTTP context may not be null");
102 }
103 BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
104 requestProducer, responseConsumer, callback, context,
105 this.httppocessor, this.reuseStrategy, this.params);
106 doExecute(handler, conn);
107 return handler.getFuture();
108 }
109
110 private <T> void doExecute(
111 final HttpAsyncRequestExecutionHandler<T> handler, final NHttpClientConnection conn) {
112 conn.getContext().setAttribute(HttpAsyncRequestExecutor.HTTP_HANDLER, handler);
113 if (!conn.isOpen()) {
114 handler.failed(new ConnectionClosedException("Connection closed"));
115 try {
116 handler.close();
117 } catch (IOException ex) {
118 log(ex);
119 }
120 } else {
121 conn.requestOutput();
122 }
123 }
124
125
126
127
128
129
130
131
132
133
134
135 public <T> Future<T> execute(
136 final HttpAsyncRequestProducer requestProducer,
137 final HttpAsyncResponseConsumer<T> responseConsumer,
138 final NHttpClientConnection conn,
139 final HttpContext context) {
140 return execute(requestProducer, responseConsumer, conn, context, null);
141 }
142
143
144
145
146
147
148
149
150
151
152 public <T> Future<T> execute(
153 final HttpAsyncRequestProducer requestProducer,
154 final HttpAsyncResponseConsumer<T> responseConsumer,
155 final NHttpClientConnection conn) {
156 return execute(requestProducer, responseConsumer, conn, new BasicHttpContext());
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
172 final HttpAsyncRequestProducer requestProducer,
173 final HttpAsyncResponseConsumer<T> responseConsumer,
174 final ConnPool<HttpHost, E> connPool,
175 final HttpContext context,
176 final FutureCallback<T> callback) {
177 if (requestProducer == null) {
178 throw new IllegalArgumentException("HTTP request producer may not be null");
179 }
180 if (responseConsumer == null) {
181 throw new IllegalArgumentException("HTTP response consumer may not be null");
182 }
183 if (connPool == null) {
184 throw new IllegalArgumentException("HTTP connection pool may not be null");
185 }
186 if (context == null) {
187 throw new IllegalArgumentException("HTTP context may not be null");
188 }
189 BasicFuture<T> future = new BasicFuture<T>(callback);
190 HttpHost target = requestProducer.getTarget();
191 connPool.lease(target, null, new ConnRequestCallback<T, E>(
192 future, requestProducer, responseConsumer, connPool, context));
193 return future;
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
208 final HttpAsyncRequestProducer requestProducer,
209 final HttpAsyncResponseConsumer<T> responseConsumer,
210 final ConnPool<HttpHost, E> connPool,
211 final HttpContext context) {
212 return execute(requestProducer, responseConsumer, connPool, context, null);
213 }
214
215
216
217
218
219
220
221
222
223
224
225 public <T, E extends PoolEntry<HttpHost, NHttpClientConnection>> Future<T> execute(
226 final HttpAsyncRequestProducer requestProducer,
227 final HttpAsyncResponseConsumer<T> responseConsumer,
228 final ConnPool<HttpHost, E> connPool) {
229 return execute(requestProducer, responseConsumer, connPool, new BasicHttpContext());
230 }
231
232 class ConnRequestCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>> implements FutureCallback<E> {
233
234 private final BasicFuture<T> requestFuture;
235 private final HttpAsyncRequestProducer requestProducer;
236 private final HttpAsyncResponseConsumer<T> responseConsumer;
237 private final ConnPool<HttpHost, E> connPool;
238 private final HttpContext context;
239
240 ConnRequestCallback(
241 final BasicFuture<T> requestFuture,
242 final HttpAsyncRequestProducer requestProducer,
243 final HttpAsyncResponseConsumer<T> responseConsumer,
244 final ConnPool<HttpHost, E> connPool,
245 final HttpContext context) {
246 super();
247 this.requestFuture = requestFuture;
248 this.requestProducer = requestProducer;
249 this.responseConsumer = responseConsumer;
250 this.connPool = connPool;
251 this.context = context;
252 }
253
254 public void completed(final E result) {
255 if (this.requestFuture.isDone()) {
256 this.connPool.release(result, true);
257 return;
258 }
259 NHttpClientConnection conn = result.getConnection();
260 BasicAsyncRequestExecutionHandler<T> handler = new BasicAsyncRequestExecutionHandler<T>(
261 this.requestProducer, this.responseConsumer,
262 new RequestExecutionCallback<T, E>(this.requestFuture, result, this.connPool),
263 this.context, httppocessor, reuseStrategy, params);
264 doExecute(handler, conn);
265 }
266
267 public void failed(final Exception ex) {
268 try {
269 try {
270 this.responseConsumer.failed(ex);
271 } finally {
272 releaseResources();
273 }
274 } finally {
275 this.requestFuture.failed(ex);
276 }
277 }
278
279 public void cancelled() {
280 try {
281 try {
282 this.responseConsumer.cancel();
283 } finally {
284 releaseResources();
285 }
286 } finally {
287 this.requestFuture.cancel(true);
288 }
289 }
290
291 public void releaseResources() {
292 try {
293 this.requestProducer.close();
294 } catch (IOException ioex) {
295 log(ioex);
296 }
297 try {
298 this.responseConsumer.close();
299 } catch (IOException ioex) {
300 log(ioex);
301 }
302 }
303
304 }
305
306 class RequestExecutionCallback<T, E extends PoolEntry<HttpHost, NHttpClientConnection>>
307 implements FutureCallback<T> {
308
309 private final BasicFuture<T> future;
310 private final E poolEntry;
311 private final ConnPool<HttpHost, E> connPool;
312
313 RequestExecutionCallback(
314 final BasicFuture<T> future,
315 final E poolEntry,
316 final ConnPool<HttpHost, E> connPool) {
317 super();
318 this.future = future;
319 this.poolEntry = poolEntry;
320 this.connPool = connPool;
321 }
322
323 public void completed(final T result) {
324 try {
325 this.connPool.release(this.poolEntry, true);
326 } finally {
327 this.future.completed(result);
328 }
329 }
330
331 public void failed(final Exception ex) {
332 try {
333 this.connPool.release(this.poolEntry, false);
334 } finally {
335 this.future.failed(ex);
336 }
337 }
338
339 public void cancelled() {
340 try {
341 this.connPool.release(this.poolEntry, false);
342 } finally {
343 this.future.cancel(true);
344 }
345 }
346
347 }
348
349
350
351
352
353
354
355 protected void log(Exception ex) {
356 }
357
358 }