1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  package org.apache.hc.client5.http.impl.cache;
28  
29  import java.io.Closeable;
30  import java.io.IOException;
31  import java.util.HashSet;
32  import java.util.Set;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.ScheduledExecutorService;
36  import java.util.concurrent.ScheduledFuture;
37  import java.util.concurrent.ScheduledThreadPoolExecutor;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
41  import org.apache.hc.client5.http.schedule.SchedulingStrategy;
42  import org.apache.hc.core5.util.Args;
43  import org.apache.hc.core5.util.TimeValue;
44  import org.apache.hc.core5.util.Timeout;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  
49  
50  
51  class CacheRevalidatorBase implements Closeable {
52  
53      private final ReentrantLock lock;
54  
55      interface ScheduledExecutor {
56  
57          Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
58  
59          void shutdown();
60  
61          void awaitTermination(final Timeout timeout) throws InterruptedException;
62  
63      }
64  
65      public static ScheduledExecutor wrap(final ScheduledExecutorService executorService) {
66  
67          return new ScheduledExecutor() {
68  
69              @Override
70              public ScheduledFuture<?> schedule(final Runnable command, final TimeValue timeValue) throws RejectedExecutionException {
71                  Args.notNull(command, "Runnable");
72                  Args.notNull(timeValue, "Time value");
73                  return executorService.schedule(command, timeValue.getDuration(), timeValue.getTimeUnit());
74              }
75  
76              @Override
77              public void shutdown() {
78                  executorService.shutdown();
79              }
80  
81              @Override
82              public void awaitTermination(final Timeout timeout) throws InterruptedException {
83                  Args.notNull(timeout, "Timeout");
84                  executorService.awaitTermination(timeout.getDuration(), timeout.getTimeUnit());
85              }
86  
87          };
88  
89      }
90  
91      private final ScheduledExecutor scheduledExecutor;
92      private final SchedulingStrategy schedulingStrategy;
93      private final Set<String> pendingRequest;
94      private final ConcurrentCountMap<String> failureCache;
95  
96      private static final Logger LOG = LoggerFactory.getLogger(CacheRevalidatorBase.class);
97  
98      
99  
100 
101 
102     public CacheRevalidatorBase(
103             final ScheduledExecutor scheduledExecutor,
104             final SchedulingStrategy schedulingStrategy) {
105         this.scheduledExecutor = scheduledExecutor;
106         this.schedulingStrategy = schedulingStrategy;
107         this.pendingRequest = new HashSet<>();
108         this.failureCache = new ConcurrentCountMap<>();
109         this.lock = new ReentrantLock();
110     }
111 
112     
113 
114 
115 
116     public CacheRevalidatorBase(
117             final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor,
118             final SchedulingStrategy schedulingStrategy) {
119         this(wrap(scheduledThreadPoolExecutor), schedulingStrategy);
120     }
121 
122     
123 
124 
125     void scheduleRevalidation(final String cacheKey, final Runnable command) {
126         lock.lock();
127         try {
128             if (!pendingRequest.contains(cacheKey)) {
129                 final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
130                 final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
131                 try {
132                     scheduledExecutor.schedule(command, executionTime);
133                     pendingRequest.add(cacheKey);
134                 } catch (final RejectedExecutionException ex) {
135                     LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
136                 }
137             }
138         } finally {
139             lock.unlock();
140         }
141     }
142 
143     @Override
144     public void close() throws IOException {
145         scheduledExecutor.shutdown();
146     }
147 
148     public void awaitTermination(final Timeout timeout) throws InterruptedException {
149         Args.notNull(timeout, "Timeout");
150         scheduledExecutor.awaitTermination(timeout);
151     }
152 
153     void jobSuccessful(final String identifier) {
154         failureCache.resetCount(identifier);
155         lock.lock();
156         try {
157             pendingRequest.remove(identifier);
158         } finally {
159             lock.unlock();
160         }
161     }
162 
163     void jobFailed(final String identifier) {
164         failureCache.increaseCount(identifier);
165         lock.lock();
166         try {
167             pendingRequest.remove(identifier);
168         } finally {
169             lock.unlock();
170         }
171     }
172 
173     Set<String> getScheduledIdentifiers() {
174         lock.lock();
175         try {
176             return new HashSet<>(pendingRequest);
177         } finally {
178             lock.unlock();
179         }
180     }
181 
182 }