View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.bootstrap;
28  
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.InterruptedIOException;
32  import java.io.OutputStream;
33  import java.net.InetSocketAddress;
34  import java.net.Proxy;
35  import java.net.Socket;
36  import java.util.Set;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import javax.net.ssl.SSLHandshakeException;
43  import javax.net.ssl.SSLParameters;
44  import javax.net.ssl.SSLSession;
45  import javax.net.ssl.SSLSocket;
46  import javax.net.ssl.SSLSocketFactory;
47  
48  import org.apache.hc.core5.annotation.Internal;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.function.Resolver;
51  import org.apache.hc.core5.http.ClassicHttpRequest;
52  import org.apache.hc.core5.http.ClassicHttpResponse;
53  import org.apache.hc.core5.http.ConnectionClosedException;
54  import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
55  import org.apache.hc.core5.http.HttpEntity;
56  import org.apache.hc.core5.http.HttpException;
57  import org.apache.hc.core5.http.HttpHost;
58  import org.apache.hc.core5.http.URIScheme;
59  import org.apache.hc.core5.http.config.CharCodingConfig;
60  import org.apache.hc.core5.http.config.Http1Config;
61  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
62  import org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnectionFactory;
63  import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
64  import org.apache.hc.core5.http.io.EofSensorInputStream;
65  import org.apache.hc.core5.http.io.EofSensorWatcher;
66  import org.apache.hc.core5.http.io.HttpClientConnection;
67  import org.apache.hc.core5.http.io.HttpClientResponseHandler;
68  import org.apache.hc.core5.http.io.HttpConnectionFactory;
69  import org.apache.hc.core5.http.io.HttpResponseInformationCallback;
70  import org.apache.hc.core5.http.io.SocketConfig;
71  import org.apache.hc.core5.http.io.entity.EntityUtils;
72  import org.apache.hc.core5.http.io.entity.HttpEntityWrapper;
73  import org.apache.hc.core5.http.io.ssl.SSLSessionVerifier;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.protocol.HttpProcessor;
76  import org.apache.hc.core5.io.CloseMode;
77  import org.apache.hc.core5.io.Closer;
78  import org.apache.hc.core5.io.ModalCloseable;
79  import org.apache.hc.core5.io.SocketSupport;
80  import org.apache.hc.core5.net.URIAuthority;
81  import org.apache.hc.core5.pool.ConnPoolControl;
82  import org.apache.hc.core5.pool.ManagedConnPool;
83  import org.apache.hc.core5.pool.PoolEntry;
84  import org.apache.hc.core5.pool.PoolStats;
85  import org.apache.hc.core5.util.Args;
86  import org.apache.hc.core5.util.TimeValue;
87  import org.apache.hc.core5.util.Timeout;
88  
89  /**
90   * HTTP/1.1 client side message exchange initiator.
91   *
92   * @since 5.0
93   */
94  public class HttpRequester implements ConnPoolControl<HttpHost>, ModalCloseable {
95  
96      private final HttpRequestExecutor requestExecutor;
97      private final HttpProcessor httpProcessor;
98      private final ManagedConnPool<HttpHost, HttpClientConnection> connPool;
99      private final SocketConfig socketConfig;
100     private final HttpConnectionFactory<? extends HttpClientConnection> connectFactory;
101     private final SSLSocketFactory sslSocketFactory;
102     private final Callback<SSLParameters> sslSetupHandler;
103     private final SSLSessionVerifier sslSessionVerifier;
104     private final Resolver<HttpHost, InetSocketAddress> addressResolver;
105 
106     /**
107      * Use {@link RequesterBootstrap} to create instances of this class.
108      */
109     @Internal
110     public HttpRequester(
111             final HttpRequestExecutor requestExecutor,
112             final HttpProcessor httpProcessor,
113             final ManagedConnPool<HttpHost, HttpClientConnection> connPool,
114             final SocketConfig socketConfig,
115             final HttpConnectionFactory<? extends HttpClientConnection> connectFactory,
116             final SSLSocketFactory sslSocketFactory,
117             final Callback<SSLParameters> sslSetupHandler,
118             final SSLSessionVerifier sslSessionVerifier,
119             final Resolver<HttpHost, InetSocketAddress> addressResolver) {
120         this.requestExecutor = Args.notNull(requestExecutor, "Request executor");
121         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
122         this.connPool = Args.notNull(connPool, "Connection pool");
123         this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
124         this.connectFactory = connectFactory != null ? connectFactory : new DefaultBHttpClientConnectionFactory(
125                 Http1Config.DEFAULT, CharCodingConfig.DEFAULT);
126         this.sslSocketFactory = sslSocketFactory != null ? sslSocketFactory : (SSLSocketFactory) SSLSocketFactory.getDefault();
127         this.sslSetupHandler = sslSetupHandler;
128         this.sslSessionVerifier = sslSessionVerifier;
129         this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE;
130     }
131 
132     @Override
133     public PoolStats getTotalStats() {
134         return connPool.getTotalStats();
135     }
136 
137     @Override
138     public PoolStats getStats(final HttpHost route) {
139         return connPool.getStats(route);
140     }
141 
142     @Override
143     public void setMaxTotal(final int max) {
144         connPool.setMaxTotal(max);
145     }
146 
147     @Override
148     public int getMaxTotal() {
149         return connPool.getMaxTotal();
150     }
151 
152     @Override
153     public void setDefaultMaxPerRoute(final int max) {
154         connPool.setDefaultMaxPerRoute(max);
155     }
156 
157     @Override
158     public int getDefaultMaxPerRoute() {
159         return connPool.getDefaultMaxPerRoute();
160     }
161 
162     @Override
163     public void setMaxPerRoute(final HttpHost route, final int max) {
164         connPool.setMaxPerRoute(route, max);
165     }
166 
167     @Override
168     public int getMaxPerRoute(final HttpHost route) {
169         return connPool.getMaxPerRoute(route);
170     }
171 
172     @Override
173     public void closeIdle(final TimeValue idleTime) {
174         connPool.closeIdle(idleTime);
175     }
176 
177     @Override
178     public void closeExpired() {
179         connPool.closeExpired();
180     }
181 
182     @Override
183     public Set<HttpHost> getRoutes() {
184         return connPool.getRoutes();
185     }
186 
187     public ClassicHttpResponse execute(
188             final HttpClientConnection connection,
189             final ClassicHttpRequest request,
190             final HttpResponseInformationCallback informationCallback,
191             final HttpContext context) throws HttpException, IOException {
192         Args.notNull(connection, "HTTP connection");
193         Args.notNull(request, "HTTP request");
194         Args.notNull(context, "HTTP context");
195         if (!connection.isOpen()) {
196             throw new ConnectionClosedException();
197         }
198         requestExecutor.preProcess(request, httpProcessor, context);
199         final ClassicHttpResponse response = requestExecutor.execute(request, connection, informationCallback, context);
200         requestExecutor.postProcess(response, httpProcessor, context);
201         return response;
202     }
203 
204     public ClassicHttpResponse execute(
205             final HttpClientConnection connection,
206             final ClassicHttpRequest request,
207             final HttpContext context) throws HttpException, IOException {
208         return execute(connection, request, null, context);
209     }
210 
211     public boolean keepAlive(
212             final HttpClientConnection connection,
213             final ClassicHttpRequest request,
214             final ClassicHttpResponse response,
215             final HttpContext context) throws IOException {
216         final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
217         if (!keepAlive) {
218             connection.close();
219         }
220         return keepAlive;
221     }
222 
223     public <T> T execute(
224             final HttpClientConnection connection,
225             final ClassicHttpRequest request,
226             final HttpContext context,
227             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
228         try (final ClassicHttpResponse response = execute(connection, request, context)) {
229             final T result = responseHandler.handleResponse(response);
230             EntityUtils.consume(response.getEntity());
231             final boolean keepAlive = requestExecutor.keepAlive(request, response, connection, context);
232             if (!keepAlive) {
233                 connection.close();
234             }
235             return result;
236         } catch (final HttpException | IOException | RuntimeException ex) {
237             connection.close(CloseMode.IMMEDIATE);
238             throw ex;
239         }
240     }
241 
242     private HttpClientConnection createConnection(final Socket sock, final HttpHost targetHost) throws IOException {
243         sock.setSoTimeout(socketConfig.getSoTimeout().toMillisecondsIntBound());
244         sock.setReuseAddress(socketConfig.isSoReuseAddress());
245         sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
246         sock.setKeepAlive(socketConfig.isSoKeepAlive());
247         if (socketConfig.getRcvBufSize() > 0) {
248             sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
249         }
250         if (socketConfig.getSndBufSize() > 0) {
251             sock.setSendBufferSize(socketConfig.getSndBufSize());
252         }
253         if (this.socketConfig.getTcpKeepIdle() > 0) {
254             SocketSupport.setOption(sock, SocketSupport.TCP_KEEPIDLE, this.socketConfig.getTcpKeepIdle());
255         }
256         if (this.socketConfig.getTcpKeepInterval() > 0) {
257             SocketSupport.setOption(sock, SocketSupport.TCP_KEEPINTERVAL, this.socketConfig.getTcpKeepInterval());
258         }
259         if (this.socketConfig.getTcpKeepCount() > 0) {
260             SocketSupport.setOption(sock, SocketSupport.TCP_KEEPCOUNT, this.socketConfig.getTcpKeepCount());
261         }
262         final int linger = socketConfig.getSoLinger().toMillisecondsIntBound();
263         if (linger >= 0) {
264             sock.setSoLinger(true, linger);
265         }
266 
267         final InetSocketAddress targetAddress = addressResolver.resolve(targetHost);
268         sock.connect(targetAddress, socketConfig.getSoTimeout().toMillisecondsIntBound());
269         if (URIScheme.HTTPS.same(targetHost.getSchemeName())) {
270             final SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(
271                     sock, targetHost.getHostName(), targetAddress.getPort(), false);
272             if (this.sslSetupHandler != null) {
273                 final SSLParameters sslParameters = sslSocket.getSSLParameters();
274                 this.sslSetupHandler.execute(sslParameters);
275                 sslSocket.setSSLParameters(sslParameters);
276             }
277             try {
278                 sslSocket.startHandshake();
279                 final SSLSession session = sslSocket.getSession();
280                 if (session == null) {
281                     throw new SSLHandshakeException("SSL session not available");
282                 }
283                 if (sslSessionVerifier != null) {
284                     sslSessionVerifier.verify(targetHost, session);
285                 }
286                 return connectFactory.createConnection(sslSocket, sock);
287             } catch (final IOException ex) {
288                 Closer.closeQuietly(sslSocket);
289                 throw ex;
290             }
291         }
292         return connectFactory.createConnection(sock);
293     }
294 
295     public ClassicHttpResponse execute(
296             final HttpHost targetHost,
297             final ClassicHttpRequest request,
298             final HttpResponseInformationCallback informationCallback,
299             final Timeout connectTimeout,
300             final HttpContext context) throws HttpException, IOException {
301         Args.notNull(targetHost, "HTTP host");
302         Args.notNull(request, "HTTP request");
303         final Future<PoolEntry<HttpHost, HttpClientConnection>> leaseFuture = connPool.lease(targetHost, null, connectTimeout, null);
304         final PoolEntry<HttpHost, HttpClientConnection> poolEntry;
305         final Timeout timeout = Timeout.defaultsToInfinite(connectTimeout);
306         try {
307             poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
308         } catch (final InterruptedException ex) {
309             Thread.currentThread().interrupt();
310             throw new InterruptedIOException(ex.getMessage());
311         } catch (final ExecutionException ex) {
312             throw new HttpException("Unexpected failure leasing connection", ex);
313         } catch (final TimeoutException ex) {
314             throw new ConnectionRequestTimeoutException("Connection request timeout");
315         }
316         final PoolEntryHolder connectionHolder = new PoolEntryHolder(poolEntry);
317         try {
318             HttpClientConnection connection = poolEntry.getConnection();
319             if (connection == null) {
320                 final Socket sock;
321                 if (socketConfig.getSocksProxyAddress() != null) {
322                     sock = new Socket(new Proxy(Proxy.Type.SOCKS, socketConfig.getSocksProxyAddress()));
323                 } else {
324                     sock = new Socket();
325                 }
326                 try {
327                     connection = createConnection(sock, targetHost);
328                     poolEntry.assignConnection(connection);
329                 } catch (IOException | RuntimeException ex) {
330                     Closer.closeQuietly(sock);
331                     throw ex;
332                 }
333             }
334             if (request.getAuthority() == null) {
335                 request.setAuthority(new URIAuthority(targetHost.getHostName(), targetHost.getPort()));
336             }
337             final ClassicHttpResponse response = execute(connection, request, informationCallback, context);
338             final HttpEntity entity = response.getEntity();
339             if (entity != null) {
340                 response.setEntity(new HttpEntityWrapper(entity) {
341 
342                     private void releaseConnection() throws IOException {
343                         try {
344                             final HttpClientConnection localConn = connectionHolder.getConnection();
345                             if (localConn != null) {
346                                 if (requestExecutor.keepAlive(request, response, localConn, context)) {
347                                     if (super.isStreaming()) {
348                                         Closer.close(super.getContent());
349                                     }
350                                     connectionHolder.releaseConnection();
351                                 }
352                             }
353                         } finally {
354                             connectionHolder.discardConnection();
355                         }
356                     }
357 
358                     private void abortConnection() {
359                         connectionHolder.discardConnection();
360                     }
361 
362                     @Override
363                     public boolean isStreaming() {
364                         return true;
365                     }
366 
367                     @Override
368                     public InputStream getContent() throws IOException {
369                         return new EofSensorInputStream(super.getContent(), new EofSensorWatcher() {
370 
371                             @Override
372                             public boolean eofDetected(final InputStream wrapped) throws IOException {
373                                 releaseConnection();
374                                 return false;
375                             }
376 
377                             @Override
378                             public boolean streamClosed(final InputStream wrapped) throws IOException {
379                                 releaseConnection();
380                                 return false;
381                             }
382 
383                             @Override
384                             public boolean streamAbort(final InputStream wrapped) throws IOException {
385                                 abortConnection();
386                                 return false;
387                             }
388 
389                         });
390                     }
391 
392                     @Override
393                     public void writeTo(final OutputStream outStream) throws IOException {
394                         try {
395                             if (outStream != null) {
396                                 super.writeTo(outStream);
397                             }
398                             close();
399                         } catch (final IOException | RuntimeException ex) {
400                             abortConnection();
401                         }
402                     }
403 
404                     @Override
405                     public void close() throws IOException {
406                         releaseConnection();
407                     }
408 
409                 });
410             } else {
411                 final HttpClientConnection localConn = connectionHolder.getConnection();
412                 if (!requestExecutor.keepAlive(request, response, localConn, context)) {
413                     localConn.close();
414                 }
415                 connectionHolder.releaseConnection();
416             }
417             return response;
418         } catch (final HttpException | IOException | RuntimeException ex) {
419             connectionHolder.discardConnection();
420             throw ex;
421         }
422     }
423 
424     public ClassicHttpResponse execute(
425             final HttpHost targetHost,
426             final ClassicHttpRequest request,
427             final Timeout connectTimeout,
428             final HttpContext context) throws HttpException, IOException {
429         return execute(targetHost, request, null, connectTimeout, context);
430     }
431 
432     public <T> T  execute(
433             final HttpHost targetHost,
434             final ClassicHttpRequest request,
435             final Timeout connectTimeout,
436             final HttpContext context,
437             final HttpClientResponseHandler<T> responseHandler) throws HttpException, IOException {
438         try (final ClassicHttpResponse response = execute(targetHost, request, null, connectTimeout, context)) {
439             final T result = responseHandler.handleResponse(response);
440             EntityUtils.consume(response.getEntity());
441             return result;
442         }
443     }
444 
445     public ConnPoolControl<HttpHost> getConnPoolControl() {
446         return connPool;
447     }
448 
449     @Override
450     public void close(final CloseMode closeMode) {
451         connPool.close(closeMode);
452     }
453 
454     @Override
455     public void close() throws IOException {
456         connPool.close();
457     }
458 
459     private class PoolEntryHolder {
460 
461         private final AtomicReference<PoolEntry<HttpHost, HttpClientConnection>> poolEntryRef;
462 
463         PoolEntryHolder(final PoolEntry<HttpHost, HttpClientConnection> poolEntry) {
464             this.poolEntryRef = new AtomicReference<>(poolEntry);
465         }
466 
467         HttpClientConnection getConnection() {
468             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.get();
469             return poolEntry != null ? poolEntry.getConnection() : null;
470         }
471 
472         void releaseConnection() {
473             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
474             if (poolEntry != null) {
475                 final HttpClientConnection connection = poolEntry.getConnection();
476                 connPool.release(poolEntry, connection != null && connection.isOpen());
477             }
478         }
479 
480         void discardConnection() {
481             final PoolEntry<HttpHost, HttpClientConnection> poolEntry = poolEntryRef.getAndSet(null);
482             if (poolEntry != null) {
483                 poolEntry.discardConnection(CloseMode.GRACEFUL);
484                 connPool.release(poolEntry, false);
485             }
486         }
487 
488     }
489 
490 }