View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.async;
28  
29  import java.io.IOException;
30  
31  import org.apache.hc.client5.http.HttpRequestRetryStrategy;
32  import org.apache.hc.client5.http.async.AsyncExecCallback;
33  import org.apache.hc.client5.http.async.AsyncExecChain;
34  import org.apache.hc.client5.http.async.AsyncExecChainHandler;
35  import org.apache.hc.client5.http.impl.ChainElement;
36  import org.apache.hc.client5.http.protocol.HttpClientContext;
37  import org.apache.hc.core5.annotation.Contract;
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.HttpException;
42  import org.apache.hc.core5.http.HttpHost;
43  import org.apache.hc.core5.http.HttpRequest;
44  import org.apache.hc.core5.http.HttpResponse;
45  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
46  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
47  import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
48  import org.apache.hc.core5.http.support.BasicRequestBuilder;
49  import org.apache.hc.core5.util.Args;
50  import org.apache.hc.core5.util.TimeValue;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  /**
55   * Request executor in the asynchronous request execution chain that is
56   * responsible for making a decision whether a request that failed due to
57   * an I/O exception or received a specific response from the target server should
58   * be re-executed.
59   * <p>
60   * Further responsibilities such as communication with the opposite
61   * endpoint is delegated to the next executor in the request execution
62   * chain.
63   * </p>
64   * <p>
65   * If this handler is active, pay particular attention to the placement
66   * of other handlers within the handler chain relative to the retry handler.
67   * Use {@link ChainElement#RETRY} as name when referring to this handler.
68   * </p>
69   * <p>
70   * If a custom handler is placed <b>before</b> the retry handler, the handler will
71   * see the initial request and the final outcome after the last retry. Elapsed time
72   * will account for any delays imposed by the retry handler.
73   * </p>
74   *
75   * <p>
76   * A custom handler which is placed <b>after</b> the retry handler will be invoked for
77   * each individual retry. Elapsed time will measure each individual http request,
78   * without the delay imposed by the retry handler.
79   * </p>
80   *
81   * @since 5.0
82   */
83  @Contract(threading = ThreadingBehavior.STATELESS)
84  @Internal
85  public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
86  
87      private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpRequestRetryExec.class);
88  
89      private final HttpRequestRetryStrategy retryStrategy;
90  
91      public AsyncHttpRequestRetryExec(final HttpRequestRetryStrategy retryStrategy) {
92          Args.notNull(retryStrategy, "retryStrategy");
93          this.retryStrategy = retryStrategy;
94      }
95  
96      private static class State {
97  
98          volatile boolean retrying;
99          volatile int status;
100         volatile TimeValue delay;
101 
102     }
103 
104     private void internalExecute(
105             final State state,
106             final HttpRequest request,
107             final AsyncEntityProducer entityProducer,
108             final AsyncExecChain.Scope scope,
109             final AsyncExecChain chain,
110             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
111 
112         final String exchangeId = scope.exchangeId;
113 
114         chain.proceed(BasicRequestBuilder.copy(request).build(), entityProducer, scope, new AsyncExecCallback() {
115 
116             @Override
117             public AsyncDataConsumer handleResponse(
118                     final HttpResponse response,
119                     final EntityDetails entityDetails) throws HttpException, IOException {
120                 final HttpClientContext clientContext = scope.clientContext;
121                 if (entityProducer != null && !entityProducer.isRepeatable()) {
122                     if (LOG.isDebugEnabled()) {
123                         LOG.debug("{} cannot retry non-repeatable request", exchangeId);
124                     }
125                     return asyncExecCallback.handleResponse(response, entityDetails);
126                 }
127                 state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
128                 if (state.retrying) {
129                     state.status = response.getCode();
130                     state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
131                     return new DiscardingEntityConsumer<>();
132                 }
133                 return asyncExecCallback.handleResponse(response, entityDetails);
134             }
135 
136             @Override
137             public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
138                 asyncExecCallback.handleInformationResponse(response);
139             }
140 
141             @Override
142             public void completed() {
143                 if (state.retrying) {
144                     final int execCount = scope.execCount.incrementAndGet();
145                     if (entityProducer != null) {
146                        entityProducer.releaseResources();
147                     }
148                     final HttpHost target = scope.route.getTargetHost();
149                     final TimeValue delay = TimeValue.isPositive(state.delay) ? state.delay : TimeValue.ZERO_MILLISECONDS;
150                     if (LOG.isInfoEnabled()) {
151                         LOG.info("{} {} responded with status {}; " +
152                                         "request will be automatically re-executed in {} (exec count {})",
153                                 exchangeId, target, state.status, delay, execCount);
154                     }
155                     scope.scheduler.scheduleExecution(
156                             request,
157                             entityProducer,
158                             scope,
159                             (r, e, s, c) -> execute(r, e, s, chain, c),
160                             asyncExecCallback,
161                             delay);
162                 } else {
163                     asyncExecCallback.completed();
164                 }
165             }
166 
167             @Override
168             public void failed(final Exception cause) {
169                 if (cause instanceof IOException) {
170                     final HttpHost target = scope.route.getTargetHost();
171                     final HttpClientContext clientContext = scope.clientContext;
172                     if (entityProducer != null && !entityProducer.isRepeatable()) {
173                         if (LOG.isDebugEnabled()) {
174                             LOG.debug("{} cannot retry non-repeatable request", exchangeId);
175                         }
176                     } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
177                         if (LOG.isDebugEnabled()) {
178                             LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
179                         }
180                         scope.execRuntime.discardEndpoint();
181                         if (entityProducer != null) {
182                             entityProducer.releaseResources();
183                         }
184                         state.retrying = true;
185                         final int execCount = scope.execCount.incrementAndGet();
186                         state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext);
187                         final TimeValue delay = TimeValue.isPositive(state.delay) ? state.delay : TimeValue.ZERO_MILLISECONDS;
188                         if (LOG.isInfoEnabled()) {
189                             LOG.info("{} recoverable I/O exception ({}) caught when sending request to {};" +
190                                             "request will be automatically re-executed in {} (exec count {})",
191                                     exchangeId, cause.getClass().getName(), target, delay, execCount);
192                         }
193                         scope.scheduler.scheduleExecution(
194                                 request,
195                                 entityProducer,
196                                 scope,
197                                 (r, e, s, c) -> execute(r, e, s, chain, c),
198                                 asyncExecCallback,
199                                 delay);
200                         return;
201                     }
202                 }
203                 asyncExecCallback.failed(cause);
204             }
205 
206         });
207 
208     }
209 
210     @Override
211     public void execute(
212             final HttpRequest request,
213             final AsyncEntityProducer entityProducer,
214             final AsyncExecChain.Scope scope,
215             final AsyncExecChain chain,
216             final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
217         final State state = new State();
218         state.retrying = false;
219         internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
220     }
221 
222 }