View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.annotation.ThreadSafe;
42  import org.apache.http.nio.ContentDecoder;
43  import org.apache.http.nio.ContentEncoder;
44  import org.apache.http.nio.IOControl;
45  import org.apache.http.nio.NHttpClientConnection;
46  import org.apache.http.nio.NHttpClientHandler;
47  import org.apache.http.nio.entity.ContentBufferEntity;
48  import org.apache.http.nio.entity.ContentOutputStream;
49  import org.apache.http.nio.params.NIOReactorPNames;
50  import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
51  import org.apache.http.nio.util.ByteBufferAllocator;
52  import org.apache.http.nio.util.ContentInputBuffer;
53  import org.apache.http.nio.util.ContentOutputBuffer;
54  import org.apache.http.nio.util.DirectByteBufferAllocator;
55  import org.apache.http.nio.util.SharedInputBuffer;
56  import org.apache.http.nio.util.SharedOutputBuffer;
57  import org.apache.http.params.HttpParams;
58  import org.apache.http.params.CoreProtocolPNames;
59  import org.apache.http.params.DefaultedHttpParams;
60  import org.apache.http.protocol.ExecutionContext;
61  import org.apache.http.protocol.HttpContext;
62  import org.apache.http.protocol.HttpProcessor;
63  
64  /**
65   * Client protocol handler implementation that provide compatibility with
66   * the blocking I/O by utilizing shared content buffers and a fairly small pool
67   * of worker threads. The throttling protocol handler allocates input / output
68   * buffers of a constant length upon initialization and controls the rate of
69   * I/O events in order to ensure those content buffers do not ever get
70   * overflown. This helps ensure nearly constant memory footprint for HTTP
71   * connections and avoid the out of memory condition while streaming content
72   * in and out. The {@link HttpRequestExecutionHandler#handleResponse(HttpResponse, HttpContext)}
73   * method will fire immediately when a message is received. The protocol handler
74   * delegate the task of processing requests and generating response content to
75   * an {@link Executor}, which is expected to perform those tasks using
76   * dedicated worker threads in order to avoid blocking the I/O thread.
77   * <p/>
78   * Usually throttling protocol handlers need only a modest number of worker
79   * threads, much fewer than the number of concurrent connections. If the length
80   * of the message is smaller or about the size of the shared content buffer
81   * worker thread will just store content in the buffer and terminate almost
82   * immediately without blocking. The I/O dispatch thread in its turn will take
83   * care of sending out the buffered content asynchronously. The worker thread
84   * will have to block only when processing large messages and the shared buffer
85   * fills up. It is generally advisable to allocate shared buffers of a size of
86   * an average content body for optimal performance.
87   * <p>
88   * The following parameters can be used to customize the behavior of this
89   * class:
90   * <ul>
91   *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#CONTENT_BUFFER_SIZE}</li>
92   *  <li>{@link org.apache.http.params.CoreProtocolPNames#WAIT_FOR_CONTINUE}</li>
93   * </ul>
94   *
95   * @since 4.0
96   *
97   * @deprecated (4.2) use {@link HttpAsyncRequestExecutor} and {@link HttpAsyncRequester}
98   */
99  @Deprecated
100 @ThreadSafe // provided injected dependencies are immutable or thread safe
101 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
102                                          implements NHttpClientHandler {
103 
104     protected HttpRequestExecutionHandler execHandler;
105     protected final Executor executor;
106 
107     private final int bufsize;
108 
109     public ThrottlingHttpClientHandler(
110             final HttpProcessor httpProcessor,
111             final HttpRequestExecutionHandler execHandler,
112             final ConnectionReuseStrategy connStrategy,
113             final ByteBufferAllocator allocator,
114             final Executor executor,
115             final HttpParams params) {
116         super(httpProcessor, connStrategy, allocator, params);
117         if (execHandler == null) {
118             throw new IllegalArgumentException("HTTP request execution handler may not be null.");
119         }
120         if (executor == null) {
121             throw new IllegalArgumentException("Executor may not be null");
122         }
123         this.execHandler = execHandler;
124         this.executor = executor;
125         this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
126     }
127 
128     public ThrottlingHttpClientHandler(
129             final HttpProcessor httpProcessor,
130             final HttpRequestExecutionHandler execHandler,
131             final ConnectionReuseStrategy connStrategy,
132             final Executor executor,
133             final HttpParams params) {
134         this(httpProcessor, execHandler, connStrategy,
135                 new DirectByteBufferAllocator(), executor, params);
136     }
137 
138     public void connected(final NHttpClientConnection conn, final Object attachment) {
139         HttpContext context = conn.getContext();
140 
141         initialize(conn, attachment);
142 
143         ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
144         context.setAttribute(CONN_STATE, connState);
145 
146         if (this.eventListener != null) {
147             this.eventListener.connectionOpen(conn);
148         }
149 
150         requestReady(conn);
151     }
152 
153     public void closed(final NHttpClientConnection conn) {
154         HttpContext context = conn.getContext();
155         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
156 
157         if (connState != null) {
158             synchronized (connState) {
159                 connState.close();
160                 connState.notifyAll();
161             }
162         }
163         this.execHandler.finalizeContext(context);
164 
165         if (this.eventListener != null) {
166             this.eventListener.connectionClosed(conn);
167         }
168     }
169 
170     public void exception(final NHttpClientConnection conn, final HttpException ex) {
171         closeConnection(conn, ex);
172         if (this.eventListener != null) {
173             this.eventListener.fatalProtocolException(ex, conn);
174         }
175     }
176 
177     public void exception(final NHttpClientConnection conn, final IOException ex) {
178         shutdownConnection(conn, ex);
179         if (this.eventListener != null) {
180             this.eventListener.fatalIOException(ex, conn);
181         }
182     }
183 
184 
185     public void requestReady(final NHttpClientConnection conn) {
186         HttpContext context = conn.getContext();
187 
188         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
189 
190         try {
191 
192             synchronized (connState) {
193                 if (connState.getOutputState() != ClientConnState.READY) {
194                     return;
195                 }
196 
197                 HttpRequest request = this.execHandler.submitRequest(context);
198                 if (request == null) {
199                     return;
200                 }
201 
202                 request.setParams(
203                         new DefaultedHttpParams(request.getParams(), this.params));
204 
205                 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
206                 this.httpProcessor.process(request, context);
207                 connState.setRequest(request);
208                 conn.submitRequest(request);
209                 connState.setOutputState(ClientConnState.REQUEST_SENT);
210 
211                 conn.requestInput();
212 
213                 if (request instanceof HttpEntityEnclosingRequest) {
214                     if (((HttpEntityEnclosingRequest) request).expectContinue()) {
215                         int timeout = conn.getSocketTimeout();
216                         connState.setTimeout(timeout);
217                         timeout = this.params.getIntParameter(
218                                 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
219                         conn.setSocketTimeout(timeout);
220                         connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
221                     } else {
222                         sendRequestBody(
223                                 (HttpEntityEnclosingRequest) request,
224                                 connState,
225                                 conn);
226                     }
227                 }
228 
229                 connState.notifyAll();
230             }
231 
232         } catch (IOException ex) {
233             shutdownConnection(conn, ex);
234             if (this.eventListener != null) {
235                 this.eventListener.fatalIOException(ex, conn);
236             }
237         } catch (HttpException ex) {
238             closeConnection(conn, ex);
239             if (this.eventListener != null) {
240                 this.eventListener.fatalProtocolException(ex, conn);
241             }
242         }
243     }
244 
245     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
246         HttpContext context = conn.getContext();
247 
248         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
249 
250         try {
251 
252             synchronized (connState) {
253                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
254                     conn.suspendOutput();
255                     return;
256                 }
257                 ContentOutputBuffer buffer = connState.getOutbuffer();
258                 buffer.produceContent(encoder);
259                 if (encoder.isCompleted()) {
260                     connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
261                 } else {
262                     connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
263                 }
264 
265                 connState.notifyAll();
266             }
267 
268         } catch (IOException ex) {
269             shutdownConnection(conn, ex);
270             if (this.eventListener != null) {
271                 this.eventListener.fatalIOException(ex, conn);
272             }
273         }
274     }
275 
276     public void responseReceived(final NHttpClientConnection conn) {
277         HttpContext context = conn.getContext();
278         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
279 
280         try {
281 
282             synchronized (connState) {
283                 HttpResponse response = conn.getHttpResponse();
284                 response.setParams(
285                         new DefaultedHttpParams(response.getParams(), this.params));
286 
287                 HttpRequest request = connState.getRequest();
288 
289                 int statusCode = response.getStatusLine().getStatusCode();
290                 if (statusCode < HttpStatus.SC_OK) {
291                     // 1xx intermediate response
292                     if (statusCode == HttpStatus.SC_CONTINUE
293                             && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
294                         connState.setOutputState(ClientConnState.REQUEST_SENT);
295                         continueRequest(conn, connState);
296                     }
297                     return;
298                 } else {
299                     connState.setResponse(response);
300                     connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
301 
302                     if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
303                         int timeout = connState.getTimeout();
304                         conn.setSocketTimeout(timeout);
305                         conn.resetOutput();
306                     }
307                 }
308 
309                 if (!canResponseHaveBody(request, response)) {
310                     conn.resetInput();
311                     response.setEntity(null);
312                     connState.setInputState(ClientConnState.RESPONSE_DONE);
313 
314                     if (!this.connStrategy.keepAlive(response, context)) {
315                         conn.close();
316                     }
317                 }
318 
319                 if (response.getEntity() != null) {
320                     response.setEntity(new ContentBufferEntity(
321                             response.getEntity(),
322                             connState.getInbuffer()));
323                 }
324 
325                 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
326 
327                 this.httpProcessor.process(response, context);
328 
329                 handleResponse(response, connState, conn);
330 
331                 connState.notifyAll();
332             }
333 
334         } catch (IOException ex) {
335             shutdownConnection(conn, ex);
336             if (this.eventListener != null) {
337                 this.eventListener.fatalIOException(ex, conn);
338             }
339         } catch (HttpException ex) {
340             closeConnection(conn, ex);
341             if (this.eventListener != null) {
342                 this.eventListener.fatalProtocolException(ex, conn);
343             }
344         }
345     }
346 
347     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
348         HttpContext context = conn.getContext();
349 
350         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
351         try {
352 
353             synchronized (connState) {
354                 HttpResponse response = connState.getResponse();
355                 ContentInputBuffer buffer = connState.getInbuffer();
356 
357                 buffer.consumeContent(decoder);
358                 if (decoder.isCompleted()) {
359                     connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
360 
361                     if (!this.connStrategy.keepAlive(response, context)) {
362                         conn.close();
363                     }
364                 } else {
365                     connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
366                 }
367 
368                 connState.notifyAll();
369             }
370 
371         } catch (IOException ex) {
372             shutdownConnection(conn, ex);
373             if (this.eventListener != null) {
374                 this.eventListener.fatalIOException(ex, conn);
375             }
376         }
377     }
378 
379     public void timeout(final NHttpClientConnection conn) {
380         HttpContext context = conn.getContext();
381         ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
382 
383         try {
384 
385             synchronized (connState) {
386                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
387                     connState.setOutputState(ClientConnState.REQUEST_SENT);
388                     continueRequest(conn, connState);
389 
390                     connState.notifyAll();
391                     return;
392                 }
393             }
394 
395         } catch (IOException ex) {
396             shutdownConnection(conn, ex);
397             if (this.eventListener != null) {
398                 this.eventListener.fatalIOException(ex, conn);
399             }
400         }
401 
402         handleTimeout(conn);
403     }
404 
405     private void initialize(
406             final NHttpClientConnection conn,
407             final Object attachment) {
408         HttpContext context = conn.getContext();
409 
410         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
411         this.execHandler.initalizeContext(context, attachment);
412     }
413 
414     private void continueRequest(
415             final NHttpClientConnection conn,
416             final ClientConnState connState) throws IOException {
417 
418         HttpRequest request = connState.getRequest();
419 
420         int timeout = connState.getTimeout();
421         conn.setSocketTimeout(timeout);
422 
423         sendRequestBody(
424                 (HttpEntityEnclosingRequest) request,
425                 connState,
426                 conn);
427     }
428 
429     /**
430      * @throws IOException - not thrown currently
431      */
432     private void sendRequestBody(
433             final HttpEntityEnclosingRequest request,
434             final ClientConnState connState,
435             final NHttpClientConnection conn) throws IOException {
436         HttpEntity entity = request.getEntity();
437         if (entity != null) {
438 
439             this.executor.execute(new Runnable() {
440 
441                 public void run() {
442                     try {
443 
444                         // Block until previous request is fully processed and
445                         // the worker thread no longer holds the shared buffer
446                         synchronized (connState) {
447                             try {
448                                 for (;;) {
449                                     int currentState = connState.getOutputState();
450                                     if (!connState.isWorkerRunning()) {
451                                         break;
452                                     }
453                                     if (currentState == ServerConnState.SHUTDOWN) {
454                                         return;
455                                     }
456                                     connState.wait();
457                                 }
458                             } catch (InterruptedException ex) {
459                                 connState.shutdown();
460                                 return;
461                             }
462                             connState.setWorkerRunning(true);
463                         }
464 
465                         HttpEntity entity = request.getEntity();
466                         OutputStream outstream = new ContentOutputStream(
467                                 connState.getOutbuffer());
468                         entity.writeTo(outstream);
469                         outstream.flush();
470                         outstream.close();
471 
472                         synchronized (connState) {
473                             connState.setWorkerRunning(false);
474                             connState.notifyAll();
475                         }
476 
477                     } catch (IOException ex) {
478                         shutdownConnection(conn, ex);
479                         if (eventListener != null) {
480                             eventListener.fatalIOException(ex, conn);
481                         }
482                     }
483                 }
484 
485             });
486         }
487     }
488 
489     private void handleResponse(
490             final HttpResponse response,
491             final ClientConnState connState,
492             final NHttpClientConnection conn) {
493 
494         final HttpContext context = conn.getContext();
495 
496         this.executor.execute(new Runnable() {
497 
498             public void run() {
499                 try {
500 
501                     // Block until previous request is fully processed and
502                     // the worker thread no longer holds the shared buffer
503                     synchronized (connState) {
504                         try {
505                             for (;;) {
506                                 int currentState = connState.getOutputState();
507                                 if (!connState.isWorkerRunning()) {
508                                     break;
509                                 }
510                                 if (currentState == ServerConnState.SHUTDOWN) {
511                                     return;
512                                 }
513                                 connState.wait();
514                             }
515                         } catch (InterruptedException ex) {
516                             connState.shutdown();
517                             return;
518                         }
519                         connState.setWorkerRunning(true);
520                     }
521 
522                     execHandler.handleResponse(response, context);
523 
524                     synchronized (connState) {
525 
526                         try {
527                             for (;;) {
528                                 int currentState = connState.getInputState();
529                                 if (currentState == ClientConnState.RESPONSE_DONE) {
530                                     break;
531                                 }
532                                 if (currentState == ServerConnState.SHUTDOWN) {
533                                     return;
534                                 }
535                                 connState.wait();
536                             }
537                         } catch (InterruptedException ex) {
538                             connState.shutdown();
539                         }
540 
541                         connState.resetInput();
542                         connState.resetOutput();
543                         if (conn.isOpen()) {
544                             conn.requestOutput();
545                         }
546                         connState.setWorkerRunning(false);
547                         connState.notifyAll();
548                     }
549 
550                 } catch (IOException ex) {
551                     shutdownConnection(conn, ex);
552                     if (eventListener != null) {
553                         eventListener.fatalIOException(ex, conn);
554                     }
555                 }
556             }
557 
558         });
559 
560     }
561 
562     static class ClientConnState {
563 
564         public static final int SHUTDOWN                   = -1;
565         public static final int READY                      = 0;
566         public static final int REQUEST_SENT               = 1;
567         public static final int EXPECT_CONTINUE            = 2;
568         public static final int REQUEST_BODY_STREAM        = 4;
569         public static final int REQUEST_BODY_DONE          = 8;
570         public static final int RESPONSE_RECEIVED          = 16;
571         public static final int RESPONSE_BODY_STREAM       = 32;
572         public static final int RESPONSE_BODY_DONE         = 64;
573         public static final int RESPONSE_DONE              = 64;
574 
575         private final SharedInputBuffer inbuffer;
576         private final SharedOutputBuffer outbuffer;
577 
578         private volatile int inputState;
579         private volatile int outputState;
580 
581         private volatile HttpRequest request;
582         private volatile HttpResponse response;
583 
584         private volatile int timeout;
585 
586         private volatile boolean workerRunning;
587 
588         public ClientConnState(
589                 int bufsize,
590                 final IOControl ioControl,
591                 final ByteBufferAllocator allocator) {
592             super();
593             this.inbuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
594             this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
595             this.inputState = READY;
596             this.outputState = READY;
597         }
598 
599         public ContentInputBuffer getInbuffer() {
600             return this.inbuffer;
601         }
602 
603         public ContentOutputBuffer getOutbuffer() {
604             return this.outbuffer;
605         }
606 
607         public int getInputState() {
608             return this.inputState;
609         }
610 
611         public void setInputState(int inputState) {
612             this.inputState = inputState;
613         }
614 
615         public int getOutputState() {
616             return this.outputState;
617         }
618 
619         public void setOutputState(int outputState) {
620             this.outputState = outputState;
621         }
622 
623         public HttpRequest getRequest() {
624             return this.request;
625         }
626 
627         public void setRequest(final HttpRequest request) {
628             this.request = request;
629         }
630 
631         public HttpResponse getResponse() {
632             return this.response;
633         }
634 
635         public void setResponse(final HttpResponse response) {
636             this.response = response;
637         }
638 
639         public int getTimeout() {
640             return this.timeout;
641         }
642 
643         public void setTimeout(int timeout) {
644             this.timeout = timeout;
645         }
646 
647         public boolean isWorkerRunning() {
648             return this.workerRunning;
649         }
650 
651         public void setWorkerRunning(boolean b) {
652             this.workerRunning = b;
653         }
654 
655         public void close() {
656             this.inbuffer.close();
657             this.outbuffer.close();
658             this.inputState = SHUTDOWN;
659             this.outputState = SHUTDOWN;
660         }
661 
662         public void shutdown() {
663             this.inbuffer.shutdown();
664             this.outbuffer.shutdown();
665             this.inputState = SHUTDOWN;
666             this.outputState = SHUTDOWN;
667         }
668 
669         public void resetInput() {
670             this.inbuffer.reset();
671             this.request = null;
672             this.inputState = READY;
673         }
674 
675         public void resetOutput() {
676             this.outbuffer.reset();
677             this.response = null;
678             this.outputState = READY;
679         }
680 
681     }
682 
683 }