1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.Closeable;
31 import java.io.IOException;
32 import java.net.SocketTimeoutException;
33
34 import org.apache.http.ConnectionClosedException;
35 import org.apache.http.HttpConnection;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpStatus;
41 import org.apache.http.ProtocolException;
42 import org.apache.http.annotation.Immutable;
43 import org.apache.http.nio.ContentDecoder;
44 import org.apache.http.nio.ContentEncoder;
45 import org.apache.http.nio.NHttpClientConnection;
46 import org.apache.http.nio.NHttpClientEventHandler;
47 import org.apache.http.nio.NHttpConnection;
48 import org.apache.http.protocol.HttpContext;
49 import org.apache.http.protocol.HttpProcessor;
50 import org.apache.http.util.Args;
51 import org.apache.http.util.Asserts;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @Immutable
79 public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
80
/**
 * Wait-for-continue timeout used by the no-argument constructor.
 * The value is handed to {@code conn.setSocketTimeout(...)} while a
 * {@code 100-continue} acknowledgement is pending (presumably milliseconds,
 * as for any socket timeout - confirm against the connection API).
 */
public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;

/**
 * Name of the connection context attribute from which the
 * {@link HttpAsyncClientExchangeHandler} for the current exchange is read.
 */
public static final String HTTP_HANDLER = "http.nio.exchange-handler";

/** Socket timeout applied to a connection while an expect-continue acknowledgement is awaited. */
private final int waitForContinue;

/**
 * Creates an executor with a custom wait-for-continue timeout.
 *
 * @param waitForContinue timeout to apply while waiting for a
 *   {@code 100-continue} response; validated by {@code Args.positive}.
 */
public HttpAsyncRequestExecutor(final int waitForContinue) {
    // The implicit super() call is sufficient; only the timeout needs validating.
    this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
}

/**
 * Creates an executor that uses {@link #DEFAULT_WAIT_FOR_CONTINUE}.
 */
