/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
27  package org.apache.hc.core5.http.impl.bootstrap;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.InterruptedIOException;
32  import java.io.OutputStream;
33  import java.net.InetSocketAddress;
34  import java.net.Socket;
35  import java.util.Set;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import javax.net.ssl.SSLSocketFactory;
42  
43  import org.apache.hc.core5.annotation.Internal;
44  import org.apache.hc.core5.function.Resolver;
45  import org.apache.hc.core5.http.ClassicHttpRequest;
46  import org.apache.hc.core5.http.ClassicHttpResponse;
47  import org.apache.hc.core5.http.ConnectionClosedException;
48  import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
49  import org.apache.hc.core5.http.HttpEntity;
50  import org.apache.hc.core5.http.HttpException;
51  import org.apache.hc.core5.http.HttpHost;
52  import org.apache.hc.core5.http.URIScheme;
53  import org.apache.hc.core5.http.config.CharCodingConfig;
54  import org.apache.hc.core5.http.config.H1Config;
55  import org.apache.hc.core5.http.io.SocketConfig;
56  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
57  import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory;
58  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
59  import org.apache.hc.core5.http.io.EofSensorInputStream;
60  import org.apache.hc.core5.http.io.EofSensorWatcher;
61  import org.apache.hc.core5.http.io.HttpClientConnection;
62  import org.apache.hc.core5.http.io.HttpClientResponseHandler;
63  import org.apache.hc.core5.http.io.HttpConnectionFactory;
64  import org.apache.hc.core5.http.io.HttpResponseInformationCallback;
65  import org.apache.hc.core5.http.io.entity.EntityUtils;
66  import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
67  import org.apache.hc.core5.http.protocol.HttpContext;
68  import org.apache.hc.core5.http.protocol.HttpProcessor;
69  import org.apache.hc.core5.io.ModalCloseable;
70  import org.apache.hc.core5.io.CloseMode;
71  import org.apache.hc.core5.io.Closer;
72  import org.apache.hc.core5.net.URIAuthority;
73  import org.apache.hc.core5.pool.ConnPoolControl;
74  import org.apache.hc.core5.pool.ManagedConnPool;
75  import org.apache.hc.core5.pool.PoolEntry;
76  import org.apache.hc.core5.pool.PoolStats;
77  import org.apache.hc.core5.util.Args;
78  import org.apache.hc.core5.util.TimeValue;
79  import org.apache.hc.core5.util.Timeout;
80  
81  /**
82   * HTTP/1.1 client side message exchange initiator.
83   *
84   * @since 5.0
85   */
86  public class HttpRequester implements ConnPoolControl<HttpHost>, ModalCloseable {
87  
88      private final HttpRequestExecutor requestExecutor;
89      private final HttpProcessor httpProcessor;
90      private final ManagedConnPool<HttpHost, HttpClientConnection> connPool;
91      private final SocketConfig socketConfig;
92      private final HttpConnectionFactory<? extends HttpClientConnection> connectFactory;
93      private final SSLSocketFactory sslSocketFactory;
94      private final Resolver<HttpHost, InetSocketAddress> addressResolver;
95  
96      /**
97       * Use {@link RequesterBootstrap} to create instances of this class.
98       */
99      @Internal
100     public HttpRequester(
101             final HttpRequestExecutor requestExecutor,
102             final HttpProcessor httpProcessor,
103             final ManagedConnPool<HttpHost, HttpClientConnection> connPool,
104             final SocketConfig socketConfig,
105             final HttpConnectionFactory<? extends HttpClientConnection> connectFactory,
106             final SSLSocketFactory sslSocketFactory,
107             final Resolver<HttpHost, InetSocketAddress> addressResolver) {
108         this.requestExecutor = Args.notNull(requestExecutor, "Request executor");
109         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
110         this.connPool = Args.notNull(connPool, "Connection pool");
111         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
112         this.connectFactory = connectFactory != null ? connectFactory : new DefaultBHttpClientConnectionFactory(
113                 H1Config.DEFAULT, CharCodingConfig.DEFAULT);
114         this.sslSocketFactory = sslSocketFactory != null ? sslSocketFactory : (SSLSocketFactory) SSLSocketFactory.getDefault();
115         this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
116     }
117 
118     @Override
119     public PoolStats getTotalStats() {
120         return connPool.getTotalStats();
121     }
122 
123     @Override
124     public PoolStats getStats(final HttpHost route) {
125         return connPool.getStats(route);
126     }
127 
128     @Override
129     public void setMaxTotal(final int max) {
130         connPool.setMaxTotal(max);
131     }
132 
133     @Override
134     public int getMaxTotal() {
135         return connPool.getMaxTotal();
136     }
137 
138     @Override
139     public void setDefaultMaxPerRoute(final int max) {
140         connPool.setDefaultMaxPerRoute(max);
141     }
142 
143     @Override
144     public int getDefaultMaxPerRoute() {
145         return connPool.getDefaultMaxPerRoute();
146     }
147 
148     @Override
149     public void setMaxPerRoute(final HttpHost route, final int max) {
150         connPool.setMaxPerRoute(route, max);
151     }
152 
153     @Override
154     public int getMaxPerRoute(final HttpHost route) {
155         return connPool.getMaxPerRoute(route);
156     }
157 
158     @Override
159     public void closeIdle(final TimeValue idleTime) {
160         connPool.closeIdle(idleTime);
161     }
162 
163     @Override
164     public void closeExpired() {
165         connPool.closeExpired();
166     }
167 
168     @Override
169     public Set<HttpHost> getRoutes() {
170         return connPool.getRoutes();
171     }
172 
173     public ClassicHttpResponse execute(
174             final HttpClientConnection connection,
175             final ClassicHttpRequest request,
176             final HttpResponseInformationCallback informationCallback,
177             final HttpContext context) throws HttpException, IOException {
178         Args.notNull(connection, "HTTP connection");
179         Args.notNull(request, "HTTP request");
180         Args.notNull(context, "HTTP context");
181         if (!connection.isOpen()) {
182             throw new ConnectionClosedException();
183         }
184         requestExecutor.preProcess(request, httpProcessor, context);
185         final ClassicHttpResponse response = requestExecutor.execute(request, connection, informationCallback, context);
186         requestExecutor.postProcess(response, httpProcessor, context);
187         return response;
188     }
189 
190     public ClassicHttpResponse execute(
191             final HttpClientConnection connection,
192             final ClassicHttpRequest request,
193             final HttpContext context) throws HttpException, IOException {
194         return execute(connection, request, null, context);
195     }
196 
197     public boolean keepAlive(
198             final HttpClientConnection connection,
199             final ClassicHttpRequest request,
200             final ClassicHttpResponse response,
201             final HttpContext context) throws IOException {
202         final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
203         if (!keepAlive) {
204             connection.close();
205         }
206         return keepAlive;
207     }
208 
209     public <T> T execute(
210             final HttpClientConnection connection,
211             final ClassicHttpRequest request,
212             final HttpContext context,
213             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
214         try (final ClassicHttpResponse response = execute(connection, request, context)) {
215             final T result = responseHandler.handleResponse(response);
216             EntityUtils.consume(response.getEntity());
217             final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
218             if (!keepAlive) {
219                 connection.close();
220             }
221             return result;
222         } catch (final HttpException | IOException | RuntimeException ex) {
223             connection.close(CloseMode.IMMEDIATE);
224             throw ex;
225         }
226     }
227 
228     private Socket createSocket(final HttpHost targetHost) throws IOException {
229         final Socket sock = new Socket();
230         sock.setSoTimeout(socketConfig.getSoTimeout().toMillisIntBound());
231         sock.setReuseAddress(socketConfig.isSoReuseAddress());
232         sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
233         sock.setKeepAlive(socketConfig.isSoKeepAlive());
234         if (socketConfig.getRcvBufSize() > 0) {
235             sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
236         }
237         if (socketConfig.getSndBufSize() > 0) {
238             sock.setSendBufferSize(socketConfig.getSndBufSize());
239         }
240         final int linger = socketConfig.getSoLinger().toMillisIntBound();
241         if (linger >= 0) {
242             sock.setSoLinger(true, linger);
243         }
244 
245         final InetSocketAddress targetAddress = addressResolver.resolve(targetHost);
246         sock.connect(targetAddress, socketConfig.getSoTimeout().toMillisIntBound());
247         if (URIScheme.HTTPS.same(targetHost.getSchemeName())) {
248             return sslSocketFactory.createSocket(sock, targetHost.getHostName(), targetAddress.getPort(), true);
249         }
250         return sock;
251     }
252 
253     public ClassicHttpResponse execute(
254             final HttpHost targetHost,
255             final ClassicHttpRequest request,
256             final HttpResponseInformationCallback informationCallback,
257             final Timeout connectTimeout,
258             final HttpContext context) throws HttpException, IOException {
259         Args.notNull(targetHost, "HTTP host");
260         Args.notNull(request, "HTTP request");
261         final Future<PoolEntry<HttpHost, HttpClientConnection>> leaseFuture = connPool.lease(targetHost, null, connectTimeout, null);
262         final PoolEntry<HttpHost, HttpClientConnection> poolEntry;
263         final Timeout timeout = Timeout.defaultsToDisabled(connectTimeout);
264         try {
265             poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
266         } catch (final InterruptedException ex) {
267             throw new InterruptedIOException(ex.getMessage());
268         } catch (final ExecutionException ex) {
269             throw new HttpException("Unexpected failure leasing connection", ex);
270         } catch (final TimeoutException ex) {
271             throw new ConnectionRequestTimeoutException("Connection request timeout");
272         }
273         final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
274         try {
275             HttpClientConnection connection = poolEntry.getConnection();
276             if (connection == null) {
277                 final Socket socket = createSocket(targetHost);
278                 connection = connectFactory.createConnection(socket);
279                 poolEntry.assignConnection(connection);
280             }
281             if (request.getAuthority() == null) {
282                 request.setAuthority(new URIAuthority(targetHost.getHostName(), targetHost.getPort()));
283             }
284             final ClassicHttpResponse response = execute(connection, request, informationCallback, context);
285             final HttpEntity entity = response.getEntity();
286             if (entity != null) {
287                 response.setEntity(new HttpEntityWrapper(entity) {
288 
289                     private void releaseConnection() throws IOException {
290                         try {
291                             final HttpClientConnection localConn = connectionHolder.getConnection();
292                             if (localConn != null) {
293                                 if (requestExecutor.keepAlive(request, response, localConn, context)) {
294                                     if (super.isStreaming()) {
295                                         Closer.close(super.getContent());
296                                     }
297                                     connectionHolder.releaseConnection();
298                                 }
299                             }
300                         } finally {
301                             connectionHolder.discardConnection();
302                         }
303                     }
304 
305                     private void abortConnection() {
306                         connectionHolder.discardConnection();
307                     }
308 
309                     @Override
310                     public boolean isStreaming() {
311                         return true;
312                     }
313 
314                     @Override
315                     public InputStream getContent() throws IOException {
316                         return new EofSensorInputStream(super.getContent(), new EofSensorWatcher() {
317 
318                             @Override
319                             public boolean eofDetected(final InputStream wrapped) throws IOException {
320                                 releaseConnection();
321                                 return false;
322                             }
323 
324                             @Override
325                             public boolean streamClosed(final InputStream wrapped) throws IOException {
326                                 releaseConnection();
327                                 return false;
328                             }
329 
330                             @Override
331                             public boolean streamAbort(final InputStream wrapped) throws IOException {
332                                 abortConnection();
333                                 return false;
334                             }
335 
336                         });
337                     }
338 
339                     @Override
340                     public void writeTo(final OutputStream outStream) throws IOException {
341                         try {
342                             if (outStream != null) {
343                                 super.writeTo(outStream);
344                             }
345                             close();
346                         } catch (final IOException | RuntimeException ex) {
347                             abortConnection();
348                         }
349                     }
350 
351                     @Override
352                     public void close() throws IOException {
353                         releaseConnection();
354                     }
355 
356                 });
357             } else {
358                 final HttpClientConnection localConn = connectionHolder.getConnection();
359                 if (!requestExecutor.keepAlive(request, response, localConn, context)) {
360                     localConn.close();
361                 }
362                 connectionHolder.releaseConnection();
363             }
364             return response;
365         } catch (final HttpException | IOException | RuntimeException ex) {
366             connectionHolder.discardConnection();
367             throw ex;
368         }
369     }
370 
371     public ClassicHttpResponse execute(
372             final HttpHost targetHost,
373             final ClassicHttpRequest request,
374             final Timeout connectTimeout,
375             final HttpContext context) throws HttpException, IOException {
376         return execute(targetHost, request, null, connectTimeout, context);
377     }
378 
379     public <T> T  execute(
380             final HttpHost targetHost,
381             final ClassicHttpRequest request,
382             final Timeout connectTimeout,
383             final HttpContext context,
384             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
385         try (final ClassicHttpResponse response = execute(targetHost, request, null, connectTimeout, context)) {
386             final T result = responseHandler.handleResponse(response);
387             EntityUtils.consume(response.getEntity());
388             return result;
389         }
390     }
391 
392     public ConnPoolControl<HttpHost> getConnPoolControl() {
393         return connPool;
394     }
395 
396     @Override
397     public void close(final CloseMode closeMode) {
398         connPool.close(closeMode);
399     }
400 
401     @Override
402     public void close() throws IOException {
403         connPool.close();
404     }
405 
406     private class PoolEntryHolder {
407 
408         private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
409 
410         PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
411             this.poolEntryRef = new AtomicReference<>(poolEntry);
412         }
413 
414         HttpClientConnection getConnection() {
415             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
416             return poolEntry != null ? poolEntry.getConnection() : null;
417         }
418 
419         void releaseConnection() {
420             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
421             if (poolEntry != null) {
422                 final HttpClientConnection connection = poolEntry.getConnection();
423                 connPool.release(poolEntry, connection != null && connection.isOpen());
424             }
425         }
426 
427         void discardConnection() {
428             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
429             if (poolEntry != null) {
430                 poolEntry.discardConnection(CloseMode.IMMEDIATE);
431                 connPool.release(poolEntry, false);
432             }
433         }
434 
435     }
436 
437 }