View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.annotation.ThreadSafe;
42  import org.apache.http.nio.ContentDecoder;
43  import org.apache.http.nio.ContentEncoder;
44  import org.apache.http.nio.IOControl;
45  import org.apache.http.nio.NHttpClientConnection;
46  import org.apache.http.nio.NHttpClientHandler;
47  import org.apache.http.nio.entity.ContentBufferEntity;
48  import org.apache.http.nio.entity.ContentOutputStream;
49  import org.apache.http.nio.params.NIOReactorPNames;
50  import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
51  import org.apache.http.nio.util.ByteBufferAllocator;
52  import org.apache.http.nio.util.ContentInputBuffer;
53  import org.apache.http.nio.util.ContentOutputBuffer;
54  import org.apache.http.nio.util.DirectByteBufferAllocator;
55  import org.apache.http.nio.util.SharedInputBuffer;
56  import org.apache.http.nio.util.SharedOutputBuffer;
57  import org.apache.http.params.CoreProtocolPNames;
58  import org.apache.http.params.DefaultedHttpParams;
59  import org.apache.http.params.HttpParams;
60  import org.apache.http.protocol.ExecutionContext;
61  import org.apache.http.protocol.HttpContext;
62  import org.apache.http.protocol.HttpProcessor;
63  import org.apache.http.util.Args;
64  
65  /**
66   * Client protocol handler implementation that provide compatibility with
67   * the blocking I/O by utilizing shared content buffers and a fairly small pool
68   * of worker threads. The throttling protocol handler allocates input / output
69   * buffers of a constant length upon initialization and controls the rate of
70   * I/O events in order to ensure those content buffers do not ever get
71   * overflown. This helps ensure nearly constant memory footprint for HTTP
72   * connections and avoid the out of memory condition while streaming content
73   * in and out. The {@link HttpRequestExecutionHandler#handleResponse(HttpResponse, HttpContext)}
74   * method will fire immediately when a message is received. The protocol handler
75   * delegate the task of processing requests and generating response content to
76   * an {@link Executor}, which is expected to perform those tasks using
77   * dedicated worker threads in order to avoid blocking the I/O thread.
78   * <p/>
79   * Usually throttling protocol handlers need only a modest number of worker
80   * threads, much fewer than the number of concurrent connections. If the length
81   * of the message is smaller or about the size of the shared content buffer
82   * worker thread will just store content in the buffer and terminate almost
83   * immediately without blocking. The I/O dispatch thread in its turn will take
84   * care of sending out the buffered content asynchronously. The worker thread
85   * will have to block only when processing large messages and the shared buffer
86   * fills up. It is generally advisable to allocate shared buffers of a size of
87   * an average content body for optimal performance.
88   * <p>
89   * The following parameters can be used to customize the behavior of this
90   * class:
91   * <ul>
92   *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#CONTENT_BUFFER_SIZE}</li>
93   *  <li>{@link org.apache.http.params.CoreProtocolPNames#WAIT_FOR_CONTINUE}</li>
94   * </ul>
95   *
96   * @since 4.0
97   *
98   * @deprecated (4.2) use {@link HttpAsyncRequestExecutor} and {@link HttpAsyncRequester}
99   */
100 @Deprecated
101 @ThreadSafe // provided injected dependencies are immutable or thread safe
102 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
103                                          implements NHttpClientHandler {
104 
105     protected HttpRequestExecutionHandler execHandler;
106     protected final Executor executor;
107 
108     private final int bufsize;
109 
110     public ThrottlingHttpClientHandler(
111             final HttpProcessor httpProcessor,
112             final HttpRequestExecutionHandler execHandler,
113             final ConnectionReuseStrategy connStrategy,
114             final ByteBufferAllocator allocator,
115             final Executor executor,
116             final HttpParams params) {
117         super(httpProcessor, connStrategy, allocator, params);
118         Args.notNull(execHandler, "HTTP request execution handler");
119         Args.notNull(executor, "Executor");
120         this.execHandler = execHandler;
121         this.executor = executor;
122         this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
123     }
124 
125     public ThrottlingHttpClientHandler(
126             final HttpProcessor httpProcessor,
127             final HttpRequestExecutionHandler execHandler,
128             final ConnectionReuseStrategy connStrategy,
129             final Executor executor,
130             final HttpParams params) {
131         this(httpProcessor, execHandler, connStrategy,
132                 DirectByteBufferAllocator.INSTANCE, executor, params);
133     }
134 
135     public void connected(final NHttpClientConnection conn, final Object attachment) {
136         final HttpContext context = conn.getContext();
137 
138         initialize(conn, attachment);
139 
140         final ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
141         context.setAttribute(CONN_STATE, connState);
142 
143         if (this.eventListener != null) {
144             this.eventListener.connectionOpen(conn);
145         }
146 
147         requestReady(conn);
148     }
149 
150     public void closed(final NHttpClientConnection conn) {
151         final HttpContext context = conn.getContext();
152         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
153 
154         if (connState != null) {
155             synchronized (connState) {
156                 connState.close();
157                 connState.notifyAll();
158             }
159         }
160         this.execHandler.finalizeContext(context);
161 
162         if (this.eventListener != null) {
163             this.eventListener.connectionClosed(conn);
164         }
165     }
166 
167     public void exception(final NHttpClientConnection conn, final HttpException ex) {
168         closeConnection(conn, ex);
169         if (this.eventListener != null) {
170             this.eventListener.fatalProtocolException(ex, conn);
171         }
172     }
173 
174     public void exception(final NHttpClientConnection conn, final IOException ex) {
175         shutdownConnection(conn, ex);
176         if (this.eventListener != null) {
177             this.eventListener.fatalIOException(ex, conn);
178         }
179     }
180 
181 
182     public void requestReady(final NHttpClientConnection conn) {
183         final HttpContext context = conn.getContext();
184 
185         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
186 
187         try {
188 
189             synchronized (connState) {
190                 if (connState.getOutputState() != ClientConnState.READY) {
191                     return;
192                 }
193 
194                 final HttpRequest request = this.execHandler.submitRequest(context);
195                 if (request == null) {
196                     return;
197                 }
198 
199                 request.setParams(
200                         new DefaultedHttpParams(request.getParams(), this.params));
201 
202                 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
203                 this.httpProcessor.process(request, context);
204                 connState.setRequest(request);
205                 conn.submitRequest(request);
206                 connState.setOutputState(ClientConnState.REQUEST_SENT);
207 
208                 conn.requestInput();
209 
210                 if (request instanceof HttpEntityEnclosingRequest) {
211                     if (((HttpEntityEnclosingRequest) request).expectContinue()) {
212                         int timeout = conn.getSocketTimeout();
213                         connState.setTimeout(timeout);
214                         timeout = this.params.getIntParameter(
215                                 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
216                         conn.setSocketTimeout(timeout);
217                         connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
218                     } else {
219                         sendRequestBody(
220                                 (HttpEntityEnclosingRequest) request,
221                                 connState,
222                                 conn);
223                     }
224                 }
225 
226                 connState.notifyAll();
227             }
228 
229         } catch (final IOException ex) {
230             shutdownConnection(conn, ex);
231             if (this.eventListener != null) {
232                 this.eventListener.fatalIOException(ex, conn);
233             }
234         } catch (final HttpException ex) {
235             closeConnection(conn, ex);
236             if (this.eventListener != null) {
237                 this.eventListener.fatalProtocolException(ex, conn);
238             }
239         }
240     }
241 
242     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
243         final HttpContext context = conn.getContext();
244 
245         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
246 
247         try {
248 
249             synchronized (connState) {
250                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
251                     conn.suspendOutput();
252                     return;
253                 }
254                 final ContentOutputBuffer buffer = connState.getOutbuffer();
255                 buffer.produceContent(encoder);
256                 if (encoder.isCompleted()) {
257                     connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
258                 } else {
259                     connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
260                 }
261 
262                 connState.notifyAll();
263             }
264 
265         } catch (final IOException ex) {
266             shutdownConnection(conn, ex);
267             if (this.eventListener != null) {
268                 this.eventListener.fatalIOException(ex, conn);
269             }
270         }
271     }
272 
273     public void responseReceived(final NHttpClientConnection conn) {
274         final HttpContext context = conn.getContext();
275         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
276 
277         try {
278 
279             synchronized (connState) {
280                 final HttpResponse response = conn.getHttpResponse();
281                 response.setParams(
282                         new DefaultedHttpParams(response.getParams(), this.params));
283 
284                 final HttpRequest request = connState.getRequest();
285 
286                 final int statusCode = response.getStatusLine().getStatusCode();
287                 if (statusCode < HttpStatus.SC_OK) {
288                     // 1xx intermediate response
289                     if (statusCode == HttpStatus.SC_CONTINUE
290                             && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
291                         connState.setOutputState(ClientConnState.REQUEST_SENT);
292                         continueRequest(conn, connState);
293                     }
294                     return;
295                 } else {
296                     connState.setResponse(response);
297                     connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
298 
299                     if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
300                         final int timeout = connState.getTimeout();
301                         conn.setSocketTimeout(timeout);
302                         conn.resetOutput();
303                     }
304                 }
305 
306                 if (!canResponseHaveBody(request, response)) {
307                     conn.resetInput();
308                     response.setEntity(null);
309                     connState.setInputState(ClientConnState.RESPONSE_DONE);
310 
311                     if (!this.connStrategy.keepAlive(response, context)) {
312                         conn.close();
313                     }
314                 }
315 
316                 if (response.getEntity() != null) {
317                     response.setEntity(new ContentBufferEntity(
318                             response.getEntity(),
319                             connState.getInbuffer()));
320                 }
321 
322                 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
323 
324                 this.httpProcessor.process(response, context);
325 
326                 handleResponse(response, connState, conn);
327 
328                 connState.notifyAll();
329             }
330 
331         } catch (final IOException ex) {
332             shutdownConnection(conn, ex);
333             if (this.eventListener != null) {
334                 this.eventListener.fatalIOException(ex, conn);
335             }
336         } catch (final HttpException ex) {
337             closeConnection(conn, ex);
338             if (this.eventListener != null) {
339                 this.eventListener.fatalProtocolException(ex, conn);
340             }
341         }
342     }
343 
344     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
345         final HttpContext context = conn.getContext();
346 
347         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
348         try {
349 
350             synchronized (connState) {
351                 final HttpResponse response = connState.getResponse();
352                 final ContentInputBuffer buffer = connState.getInbuffer();
353 
354                 buffer.consumeContent(decoder);
355                 if (decoder.isCompleted()) {
356                     connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
357 
358                     if (!this.connStrategy.keepAlive(response, context)) {
359                         conn.close();
360                     }
361                 } else {
362                     connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
363                 }
364 
365                 connState.notifyAll();
366             }
367 
368         } catch (final IOException ex) {
369             shutdownConnection(conn, ex);
370             if (this.eventListener != null) {
371                 this.eventListener.fatalIOException(ex, conn);
372             }
373         }
374     }
375 
376     public void timeout(final NHttpClientConnection conn) {
377         final HttpContext context = conn.getContext();
378         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
379 
380         try {
381 
382             synchronized (connState) {
383                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
384                     connState.setOutputState(ClientConnState.REQUEST_SENT);
385                     continueRequest(conn, connState);
386 
387                     connState.notifyAll();
388                     return;
389                 }
390             }
391 
392         } catch (final IOException ex) {
393             shutdownConnection(conn, ex);
394             if (this.eventListener != null) {
395                 this.eventListener.fatalIOException(ex, conn);
396             }
397         }
398 
399         handleTimeout(conn);
400     }
401 
402     private void initialize(
403             final NHttpClientConnection conn,
404             final Object attachment) {
405         final HttpContext context = conn.getContext();
406 
407         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
408         this.execHandler.initalizeContext(context, attachment);
409     }
410 
411     private void continueRequest(
412             final NHttpClientConnection conn,
413             final ClientConnState connState) throws IOException {
414 
415         final HttpRequest request = connState.getRequest();
416 
417         final int timeout = connState.getTimeout();
418         conn.setSocketTimeout(timeout);
419 
420         sendRequestBody(
421                 (HttpEntityEnclosingRequest) request,
422                 connState,
423                 conn);
424     }
425 
426     /**
427      * @throws IOException - not thrown currently
428      */
429     private void sendRequestBody(
430             final HttpEntityEnclosingRequest request,
431             final ClientConnState connState,
432             final NHttpClientConnection conn) throws IOException {
433         final HttpEntity entity = request.getEntity();
434         if (entity != null) {
435 
436             this.executor.execute(new Runnable() {
437 
438                 public void run() {
439                     try {
440 
441                         // Block until previous request is fully processed and
442                         // the worker thread no longer holds the shared buffer
443                         synchronized (connState) {
444                             try {
445                                 for (;;) {
446                                     final int currentState = connState.getOutputState();
447                                     if (!connState.isWorkerRunning()) {
448                                         break;
449                                     }
450                                     if (currentState == ServerConnState.SHUTDOWN) {
451                                         return;
452                                     }
453                                     connState.wait();
454                                 }
455                             } catch (final InterruptedException ex) {
456                                 connState.shutdown();
457                                 return;
458                             }
459                             connState.setWorkerRunning(true);
460                         }
461 
462                         final HttpEntity entity = request.getEntity();
463                         final OutputStream outstream = new ContentOutputStream(
464                                 connState.getOutbuffer());
465                         entity.writeTo(outstream);
466                         outstream.flush();
467                         outstream.close();
468 
469                         synchronized (connState) {
470                             connState.setWorkerRunning(false);
471                             connState.notifyAll();
472                         }
473 
474                     } catch (final IOException ex) {
475                         shutdownConnection(conn, ex);
476                         if (eventListener != null) {
477                             eventListener.fatalIOException(ex, conn);
478                         }
479                     }
480                 }
481 
482             });
483         }
484     }
485 
486     private void handleResponse(
487             final HttpResponse response,
488             final ClientConnState connState,
489             final NHttpClientConnection conn) {
490 
491         final HttpContext context = conn.getContext();
492 
493         this.executor.execute(new Runnable() {
494 
495             public void run() {
496                 try {
497 
498                     // Block until previous request is fully processed and
499                     // the worker thread no longer holds the shared buffer
500                     synchronized (connState) {
501                         try {
502                             for (;;) {
503                                 final int currentState = connState.getOutputState();
504                                 if (!connState.isWorkerRunning()) {
505                                     break;
506                                 }
507                                 if (currentState == ServerConnState.SHUTDOWN) {
508                                     return;
509                                 }
510                                 connState.wait();
511                             }
512                         } catch (final InterruptedException ex) {
513                             connState.shutdown();
514                             return;
515                         }
516                         connState.setWorkerRunning(true);
517                     }
518 
519                     execHandler.handleResponse(response, context);
520 
521                     synchronized (connState) {
522 
523                         try {
524                             for (;;) {
525                                 final int currentState = connState.getInputState();
526                                 if (currentState == ClientConnState.RESPONSE_DONE) {
527                                     break;
528                                 }
529                                 if (currentState == ServerConnState.SHUTDOWN) {
530                                     return;
531                                 }
532                                 connState.wait();
533                             }
534                         } catch (final InterruptedException ex) {
535                             connState.shutdown();
536                         }
537 
538                         connState.resetInput();
539                         connState.resetOutput();
540                         if (conn.isOpen()) {
541                             conn.requestOutput();
542                         }
543                         connState.setWorkerRunning(false);
544                         connState.notifyAll();
545                     }
546 
547                 } catch (final IOException ex) {
548                     shutdownConnection(conn, ex);
549                     if (eventListener != null) {
550                         eventListener.fatalIOException(ex, conn);
551                     }
552                 }
553             }
554 
555         });
556 
557     }
558 
559     static class ClientConnState {
560 
561         public static final int SHUTDOWN                   = -1;
562         public static final int READY                      = 0;
563         public static final int REQUEST_SENT               = 1;
564         public static final int EXPECT_CONTINUE            = 2;
565         public static final int REQUEST_BODY_STREAM        = 4;
566         public static final int REQUEST_BODY_DONE          = 8;
567         public static final int RESPONSE_RECEIVED          = 16;
568         public static final int RESPONSE_BODY_STREAM       = 32;
569         public static final int RESPONSE_BODY_DONE         = 64;
570         public static final int RESPONSE_DONE              = 64;
571 
572         private final SharedInputBuffer inbuffer;
573         private final SharedOutputBuffer outbuffer;
574 
575         private volatile int inputState;
576         private volatile int outputState;
577 
578         private volatile HttpRequest request;
579         private volatile HttpResponse response;
580 
581         private volatile int timeout;
582 
583         private volatile boolean workerRunning;
584 
585         public ClientConnState(
586                 final int bufsize,
587                 final IOControl ioControl,
588                 final ByteBufferAllocator allocator) {
589             super();
590             this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
591             this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
592             this.inputState = READY;
593             this.outputState = READY;
594         }
595 
596         public ContentInputBuffer getInbuffer() {
597             return this.inbuffer;
598         }
599 
600         public ContentOutputBuffer getOutbuffer() {
601             return this.outbuffer;
602         }
603 
604         public int getInputState() {
605             return this.inputState;
606         }
607 
608         public void setInputState(final int inputState) {
609             this.inputState = inputState;
610         }
611 
612         public int getOutputState() {
613             return this.outputState;
614         }
615 
616         public void setOutputState(final int outputState) {
617             this.outputState = outputState;
618         }
619 
620         public HttpRequest getRequest() {
621             return this.request;
622         }
623 
624         public void setRequest(final HttpRequest request) {
625             this.request = request;
626         }
627 
628         public HttpResponse getResponse() {
629             return this.response;
630         }
631 
632         public void setResponse(final HttpResponse response) {
633             this.response = response;
634         }
635 
636         public int getTimeout() {
637             return this.timeout;
638         }
639 
640         public void setTimeout(final int timeout) {
641             this.timeout = timeout;
642         }
643 
644         public boolean isWorkerRunning() {
645             return this.workerRunning;
646         }
647 
648         public void setWorkerRunning(final boolean b) {
649             this.workerRunning = b;
650         }
651 
652         public void close() {
653             this.inbuffer.close();
654             this.outbuffer.close();
655             this.inputState = SHUTDOWN;
656             this.outputState = SHUTDOWN;
657         }
658 
659         public void shutdown() {
660             this.inbuffer.shutdown();
661             this.outbuffer.shutdown();
662             this.inputState = SHUTDOWN;
663             this.outputState = SHUTDOWN;
664         }
665 
666         public void resetInput() {
667             this.inbuffer.reset();
668             this.request = null;
669             this.inputState = READY;
670         }
671 
672         public void resetOutput() {
673             this.outbuffer.reset();
674             this.response = null;
675             this.outputState = READY;
676         }
677 
678     }
679 
680 }