View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.bootstrap;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.InterruptedIOException;
32  import java.io.OutputStream;
33  import java.net.InetSocketAddress;
34  import java.net.Socket;
35  import java.util.Set;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import javax.net.ssl.SSLSocketFactory;
42  
43  import org.apache.hc.core5.function.Resolver;
44  import org.apache.hc.core5.http.ClassicHttpRequest;
45  import org.apache.hc.core5.http.ClassicHttpResponse;
46  import org.apache.hc.core5.http.ConnectionClosedException;
47  import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
48  import org.apache.hc.core5.http.HttpEntity;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.URIScheme;
52  import org.apache.hc.core5.http.config.CharCodingConfig;
53  import org.apache.hc.core5.http.config.H1Config;
54  import org.apache.hc.core5.http.config.SocketConfig;
55  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
56  import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory;
57  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
58  import org.apache.hc.core5.http.io.EofSensorInputStream;
59  import org.apache.hc.core5.http.io.EofSensorWatcher;
60  import org.apache.hc.core5.http.io.HttpClientConnection;
61  import org.apache.hc.core5.http.io.HttpClientResponseHandler;
62  import org.apache.hc.core5.http.io.HttpConnectionFactory;
63  import org.apache.hc.core5.http.io.HttpResponseInformationCallback;
64  import org.apache.hc.core5.http.io.entity.EntityUtils;
65  import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
66  import org.apache.hc.core5.http.protocol.HttpContext;
67  import org.apache.hc.core5.http.protocol.HttpProcessor;
68  import org.apache.hc.core5.io.GracefullyCloseable;
69  import org.apache.hc.core5.io.ShutdownType;
70  import org.apache.hc.core5.net.URIAuthority;
71  import org.apache.hc.core5.pool.ConnPoolControl;
72  import org.apache.hc.core5.pool.ManagedConnPool;
73  import org.apache.hc.core5.pool.PoolEntry;
74  import org.apache.hc.core5.pool.PoolStats;
75  import org.apache.hc.core5.util.Args;
76  import org.apache.hc.core5.util.TimeValue;
77  import org.apache.hc.core5.util.Timeout;
78  
79  /**
80   * @since 5.0
81   */
82  public class HttpRequester implements ConnPoolControl<HttpHost>, GracefullyCloseable {
83  
84      private final HttpRequestExecutor requestExecutor;
85      private final HttpProcessor httpProcessor;
86      private final ManagedConnPool<HttpHost, HttpClientConnection> connPool;
87      private final SocketConfig socketConfig;
88      private final HttpConnectionFactory<? extends HttpClientConnection> connectFactory;
89      private final SSLSocketFactory sslSocketFactory;
90      private final Resolver<HttpHost, InetSocketAddress> addressResolver;
91  
92      public HttpRequester(
93              final HttpRequestExecutor requestExecutor,
94              final HttpProcessor httpProcessor,
95              final ManagedConnPool<HttpHost, HttpClientConnection> connPool,
96              final SocketConfig socketConfig,
97              final HttpConnectionFactory<? extends HttpClientConnection> connectFactory,
98              final SSLSocketFactory sslSocketFactory,
99              final Resolver<HttpHost, InetSocketAddress> addressResolver) {
100         this.requestExecutor = Args.notNull(requestExecutor, "Request executor");
101         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
102         this.connPool = Args.notNull(connPool, "Connection pool");
103         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
104         this.connectFactory = connectFactory != null ? connectFactory : new DefaultBHttpClientConnectionFactory(
105                 H1Config.DEFAULT, CharCodingConfig.DEFAULT);
106         this.sslSocketFactory = sslSocketFactory != null ? sslSocketFactory : (SSLSocketFactory) SSLSocketFactory.getDefault();
107         this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
108     }
109 
110     @Override
111     public PoolStats getTotalStats() {
112         return connPool.getTotalStats();
113     }
114 
115     @Override
116     public PoolStats getStats(final HttpHost route) {
117         return connPool.getStats(route);
118     }
119 
120     @Override
121     public void setMaxTotal(final int max) {
122         connPool.setMaxTotal(max);
123     }
124 
125     @Override
126     public int getMaxTotal() {
127         return connPool.getMaxTotal();
128     }
129 
130     @Override
131     public void setDefaultMaxPerRoute(final int max) {
132         connPool.setDefaultMaxPerRoute(max);
133     }
134 
135     @Override
136     public int getDefaultMaxPerRoute() {
137         return connPool.getDefaultMaxPerRoute();
138     }
139 
140     @Override
141     public void setMaxPerRoute(final HttpHost route, final int max) {
142         connPool.setMaxPerRoute(route, max);
143     }
144 
145     @Override
146     public int getMaxPerRoute(final HttpHost route) {
147         return connPool.getMaxPerRoute(route);
148     }
149 
150     @Override
151     public void closeIdle(final TimeValue idleTime) {
152         connPool.closeIdle(idleTime);
153     }
154 
155     @Override
156     public void closeExpired() {
157         connPool.closeExpired();
158     }
159 
160     @Override
161     public Set<HttpHost> getRoutes() {
162         return connPool.getRoutes();
163     }
164 
165     public ClassicHttpResponse execute(
166             final HttpClientConnection connection,
167             final ClassicHttpRequest request,
168             final HttpResponseInformationCallback informationCallback,
169             final HttpContext context) throws HttpException, IOException {
170         Args.notNull(connection, "HTTP connection");
171         Args.notNull(request, "HTTP request");
172         Args.notNull(context, "HTTP context");
173         if (!connection.isOpen()) {
174             throw new ConnectionClosedException("Connection is closed");
175         }
176         requestExecutor.preProcess(request, httpProcessor, context);
177         final ClassicHttpResponse response = requestExecutor.execute(request, connection, informationCallback, context);
178         requestExecutor.postProcess(response, httpProcessor, context);
179         return response;
180     }
181 
182     public ClassicHttpResponse execute(
183             final HttpClientConnection connection,
184             final ClassicHttpRequest request,
185             final HttpContext context) throws HttpException, IOException {
186         return execute(connection, request, null, context);
187     }
188 
189     public boolean keepAlive(
190             final HttpClientConnection connection,
191             final ClassicHttpRequest request,
192             final ClassicHttpResponse response,
193             final HttpContext context) throws IOException {
194         final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
195         if (!keepAlive) {
196             connection.close();
197         }
198         return keepAlive;
199     }
200 
201     public <T> T execute(
202             final HttpClientConnection connection,
203             final ClassicHttpRequest request,
204             final HttpContext context,
205             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
206         try (final ClassicHttpResponse response = execute(connection, request, context)) {
207             final T result = responseHandler.handleResponse(response);
208             EntityUtils.consume(response.getEntity());
209             final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
210             if (!keepAlive) {
211                 connection.close();
212             }
213             return result;
214         } catch (final HttpException | IOException | RuntimeException ex) {
215             connection.shutdown(ShutdownType.IMMEDIATE);
216             throw ex;
217         }
218     }
219 
220     private Socket createSocket(final HttpHost targetHost) throws IOException {
221         final Socket sock = new Socket();
222         sock.setSoTimeout(socketConfig.getSoTimeout().toMillisIntBound());
223         sock.setReuseAddress(socketConfig.isSoReuseAddress());
224         sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
225         sock.setKeepAlive(socketConfig.isSoKeepAlive());
226         if (socketConfig.getRcvBufSize() > 0) {
227             sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
228         }
229         if (socketConfig.getSndBufSize() > 0) {
230             sock.setSendBufferSize(socketConfig.getSndBufSize());
231         }
232         final int linger = socketConfig.getSoLinger().toMillisIntBound();
233         if (linger >= 0) {
234             sock.setSoLinger(true, linger);
235         }
236 
237         final InetSocketAddress targetAddress = addressResolver.resolve(targetHost);
238         sock.connect(targetAddress, socketConfig.getSoTimeout().toMillisIntBound());
239         if (URIScheme.HTTPS.same(targetHost.getSchemeName())) {
240             return sslSocketFactory.createSocket(sock, targetHost.getHostName(), targetAddress.getPort(), true);
241         } else {
242             return sock;
243         }
244     }
245 
246     public ClassicHttpResponse execute(
247             final HttpHost targetHost,
248             final ClassicHttpRequest request,
249             final HttpResponseInformationCallback informationCallback,
250             final Timeout connectTimeout,
251             final HttpContext context) throws HttpException, IOException {
252         Args.notNull(targetHost, "HTTP host");
253         Args.notNull(request, "HTTP request");
254         final Future<PoolEntry<HttpHost, HttpClientConnection>> leaseFuture = connPool.lease(targetHost, null, connectTimeout, null);
255         final PoolEntry<HttpHost, HttpClientConnection> poolEntry;
256         final Timeout timeout = Timeout.defaultsToDisabled(connectTimeout);
257         try {
258             poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
259         } catch (final InterruptedException ex) {
260             throw new InterruptedIOException(ex.getMessage());
261         } catch (final ExecutionException ex) {
262             throw new HttpException("Unexpected failure leasing connection", ex);
263         } catch (final TimeoutException ex) {
264             throw new ConnectionRequestTimeoutException("Connection request timeout");
265         }
266         final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
267         try {
268             HttpClientConnection connection = poolEntry.getConnection();
269             if (connection == null) {
270                 final Socket socket = createSocket(targetHost);
271                 connection = connectFactory.createConnection(socket);
272                 poolEntry.assignConnection(connection);
273             }
274             if (request.getAuthority() == null) {
275                 request.setAuthority(new URIAuthority(targetHost.getHostName(), targetHost.getPort()));
276             }
277             final ClassicHttpResponse response = execute(connection, request, informationCallback, context);
278             final HttpEntity entity = response.getEntity();
279             if (entity != null) {
280                 response.setEntity(new HttpEntityWrapper(entity) {
281 
282                     private void releaseConnection() throws IOException {
283                         try {
284                             final HttpClientConnection localConn = connectionHolder.getConnection();
285                             if (localConn != null) {
286                                 if (requestExecutor.keepAlive(request, response, localConn, context)) {
287                                     if (super.isStreaming()) {
288                                         final InputStream content = super.getContent();
289                                         if (content != null) {
290                                             content.close();
291                                         }
292                                     }
293                                     connectionHolder.releaseConnection();
294                                 }
295                             }
296                         } finally {
297                             connectionHolder.discardConnection();
298                         }
299                     }
300 
301                     private void abortConnection() {
302                         connectionHolder.discardConnection();
303                     }
304 
305                     @Override
306                     public boolean isStreaming() {
307                         return true;
308                     }
309 
310                     @Override
311                     public InputStream getContent() throws IOException {
312                         return new EofSensorInputStream(super.getContent(), new EofSensorWatcher() {
313 
314                             @Override
315                             public boolean eofDetected(final InputStream wrapped) throws IOException {
316                                 releaseConnection();
317                                 return false;
318                             }
319 
320                             @Override
321                             public boolean streamClosed(final InputStream wrapped) throws IOException {
322                                 releaseConnection();
323                                 return false;
324                             }
325 
326                             @Override
327                             public boolean streamAbort(final InputStream wrapped) throws IOException {
328                                 abortConnection();
329                                 return false;
330                             }
331 
332                         });
333                     }
334 
335                     @Override
336                     public void writeTo(final OutputStream outstream) throws IOException {
337                         try {
338                             if (outstream != null) {
339                                 super.writeTo(outstream);
340                             }
341                             close();
342                         } catch (final IOException | RuntimeException ex) {
343                             abortConnection();
344                         }
345                     }
346 
347                     @Override
348                     public void close() throws IOException {
349                         releaseConnection();
350                     }
351 
352                 });
353             } else {
354                 final HttpClientConnection localConn = connectionHolder.getConnection();
355                 if (!requestExecutor.keepAlive(request, response, localConn, context)) {
356                     localConn.close();
357                 }
358                 connectionHolder.releaseConnection();
359             }
360             return response;
361         } catch (final HttpException | IOException | RuntimeException ex) {
362             connectionHolder.discardConnection();
363             throw ex;
364         }
365     }
366 
367     public ClassicHttpResponse execute(
368             final HttpHost targetHost,
369             final ClassicHttpRequest request,
370             final Timeout connectTimeout,
371             final HttpContext context) throws HttpException, IOException {
372         return execute(targetHost, request, null, connectTimeout, context);
373     }
374 
375     public <T> T  execute(
376             final HttpHost targetHost,
377             final ClassicHttpRequest request,
378             final Timeout connectTimeout,
379             final HttpContext context,
380             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
381         try (final ClassicHttpResponse response = execute(targetHost, request, null, connectTimeout, context)) {
382             final T result = responseHandler.handleResponse(response);
383             EntityUtils.consume(response.getEntity());
384             return result;
385         }
386     }
387 
388     public ConnPoolControl<HttpHost> getConnPoolControl() {
389         return connPool;
390     }
391 
392     @Override
393     public void shutdown(final ShutdownType shutdownType) {
394         connPool.shutdown(shutdownType);
395     }
396 
397     @Override
398     public void close() throws IOException {
399         connPool.close();
400     }
401 
402     private class PoolEntryHolder {
403 
404         private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
405 
406         PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
407             this.poolEntryRef = new AtomicReference<>(poolEntry);
408         }
409 
410         HttpClientConnection getConnection() {
411             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
412             return poolEntry != null ? poolEntry.getConnection() : null;
413         }
414 
415         void releaseConnection() {
416             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
417             if (poolEntry != null) {
418                 final HttpClientConnection connection = poolEntry.getConnection();
419                 connPool.release(poolEntry, connection != null && connection.isOpen());
420             }
421         }
422 
423         void discardConnection() {
424             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
425             if (poolEntry != null) {
426                 poolEntry.discardConnection(ShutdownType.IMMEDIATE);
427                 connPool.release(poolEntry, false);
428             }
429         }
430 
431     }
432 
433 }