1 // Copyright (c) 2024 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::future::Future; 15 use std::pin::Pin; 16 use std::task::{Context, Poll}; 17 use std::time::{Duration, Instant}; 18 19 use crate::runtime::{sleep, Sleep}; 20 use crate::HttpClientError; 21 22 pub(crate) const SPEED_CHECK_PERIOD: Duration = Duration::from_millis(1000); 23 24 #[derive(Default, Clone)] 25 pub(crate) struct SpeedController { 26 pub(crate) send_rate_limit: RateLimit, 27 pub(crate) recv_rate_limit: RateLimit, 28 } 29 30 impl SpeedController { none() -> Self31 pub(crate) fn none() -> Self { 32 SpeedController::default() 33 } 34 set_speed_limit(&mut self, config: SpeedConfig)35 pub(crate) fn set_speed_limit(&mut self, config: SpeedConfig) { 36 if let Some(speed) = config.max_recv_speed() { 37 self.recv_rate_limit 38 .set_max_speed(speed, SPEED_CHECK_PERIOD); 39 } 40 41 if let Some(speed) = config.min_recv_speed() { 42 if let Some(interval) = config.min_speed_interval() { 43 self.recv_rate_limit.set_min_speed( 44 speed, 45 SPEED_CHECK_PERIOD, 46 Duration::from_secs(interval), 47 ); 48 } 49 } 50 51 if let Some(speed) = config.max_send_speed() { 52 self.send_rate_limit 53 .set_max_speed(speed, SPEED_CHECK_PERIOD); 54 } 55 56 if let Some(speed) = config.min_send_speed() { 57 if let Some(interval) = config.min_speed_interval() { 58 self.send_rate_limit.set_min_speed( 59 speed, 60 SPEED_CHECK_PERIOD, 61 Duration::from_secs(interval), 62 ); 63 } 64 } 65 } 66 need_limit_max_send_speed(&self) -> bool67 pub(crate) fn need_limit_max_send_speed(&self) -> bool { 68 self.send_rate_limit.need_limit_max_speed() 69 } 70 max_send_speed_limit(&mut self, size: usize)71 pub(crate) async fn max_send_speed_limit(&mut self, size: usize) { 72 self.send_rate_limit.max_speed_limit(size).await 73 } 74 delay_max_recv_speed_limit(&mut self, size: usize)75 pub(crate) fn delay_max_recv_speed_limit(&mut self, size: usize) { 76 self.recv_rate_limit.delay_max_speed_limit(size) 77 } 78 79 #[cfg(any(feature = "http2", feature = "http3"))] delay_max_send_speed_limit(&mut self, size: usize)80 pub(crate) fn delay_max_send_speed_limit(&mut self, size: usize) { 81 self.send_rate_limit.delay_max_speed_limit(size) 82 } 83 min_send_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError>84 pub(crate) fn min_send_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError> { 85 self.send_rate_limit.min_speed_limit(size) 86 } 87 reset_send_pending_timeout(&mut self)88 pub(crate) fn reset_send_pending_timeout(&mut self) { 89 self.send_rate_limit.reset_pending_timeout() 90 } 91 min_recv_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError>92 pub(crate) fn min_recv_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError> { 93 self.recv_rate_limit.min_speed_limit(size) 94 } 95 reset_recv_pending_timeout(&mut self)96 pub(crate) fn reset_recv_pending_timeout(&mut self) { 97 self.recv_rate_limit.reset_pending_timeout() 98 } 99 poll_max_recv_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()>100 pub(crate) fn poll_max_recv_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()> { 101 self.recv_rate_limit.poll_limited_delay(cx) 102 } 103 poll_recv_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool104 pub(crate) fn poll_recv_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool { 105 self.recv_rate_limit.poll_pending_timeout(cx) 106 } 107 poll_send_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool108 pub(crate) fn poll_send_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool { 109 self.send_rate_limit.poll_pending_timeout(cx) 110 } 111 112 #[cfg(any(feature = "http2", feature = "http3"))] poll_max_send_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()>113 pub(crate) fn poll_max_send_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()> { 114 self.send_rate_limit.poll_limited_delay(cx) 115 } 116 init_max_send_if_not_start(&mut self)117 pub(crate) fn init_max_send_if_not_start(&mut self) { 118 self.send_rate_limit.init_max_limit_if_not_start(); 119 } 120 init_min_send_if_not_start(&mut self)121 pub(crate) fn init_min_send_if_not_start(&mut self) { 122 self.send_rate_limit.init_min_limit_if_not_start(); 123 } 124 init_max_recv_if_not_start(&mut self)125 pub(crate) fn init_max_recv_if_not_start(&mut self) { 126 self.recv_rate_limit.init_max_limit_if_not_start(); 127 } 128 init_min_recv_if_not_start(&mut self)129 pub(crate) fn init_min_recv_if_not_start(&mut self) { 130 self.recv_rate_limit.init_min_limit_if_not_start(); 131 } 132 } 133 134 #[derive(Default, Clone)] 135 pub(crate) struct RateLimit { 136 min_speed: Option<SpeedLimit>, 137 max_speed: Option<SpeedLimit>, 138 } 139 140 impl RateLimit { set_min_speed(&mut self, rate: u64, period: Duration, interval: Duration)141 pub(crate) fn set_min_speed(&mut self, rate: u64, period: Duration, interval: Duration) { 142 let limit = SpeedLimit::new(rate, period, interval); 143 self.min_speed = Some(limit) 144 } 145 set_max_speed(&mut self, rate: u64, period: Duration)146 pub(crate) fn set_max_speed(&mut self, rate: u64, period: Duration) { 147 let limit = SpeedLimit::new(rate, period, Duration::default()); 148 self.max_speed = Some(limit) 149 } 150 need_limit_max_speed(&self) -> bool151 pub(crate) fn need_limit_max_speed(&self) -> bool { 152 self.max_speed.is_some() 153 } 154 init_max_limit_if_not_start(&mut self)155 pub(crate) fn init_max_limit_if_not_start(&mut self) { 156 if let Some(ref mut speed) = self.max_speed { 157 speed.init_if_not_start() 158 } 159 } 160 init_min_limit_if_not_start(&mut self)161 pub(crate) fn init_min_limit_if_not_start(&mut self) { 162 if let Some(ref mut speed) = self.min_speed { 163 speed.init_if_not_start() 164 } 165 } 166 max_speed_limit(&mut self, read: usize)167 pub(crate) async fn max_speed_limit(&mut self, read: usize) { 168 if let Some(ref mut speed) = self.max_speed { 169 speed.limit_max_speed(read).await 170 } 171 } 172 delay_max_speed_limit(&mut self, read: usize)173 pub(crate) fn delay_max_speed_limit(&mut self, read: usize) { 174 if let Some(ref mut speed) = self.max_speed { 175 speed.delay_max_speed_limit(read) 176 } 177 } 178 min_speed_limit(&mut self, read: usize) -> Result<(), HttpClientError>179 pub(crate) fn min_speed_limit(&mut self, read: usize) -> Result<(), HttpClientError> { 180 if let Some(ref mut speed) = self.min_speed { 181 speed.limit_min_speed(read) 182 } else { 183 Ok(()) 184 } 185 } 186 reset_pending_timeout(&mut self)187 pub(crate) fn reset_pending_timeout(&mut self) { 188 if let Some(ref mut speed) = self.min_speed { 189 speed.reset_pending_timeout() 190 } 191 } 192 poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool193 pub(crate) fn poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool { 194 self.min_speed 195 .as_mut() 196 .is_some_and(|speed| speed.poll_pending_timeout(cx)) 197 } 198 poll_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()>199 pub(crate) fn poll_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()> { 200 if let Some(ref mut speed) = self.max_speed { 201 return speed.poll_max_limited_delay(cx); 202 } 203 Poll::Ready(()) 204 } 205 } 206 207 #[derive(Default)] 208 pub(crate) struct SpeedLimit { 209 rate: u64, 210 // Speed limiting period, millisecond. 211 period: Duration, 212 min_speed_interval: Duration, 213 // min_speed_interval start time. 214 min_speed_start: Option<Instant>, 215 // Data received within a period, byte. 216 period_data: u64, 217 // The elapsed time in the period. 218 elapsed_time: Duration, 219 // The maximum data allowed within a period, byte. 220 max_speed_allowed_bytes: u64, 221 // The start time of each io read or write. 222 start: Option<Instant>, 223 // The time delay required to trigger the maximum speed limit. 224 delay: Option<Pin<Box<Sleep>>>, 225 // min_speed_interval Pending Timeout time. 226 timeout: Option<Pin<Box<Sleep>>>, 227 } 228 229 impl SpeedLimit { 230 /// Creates a new `SpeedLimit`. 231 /// `rate` is the download size allowed within a period, expressed in 232 /// bytes/second. new(rate: u64, period: Duration, interval: Duration) -> SpeedLimit233 pub(crate) fn new(rate: u64, period: Duration, interval: Duration) -> SpeedLimit { 234 SpeedLimit { 235 rate, 236 period, 237 min_speed_interval: interval, 238 min_speed_start: None, 239 period_data: 0, 240 elapsed_time: Duration::default(), 241 max_speed_allowed_bytes: rate * period.as_secs(), 242 start: None, 243 delay: None, 244 timeout: Some(Box::pin(sleep(interval))), 245 } 246 } 247 init_if_not_start(&mut self)248 pub(crate) fn init_if_not_start(&mut self) { 249 self.start.get_or_insert(Instant::now()); 250 } 251 poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool252 pub(crate) fn poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool { 253 self.timeout 254 .as_mut() 255 .is_some_and(|timeout| Pin::new(timeout).poll(cx).is_ready()) 256 } 257 poll_max_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()>258 pub(crate) fn poll_max_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()> { 259 if let Some(delay) = self.delay.as_mut() { 260 return match Pin::new(delay).poll(cx) { 261 Poll::Ready(()) => { 262 self.delay = None; 263 self.next_period(); 264 Poll::Ready(()) 265 } 266 Poll::Pending => Poll::Pending, 267 }; 268 } 269 Poll::Ready(()) 270 } 271 delay_max_speed_limit(&mut self, data_size: usize)272 pub(crate) fn delay_max_speed_limit(&mut self, data_size: usize) { 273 if let Some(start_time) = self.start.take() { 274 self.elapsed_time += start_time.elapsed(); 275 self.period_data += data_size as u64; 276 if self.elapsed_time < self.period { 277 if self.period_data >= self.max_speed_allowed_bytes { 278 // The minimum milliseconds to download this data within the speed limit. 279 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate); 280 // We will not poll here immediately because the data has not yet been returned 281 // to user. 282 self.delay = Some(Box::pin(sleep(limited_time - self.elapsed_time))); 283 } 284 } else { 285 // The minimum milliseconds to download this data within the speed limit. 286 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate); 287 if self.elapsed_time < limited_time { 288 // We will not poll here immediately because the data has not yet been returned 289 // to user. 290 self.delay = Some(Box::pin(sleep(limited_time - self.elapsed_time))); 291 } else { 292 // We don't count the part that goes beyond the period, and we go straight to 293 // the next period. 294 self.next_period() 295 } 296 } 297 } 298 } 299 limit_max_speed(&mut self, data_size: usize)300 pub(crate) async fn limit_max_speed(&mut self, data_size: usize) { 301 if let Some(start_time) = self.start.take() { 302 // let elapsed_total = start_time.elapsed(); 303 self.elapsed_time += start_time.elapsed(); 304 self.period_data += data_size as u64; 305 if self.elapsed_time < self.period { 306 if self.period_data >= self.max_speed_allowed_bytes { 307 // The minimum milliseconds to download this data within the speed limit. 308 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate); 309 sleep(limited_time - self.elapsed_time).await; 310 self.next_period(); 311 } 312 } else { 313 // The minimum milliseconds to download this data within the speed limit. 314 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate); 315 if self.elapsed_time < limited_time { 316 sleep(limited_time - self.elapsed_time).await; 317 } 318 // We don't count the part that goes beyond the period, and we go straight to 319 // the next period. 320 self.next_period() 321 } 322 } 323 } 324 limit_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError>325 pub(crate) fn limit_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError> { 326 if let Some(start_time) = self.start.take() { 327 self.min_speed_start.get_or_insert(start_time); 328 self.elapsed_time += start_time.elapsed(); 329 if self.elapsed_time >= self.period { 330 self.check_min_speed(data_size)?; 331 } else { 332 self.period_data += data_size as u64; 333 } 334 } 335 Ok(()) 336 } 337 reset_pending_timeout(&mut self)338 pub(crate) fn reset_pending_timeout(&mut self) { 339 self.timeout = Some(Box::pin(sleep(self.min_speed_interval))); 340 } 341 check_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError>342 fn check_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError> { 343 self.period_data += data_size as u64; 344 // The time it takes to process period_data at the minimum speed limit. 345 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate); 346 if self.elapsed_time > limited_time { 347 // self.min_speed_start must be Some because it was assigned before this 348 // function was called. 349 if let Some(ref check_start) = self.min_speed_start { 350 let check_elapsed = check_start.elapsed(); 351 // If the time at min_speed_limit exceeds min_speed_interval, an error is 352 // raised. 353 if check_elapsed > self.min_speed_interval { 354 return err_from_msg!(BodyTransfer, "Below low speed limit"); 355 } 356 } 357 } else { 358 // If the speed exceeds min_speed_limit, min_speed_interval is reset 359 // immediately. 360 self.next_interval(); 361 } 362 self.next_period(); 363 Ok(()) 364 } 365 next_period(&mut self)366 fn next_period(&mut self) { 367 self.period_data = 0; 368 self.start = None; 369 self.elapsed_time = Duration::default(); 370 } 371 next_interval(&mut self)372 fn next_interval(&mut self) { 373 self.min_speed_start = None 374 } 375 } 376 377 impl Clone for SpeedLimit { clone(&self) -> Self378 fn clone(&self) -> Self { 379 Self { 380 rate: self.rate, 381 period: self.period, 382 min_speed_interval: self.min_speed_interval, 383 min_speed_start: None, 384 period_data: self.period_data, 385 elapsed_time: self.elapsed_time, 386 max_speed_allowed_bytes: self.max_speed_allowed_bytes, 387 start: None, 388 delay: None, 389 timeout: None, 390 } 391 } 392 } 393 394 #[derive(Default, Copy, Clone)] 395 pub(crate) struct SpeedConfig { 396 max_recv: Option<u64>, 397 min_recv: Option<u64>, 398 max_send: Option<u64>, 399 min_send: Option<u64>, 400 min_speed_interval: Option<u64>, 401 } 402 403 impl SpeedConfig { none() -> SpeedConfig404 pub(crate) fn none() -> SpeedConfig { 405 Self::default() 406 } 407 set_max_rate(&mut self, rate: u64)408 pub(crate) fn set_max_rate(&mut self, rate: u64) { 409 self.max_recv = Some(rate); 410 self.max_send = Some(rate) 411 } 412 set_min_rate(&mut self, rate: u64)413 pub(crate) fn set_min_rate(&mut self, rate: u64) { 414 self.min_send = Some(rate); 415 self.min_recv = Some(rate) 416 } 417 set_min_speed_interval(&mut self, seconds: u64)418 pub(crate) fn set_min_speed_interval(&mut self, seconds: u64) { 419 self.min_speed_interval = Some(seconds) 420 } 421 max_recv_speed(&self) -> Option<u64>422 pub(crate) fn max_recv_speed(&self) -> Option<u64> { 423 self.max_recv 424 } 425 max_send_speed(&self) -> Option<u64>426 pub(crate) fn max_send_speed(&self) -> Option<u64> { 427 self.max_send 428 } 429 min_recv_speed(&self) -> Option<u64>430 pub(crate) fn min_recv_speed(&self) -> Option<u64> { 431 self.min_recv 432 } 433 min_send_speed(&self) -> Option<u64>434 pub(crate) fn min_send_speed(&self) -> Option<u64> { 435 self.min_send 436 } 437 min_speed_interval(&self) -> Option<u64>438 pub(crate) fn min_speed_interval(&self) -> Option<u64> { 439 self.min_speed_interval 440 } 441 } 442