1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.Executor;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpResponseFactory;
41 import org.apache.http.HttpStatus;
42 import org.apache.http.HttpVersion;
43 import org.apache.http.MethodNotSupportedException;
44 import org.apache.http.ProtocolVersion;
45 import org.apache.http.ProtocolException;
46 import org.apache.http.UnsupportedHttpVersionException;
47 import org.apache.http.annotation.ThreadSafe;
48 import org.apache.http.entity.ByteArrayEntity;
49 import org.apache.http.nio.ContentDecoder;
50 import org.apache.http.nio.ContentEncoder;
51 import org.apache.http.nio.IOControl;
52 import org.apache.http.nio.NHttpConnection;
53 import org.apache.http.nio.NHttpServerConnection;
54 import org.apache.http.nio.NHttpServiceHandler;
55 import org.apache.http.nio.entity.ContentBufferEntity;
56 import org.apache.http.nio.entity.ContentOutputStream;
57 import org.apache.http.nio.params.NIOReactorPNames;
58 import org.apache.http.nio.util.ByteBufferAllocator;
59 import org.apache.http.nio.util.ContentInputBuffer;
60 import org.apache.http.nio.util.ContentOutputBuffer;
61 import org.apache.http.nio.util.DirectByteBufferAllocator;
62 import org.apache.http.nio.util.SharedInputBuffer;
63 import org.apache.http.nio.util.SharedOutputBuffer;
64 import org.apache.http.params.HttpParams;
65 import org.apache.http.params.DefaultedHttpParams;
66 import org.apache.http.protocol.HttpContext;
67 import org.apache.http.protocol.ExecutionContext;
68 import org.apache.http.protocol.HttpExpectationVerifier;
69 import org.apache.http.protocol.HttpProcessor;
70 import org.apache.http.protocol.HttpRequestHandler;
71 import org.apache.http.protocol.HttpRequestHandlerResolver;
72 import org.apache.http.util.EncodingUtils;
73 import org.apache.http.util.EntityUtils;
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 @Deprecated
111 @ThreadSafe
112 public class ThrottlingHttpServiceHandler extends NHttpHandlerBase
113 implements NHttpServiceHandler {
114
115 protected final HttpResponseFactory responseFactory;
116 protected final Executor executor;
117
118 protected HttpRequestHandlerResolver handlerResolver;
119 protected HttpExpectationVerifier expectationVerifier;
120
121 private final int bufsize;
122
/**
 * Creates a service handler that uses the given allocator.
 *
 * @param httpProcessor   protocol processor, handed to the base handler
 * @param responseFactory factory used to create responses; must not be {@code null}
 * @param connStrategy    connection re-use strategy, handed to the base handler
 * @param allocator       byte buffer allocator, handed to the base handler
 * @param executor        executor that runs the request handling tasks; must not be {@code null}
 * @param params          HTTP parameters, handed to the base handler
 * @throws IllegalArgumentException if {@code responseFactory} or {@code executor} is {@code null}
 */
public ThrottlingHttpServiceHandler(
        final HttpProcessor httpProcessor,
        final HttpResponseFactory responseFactory,
        final ConnectionReuseStrategy connStrategy,
        final ByteBufferAllocator allocator,
        final Executor executor,
        final HttpParams params) {
    super(httpProcessor, connStrategy, allocator, params);

    // Argument checks: the response factory is checked before the executor.
    if (responseFactory == null) {
        throw new IllegalArgumentException("Response factory may not be null");
    }
    if (executor == null) {
        throw new IllegalArgumentException("Executor may not be null");
    }

    // Content buffer size comes from the parameters, 20480 when not configured.
    this.bufsize = this.params.getIntParameter(
            NIOReactorPNames.CONTENT_BUFFER_SIZE,
            20480);
    this.executor = executor;
    this.responseFactory = responseFactory;
}
141
/**
 * Creates a service handler that uses a new {@link DirectByteBufferAllocator};
 * all other arguments are forwarded unchanged to the six-argument constructor.
 *
 * @param httpProcessor   protocol processor
 * @param responseFactory factory used to create responses; must not be {@code null}
 * @param connStrategy    connection re-use strategy
 * @param executor        executor that runs the request handling tasks; must not be {@code null}
 * @param params          HTTP parameters
 */
public ThrottlingHttpServiceHandler(
        final HttpProcessor httpProcessor,
        final HttpResponseFactory responseFactory,
        final ConnectionReuseStrategy connStrategy,
        final Executor executor,
        final HttpParams params) {
    this(
            httpProcessor,
            responseFactory,
            connStrategy,
            new DirectByteBufferAllocator(),
            executor,
            params);
}
151
152 public void setHandlerResolver(final HttpRequestHandlerResolver handlerResolver) {
153 this.handlerResolver = handlerResolver;
154 }
155
156 public void setExpectationVerifier(final HttpExpectationVerifier expectationVerifier) {
157 this.expectationVerifier = expectationVerifier;
158 }
159
160 public void connected(final NHttpServerConnection conn) {
161 HttpContext context = conn.getContext();
162
163 ServerConnState connState = new ServerConnState(this.bufsize, conn, allocator);
164 context.setAttribute(CONN_STATE, connState);
165
166 if (this.eventListener != null) {
167 this.eventListener.connectionOpen(conn);
168 }
169 }
170
171 public void closed(final NHttpServerConnection conn) {
172 HttpContext context = conn.getContext();
173 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
174
175 if (connState != null) {
176 synchronized (connState) {
177 connState.close();
178 connState.notifyAll();
179 }
180 }
181
182 if (this.eventListener != null) {
183 this.eventListener.connectionClosed(conn);
184 }
185 }
186
187 public void exception(final NHttpServerConnection conn, final HttpException httpex) {
188 if (conn.isResponseSubmitted()) {
189 if (eventListener != null) {
190 eventListener.fatalProtocolException(httpex, conn);
191 }
192 return;
193 }
194
195 HttpContext context = conn.getContext();
196
197 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
198
199 try {
200
201 HttpResponse response = this.responseFactory.newHttpResponse(
202 HttpVersion.HTTP_1_0,
203 HttpStatus.SC_INTERNAL_SERVER_ERROR,
204 context);
205 response.setParams(
206 new DefaultedHttpParams(response.getParams(), this.params));
207 handleException(httpex, response);
208 response.setEntity(null);
209
210 this.httpProcessor.process(response, context);
211
212 synchronized (connState) {
213 connState.setResponse(response);
214
215 conn.requestOutput();
216 }
217
218 } catch (IOException ex) {
219 shutdownConnection(conn, ex);
220 if (eventListener != null) {
221 eventListener.fatalIOException(ex, conn);
222 }
223 } catch (HttpException ex) {
224 closeConnection(conn, ex);
225 if (eventListener != null) {
226 eventListener.fatalProtocolException(ex, conn);
227 }
228 }
229 }
230
231 public void exception(final NHttpServerConnection conn, final IOException ex) {
232 shutdownConnection(conn, ex);
233
234 if (this.eventListener != null) {
235 this.eventListener.fatalIOException(ex, conn);
236 }
237 }
238
239 public void timeout(final NHttpServerConnection conn) {
240 handleTimeout(conn);
241 }
242
243 public void requestReceived(final NHttpServerConnection conn) {
244 HttpContext context = conn.getContext();
245
246 final HttpRequest request = conn.getHttpRequest();
247 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
248
249 synchronized (connState) {
250 boolean contentExpected = false;
251 if (request instanceof HttpEntityEnclosingRequest) {
252 HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
253 if (entity != null) {
254 contentExpected = true;
255 }
256 }
257
258 if (!contentExpected) {
259 conn.suspendInput();
260 }
261
262 this.executor.execute(new Runnable() {
263
264 public void run() {
265 try {
266
267 handleRequest(request, connState, conn);
268
269 } catch (IOException ex) {
270 shutdownConnection(conn, ex);
271 if (eventListener != null) {
272 eventListener.fatalIOException(ex, conn);
273 }
274 } catch (HttpException ex) {
275 shutdownConnection(conn, ex);
276 if (eventListener != null) {
277 eventListener.fatalProtocolException(ex, conn);
278 }
279 }
280 }
281
282 });
283
284 connState.notifyAll();
285 }
286
287 }
288
289 public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
290 HttpContext context = conn.getContext();
291
292 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
293
294 try {
295
296 synchronized (connState) {
297 ContentInputBuffer buffer = connState.getInbuffer();
298
299 buffer.consumeContent(decoder);
300 if (decoder.isCompleted()) {
301 connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
302 } else {
303 connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
304 }
305
306 connState.notifyAll();
307 }
308
309 } catch (IOException ex) {
310 shutdownConnection(conn, ex);
311 if (this.eventListener != null) {
312 this.eventListener.fatalIOException(ex, conn);
313 }
314 }
315
316 }
317
318 public void responseReady(final NHttpServerConnection conn) {
319 HttpContext context = conn.getContext();
320
321 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
322
323 try {
324
325 synchronized (connState) {
326 if (connState.isExpectationFailed()) {
327
328
329
330 conn.resetInput();
331 connState.setExpectationFailed(false);
332 }
333
334 HttpResponse response = connState.getResponse();
335 if (connState.getOutputState() == ServerConnState.READY
336 && response != null
337 && !conn.isResponseSubmitted()) {
338
339 conn.submitResponse(response);
340 int statusCode = response.getStatusLine().getStatusCode();
341 HttpEntity entity = response.getEntity();
342
343 if (statusCode >= 200 && entity == null) {
344 connState.setOutputState(ServerConnState.RESPONSE_DONE);
345
346 if (!this.connStrategy.keepAlive(response, context)) {
347 conn.close();
348 }
349 } else {
350 connState.setOutputState(ServerConnState.RESPONSE_SENT);
351 }
352 }
353
354 connState.notifyAll();
355 }
356
357 } catch (IOException ex) {
358 shutdownConnection(conn, ex);
359 if (eventListener != null) {
360 eventListener.fatalIOException(ex, conn);
361 }
362 } catch (HttpException ex) {
363 closeConnection(conn, ex);
364 if (eventListener != null) {
365 eventListener.fatalProtocolException(ex, conn);
366 }
367 }
368 }
369
370 public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
371 HttpContext context = conn.getContext();
372
373 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
374
375 try {
376
377 synchronized (connState) {
378 HttpResponse response = connState.getResponse();
379 ContentOutputBuffer buffer = connState.getOutbuffer();
380
381 buffer.produceContent(encoder);
382 if (encoder.isCompleted()) {
383 connState.setOutputState(ServerConnState.RESPONSE_BODY_DONE);
384
385 if (!this.connStrategy.keepAlive(response, context)) {
386 conn.close();
387 }
388 } else {
389 connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
390 }
391
392 connState.notifyAll();
393 }
394
395 } catch (IOException ex) {
396 shutdownConnection(conn, ex);
397 if (this.eventListener != null) {
398 this.eventListener.fatalIOException(ex, conn);
399 }
400 }
401 }
402
403 private void handleException(final HttpException ex, final HttpResponse response) {
404 if (ex instanceof MethodNotSupportedException) {
405 response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
406 } else if (ex instanceof UnsupportedHttpVersionException) {
407 response.setStatusCode(HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED);
408 } else if (ex instanceof ProtocolException) {
409 response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
410 } else {
411 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
412 }
413 byte[] msg = EncodingUtils.getAsciiBytes(ex.getMessage());
414 ByteArrayEntity entity = new ByteArrayEntity(msg);
415 entity.setContentType("text/plain; charset=US-ASCII");
416 response.setEntity(entity);
417 }
418
419 private void handleRequest(
420 final HttpRequest request,
421 final ServerConnState connState,
422 final NHttpServerConnection conn) throws HttpException, IOException {
423
424 HttpContext context = conn.getContext();
425
426
427
428 synchronized (connState) {
429 try {
430 for (;;) {
431 int currentState = connState.getOutputState();
432 if (currentState == ServerConnState.READY) {
433 break;
434 }
435 if (currentState == ServerConnState.SHUTDOWN) {
436 return;
437 }
438 connState.wait();
439 }
440 } catch (InterruptedException ex) {
441 connState.shutdown();
442 return;
443 }
444 connState.setInputState(ServerConnState.REQUEST_RECEIVED);
445 connState.setRequest(request);
446 }
447
448 request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
449
450 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
451 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
452
453 ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
454
455 if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
456
457 ver = HttpVersion.HTTP_1_1;
458 }
459
460 HttpResponse response = null;
461
462 if (request instanceof HttpEntityEnclosingRequest) {
463 HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
464
465 if (eeRequest.expectContinue()) {
466 response = this.responseFactory.newHttpResponse(
467 ver,
468 HttpStatus.SC_CONTINUE,
469 context);
470 response.setParams(
471 new DefaultedHttpParams(response.getParams(), this.params));
472 if (this.expectationVerifier != null) {
473 try {
474 this.expectationVerifier.verify(request, response, context);
475 } catch (HttpException ex) {
476 response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0,
477 HttpStatus.SC_INTERNAL_SERVER_ERROR, context);
478 response.setParams(
479 new DefaultedHttpParams(response.getParams(), this.params));
480 handleException(ex, response);
481 }
482 }
483
484 synchronized (connState) {
485 if (response.getStatusLine().getStatusCode() < 200) {
486
487
488 connState.setResponse(response);
489 conn.requestOutput();
490
491
492 try {
493 for (;;) {
494 int currentState = connState.getOutputState();
495 if (currentState == ServerConnState.RESPONSE_SENT) {
496 break;
497 }
498 if (currentState == ServerConnState.SHUTDOWN) {
499 return;
500 }
501 connState.wait();
502 }
503 } catch (InterruptedException ex) {
504 connState.shutdown();
505 return;
506 }
507 connState.resetOutput();
508 response = null;
509 } else {
510
511 eeRequest.setEntity(null);
512 conn.suspendInput();
513 connState.setExpectationFailed(true);
514 }
515 }
516 }
517
518
519 if (eeRequest.getEntity() != null) {
520 eeRequest.setEntity(new ContentBufferEntity(
521 eeRequest.getEntity(),
522 connState.getInbuffer()));
523 }
524
525 }
526
527 if (response == null) {
528 response = this.responseFactory.newHttpResponse(
529 ver,
530 HttpStatus.SC_OK,
531 context);
532 response.setParams(
533 new DefaultedHttpParams(response.getParams(), this.params));
534
535 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
536
537 try {
538
539 this.httpProcessor.process(request, context);
540
541 HttpRequestHandler handler = null;
542 if (this.handlerResolver != null) {
543 String requestURI = request.getRequestLine().getUri();
544 handler = this.handlerResolver.lookup(requestURI);
545 }
546 if (handler != null) {
547 handler.handle(request, response, context);
548 } else {
549 response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
550 }
551
552 } catch (HttpException ex) {
553 response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0,
554 HttpStatus.SC_INTERNAL_SERVER_ERROR, context);
555 response.setParams(
556 new DefaultedHttpParams(response.getParams(), this.params));
557 handleException(ex, response);
558 }
559 }
560
561 if (request instanceof HttpEntityEnclosingRequest) {
562 HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
563 HttpEntity entity = eeRequest.getEntity();
564 EntityUtils.consume(entity);
565 }
566
567
568 connState.resetInput();
569
570 this.httpProcessor.process(response, context);
571
572 if (!canResponseHaveBody(request, response)) {
573 response.setEntity(null);
574 }
575
576 connState.setResponse(response);
577
578 conn.requestOutput();
579
580 if (response.getEntity() != null) {
581 ContentOutputBuffer buffer = connState.getOutbuffer();
582 OutputStream outstream = new ContentOutputStream(buffer);
583
584 HttpEntity entity = response.getEntity();
585 entity.writeTo(outstream);
586 outstream.flush();
587 outstream.close();
588 }
589
590 synchronized (connState) {
591 try {
592 for (;;) {
593 int currentState = connState.getOutputState();
594 if (currentState == ServerConnState.RESPONSE_DONE) {
595 break;
596 }
597 if (currentState == ServerConnState.SHUTDOWN) {
598 return;
599 }
600 connState.wait();
601 }
602 } catch (InterruptedException ex) {
603 connState.shutdown();
604 return;
605 }
606 connState.resetOutput();
607 conn.requestInput();
608 connState.notifyAll();
609 }
610 }
611
612 @Override
613 protected void shutdownConnection(final NHttpConnection conn, final Throwable cause) {
614 HttpContext context = conn.getContext();
615
616 ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
617
618 super.shutdownConnection(conn, cause);
619
620 if (connState != null) {
621 connState.shutdown();
622 }
623 }
624
625 static class ServerConnState {
626
627 public static final int SHUTDOWN = -1;
628 public static final int READY = 0;
629 public static final int REQUEST_RECEIVED = 1;
630 public static final int REQUEST_BODY_STREAM = 2;
631 public static final int REQUEST_BODY_DONE = 4;
632 public static final int RESPONSE_SENT = 8;
633 public static final int RESPONSE_BODY_STREAM = 16;
634 public static final int RESPONSE_BODY_DONE = 32;
635 public static final int RESPONSE_DONE = 32;
636
637 private final SharedInputBuffer inbuffer;
638 private final SharedOutputBuffer outbuffer;
639
640 private volatile int inputState;
641 private volatile int outputState;
642
643 private volatile HttpRequest request;
644 private volatile HttpResponse response;
645
646 private volatile boolean expectationFailure;
647
648 public ServerConnState(
649 int bufsize,
650 final IOControl ioControl,
651 final ByteBufferAllocator allocator) {
652 super();
653 this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
654 this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
655 this.inputState = READY;
656 this.outputState = READY;
657 }
658
659 public ContentInputBuffer getInbuffer() {
660 return this.inbuffer;
661 }
662
663 public ContentOutputBuffer getOutbuffer() {
664 return this.outbuffer;
665 }
666
667 public int getInputState() {
668 return this.inputState;
669 }
670
671 public void setInputState(int inputState) {
672 this.inputState = inputState;
673 }
674
675 public int getOutputState() {
676 return this.outputState;
677 }
678
679 public void setOutputState(int outputState) {
680 this.outputState = outputState;
681 }
682
683 public HttpRequest getRequest() {
684 return this.request;
685 }
686
687 public void setRequest(final HttpRequest request) {
688 this.request = request;
689 }
690
691 public HttpResponse getResponse() {
692 return this.response;
693 }
694
695 public void setResponse(final HttpResponse response) {
696 this.response = response;
697 }
698
699 public boolean isExpectationFailed() {
700 return expectationFailure;
701 }
702
703 public void setExpectationFailed(boolean b) {
704 this.expectationFailure = b;
705 }
706
707 public void close() {
708 this.inbuffer.close();
709 this.outbuffer.close();
710 this.inputState = SHUTDOWN;
711 this.outputState = SHUTDOWN;
712 }
713
714 public void shutdown() {
715 this.inbuffer.shutdown();
716 this.outbuffer.shutdown();
717 this.inputState = SHUTDOWN;
718 this.outputState = SHUTDOWN;
719 }
720
721 public void resetInput() {
722 this.inbuffer.reset();
723 this.request = null;
724 this.inputState = READY;
725 }
726
727 public void resetOutput() {
728 this.outbuffer.reset();
729 this.response = null;
730 this.outputState = READY;
731 this.expectationFailure = false;
732 }
733
734 }
735
736 }