• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::cmp;
28 
29 use std::str::FromStr;
30 
31 use std::time::Duration;
32 use std::time::Instant;
33 
34 use std::collections::VecDeque;
35 
36 use crate::Config;
37 use crate::Error;
38 use crate::Result;
39 
40 use crate::frame;
41 use crate::minmax;
42 use crate::packet;
43 use crate::ranges;
44 
45 // Loss Recovery
46 const PACKET_THRESHOLD: u64 = 3;
47 
48 const TIME_THRESHOLD: f64 = 9.0 / 8.0;
49 
50 const GRANULARITY: Duration = Duration::from_millis(1);
51 
52 const INITIAL_RTT: Duration = Duration::from_millis(333);
53 
54 const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;
55 
56 const RTT_WINDOW: Duration = Duration::from_secs(300);
57 
58 const MAX_PTO_PROBES_COUNT: usize = 2;
59 
60 // Congestion Control
61 const INITIAL_WINDOW_PACKETS: usize = 10;
62 
63 const MINIMUM_WINDOW_PACKETS: usize = 2;
64 
65 const LOSS_REDUCTION_FACTOR: f64 = 0.5;
66 
67 pub struct Recovery {
68     loss_detection_timer: Option<Instant>,
69 
70     pto_count: u32,
71 
72     time_of_last_sent_ack_eliciting_pkt: [Option<Instant>; packet::EPOCH_COUNT],
73 
74     largest_acked_pkt: [u64; packet::EPOCH_COUNT],
75 
76     largest_sent_pkt: [u64; packet::EPOCH_COUNT],
77 
78     latest_rtt: Duration,
79 
80     smoothed_rtt: Option<Duration>,
81 
82     rttvar: Duration,
83 
84     minmax_filter: minmax::Minmax<Duration>,
85 
86     min_rtt: Duration,
87 
88     pub max_ack_delay: Duration,
89 
90     loss_time: [Option<Instant>; packet::EPOCH_COUNT],
91 
92     sent: [VecDeque<Sent>; packet::EPOCH_COUNT],
93 
94     pub lost: [Vec<frame::Frame>; packet::EPOCH_COUNT],
95 
96     pub acked: [Vec<frame::Frame>; packet::EPOCH_COUNT],
97 
98     pub lost_count: usize,
99 
100     pub loss_probes: [usize; packet::EPOCH_COUNT],
101 
102     in_flight_count: [usize; packet::EPOCH_COUNT],
103 
104     app_limited: bool,
105 
106     delivery_rate: delivery_rate::Rate,
107 
108     // Congestion control.
109     cc_ops: &'static CongestionControlOps,
110 
111     congestion_window: usize,
112 
113     bytes_in_flight: usize,
114 
115     ssthresh: usize,
116 
117     bytes_acked_sl: usize,
118 
119     bytes_acked_ca: usize,
120 
121     bytes_sent: usize,
122 
123     congestion_recovery_start_time: Option<Instant>,
124 
125     max_datagram_size: usize,
126 
127     cubic_state: cubic::State,
128 
129     // HyStart++.
130     hystart: hystart::Hystart,
131 
132     // Pacing.
133     pacing_rate: u64,
134 
135     last_packet_scheduled_time: Option<Instant>,
136 }
137 
138 impl Recovery {
new(config: &Config) -> Self139     pub fn new(config: &Config) -> Self {
140         Recovery {
141             loss_detection_timer: None,
142 
143             pto_count: 0,
144 
145             time_of_last_sent_ack_eliciting_pkt: [None; packet::EPOCH_COUNT],
146 
147             largest_acked_pkt: [std::u64::MAX; packet::EPOCH_COUNT],
148 
149             largest_sent_pkt: [0; packet::EPOCH_COUNT],
150 
151             latest_rtt: Duration::new(0, 0),
152 
153             // This field should be initialized to `INITIAL_RTT` for the initial
154             // PTO calculation, but it also needs to be an `Option` to track
155             // whether any RTT sample was received, so the initial value is
156             // handled by the `rtt()` method instead.
157             smoothed_rtt: None,
158 
159             minmax_filter: minmax::Minmax::new(Duration::new(0, 0)),
160 
161             min_rtt: Duration::new(0, 0),
162 
163             rttvar: INITIAL_RTT / 2,
164 
165             max_ack_delay: Duration::new(0, 0),
166 
167             loss_time: [None; packet::EPOCH_COUNT],
168 
169             sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
170 
171             lost: [Vec::new(), Vec::new(), Vec::new()],
172 
173             acked: [Vec::new(), Vec::new(), Vec::new()],
174 
175             lost_count: 0,
176 
177             loss_probes: [0; packet::EPOCH_COUNT],
178 
179             in_flight_count: [0; packet::EPOCH_COUNT],
180 
181             congestion_window: config.max_send_udp_payload_size *
182                 INITIAL_WINDOW_PACKETS,
183 
184             bytes_in_flight: 0,
185 
186             ssthresh: std::usize::MAX,
187 
188             bytes_acked_sl: 0,
189 
190             bytes_acked_ca: 0,
191 
192             bytes_sent: 0,
193 
194             congestion_recovery_start_time: None,
195 
196             max_datagram_size: config.max_send_udp_payload_size,
197 
198             cc_ops: config.cc_algorithm.into(),
199 
200             delivery_rate: delivery_rate::Rate::default(),
201 
202             cubic_state: cubic::State::default(),
203 
204             app_limited: false,
205 
206             hystart: hystart::Hystart::new(config.hystart),
207 
208             pacing_rate: 0,
209 
210             last_packet_scheduled_time: None,
211         }
212     }
213 
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )214     pub fn on_packet_sent(
215         &mut self, mut pkt: Sent, epoch: packet::Epoch,
216         handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
217     ) {
218         let ack_eliciting = pkt.ack_eliciting;
219         let in_flight = pkt.in_flight;
220         let sent_bytes = pkt.size;
221         let pkt_num = pkt.pkt_num;
222 
223         self.delivery_rate.on_packet_sent(&mut pkt, now);
224 
225         self.largest_sent_pkt[epoch] =
226             cmp::max(self.largest_sent_pkt[epoch], pkt_num);
227 
228         self.sent[epoch].push_back(pkt);
229 
230         if in_flight {
231             if ack_eliciting {
232                 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
233             }
234 
235             self.in_flight_count[epoch] += 1;
236 
237             self.app_limited =
238                 (self.bytes_in_flight + sent_bytes) < self.congestion_window;
239 
240             self.on_packet_sent_cc(sent_bytes, now);
241 
242             self.set_loss_detection_timer(handshake_status, now);
243         }
244 
245         // HyStart++: Start of the round in a slow start.
246         if self.hystart.enabled() &&
247             epoch == packet::EPOCH_APPLICATION &&
248             self.congestion_window < self.ssthresh
249         {
250             self.hystart.start_round(pkt_num);
251         }
252 
253         // Pacing: Set the pacing rate if CC doesn't do its own.
254         if !(self.cc_ops.has_custom_pacing)() {
255             if let Some(srtt) = self.smoothed_rtt {
256                 let rate = (self.congestion_window as u64 * 1000000) /
257                     srtt.as_micros() as u64;
258                 self.set_pacing_rate(rate);
259             }
260         }
261 
262         self.schedule_next_packet(epoch, now, sent_bytes);
263 
264         self.bytes_sent += sent_bytes;
265         trace!("{} {:?}", trace_id, self);
266     }
267 
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)268     fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
269         (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
270     }
271 
set_pacing_rate(&mut self, rate: u64)272     pub fn set_pacing_rate(&mut self, rate: u64) {
273         if rate != 0 {
274             self.pacing_rate = rate;
275         }
276     }
277 
get_packet_send_time(&self) -> Option<Instant>278     pub fn get_packet_send_time(&self) -> Option<Instant> {
279         self.last_packet_scheduled_time
280     }
281 
schedule_next_packet( &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, )282     fn schedule_next_packet(
283         &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
284     ) {
285         // Don't pace in any of these cases:
286         //   * Packet epoch is not EPOCH_APPLICATION.
287         //   * Packet contains only ACK frames.
288         //   * The start of the connection.
289         if epoch != packet::EPOCH_APPLICATION ||
290             packet_size == 0 ||
291             self.bytes_sent <= self.congestion_window ||
292             self.pacing_rate == 0
293         {
294             self.last_packet_scheduled_time = Some(now);
295             return;
296         }
297 
298         self.last_packet_scheduled_time = match self.last_packet_scheduled_time {
299             Some(last_scheduled_time) => {
300                 let interval: u64 =
301                     (packet_size as u64 * 1000000) / self.pacing_rate;
302                 let interval = Duration::from_micros(interval);
303                 let next_schedule_time = last_scheduled_time + interval;
304                 Some(cmp::max(now, next_schedule_time))
305             },
306 
307             None => Some(now),
308         };
309     }
310 
on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> Result<()>311     pub fn on_ack_received(
312         &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
313         epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
314         trace_id: &str,
315     ) -> Result<()> {
316         let largest_acked = ranges.last().unwrap();
317 
318         // If the largest packet number acked exceeds any packet number we have
319         // sent, then the ACK is obviously invalid, so there's no need to
320         // continue further.
321         if largest_acked > self.largest_sent_pkt[epoch] {
322             if cfg!(feature = "fuzzing") {
323                 return Ok(());
324             }
325 
326             return Err(Error::InvalidPacket);
327         }
328 
329         if self.largest_acked_pkt[epoch] == std::u64::MAX {
330             self.largest_acked_pkt[epoch] = largest_acked;
331         } else {
332             self.largest_acked_pkt[epoch] =
333                 cmp::max(self.largest_acked_pkt[epoch], largest_acked);
334         }
335 
336         let mut has_ack_eliciting = false;
337 
338         let mut largest_newly_acked_pkt_num = 0;
339         let mut largest_newly_acked_sent_time = now;
340 
341         let mut newly_acked = Vec::new();
342 
343         // Detect and mark acked packets, without removing them from the sent
344         // packets list.
345         for r in ranges.iter() {
346             let lowest_acked = r.start;
347             let largest_acked = r.end - 1;
348 
349             let unacked_iter = self.sent[epoch]
350                 .iter_mut()
351                 // Skip packets that precede the lowest acked packet in the block.
352                 .skip_while(|p| p.pkt_num < lowest_acked)
353                 // Skip packets that follow the largest acked packet in the block.
354                 .take_while(|p| p.pkt_num <= largest_acked)
355                 // Skip packets that have already been acked or lost.
356                 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
357 
358             for unacked in unacked_iter {
359                 unacked.time_acked = Some(now);
360 
361                 if unacked.ack_eliciting {
362                     has_ack_eliciting = true;
363                 }
364 
365                 largest_newly_acked_pkt_num = unacked.pkt_num;
366                 largest_newly_acked_sent_time = unacked.time_sent;
367 
368                 self.acked[epoch].append(&mut unacked.frames);
369 
370                 if unacked.in_flight {
371                     self.in_flight_count[epoch] =
372                         self.in_flight_count[epoch].saturating_sub(1);
373 
374                     self.delivery_rate.on_packet_acked(&unacked, now);
375                 }
376 
377                 newly_acked.push(Acked {
378                     pkt_num: unacked.pkt_num,
379 
380                     time_sent: unacked.time_sent,
381 
382                     size: unacked.size,
383                 });
384 
385                 trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
386             }
387         }
388 
389         self.delivery_rate.estimate();
390 
391         if newly_acked.is_empty() {
392             return Ok(());
393         }
394 
395         if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
396             let latest_rtt = now - largest_newly_acked_sent_time;
397 
398             let ack_delay = if epoch == packet::EPOCH_APPLICATION {
399                 Duration::from_micros(ack_delay)
400             } else {
401                 Duration::from_micros(0)
402             };
403 
404             self.update_rtt(latest_rtt, ack_delay, now);
405         }
406 
407         // Detect and mark lost packets without removing them from the sent
408         // packets list.
409         self.detect_lost_packets(epoch, now, trace_id);
410 
411         self.on_packets_acked(newly_acked, epoch, now);
412 
413         self.pto_count = 0;
414 
415         self.set_loss_detection_timer(handshake_status, now);
416 
417         self.drain_packets(epoch);
418 
419         Ok(())
420     }
421 
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )422     pub fn on_loss_detection_timeout(
423         &mut self, handshake_status: HandshakeStatus, now: Instant,
424         trace_id: &str,
425     ) {
426         let (earliest_loss_time, epoch) = self.loss_time_and_space();
427 
428         if earliest_loss_time.is_some() {
429             // Time threshold loss detection.
430             self.detect_lost_packets(epoch, now, trace_id);
431 
432             self.set_loss_detection_timer(handshake_status, now);
433 
434             trace!("{} {:?}", trace_id, self);
435             return;
436         }
437 
438         let epoch = if self.bytes_in_flight > 0 {
439             // Send new data if available, else retransmit old data. If neither
440             // is available, send a single PING frame.
441             let (_, e) = self.pto_time_and_space(handshake_status, now);
442 
443             e
444         } else {
445             // Client sends an anti-deadlock packet: Initial is padded to earn
446             // more anti-amplification credit, a Handshake packet proves address
447             // ownership.
448             if handshake_status.has_handshake_keys {
449                 packet::EPOCH_HANDSHAKE
450             } else {
451                 packet::EPOCH_INITIAL
452             }
453         };
454 
455         self.pto_count += 1;
456 
457         self.loss_probes[epoch] =
458             cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
459 
460         let unacked_iter = self.sent[epoch]
461             .iter_mut()
462             // Skip packets that have already been acked or lost, and packets
463             // that don't contain either CRYPTO or STREAM frames.
464             .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
465             // Only return as many packets as the number of probe packets that
466             // will be sent.
467             .take(self.loss_probes[epoch]);
468 
469         // Retransmit the frames from the oldest sent packets on PTO. However
470         // the packets are not actually declared lost (so there is no effect to
471         // congestion control), we just reschedule the data they carried.
472         //
473         // This will also trigger sending an ACK and retransmitting frames like
474         // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
475         // to CRYPTO and STREAM, if the original packet carried them.
476         for unacked in unacked_iter {
477             self.lost[epoch].extend_from_slice(&unacked.frames);
478         }
479 
480         self.set_loss_detection_timer(handshake_status, now);
481 
482         trace!("{} {:?}", trace_id, self);
483     }
484 
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )485     pub fn on_pkt_num_space_discarded(
486         &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
487         now: Instant,
488     ) {
489         let unacked_bytes = self.sent[epoch]
490             .iter()
491             .filter(|p| {
492                 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
493             })
494             .fold(0, |acc, p| acc + p.size);
495 
496         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
497 
498         self.sent[epoch].clear();
499         self.lost[epoch].clear();
500         self.acked[epoch].clear();
501 
502         self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
503         self.loss_time[epoch] = None;
504         self.loss_probes[epoch] = 0;
505         self.in_flight_count[epoch] = 0;
506 
507         self.set_loss_detection_timer(handshake_status, now);
508     }
509 
loss_detection_timer(&self) -> Option<Instant>510     pub fn loss_detection_timer(&self) -> Option<Instant> {
511         self.loss_detection_timer
512     }
513 
cwnd(&self) -> usize514     pub fn cwnd(&self) -> usize {
515         self.congestion_window
516     }
517 
cwnd_available(&self) -> usize518     pub fn cwnd_available(&self) -> usize {
519         // Ignore cwnd when sending probe packets.
520         if self.loss_probes.iter().any(|&x| x > 0) {
521             return std::usize::MAX;
522         }
523 
524         self.congestion_window.saturating_sub(self.bytes_in_flight)
525     }
526 
rtt(&self) -> Duration527     pub fn rtt(&self) -> Duration {
528         self.smoothed_rtt.unwrap_or(INITIAL_RTT)
529     }
530 
pto(&self) -> Duration531     pub fn pto(&self) -> Duration {
532         self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
533     }
534 
delivery_rate(&self) -> u64535     pub fn delivery_rate(&self) -> u64 {
536         self.delivery_rate.delivery_rate()
537     }
538 
max_datagram_size(&self) -> usize539     pub fn max_datagram_size(&self) -> usize {
540         self.max_datagram_size
541     }
542 
update_max_datagram_size(&mut self, new_max_datagram_size: usize)543     pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
544         let max_datagram_size =
545             cmp::min(self.max_datagram_size, new_max_datagram_size);
546 
547         // Congestion Window is updated only when it's not updated already.
548         if self.congestion_window ==
549             self.max_datagram_size * INITIAL_WINDOW_PACKETS
550         {
551             self.congestion_window = max_datagram_size * INITIAL_WINDOW_PACKETS;
552         }
553 
554         self.max_datagram_size = max_datagram_size;
555     }
556 
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )557     fn update_rtt(
558         &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
559     ) {
560         self.latest_rtt = latest_rtt;
561 
562         match self.smoothed_rtt {
563             // First RTT sample.
564             None => {
565                 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
566 
567                 self.smoothed_rtt = Some(latest_rtt);
568 
569                 self.rttvar = latest_rtt / 2;
570             },
571 
572             Some(srtt) => {
573                 self.min_rtt =
574                     self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
575 
576                 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
577 
578                 // Adjust for ack delay if plausible.
579                 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
580                     latest_rtt - ack_delay
581                 } else {
582                     latest_rtt
583                 };
584 
585                 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
586                     sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
587 
588                 self.smoothed_rtt = Some(
589                     srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
590                 );
591             },
592         }
593     }
594 
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)595     fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
596         let mut epoch = packet::EPOCH_INITIAL;
597         let mut time = self.loss_time[epoch];
598 
599         // Iterate over all packet number spaces starting from Handshake.
600         #[allow(clippy::needless_range_loop)]
601         for e in packet::EPOCH_HANDSHAKE..packet::EPOCH_COUNT {
602             let new_time = self.loss_time[e];
603 
604             if time.is_none() || new_time < time {
605                 time = new_time;
606                 epoch = e;
607             }
608         }
609 
610         (time, epoch)
611     }
612 
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)613     fn pto_time_and_space(
614         &self, handshake_status: HandshakeStatus, now: Instant,
615     ) -> (Option<Instant>, packet::Epoch) {
616         let mut duration = self.pto() * 2_u32.pow(self.pto_count);
617 
618         // Arm PTO from now when there are no inflight packets.
619         if self.bytes_in_flight == 0 {
620             if handshake_status.has_handshake_keys {
621                 return (Some(now + duration), packet::EPOCH_HANDSHAKE);
622             } else {
623                 return (Some(now + duration), packet::EPOCH_INITIAL);
624             }
625         }
626 
627         let mut pto_timeout = None;
628         let mut pto_space = packet::EPOCH_INITIAL;
629 
630         // Iterate over all packet number spaces.
631         for e in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
632             if self.in_flight_count[e] == 0 {
633                 continue;
634             }
635 
636             if e == packet::EPOCH_APPLICATION {
637                 // Skip Application Data until handshake completes.
638                 if !handshake_status.completed {
639                     return (pto_timeout, pto_space);
640                 }
641 
642                 // Include max_ack_delay and backoff for Application Data.
643                 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
644             }
645 
646             let new_time =
647                 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
648 
649             if pto_timeout.is_none() || new_time < pto_timeout {
650                 pto_timeout = new_time;
651                 pto_space = e;
652             }
653         }
654 
655         (pto_timeout, pto_space)
656     }
657 
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )658     fn set_loss_detection_timer(
659         &mut self, handshake_status: HandshakeStatus, now: Instant,
660     ) {
661         let (earliest_loss_time, _) = self.loss_time_and_space();
662 
663         if earliest_loss_time.is_some() {
664             // Time threshold loss detection.
665             self.loss_detection_timer = earliest_loss_time;
666             return;
667         }
668 
669         if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
670             self.loss_detection_timer = None;
671             return;
672         }
673 
674         // PTO timer.
675         let (timeout, _) = self.pto_time_and_space(handshake_status, now);
676         self.loss_detection_timer = timeout;
677     }
678 
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, )679     fn detect_lost_packets(
680         &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
681     ) {
682         let largest_acked = self.largest_acked_pkt[epoch];
683 
684         self.loss_time[epoch] = None;
685 
686         let loss_delay =
687             cmp::max(self.latest_rtt, self.rtt()).mul_f64(TIME_THRESHOLD);
688 
689         // Minimum time of kGranularity before packets are deemed lost.
690         let loss_delay = cmp::max(loss_delay, GRANULARITY);
691 
692         // Packets sent before this time are deemed lost.
693         let lost_send_time = now - loss_delay;
694 
695         let mut lost_bytes = 0;
696 
697         let mut largest_lost_pkt = None;
698 
699         let unacked_iter = self.sent[epoch]
700             .iter_mut()
701             // Skip packets that follow the largest acked packet.
702             .take_while(|p| p.pkt_num <= largest_acked)
703             // Skip packets that have already been acked or lost.
704             .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
705 
706         for unacked in unacked_iter {
707             // Mark packet as lost, or set time when it should be marked.
708             if unacked.time_sent <= lost_send_time ||
709                 largest_acked >= unacked.pkt_num + PACKET_THRESHOLD
710             {
711                 self.lost[epoch].append(&mut unacked.frames);
712 
713                 unacked.time_lost = Some(now);
714 
715                 if unacked.in_flight {
716                     lost_bytes += unacked.size;
717 
718                     // Frames have already been removed from the packet, so
719                     // cloning the whole packet should be relatively cheap.
720                     largest_lost_pkt = Some(unacked.clone());
721 
722                     self.in_flight_count[epoch] =
723                         self.in_flight_count[epoch].saturating_sub(1);
724 
725                     trace!(
726                         "{} packet {} lost on epoch {}",
727                         trace_id,
728                         unacked.pkt_num,
729                         epoch
730                     );
731                 }
732 
733                 self.lost_count += 1;
734             } else {
735                 let loss_time = match self.loss_time[epoch] {
736                     None => unacked.time_sent + loss_delay,
737 
738                     Some(loss_time) =>
739                         cmp::min(loss_time, unacked.time_sent + loss_delay),
740                 };
741 
742                 self.loss_time[epoch] = Some(loss_time);
743             }
744         }
745 
746         if let Some(pkt) = largest_lost_pkt {
747             self.on_packets_lost(lost_bytes, &pkt, epoch, now);
748         }
749 
750         self.drain_packets(epoch);
751     }
752 
drain_packets(&mut self, epoch: packet::Epoch)753     fn drain_packets(&mut self, epoch: packet::Epoch) {
754         let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
755 
756         // In order to avoid removing elements from the middle of the list
757         // (which would require copying other elements to compact the list),
758         // we only remove a contiguous range of elements from the start of the
759         // list.
760         //
761         // This means that acked or lost elements coming after this will not
762         // be removed at this point, but their removal is delayed for a later
763         // time, once the gaps have been filled.
764 
765         // First, find the first element that is neither acked nor lost.
766         for (i, pkt) in self.sent[epoch].iter().enumerate() {
767             if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
768                 lowest_non_expired_pkt_index = i;
769                 break;
770             }
771         }
772 
773         // Then remove elements up to the previously found index.
774         self.sent[epoch].drain(..lowest_non_expired_pkt_index);
775     }
776 
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )777     fn on_packets_acked(
778         &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
779     ) {
780         for pkt in acked {
781             (self.cc_ops.on_packet_acked)(self, &pkt, epoch, now);
782         }
783     }
784 
in_congestion_recovery(&self, sent_time: Instant) -> bool785     fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
786         match self.congestion_recovery_start_time {
787             Some(congestion_recovery_start_time) =>
788                 sent_time <= congestion_recovery_start_time,
789 
790             None => false,
791         }
792     }
793 
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool794     fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
795         let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
796 
797         // TODO: properly detect persistent congestion
798         false
799     }
800 
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )801     fn on_packets_lost(
802         &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
803         epoch: packet::Epoch, now: Instant,
804     ) {
805         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
806 
807         self.congestion_event(largest_lost_pkt.time_sent, epoch, now);
808 
809         if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
810             self.collapse_cwnd();
811         }
812     }
813 
congestion_event( &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant, )814     fn congestion_event(
815         &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant,
816     ) {
817         if !self.in_congestion_recovery(time_sent) {
818             (self.cc_ops.checkpoint)(self);
819         }
820 
821         (self.cc_ops.congestion_event)(self, time_sent, epoch, now);
822     }
823 
collapse_cwnd(&mut self)824     fn collapse_cwnd(&mut self) {
825         (self.cc_ops.collapse_cwnd)(self);
826     }
827 
rate_check_app_limited(&mut self)828     pub fn rate_check_app_limited(&mut self) {
829         if self.app_limited {
830             self.delivery_rate.check_app_limited(self.bytes_in_flight)
831         }
832     }
833 
update_app_limited(&mut self, v: bool)834     pub fn update_app_limited(&mut self, v: bool) {
835         self.app_limited = v;
836     }
837 
app_limited(&mut self) -> bool838     pub fn app_limited(&mut self) -> bool {
839         self.app_limited
840     }
841 
842     #[cfg(feature = "qlog")]
to_qlog(&self) -> qlog::event::Event843     pub fn to_qlog(&self) -> qlog::event::Event {
844         // QVis can't use all these fields and they can be large.
845         qlog::event::Event::metrics_updated(
846             Some(self.min_rtt.as_millis() as u64),
847             Some(self.rtt().as_millis() as u64),
848             Some(self.latest_rtt.as_millis() as u64),
849             Some(self.rttvar.as_millis() as u64),
850             None, // delay
851             None, // probe_count
852             Some(self.cwnd() as u64),
853             Some(self.bytes_in_flight as u64),
854             None, // ssthresh
855             None, // packets_in_flight
856             None, // in_recovery
857             None, // pacing_rate
858         )
859     }
860 }
861 
862 /// Available congestion control algorithms.
863 ///
864 /// This enum provides currently available list of congestion control
865 /// algorithms.
866 #[derive(Debug, Copy, Clone, PartialEq)]
867 #[repr(C)]
868 pub enum CongestionControlAlgorithm {
869     /// Reno congestion control algorithm. `reno` in a string form.
870     Reno  = 0,
871     /// CUBIC congestion control algorithm (default). `cubic` in a string form.
872     CUBIC = 1,
873 }
874 
875 impl FromStr for CongestionControlAlgorithm {
876     type Err = crate::Error;
877 
878     /// Converts a string to `CongestionControlAlgorithm`.
879     ///
880     /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>881     fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
882         match name {
883             "reno" => Ok(CongestionControlAlgorithm::Reno),
884             "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
885 
886             _ => Err(crate::Error::CongestionControl),
887         }
888     }
889 }
890 
891 pub struct CongestionControlOps {
892     pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
893 
894     pub on_packet_acked:
895         fn(r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant),
896 
897     pub congestion_event: fn(
898         r: &mut Recovery,
899         time_sent: Instant,
900         epoch: packet::Epoch,
901         now: Instant,
902     ),
903 
904     pub collapse_cwnd: fn(r: &mut Recovery),
905 
906     pub checkpoint: fn(r: &mut Recovery),
907 
908     pub rollback: fn(r: &mut Recovery),
909 
910     pub has_custom_pacing: fn() -> bool,
911 }
912 
913 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self914     fn from(algo: CongestionControlAlgorithm) -> Self {
915         match algo {
916             CongestionControlAlgorithm::Reno => &reno::RENO,
917             CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
918         }
919     }
920 }
921 
922 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result923     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
924         match self.loss_detection_timer {
925             Some(v) => {
926                 let now = Instant::now();
927 
928                 if v > now {
929                     let d = v.duration_since(now);
930                     write!(f, "timer={:?} ", d)?;
931                 } else {
932                     write!(f, "timer=exp ")?;
933                 }
934             },
935 
936             None => {
937                 write!(f, "timer=none ")?;
938             },
939         };
940 
941         write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
942         write!(f, "srtt={:?} ", self.smoothed_rtt)?;
943         write!(f, "min_rtt={:?} ", self.min_rtt)?;
944         write!(f, "rttvar={:?} ", self.rttvar)?;
945         write!(f, "loss_time={:?} ", self.loss_time)?;
946         write!(f, "loss_probes={:?} ", self.loss_probes)?;
947         write!(f, "cwnd={} ", self.congestion_window)?;
948         write!(f, "ssthresh={} ", self.ssthresh)?;
949         write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
950         write!(f, "app_limited={} ", self.app_limited)?;
951         write!(
952             f,
953             "congestion_recovery_start_time={:?} ",
954             self.congestion_recovery_start_time
955         )?;
956         write!(f, "{:?} ", self.delivery_rate)?;
957         write!(f, "pacing_rate={:?} ", self.pacing_rate)?;
958         write!(
959             f,
960             "last_packet_scheduled_time={:?} ",
961             self.last_packet_scheduled_time
962         )?;
963 
964         if self.hystart.enabled() {
965             write!(f, "hystart={:?} ", self.hystart)?;
966         }
967 
968         Ok(())
969     }
970 }
971 
972 #[derive(Clone)]
973 pub struct Sent {
974     pub pkt_num: u64,
975 
976     pub frames: Vec<frame::Frame>,
977 
978     pub time_sent: Instant,
979 
980     pub time_acked: Option<Instant>,
981 
982     pub time_lost: Option<Instant>,
983 
984     pub size: usize,
985 
986     pub ack_eliciting: bool,
987 
988     pub in_flight: bool,
989 
990     pub delivered: usize,
991 
992     pub delivered_time: Instant,
993 
994     pub recent_delivered_packet_sent_time: Instant,
995 
996     pub is_app_limited: bool,
997 
998     pub has_data: bool,
999 }
1000 
1001 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1002     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1003         write!(f, "pkt_num={:?} ", self.pkt_num)?;
1004         write!(f, "pkt_sent_time={:?} ", self.time_sent.elapsed())?;
1005         write!(f, "pkt_size={:?} ", self.size)?;
1006         write!(f, "delivered={:?} ", self.delivered)?;
1007         write!(f, "delivered_time={:?} ", self.delivered_time.elapsed())?;
1008         write!(
1009             f,
1010             "recent_delivered_packet_sent_time={:?} ",
1011             self.recent_delivered_packet_sent_time.elapsed()
1012         )?;
1013         write!(f, "is_app_limited={} ", self.is_app_limited)?;
1014         write!(f, "has_data={} ", self.has_data)?;
1015 
1016         Ok(())
1017     }
1018 }
1019 
1020 #[derive(Clone)]
1021 pub struct Acked {
1022     pub pkt_num: u64,
1023 
1024     pub time_sent: Instant,
1025 
1026     pub size: usize,
1027 }
1028 
1029 #[derive(Clone, Copy, Debug)]
1030 pub struct HandshakeStatus {
1031     pub has_handshake_keys: bool,
1032 
1033     pub peer_verified_address: bool,
1034 
1035     pub completed: bool,
1036 }
1037 
1038 #[cfg(test)]
1039 impl Default for HandshakeStatus {
default() -> HandshakeStatus1040     fn default() -> HandshakeStatus {
1041         HandshakeStatus {
1042             has_handshake_keys: true,
1043 
1044             peer_verified_address: true,
1045 
1046             completed: true,
1047         }
1048     }
1049 }
1050 
sub_abs(lhs: Duration, rhs: Duration) -> Duration1051 fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
1052     if lhs > rhs {
1053         lhs - rhs
1054     } else {
1055         rhs - lhs
1056     }
1057 }
1058 
1059 #[cfg(test)]
1060 mod tests {
1061     use super::*;
1062 
1063     #[test]
lookup_cc_algo_ok()1064     fn lookup_cc_algo_ok() {
1065         let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
1066         assert_eq!(algo, CongestionControlAlgorithm::Reno);
1067     }
1068 
1069     #[test]
lookup_cc_algo_bad()1070     fn lookup_cc_algo_bad() {
1071         assert_eq!(
1072             CongestionControlAlgorithm::from_str("???"),
1073             Err(Error::CongestionControl)
1074         );
1075     }
1076 
1077     #[test]
collapse_cwnd()1078     fn collapse_cwnd() {
1079         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1080         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1081 
1082         let mut r = Recovery::new(&cfg);
1083 
1084         // cwnd will be reset.
1085         r.collapse_cwnd();
1086         assert_eq!(r.cwnd(), r.max_datagram_size * MINIMUM_WINDOW_PACKETS);
1087     }
1088 
1089     #[test]
loss_on_pto()1090     fn loss_on_pto() {
1091         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1092         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1093 
1094         let mut r = Recovery::new(&cfg);
1095 
1096         let mut now = Instant::now();
1097 
1098         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1099 
1100         // Start by sending a few packets.
1101         let p = Sent {
1102             pkt_num: 0,
1103             frames: vec![],
1104             time_sent: now,
1105             time_acked: None,
1106             time_lost: None,
1107             size: 1000,
1108             ack_eliciting: true,
1109             in_flight: true,
1110             delivered: 0,
1111             delivered_time: now,
1112             recent_delivered_packet_sent_time: now,
1113             is_app_limited: false,
1114             has_data: false,
1115         };
1116 
1117         r.on_packet_sent(
1118             p,
1119             packet::EPOCH_APPLICATION,
1120             HandshakeStatus::default(),
1121             now,
1122             "",
1123         );
1124         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1125         assert_eq!(r.bytes_in_flight, 1000);
1126 
1127         let p = Sent {
1128             pkt_num: 1,
1129             frames: vec![],
1130             time_sent: now,
1131             time_acked: None,
1132             time_lost: None,
1133             size: 1000,
1134             ack_eliciting: true,
1135             in_flight: true,
1136             delivered: 0,
1137             delivered_time: now,
1138             recent_delivered_packet_sent_time: now,
1139             is_app_limited: false,
1140             has_data: false,
1141         };
1142 
1143         r.on_packet_sent(
1144             p,
1145             packet::EPOCH_APPLICATION,
1146             HandshakeStatus::default(),
1147             now,
1148             "",
1149         );
1150         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1151         assert_eq!(r.bytes_in_flight, 2000);
1152 
1153         let p = Sent {
1154             pkt_num: 2,
1155             frames: vec![],
1156             time_sent: now,
1157             time_acked: None,
1158             time_lost: None,
1159             size: 1000,
1160             ack_eliciting: true,
1161             in_flight: true,
1162             delivered: 0,
1163             delivered_time: now,
1164             recent_delivered_packet_sent_time: now,
1165             is_app_limited: false,
1166             has_data: false,
1167         };
1168 
1169         r.on_packet_sent(
1170             p,
1171             packet::EPOCH_APPLICATION,
1172             HandshakeStatus::default(),
1173             now,
1174             "",
1175         );
1176         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1177         assert_eq!(r.bytes_in_flight, 3000);
1178 
1179         let p = Sent {
1180             pkt_num: 3,
1181             frames: vec![],
1182             time_sent: now,
1183             time_acked: None,
1184             time_lost: None,
1185             size: 1000,
1186             ack_eliciting: true,
1187             in_flight: true,
1188             delivered: 0,
1189             delivered_time: now,
1190             recent_delivered_packet_sent_time: now,
1191             is_app_limited: false,
1192             has_data: false,
1193         };
1194 
1195         r.on_packet_sent(
1196             p,
1197             packet::EPOCH_APPLICATION,
1198             HandshakeStatus::default(),
1199             now,
1200             "",
1201         );
1202         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1203         assert_eq!(r.bytes_in_flight, 4000);
1204 
1205         // Wait for 10ms.
1206         now += Duration::from_millis(10);
1207 
1208         // Only the first 2 packets are acked.
1209         let mut acked = ranges::RangeSet::default();
1210         acked.insert(0..2);
1211 
1212         assert_eq!(
1213             r.on_ack_received(
1214                 &acked,
1215                 25,
1216                 packet::EPOCH_APPLICATION,
1217                 HandshakeStatus::default(),
1218                 now,
1219                 ""
1220             ),
1221             Ok(())
1222         );
1223 
1224         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1225         assert_eq!(r.bytes_in_flight, 2000);
1226         assert_eq!(r.lost_count, 0);
1227 
1228         // Wait until loss detection timer expires.
1229         now = r.loss_detection_timer().unwrap();
1230 
1231         // PTO.
1232         r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1233         assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 1);
1234         assert_eq!(r.lost_count, 0);
1235         assert_eq!(r.pto_count, 1);
1236 
1237         let p = Sent {
1238             pkt_num: 4,
1239             frames: vec![],
1240             time_sent: now,
1241             time_acked: None,
1242             time_lost: None,
1243             size: 1000,
1244             ack_eliciting: true,
1245             in_flight: true,
1246             delivered: 0,
1247             delivered_time: now,
1248             recent_delivered_packet_sent_time: now,
1249             is_app_limited: false,
1250             has_data: false,
1251         };
1252 
1253         r.on_packet_sent(
1254             p,
1255             packet::EPOCH_APPLICATION,
1256             HandshakeStatus::default(),
1257             now,
1258             "",
1259         );
1260         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1261         assert_eq!(r.bytes_in_flight, 3000);
1262 
1263         let p = Sent {
1264             pkt_num: 5,
1265             frames: vec![],
1266             time_sent: now,
1267             time_acked: None,
1268             time_lost: None,
1269             size: 1000,
1270             ack_eliciting: true,
1271             in_flight: true,
1272             delivered: 0,
1273             delivered_time: now,
1274             recent_delivered_packet_sent_time: now,
1275             is_app_limited: false,
1276             has_data: false,
1277         };
1278 
1279         r.on_packet_sent(
1280             p,
1281             packet::EPOCH_APPLICATION,
1282             HandshakeStatus::default(),
1283             now,
1284             "",
1285         );
1286         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1287         assert_eq!(r.bytes_in_flight, 4000);
1288         assert_eq!(r.lost_count, 0);
1289 
1290         // Wait for 10ms.
1291         now += Duration::from_millis(10);
1292 
1293         // PTO packets are acked.
1294         let mut acked = ranges::RangeSet::default();
1295         acked.insert(4..6);
1296 
1297         assert_eq!(
1298             r.on_ack_received(
1299                 &acked,
1300                 25,
1301                 packet::EPOCH_APPLICATION,
1302                 HandshakeStatus::default(),
1303                 now,
1304                 ""
1305             ),
1306             Ok(())
1307         );
1308 
1309         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1310         assert_eq!(r.bytes_in_flight, 0);
1311 
1312         assert_eq!(r.lost_count, 2);
1313     }
1314 
1315     #[test]
loss_on_timer()1316     fn loss_on_timer() {
1317         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1318         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1319 
1320         let mut r = Recovery::new(&cfg);
1321 
1322         let mut now = Instant::now();
1323 
1324         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1325 
1326         // Start by sending a few packets.
1327         let p = Sent {
1328             pkt_num: 0,
1329             frames: vec![],
1330             time_sent: now,
1331             time_acked: None,
1332             time_lost: None,
1333             size: 1000,
1334             ack_eliciting: true,
1335             in_flight: true,
1336             delivered: 0,
1337             delivered_time: now,
1338             recent_delivered_packet_sent_time: now,
1339             is_app_limited: false,
1340             has_data: false,
1341         };
1342 
1343         r.on_packet_sent(
1344             p,
1345             packet::EPOCH_APPLICATION,
1346             HandshakeStatus::default(),
1347             now,
1348             "",
1349         );
1350         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1351         assert_eq!(r.bytes_in_flight, 1000);
1352 
1353         let p = Sent {
1354             pkt_num: 1,
1355             frames: vec![],
1356             time_sent: now,
1357             time_acked: None,
1358             time_lost: None,
1359             size: 1000,
1360             ack_eliciting: true,
1361             in_flight: true,
1362             delivered: 0,
1363             delivered_time: now,
1364             recent_delivered_packet_sent_time: now,
1365             is_app_limited: false,
1366             has_data: false,
1367         };
1368 
1369         r.on_packet_sent(
1370             p,
1371             packet::EPOCH_APPLICATION,
1372             HandshakeStatus::default(),
1373             now,
1374             "",
1375         );
1376         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1377         assert_eq!(r.bytes_in_flight, 2000);
1378 
1379         let p = Sent {
1380             pkt_num: 2,
1381             frames: vec![],
1382             time_sent: now,
1383             time_acked: None,
1384             time_lost: None,
1385             size: 1000,
1386             ack_eliciting: true,
1387             in_flight: true,
1388             delivered: 0,
1389             delivered_time: now,
1390             recent_delivered_packet_sent_time: now,
1391             is_app_limited: false,
1392             has_data: false,
1393         };
1394 
1395         r.on_packet_sent(
1396             p,
1397             packet::EPOCH_APPLICATION,
1398             HandshakeStatus::default(),
1399             now,
1400             "",
1401         );
1402         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1403         assert_eq!(r.bytes_in_flight, 3000);
1404 
1405         let p = Sent {
1406             pkt_num: 3,
1407             frames: vec![],
1408             time_sent: now,
1409             time_acked: None,
1410             time_lost: None,
1411             size: 1000,
1412             ack_eliciting: true,
1413             in_flight: true,
1414             delivered: 0,
1415             delivered_time: now,
1416             recent_delivered_packet_sent_time: now,
1417             is_app_limited: false,
1418             has_data: false,
1419         };
1420 
1421         r.on_packet_sent(
1422             p,
1423             packet::EPOCH_APPLICATION,
1424             HandshakeStatus::default(),
1425             now,
1426             "",
1427         );
1428         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1429         assert_eq!(r.bytes_in_flight, 4000);
1430 
1431         // Wait for 10ms.
1432         now += Duration::from_millis(10);
1433 
1434         // Only the first 2 packets and the last one are acked.
1435         let mut acked = ranges::RangeSet::default();
1436         acked.insert(0..2);
1437         acked.insert(3..4);
1438 
1439         assert_eq!(
1440             r.on_ack_received(
1441                 &acked,
1442                 25,
1443                 packet::EPOCH_APPLICATION,
1444                 HandshakeStatus::default(),
1445                 now,
1446                 ""
1447             ),
1448             Ok(())
1449         );
1450 
1451         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1452         assert_eq!(r.bytes_in_flight, 1000);
1453         assert_eq!(r.lost_count, 0);
1454 
1455         // Wait until loss detection timer expires.
1456         now = r.loss_detection_timer().unwrap();
1457 
1458         // Packet is declared lost.
1459         r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1460         assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 0);
1461 
1462         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1463         assert_eq!(r.bytes_in_flight, 0);
1464 
1465         assert_eq!(r.lost_count, 1);
1466     }
1467 
1468     #[test]
loss_on_reordering()1469     fn loss_on_reordering() {
1470         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1471         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1472 
1473         let mut r = Recovery::new(&cfg);
1474 
1475         let mut now = Instant::now();
1476 
1477         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1478 
1479         // Start by sending a few packets.
1480         let p = Sent {
1481             pkt_num: 0,
1482             frames: vec![],
1483             time_sent: now,
1484             time_acked: None,
1485             time_lost: None,
1486             size: 1000,
1487             ack_eliciting: true,
1488             in_flight: true,
1489             delivered: 0,
1490             delivered_time: now,
1491             recent_delivered_packet_sent_time: now,
1492             is_app_limited: false,
1493             has_data: false,
1494         };
1495 
1496         r.on_packet_sent(
1497             p,
1498             packet::EPOCH_APPLICATION,
1499             HandshakeStatus::default(),
1500             now,
1501             "",
1502         );
1503         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1504         assert_eq!(r.bytes_in_flight, 1000);
1505 
1506         let p = Sent {
1507             pkt_num: 1,
1508             frames: vec![],
1509             time_sent: now,
1510             time_acked: None,
1511             time_lost: None,
1512             size: 1000,
1513             ack_eliciting: true,
1514             in_flight: true,
1515             delivered: 0,
1516             delivered_time: now,
1517             recent_delivered_packet_sent_time: now,
1518             is_app_limited: false,
1519             has_data: false,
1520         };
1521 
1522         r.on_packet_sent(
1523             p,
1524             packet::EPOCH_APPLICATION,
1525             HandshakeStatus::default(),
1526             now,
1527             "",
1528         );
1529         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1530         assert_eq!(r.bytes_in_flight, 2000);
1531 
1532         let p = Sent {
1533             pkt_num: 2,
1534             frames: vec![],
1535             time_sent: now,
1536             time_acked: None,
1537             time_lost: None,
1538             size: 1000,
1539             ack_eliciting: true,
1540             in_flight: true,
1541             delivered: 0,
1542             delivered_time: now,
1543             recent_delivered_packet_sent_time: now,
1544             is_app_limited: false,
1545             has_data: false,
1546         };
1547 
1548         r.on_packet_sent(
1549             p,
1550             packet::EPOCH_APPLICATION,
1551             HandshakeStatus::default(),
1552             now,
1553             "",
1554         );
1555         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1556         assert_eq!(r.bytes_in_flight, 3000);
1557 
1558         let p = Sent {
1559             pkt_num: 3,
1560             frames: vec![],
1561             time_sent: now,
1562             time_acked: None,
1563             time_lost: None,
1564             size: 1000,
1565             ack_eliciting: true,
1566             in_flight: true,
1567             delivered: 0,
1568             delivered_time: now,
1569             recent_delivered_packet_sent_time: now,
1570             is_app_limited: false,
1571             has_data: false,
1572         };
1573 
1574         r.on_packet_sent(
1575             p,
1576             packet::EPOCH_APPLICATION,
1577             HandshakeStatus::default(),
1578             now,
1579             "",
1580         );
1581         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1582         assert_eq!(r.bytes_in_flight, 4000);
1583 
1584         // Wait for 10ms.
1585         now += Duration::from_millis(10);
1586 
1587         // ACKs are reordered.
1588         let mut acked = ranges::RangeSet::default();
1589         acked.insert(2..4);
1590 
1591         assert_eq!(
1592             r.on_ack_received(
1593                 &acked,
1594                 25,
1595                 packet::EPOCH_APPLICATION,
1596                 HandshakeStatus::default(),
1597                 now,
1598                 ""
1599             ),
1600             Ok(())
1601         );
1602 
1603         now += Duration::from_millis(10);
1604 
1605         let mut acked = ranges::RangeSet::default();
1606         acked.insert(0..2);
1607 
1608         assert_eq!(
1609             r.on_ack_received(
1610                 &acked,
1611                 25,
1612                 packet::EPOCH_APPLICATION,
1613                 HandshakeStatus::default(),
1614                 now,
1615                 ""
1616             ),
1617             Ok(())
1618         );
1619 
1620         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1621         assert_eq!(r.bytes_in_flight, 0);
1622 
1623         // Spurious loss.
1624         assert_eq!(r.lost_count, 1);
1625     }
1626 
1627     #[test]
pacing()1628     fn pacing() {
1629         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1630         cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);
1631 
1632         let mut r = Recovery::new(&cfg);
1633 
1634         let mut now = Instant::now();
1635 
1636         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1637 
1638         // send out first packet.
1639         let p = Sent {
1640             pkt_num: 0,
1641             frames: vec![],
1642             time_sent: now,
1643             time_acked: None,
1644             time_lost: None,
1645             size: 6500,
1646             ack_eliciting: true,
1647             in_flight: true,
1648             delivered: 0,
1649             delivered_time: now,
1650             recent_delivered_packet_sent_time: now,
1651             is_app_limited: false,
1652             has_data: false,
1653         };
1654 
1655         r.on_packet_sent(
1656             p,
1657             packet::EPOCH_APPLICATION,
1658             HandshakeStatus::default(),
1659             now,
1660             "",
1661         );
1662 
1663         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1664         assert_eq!(r.bytes_in_flight, 6500);
1665 
1666         // First packet will be sent out immidiately.
1667         assert_eq!(r.pacing_rate, 0);
1668         assert_eq!(r.get_packet_send_time().unwrap(), now);
1669 
1670         // Wait 50ms for ACK.
1671         now += Duration::from_millis(50);
1672 
1673         let mut acked = ranges::RangeSet::default();
1674         acked.insert(0..1);
1675 
1676         assert_eq!(
1677             r.on_ack_received(
1678                 &acked,
1679                 10,
1680                 packet::EPOCH_APPLICATION,
1681                 HandshakeStatus::default(),
1682                 now,
1683                 ""
1684             ),
1685             Ok(())
1686         );
1687 
1688         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1689         assert_eq!(r.bytes_in_flight, 0);
1690         assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
1691 
1692         // Send out second packet.
1693         let p = Sent {
1694             pkt_num: 1,
1695             frames: vec![],
1696             time_sent: now,
1697             time_acked: None,
1698             time_lost: None,
1699             size: 6500,
1700             ack_eliciting: true,
1701             in_flight: true,
1702             delivered: 0,
1703             delivered_time: now,
1704             recent_delivered_packet_sent_time: now,
1705             is_app_limited: false,
1706             has_data: false,
1707         };
1708 
1709         r.on_packet_sent(
1710             p,
1711             packet::EPOCH_APPLICATION,
1712             HandshakeStatus::default(),
1713             now,
1714             "",
1715         );
1716 
1717         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1718         assert_eq!(r.bytes_in_flight, 6500);
1719 
1720         // Pacing is not done during intial phase of connection.
1721         assert_eq!(r.get_packet_send_time().unwrap(), now);
1722 
1723         // Send the third packet out.
1724         let p = Sent {
1725             pkt_num: 2,
1726             frames: vec![],
1727             time_sent: now,
1728             time_acked: None,
1729             time_lost: None,
1730             size: 6500,
1731             ack_eliciting: true,
1732             in_flight: true,
1733             delivered: 0,
1734             delivered_time: now,
1735             recent_delivered_packet_sent_time: now,
1736             is_app_limited: false,
1737             has_data: false,
1738         };
1739 
1740         r.on_packet_sent(
1741             p,
1742             packet::EPOCH_APPLICATION,
1743             HandshakeStatus::default(),
1744             now,
1745             "",
1746         );
1747 
1748         assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1749         assert_eq!(r.bytes_in_flight, 13000);
1750         assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
1751 
1752         // We pace this outgoing packet. as all conditions for pacing
1753         // are passed.
1754         assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64);
1755         assert_eq!(
1756             r.get_packet_send_time().unwrap(),
1757             now + Duration::from_micros(
1758                 (6500 * 1000000) / (12000.0 / 0.05) as u64
1759             )
1760         );
1761     }
1762 }
1763 
1764 mod cubic;
1765 mod delivery_rate;
1766 mod hystart;
1767 mod reno;
1768