View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.annotation.ThreadingBehavior;
42  import org.apache.http.annotation.Contract;
43  import org.apache.http.nio.ContentDecoder;
44  import org.apache.http.nio.ContentEncoder;
45  import org.apache.http.nio.IOControl;
46  import org.apache.http.nio.NHttpClientConnection;
47  import org.apache.http.nio.NHttpClientHandler;
48  import org.apache.http.nio.entity.ContentBufferEntity;
49  import org.apache.http.nio.entity.ContentOutputStream;
50  import org.apache.http.nio.params.NIOReactorPNames;
51  import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
52  import org.apache.http.nio.util.ByteBufferAllocator;
53  import org.apache.http.nio.util.ContentInputBuffer;
54  import org.apache.http.nio.util.ContentOutputBuffer;
55  import org.apache.http.nio.util.DirectByteBufferAllocator;
56  import org.apache.http.nio.util.SharedInputBuffer;
57  import org.apache.http.nio.util.SharedOutputBuffer;
58  import org.apache.http.params.CoreProtocolPNames;
59  import org.apache.http.params.DefaultedHttpParams;
60  import org.apache.http.params.HttpParams;
61  import org.apache.http.protocol.ExecutionContext;
62  import org.apache.http.protocol.HttpContext;
63  import org.apache.http.protocol.HttpProcessor;
64  import org.apache.http.util.Args;
65  
66  /**
67   * Client protocol handler implementation that provide compatibility with
68   * the blocking I/O by utilizing shared content buffers and a fairly small pool
69   * of worker threads. The throttling protocol handler allocates input / output
70   * buffers of a constant length upon initialization and controls the rate of
71   * I/O events in order to ensure those content buffers do not ever get
72   * overflown. This helps ensure nearly constant memory footprint for HTTP
73   * connections and avoid the out of memory condition while streaming content
74   * in and out. The {@link HttpRequestExecutionHandler#handleResponse(HttpResponse, HttpContext)}
75   * method will fire immediately when a message is received. The protocol handler
76   * delegate the task of processing requests and generating response content to
77   * an {@link Executor}, which is expected to perform those tasks using
78   * dedicated worker threads in order to avoid blocking the I/O thread.
79   * <p>
80   * Usually throttling protocol handlers need only a modest number of worker
81   * threads, much fewer than the number of concurrent connections. If the length
82   * of the message is smaller or about the size of the shared content buffer
83   * worker thread will just store content in the buffer and terminate almost
84   * immediately without blocking. The I/O dispatch thread in its turn will take
85   * care of sending out the buffered content asynchronously. The worker thread
86   * will have to block only when processing large messages and the shared buffer
87   * fills up. It is generally advisable to allocate shared buffers of a size of
88   * an average content body for optimal performance.
89   * <p>
90   * The following parameters can be used to customize the behavior of this
91   * class:
92   * <ul>
93   *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#CONTENT_BUFFER_SIZE}</li>
94   *  <li>{@link org.apache.http.params.CoreProtocolPNames#WAIT_FOR_CONTINUE}</li>
95   * </ul>
96   *
97   * @since 4.0
98   *
99   * @deprecated (4.2) use {@link HttpAsyncRequestExecutor} and {@link HttpAsyncRequester}
100  */
101 @Deprecated
102 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
103 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
104                                          implements NHttpClientHandler {
105 
106     protected HttpRequestExecutionHandler execHandler;
107     protected final Executor executor;
108 
109     private final int bufsize;
110 
111     public ThrottlingHttpClientHandler(
112             final HttpProcessor httpProcessor,
113             final HttpRequestExecutionHandler execHandler,
114             final ConnectionReuseStrategy connStrategy,
115             final ByteBufferAllocator allocator,
116             final Executor executor,
117             final HttpParams params) {
118         super(httpProcessor, connStrategy, allocator, params);
119         Args.notNull(execHandler, "HTTP request execution handler");
120         Args.notNull(executor, "Executor");
121         this.execHandler = execHandler;
122         this.executor = executor;
123         this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
124     }
125 
126     public ThrottlingHttpClientHandler(
127             final HttpProcessor httpProcessor,
128             final HttpRequestExecutionHandler execHandler,
129             final ConnectionReuseStrategy connStrategy,
130             final Executor executor,
131             final HttpParams params) {
132         this(httpProcessor, execHandler, connStrategy,
133                 DirectByteBufferAllocator.INSTANCE, executor, params);
134     }
135 
136     public void connected(final NHttpClientConnection conn, final Object attachment) {
137         final HttpContext context = conn.getContext();
138 
139         initialize(conn, attachment);
140 
141         final ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
142         context.setAttribute(CONN_STATE, connState);
143 
144         if (this.eventListener != null) {
145             this.eventListener.connectionOpen(conn);
146         }
147 
148         requestReady(conn);
149     }
150 
151     public void closed(final NHttpClientConnection conn) {
152         final HttpContext context = conn.getContext();
153         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
154 
155         if (connState != null) {
156             synchronized (connState) {
157                 connState.close();
158                 connState.notifyAll();
159             }
160         }
161         this.execHandler.finalizeContext(context);
162 
163         if (this.eventListener != null) {
164             this.eventListener.connectionClosed(conn);
165         }
166     }
167 
168     public void exception(final NHttpClientConnection conn, final HttpException ex) {
169         closeConnection(conn, ex);
170         if (this.eventListener != null) {
171             this.eventListener.fatalProtocolException(ex, conn);
172         }
173     }
174 
175     public void exception(final NHttpClientConnection conn, final IOException ex) {
176         shutdownConnection(conn, ex);
177         if (this.eventListener != null) {
178             this.eventListener.fatalIOException(ex, conn);
179         }
180     }
181 
182 
183     public void requestReady(final NHttpClientConnection conn) {
184         final HttpContext context = conn.getContext();
185 
186         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
187 
188         try {
189 
190             synchronized (connState) {
191                 if (connState.getOutputState() != ClientConnState.READY) {
192                     return;
193                 }
194 
195                 final HttpRequest request = this.execHandler.submitRequest(context);
196                 if (request == null) {
197                     return;
198                 }
199 
200                 request.setParams(
201                         new DefaultedHttpParams(request.getParams(), this.params));
202 
203                 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
204                 this.httpProcessor.process(request, context);
205                 connState.setRequest(request);
206                 conn.submitRequest(request);
207                 connState.setOutputState(ClientConnState.REQUEST_SENT);
208 
209                 conn.requestInput();
210 
211                 if (request instanceof HttpEntityEnclosingRequest) {
212                     if (((HttpEntityEnclosingRequest) request).expectContinue()) {
213                         int timeout = conn.getSocketTimeout();
214                         connState.setTimeout(timeout);
215                         timeout = this.params.getIntParameter(
216                                 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
217                         conn.setSocketTimeout(timeout);
218                         connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
219                     } else {
220                         sendRequestBody(
221                                 (HttpEntityEnclosingRequest) request,
222                                 connState,
223                                 conn);
224                     }
225                 }
226 
227                 connState.notifyAll();
228             }
229 
230         } catch (final IOException ex) {
231             shutdownConnection(conn, ex);
232             if (this.eventListener != null) {
233                 this.eventListener.fatalIOException(ex, conn);
234             }
235         } catch (final HttpException ex) {
236             closeConnection(conn, ex);
237             if (this.eventListener != null) {
238                 this.eventListener.fatalProtocolException(ex, conn);
239             }
240         }
241     }
242 
243     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
244         final HttpContext context = conn.getContext();
245 
246         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
247 
248         try {
249 
250             synchronized (connState) {
251                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
252                     conn.suspendOutput();
253                     return;
254                 }
255                 final ContentOutputBuffer buffer = connState.getOutbuffer();
256                 buffer.produceContent(encoder);
257                 if (encoder.isCompleted()) {
258                     connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
259                 } else {
260                     connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
261                 }
262 
263                 connState.notifyAll();
264             }
265 
266         } catch (final IOException ex) {
267             shutdownConnection(conn, ex);
268             if (this.eventListener != null) {
269                 this.eventListener.fatalIOException(ex, conn);
270             }
271         }
272     }
273 
274     public void responseReceived(final NHttpClientConnection conn) {
275         final HttpContext context = conn.getContext();
276         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
277 
278         try {
279 
280             synchronized (connState) {
281                 final HttpResponse response = conn.getHttpResponse();
282                 response.setParams(
283                         new DefaultedHttpParams(response.getParams(), this.params));
284 
285                 final HttpRequest request = connState.getRequest();
286 
287                 final int statusCode = response.getStatusLine().getStatusCode();
288                 if (statusCode < HttpStatus.SC_OK) {
289                     // 1xx intermediate response
290                     if (statusCode == HttpStatus.SC_CONTINUE
291                             && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
292                         connState.setOutputState(ClientConnState.REQUEST_SENT);
293                         continueRequest(conn, connState);
294                     }
295                     return;
296                 } else {
297                     connState.setResponse(response);
298                     connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
299 
300                     if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
301                         final int timeout = connState.getTimeout();
302                         conn.setSocketTimeout(timeout);
303                         conn.resetOutput();
304                     }
305                 }
306 
307                 if (!canResponseHaveBody(request, response)) {
308                     conn.resetInput();
309                     response.setEntity(null);
310                     connState.setInputState(ClientConnState.RESPONSE_DONE);
311 
312                     if (!this.connStrategy.keepAlive(response, context)) {
313                         conn.close();
314                     }
315                 }
316 
317                 if (response.getEntity() != null) {
318                     response.setEntity(new ContentBufferEntity(
319                             response.getEntity(),
320                             connState.getInbuffer()));
321                 }
322 
323                 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
324 
325                 this.httpProcessor.process(response, context);
326 
327                 handleResponse(response, connState, conn);
328 
329                 connState.notifyAll();
330             }
331 
332         } catch (final IOException ex) {
333             shutdownConnection(conn, ex);
334             if (this.eventListener != null) {
335                 this.eventListener.fatalIOException(ex, conn);
336             }
337         } catch (final HttpException ex) {
338             closeConnection(conn, ex);
339             if (this.eventListener != null) {
340                 this.eventListener.fatalProtocolException(ex, conn);
341             }
342         }
343     }
344 
345     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
346         final HttpContext context = conn.getContext();
347 
348         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
349         try {
350 
351             synchronized (connState) {
352                 final HttpResponse response = connState.getResponse();
353                 final ContentInputBuffer buffer = connState.getInbuffer();
354 
355                 buffer.consumeContent(decoder);
356                 if (decoder.isCompleted()) {
357                     connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
358 
359                     if (!this.connStrategy.keepAlive(response, context)) {
360                         conn.close();
361                     }
362                 } else {
363                     connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
364                 }
365 
366                 connState.notifyAll();
367             }
368 
369         } catch (final IOException ex) {
370             shutdownConnection(conn, ex);
371             if (this.eventListener != null) {
372                 this.eventListener.fatalIOException(ex, conn);
373             }
374         }
375     }
376 
377     public void timeout(final NHttpClientConnection conn) {
378         final HttpContext context = conn.getContext();
379         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
380 
381         try {
382 
383             synchronized (connState) {
384                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
385                     connState.setOutputState(ClientConnState.REQUEST_SENT);
386                     continueRequest(conn, connState);
387 
388                     connState.notifyAll();
389                     return;
390                 }
391             }
392 
393         } catch (final IOException ex) {
394             shutdownConnection(conn, ex);
395             if (this.eventListener != null) {
396                 this.eventListener.fatalIOException(ex, conn);
397             }
398         }
399 
400         handleTimeout(conn);
401     }
402 
403     private void initialize(
404             final NHttpClientConnection conn,
405             final Object attachment) {
406         final HttpContext context = conn.getContext();
407 
408         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
409         this.execHandler.initalizeContext(context, attachment);
410     }
411 
412     private void continueRequest(
413             final NHttpClientConnection conn,
414             final ClientConnState connState) throws IOException {
415 
416         final HttpRequest request = connState.getRequest();
417 
418         final int timeout = connState.getTimeout();
419         conn.setSocketTimeout(timeout);
420 
421         sendRequestBody(
422                 (HttpEntityEnclosingRequest) request,
423                 connState,
424                 conn);
425     }
426 
427     /**
428      * @throws IOException - not thrown currently
429      */
430     private void sendRequestBody(
431             final HttpEntityEnclosingRequest request,
432             final ClientConnState connState,
433             final NHttpClientConnection conn) throws IOException {
434         final HttpEntity entity = request.getEntity();
435         if (entity != null) {
436 
437             this.executor.execute(new Runnable() {
438 
439                 public void run() {
440                     try {
441 
442                         // Block until previous request is fully processed and
443                         // the worker thread no longer holds the shared buffer
444                         synchronized (connState) {
445                             try {
446                                 for (;;) {
447                                     final int currentState = connState.getOutputState();
448                                     if (!connState.isWorkerRunning()) {
449                                         break;
450                                     }
451                                     if (currentState == ServerConnState.SHUTDOWN) {
452                                         return;
453                                     }
454                                     connState.wait();
455                                 }
456                             } catch (final InterruptedException ex) {
457                                 connState.shutdown();
458                                 return;
459                             }
460                             connState.setWorkerRunning(true);
461                         }
462 
463                         final OutputStream outstream = new ContentOutputStream(
464                                 connState.getOutbuffer());
465                         request.getEntity().writeTo(outstream);
466                         outstream.flush();
467                         outstream.close();
468 
469                         synchronized (connState) {
470                             connState.setWorkerRunning(false);
471                             connState.notifyAll();
472                         }
473 
474                     } catch (final IOException ex) {
475                         shutdownConnection(conn, ex);
476                         if (eventListener != null) {
477                             eventListener.fatalIOException(ex, conn);
478                         }
479                     }
480                 }
481 
482             });
483         }
484     }
485 
486     private void handleResponse(
487             final HttpResponse response,
488             final ClientConnState connState,
489             final NHttpClientConnection conn) {
490 
491         final HttpContext context = conn.getContext();
492 
493         this.executor.execute(new Runnable() {
494 
495             public void run() {
496                 try {
497 
498                     // Block until previous request is fully processed and
499                     // the worker thread no longer holds the shared buffer
500                     synchronized (connState) {
501                         try {
502                             for (;;) {
503                                 final int currentState = connState.getOutputState();
504                                 if (!connState.isWorkerRunning()) {
505                                     break;
506                                 }
507                                 if (currentState == ServerConnState.SHUTDOWN) {
508                                     return;
509                                 }
510                                 connState.wait();
511                             }
512                         } catch (final InterruptedException ex) {
513                             connState.shutdown();
514                             return;
515                         }
516                         connState.setWorkerRunning(true);
517                     }
518 
519                     execHandler.handleResponse(response, context);
520 
521                     synchronized (connState) {
522 
523                         try {
524                             for (;;) {
525                                 final int currentState = connState.getInputState();
526                                 if (currentState == ClientConnState.RESPONSE_DONE) {
527                                     break;
528                                 }
529                                 if (currentState == ServerConnState.SHUTDOWN) {
530                                     return;
531                                 }
532                                 connState.wait();
533                             }
534                         } catch (final InterruptedException ex) {
535                             connState.shutdown();
536                         }
537 
538                         connState.resetInput();
539                         connState.resetOutput();
540                         if (conn.isOpen()) {
541                             conn.requestOutput();
542                         }
543                         connState.setWorkerRunning(false);
544                         connState.notifyAll();
545                     }
546 
547                 } catch (final IOException ex) {
548                     shutdownConnection(conn, ex);
549                     if (eventListener != null) {
550                         eventListener.fatalIOException(ex, conn);
551                     }
552                 }
553             }
554 
555         });
556 
557     }
558 
559     static class ClientConnState {
560 
561         public static final int SHUTDOWN                   = -1;
562         public static final int READY                      = 0;
563         public static final int REQUEST_SENT               = 1;
564         public static final int EXPECT_CONTINUE            = 2;
565         public static final int REQUEST_BODY_STREAM        = 4;
566         public static final int REQUEST_BODY_DONE          = 8;
567         public static final int RESPONSE_RECEIVED          = 16;
568         public static final int RESPONSE_BODY_STREAM       = 32;
569         public static final int RESPONSE_BODY_DONE         = 64;
570         public static final int RESPONSE_DONE              = 64;
571 
572         private final SharedInputBuffer inbuffer;
573         private final SharedOutputBuffer outbuffer;
574 
575         private volatile int inputState;
576         private volatile int outputState;
577 
578         private volatile HttpRequest request;
579         private volatile HttpResponse response;
580 
581         private volatile int timeout;
582 
583         private volatile boolean workerRunning;
584 
585         public ClientConnState(
586                 final int bufsize,
587                 final IOControl ioControl,
588                 final ByteBufferAllocator allocator) {
589             super();
590             this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
591             this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
592             this.inputState = READY;
593             this.outputState = READY;
594         }
595 
596         public ContentInputBuffer getInbuffer() {
597             return this.inbuffer;
598         }
599 
600         public ContentOutputBuffer getOutbuffer() {
601             return this.outbuffer;
602         }
603 
604         public int getInputState() {
605             return this.inputState;
606         }
607 
608         public void setInputState(final int inputState) {
609             this.inputState = inputState;
610         }
611 
612         public int getOutputState() {
613             return this.outputState;
614         }
615 
616         public void setOutputState(final int outputState) {
617             this.outputState = outputState;
618         }
619 
620         public HttpRequest getRequest() {
621             return this.request;
622         }
623 
624         public void setRequest(final HttpRequest request) {
625             this.request = request;
626         }
627 
628         public HttpResponse getResponse() {
629             return this.response;
630         }
631 
632         public void setResponse(final HttpResponse response) {
633             this.response = response;
634         }
635 
636         public int getTimeout() {
637             return this.timeout;
638         }
639 
640         public void setTimeout(final int timeout) {
641             this.timeout = timeout;
642         }
643 
644         public boolean isWorkerRunning() {
645             return this.workerRunning;
646         }
647 
648         public void setWorkerRunning(final boolean b) {
649             this.workerRunning = b;
650         }
651 
652         public void close() {
653             this.inbuffer.close();
654             this.outbuffer.close();
655             this.inputState = SHUTDOWN;
656             this.outputState = SHUTDOWN;
657         }
658 
659         public void shutdown() {
660             this.inbuffer.shutdown();
661             this.outbuffer.shutdown();
662             this.inputState = SHUTDOWN;
663             this.outputState = SHUTDOWN;
664         }
665 
666         public void resetInput() {
667             this.inbuffer.reset();
668             this.request = null;
669             this.inputState = READY;
670         }
671 
672         public void resetOutput() {
673             this.outbuffer.reset();
674             this.response = null;
675             this.outputState = READY;
676         }
677 
678     }
679 
680 }