1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.Executor;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpStatus;
41 import org.apache.http.annotation.ThreadSafe;
42 import org.apache.http.nio.ContentDecoder;
43 import org.apache.http.nio.ContentEncoder;
44 import org.apache.http.nio.IOControl;
45 import org.apache.http.nio.NHttpClientConnection;
46 import org.apache.http.nio.NHttpClientHandler;
47 import org.apache.http.nio.entity.ContentBufferEntity;
48 import org.apache.http.nio.entity.ContentOutputStream;
49 import org.apache.http.nio.params.NIOReactorPNames;
50 import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
51 import org.apache.http.nio.util.ByteBufferAllocator;
52 import org.apache.http.nio.util.ContentInputBuffer;
53 import org.apache.http.nio.util.ContentOutputBuffer;
54 import org.apache.http.nio.util.DirectByteBufferAllocator;
55 import org.apache.http.nio.util.SharedInputBuffer;
56 import org.apache.http.nio.util.SharedOutputBuffer;
57 import org.apache.http.params.CoreProtocolPNames;
58 import org.apache.http.params.DefaultedHttpParams;
59 import org.apache.http.params.HttpParams;
60 import org.apache.http.protocol.ExecutionContext;
61 import org.apache.http.protocol.HttpContext;
62 import org.apache.http.protocol.HttpProcessor;
63 import org.apache.http.util.Args;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100 @Deprecated
101 @ThreadSafe
102 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
103 implements NHttpClientHandler {
104
105 protected HttpRequestExecutionHandler execHandler;
106 protected final Executor executor;
107
108 private final int bufsize;
109
110 public ThrottlingHttpClientHandler(
111 final HttpProcessor httpProcessor,
112 final HttpRequestExecutionHandler execHandler,
113 final ConnectionReuseStrategy connStrategy,
114 final ByteBufferAllocator allocator,
115 final Executor executor,
116 final HttpParams params) {
117 super(httpProcessor, connStrategy, allocator, params);
118 Args.notNull(execHandler, "HTTP request execution handler");
119 Args.notNull(executor, "Executor");
120 this.execHandler = execHandler;
121 this.executor = executor;
122 this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
123 }
124
125 public ThrottlingHttpClientHandler(
126 final HttpProcessor httpProcessor,
127 final HttpRequestExecutionHandler execHandler,
128 final ConnectionReuseStrategy connStrategy,
129 final Executor executor,
130 final HttpParams params) {
131 this(httpProcessor, execHandler, connStrategy,
132 DirectByteBufferAllocator.INSTANCE, executor, params);
133 }
134
135 public void connected(final NHttpClientConnection conn, final Object attachment) {
136 final HttpContext context = conn.getContext();
137
138 initialize(conn, attachment);
139
140 final ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
141 context.setAttribute(CONN_STATE, connState);
142
143 if (this.eventListener != null) {
144 this.eventListener.connectionOpen(conn);
145 }
146
147 requestReady(conn);
148 }
149
150 public void closed(final NHttpClientConnection conn) {
151 final HttpContext context = conn.getContext();
152 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
153
154 if (connState != null) {
155 synchronized (connState) {
156 connState.close();
157 connState.notifyAll();
158 }
159 }
160 this.execHandler.finalizeContext(context);
161
162 if (this.eventListener != null) {
163 this.eventListener.connectionClosed(conn);
164 }
165 }
166
167 public void exception(final NHttpClientConnection conn, final HttpException ex) {
168 closeConnection(conn, ex);
169 if (this.eventListener != null) {
170 this.eventListener.fatalProtocolException(ex, conn);
171 }
172 }
173
174 public void exception(final NHttpClientConnection conn, final IOException ex) {
175 shutdownConnection(conn, ex);
176 if (this.eventListener != null) {
177 this.eventListener.fatalIOException(ex, conn);
178 }
179 }
180
181
182 public void requestReady(final NHttpClientConnection conn) {
183 final HttpContext context = conn.getContext();
184
185 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
186
187 try {
188
189 synchronized (connState) {
190 if (connState.getOutputState() != ClientConnState.READY) {
191 return;
192 }
193
194 final HttpRequest request = this.execHandler.submitRequest(context);
195 if (request == null) {
196 return;
197 }
198
199 request.setParams(
200 new DefaultedHttpParams(request.getParams(), this.params));
201
202 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
203 this.httpProcessor.process(request, context);
204 connState.setRequest(request);
205 conn.submitRequest(request);
206 connState.setOutputState(ClientConnState.REQUEST_SENT);
207
208 conn.requestInput();
209
210 if (request instanceof HttpEntityEnclosingRequest) {
211 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
212 int timeout = conn.getSocketTimeout();
213 connState.setTimeout(timeout);
214 timeout = this.params.getIntParameter(
215 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
216 conn.setSocketTimeout(timeout);
217 connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
218 } else {
219 sendRequestBody(
220 (HttpEntityEnclosingRequest) request,
221 connState,
222 conn);
223 }
224 }
225
226 connState.notifyAll();
227 }
228
229 } catch (final IOException ex) {
230 shutdownConnection(conn, ex);
231 if (this.eventListener != null) {
232 this.eventListener.fatalIOException(ex, conn);
233 }
234 } catch (final HttpException ex) {
235 closeConnection(conn, ex);
236 if (this.eventListener != null) {
237 this.eventListener.fatalProtocolException(ex, conn);
238 }
239 }
240 }
241
242 public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
243 final HttpContext context = conn.getContext();
244
245 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
246
247 try {
248
249 synchronized (connState) {
250 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
251 conn.suspendOutput();
252 return;
253 }
254 final ContentOutputBuffer buffer = connState.getOutbuffer();
255 buffer.produceContent(encoder);
256 if (encoder.isCompleted()) {
257 connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
258 } else {
259 connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
260 }
261
262 connState.notifyAll();
263 }
264
265 } catch (final IOException ex) {
266 shutdownConnection(conn, ex);
267 if (this.eventListener != null) {
268 this.eventListener.fatalIOException(ex, conn);
269 }
270 }
271 }
272
273 public void responseReceived(final NHttpClientConnection conn) {
274 final HttpContext context = conn.getContext();
275 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
276
277 try {
278
279 synchronized (connState) {
280 final HttpResponse response = conn.getHttpResponse();
281 response.setParams(
282 new DefaultedHttpParams(response.getParams(), this.params));
283
284 final HttpRequest request = connState.getRequest();
285
286 final int statusCode = response.getStatusLine().getStatusCode();
287 if (statusCode < HttpStatus.SC_OK) {
288
289 if (statusCode == HttpStatus.SC_CONTINUE
290 && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
291 connState.setOutputState(ClientConnState.REQUEST_SENT);
292 continueRequest(conn, connState);
293 }
294 return;
295 } else {
296 connState.setResponse(response);
297 connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
298
299 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
300 final int timeout = connState.getTimeout();
301 conn.setSocketTimeout(timeout);
302 conn.resetOutput();
303 }
304 }
305
306 if (!canResponseHaveBody(request, response)) {
307 conn.resetInput();
308 response.setEntity(null);
309 connState.setInputState(ClientConnState.RESPONSE_DONE);
310
311 if (!this.connStrategy.keepAlive(response, context)) {
312 conn.close();
313 }
314 }
315
316 if (response.getEntity() != null) {
317 response.setEntity(new ContentBufferEntity(
318 response.getEntity(),
319 connState.getInbuffer()));
320 }
321
322 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
323
324 this.httpProcessor.process(response, context);
325
326 handleResponse(response, connState, conn);
327
328 connState.notifyAll();
329 }
330
331 } catch (final IOException ex) {
332 shutdownConnection(conn, ex);
333 if (this.eventListener != null) {
334 this.eventListener.fatalIOException(ex, conn);
335 }
336 } catch (final HttpException ex) {
337 closeConnection(conn, ex);
338 if (this.eventListener != null) {
339 this.eventListener.fatalProtocolException(ex, conn);
340 }
341 }
342 }
343
344 public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
345 final HttpContext context = conn.getContext();
346
347 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
348 try {
349
350 synchronized (connState) {
351 final HttpResponse response = connState.getResponse();
352 final ContentInputBuffer buffer = connState.getInbuffer();
353
354 buffer.consumeContent(decoder);
355 if (decoder.isCompleted()) {
356 connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
357
358 if (!this.connStrategy.keepAlive(response, context)) {
359 conn.close();
360 }
361 } else {
362 connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
363 }
364
365 connState.notifyAll();
366 }
367
368 } catch (final IOException ex) {
369 shutdownConnection(conn, ex);
370 if (this.eventListener != null) {
371 this.eventListener.fatalIOException(ex, conn);
372 }
373 }
374 }
375
376 public void timeout(final NHttpClientConnection conn) {
377 final HttpContext context = conn.getContext();
378 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
379
380 try {
381
382 synchronized (connState) {
383 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
384 connState.setOutputState(ClientConnState.REQUEST_SENT);
385 continueRequest(conn, connState);
386
387 connState.notifyAll();
388 return;
389 }
390 }
391
392 } catch (final IOException ex) {
393 shutdownConnection(conn, ex);
394 if (this.eventListener != null) {
395 this.eventListener.fatalIOException(ex, conn);
396 }
397 }
398
399 handleTimeout(conn);
400 }
401
402 private void initialize(
403 final NHttpClientConnection conn,
404 final Object attachment) {
405 final HttpContext context = conn.getContext();
406
407 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
408 this.execHandler.initalizeContext(context, attachment);
409 }
410
411 private void continueRequest(
412 final NHttpClientConnection conn,
413 final ClientConnState connState) throws IOException {
414
415 final HttpRequest request = connState.getRequest();
416
417 final int timeout = connState.getTimeout();
418 conn.setSocketTimeout(timeout);
419
420 sendRequestBody(
421 (HttpEntityEnclosingRequest) request,
422 connState,
423 conn);
424 }
425
426
427
428
429 private void sendRequestBody(
430 final HttpEntityEnclosingRequest request,
431 final ClientConnState connState,
432 final NHttpClientConnection conn) throws IOException {
433 final HttpEntity entity = request.getEntity();
434 if (entity != null) {
435
436 this.executor.execute(new Runnable() {
437
438 public void run() {
439 try {
440
441
442
443 synchronized (connState) {
444 try {
445 for (;;) {
446 final int currentState = connState.getOutputState();
447 if (!connState.isWorkerRunning()) {
448 break;
449 }
450 if (currentState == ServerConnState.SHUTDOWN) {
451 return;
452 }
453 connState.wait();
454 }
455 } catch (final InterruptedException ex) {
456 connState.shutdown();
457 return;
458 }
459 connState.setWorkerRunning(true);
460 }
461
462 final HttpEntity entity = request.getEntity();
463 final OutputStream outstream = new ContentOutputStream(
464 connState.getOutbuffer());
465 entity.writeTo(outstream);
466 outstream.flush();
467 outstream.close();
468
469 synchronized (connState) {
470 connState.setWorkerRunning(false);
471 connState.notifyAll();
472 }
473
474 } catch (final IOException ex) {
475 shutdownConnection(conn, ex);
476 if (eventListener != null) {
477 eventListener.fatalIOException(ex, conn);
478 }
479 }
480 }
481
482 });
483 }
484 }
485
486 private void handleResponse(
487 final HttpResponse response,
488 final ClientConnState connState,
489 final NHttpClientConnection conn) {
490
491 final HttpContext context = conn.getContext();
492
493 this.executor.execute(new Runnable() {
494
495 public void run() {
496 try {
497
498
499
500 synchronized (connState) {
501 try {
502 for (;;) {
503 final int currentState = connState.getOutputState();
504 if (!connState.isWorkerRunning()) {
505 break;
506 }
507 if (currentState == ServerConnState.SHUTDOWN) {
508 return;
509 }
510 connState.wait();
511 }
512 } catch (final InterruptedException ex) {
513 connState.shutdown();
514 return;
515 }
516 connState.setWorkerRunning(true);
517 }
518
519 execHandler.handleResponse(response, context);
520
521 synchronized (connState) {
522
523 try {
524 for (;;) {
525 final int currentState = connState.getInputState();
526 if (currentState == ClientConnState.RESPONSE_DONE) {
527 break;
528 }
529 if (currentState == ServerConnState.SHUTDOWN) {
530 return;
531 }
532 connState.wait();
533 }
534 } catch (final InterruptedException ex) {
535 connState.shutdown();
536 }
537
538 connState.resetInput();
539 connState.resetOutput();
540 if (conn.isOpen()) {
541 conn.requestOutput();
542 }
543 connState.setWorkerRunning(false);
544 connState.notifyAll();
545 }
546
547 } catch (final IOException ex) {
548 shutdownConnection(conn, ex);
549 if (eventListener != null) {
550 eventListener.fatalIOException(ex, conn);
551 }
552 }
553 }
554
555 });
556
557 }
558
559 static class ClientConnState {
560
561 public static final int SHUTDOWN = -1;
562 public static final int READY = 0;
563 public static final int REQUEST_SENT = 1;
564 public static final int EXPECT_CONTINUE = 2;
565 public static final int REQUEST_BODY_STREAM = 4;
566 public static final int REQUEST_BODY_DONE = 8;
567 public static final int RESPONSE_RECEIVED = 16;
568 public static final int RESPONSE_BODY_STREAM = 32;
569 public static final int RESPONSE_BODY_DONE = 64;
570 public static final int RESPONSE_DONE = 64;
571
572 private final SharedInputBuffer inbuffer;
573 private final SharedOutputBuffer outbuffer;
574
575 private volatile int inputState;
576 private volatile int outputState;
577
578 private volatile HttpRequest request;
579 private volatile HttpResponse response;
580
581 private volatile int timeout;
582
583 private volatile boolean workerRunning;
584
585 public ClientConnState(
586 final int bufsize,
587 final IOControl ioControl,
588 final ByteBufferAllocator allocator) {
589 super();
590 this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
591 this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
592 this.inputState = READY;
593 this.outputState = READY;
594 }
595
596 public ContentInputBuffer getInbuffer() {
597 return this.inbuffer;
598 }
599
600 public ContentOutputBuffer getOutbuffer() {
601 return this.outbuffer;
602 }
603
604 public int getInputState() {
605 return this.inputState;
606 }
607
608 public void setInputState(final int inputState) {
609 this.inputState = inputState;
610 }
611
612 public int getOutputState() {
613 return this.outputState;
614 }
615
616 public void setOutputState(final int outputState) {
617 this.outputState = outputState;
618 }
619
620 public HttpRequest getRequest() {
621 return this.request;
622 }
623
624 public void setRequest(final HttpRequest request) {
625 this.request = request;
626 }
627
628 public HttpResponse getResponse() {
629 return this.response;
630 }
631
632 public void setResponse(final HttpResponse response) {
633 this.response = response;
634 }
635
636 public int getTimeout() {
637 return this.timeout;
638 }
639
640 public void setTimeout(final int timeout) {
641 this.timeout = timeout;
642 }
643
644 public boolean isWorkerRunning() {
645 return this.workerRunning;
646 }
647
648 public void setWorkerRunning(final boolean b) {
649 this.workerRunning = b;
650 }
651
652 public void close() {
653 this.inbuffer.close();
654 this.outbuffer.close();
655 this.inputState = SHUTDOWN;
656 this.outputState = SHUTDOWN;
657 }
658
659 public void shutdown() {
660 this.inbuffer.shutdown();
661 this.outbuffer.shutdown();
662 this.inputState = SHUTDOWN;
663 this.outputState = SHUTDOWN;
664 }
665
666 public void resetInput() {
667 this.inbuffer.reset();
668 this.request = null;
669 this.inputState = READY;
670 }
671
672 public void resetOutput() {
673 this.outbuffer.reset();
674 this.response = null;
675 this.outputState = READY;
676 }
677
678 }
679
680 }