public HttpAsyncRequestExecutor() {
    this(DEFAULT_WAIT_FOR_CONTINUE);
}
99
100 public void connected(
101 final NHttpClientConnection conn,
102 final Object attachment) throws IOException, HttpException {
103 final State state = new State();
104 final HttpContext context = conn.getContext();
105 context.setAttribute(HTTP_EXCHANGE_STATE, state);
106 requestReady(conn);
107 }
108
109 public void closed(final NHttpClientConnection conn) {
110 final State state = getState(conn);
111 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
112 if (state == null || (handler != null && handler.isDone())) {
113 closeHandler(handler);
114 }
115 if (state != null) {
116 state.reset();
117 }
118 }
119
120 public void exception(
121 final NHttpClientConnection conn, final Exception cause) {
122 shutdownConnection(conn);
123 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
124 if (handler != null) {
125 handler.failed(cause);
126 } else {
127 log(cause);
128 }
129 }
130
131 public void requestReady(
132 final NHttpClientConnection conn) throws IOException, HttpException {
133 final State state = ensureNotNull(getState(conn));
134 if (state.getRequestState() != MessageState.READY) {
135 return;
136 }
137 HttpAsyncClientExchangeHandler handler = getHandler(conn);
138 if (handler != null && handler.isDone()) {
139 closeHandler(handler);
140 state.reset();
141 handler = null;
142 }
143 if (handler == null) {
144 return;
145 }
146
147 final HttpRequest request = handler.generateRequest();
148 state.setRequest(request);
149
150 conn.submitRequest(request);
151
152 if (request instanceof HttpEntityEnclosingRequest) {
153 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
154 final int timeout = conn.getSocketTimeout();
155 state.setTimeout(timeout);
156 conn.setSocketTimeout(this.waitForContinue);
157 state.setRequestState(MessageState.ACK_EXPECTED);
158 } else {
159 state.setRequestState(MessageState.BODY_STREAM);
160 }
161 } else {
162 handler.requestCompleted();
163 state.setRequestState(MessageState.COMPLETED);
164 }
165 }
166
167 public void outputReady(
168 final NHttpClientConnection conn,
169 final ContentEncoder encoder) throws IOException, HttpException {
170 final State state = ensureNotNull(getState(conn));
171 final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
172 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
173 conn.suspendOutput();
174 return;
175 }
176 handler.produceContent(encoder, conn);
177 state.setRequestState(MessageState.BODY_STREAM);
178 if (encoder.isCompleted()) {
179 handler.requestCompleted();
180 state.setRequestState(MessageState.COMPLETED);
181 }
182 }
183
184 public void responseReceived(
185 final NHttpClientConnection conn) throws HttpException, IOException {
186 final State state = ensureNotNull(getState(conn));
187 final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
188 final HttpResponse response = conn.getHttpResponse();
189 final HttpRequest request = state.getRequest();
190
191 final int statusCode = response.getStatusLine().getStatusCode();
192 if (statusCode < HttpStatus.SC_OK) {
193
194 if (statusCode != HttpStatus.SC_CONTINUE) {
195 throw new ProtocolException(
196 "Unexpected response: " + response.getStatusLine());
197 }
198 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
199 final int timeout = state.getTimeout();
200 conn.setSocketTimeout(timeout);
201 conn.requestOutput();
202 state.setRequestState(MessageState.ACK);
203 }
204 return;
205 }
206 state.setResponse(response);
207 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
208 final int timeout = state.getTimeout();
209 conn.setSocketTimeout(timeout);
210 conn.resetOutput();
211 state.setRequestState(MessageState.COMPLETED);
212 } else if (state.getRequestState() == MessageState.BODY_STREAM) {
213
214 conn.resetOutput();
215 conn.suspendOutput();
216 state.setRequestState(MessageState.COMPLETED);
217 state.invalidate();
218 }
219
220 handler.responseReceived(response);
221
222 state.setResponseState(MessageState.BODY_STREAM);
223 if (!canResponseHaveBody(request, response)) {
224 response.setEntity(null);
225 conn.resetInput();
226 processResponse(conn, state, handler);
227 }
228 }
229
230 public void inputReady(
231 final NHttpClientConnection conn,
232 final ContentDecoder decoder) throws IOException, HttpException {
233 final State state = ensureNotNull(getState(conn));
234 final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
235 handler.consumeContent(decoder, conn);
236 state.setResponseState(MessageState.BODY_STREAM);
237 if (decoder.isCompleted()) {
238 processResponse(conn, state, handler);
239 }
240 }
241
242 public void endOfInput(final NHttpClientConnection conn) throws IOException {
243 final State state = getState(conn);
244 if (state != null) {
245 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
246 state.invalidate();
247 }
248 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
249 if (handler != null) {
250 if (state.isValid()) {
251 handler.inputTerminated();
252 } else {
253 handler.failed(new ConnectionClosedException("Connection closed"));
254 }
255 }
256 }
257
258
259
260
261 if (conn.getSocketTimeout() <= 0) {
262 conn.setSocketTimeout(1000);
263 }
264 conn.close();
265 }
266
267 public void timeout(
268 final NHttpClientConnection conn) throws IOException {
269 final State state = getState(conn);
270 if (state != null) {
271 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
272 final int timeout = state.getTimeout();
273 conn.setSocketTimeout(timeout);
274 conn.requestOutput();
275 state.setRequestState(MessageState.BODY_STREAM);
276 return;
277 } else {
278 state.invalidate();
279 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
280 if (handler != null) {
281 handler.failed(new SocketTimeoutException());
282 handler.close();
283 }
284 }
285 }
286 if (conn.getStatus() == NHttpConnection.ACTIVE) {
287 conn.close();
288 if (conn.getStatus() == NHttpConnection.CLOSING) {
289
290
291 conn.setSocketTimeout(250);
292 }
293 } else {
294 conn.shutdown();
295 }
296 }
297
298
299
300
301
302
303
304 protected void log(final Exception ex) {
305 }
306
307 private State getState(final NHttpConnection conn) {
308 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
309 }
310
311 private State ensureNotNull(final State state) {
312 Asserts.notNull(state, "HTTP exchange state");
313 return state;
314 }
315
316 private HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
317 return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
318 }
319
320 private HttpAsyncClientExchangeHandler ensureNotNull(final HttpAsyncClientExchangeHandler handler) {
321 Asserts.notNull(handler, "HTTP exchange handler");
322 return handler;
323 }
324
325 private void shutdownConnection(final NHttpConnection conn) {
326 try {
327 conn.shutdown();
328 } catch (final IOException ex) {
329 log(ex);
330 }
331 }
332
333 private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
334 if (handler != null) {
335 try {
336 handler.close();
337 } catch (final IOException ioex) {
338 log(ioex);
339 }
340 }
341 }
342
343 private void processResponse(
344 final NHttpClientConnection conn,
345 final State state,
346 final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
347 if (!state.isValid()) {
348 conn.close();
349 }
350 handler.responseCompleted();
351 state.reset();
352 if (!handler.isDone()) {
353 conn.requestOutput();
354 }
355 }
356
357 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
358
359 final String method = request.getRequestLine().getMethod();
360 final int status = response.getStatusLine().getStatusCode();
361
362 if (method.equalsIgnoreCase("HEAD")) {
363 return false;
364 }
365 if (method.equalsIgnoreCase("CONNECT") && status < 300) {
366 return false;
367 }
368 return status >= HttpStatus.SC_OK
369 && status != HttpStatus.SC_NO_CONTENT
370 && status != HttpStatus.SC_NOT_MODIFIED
371 && status != HttpStatus.SC_RESET_CONTENT;
372 }
373
374 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
375
376 static class State {
377
378 private volatile MessageState requestState;
379 private volatile MessageState responseState;
380 private volatile HttpRequest request;
381 private volatile HttpResponse response;
382 private volatile boolean valid;
383 private volatile int timeout;
384
385 State() {
386 super();
387 this.valid = true;
388 this.requestState = MessageState.READY;
389 this.responseState = MessageState.READY;
390 }
391
392 public MessageState getRequestState() {
393 return this.requestState;
394 }
395
396 public void setRequestState(final MessageState state) {
397 this.requestState = state;
398 }
399
400 public MessageState getResponseState() {
401 return this.responseState;
402 }
403
404 public void setResponseState(final MessageState state) {
405 this.responseState = state;
406 }
407
408 public HttpRequest getRequest() {
409 return this.request;
410 }
411
412 public void setRequest(final HttpRequest request) {
413 this.request = request;
414 }
415
416 public HttpResponse getResponse() {
417 return this.response;
418 }
419
420 public void setResponse(final HttpResponse response) {
421 this.response = response;
422 }
423
424 public int getTimeout() {
425 return this.timeout;
426 }
427
428 public void setTimeout(final int timeout) {
429 this.timeout = timeout;
430 }
431
432 public void reset() {
433 this.responseState = MessageState.READY;
434 this.requestState = MessageState.READY;
435 this.response = null;
436 this.request = null;
437 this.timeout = 0;
438 }
439
440 public boolean isValid() {
441 return this.valid;
442 }
443
444 public void invalidate() {
445 this.valid = false;
446 }
447
448 @Override
449 public String toString() {
450 final StringBuilder buf = new StringBuilder();
451 buf.append("request state: ");
452 buf.append(this.requestState);
453 buf.append("; request: ");
454 if (this.request != null) {
455 buf.append(this.request.getRequestLine());
456 }
457 buf.append("; response state: ");
458 buf.append(this.responseState);
459 buf.append("; response: ");
460 if (this.response != null) {
461 buf.append(this.response.getStatusLine());
462 }
463 buf.append("; valid: ");
464 buf.append(this.valid);
465 buf.append(";");
466 return buf.toString();
467 }
468
469 }
470
471 }