• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.internal.retry;
17 
18 import java.util.OptionalDouble;
19 import software.amazon.awssdk.annotations.SdkInternalApi;
20 import software.amazon.awssdk.annotations.SdkTestInternalApi;
21 import software.amazon.awssdk.core.exception.SdkClientException;
22 
23 @SdkInternalApi
24 public class RateLimitingTokenBucket {
25     private static final double MIN_FILL_RATE = 0.5;
26     private static final double MIN_CAPACITY = 1.0;
27 
28     private static final double SMOOTH = 0.8;
29     private static final double BETA = 0.7;
30     private static final double SCALE_CONSTANT = 0.4;
31 
32     private final Clock clock;
33 
34     private Double fillRate;
35     private Double maxCapacity;
36     private double currentCapacity;
37     private Double lastTimestamp;
38     private boolean enabled;
39     private double measuredTxRate;
40     private double lastTxRateBucket;
41     private long requestCount;
42     private double lastMaxRate;
43     private double lastThrottleTime;
44 
45     private double timeWindow;
46 
47     public interface Clock {
time()48         double time();
49     }
50 
RateLimitingTokenBucket()51     public RateLimitingTokenBucket() {
52         clock = new DefaultClock();
53         initialize();
54     }
55 
56     @SdkTestInternalApi
RateLimitingTokenBucket(Clock clock)57     RateLimitingTokenBucket(Clock clock) {
58         this.clock = clock;
59         initialize();
60     }
61 
62     /**
63      *
64      * Acquire tokens from the bucket. If the bucket contains enough capacity
65      * to satisfy the request, this method will return immediately, otherwise
66      * the method will block the calling thread until enough tokens are refilled.
67      * <p>
68      * <pre>
69      * _TokenBucketAcquire(amount)
70      *   # Client side throttling is not enabled until we see a throttling error.
71      *   if not enabled
72      *     return
73      *
74      *   _TokenBucketRefill()
75      *   # Next see if we have enough capacity for the requested amount.
76      *   if amount &lt;= current_capacity
77      *     current_capacity = current_capacity - amount
78      *   else
79      *     sleep((amount - current_capacity) / fill_rate)
80      *     current_capacity = current_capacity - amount
81      *   return
82      * </pre>
83      * <p>
84      * This is equivalent to {@code acquire(amount, false)}.
85      *
86      * @param amount The amount of tokens to acquire.
87      *
88      * @return Whether the amount was successfully acquired.
89      */
acquire(double amount)90     public boolean acquire(double amount) {
91         return acquire(amount, false);
92     }
93 
94     /**
95      *
96      * Acquire tokens from the bucket. If the bucket contains enough capacity
97      * to satisfy the request, this method will return immediately. Otherwise,
98      * the behavior depends on the value of {@code fastFail}. If it is {@code
99      * true}, then it will return {@code false} immediately, signaling that
100      * enough capacity could not be acquired. Otherwise if {@code fastFail} is
101      * {@code false}, then it will wait the required amount of time to fill the
102      * bucket with enough tokens to satisfy {@code amount}.
103      * <pre>
104      * _TokenBucketAcquire(amount)
105      *   # Client side throttling is not enabled until we see a throttling error.
106      *   if not enabled
107      *     return
108      *
109      *   _TokenBucketRefill()
110      *   # Next see if we have enough capacity for the requested amount.
111      *   if amount &lt;= current_capacity
112      *     current_capacity = current_capacity - amount
113      *   else
114      *     sleep((amount - current_capacity) / fill_rate)
115      *     current_capacity = current_capacity - amount
116      *   return
117      * </pre>
118      *
119      * @param amount The amount of tokens to acquire.
120      * @param fastFail Whether this method should return immediately instead
121      *                 of waiting if {@code amount} exceeds the current
122      *                 capacity.
123      *
124      * @return Whether the amount was successfully acquired.
125      */
acquire(double amount, boolean fastFail)126     public boolean acquire(double amount, boolean fastFail) {
127         OptionalDouble waitTime = acquireNonBlocking(amount, fastFail);
128 
129         if (!waitTime.isPresent()) {
130             return false;
131         }
132 
133         double t = waitTime.getAsDouble();
134         if (t > 0.0) {
135             sleep(t);
136         }
137 
138         return true;
139     }
140 
141     /**
142      * Acquire capacity from the rate limiter without blocking the call.
143      * <p>
144      * This method returns an {@code OptionalDouble} whose value, or its absence correspond to the following states:
145      * <ul>
146      *     <li>Empty - If the value is not present, then the call fast failed, and no capacity was acquired.</li>
147      *     <li>Present - if the value is present, then the value is the time in seconds that caller must wait before
148      *     executing the request to be within the rate imposed by the rate limiter./li>
149      * </ul>
150      *
151      * @return The amount of time in seconds to wait before proceeding.
152      */
acquireNonBlocking(double amount, boolean fastFail)153     public OptionalDouble acquireNonBlocking(double amount, boolean fastFail) {
154         double waitTime = 0.0;
155 
156         synchronized (this) {
157             // If rate limiting is not enabled, we technically have an uncapped limit
158             if (!enabled) {
159                 return OptionalDouble.of(0.0);
160             }
161 
162             refill();
163 
164             double originalCapacity = currentCapacity;
165             double unfulfilled = tryAcquireCapacity(amount);
166 
167             if (unfulfilled > 0.0 && fastFail) {
168                 currentCapacity = originalCapacity;
169                 return OptionalDouble.empty();
170             }
171 
172             // If all the tokens couldn't be acquired immediately, wait enough
173             // time to fill the remainder.
174             if (unfulfilled > 0) {
175                 waitTime = unfulfilled / fillRate;
176             }
177         }
178 
179         return OptionalDouble.of(waitTime);
180     }
181 
182     /**
183      *
184      * @param amount The amount of capacity to acquire from the bucket.
185      * @return The unfulfilled amount.
186      */
tryAcquireCapacity(double amount)187     double tryAcquireCapacity(double amount) {
188         double result;
189         if (amount <= currentCapacity) {
190             result = 0;
191         } else {
192             result = amount - currentCapacity;
193         }
194         currentCapacity = currentCapacity - amount;
195         return result;
196     }
197 
initialize()198     private void initialize() {
199         fillRate = null;
200         maxCapacity = null;
201         currentCapacity = 0.0;
202         lastTimestamp = null;
203         enabled = false;
204         measuredTxRate = 0.0;
205         lastTxRateBucket = Math.floor(clock.time());
206         requestCount = 0;
207         lastMaxRate = 0.0;
208         lastThrottleTime = clock.time();
209     }
210 
211     /**
212      * <pre>
213      * _TokenBucketRefill()
214      *   timestamp = time()
215      *   if last_timestamp is unset
216      *     last_timestamp = timestamp
217      *     return
218      *   fill_amount = (timestamp - last_timestamp) * fill_rate
219      *   current_capacity = min(max_capacity, current_capacity + fill_amount)
220      *   last_timestamp = timestamp
221      * </pre>
222      */
223     // Package private for testing
refill()224     synchronized void refill() {
225         double timestamp = clock.time();
226         if (lastTimestamp == null) {
227             lastTimestamp = timestamp;
228             return;
229         }
230 
231         double fillAmount = (timestamp - lastTimestamp) * fillRate;
232         currentCapacity = Math.min(maxCapacity, currentCapacity + fillAmount);
233         lastTimestamp = timestamp;
234     }
235 
236     /**
237      * <pre>
238      * _TokenBucketUpdateRate(new_rps)
239      *   # Refill based on our current rate before we update to the new fill rate.
240      *   _TokenBucketRefill()
241      *   fill_rate = max(new_rps, MIN_FILL_RATE)
242      *   max_capacity = max(new_rps, MIN_CAPACITY)
243      *   # When we scale down we can't have a current capacity that exceeds our
244      *   # max_capacity.
245      *   current_capacity = min(current_capacity, max_capacity)
246      * </pre>
247      */
updateRate(double newRps)248     private synchronized void updateRate(double newRps) {
249         refill();
250         fillRate = Math.max(newRps, MIN_FILL_RATE);
251         maxCapacity = Math.max(newRps, MIN_CAPACITY);
252         currentCapacity = Math.min(currentCapacity, maxCapacity);
253     }
254 
255     /**
256      * <pre>
257      * t = time()
258      * time_bucket = floor(t * 2) / 2
259      * request_count = request_count + 1
260      * if time_bucket > last_tx_rate_bucket
261      *   current_rate = request_count / (time_bucket - last_tx_rate_bucket)
262      *   measured_tx_rate = (current_rate * SMOOTH) + (measured_tx_rate * (1 - SMOOTH))
263      *   request_count = 0
264      *   last_tx_rate_bucket = time_bucket
265      * </pre>
266      */
updateMeasuredRate()267     private synchronized void updateMeasuredRate() {
268         double t = clock.time();
269         double timeBucket = Math.floor(t * 2) / 2;
270         requestCount = requestCount + 1;
271         if (timeBucket > lastTxRateBucket) {
272             double currentRate = requestCount / (timeBucket - lastTxRateBucket);
273             measuredTxRate = (currentRate * SMOOTH) + (measuredTxRate * (1 - SMOOTH));
274             requestCount = 0;
275             lastTxRateBucket = timeBucket;
276         }
277     }
278 
enable()279     synchronized void enable() {
280         enabled = true;
281     }
282 
283     /**
284      * <pre>
285      * _UpdateClientSendingRate(response)
286      *   _UpdateMeasuredRate()
287      *
288      *   if IsThrottlingError(response)
289      *     if not enabled
290      *       rate_to_use = measured_tx_rate
291      *     else
292      *       rate_to_use = min(measured_tx_rate, fill_rate)
293      *
294      *     # The fill_rate is from the token bucket.
295      *     last_max_rate = rate_to_use
296      *     _CalculateTimeWindow()
297      *     last_throttle_time = time()
298      *     calculated_rate = _CUBICThrottle(rate_to_use)
299      *     TokenBucketEnable()
300      *   else
301      *     _CalculateTimeWindow()
302      *     calculated_rate = _CUBICSuccess(time())
303      *
304      *   new_rate = min(calculated_rate, 2 * measured_tx_rate)
305      *   _TokenBucketUpdateRate(new_rate)
306      * </pre>
307      */
updateClientSendingRate(boolean throttlingResponse)308     public synchronized void updateClientSendingRate(boolean throttlingResponse) {
309         updateMeasuredRate();
310 
311         double calculatedRate;
312         if (throttlingResponse) {
313             double rateToUse;
314             if (!enabled) {
315                 rateToUse = measuredTxRate;
316             } else {
317                 rateToUse = Math.min(measuredTxRate, fillRate);
318             }
319 
320             lastMaxRate = rateToUse;
321             calculateTimeWindow();
322             lastThrottleTime = clock.time();
323             calculatedRate = cubicThrottle(rateToUse);
324             enable();
325         } else {
326             calculateTimeWindow();
327             calculatedRate = cubicSuccess(clock.time());
328         }
329 
330         double newRate = Math.min(calculatedRate, 2 * measuredTxRate);
331         updateRate(newRate);
332     }
333 
334     /**
335      * <pre>
336      * _CalculateTimeWindow()
337      *   # This is broken out into a separate calculation because it only
338      *   # gets updated when last_max_rate change so it can be cached.
339      *   _time_window = ((last_max_rate * (1 - BETA)) / SCALE_CONSTANT) ^ (1 / 3)
340      * </pre>
341      */
342     // Package private for testing
calculateTimeWindow()343     synchronized void calculateTimeWindow() {
344         timeWindow = Math.pow((lastMaxRate * (1 - BETA)) / SCALE_CONSTANT, 1.0 / 3);
345     }
346 
347     /**
348      * Sleep for a given amount of seconds.
349      *
350      * @param seconds The amount of time to sleep in seconds.
351      */
sleep(double seconds)352     void sleep(double seconds) {
353         long millisToSleep = (long) (seconds * 1000);
354         try {
355             Thread.sleep(millisToSleep);
356         } catch (InterruptedException ie) {
357             Thread.currentThread().interrupt();
358             throw SdkClientException.create("Sleep interrupted", ie);
359         }
360     }
361 
362     /**
363      * <pre>
364      * _CUBICThrottle(rate_to_use)
365      *   calculated_rate = rate_to_use * BETA
366      *   return calculated_rate
367      * </pre>
368      */
369     // Package private for testing
cubicThrottle(double rateToUse)370     double cubicThrottle(double rateToUse) {
371         double calculatedRate = rateToUse * BETA;
372         return calculatedRate;
373     }
374 
375     /**
376      * <pre>
377      * _CUBICSuccess(timestamp)
378      *   dt = timestamp - last_throttle_time
379      *   calculated_rate = (SCALE_CONSTANT * ((dt - _time_window) ^ 3)) + last_max_rate
380      *   return calculated_rate
381      * </pre>
382      */
383     // Package private for testing
cubicSuccess(double timestamp)384     synchronized double cubicSuccess(double timestamp) {
385         double dt = timestamp - lastThrottleTime;
386         double calculatedRate = SCALE_CONSTANT * Math.pow(dt - timeWindow, 3) + lastMaxRate;
387         return calculatedRate;
388     }
389 
390     static class DefaultClock implements Clock {
391         @Override
time()392         public double time() {
393             long timeMillis = System.nanoTime();
394             return timeMillis / 1000000000.;
395         }
396     }
397 
398     @SdkTestInternalApi
setLastMaxRate(double lastMaxRate)399     synchronized void setLastMaxRate(double lastMaxRate) {
400         this.lastMaxRate = lastMaxRate;
401     }
402 
403     @SdkTestInternalApi
setLastThrottleTime(double lastThrottleTime)404     synchronized void setLastThrottleTime(double lastThrottleTime) {
405         this.lastThrottleTime = lastThrottleTime;
406     }
407 
408     @SdkTestInternalApi
getMeasuredTxRate()409     synchronized double getMeasuredTxRate() {
410         return measuredTxRate;
411     }
412 
413     @SdkTestInternalApi
getFillRate()414     synchronized double getFillRate() {
415         return fillRate;
416     }
417 
418     @SdkTestInternalApi
setCurrentCapacity(double currentCapacity)419     synchronized void setCurrentCapacity(double currentCapacity) {
420         this.currentCapacity = currentCapacity;
421     }
422 
423     @SdkTestInternalApi
getCurrentCapacity()424     synchronized double getCurrentCapacity() {
425         return currentCapacity;
426     }
427 
428     @SdkTestInternalApi
setFillRate(double fillRate)429     synchronized void setFillRate(double fillRate) {
430         this.fillRate = fillRate;
431     }
432 }
433