View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.bootstrap;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.util.List;
33  import java.util.Set;
34  import java.util.concurrent.Future;
35  import java.util.concurrent.atomic.AtomicReference;
36  
37  import org.apache.hc.core5.concurrent.BasicFuture;
38  import org.apache.hc.core5.concurrent.ComplexFuture;
39  import org.apache.hc.core5.concurrent.FutureCallback;
40  import org.apache.hc.core5.function.Callback;
41  import org.apache.hc.core5.function.Decorator;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpHost;
46  import org.apache.hc.core5.http.HttpRequest;
47  import org.apache.hc.core5.http.HttpResponse;
48  import org.apache.hc.core5.http.ProtocolException;
49  import org.apache.hc.core5.http.URIScheme;
50  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
51  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
52  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
53  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
54  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.DataStreamChannel;
57  import org.apache.hc.core5.http.nio.RequestChannel;
58  import org.apache.hc.core5.http.nio.command.ExecutionCommand;
59  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
60  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
61  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
62  import org.apache.hc.core5.http.protocol.HttpContext;
63  import org.apache.hc.core5.http.protocol.HttpCoreContext;
64  import org.apache.hc.core5.io.ShutdownType;
65  import org.apache.hc.core5.net.URIAuthority;
66  import org.apache.hc.core5.pool.ConnPoolControl;
67  import org.apache.hc.core5.pool.ManagedConnPool;
68  import org.apache.hc.core5.pool.PoolEntry;
69  import org.apache.hc.core5.pool.PoolStats;
70  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
71  import org.apache.hc.core5.reactor.IOReactorConfig;
72  import org.apache.hc.core5.reactor.IOSession;
73  import org.apache.hc.core5.reactor.IOSessionListener;
74  import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
75  import org.apache.hc.core5.util.Args;
76  import org.apache.hc.core5.util.TimeValue;
77  import org.apache.hc.core5.util.Timeout;
78  
79  /**
80   * @since 5.0
81   */
82  public class HttpAsyncRequester extends AsyncRequester implements ConnPoolControl<HttpHost> {
83  
84      private final ManagedConnPool<HttpHost, IOSession> connPool;
85      private final TlsStrategy tlsStrategy;
86  
87      public HttpAsyncRequester(
88              final IOReactorConfig ioReactorConfig,
89              final IOEventHandlerFactory eventHandlerFactory,
90              final Decorator<IOSession> ioSessionDecorator,
91              final IOSessionListener sessionListener,
92              final ManagedConnPool<HttpHost, IOSession> connPool,
93              final TlsStrategy tlsStrategy) {
94          super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, sessionListener, new Callback<IOSession>() {
95  
96              @Override
97              public void execute(final IOSession session) {
98                  session.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
99              }
100 
101         }, DefaultAddressResolver.INSTANCE);
102         this.connPool = Args.notNull(connPool, "Connection pool");
103         this.tlsStrategy = tlsStrategy;
104     }
105 
106     @Override
107     public PoolStats getTotalStats() {
108         return connPool.getTotalStats();
109     }
110 
111     @Override
112     public PoolStats getStats(final HttpHost route) {
113         return connPool.getStats(route);
114     }
115 
116     @Override
117     public void setMaxTotal(final int max) {
118         connPool.setMaxTotal(max);
119     }
120 
121     @Override
122     public int getMaxTotal() {
123         return connPool.getMaxTotal();
124     }
125 
126     @Override
127     public void setDefaultMaxPerRoute(final int max) {
128         connPool.setDefaultMaxPerRoute(max);
129     }
130 
131     @Override
132     public int getDefaultMaxPerRoute() {
133         return connPool.getDefaultMaxPerRoute();
134     }
135 
136     @Override
137     public void setMaxPerRoute(final HttpHost route, final int max) {
138         connPool.setMaxPerRoute(route, max);
139     }
140 
141     @Override
142     public int getMaxPerRoute(final HttpHost route) {
143         return connPool.getMaxPerRoute(route);
144     }
145 
146     @Override
147     public void closeIdle(final TimeValue idleTime) {
148         connPool.closeIdle(idleTime);
149     }
150 
151     @Override
152     public void closeExpired() {
153         connPool.closeExpired();
154     }
155 
156     @Override
157     public Set<HttpHost> getRoutes() {
158         return connPool.getRoutes();
159     }
160 
161     public Future<AsyncClientEndpoint> connect(
162             final HttpHost host,
163             final Timeout timeout,
164             final Object attachment,
165             final FutureCallback<AsyncClientEndpoint> callback) {
166         return doConnect(host, timeout, attachment, callback);
167     }
168 
169     protected Future<AsyncClientEndpoint> doConnect(
170             final HttpHost host,
171             final Timeout timeout,
172             final Object attachment,
173             final FutureCallback<AsyncClientEndpoint> callback) {
174         Args.notNull(host, "Host");
175         Args.notNull(timeout, "Timeout");
176         final ComplexFuture<AsyncClientEndpoint> resultFuture = new ComplexFuture<>(callback);
177         final Future<PoolEntry<HttpHost, IOSession>> leaseFuture = connPool.lease(
178                 host, null, timeout, new FutureCallback<PoolEntry<HttpHost, IOSession>>() {
179 
180             @Override
181             public void completed(final PoolEntry<HttpHost, IOSession> poolEntry) {
182                 final AsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(poolEntry);
183                 final IOSession ioSession = poolEntry.getConnection();
184                 if (ioSession != null && ioSession.isClosed()) {
185                     poolEntry.discardConnection(ShutdownType.IMMEDIATE);
186                 }
187                 if (poolEntry.hasConnection()) {
188                     resultFuture.completed(endpoint);
189                 } else {
190                     final Future<IOSession> futute = requestSession(host, timeout, attachment, new FutureCallback<IOSession>() {
191 
192                         @Override
193                         public void completed(final IOSession session) {
194                             if (tlsStrategy != null
195                                     && URIScheme.HTTPS.same(host.getSchemeName())
196                                     && session instanceof TransportSecurityLayer) {
197                                 tlsStrategy.upgrade(
198                                         (TransportSecurityLayer) session,
199                                         host,
200                                         session.getLocalAddress(),
201                                         session.getRemoteAddress(),
202                                         attachment);
203                             }
204                             session.setSocketTimeout(timeout.toMillisIntBound());
205                             poolEntry.assignConnection(session);
206                             resultFuture.completed(endpoint);
207                         }
208 
209                         @Override
210                         public void failed(final Exception cause) {
211                             try {
212                                 resultFuture.failed(cause);
213                             } finally {
214                                 endpoint.releaseAndDiscard();
215                             }
216                         }
217 
218                         @Override
219                         public void cancelled() {
220                             try {
221                                 resultFuture.cancel();
222                             } finally {
223                                 endpoint.releaseAndDiscard();
224                             }
225                         }
226 
227                     });
228                     resultFuture.setDependency(futute);
229                 }
230             }
231 
232             @Override
233             public void failed(final Exception ex) {
234                 resultFuture.failed(ex);
235             }
236 
237             @Override
238             public void cancelled() {
239                 resultFuture.cancel();
240             }
241 
242         });
243         resultFuture.setDependency(leaseFuture);
244         return resultFuture;
245     }
246 
247     public Future<AsyncClientEndpoint> connect(final HttpHost host, final Timeout timeout) throws InterruptedException {
248         return connect(host, timeout, null, null);
249     }
250 
251     public void execute(
252             final AsyncClientExchangeHandler exchangeHandler,
253             final Timeout timeout,
254             final HttpContext context) {
255         Args.notNull(exchangeHandler, "Exchange handler");
256         Args.notNull(timeout, "Timeout");
257         Args.notNull(context, "Context");
258         try {
259             exchangeHandler.produceRequest(new RequestChannel() {
260 
261                 @Override
262                 public void sendRequest(
263                         final HttpRequest request,
264                         final EntityDetails entityDetails) throws HttpException, IOException {
265                     final String scheme = request.getScheme();
266                     final URIAuthority authority = request.getAuthority();
267                     if (authority == null) {
268                         throw new ProtocolException("Request authority not specified");
269                     }
270                     final HttpHost target = new HttpHost(authority, scheme);
271                     connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
272 
273                         @Override
274                         public void completed(final AsyncClientEndpoint endpoint) {
275                             endpoint.execute(new AsyncClientExchangeHandler() {
276 
277                                 @Override
278                                 public void releaseResources() {
279                                     endpoint.releaseAndDiscard();
280                                     exchangeHandler.releaseResources();
281                                 }
282 
283                                 @Override
284                                 public void failed(final Exception cause) {
285                                     endpoint.releaseAndDiscard();
286                                     exchangeHandler.failed(cause);
287                                 }
288 
289                                 @Override
290                                 public void cancel() {
291                                     endpoint.releaseAndDiscard();
292                                     exchangeHandler.cancel();
293                                 }
294 
295                                 @Override
296                                 public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
297                                     channel.sendRequest(request, entityDetails);
298                                 }
299 
300                                 @Override
301                                 public int available() {
302                                     return exchangeHandler.available();
303                                 }
304 
305                                 @Override
306                                 public void produce(final DataStreamChannel channel) throws IOException {
307                                     exchangeHandler.produce(channel);
308                                 }
309 
310                                 @Override
311                                 public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
312                                     exchangeHandler.consumeInformation(response);
313                                 }
314 
315                                 @Override
316                                 public void consumeResponse(
317                                         final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
318                                     if (entityDetails == null) {
319                                         endpoint.releaseAndReuse();
320                                     }
321                                     exchangeHandler.consumeResponse(response, entityDetails);
322                                 }
323 
324                                 @Override
325                                 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
326                                     exchangeHandler.updateCapacity(capacityChannel);
327                                 }
328 
329                                 @Override
330                                 public int consume(final ByteBuffer src) throws IOException {
331                                     return exchangeHandler.consume(src);
332                                 }
333 
334                                 @Override
335                                 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
336                                     endpoint.releaseAndReuse();
337                                     exchangeHandler.streamEnd(trailers);
338                                 }
339 
340                             }, context);
341 
342                         }
343 
344                         @Override
345                         public void failed(final Exception ex) {
346                             exchangeHandler.failed(ex);
347                         }
348 
349                         @Override
350                         public void cancelled() {
351                             exchangeHandler.cancel();
352                         }
353 
354                     });
355 
356                 }
357 
358             });
359 
360         } catch (final IOException | HttpException ex) {
361             exchangeHandler.failed(ex);
362         }
363     }
364 
365     public final <T> Future<T> execute(
366             final AsyncRequestProducer requestProducer,
367             final AsyncResponseConsumer<T> responseConsumer,
368             final Timeout timeout,
369             final HttpContext context,
370             final FutureCallback<T> callback) {
371         Args.notNull(requestProducer, "Request producer");
372         Args.notNull(responseConsumer, "Response consumer");
373         Args.notNull(timeout, "Timeout");
374         final BasicFuture<T> future = new BasicFuture<>(callback);
375         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(requestProducer, responseConsumer, new FutureCallback<T>() {
376 
377             @Override
378             public void completed(final T result) {
379                 future.completed(result);
380             }
381 
382             @Override
383             public void failed(final Exception ex) {
384                 future.failed(ex);
385             }
386 
387             @Override
388             public void cancelled() {
389                 future.cancel();
390             }
391 
392         });
393         execute(exchangeHandler, timeout, context != null ? context : HttpCoreContext.create());
394         return future;
395     }
396 
397     public final <T> Future<T> execute(
398             final AsyncRequestProducer requestProducer,
399             final AsyncResponseConsumer<T> responseConsumer,
400             final Timeout timeout,
401             final FutureCallback<T> callback) {
402         return execute(requestProducer, responseConsumer, timeout, null, callback);
403     }
404 
405     private class InternalAsyncClientEndpoint extends AsyncClientEndpoint {
406 
407         final AtomicReference<PoolEntry<HttpHost, IOSession>> poolEntryRef;
408 
409         InternalAsyncClientEndpoint(final PoolEntry<HttpHost, IOSession> poolEntry) {
410             this.poolEntryRef = new AtomicReference<>(poolEntry);
411         }
412 
413         @Override
414         public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpContext context) {
415             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.get();
416             if (poolEntry == null) {
417                 throw new IllegalStateException("Endpoint has already been released");
418             }
419             final IOSession ioSession = poolEntry.getConnection();
420             if (ioSession == null) {
421                 throw new IllegalStateException("I/O session is invalid");
422             }
423             ioSession.addLast(new ExecutionCommand(exchangeHandler, context));
424         }
425 
426         @Override
427         public void releaseAndReuse() {
428             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
429             if (poolEntry != null) {
430                 final IOSession ioSession = poolEntry.getConnection();
431                 connPool.release(poolEntry, ioSession != null && !ioSession.isClosed());
432             }
433         }
434 
435         @Override
436         public void releaseAndDiscard() {
437             final PoolEntry<HttpHost, IOSession> poolEntry = poolEntryRef.getAndSet(null);
438             if (poolEntry != null) {
439                 poolEntry.discardConnection(ShutdownType.IMMEDIATE);
440                 connPool.release(poolEntry, false);
441             }
442         }
443 
444     }
445 
446 }