1 // Copyright (C) 2020, Cloudflare, Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright notice, 9 // this list of conditions and the following disclaimer. 10 // 11 // * Redistributions in binary form must reproduce the above copyright 12 // notice, this list of conditions and the following disclaimer in the 13 // documentation and/or other materials provided with the distribution. 14 // 15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS 16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, 17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR 19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, 20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF 23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING 24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27 //! Delivery rate estimation. 28 //! 29 //! This implements the algorithm for estimating delivery rate as described in 30 //! <https://tools.ietf.org/html/draft-cheng-iccrg-delivery-rate-estimation-00> 31 32 use std::cmp; 33 34 use std::time::Duration; 35 use std::time::Instant; 36 37 use crate::recovery::Sent; 38 39 #[derive(Default)] 40 pub struct Rate { 41 delivered: usize, 42 43 delivered_time: Option<Instant>, 44 45 recent_delivered_packet_sent_time: Option<Instant>, 46 47 app_limited_at_pkt: usize, 48 49 rate_sample: RateSample, 50 } 51 52 impl Rate { on_packet_sent(&mut self, pkt: &mut Sent, now: Instant)53 pub fn on_packet_sent(&mut self, pkt: &mut Sent, now: Instant) { 54 if self.delivered_time.is_none() { 55 self.delivered_time = Some(now); 56 } 57 58 if self.recent_delivered_packet_sent_time.is_none() { 59 self.recent_delivered_packet_sent_time = Some(now); 60 } 61 62 pkt.delivered = self.delivered; 63 pkt.delivered_time = self.delivered_time.unwrap(); 64 65 pkt.recent_delivered_packet_sent_time = 66 self.recent_delivered_packet_sent_time.unwrap(); 67 68 pkt.is_app_limited = self.app_limited_at_pkt > 0; 69 } 70 on_packet_acked(&mut self, pkt: &Sent, now: Instant)71 pub fn on_packet_acked(&mut self, pkt: &Sent, now: Instant) { 72 self.rate_sample.prior_time = Some(pkt.delivered_time); 73 74 self.delivered += pkt.size; 75 self.delivered_time = Some(now); 76 77 if pkt.delivered > self.rate_sample.prior_delivered { 78 self.rate_sample.prior_delivered = pkt.delivered; 79 80 self.rate_sample.send_elapsed = 81 pkt.time_sent - pkt.recent_delivered_packet_sent_time; 82 83 self.rate_sample.ack_elapsed = self 84 .delivered_time 85 .unwrap() 86 .duration_since(pkt.delivered_time); 87 88 self.recent_delivered_packet_sent_time = Some(pkt.time_sent); 89 } 90 } 91 estimate(&mut self)92 pub fn estimate(&mut self) { 93 if (self.app_limited_at_pkt > 0) && 94 (self.delivered > self.app_limited_at_pkt) 95 { 96 self.app_limited_at_pkt = 0; 97 } 98 99 match self.rate_sample.prior_time { 100 Some(_) => { 101 self.rate_sample.delivered = 102 self.delivered - self.rate_sample.prior_delivered; 103 104 self.rate_sample.interval = cmp::max( 105 self.rate_sample.send_elapsed, 106 self.rate_sample.ack_elapsed, 107 ); 108 }, 109 None => return, 110 } 111 112 if self.rate_sample.interval.as_secs_f64() > 0.0 { 113 self.rate_sample.delivery_rate = (self.rate_sample.delivered as f64 / 114 self.rate_sample.interval.as_secs_f64()) 115 as u64; 116 } 117 } 118 check_app_limited(&mut self, bytes_in_flight: usize)119 pub fn check_app_limited(&mut self, bytes_in_flight: usize) { 120 let limited = self.delivered + bytes_in_flight; 121 self.app_limited_at_pkt = if limited > 0 { limited } else { 1 }; 122 } 123 delivery_rate(&self) -> u64124 pub fn delivery_rate(&self) -> u64 { 125 self.rate_sample.delivery_rate 126 } 127 } 128 129 impl std::fmt::Debug for Rate { fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result130 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 131 write!(f, "delivered={:?} ", self.delivered)?; 132 133 if let Some(t) = self.delivered_time { 134 write!(f, "delivered_time={:?} ", t.elapsed())?; 135 } 136 137 if let Some(t) = self.recent_delivered_packet_sent_time { 138 write!(f, "recent_delivered_packet_sent_time={:?} ", t.elapsed())?; 139 } 140 141 write!(f, "app_limited_at_pkt={:?} ", self.app_limited_at_pkt)?; 142 143 Ok(()) 144 } 145 } 146 147 #[derive(Default)] 148 struct RateSample { 149 delivery_rate: u64, 150 151 interval: Duration, 152 153 delivered: usize, 154 155 prior_delivered: usize, 156 157 prior_time: Option<Instant>, 158 159 send_elapsed: Duration, 160 161 ack_elapsed: Duration, 162 } 163 164 impl std::fmt::Debug for RateSample { fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result165 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 166 write!(f, "delivery_rate={:?} ", self.delivery_rate)?; 167 write!(f, "interval={:?} ", self.interval)?; 168 write!(f, "delivered={:?} ", self.delivered)?; 169 write!(f, "prior_delivered={:?} ", self.prior_delivered)?; 170 write!(f, "send_elapsed={:?} ", self.send_elapsed)?; 171 if let Some(t) = self.prior_time { 172 write!(f, "prior_time={:?} ", t.elapsed())?; 173 } 174 write!(f, "ack_elapsed={:?}", self.ack_elapsed)?; 175 176 Ok(()) 177 } 178 } 179 180 #[cfg(test)] 181 mod tests { 182 use super::*; 183 184 use crate::recovery::*; 185 186 #[test] rate_check()187 fn rate_check() { 188 let config = Config::new(0xbabababa).unwrap(); 189 let mut recovery = Recovery::new(&config); 190 191 let mut pkt_1 = Sent { 192 pkt_num: 0, 193 frames: vec![], 194 time_sent: Instant::now(), 195 time_acked: None, 196 time_lost: None, 197 size: 1200, 198 ack_eliciting: true, 199 in_flight: true, 200 delivered: 0, 201 delivered_time: Instant::now(), 202 recent_delivered_packet_sent_time: Instant::now(), 203 is_app_limited: false, 204 has_data: false, 205 }; 206 207 recovery 208 .delivery_rate 209 .on_packet_sent(&mut pkt_1, Instant::now()); 210 std::thread::sleep(Duration::from_millis(50)); 211 recovery 212 .delivery_rate 213 .on_packet_acked(&pkt_1, Instant::now()); 214 215 let mut pkt_2 = Sent { 216 pkt_num: 1, 217 frames: vec![], 218 time_sent: Instant::now(), 219 time_acked: None, 220 time_lost: None, 221 size: 1200, 222 ack_eliciting: true, 223 in_flight: true, 224 delivered: 0, 225 delivered_time: Instant::now(), 226 recent_delivered_packet_sent_time: Instant::now(), 227 is_app_limited: false, 228 has_data: false, 229 }; 230 231 recovery 232 .delivery_rate 233 .on_packet_sent(&mut pkt_2, Instant::now()); 234 std::thread::sleep(Duration::from_millis(50)); 235 recovery 236 .delivery_rate 237 .on_packet_acked(&pkt_2, Instant::now()); 238 recovery.delivery_rate.estimate(); 239 240 assert!(recovery.delivery_rate() > 0); 241 } 242 243 #[test] app_limited_check()244 fn app_limited_check() { 245 let config = Config::new(0xbabababa).unwrap(); 246 let mut recvry = Recovery::new(&config); 247 248 let mut pkt_1 = Sent { 249 pkt_num: 0, 250 frames: vec![], 251 time_sent: Instant::now(), 252 time_acked: None, 253 time_lost: None, 254 size: 1200, 255 ack_eliciting: true, 256 in_flight: true, 257 delivered: 0, 258 delivered_time: Instant::now(), 259 recent_delivered_packet_sent_time: Instant::now(), 260 is_app_limited: false, 261 has_data: false, 262 }; 263 264 recvry 265 .delivery_rate 266 .on_packet_sent(&mut pkt_1, Instant::now()); 267 std::thread::sleep(Duration::from_millis(50)); 268 recvry.delivery_rate.on_packet_acked(&pkt_1, Instant::now()); 269 270 let mut pkt_2 = Sent { 271 pkt_num: 1, 272 frames: vec![], 273 time_sent: Instant::now(), 274 time_acked: None, 275 time_lost: None, 276 size: 1200, 277 ack_eliciting: true, 278 in_flight: true, 279 delivered: 0, 280 delivered_time: Instant::now(), 281 recent_delivered_packet_sent_time: Instant::now(), 282 is_app_limited: false, 283 has_data: false, 284 }; 285 286 recvry.app_limited = true; 287 recvry 288 .delivery_rate 289 .check_app_limited(recvry.bytes_in_flight); 290 recvry 291 .delivery_rate 292 .on_packet_sent(&mut pkt_2, Instant::now()); 293 std::thread::sleep(Duration::from_millis(50)); 294 recvry.delivery_rate.on_packet_acked(&pkt_2, Instant::now()); 295 recvry.delivery_rate.estimate(); 296 297 assert_eq!(recvry.delivery_rate.app_limited_at_pkt, 0); 298 } 299 } 300