/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  
28  package org.apache.http.nio.testserver;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.List;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.http.HttpHost;
39  import org.apache.http.HttpRequest;
40  import org.apache.http.HttpResponse;
41  import org.apache.http.OoopsieRuntimeException;
42  import org.apache.http.concurrent.FutureCallback;
43  import org.apache.http.config.ConnectionConfig;
44  import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
45  import org.apache.http.impl.nio.DefaultNHttpClientConnection;
46  import org.apache.http.impl.nio.DefaultNHttpClientConnectionFactory;
47  import org.apache.http.impl.nio.pool.BasicNIOConnPool;
48  import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
49  import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
50  import org.apache.http.impl.nio.reactor.ExceptionEvent;
51  import org.apache.http.nio.NHttpClientConnection;
52  import org.apache.http.nio.NHttpClientEventHandler;
53  import org.apache.http.nio.pool.NIOConnFactory;
54  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
55  import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
56  import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
57  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
58  import org.apache.http.nio.protocol.HttpAsyncRequester;
59  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
60  import org.apache.http.nio.reactor.ConnectingIOReactor;
61  import org.apache.http.nio.reactor.IOEventDispatch;
62  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
63  import org.apache.http.nio.reactor.IOReactorStatus;
64  import org.apache.http.nio.reactor.IOSession;
65  import org.apache.http.nio.reactor.SessionRequest;
66  import org.apache.http.protocol.HttpContext;
67  import org.apache.http.protocol.HttpCoreContext;
68  import org.apache.http.protocol.HttpProcessor;
69  import org.apache.http.protocol.ImmutableHttpProcessor;
70  import org.apache.http.protocol.RequestConnControl;
71  import org.apache.http.protocol.RequestContent;
72  import org.apache.http.protocol.RequestExpectContinue;
73  import org.apache.http.protocol.RequestTargetHost;
74  import org.apache.http.protocol.RequestUserAgent;
75  
76  @SuppressWarnings("deprecation")
77  public class HttpClientNio {
78  
79      public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
80              new RequestContent(),
81              new RequestTargetHost(),
82              new RequestConnControl(),
83              new RequestUserAgent("TEST-CLIENT/1.1"),
84              new RequestExpectContinue(true));
85  
86      private final DefaultConnectingIOReactor ioReactor;
87      private final BasicNIOConnPool connpool;
88  
89      private volatile HttpProcessor httpProcessor;
90      private volatile HttpAsyncRequester executor;
91      private volatile IOReactorThread thread;
92      private volatile int timeout;
93  
94      public HttpClientNio(
95              final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory) throws IOException {
96          super();
97          this.ioReactor = new DefaultConnectingIOReactor();
98          this.ioReactor.setExceptionHandler(new SimpleIOReactorExceptionHandler());
99          this.connpool = new BasicNIOConnPool(this.ioReactor, new NIOConnFactory<HttpHost, NHttpClientConnection>() {
100 
101             @Override
102             public NHttpClientConnection create(
103                 final HttpHost route, final IOSession session) throws IOException {
104                 final NHttpClientConnection conn = connFactory.create(route, session);
105                 conn.setSocketTimeout(timeout);
106                 return conn;
107             }
108 
109         }, 0);
110     }
111 
112     public int getTimeout() {
113         return this.timeout;
114     }
115 
116     public void setTimeout(final int timeout) {
117         this.timeout = timeout;
118     }
119 
120     public void setMaxTotal(final int max) {
121         this.connpool.setMaxTotal(max);
122     }
123 
124     public void setMaxPerRoute(final int max) {
125         this.connpool.setDefaultMaxPerRoute(max);
126     }
127 
128     public void setHttpProcessor(final HttpProcessor httpProcessor) {
129         this.httpProcessor = httpProcessor;
130     }
131 
132     public Future<BasicNIOPoolEntry> lease(
133             final HttpHost host,
134             final FutureCallback<BasicNIOPoolEntry> callback) {
135         return this.connpool.lease(host, null, this.timeout, TimeUnit.MILLISECONDS, callback);
136     }
137 
138     public void release(final BasicNIOPoolEntry poolEntry, final boolean reusable) {
139         this.connpool.release(poolEntry, reusable);
140     }
141 
142     public <T> Future<T> execute(
143             final HttpAsyncRequestProducer requestProducer,
144             final HttpAsyncResponseConsumer<T> responseConsumer,
145             final HttpContext context,
146             final FutureCallback<T> callback) {
147         return executor.execute(requestProducer, responseConsumer, this.connpool,
148                 context != null ? context : HttpCoreContext.create(), callback);
149     }
150 
151     public <T> Future<List<T>> executePipelined(
152             final HttpHost target,
153             final List<HttpAsyncRequestProducer> requestProducers,
154             final List<HttpAsyncResponseConsumer<T>> responseConsumers,
155             final HttpContext context,
156             final FutureCallback<List<T>> callback) {
157         return executor.executePipelined(target, requestProducers, responseConsumers, this.connpool,
158                 context != null ? context : HttpCoreContext.create(), callback);
159     }
160 
161     public Future<HttpResponse> execute(
162             final HttpHost target,
163             final HttpRequest request,
164             final HttpContext context,
165             final FutureCallback<HttpResponse> callback) {
166         return execute(
167                 new BasicAsyncRequestProducer(target, request),
168                 new BasicAsyncResponseConsumer(),
169                 context != null ? context : HttpCoreContext.create(),
170                 callback);
171     }
172 
173     public Future<List<HttpResponse>> executePipelined(
174             final HttpHost target,
175             final List<HttpRequest> requests,
176             final HttpContext context,
177             final FutureCallback<List<HttpResponse>> callback) {
178         final List<HttpAsyncRequestProducer> requestProducers =
179                 new ArrayList<HttpAsyncRequestProducer>(requests.size());
180         final List<HttpAsyncResponseConsumer<HttpResponse>> responseConsumers =
181                 new ArrayList<HttpAsyncResponseConsumer<HttpResponse>>(requests.size());
182         for (final HttpRequest request: requests) {
183             requestProducers.add(new BasicAsyncRequestProducer(target, request));
184             responseConsumers.add(new BasicAsyncResponseConsumer());
185         }
186         return executor.executePipelined(target, requestProducers, responseConsumers, this.connpool,
187                 context != null ? context : HttpCoreContext.create(), callback);
188     }
189 
190     public Future<HttpResponse> execute(
191             final HttpHost target,
192             final HttpRequest request,
193             final HttpContext context) {
194         return execute(target, request, context, null);
195     }
196 
197     public Future<List<HttpResponse>> executePipelined(
198             final HttpHost target,
199             final List<HttpRequest> requests,
200             final HttpContext context) {
201         return executePipelined(target, requests, context, null);
202     }
203 
204     public Future<HttpResponse> execute(
205             final HttpHost target,
206             final HttpRequest request) {
207         return execute(target, request, null, null);
208     }
209 
210     public Future<List<HttpResponse>> executePipelined(
211             final HttpHost target,
212             final HttpRequest... requests) {
213         return executePipelined(target, Arrays.asList(requests), null, null);
214     }
215 
216     private void execute(final NHttpClientEventHandler clientHandler) throws IOException {
217         final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(clientHandler,
218             new DefaultNHttpClientConnectionFactory(ConnectionConfig.DEFAULT)) {
219 
220             @Override
221             protected DefaultNHttpClientConnection createConnection(final IOSession session) {
222                 final DefaultNHttpClientConnection conn = super.createConnection(session);
223                 conn.setSocketTimeout(timeout);
224                 return conn;
225             }
226 
227         };
228         this.ioReactor.execute(ioEventDispatch);
229     }
230 
231     public SessionRequest openConnection(final InetSocketAddress address, final Object attachment) {
232         final SessionRequest sessionRequest = this.ioReactor.connect(address, null, attachment, null);
233         sessionRequest.setConnectTimeout(this.timeout);
234         return sessionRequest;
235     }
236 
237     public void start() {
238         this.executor = new HttpAsyncRequester(this.httpProcessor != null ? this.httpProcessor : DEFAULT_HTTP_PROC);
239         this.thread = new IOReactorThread(new HttpAsyncRequestExecutor());
240         this.thread.start();
241     }
242 
243     public ConnectingIOReactor getIoReactor() {
244         return this.ioReactor;
245     }
246 
247     public IOReactorStatus getStatus() {
248         return this.ioReactor.getStatus();
249     }
250 
251     public List<ExceptionEvent> getAuditLog() {
252         return this.ioReactor.getAuditLog();
253     }
254 
255     public void join(final long timeout) throws InterruptedException {
256         if (this.thread != null) {
257             this.thread.join(timeout);
258         }
259     }
260 
261     public Exception getException() {
262         if (this.thread != null) {
263             return this.thread.getException();
264         } else {
265             return null;
266         }
267     }
268 
269     public void shutdown() throws IOException {
270         this.connpool.shutdown(2000);
271         try {
272             join(500);
273         } catch (final InterruptedException ignore) {
274         }
275     }
276 
277     private class IOReactorThread extends Thread {
278 
279         private final NHttpClientEventHandler clientHandler;
280 
281         private volatile Exception ex;
282 
283         public IOReactorThread(final NHttpClientEventHandler clientHandler) {
284             super();
285             this.clientHandler = clientHandler;
286         }
287 
288         @Override
289         public void run() {
290             try {
291                 execute(this.clientHandler);
292             } catch (final Exception ex) {
293                 this.ex = ex;
294             }
295         }
296 
297         public Exception getException() {
298             return this.ex;
299         }
300 
301     }
302 
303     static class SimpleIOReactorExceptionHandler implements IOReactorExceptionHandler {
304 
305         @Override
306         public boolean handle(final RuntimeException ex) {
307             if (!(ex instanceof OoopsieRuntimeException)) {
308                 ex.printStackTrace(System.out);
309             }
310             return false;
311         }
312 
313         @Override
314         public boolean handle(final IOException ex) {
315             ex.printStackTrace(System.out);
316             return false;
317         }
318 
319     }
320 
321 }