1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.classic;
28
29
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.util.TimeValue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
46
47 class TestAIMDBackoffManager {
48
49 private AIMDBackoffManager impl;
50 private MockConnPoolControl connPerRoute;
51 private HttpRoute route;
52 private static final long DEFAULT_COOL_DOWN_MS = 10;
53
54
55 @BeforeEach
56 void setUp() {
57 connPerRoute = new MockConnPoolControl();
58 route = new HttpRoute(new HttpHost("localhost", 80));
59 impl = new AIMDBackoffManager(connPerRoute);
60 impl.setPerHostConnectionCap(10);
61 impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS));
62
63 }
64
65 @Test
66 void isABackoffManager() {
67 assertTrue(impl instanceof BackoffManager);
68 }
69
70 @Test
71 void halvesConnectionsOnBackoff() {
72 connPerRoute.setMaxPerRoute(route, 4);
73 impl.backOff(route);
74 assertEquals(2, connPerRoute.getMaxPerRoute(route));
75 }
76
77 @Test
78 void doesNotBackoffBelowOneConnection() {
79 connPerRoute.setMaxPerRoute(route, 1);
80 impl.backOff(route);
81 assertEquals(1, connPerRoute.getMaxPerRoute(route));
82 }
83
84 @Test
85 void increasesByOneOnProbe() {
86 connPerRoute.setMaxPerRoute(route, 2);
87 impl.probe(route);
88 assertEquals(3, connPerRoute.getMaxPerRoute(route));
89 }
90
91 @Test
92 void doesNotIncreaseBeyondPerHostMaxOnProbe() {
93 connPerRoute.setDefaultMaxPerRoute(5);
94 connPerRoute.setMaxPerRoute(route, 5);
95 impl.setPerHostConnectionCap(5);
96 impl.probe(route);
97 assertEquals(5, connPerRoute.getMaxPerRoute(route));
98 }
99
100 @Test
101 void backoffDoesNotAdjustDuringCoolDownPeriod() {
102
103 connPerRoute.setMaxPerRoute(route, 4);
104
105
106 impl.backOff(route);
107 final long max1 = connPerRoute.getMaxPerRoute(route);
108
109
110 final Map<HttpRoute, Instant> lastRouteBackoffs = impl.getLastRouteBackoffs();
111 lastRouteBackoffs.put(route, Instant.now().minusMillis(1));
112
113
114 impl.backOff(route);
115 final long max2 = connPerRoute.getMaxPerRoute(route);
116
117
118 assertEquals(max1, max2);
119 }
120
121
122 @Test
123 void backoffStillAdjustsAfterCoolDownPeriod() {
124
125 connPerRoute.setMaxPerRoute(route, 8);
126
127
128 impl.backOff(route);
129 final long initialMax = connPerRoute.getMaxPerRoute(route);
130
131
132 final Map<HttpRoute, Instant> lastRouteBackoffs = impl.getLastRouteBackoffs();
133 lastRouteBackoffs.put(route, Instant.now().minusMillis(DEFAULT_COOL_DOWN_MS + 1));
134
135
136 impl.backOff(route);
137 final long finalMax = connPerRoute.getMaxPerRoute(route);
138
139
140 if (initialMax != 1) {
141 assertTrue(finalMax < initialMax, "Max connections should decrease after cooldown");
142 } else {
143 assertEquals(1, finalMax, "Max connections should remain 1 if it's already at the minimum");
144 }
145 }
146
147
148 @Test
149 void probeDoesNotAdjustDuringCooldownPeriod() {
150
151 connPerRoute.setMaxPerRoute(route, 4);
152
153
154 impl.probe(route);
155 final long max1 = connPerRoute.getMaxPerRoute(route);
156
157
158 final Map<HttpRoute, Instant> lastRouteProbes = impl.getLastRouteProbes();
159 lastRouteProbes.put(route, Instant.now().minusMillis(1));
160
161
162 impl.probe(route);
163 final long max2 = connPerRoute.getMaxPerRoute(route);
164
165
166 assertEquals(max1, max2);
167 }
168
169
170 @Test
171 void probeStillAdjustsAfterCoolDownPeriod() {
172 connPerRoute.setMaxPerRoute(route, 8);
173
174
175 impl.probe(route);
176 final long max = connPerRoute.getMaxPerRoute(route);
177
178
179 final Map<HttpRoute, Instant> lastRouteProbes = impl.getLastRouteProbes();
180 lastRouteProbes.put(route, Instant.now().minusMillis(DEFAULT_COOL_DOWN_MS + 1));
181
182
183 impl.probe(route);
184
185
186 assertTrue(max < connPerRoute.getMaxPerRoute(route));
187 }
188
189
190 @Test
191 void willBackoffImmediatelyEvenAfterAProbe() {
192 connPerRoute.setMaxPerRoute(route, 8);
193 impl.probe(route);
194 final long max = connPerRoute.getMaxPerRoute(route);
195 impl.backOff(route);
196 assertTrue(connPerRoute.getMaxPerRoute(route) < max);
197 }
198
199 @Test
200 void backOffFactorIsConfigurable() {
201 connPerRoute.setMaxPerRoute(route, 10);
202 impl.setBackoffFactor(0.9);
203 impl.backOff(route);
204 assertEquals(9, connPerRoute.getMaxPerRoute(route));
205 }
206
207 @Test
208 void coolDownPeriodIsConfigurable() {
209 final long cd = new Random().nextInt(500) + 500;
210 impl.setCoolDown(TimeValue.ofMilliseconds(cd));
211
212
213 impl.probe(route);
214 final int max0 = connPerRoute.getMaxPerRoute(route);
215
216
217 final Map<HttpRoute, Instant> lastRouteProbes = impl.getLastRouteProbes();
218 lastRouteProbes.put(route, Instant.now().minusMillis(cd / 2));
219
220
221 impl.probe(route);
222 assertEquals(max0, connPerRoute.getMaxPerRoute(route));
223
224
225 lastRouteProbes.put(route, Instant.now().minusMillis(cd + 1));
226
227
228 impl.probe(route);
229 assertTrue(max0 < connPerRoute.getMaxPerRoute(route));
230 }
231
232 @Test
233 void testConcurrency() throws InterruptedException {
234 final int initialMaxPerRoute = 10;
235 final int numberOfThreads = 20;
236 final int numberOfOperationsPerThread = 100;
237
238
239 final CyclicBarrier barrier = new CyclicBarrier(numberOfThreads);
240
241 final CountDownLatch latch = new CountDownLatch(numberOfThreads);
242
243 for (int i = 0; i < numberOfThreads; i++) {
244 final HttpRoute threadRoute = new HttpRoute(new HttpHost("localhost", 8080 + i));
245 connPerRoute.setMaxPerRoute(threadRoute, initialMaxPerRoute);
246
247 new Thread(() -> {
248 try {
249
250 barrier.await();
251
252
253 for (int j = 0; j < numberOfOperationsPerThread; j++) {
254 if (Math.random() < 0.5) {
255 impl.backOff(threadRoute);
256 } else {
257 impl.probe(threadRoute);
258 }
259 }
260 } catch (InterruptedException | BrokenBarrierException e) {
261 Thread.currentThread().interrupt();
262 } finally {
263 latch.countDown();
264 }
265 }).start();
266 }
267
268 latch.await();
269
270
271 for (int i = 0; i < numberOfThreads; i++) {
272 final HttpRoute threadRoute = new HttpRoute(new HttpHost("localhost", 8080 + i));
273 final int finalMaxPerRoute = connPerRoute.getMaxPerRoute(threadRoute);
274 assertTrue(finalMaxPerRoute >= 1 && finalMaxPerRoute <= initialMaxPerRoute + 7);
275 }
276 }
277 }