1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.Executor;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpStatus;
41 import org.apache.http.annotation.ThreadSafe;
42 import org.apache.http.nio.ContentDecoder;
43 import org.apache.http.nio.ContentEncoder;
44 import org.apache.http.nio.IOControl;
45 import org.apache.http.nio.NHttpClientConnection;
46 import org.apache.http.nio.NHttpClientHandler;
47 import org.apache.http.nio.entity.ContentBufferEntity;
48 import org.apache.http.nio.entity.ContentOutputStream;
49 import org.apache.http.nio.params.NIOReactorPNames;
50 import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
51 import org.apache.http.nio.util.ByteBufferAllocator;
52 import org.apache.http.nio.util.ContentInputBuffer;
53 import org.apache.http.nio.util.ContentOutputBuffer;
54 import org.apache.http.nio.util.DirectByteBufferAllocator;
55 import org.apache.http.nio.util.SharedInputBuffer;
56 import org.apache.http.nio.util.SharedOutputBuffer;
57 import org.apache.http.params.HttpParams;
58 import org.apache.http.params.CoreProtocolPNames;
59 import org.apache.http.params.DefaultedHttpParams;
60 import org.apache.http.protocol.ExecutionContext;
61 import org.apache.http.protocol.HttpContext;
62 import org.apache.http.protocol.HttpProcessor;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 @Deprecated
100 @ThreadSafe
101 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
102 implements NHttpClientHandler {
103
104 protected HttpRequestExecutionHandler execHandler;
105 protected final Executor executor;
106
107 private final int bufsize;
108
109 public ThrottlingHttpClientHandler(
110 final HttpProcessor httpProcessor,
111 final HttpRequestExecutionHandler execHandler,
112 final ConnectionReuseStrategy connStrategy,
113 final ByteBufferAllocator allocator,
114 final Executor executor,
115 final HttpParams params) {
116 super(httpProcessor, connStrategy, allocator, params);
117 if (execHandler == null) {
118 throw new IllegalArgumentException("HTTP request execution handler may not be null.");
119 }
120 if (executor == null) {
121 throw new IllegalArgumentException("Executor may not be null");
122 }
123 this.execHandler = execHandler;
124 this.executor = executor;
125 this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
126 }
127
128 public ThrottlingHttpClientHandler(
129 final HttpProcessor httpProcessor,
130 final HttpRequestExecutionHandler execHandler,
131 final ConnectionReuseStrategy connStrategy,
132 final Executor executor,
133 final HttpParams params) {
134 this(httpProcessor, execHandler, connStrategy,
135 new DirectByteBufferAllocator(), executor, params);
136 }
137
138 public void connected(final NHttpClientConnection conn, final Object attachment) {
139 HttpContext context = conn.getContext();
140
141 initialize(conn, attachment);
142
143 ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
144 context.setAttribute(CONN_STATE, connState);
145
146 if (this.eventListener != null) {
147 this.eventListener.connectionOpen(conn);
148 }
149
150 requestReady(conn);
151 }
152
153 public void closed(final NHttpClientConnection conn) {
154 HttpContext context = conn.getContext();
155 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
156
157 if (connState != null) {
158 synchronized (connState) {
159 connState.close();
160 connState.notifyAll();
161 }
162 }
163 this.execHandler.finalizeContext(context);
164
165 if (this.eventListener != null) {
166 this.eventListener.connectionClosed(conn);
167 }
168 }
169
170 public void exception(final NHttpClientConnection conn, final HttpException ex) {
171 closeConnection(conn, ex);
172 if (this.eventListener != null) {
173 this.eventListener.fatalProtocolException(ex, conn);
174 }
175 }
176
177 public void exception(final NHttpClientConnection conn, final IOException ex) {
178 shutdownConnection(conn, ex);
179 if (this.eventListener != null) {
180 this.eventListener.fatalIOException(ex, conn);
181 }
182 }
183
184
185 public void requestReady(final NHttpClientConnection conn) {
186 HttpContext context = conn.getContext();
187
188 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
189
190 try {
191
192 synchronized (connState) {
193 if (connState.getOutputState() != ClientConnState.READY) {
194 return;
195 }
196
197 HttpRequest request = this.execHandler.submitRequest(context);
198 if (request == null) {
199 return;
200 }
201
202 request.setParams(
203 new DefaultedHttpParams(request.getParams(), this.params));
204
205 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
206 this.httpProcessor.process(request, context);
207 connState.setRequest(request);
208 conn.submitRequest(request);
209 connState.setOutputState(ClientConnState.REQUEST_SENT);
210
211 conn.requestInput();
212
213 if (request instanceof HttpEntityEnclosingRequest) {
214 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
215 int timeout = conn.getSocketTimeout();
216 connState.setTimeout(timeout);
217 timeout = this.params.getIntParameter(
218 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
219 conn.setSocketTimeout(timeout);
220 connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
221 } else {
222 sendRequestBody(
223 (HttpEntityEnclosingRequest) request,
224 connState,
225 conn);
226 }
227 }
228
229 connState.notifyAll();
230 }
231
232 } catch (IOException ex) {
233 shutdownConnection(conn, ex);
234 if (this.eventListener != null) {
235 this.eventListener.fatalIOException(ex, conn);
236 }
237 } catch (HttpException ex) {
238 closeConnection(conn, ex);
239 if (this.eventListener != null) {
240 this.eventListener.fatalProtocolException(ex, conn);
241 }
242 }
243 }
244
245 public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
246 HttpContext context = conn.getContext();
247
248 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
249
250 try {
251
252 synchronized (connState) {
253 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
254 conn.suspendOutput();
255 return;
256 }
257 ContentOutputBuffer buffer = connState.getOutbuffer();
258 buffer.produceContent(encoder);
259 if (encoder.isCompleted()) {
260 connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
261 } else {
262 connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
263 }
264
265 connState.notifyAll();
266 }
267
268 } catch (IOException ex) {
269 shutdownConnection(conn, ex);
270 if (this.eventListener != null) {
271 this.eventListener.fatalIOException(ex, conn);
272 }
273 }
274 }
275
276 public void responseReceived(final NHttpClientConnection conn) {
277 HttpContext context = conn.getContext();
278 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
279
280 try {
281
282 synchronized (connState) {
283 HttpResponse response = conn.getHttpResponse();
284 response.setParams(
285 new DefaultedHttpParams(response.getParams(), this.params));
286
287 HttpRequest request = connState.getRequest();
288
289 int statusCode = response.getStatusLine().getStatusCode();
290 if (statusCode < HttpStatus.SC_OK) {
291
292 if (statusCode == HttpStatus.SC_CONTINUE
293 && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
294 connState.setOutputState(ClientConnState.REQUEST_SENT);
295 continueRequest(conn, connState);
296 }
297 return;
298 } else {
299 connState.setResponse(response);
300 connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
301
302 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
303 int timeout = connState.getTimeout();
304 conn.setSocketTimeout(timeout);
305 conn.resetOutput();
306 }
307 }
308
309 if (!canResponseHaveBody(request, response)) {
310 conn.resetInput();
311 response.setEntity(null);
312 connState.setInputState(ClientConnState.RESPONSE_DONE);
313
314 if (!this.connStrategy.keepAlive(response, context)) {
315 conn.close();
316 }
317 }
318
319 if (response.getEntity() != null) {
320 response.setEntity(new ContentBufferEntity(
321 response.getEntity(),
322 connState.getInbuffer()));
323 }
324
325 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
326
327 this.httpProcessor.process(response, context);
328
329 handleResponse(response, connState, conn);
330
331 connState.notifyAll();
332 }
333
334 } catch (IOException ex) {
335 shutdownConnection(conn, ex);
336 if (this.eventListener != null) {
337 this.eventListener.fatalIOException(ex, conn);
338 }
339 } catch (HttpException ex) {
340 closeConnection(conn, ex);
341 if (this.eventListener != null) {
342 this.eventListener.fatalProtocolException(ex, conn);
343 }
344 }
345 }
346
347 public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
348 HttpContext context = conn.getContext();
349
350 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
351 try {
352
353 synchronized (connState) {
354 HttpResponse response = connState.getResponse();
355 ContentInputBuffer buffer = connState.getInbuffer();
356
357 buffer.consumeContent(decoder);
358 if (decoder.isCompleted()) {
359 connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
360
361 if (!this.connStrategy.keepAlive(response, context)) {
362 conn.close();
363 }
364 } else {
365 connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
366 }
367
368 connState.notifyAll();
369 }
370
371 } catch (IOException ex) {
372 shutdownConnection(conn, ex);
373 if (this.eventListener != null) {
374 this.eventListener.fatalIOException(ex, conn);
375 }
376 }
377 }
378
379 public void timeout(final NHttpClientConnection conn) {
380 HttpContext context = conn.getContext();
381 ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
382
383 try {
384
385 synchronized (connState) {
386 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
387 connState.setOutputState(ClientConnState.REQUEST_SENT);
388 continueRequest(conn, connState);
389
390 connState.notifyAll();
391 return;
392 }
393 }
394
395 } catch (IOException ex) {
396 shutdownConnection(conn, ex);
397 if (this.eventListener != null) {
398 this.eventListener.fatalIOException(ex, conn);
399 }
400 }
401
402 handleTimeout(conn);
403 }
404
405 private void initialize(
406 final NHttpClientConnection conn,
407 final Object attachment) {
408 HttpContext context = conn.getContext();
409
410 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
411 this.execHandler.initalizeContext(context, attachment);
412 }
413
414 private void continueRequest(
415 final NHttpClientConnection conn,
416 final ClientConnState connState) throws IOException {
417
418 HttpRequest request = connState.getRequest();
419
420 int timeout = connState.getTimeout();
421 conn.setSocketTimeout(timeout);
422
423 sendRequestBody(
424 (HttpEntityEnclosingRequest) request,
425 connState,
426 conn);
427 }
428
429
430
431
432 private void sendRequestBody(
433 final HttpEntityEnclosingRequest request,
434 final ClientConnState connState,
435 final NHttpClientConnection conn) throws IOException {
436 HttpEntity entity = request.getEntity();
437 if (entity != null) {
438
439 this.executor.execute(new Runnable() {
440
441 public void run() {
442 try {
443
444
445
446 synchronized (connState) {
447 try {
448 for (;;) {
449 int currentState = connState.getOutputState();
450 if (!connState.isWorkerRunning()) {
451 break;
452 }
453 if (currentState == ServerConnState.SHUTDOWN) {
454 return;
455 }
456 connState.wait();
457 }
458 } catch (InterruptedException ex) {
459 connState.shutdown();
460 return;
461 }
462 connState.setWorkerRunning(true);
463 }
464
465 HttpEntity entity = request.getEntity();
466 OutputStream outstream = new ContentOutputStream(
467 connState.getOutbuffer());
468 entity.writeTo(outstream);
469 outstream.flush();
470 outstream.close();
471
472 synchronized (connState) {
473 connState.setWorkerRunning(false);
474 connState.notifyAll();
475 }
476
477 } catch (IOException ex) {
478 shutdownConnection(conn, ex);
479 if (eventListener != null) {
480 eventListener.fatalIOException(ex, conn);
481 }
482 }
483 }
484
485 });
486 }
487 }
488
489 private void handleResponse(
490 final HttpResponse response,
491 final ClientConnState connState,
492 final NHttpClientConnection conn) {
493
494 final HttpContext context = conn.getContext();
495
496 this.executor.execute(new Runnable() {
497
498 public void run() {
499 try {
500
501
502
503 synchronized (connState) {
504 try {
505 for (;;) {
506 int currentState = connState.getOutputState();
507 if (!connState.isWorkerRunning()) {
508 break;
509 }
510 if (currentState == ServerConnState.SHUTDOWN) {
511 return;
512 }
513 connState.wait();
514 }
515 } catch (InterruptedException ex) {
516 connState.shutdown();
517 return;
518 }
519 connState.setWorkerRunning(true);
520 }
521
522 execHandler.handleResponse(response, context);
523
524 synchronized (connState) {
525
526 try {
527 for (;;) {
528 int currentState = connState.getInputState();
529 if (currentState == ClientConnState.RESPONSE_DONE) {
530 break;
531 }
532 if (currentState == ServerConnState.SHUTDOWN) {
533 return;
534 }
535 connState.wait();
536 }
537 } catch (InterruptedException ex) {
538 connState.shutdown();
539 }
540
541 connState.resetInput();
542 connState.resetOutput();
543 if (conn.isOpen()) {
544 conn.requestOutput();
545 }
546 connState.setWorkerRunning(false);
547 connState.notifyAll();
548 }
549
550 } catch (IOException ex) {
551 shutdownConnection(conn, ex);
552 if (eventListener != null) {
553 eventListener.fatalIOException(ex, conn);
554 }
555 }
556 }
557
558 });
559
560 }
561
562 static class ClientConnState {
563
564 public static final int SHUTDOWN = -1;
565 public static final int READY = 0;
566 public static final int REQUEST_SENT = 1;
567 public static final int EXPECT_CONTINUE = 2;
568 public static final int REQUEST_BODY_STREAM = 4;
569 public static final int REQUEST_BODY_DONE = 8;
570 public static final int RESPONSE_RECEIVED = 16;
571 public static final int RESPONSE_BODY_STREAM = 32;
572 public static final int RESPONSE_BODY_DONE = 64;
573 public static final int RESPONSE_DONE = 64;
574
575 private final SharedInputBuffer inbuffer;
576 private final SharedOutputBuffer outbuffer;
577
578 private volatile int inputState;
579 private volatile int outputState;
580
581 private volatile HttpRequest request;
582 private volatile HttpResponse response;
583
584 private volatile int timeout;
585
586 private volatile boolean workerRunning;
587
588 public ClientConnState(
589 int bufsize,
590 final IOControl ioControl,
591 final ByteBufferAllocator allocator) {
592 super();
593 this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
594 this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
595 this.inputState = READY;
596 this.outputState = READY;
597 }
598
599 public ContentInputBuffer getInbuffer() {
600 return this.inbuffer;
601 }
602
603 public ContentOutputBuffer getOutbuffer() {
604 return this.outbuffer;
605 }
606
607 public int getInputState() {
608 return this.inputState;
609 }
610
611 public void setInputState(int inputState) {
612 this.inputState = inputState;
613 }
614
615 public int getOutputState() {
616 return this.outputState;
617 }
618
619 public void setOutputState(int outputState) {
620 this.outputState = outputState;
621 }
622
623 public HttpRequest getRequest() {
624 return this.request;
625 }
626
627 public void setRequest(final HttpRequest request) {
628 this.request = request;
629 }
630
631 public HttpResponse getResponse() {
632 return this.response;
633 }
634
635 public void setResponse(final HttpResponse response) {
636 this.response = response;
637 }
638
639 public int getTimeout() {
640 return this.timeout;
641 }
642
643 public void setTimeout(int timeout) {
644 this.timeout = timeout;
645 }
646
647 public boolean isWorkerRunning() {
648 return this.workerRunning;
649 }
650
651 public void setWorkerRunning(boolean b) {
652 this.workerRunning = b;
653 }
654
655 public void close() {
656 this.inbuffer.close();
657 this.outbuffer.close();
658 this.inputState = SHUTDOWN;
659 this.outputState = SHUTDOWN;
660 }
661
662 public void shutdown() {
663 this.inbuffer.shutdown();
664 this.outbuffer.shutdown();
665 this.inputState = SHUTDOWN;
666 this.outputState = SHUTDOWN;
667 }
668
669 public void resetInput() {
670 this.inbuffer.reset();
671 this.request = null;
672 this.inputState = READY;
673 }
674
675 public void resetOutput() {
676 this.outbuffer.reset();
677 this.response = null;
678 this.outputState = READY;
679 }
680
681 }
682
683 }