View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.bootstrap;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.InterruptedIOException;
32  import java.io.OutputStream;
33  import java.net.InetSocketAddress;
34  import java.net.Socket;
35  import java.util.Set;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import javax.net.ssl.SSLSocketFactory;
42  
43  import org.apache.hc.core5.annotation.Internal;
44  import org.apache.hc.core5.function.Resolver;
45  import org.apache.hc.core5.http.ClassicHttpRequest;
46  import org.apache.hc.core5.http.ClassicHttpResponse;
47  import org.apache.hc.core5.http.ConnectionClosedException;
48  import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
49  import org.apache.hc.core5.http.HttpEntity;
50  import org.apache.hc.core5.http.HttpException;
51  import org.apache.hc.core5.http.HttpHost;
52  import org.apache.hc.core5.http.URIScheme;
53  import org.apache.hc.core5.http.config.CharCodingConfig;
54  import org.apache.hc.core5.http.config.H1Config;
55  import org.apache.hc.core5.http.io.SocketConfig;
56  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
57  import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory;
58  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
59  import org.apache.hc.core5.http.io.EofSensorInputStream;
60  import org.apache.hc.core5.http.io.EofSensorWatcher;
61  import org.apache.hc.core5.http.io.HttpClientConnection;
62  import org.apache.hc.core5.http.io.HttpClientResponseHandler;
63  import org.apache.hc.core5.http.io.HttpConnectionFactory;
64  import org.apache.hc.core5.http.io.HttpResponseInformationCallback;
65  import org.apache.hc.core5.http.io.entity.EntityUtils;
66  import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
67  import org.apache.hc.core5.http.protocol.HttpContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.io.ModalCloseable;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.net.URIAuthority;
72  import org.apache.hc.core5.pool.ConnPoolControl;
73  import org.apache.hc.core5.pool.ManagedConnPool;
74  import org.apache.hc.core5.pool.PoolEntry;
75  import org.apache.hc.core5.pool.PoolStats;
76  import org.apache.hc.core5.util.Args;
77  import org.apache.hc.core5.util.TimeValue;
78  import org.apache.hc.core5.util.Timeout;
79  
80  /**
81   * HTTP/1.1 client side message exchange initiator.
82   *
83   * @since 5.0
84   */
85  public class HttpRequester implements ConnPoolControl<HttpHost>, ModalCloseable {
86  
87      private final HttpRequestExecutor requestExecutor;
88      private final HttpProcessor httpProcessor;
89      private final ManagedConnPool<HttpHost, HttpClientConnection> connPool;
90      private final SocketConfig socketConfig;
91      private final HttpConnectionFactory<? extends HttpClientConnection> connectFactory;
92      private final SSLSocketFactory sslSocketFactory;
93      private final Resolver<HttpHost, InetSocketAddress> addressResolver;
94  
95      /**
96       * Use {@link RequesterBootstrap} to create instances of this class.
97       */
98      @Internal
99      public HttpRequester(
100             final HttpRequestExecutor requestExecutor,
101             final HttpProcessor httpProcessor,
102             final ManagedConnPool<HttpHost, HttpClientConnection> connPool,
103             final SocketConfig socketConfig,
104             final HttpConnectionFactory<? extends HttpClientConnection> connectFactory,
105             final SSLSocketFactory sslSocketFactory,
106             final Resolver<HttpHost, InetSocketAddress> addressResolver) {
107         this.requestExecutor = Args.notNull(requestExecutor, "Request executor");
108         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
109         this.connPool = Args.notNull(connPool, "Connection pool");
110         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
111         this.connectFactory = connectFactory != null ? connectFactory : new DefaultBHttpClientConnectionFactory(
112                 H1Config.DEFAULT, CharCodingConfig.DEFAULT);
113         this.sslSocketFactory = sslSocketFactory != null ? sslSocketFactory : (SSLSocketFactory) SSLSocketFactory.getDefault();
114         this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
115     }
116 
117     @Override
118     public PoolStats getTotalStats() {
119         return connPool.getTotalStats();
120     }
121 
122     @Override
123     public PoolStats getStats(final HttpHost route) {
124         return connPool.getStats(route);
125     }
126 
127     @Override
128     public void setMaxTotal(final int max) {
129         connPool.setMaxTotal(max);
130     }
131 
132     @Override
133     public int getMaxTotal() {
134         return connPool.getMaxTotal();
135     }
136 
137     @Override
138     public void setDefaultMaxPerRoute(final int max) {
139         connPool.setDefaultMaxPerRoute(max);
140     }
141 
142     @Override
143     public int getDefaultMaxPerRoute() {
144         return connPool.getDefaultMaxPerRoute();
145     }
146 
147     @Override
148     public void setMaxPerRoute(final HttpHost route, final int max) {
149         connPool.setMaxPerRoute(route, max);
150     }
151 
152     @Override
153     public int getMaxPerRoute(final HttpHost route) {
154         return connPool.getMaxPerRoute(route);
155     }
156 
157     @Override
158     public void closeIdle(final TimeValue idleTime) {
159         connPool.closeIdle(idleTime);
160     }
161 
162     @Override
163     public void closeExpired() {
164         connPool.closeExpired();
165     }
166 
167     @Override
168     public Set<HttpHost> getRoutes() {
169         return connPool.getRoutes();
170     }
171 
172     public ClassicHttpResponse execute(
173             final HttpClientConnection connection,
174             final ClassicHttpRequest request,
175             final HttpResponseInformationCallback informationCallback,
176             final HttpContext context) throws HttpException, IOException {
177         Args.notNull(connection, "HTTP connection");
178         Args.notNull(request, "HTTP request");
179         Args.notNull(context, "HTTP context");
180         if (!connection.isOpen()) {
181             throw new ConnectionClosedException();
182         }
183         requestExecutor.preProcess(request, httpProcessor, context);
184         final ClassicHttpResponse response = requestExecutor.execute(request, connection, informationCallback, context);
185         requestExecutor.postProcess(response, httpProcessor, context);
186         return response;
187     }
188 
189     public ClassicHttpResponse execute(
190             final HttpClientConnection connection,
191             final ClassicHttpRequest request,
192             final HttpContext context) throws HttpException, IOException {
193         return execute(connection, request, null, context);
194     }
195 
196     public boolean keepAlive(
197             final HttpClientConnection connection,
198             final ClassicHttpRequest request,
199             final ClassicHttpResponse response,
200             final HttpContext context) throws IOException {
201         final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
202         if (!keepAlive) {
203             connection.close();
204         }
205         return keepAlive;
206     }
207 
208     public <T> T execute(
209             final HttpClientConnection connection,
210             final ClassicHttpRequest request,
211             final HttpContext context,
212             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
213         try (final ClassicHttpResponse response = execute(connection, request, context)) {
214             final T result = responseHandler.handleResponse(response);
215             EntityUtils.consume(response.getEntity());
216             final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
217             if (!keepAlive) {
218                 connection.close();
219             }
220             return result;
221         } catch (final HttpException | IOException | RuntimeException ex) {
222             connection.close(CloseMode.IMMEDIATE);
223             throw ex;
224         }
225     }
226 
227     private Socket createSocket(final HttpHost targetHost) throws IOException {
228         final Socket sock = new Socket();
229         sock.setSoTimeout(socketConfig.getSoTimeout().toMillisIntBound());
230         sock.setReuseAddress(socketConfig.isSoReuseAddress());
231         sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
232         sock.setKeepAlive(socketConfig.isSoKeepAlive());
233         if (socketConfig.getRcvBufSize() > 0) {
234             sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
235         }
236         if (socketConfig.getSndBufSize() > 0) {
237             sock.setSendBufferSize(socketConfig.getSndBufSize());
238         }
239         final int linger = socketConfig.getSoLinger().toMillisIntBound();
240         if (linger >= 0) {
241             sock.setSoLinger(true, linger);
242         }
243 
244         final InetSocketAddress targetAddress = addressResolver.resolve(targetHost);
245         sock.connect(targetAddress, socketConfig.getSoTimeout().toMillisIntBound());
246         if (URIScheme.HTTPS.same(targetHost.getSchemeName())) {
247             return sslSocketFactory.createSocket(sock, targetHost.getHostName(), targetAddress.getPort(), true);
248         }
249         return sock;
250     }
251 
252     public ClassicHttpResponse execute(
253             final HttpHost targetHost,
254             final ClassicHttpRequest request,
255             final HttpResponseInformationCallback informationCallback,
256             final Timeout connectTimeout,
257             final HttpContext context) throws HttpException, IOException {
258         Args.notNull(targetHost, "HTTP host");
259         Args.notNull(request, "HTTP request");
260         final Future<PoolEntry<HttpHost, HttpClientConnection>> leaseFuture = connPool.lease(targetHost, null, connectTimeout, null);
261         final PoolEntry<HttpHost, HttpClientConnection> poolEntry;
262         final Timeout timeout = Timeout.defaultsToDisabled(connectTimeout);
263         try {
264             poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
265         } catch (final InterruptedException ex) {
266             throw new InterruptedIOException(ex.getMessage());
267         } catch (final ExecutionException ex) {
268             throw new HttpException("Unexpected failure leasing connection", ex);
269         } catch (final TimeoutException ex) {
270             throw new ConnectionRequestTimeoutException("Connection request timeout");
271         }
272         final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
273         try {
274             HttpClientConnection connection = poolEntry.getConnection();
275             if (connection == null) {
276                 final Socket socket = createSocket(targetHost);
277                 connection = connectFactory.createConnection(socket);
278                 poolEntry.assignConnection(connection);
279             }
280             if (request.getAuthority() == null) {
281                 request.setAuthority(new URIAuthority(targetHost.getHostName(), targetHost.getPort()));
282             }
283             final ClassicHttpResponse response = execute(connection, request, informationCallback, context);
284             final HttpEntity entity = response.getEntity();
285             if (entity != null) {
286                 response.setEntity(new HttpEntityWrapper(entity) {
287 
288                     private void releaseConnection() throws IOException {
289                         try {
290                             final HttpClientConnection localConn = connectionHolder.getConnection();
291                             if (localConn != null) {
292                                 if (requestExecutor.keepAlive(request, response, localConn, context)) {
293                                     if (super.isStreaming()) {
294                                         final InputStream content = super.getContent();
295                                         if (content != null) {
296                                             content.close();
297                                         }
298                                     }
299                                     connectionHolder.releaseConnection();
300                                 }
301                             }
302                         } finally {
303                             connectionHolder.discardConnection();
304                         }
305                     }
306 
307                     private void abortConnection() {
308                         connectionHolder.discardConnection();
309                     }
310 
311                     @Override
312                     public boolean isStreaming() {
313                         return true;
314                     }
315 
316                     @Override
317                     public InputStream getContent() throws IOException {
318                         return new EofSensorInputStream(super.getContent(), new EofSensorWatcher() {
319 
320                             @Override
321                             public boolean eofDetected(final InputStream wrapped) throws IOException {
322                                 releaseConnection();
323                                 return false;
324                             }
325 
326                             @Override
327                             public boolean streamClosed(final InputStream wrapped) throws IOException {
328                                 releaseConnection();
329                                 return false;
330                             }
331 
332                             @Override
333                             public boolean streamAbort(final InputStream wrapped) throws IOException {
334                                 abortConnection();
335                                 return false;
336                             }
337 
338                         });
339                     }
340 
341                     @Override
342                     public void writeTo(final OutputStream outStream) throws IOException {
343                         try {
344                             if (outStream != null) {
345                                 super.writeTo(outStream);
346                             }
347                             close();
348                         } catch (final IOException | RuntimeException ex) {
349                             abortConnection();
350                         }
351                     }
352 
353                     @Override
354                     public void close() throws IOException {
355                         releaseConnection();
356                     }
357 
358                 });
359             } else {
360                 final HttpClientConnection localConn = connectionHolder.getConnection();
361                 if (!requestExecutor.keepAlive(request, response, localConn, context)) {
362                     localConn.close();
363                 }
364                 connectionHolder.releaseConnection();
365             }
366             return response;
367         } catch (final HttpException | IOException | RuntimeException ex) {
368             connectionHolder.discardConnection();
369             throw ex;
370         }
371     }
372 
373     public ClassicHttpResponse execute(
374             final HttpHost targetHost,
375             final ClassicHttpRequest request,
376             final Timeout connectTimeout,
377             final HttpContext context) throws HttpException, IOException {
378         return execute(targetHost, request, null, connectTimeout, context);
379     }
380 
381     public <T> T  execute(
382             final HttpHost targetHost,
383             final ClassicHttpRequest request,
384             final Timeout connectTimeout,
385             final HttpContext context,
386             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
387         try (final ClassicHttpResponse response = execute(targetHost, request, null, connectTimeout, context)) {
388             final T result = responseHandler.handleResponse(response);
389             EntityUtils.consume(response.getEntity());
390             return result;
391         }
392     }
393 
394     public ConnPoolControl<HttpHost> getConnPoolControl() {
395         return connPool;
396     }
397 
398     @Override
399     public void close(final CloseMode closeMode) {
400         connPool.close(closeMode);
401     }
402 
403     @Override
404     public void close() throws IOException {
405         connPool.close();
406     }
407 
408     private class PoolEntryHolder {
409 
410         private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
411 
412         PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
413             this.poolEntryRef = new AtomicReference<>(poolEntry);
414         }
415 
416         HttpClientConnection getConnection() {
417             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
418             return poolEntry != null ? poolEntry.getConnection() : null;
419         }
420 
421         void releaseConnection() {
422             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
423             if (poolEntry != null) {
424                 final HttpClientConnection connection = poolEntry.getConnection();
425                 connPool.release(poolEntry, connection != null && connection.isOpen());
426             }
427         }
428 
429         void discardConnection() {
430             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
431             if (poolEntry != null) {
432                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
433                 connPool.release(poolEntry, false);
434             }
435         }
436 
437     }
438 
439 }