View Javadoc

1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.Closeable;
31  import java.io.IOException;
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Queue;
35  import java.util.concurrent.ConcurrentLinkedQueue;
36  import java.util.concurrent.Future;
37  
38  import org.apache.http.ConnectionClosedException;
39  import org.apache.http.ConnectionReuseStrategy;
40  import org.apache.http.HttpException;
41  import org.apache.http.HttpRequest;
42  import org.apache.http.HttpResponse;
43  import org.apache.http.concurrent.BasicFuture;
44  import org.apache.http.concurrent.FutureCallback;
45  import org.apache.http.impl.DefaultConnectionReuseStrategy;
46  import org.apache.http.nio.ContentDecoder;
47  import org.apache.http.nio.ContentEncoder;
48  import org.apache.http.nio.IOControl;
49  import org.apache.http.nio.NHttpClientConnection;
50  import org.apache.http.protocol.HttpContext;
51  import org.apache.http.protocol.HttpCoreContext;
52  import org.apache.http.protocol.HttpProcessor;
53  import org.apache.http.util.Args;
54  import org.apache.http.util.Asserts;
55  
56  /**
57   * Basic implementation of {@link org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler} that executes
58   * a single HTTP request / response exchange.
59   *
60   * @param <T> the result type of request execution.
61   * @since 4.4
62   */
63  @Pipelined()
64  public class PipeliningClientExchangeHandler<T> implements HttpAsyncClientExchangeHandler {
65  
66      private final Queue<HttpAsyncRequestProducer> requestProducerQueue;
67      private final Queue<HttpAsyncResponseConsumer<T>> responseConsumerQueue;
68      private final Queue<HttpRequest> requestQueue;
69      private final Queue<T> resultQueue;
70      private final BasicFuture<List<T>> future;
71      private final HttpContext localContext;
72      private final NHttpClientConnection conn;
73      private final HttpProcessor httppocessor;
74      private final ConnectionReuseStrategy connReuseStrategy;
75  
76      private volatile HttpAsyncRequestProducer requestProducer;
77      private volatile HttpAsyncResponseConsumer<T> responseConsumer;
78      private volatile boolean keepAlive;
79      private volatile boolean done;
80  
81      /**
82       * Creates new instance of <tt>PipeliningClientExchangeHandler<tt/>.
83       *
84       * @param requestProducers the request producers.
85       * @param responseConsumers the response consumers.
86       * @param callback the future callback invoked when the operation is completed.
87       * @param localContext the local execution context.
88       * @param conn the actual connection.
89       * @param httppocessor the HTTP protocol processor.
90       * @param connReuseStrategy the connection re-use strategy.
91       */
92      public PipeliningClientExchangeHandler(
93              final List<? extends HttpAsyncRequestProducer> requestProducers,
94              final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
95              final FutureCallback<List<T>> callback,
96              final HttpContext localContext,
97              final NHttpClientConnection conn,
98              final HttpProcessor httppocessor,
99              final ConnectionReuseStrategy connReuseStrategy) {
100         super();
101         Args.notEmpty(requestProducers, "Request producer list");
102         Args.notEmpty(responseConsumers, "Response consumer list");
103         Args.check(requestProducers.size() == responseConsumers.size(),
104                 "Number of request producers does not match that of response consumers");
105         this.requestProducerQueue = new ConcurrentLinkedQueue<HttpAsyncRequestProducer>(requestProducers);
106         this.responseConsumerQueue = new ConcurrentLinkedQueue<HttpAsyncResponseConsumer<T>>(responseConsumers);
107         this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
108         this.resultQueue = new ConcurrentLinkedQueue<T>();
109         this.future = new BasicFuture<List<T>>(callback);
110         this.localContext = Args.notNull(localContext, "HTTP context");
111         this.conn = Args.notNull(conn, "HTTP connection");
112         this.httppocessor = Args.notNull(httppocessor, "HTTP processor");
113         this.connReuseStrategy = connReuseStrategy != null ? connReuseStrategy :
114             DefaultConnectionReuseStrategy.INSTANCE;
115         this.localContext.setAttribute(HttpCoreContext.HTTP_CONNECTION, this.conn);
116     }
117 
118     /**
119      * Creates new instance of <tt>PipeliningClientExchangeHandler<tt/>.
120      *
121      * @param requestProducers the request producers.
122      * @param responseConsumers the response consumers.
123      * @param localContext the local execution context.
124      * @param conn the actual connection.
125      * @param httppocessor the HTTP protocol processor.
126      */
127     public PipeliningClientExchangeHandler(
128             final List<? extends HttpAsyncRequestProducer> requestProducers,
129             final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
130             final HttpContext localContext,
131             final NHttpClientConnection conn,
132             final HttpProcessor httppocessor) {
133         this(requestProducers, responseConsumers, null, localContext, conn, httppocessor, null);
134     }
135 
136     public Future<List<T>> getFuture() {
137         return this.future;
138     }
139 
140     private static void closeQuietly(final Closeable closeable) {
141         if (closeable != null) {
142             try {
143                 closeable.close();
144             } catch (final IOException ex) {
145             }
146         }
147     }
148 
149     private void releaseResources() {
150         closeQuietly(this.requestProducer);
151         this.requestProducer = null;
152         closeQuietly(this.responseConsumer);
153         this.responseConsumer = null;
154         while (!this.requestProducerQueue.isEmpty()) {
155             closeQuietly(this.requestProducerQueue.remove());
156         }
157         while (!this.responseConsumerQueue.isEmpty()) {
158             closeQuietly(this.responseConsumerQueue.remove());
159         }
160         this.requestQueue.clear();
161         this.resultQueue.clear();
162     }
163 
164     @Override
165     public void close() throws IOException {
166         releaseResources();
167         if (!this.future.isDone()) {
168             this.future.cancel();
169         }
170     }
171 
172     @Override
173     public HttpRequest generateRequest() throws IOException, HttpException {
174         Asserts.check(this.requestProducer == null, "Inconsistent state: request producer is not null");
175         this.requestProducer = this.requestProducerQueue.poll();
176         if (this.requestProducer == null) {
177             return null;
178         }
179         final HttpRequest request = this.requestProducer.generateRequest();
180         this.httppocessor.process(request, this.localContext);
181         this.requestQueue.add(request);
182         return request;
183     }
184 
185     @Override
186     public void produceContent(
187             final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
188         Asserts.check(this.requestProducer != null, "Inconsistent state: request producer is null");
189         this.requestProducer.produceContent(encoder, ioctrl);
190     }
191 
192     @Override
193     public void requestCompleted() {
194         Asserts.check(this.requestProducer != null, "Inconsistent state: request producer is null");
195         this.requestProducer.requestCompleted(this.localContext);
196         this.requestProducer = null;
197     }
198 
199     @Override
200     public void responseReceived(final HttpResponse response) throws IOException, HttpException {
201         Asserts.check(this.responseConsumer == null, "Inconsistent state: response consumer is not null");
202 
203         this.responseConsumer = this.responseConsumerQueue.poll();
204         Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer queue is empty");
205 
206         final HttpRequest request = this.requestQueue.poll();
207         Asserts.check(request != null, "Inconsistent state: request queue is empty");
208 
209         this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
210         this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
211         this.httppocessor.process(response, this.localContext);
212 
213         this.responseConsumer.responseReceived(response);
214         this.keepAlive = this.connReuseStrategy.keepAlive(response, this.localContext);
215     }
216 
217     @Override
218     public void consumeContent(
219             final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
220         Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer is null");
221         this.responseConsumer.consumeContent(decoder, ioctrl);
222     }
223 
224     @Override
225     public void responseCompleted() throws IOException {
226         Asserts.check(this.responseConsumer != null, "Inconsistent state: response consumer is null");
227         try {
228             if (!this.keepAlive) {
229                 this.conn.close();
230             }
231             this.responseConsumer.responseCompleted(this.localContext);
232             final T result = this.responseConsumer.getResult();
233             final Exception ex = this.responseConsumer.getException();
234             this.responseConsumer = null;
235             if (result != null) {
236                 this.resultQueue.add(result);
237             } else {
238                 this.future.failed(ex);
239                 this.conn.shutdown();
240             }
241             if (!conn.isOpen()) {
242                 this.done = true;
243                 releaseResources();
244             }
245             if (!this.future.isDone() && this.responseConsumerQueue.isEmpty()) {
246                 this.future.completed(new ArrayList<T>(this.resultQueue));
247                 this.resultQueue.clear();
248             }
249         } catch (final RuntimeException ex) {
250             failed(ex);
251             throw ex;
252         }
253     }
254 
255     @Override
256     public void inputTerminated() {
257         failed(new ConnectionClosedException("Connection closed"));
258     }
259 
260     @Override
261     public void failed(final Exception ex) {
262         this.done = true;
263         try {
264             if (this.requestProducer != null) {
265                 this.requestProducer.failed(ex);
266             }
267             if (this.responseConsumer != null) {
268                 this.responseConsumer.failed(ex);
269             }
270         } finally {
271             try {
272                 this.future.failed(ex);
273             } finally {
274                 releaseResources();
275             }
276         }
277     }
278 
279     @Override
280     public boolean cancel() {
281         this.done = true;
282         try {
283             final boolean cancelled = this.responseConsumer.cancel();
284             this.future.cancel();
285             releaseResources();
286             return cancelled;
287         } catch (final RuntimeException ex) {
288             failed(ex);
289             throw ex;
290         }
291     }
292 
293     @Override
294     public boolean isDone() {
295         return this.done;
296     }
297 
298 }