1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.internal.retry; 17 18 import java.util.OptionalDouble; 19 import software.amazon.awssdk.annotations.SdkInternalApi; 20 import software.amazon.awssdk.annotations.SdkTestInternalApi; 21 import software.amazon.awssdk.core.exception.SdkClientException; 22 23 @SdkInternalApi 24 public class RateLimitingTokenBucket { 25 private static final double MIN_FILL_RATE = 0.5; 26 private static final double MIN_CAPACITY = 1.0; 27 28 private static final double SMOOTH = 0.8; 29 private static final double BETA = 0.7; 30 private static final double SCALE_CONSTANT = 0.4; 31 32 private final Clock clock; 33 34 private Double fillRate; 35 private Double maxCapacity; 36 private double currentCapacity; 37 private Double lastTimestamp; 38 private boolean enabled; 39 private double measuredTxRate; 40 private double lastTxRateBucket; 41 private long requestCount; 42 private double lastMaxRate; 43 private double lastThrottleTime; 44 45 private double timeWindow; 46 47 public interface Clock { time()48 double time(); 49 } 50 RateLimitingTokenBucket()51 public RateLimitingTokenBucket() { 52 clock = new DefaultClock(); 53 initialize(); 54 } 55 56 @SdkTestInternalApi RateLimitingTokenBucket(Clock clock)57 RateLimitingTokenBucket(Clock clock) { 58 this.clock = clock; 59 initialize(); 60 } 61 62 /** 63 * 64 * Acquire tokens from the bucket. If the bucket contains enough capacity 65 * to satisfy the request, this method will return immediately, otherwise 66 * the method will block the calling thread until enough tokens are refilled. 67 * <p> 68 * <pre> 69 * _TokenBucketAcquire(amount) 70 * # Client side throttling is not enabled until we see a throttling error. 71 * if not enabled 72 * return 73 * 74 * _TokenBucketRefill() 75 * # Next see if we have enough capacity for the requested amount. 76 * if amount <= current_capacity 77 * current_capacity = current_capacity - amount 78 * else 79 * sleep((amount - current_capacity) / fill_rate) 80 * current_capacity = current_capacity - amount 81 * return 82 * </pre> 83 * <p> 84 * This is equivalent to {@code acquire(amount, false)}. 85 * 86 * @param amount The amount of tokens to acquire. 87 * 88 * @return Whether the amount was successfully acquired. 89 */ acquire(double amount)90 public boolean acquire(double amount) { 91 return acquire(amount, false); 92 } 93 94 /** 95 * 96 * Acquire tokens from the bucket. If the bucket contains enough capacity 97 * to satisfy the request, this method will return immediately. Otherwise, 98 * the behavior depends on the value of {@code fastFail}. If it is {@code 99 * true}, then it will return {@code false} immediately, signaling that 100 * enough capacity could not be acquired. Otherwise if {@code fastFail} is 101 * {@code false}, then it will wait the required amount of time to fill the 102 * bucket with enough tokens to satisfy {@code amount}. 103 * <pre> 104 * _TokenBucketAcquire(amount) 105 * # Client side throttling is not enabled until we see a throttling error. 106 * if not enabled 107 * return 108 * 109 * _TokenBucketRefill() 110 * # Next see if we have enough capacity for the requested amount. 111 * if amount <= current_capacity 112 * current_capacity = current_capacity - amount 113 * else 114 * sleep((amount - current_capacity) / fill_rate) 115 * current_capacity = current_capacity - amount 116 * return 117 * </pre> 118 * 119 * @param amount The amount of tokens to acquire. 120 * @param fastFail Whether this method should return immediately instead 121 * of waiting if {@code amount} exceeds the current 122 * capacity. 123 * 124 * @return Whether the amount was successfully acquired. 125 */ acquire(double amount, boolean fastFail)126 public boolean acquire(double amount, boolean fastFail) { 127 OptionalDouble waitTime = acquireNonBlocking(amount, fastFail); 128 129 if (!waitTime.isPresent()) { 130 return false; 131 } 132 133 double t = waitTime.getAsDouble(); 134 if (t > 0.0) { 135 sleep(t); 136 } 137 138 return true; 139 } 140 141 /** 142 * Acquire capacity from the rate limiter without blocking the call. 143 * <p> 144 * This method returns an {@code OptionalDouble} whose value, or its absence correspond to the following states: 145 * <ul> 146 * <li>Empty - If the value is not present, then the call fast failed, and no capacity was acquired.</li> 147 * <li>Present - if the value is present, then the value is the time in seconds that caller must wait before 148 * executing the request to be within the rate imposed by the rate limiter./li> 149 * </ul> 150 * 151 * @return The amount of time in seconds to wait before proceeding. 152 */ acquireNonBlocking(double amount, boolean fastFail)153 public OptionalDouble acquireNonBlocking(double amount, boolean fastFail) { 154 double waitTime = 0.0; 155 156 synchronized (this) { 157 // If rate limiting is not enabled, we technically have an uncapped limit 158 if (!enabled) { 159 return OptionalDouble.of(0.0); 160 } 161 162 refill(); 163 164 double originalCapacity = currentCapacity; 165 double unfulfilled = tryAcquireCapacity(amount); 166 167 if (unfulfilled > 0.0 && fastFail) { 168 currentCapacity = originalCapacity; 169 return OptionalDouble.empty(); 170 } 171 172 // If all the tokens couldn't be acquired immediately, wait enough 173 // time to fill the remainder. 174 if (unfulfilled > 0) { 175 waitTime = unfulfilled / fillRate; 176 } 177 } 178 179 return OptionalDouble.of(waitTime); 180 } 181 182 /** 183 * 184 * @param amount The amount of capacity to acquire from the bucket. 185 * @return The unfulfilled amount. 186 */ tryAcquireCapacity(double amount)187 double tryAcquireCapacity(double amount) { 188 double result; 189 if (amount <= currentCapacity) { 190 result = 0; 191 } else { 192 result = amount - currentCapacity; 193 } 194 currentCapacity = currentCapacity - amount; 195 return result; 196 } 197 initialize()198 private void initialize() { 199 fillRate = null; 200 maxCapacity = null; 201 currentCapacity = 0.0; 202 lastTimestamp = null; 203 enabled = false; 204 measuredTxRate = 0.0; 205 lastTxRateBucket = Math.floor(clock.time()); 206 requestCount = 0; 207 lastMaxRate = 0.0; 208 lastThrottleTime = clock.time(); 209 } 210 211 /** 212 * <pre> 213 * _TokenBucketRefill() 214 * timestamp = time() 215 * if last_timestamp is unset 216 * last_timestamp = timestamp 217 * return 218 * fill_amount = (timestamp - last_timestamp) * fill_rate 219 * current_capacity = min(max_capacity, current_capacity + fill_amount) 220 * last_timestamp = timestamp 221 * </pre> 222 */ 223 // Package private for testing refill()224 synchronized void refill() { 225 double timestamp = clock.time(); 226 if (lastTimestamp == null) { 227 lastTimestamp = timestamp; 228 return; 229 } 230 231 double fillAmount = (timestamp - lastTimestamp) * fillRate; 232 currentCapacity = Math.min(maxCapacity, currentCapacity + fillAmount); 233 lastTimestamp = timestamp; 234 } 235 236 /** 237 * <pre> 238 * _TokenBucketUpdateRate(new_rps) 239 * # Refill based on our current rate before we update to the new fill rate. 240 * _TokenBucketRefill() 241 * fill_rate = max(new_rps, MIN_FILL_RATE) 242 * max_capacity = max(new_rps, MIN_CAPACITY) 243 * # When we scale down we can't have a current capacity that exceeds our 244 * # max_capacity. 245 * current_capacity = min(current_capacity, max_capacity) 246 * </pre> 247 */ updateRate(double newRps)248 private synchronized void updateRate(double newRps) { 249 refill(); 250 fillRate = Math.max(newRps, MIN_FILL_RATE); 251 maxCapacity = Math.max(newRps, MIN_CAPACITY); 252 currentCapacity = Math.min(currentCapacity, maxCapacity); 253 } 254 255 /** 256 * <pre> 257 * t = time() 258 * time_bucket = floor(t * 2) / 2 259 * request_count = request_count + 1 260 * if time_bucket > last_tx_rate_bucket 261 * current_rate = request_count / (time_bucket - last_tx_rate_bucket) 262 * measured_tx_rate = (current_rate * SMOOTH) + (measured_tx_rate * (1 - SMOOTH)) 263 * request_count = 0 264 * last_tx_rate_bucket = time_bucket 265 * </pre> 266 */ updateMeasuredRate()267 private synchronized void updateMeasuredRate() { 268 double t = clock.time(); 269 double timeBucket = Math.floor(t * 2) / 2; 270 requestCount = requestCount + 1; 271 if (timeBucket > lastTxRateBucket) { 272 double currentRate = requestCount / (timeBucket - lastTxRateBucket); 273 measuredTxRate = (currentRate * SMOOTH) + (measuredTxRate * (1 - SMOOTH)); 274 requestCount = 0; 275 lastTxRateBucket = timeBucket; 276 } 277 } 278 enable()279 synchronized void enable() { 280 enabled = true; 281 } 282 283 /** 284 * <pre> 285 * _UpdateClientSendingRate(response) 286 * _UpdateMeasuredRate() 287 * 288 * if IsThrottlingError(response) 289 * if not enabled 290 * rate_to_use = measured_tx_rate 291 * else 292 * rate_to_use = min(measured_tx_rate, fill_rate) 293 * 294 * # The fill_rate is from the token bucket. 295 * last_max_rate = rate_to_use 296 * _CalculateTimeWindow() 297 * last_throttle_time = time() 298 * calculated_rate = _CUBICThrottle(rate_to_use) 299 * TokenBucketEnable() 300 * else 301 * _CalculateTimeWindow() 302 * calculated_rate = _CUBICSuccess(time()) 303 * 304 * new_rate = min(calculated_rate, 2 * measured_tx_rate) 305 * _TokenBucketUpdateRate(new_rate) 306 * </pre> 307 */ updateClientSendingRate(boolean throttlingResponse)308 public synchronized void updateClientSendingRate(boolean throttlingResponse) { 309 updateMeasuredRate(); 310 311 double calculatedRate; 312 if (throttlingResponse) { 313 double rateToUse; 314 if (!enabled) { 315 rateToUse = measuredTxRate; 316 } else { 317 rateToUse = Math.min(measuredTxRate, fillRate); 318 } 319 320 lastMaxRate = rateToUse; 321 calculateTimeWindow(); 322 lastThrottleTime = clock.time(); 323 calculatedRate = cubicThrottle(rateToUse); 324 enable(); 325 } else { 326 calculateTimeWindow(); 327 calculatedRate = cubicSuccess(clock.time()); 328 } 329 330 double newRate = Math.min(calculatedRate, 2 * measuredTxRate); 331 updateRate(newRate); 332 } 333 334 /** 335 * <pre> 336 * _CalculateTimeWindow() 337 * # This is broken out into a separate calculation because it only 338 * # gets updated when last_max_rate change so it can be cached. 339 * _time_window = ((last_max_rate * (1 - BETA)) / SCALE_CONSTANT) ^ (1 / 3) 340 * </pre> 341 */ 342 // Package private for testing calculateTimeWindow()343 synchronized void calculateTimeWindow() { 344 timeWindow = Math.pow((lastMaxRate * (1 - BETA)) / SCALE_CONSTANT, 1.0 / 3); 345 } 346 347 /** 348 * Sleep for a given amount of seconds. 349 * 350 * @param seconds The amount of time to sleep in seconds. 351 */ sleep(double seconds)352 void sleep(double seconds) { 353 long millisToSleep = (long) (seconds * 1000); 354 try { 355 Thread.sleep(millisToSleep); 356 } catch (InterruptedException ie) { 357 Thread.currentThread().interrupt(); 358 throw SdkClientException.create("Sleep interrupted", ie); 359 } 360 } 361 362 /** 363 * <pre> 364 * _CUBICThrottle(rate_to_use) 365 * calculated_rate = rate_to_use * BETA 366 * return calculated_rate 367 * </pre> 368 */ 369 // Package private for testing cubicThrottle(double rateToUse)370 double cubicThrottle(double rateToUse) { 371 double calculatedRate = rateToUse * BETA; 372 return calculatedRate; 373 } 374 375 /** 376 * <pre> 377 * _CUBICSuccess(timestamp) 378 * dt = timestamp - last_throttle_time 379 * calculated_rate = (SCALE_CONSTANT * ((dt - _time_window) ^ 3)) + last_max_rate 380 * return calculated_rate 381 * </pre> 382 */ 383 // Package private for testing cubicSuccess(double timestamp)384 synchronized double cubicSuccess(double timestamp) { 385 double dt = timestamp - lastThrottleTime; 386 double calculatedRate = SCALE_CONSTANT * Math.pow(dt - timeWindow, 3) + lastMaxRate; 387 return calculatedRate; 388 } 389 390 static class DefaultClock implements Clock { 391 @Override time()392 public double time() { 393 long timeMillis = System.nanoTime(); 394 return timeMillis / 1000000000.; 395 } 396 } 397 398 @SdkTestInternalApi setLastMaxRate(double lastMaxRate)399 synchronized void setLastMaxRate(double lastMaxRate) { 400 this.lastMaxRate = lastMaxRate; 401 } 402 403 @SdkTestInternalApi setLastThrottleTime(double lastThrottleTime)404 synchronized void setLastThrottleTime(double lastThrottleTime) { 405 this.lastThrottleTime = lastThrottleTime; 406 } 407 408 @SdkTestInternalApi getMeasuredTxRate()409 synchronized double getMeasuredTxRate() { 410 return measuredTxRate; 411 } 412 413 @SdkTestInternalApi getFillRate()414 synchronized double getFillRate() { 415 return fillRate; 416 } 417 418 @SdkTestInternalApi setCurrentCapacity(double currentCapacity)419 synchronized void setCurrentCapacity(double currentCapacity) { 420 this.currentCapacity = currentCapacity; 421 } 422 423 @SdkTestInternalApi getCurrentCapacity()424 synchronized double getCurrentCapacity() { 425 return currentCapacity; 426 } 427 428 @SdkTestInternalApi setFillRate(double fillRate)429 synchronized void setFillRate(double fillRate) { 430 this.fillRate = fillRate; 431 } 432 } 433