// Copyright (C) 2020, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! Delivery rate estimation.
//!
//! This implements the algorithm for estimating delivery rate as described in
//! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00>

use std::cmp;

use std::time::Duration;
use std::time::Instant;

use crate::recovery::Sent;

39 #[derive(Default)]
40 pub struct Rate {
41     delivered: usize,
42 
43     delivered_time: Option<Instant>,
44 
45     recent_delivered_packet_sent_time: Option<Instant>,
46 
47     app_limited_at_pkt: usize,
48 
49     rate_sample: RateSample,
50 }
51 
52 impl Rate {
on_packet_sent(&mut self, pkt: &mut Sent, now: Instant)53     pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) {
54         if self.delivered_time.is_none() {
55             self.delivered_time = Some(now);
56         }
57 
58         if self.recent_delivered_packet_sent_time.is_none() {
59             self.recent_delivered_packet_sent_time = Some(now);
60         }
61 
62         pkt.delivered = self.delivered;
63         pkt.delivered_time = self.delivered_time.unwrap();
64 
65         pkt.recent_delivered_packet_sent_time =
66             self.recent_delivered_packet_sent_time.unwrap();
67 
68         pkt.is_app_limited = self.app_limited_at_pkt > 0;
69     }
70 
on_packet_acked(&mut self, pkt: &Sent, now: Instant)71     pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) {
72         self.rate_sample.prior_time = Some(pkt.delivered_time);
73 
74         self.delivered += pkt.size;
75         self.delivered_time = Some(now);
76 
77         if pkt.delivered > self.rate_sample.prior_delivered {
78             self.rate_sample.prior_delivered = pkt.delivered;
79 
80             self.rate_sample.send_elapsed =
81                 pkt.time_sent - pkt.recent_delivered_packet_sent_time;
82 
83             self.rate_sample.ack_elapsed = self
84                 .delivered_time
85                 .unwrap()
86                 .duration_since(pkt.delivered_time);
87 
88             self.recent_delivered_packet_sent_time = Some(pkt.time_sent);
89         }
90     }
91 
estimate(&mut self)92     pub fn estimate(&mut self) {
93         if (self.app_limited_at_pkt > 0) &&
94             (self.delivered > self.app_limited_at_pkt)
95         {
96             self.app_limited_at_pkt = 0;
97         }
98 
99         match self.rate_sample.prior_time {
100             Some(_) => {
101                 self.rate_sample.delivered =
102                     self.delivered - self.rate_sample.prior_delivered;
103 
104                 self.rate_sample.interval = cmp::max(
105                     self.rate_sample.send_elapsed,
106                     self.rate_sample.ack_elapsed,
107                 );
108             },
109             None => return,
110         }
111 
112         if self.rate_sample.interval.as_secs_f64() > 0.0 {
113             self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 /
114                 self.rate_sample.interval.as_secs_f64())
115                 as u64;
116         }
117     }
118 
check_app_limited(&mut self, bytes_in_flight: usize)119     pub fn check_app_limited(&mut self, bytes_in_flight: usize) {
120         let limited = self.delivered + bytes_in_flight;
121         self.app_limited_at_pkt = if limited > 0 { limited } else { 1 };
122     }
123 
delivery_rate(&self) -> u64124     pub fn delivery_rate(&self) -> u64 {
125         self.rate_sample.delivery_rate
126     }
127 }
128 
129 impl std::fmt::Debug for Rate {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result130     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
131         write!(f, "delivered={:?} ", self.delivered)?;
132 
133         if let Some(t) = self.delivered_time {
134             write!(f, "delivered_time={:?} ", t.elapsed())?;
135         }
136 
137         if let Some(t) = self.recent_delivered_packet_sent_time {
138             write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?;
139         }
140 
141         write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?;
142 
143         Ok(())
144     }
145 }
146 
/// A single delivery rate sample, plus the rate last computed from it.
///
/// Every field starts at its `Default` value: zero counters, zero-length
/// durations and no `prior_time`.
#[derive(Default)]
struct RateSample {
    /// Last computed rate: `delivered` divided by `interval` in seconds.
    delivery_rate: u64,

