1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.async;
28
29 import java.io.IOException;
30
31 import org.apache.hc.client5.http.HttpRequestRetryStrategy;
32 import org.apache.hc.client5.http.HttpRoute;
33 import org.apache.hc.client5.http.async.AsyncExecCallback;
34 import org.apache.hc.client5.http.async.AsyncExecChain;
35 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
36 import org.apache.hc.client5.http.impl.ChainElement;
37 import org.apache.hc.client5.http.protocol.HttpClientContext;
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.Internal;
40 import org.apache.hc.core5.annotation.ThreadingBehavior;
41 import org.apache.hc.core5.http.EntityDetails;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpRequest;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
46 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
47 import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
48 import org.apache.hc.core5.http.support.BasicRequestBuilder;
49 import org.apache.hc.core5.util.Args;
50 import org.apache.hc.core5.util.TimeValue;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Contract(threading = ThreadingBehavior.STATELESS)
84 @Internal
85 public final class AsyncHttpRequestRetryExec implements AsyncExecChainHandler {
86
87 private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpRequestRetryExec.class);
88
89 private final HttpRequestRetryStrategy retryStrategy;
90
91 public AsyncHttpRequestRetryExec(final HttpRequestRetryStrategy retryStrategy) {
92 Args.notNull(retryStrategy, "retryStrategy");
93 this.retryStrategy = retryStrategy;
94 }
95
96 private static class State {
97
98 volatile boolean retrying;
99 volatile TimeValue delay;
100
101 }
102
103 private void internalExecute(
104 final State state,
105 final HttpRequest request,
106 final AsyncEntityProducer entityProducer,
107 final AsyncExecChain.Scope scope,
108 final AsyncExecChain chain,
109 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
110
111 final String exchangeId = scope.exchangeId;
112
113 chain.proceed(BasicRequestBuilder.copy(request).build(), entityProducer, scope, new AsyncExecCallback() {
114
115 @Override
116 public AsyncDataConsumer handleResponse(
117 final HttpResponse response,
118 final EntityDetails entityDetails) throws HttpException, IOException {
119 final HttpClientContext clientContext = scope.clientContext;
120 if (entityProducer != null && !entityProducer.isRepeatable()) {
121 if (LOG.isDebugEnabled()) {
122 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
123 }
124 return asyncExecCallback.handleResponse(response, entityDetails);
125 }
126 state.retrying = retryStrategy.retryRequest(response, scope.execCount.get(), clientContext);
127 if (state.retrying) {
128 state.delay = retryStrategy.getRetryInterval(response, scope.execCount.get(), clientContext);
129 if (LOG.isDebugEnabled()) {
130 LOG.debug("{} retrying request in {}", exchangeId, state.delay);
131 }
132 return new DiscardingEntityConsumer<>();
133 }
134 return asyncExecCallback.handleResponse(response, entityDetails);
135 }
136
137 @Override
138 public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
139 asyncExecCallback.handleInformationResponse(response);
140 }
141
142 @Override
143 public void completed() {
144 if (state.retrying) {
145 scope.execCount.incrementAndGet();
146 if (entityProducer != null) {
147 entityProducer.releaseResources();
148 }
149 scope.scheduler.scheduleExecution(
150 request,
151 entityProducer,
152 scope,
153 (r, e, s, c) -> execute(r, e, s, chain, c),
154 asyncExecCallback,
155 state.delay);
156 } else {
157 asyncExecCallback.completed();
158 }
159 }
160
161 @Override
162 public void failed(final Exception cause) {
163 if (cause instanceof IOException) {
164 final HttpRoute route = scope.route;
165 final HttpClientContext clientContext = scope.clientContext;
166 if (entityProducer != null && !entityProducer.isRepeatable()) {
167 if (LOG.isDebugEnabled()) {
168 LOG.debug("{} cannot retry non-repeatable request", exchangeId);
169 }
170 } else if (retryStrategy.retryRequest(request, (IOException) cause, scope.execCount.get(), clientContext)) {
171 if (LOG.isDebugEnabled()) {
172 LOG.debug("{} {}", exchangeId, cause.getMessage(), cause);
173 }
174 if (LOG.isInfoEnabled()) {
175 LOG.info("Recoverable I/O exception ({}) caught when processing request to {}",
176 cause.getClass().getName(), route);
177 }
178 scope.execRuntime.discardEndpoint();
179 if (entityProducer != null) {
180 entityProducer.releaseResources();
181 }
182 state.retrying = true;
183 final int execCount = scope.execCount.incrementAndGet();
184 state.delay = retryStrategy.getRetryInterval(request, (IOException) cause, execCount - 1, clientContext);
185 scope.scheduler.scheduleExecution(
186 request,
187 entityProducer,
188 scope,
189 (r, e, s, c) -> execute(r, e, s, chain, c),
190 asyncExecCallback,
191 state.delay);
192 return;
193 }
194 }
195 asyncExecCallback.failed(cause);
196 }
197
198 });
199
200 }
201
202 @Override
203 public void execute(
204 final HttpRequest request,
205 final AsyncEntityProducer entityProducer,
206 final AsyncExecChain.Scope scope,
207 final AsyncExecChain chain,
208 final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
209 final State state = new State();
210 state.retrying = false;
211 internalExecute(state, request, entityProducer, scope, chain, asyncExecCallback);
212 }
213
214 }