• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2024 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::future::Future;
15 use std::pin::Pin;
16 use std::task::{Context, Poll};
17 use std::time::{Duration, Instant};
18 
19 use crate::runtime::{sleep, Sleep};
20 use crate::HttpClientError;
21 
22 pub(crate) const SPEED_CHECK_PERIOD: Duration = Duration::from_millis(1000);
23 
24 #[derive(Default, Clone)]
25 pub(crate) struct SpeedController {
26     pub(crate) send_rate_limit: RateLimit,
27     pub(crate) recv_rate_limit: RateLimit,
28 }
29 
30 impl SpeedController {
none() -> Self31     pub(crate) fn none() -> Self {
32         SpeedController::default()
33     }
34 
set_speed_limit(&mut self, config: SpeedConfig)35     pub(crate) fn set_speed_limit(&mut self, config: SpeedConfig) {
36         if let Some(speed) = config.max_recv_speed() {
37             self.recv_rate_limit
38                 .set_max_speed(speed, SPEED_CHECK_PERIOD);
39         }
40 
41         if let Some(speed) = config.min_recv_speed() {
42             if let Some(interval) = config.min_speed_interval() {
43                 self.recv_rate_limit.set_min_speed(
44                     speed,
45                     SPEED_CHECK_PERIOD,
46                     Duration::from_secs(interval),
47                 );
48             }
49         }
50 
51         if let Some(speed) = config.max_send_speed() {
52             self.send_rate_limit
53                 .set_max_speed(speed, SPEED_CHECK_PERIOD);
54         }
55 
56         if let Some(speed) = config.min_send_speed() {
57             if let Some(interval) = config.min_speed_interval() {
58                 self.send_rate_limit.set_min_speed(
59                     speed,
60                     SPEED_CHECK_PERIOD,
61                     Duration::from_secs(interval),
62                 );
63             }
64         }
65     }
66 
need_limit_max_send_speed(&self) -> bool67     pub(crate) fn need_limit_max_send_speed(&self) -> bool {
68         self.send_rate_limit.need_limit_max_speed()
69     }
70 
max_send_speed_limit(&mut self, size: usize)71     pub(crate) async fn max_send_speed_limit(&mut self, size: usize) {
72         self.send_rate_limit.max_speed_limit(size).await
73     }
74 
delay_max_recv_speed_limit(&mut self, size: usize)75     pub(crate) fn delay_max_recv_speed_limit(&mut self, size: usize) {
76         self.recv_rate_limit.delay_max_speed_limit(size)
77     }
78 
79     #[cfg(any(feature = "http2", feature = "http3"))]
delay_max_send_speed_limit(&mut self, size: usize)80     pub(crate) fn delay_max_send_speed_limit(&mut self, size: usize) {
81         self.send_rate_limit.delay_max_speed_limit(size)
82     }
83 
min_send_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError>84     pub(crate) fn min_send_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError> {
85         self.send_rate_limit.min_speed_limit(size)
86     }
87 
reset_send_pending_timeout(&mut self)88     pub(crate) fn reset_send_pending_timeout(&mut self) {
89         self.send_rate_limit.reset_pending_timeout()
90     }
91 
min_recv_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError>92     pub(crate) fn min_recv_speed_limit(&mut self, size: usize) -> Result<(), HttpClientError> {
93         self.recv_rate_limit.min_speed_limit(size)
94     }
95 
reset_recv_pending_timeout(&mut self)96     pub(crate) fn reset_recv_pending_timeout(&mut self) {
97         self.recv_rate_limit.reset_pending_timeout()
98     }
99 
poll_max_recv_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()>100     pub(crate) fn poll_max_recv_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()> {
101         self.recv_rate_limit.poll_limited_delay(cx)
102     }
103 
poll_recv_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool104     pub(crate) fn poll_recv_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool {
105         self.recv_rate_limit.poll_pending_timeout(cx)
106     }
107 
poll_send_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool108     pub(crate) fn poll_send_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool {
109         self.send_rate_limit.poll_pending_timeout(cx)
110     }
111 
112     #[cfg(any(feature = "http2", feature = "http3"))]
poll_max_send_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()>113     pub(crate) fn poll_max_send_delay_time(&mut self, cx: &mut Context<'_>) -> Poll<()> {
114         self.send_rate_limit.poll_limited_delay(cx)
115     }
116 
init_max_send_if_not_start(&mut self)117     pub(crate) fn init_max_send_if_not_start(&mut self) {
118         self.send_rate_limit.init_max_limit_if_not_start();
119     }
120 
init_min_send_if_not_start(&mut self)121     pub(crate) fn init_min_send_if_not_start(&mut self) {
122         self.send_rate_limit.init_min_limit_if_not_start();
123     }
124 
init_max_recv_if_not_start(&mut self)125     pub(crate) fn init_max_recv_if_not_start(&mut self) {
126         self.recv_rate_limit.init_max_limit_if_not_start();
127     }
128 
init_min_recv_if_not_start(&mut self)129     pub(crate) fn init_min_recv_if_not_start(&mut self) {
130         self.recv_rate_limit.init_min_limit_if_not_start();
131     }
132 }
133 
134 #[derive(Default, Clone)]
135 pub(crate) struct RateLimit {
136     min_speed: Option<SpeedLimit>,
137     max_speed: Option<SpeedLimit>,
138 }
139 
140 impl RateLimit {
set_min_speed(&mut self, rate: u64, period: Duration, interval: Duration)141     pub(crate) fn set_min_speed(&mut self, rate: u64, period: Duration, interval: Duration) {
142         let limit = SpeedLimit::new(rate, period, interval);
143         self.min_speed = Some(limit)
144     }
145 
set_max_speed(&mut self, rate: u64, period: Duration)146     pub(crate) fn set_max_speed(&mut self, rate: u64, period: Duration) {
147         let limit = SpeedLimit::new(rate, period, Duration::default());
148         self.max_speed = Some(limit)
149     }
150 
need_limit_max_speed(&self) -> bool151     pub(crate) fn need_limit_max_speed(&self) -> bool {
152         self.max_speed.is_some()
153     }
154 
init_max_limit_if_not_start(&mut self)155     pub(crate) fn init_max_limit_if_not_start(&mut self) {
156         if let Some(ref mut speed) = self.max_speed {
157             speed.init_if_not_start()
158         }
159     }
160 
init_min_limit_if_not_start(&mut self)161     pub(crate) fn init_min_limit_if_not_start(&mut self) {
162         if let Some(ref mut speed) = self.min_speed {
163             speed.init_if_not_start()
164         }
165     }
166 
max_speed_limit(&mut self, read: usize)167     pub(crate) async fn max_speed_limit(&mut self, read: usize) {
168         if let Some(ref mut speed) = self.max_speed {
169             speed.limit_max_speed(read).await
170         }
171     }
172 
delay_max_speed_limit(&mut self, read: usize)173     pub(crate) fn delay_max_speed_limit(&mut self, read: usize) {
174         if let Some(ref mut speed) = self.max_speed {
175             speed.delay_max_speed_limit(read)
176         }
177     }
178 
min_speed_limit(&mut self, read: usize) -> Result<(), HttpClientError>179     pub(crate) fn min_speed_limit(&mut self, read: usize) -> Result<(), HttpClientError> {
180         if let Some(ref mut speed) = self.min_speed {
181             speed.limit_min_speed(read)
182         } else {
183             Ok(())
184         }
185     }
186 
reset_pending_timeout(&mut self)187     pub(crate) fn reset_pending_timeout(&mut self) {
188         if let Some(ref mut speed) = self.min_speed {
189             speed.reset_pending_timeout()
190         }
191     }
192 
poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool193     pub(crate) fn poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool {
194         self.min_speed
195             .as_mut()
196             .is_some_and(|speed| speed.poll_pending_timeout(cx))
197     }
198 
poll_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()>199     pub(crate) fn poll_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()> {
200         if let Some(ref mut speed) = self.max_speed {
201             return speed.poll_max_limited_delay(cx);
202         }
203         Poll::Ready(())
204     }
205 }
206 
207 #[derive(Default)]
208 pub(crate) struct SpeedLimit {
209     rate: u64,
210     // Speed limiting period, millisecond.
211     period: Duration,
212     min_speed_interval: Duration,
213     // min_speed_interval start time.
214     min_speed_start: Option<Instant>,
215     // Data received within a period, byte.
216     period_data: u64,
217     // The elapsed time in the period.
218     elapsed_time: Duration,
219     // The maximum data allowed within a period, byte.
220     max_speed_allowed_bytes: u64,
221     // The start time of each io read or write.
222     start: Option<Instant>,
223     // The time delay required to trigger the maximum speed limit.
224     delay: Option<Pin<Box<Sleep>>>,
225     // min_speed_interval Pending Timeout time.
226     timeout: Option<Pin<Box<Sleep>>>,
227 }
228 
229 impl SpeedLimit {
230     /// Creates a new `SpeedLimit`.
231     /// `rate` is the download size allowed within a period, expressed in
232     /// bytes/second.
new(rate: u64, period: Duration, interval: Duration) -> SpeedLimit233     pub(crate) fn new(rate: u64, period: Duration, interval: Duration) -> SpeedLimit {
234         SpeedLimit {
235             rate,
236             period,
237             min_speed_interval: interval,
238             min_speed_start: None,
239             period_data: 0,
240             elapsed_time: Duration::default(),
241             max_speed_allowed_bytes: rate * period.as_secs(),
242             start: None,
243             delay: None,
244             timeout: Some(Box::pin(sleep(interval))),
245         }
246     }
247 
init_if_not_start(&mut self)248     pub(crate) fn init_if_not_start(&mut self) {
249         self.start.get_or_insert(Instant::now());
250     }
251 
poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool252     pub(crate) fn poll_pending_timeout(&mut self, cx: &mut Context<'_>) -> bool {
253         self.timeout
254             .as_mut()
255             .is_some_and(|timeout| Pin::new(timeout).poll(cx).is_ready())
256     }
257 
poll_max_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()>258     pub(crate) fn poll_max_limited_delay(&mut self, cx: &mut Context<'_>) -> Poll<()> {
259         if let Some(delay) = self.delay.as_mut() {
260             return match Pin::new(delay).poll(cx) {
261                 Poll::Ready(()) => {
262                     self.delay = None;
263                     self.next_period();
264                     Poll::Ready(())
265                 }
266                 Poll::Pending => Poll::Pending,
267             };
268         }
269         Poll::Ready(())
270     }
271 
delay_max_speed_limit(&mut self, data_size: usize)272     pub(crate) fn delay_max_speed_limit(&mut self, data_size: usize) {
273         if let Some(start_time) = self.start.take() {
274             self.elapsed_time += start_time.elapsed();
275             self.period_data += data_size as u64;
276             if self.elapsed_time < self.period {
277                 if self.period_data >= self.max_speed_allowed_bytes {
278                     // The minimum milliseconds to download this data within the speed limit.
279                     let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate);
280                     // We will not poll here immediately because the data has not yet been returned
281                     // to user.
282                     self.delay = Some(Box::pin(sleep(limited_time - self.elapsed_time)));
283                 }
284             } else {
285                 // The minimum milliseconds to download this data within the speed limit.
286                 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate);
287                 if self.elapsed_time < limited_time {
288                     // We will not poll here immediately because the data has not yet been returned
289                     // to user.
290                     self.delay = Some(Box::pin(sleep(limited_time - self.elapsed_time)));
291                 } else {
292                     // We don't count the part that goes beyond the period, and we go straight to
293                     // the next period.
294                     self.next_period()
295                 }
296             }
297         }
298     }
299 
limit_max_speed(&mut self, data_size: usize)300     pub(crate) async fn limit_max_speed(&mut self, data_size: usize) {
301         if let Some(start_time) = self.start.take() {
302             // let elapsed_total = start_time.elapsed();
303             self.elapsed_time += start_time.elapsed();
304             self.period_data += data_size as u64;
305             if self.elapsed_time < self.period {
306                 if self.period_data >= self.max_speed_allowed_bytes {
307                     // The minimum milliseconds to download this data within the speed limit.
308                     let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate);
309                     sleep(limited_time - self.elapsed_time).await;
310                     self.next_period();
311                 }
312             } else {
313                 // The minimum milliseconds to download this data within the speed limit.
314                 let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate);
315                 if self.elapsed_time < limited_time {
316                     sleep(limited_time - self.elapsed_time).await;
317                 }
318                 // We don't count the part that goes beyond the period, and we go straight to
319                 // the next period.
320                 self.next_period()
321             }
322         }
323     }
324 
limit_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError>325     pub(crate) fn limit_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError> {
326         if let Some(start_time) = self.start.take() {
327             self.min_speed_start.get_or_insert(start_time);
328             self.elapsed_time += start_time.elapsed();
329             if self.elapsed_time >= self.period {
330                 self.check_min_speed(data_size)?;
331             } else {
332                 self.period_data += data_size as u64;
333             }
334         }
335         Ok(())
336     }
337 
reset_pending_timeout(&mut self)338     pub(crate) fn reset_pending_timeout(&mut self) {
339         self.timeout = Some(Box::pin(sleep(self.min_speed_interval)));
340     }
341 
check_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError>342     fn check_min_speed(&mut self, data_size: usize) -> Result<(), HttpClientError> {
343         self.period_data += data_size as u64;
344         // The time it takes to process period_data at the minimum speed limit.
345         let limited_time = Duration::from_millis(self.period_data * 1000 / self.rate);
346         if self.elapsed_time > limited_time {
347             // self.min_speed_start must be Some because it was assigned before this
348             // function was called.
349             if let Some(ref check_start) = self.min_speed_start {
350                 let check_elapsed = check_start.elapsed();
351                 // If the time at min_speed_limit exceeds min_speed_interval, an error is
352                 // raised.
353                 if check_elapsed > self.min_speed_interval {
354                     return err_from_msg!(BodyTransfer, "Below low speed limit");
355                 }
356             }
357         } else {
358             // If the speed exceeds min_speed_limit, min_speed_interval is reset
359             // immediately.
360             self.next_interval();
361         }
362         self.next_period();
363         Ok(())
364     }
365 
next_period(&mut self)366     fn next_period(&mut self) {
367         self.period_data = 0;
368         self.start = None;
369         self.elapsed_time = Duration::default();
370     }
371 
next_interval(&mut self)372     fn next_interval(&mut self) {
373         self.min_speed_start = None
374     }
375 }
376 
377 impl Clone for SpeedLimit {
clone(&self) -> Self378     fn clone(&self) -> Self {
379         Self {
380             rate: self.rate,
381             period: self.period,
382             min_speed_interval: self.min_speed_interval,
383             min_speed_start: None,
384             period_data: self.period_data,
385             elapsed_time: self.elapsed_time,
386             max_speed_allowed_bytes: self.max_speed_allowed_bytes,
387             start: None,
388             delay: None,
389             timeout: None,
390         }
391     }
392 }
393 
394 #[derive(Default, Copy, Clone)]
395 pub(crate) struct SpeedConfig {
396     max_recv: Option<u64>,
397     min_recv: Option<u64>,
398     max_send: Option<u64>,
399     min_send: Option<u64>,
400     min_speed_interval: Option<u64>,
401 }
402 
403 impl SpeedConfig {
none() -> SpeedConfig404     pub(crate) fn none() -> SpeedConfig {
405         Self::default()
406     }
407 
set_max_rate(&mut self, rate: u64)408     pub(crate) fn set_max_rate(&mut self, rate: u64) {
409         self.max_recv = Some(rate);
410         self.max_send = Some(rate)
411     }
412 
set_min_rate(&mut self, rate: u64)413     pub(crate) fn set_min_rate(&mut self, rate: u64) {
414         self.min_send = Some(rate);
415         self.min_recv = Some(rate)
416     }
417 
set_min_speed_interval(&mut self, seconds: u64)418     pub(crate) fn set_min_speed_interval(&mut self, seconds: u64) {
419         self.min_speed_interval = Some(seconds)
420     }
421 
max_recv_speed(&self) -> Option<u64>422     pub(crate) fn max_recv_speed(&self) -> Option<u64> {
423         self.max_recv
424     }
425 
max_send_speed(&self) -> Option<u64>426     pub(crate) fn max_send_speed(&self) -> Option<u64> {
427         self.max_send
428     }
429 
min_recv_speed(&self) -> Option<u64>430     pub(crate) fn min_recv_speed(&self) -> Option<u64> {
431         self.min_recv
432     }
433 
min_send_speed(&self) -> Option<u64>434     pub(crate) fn min_send_speed(&self) -> Option<u64> {
435         self.min_send
436     }
437 
min_speed_interval(&self) -> Option<u64>438     pub(crate) fn min_speed_interval(&self) -> Option<u64> {
439         self.min_speed_interval
440     }
441 }
442