    /// Length of the sample: the larger of `send_elapsed` and `ack_elapsed`.
    interval: Duration,

    /// Amount delivered over the sample, i.e. the running delivered total
    /// minus `prior_delivered`.
    delivered: usize,

    /// Delivered total that was recorded on the sampled packet when it was
    /// sent.
    prior_delivered: usize,

    /// Delivery time recorded on the most recently acked packet. `None`
    /// until the first ack is processed.
    prior_time: Option<Instant>,

    /// Time between the sampled packet's send time and the send time
    /// recorded on it for the previously sampled packet.
    send_elapsed: Duration,

    /// Time between the delivery time recorded on the sampled packet and the
    /// time its ack was processed.
    ack_elapsed: Duration,
}

164 impl std::fmt::Debug for RateSample {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result165     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
166         write!(f, "delivery_rate={:?} ", self.delivery_rate)?;
167         write!(f, "interval={:?} ", self.interval)?;
168         write!(f, "delivered={:?} ", self.delivered)?;
169         write!(f, "prior_delivered={:?} ", self.prior_delivered)?;
170         write!(f, "send_elapsed={:?} ", self.send_elapsed)?;
171         if let Some(t) = self.prior_time {
172             write!(f, "prior_time={:?} ", t.elapsed())?;
173         }
174         write!(f, "ack_elapsed={:?}", self.ack_elapsed)?;
175 
176         Ok(())
177     }
178 }
179 
#[cfg(test)]
mod tests {
    use super::*;

    use crate::recovery::*;

    /// Builds the packet used by the tests: packet number 0, 1200 in size,
    /// ack-eliciting and in flight, with every timestamp set to `time_sent`.
    fn sent_at(time_sent: Instant) -> Sent {
        Sent {
            pkt_num: 0,
            frames: vec![],
            time_sent,
            time_acked: None,
            time_lost: None,
            size: 1200,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: time_sent,
            recent_delivered_packet_sent_time: time_sent,
            is_app_limited: false,
            has_data: false,
        }
    }

    /// Two packets sent and acked 50ms apart must yield a non-zero rate.
    #[test]
    fn rate_check() {
        let config = Config::new(0xbabababa).unwrap();
        let mut recovery = Recovery::new(&config);

        // The estimator only sees the timestamps it is handed, so the 50ms
        // gaps are simulated instead of slept through.
        let ack_delay = Duration::from_millis(50);
        let first_send = Instant::now();
        let second_send = first_send + ack_delay;

        let mut pkt_1 = sent_at(first_send);

        recovery
            .delivery_rate
            .on_packet_sent(&mut pkt_1, first_send);
        recovery
            .delivery_rate
            .on_packet_acked(&pkt_1, first_send + ack_delay);

        let mut pkt_2 = sent_at(second_send);
        pkt_2.pkt_num = 1;

        recovery
            .delivery_rate
            .on_packet_sent(&mut pkt_2, second_send);
        recovery
            .delivery_rate
            .on_packet_acked(&pkt_2, second_send + ack_delay);
        recovery.delivery_rate.estimate();

        assert!(recovery.delivery_rate() > 0);
    }

    /// The app-limited mark must be cleared once delivery moves past it.
    #[test]
    fn app_limited_check() {
        let config = Config::new(0xbabababa).unwrap();
        let mut recvry = Recovery::new(&config);

        let ack_delay = Duration::from_millis(50);
        let first_send = Instant::now();
        let second_send = first_send + ack_delay;

        let mut pkt_1 = sent_at(first_send);

        recvry.delivery_rate.on_packet_sent(&mut pkt_1, first_send);
        recvry
            .delivery_rate
            .on_packet_acked(&pkt_1, first_send + ack_delay);

        let mut pkt_2 = sent_at(second_send);
        pkt_2.pkt_num = 1;

        recvry.app_limited = true;
        recvry
            .delivery_rate
            .check_app_limited(recvry.bytes_in_flight);
        recvry.delivery_rate.on_packet_sent(&mut pkt_2, second_send);
        recvry
            .delivery_rate
            .on_packet_acked(&pkt_2, second_send + ack_delay);
        recvry.delivery_rate.estimate();

        assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0);
    }
}