1 // Copyright (C) 2020, Cloudflare, Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright notice, 9 // this list of conditions and the following disclaimer. 10 // 11 // * Redistributions in binary form must reproduce the above copyright 12 // notice, this list of conditions and the following disclaimer in the 13 // documentation and/or other materials provided with the distribution. 14 // 15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27 //! Delivery rate estimation. 28 //! 29 //! This implements the algorithm for estimating delivery rate as described in 30 //! https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00 31 32 use std::cmp; 33 34 use std::time::Duration; 35 use std::time::Instant; 36 37 use crate::recovery::Sent; 38 39 #[derive(Default)] 40 pub struct Rate { 41 delivered: usize, 42 43 delivered_time: Option<Instant>, 44 45 recent_delivered_packet_sent_time: Option<Instant>, 46 47 app_limited_at_pkt: usize, 48 49 rate_sample: RateSample, 50 } 51 52 impl Rate { on_packet_sent(&mut self, pkt: &mut Sent, now: Instant)53 pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) { 54 if self.delivered_time.is_none() { 55 self.delivered_time = Some(now); 56 } 57 58 if self.recent_delivered_packet_sent_time.is_none() { 59 self.recent_delivered_packet_sent_time = Some(now); 60 } 61 62 pkt.delivered = self.delivered; 63 pkt.delivered_time = self.delivered_time.unwrap(); 64 65 pkt.recent_delivered_packet_sent_time = 66 self.recent_delivered_packet_sent_time.unwrap(); 67 68 pkt.is_app_limited = self.app_limited_at_pkt > 0; 69 } 70 on_packet_acked(&mut self, pkt: &Sent, now: Instant)71 pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) { 72 self.rate_sample.prior_time = Some(pkt.delivered_time); 73 74 self.delivered += pkt.size; 75 self.delivered_time = Some(now); 76 77 if pkt.delivered > self.rate_sample.prior_delivered { 78 self.rate_sample.prior_delivered = pkt.delivered; 79 self.rate_sample.is_app_limited = pkt.is_app_limited; 80 81 self.rate_sample.send_elapsed = 82 pkt.time_sent - pkt.recent_delivered_packet_sent_time; 83 84 self.rate_sample.ack_elapsed = self 85 .delivered_time 86 .unwrap() 87 .duration_since(pkt.delivered_time); 88 89 self.recent_delivered_packet_sent_time = Some(pkt.time_sent); 90 } 91 } 92 estimate(&mut self)93 pub fn estimate(&mut self) { 94 if (self.app_limited_at_pkt > 0) && 95 (self.delivered > self.app_limited_at_pkt) 96 { 97 self.app_limited_at_pkt = 0; 98 } 99 100 match self.rate_sample.prior_time { 101 Some(_) => { 102 self.rate_sample.delivered = 103 self.delivered - self.rate_sample.prior_delivered; 104 105 self.rate_sample.interval = cmp::max( 106 self.rate_sample.send_elapsed, 107 self.rate_sample.ack_elapsed, 108 ); 109 }, 110 None => return, 111 } 112 113 if self.rate_sample.interval.as_secs_f64() > 0.0 { 114 self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 / 115 self.rate_sample.interval.as_secs_f64()) 116 as u64; 117 } 118 } 119 check_app_limited(&mut self, bytes_in_flight: usize)120 pub fn check_app_limited(&mut self, bytes_in_flight: usize) { 121 let limited = self.delivered + bytes_in_flight; 122 self.app_limited_at_pkt = if limited > 0 { limited } else { 1 }; 123 } 124 delivery_rate(&self) -> u64125 pub fn delivery_rate(&self) -> u64 { 126 self.rate_sample.delivery_rate 127 } 128 } 129 130 impl std::fmt::Debug for Rate { fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result131 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 132 write!(f, "delivered={:?} ", self.delivered)?; 133 134 if let Some(t) = self.delivered_time { 135 write!(f, "delivered_time={:?} ", t.elapsed())?; 136 } 137 138 if let Some(t) = self.recent_delivered_packet_sent_time { 139 write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?; 140 } 141 142 write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?; 143 144 Ok(()) 145 } 146 } 147 148 #[derive(Default)] 149 struct RateSample { 150 delivery_rate: u64, 151 152 is_app_limited: bool, 153 154 interval: Duration, 155 156 delivered: usize, 157 158 prior_delivered: usize, 159 160 prior_time: Option<Instant>, 161 162 send_elapsed: Duration, 163 164 ack_elapsed: Duration, 165 } 166 167 impl std::fmt::Debug for RateSample { fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result168 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 169 write!(f, "delivery_rate={:?} ", self.delivery_rate)?; 170 write!(f, "interval={:?} ", self.interval)?; 171 write!(f, "delivered={:?} ", self.delivered)?; 172 write!(f, "prior_delivered={:?} ", self.prior_delivered)?; 173 write!(f, "send_elapsed={:?} ", self.send_elapsed)?; 174 if let Some(t) = self.prior_time { 175 write!(f, "prior_time={:?} ", t.elapsed())?; 176 } 177 write!(f, "ack_elapsed={:?}", self.ack_elapsed)?; 178 179 Ok(()) 180 } 181 } 182 183 #[cfg(test)] 184 mod tests { 185 use super::*; 186 187 use crate::recovery::*; 188 189 #[test] rate_check()190 fn rate_check() { 191 let config = Config::new(0xbabababa).unwrap(); 192 let mut recovery = Recovery::new(&config); 193 194 let mut pkt_1 = Sent { 195 pkt_num: 0, 196 frames: vec![], 197 time_sent: Instant::now(), 198 time_acked: None, 199 time_lost: None, 200 size: 1200, 201 ack_eliciting: true, 202 in_flight: true, 203 delivered: 0, 204 delivered_time: Instant::now(), 205 recent_delivered_packet_sent_time: Instant::now(), 206 is_app_limited: false, 207 has_data: false, 208 }; 209 210 recovery 211 .delivery_rate 212 .on_packet_sent(&mut pkt_1, Instant::now()); 213 std::thread::sleep(Duration::from_millis(50)); 214 recovery 215 .delivery_rate 216 .on_packet_acked(&pkt_1, Instant::now()); 217 218 let mut pkt_2 = Sent { 219 pkt_num: 1, 220 frames: vec![], 221 time_sent: Instant::now(), 222 time_acked: None, 223 time_lost: None, 224 size: 1200, 225 ack_eliciting: true, 226 in_flight: true, 227 delivered: 0, 228 delivered_time: Instant::now(), 229 recent_delivered_packet_sent_time: Instant::now(), 230 is_app_limited: false, 231 has_data: false, 232 }; 233 234 recovery 235 .delivery_rate 236 .on_packet_sent(&mut pkt_2, Instant::now()); 237 std::thread::sleep(Duration::from_millis(50)); 238 recovery 239 .delivery_rate 240 .on_packet_acked(&pkt_2, Instant::now()); 241 recovery.delivery_rate.estimate(); 242 243 assert!(recovery.delivery_rate() > 0); 244 } 245 246 #[test] app_limited_check()247 fn app_limited_check() { 248 let config = Config::new(0xbabababa).unwrap(); 249 let mut recvry = Recovery::new(&config); 250 251 let mut pkt_1 = Sent { 252 pkt_num: 0, 253 frames: vec![], 254 time_sent: Instant::now(), 255 time_acked: None, 256 time_lost: None, 257 size: 1200, 258 ack_eliciting: true, 259 in_flight: true, 260 delivered: 0, 261 delivered_time: Instant::now(), 262 recent_delivered_packet_sent_time: Instant::now(), 263 is_app_limited: false, 264 has_data: false, 265 }; 266 267 recvry 268 .delivery_rate 269 .on_packet_sent(&mut pkt_1, Instant::now()); 270 std::thread::sleep(Duration::from_millis(50)); 271 recvry.delivery_rate.on_packet_acked(&pkt_1, Instant::now()); 272 273 let mut pkt_2 = Sent { 274 pkt_num: 1, 275 frames: vec![], 276 time_sent: Instant::now(), 277 time_acked: None, 278 time_lost: None, 279 size: 1200, 280 ack_eliciting: true, 281 in_flight: true, 282 delivered: 0, 283 delivered_time: Instant::now(), 284 recent_delivered_packet_sent_time: Instant::now(), 285 is_app_limited: false, 286 has_data: false, 287 }; 288 289 recvry.app_limited = true; 290 recvry 291 .delivery_rate 292 .check_app_limited(recvry.bytes_in_flight); 293 recvry 294 .delivery_rate 295 .on_packet_sent(&mut pkt_2, Instant::now()); 296 std::thread::sleep(Duration::from_millis(50)); 297 recvry.delivery_rate.on_packet_acked(&pkt_2, Instant::now()); 298 recvry.delivery_rate.estimate(); 299 300 assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0); 301 } 302 } 303