1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30
31 import org.apache.hc.client5.http.HttpRequestRetryStrategy;
32 import org.apache.hc.client5.http.async.AsyncExecCallback;
33 import org.apache.hc.client5.http.async.AsyncExecChain;
34 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
35 import org.apache.hc.client5.http.impl.ChainElement;
36 import org.apache.hc.client5.http.protocol.HttpClientContext;
37 import org.apache.hc.core5.annotation.Contract;
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.HttpException;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.HttpRequest;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
46 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
47 import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
48 import org.apache.hc.core5.http.support.BasicRequestBuilder;
49 import org.apache.hc.core5.util.Args;
50 import org.apache.hc.core5.util.TimeValue;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Contract(threading = ThreadingBehavior.STATELESS)
84 @Internal
85 public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
86
87 private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpRequestRetryExec.class);
88
89 private final HttpRequestRetryStrategy retryStrategy;
90
91 public AsyncHttpRequestRetryExec(final HttpRequestRetryStrategy retryStrategy) {
92 Args.notNull(retryStrategy, "retryStrategy");
93 this.retryStrategy = retryStrategy;
94 }
95
96 private static class State {
97
98 volatile boolean retrying;
99 volatile int status;
100 volatile TimeValue delay;
101
102 }
103
104 private void internalExecute(
105 final State state,
106 final HttpRequest request,
107 final AsyncEntityProducer entityProducer,
108 final AsyncExecChain.Scope scope,
109 final AsyncExecChain chain,
110 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
111
112 final String exchangeId = scope.exchangeId;
113
114 chain.proceed(BasicRequestBuilder.copy(request).build(), entityProducer, scope, new AsyncExecCallback() {
115
116 @Override
117 public AsyncDataConsumer handleResponse(
118 final HttpResponse response,
119 final EntityDetails entityDetails) throws HttpException, IOException {
120 final HttpClientContext clientContext = scope.clientContext;
121 if (entityProducer != null && !entityProducer.isRepeatable()) {
122 if (LOG.isDebugEnabled()) {
123 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
124 }
125 return asyncExecCallback.handleResponse(response, entityDetails);
126 }
127 state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
128 if (state.retrying) {
129 state.status = response.getCode();
130 state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
131 return new DiscardingEntityConsumer<>();
132 }
133 return asyncExecCallback.handleResponse(response, entityDetails);
134 }
135
136 @Override
137 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
138 asyncExecCallback.handleInformationResponse(response);
139 }
140
141 @Override
142 public void completed() {
143 if (state.retrying) {
144 final int execCount = scope.execCount.incrementAndGet();
145 if (entityProducer != null) {
146 entityProducer.releaseResources();
147 }
148 final HttpHost target = scope.route.getTargetHost();
149 final TimeValue delay = TimeValue.isPositive(state.delay) ? state.delay : TimeValue.ZERO_MILLISECONDS;
150 if (LOG.isInfoEnabled()) {
151 LOG.info("{} {} responded with status {}; " +
152 "request will be automatically re-executed in {} (exec count {})",
153 exchangeId, target, state.status, delay, execCount);
154 }
155 scope.scheduler.scheduleExecution(
156 request,
157 entityProducer,
158 scope,
159 (r, e, s, c) -> execute(r, e, s, chain, c),
160 asyncExecCallback,
161 delay);
162 } else {
163 asyncExecCallback.completed();
164 }
165 }
166
167 @Override
168 public void failed(final Exception cause) {
169 if (cause instanceof IOException) {
170 final HttpHost target = scope.route.getTargetHost();
171 final HttpClientContext clientContext = scope.clientContext;
172 if (entityProducer != null && !entityProducer.isRepeatable()) {
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
175 }
176 } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
177 if (LOG.isDebugEnabled()) {
178 LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
179 }
180 scope.execRuntime.discardEndpoint();
181 if (entityProducer != null) {
182 entityProducer.releaseResources();
183 }
184 state.retrying = true;
185 final int execCount = scope.execCount.incrementAndGet();
186 state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext);
187 final TimeValue delay = TimeValue.isPositive(state.delay) ? state.delay : TimeValue.ZERO_MILLISECONDS;
188 if (LOG.isInfoEnabled()) {
189 LOG.info("{} recoverable I/O exception ({}) caught when sending request to {};" +
190 "request will be automatically re-executed in {} (exec count {})",
191 exchangeId, cause.getClass().getName(), target, delay, execCount);
192 }
193 scope.scheduler.scheduleExecution(
194 request,
195 entityProducer,
196 scope,
197 (r, e, s, c) -> execute(r, e, s, chain, c),
198 asyncExecCallback,
199 delay);
200 return;
201 }
202 }
203 asyncExecCallback.failed(cause);
204 }
205
206 });
207
208 }
209
210 @Override
211 public void execute(
212 final HttpRequest request,
213 final AsyncEntityProducer entityProducer,
214 final AsyncExecChain.Scope scope,
215 final AsyncExecChain chain,
216 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
217 final State state = new State();
218 state.retrying = false;
219 internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
220 }
221
222 }