• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::cmp;
28 
29 use std::str::FromStr;
30 
31 use std::time::Duration;
32 use std::time::Instant;
33 
34 use std::collections::VecDeque;
35 
36 use crate::Config;
37 use crate::Error;
38 use crate::Result;
39 
40 use crate::frame;
41 use crate::minmax;
42 use crate::packet;
43 use crate::ranges;
44 
// Loss recovery parameters.

/// Number of later packets that must be acknowledged before an older
/// unacknowledged packet is declared lost.
const PACKET_THRESHOLD: u64 = 3;

/// Multiplier applied to the RTT to obtain the time-based loss delay
/// (9/8, which is exactly representable as a float).
const TIME_THRESHOLD: f64 = 1.125;

/// Lower bound used for the loss delay and for the variance term of the PTO.
const GRANULARITY: Duration = Duration::from_millis(1);

/// RTT assumed until the first RTT sample has been taken.
const INITIAL_RTT: Duration = Duration::from_millis(333);

/// Number of PTO periods that make up the persistent congestion period.
const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;

/// Length of the window over which the minimum RTT is tracked.
const RTT_WINDOW: Duration = Duration::from_secs(5 * 60);

/// Upper bound on the number of probe packets scheduled by a single PTO.
const MAX_PTO_PROBES_COUNT: usize = 2;

// Congestion control parameters.

/// Datagram size, in bytes, used to size the congestion windows below.
const MAX_DATAGRAM_SIZE: usize = 1452;

/// Initial congestion window, expressed in full-sized datagrams.
const INITIAL_WINDOW_PACKETS: usize = 10;

/// Initial congestion window, in bytes.
const INITIAL_WINDOW: usize = MAX_DATAGRAM_SIZE * INITIAL_WINDOW_PACKETS;

/// Two full-sized datagrams, in bytes.
// NOTE(review): not referenced in this part of the file; presumably the
// floor applied by the congestion control modules -- confirm there.
const MINIMUM_WINDOW: usize = MAX_DATAGRAM_SIZE * 2;

/// Window reduction factor.
// NOTE(review): not referenced in this part of the file; presumably applied
// by the congestion control modules on a congestion event -- confirm there.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;
70 
71 pub struct Recovery {
72     loss_detection_timer: Option<Instant>,
73 
74     pto_count: u32,
75 
76     time_of_last_sent_ack_eliciting_pkt: [Option<Instant>; packet::EPOCH_COUNT],
77 
78     largest_acked_pkt: [u64; packet::EPOCH_COUNT],
79 
80     largest_sent_pkt: [u64; packet::EPOCH_COUNT],
81 
82     latest_rtt: Duration,
83 
84     smoothed_rtt: Option<Duration>,
85 
86     rttvar: Duration,
87 
88     minmax_filter: minmax::Minmax<Duration>,
89 
90     min_rtt: Duration,
91 
92     pub max_ack_delay: Duration,
93 
94     loss_time: [Option<Instant>; packet::EPOCH_COUNT],
95 
96     sent: [VecDeque<Sent>; packet::EPOCH_COUNT],
97 
98     pub lost: [Vec<frame::Frame>; packet::EPOCH_COUNT],
99 
100     pub acked: [Vec<frame::Frame>; packet::EPOCH_COUNT],
101 
102     pub lost_count: usize,
103 
104     pub loss_probes: [usize; packet::EPOCH_COUNT],
105 
106     in_flight_count: [usize; packet::EPOCH_COUNT],
107 
108     app_limited: bool,
109 
110     delivery_rate: delivery_rate::Rate,
111 
112     // Congestion control.
113     cc_ops: &'static CongestionControlOps,
114 
115     congestion_window: usize,
116 
117     bytes_in_flight: usize,
118 
119     ssthresh: usize,
120 
121     bytes_acked: usize,
122 
123     congestion_recovery_start_time: Option<Instant>,
124 
125     cubic_state: cubic::State,
126 
127     // HyStart++.
128     hystart: hystart::Hystart,
129 }
130 
131 impl Recovery {
new(config: &Config) -> Self132     pub fn new(config: &Config) -> Self {
133         Recovery {
134             loss_detection_timer: None,
135 
136             pto_count: 0,
137 
138             time_of_last_sent_ack_eliciting_pkt: [None; packet::EPOCH_COUNT],
139 
140             largest_acked_pkt: [std::u64::MAX; packet::EPOCH_COUNT],
141 
142             largest_sent_pkt: [0; packet::EPOCH_COUNT],
143 
144             latest_rtt: Duration::new(0, 0),
145 
146             // This field should be initialized to `INITIAL_RTT` for the initial
147             // PTO calculation, but it also needs to be an `Option` to track
148             // whether any RTT sample was received, so the initial value is
149             // handled by the `rtt()` method instead.
150             smoothed_rtt: None,
151 
152             minmax_filter: minmax::Minmax::new(Duration::new(0, 0)),
153 
154             min_rtt: Duration::new(0, 0),
155 
156             rttvar: INITIAL_RTT / 2,
157 
158             max_ack_delay: Duration::new(0, 0),
159 
160             loss_time: [None; packet::EPOCH_COUNT],
161 
162             sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
163 
164             lost: [Vec::new(), Vec::new(), Vec::new()],
165 
166             acked: [Vec::new(), Vec::new(), Vec::new()],
167 
168             lost_count: 0,
169 
170             loss_probes: [0; packet::EPOCH_COUNT],
171 
172             in_flight_count: [0; packet::EPOCH_COUNT],
173 
174             congestion_window: INITIAL_WINDOW,
175 
176             bytes_in_flight: 0,
177 
178             ssthresh: std::usize::MAX,
179 
180             bytes_acked: 0,
181 
182             congestion_recovery_start_time: None,
183 
184             cc_ops: config.cc_algorithm.into(),
185 
186             delivery_rate: delivery_rate::Rate::default(),
187 
188             cubic_state: cubic::State::default(),
189 
190             app_limited: false,
191 
192             hystart: hystart::Hystart::new(config.hystart),
193         }
194     }
195 
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )196     pub fn on_packet_sent(
197         &mut self, mut pkt: Sent, epoch: packet::Epoch,
198         handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
199     ) {
200         let ack_eliciting = pkt.ack_eliciting;
201         let in_flight = pkt.in_flight;
202         let sent_bytes = pkt.size;
203         let pkt_num = pkt.pkt_num;
204 
205         self.delivery_rate.on_packet_sent(&mut pkt, now);
206 
207         self.largest_sent_pkt[epoch] =
208             cmp::max(self.largest_sent_pkt[epoch], pkt_num);
209 
210         self.sent[epoch].push_back(pkt);
211 
212         if in_flight {
213             if ack_eliciting {
214                 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
215             }
216 
217             self.in_flight_count[epoch] += 1;
218 
219             self.app_limited =
220                 (self.bytes_in_flight + sent_bytes) < self.congestion_window;
221 
222             self.on_packet_sent_cc(sent_bytes, now);
223 
224             self.set_loss_detection_timer(handshake_status, now);
225         }
226 
227         // HyStart++: Start of the round in a slow start.
228         if self.hystart.enabled() &&
229             epoch == packet::EPOCH_APPLICATION &&
230             self.congestion_window < self.ssthresh
231         {
232             self.hystart.start_round(pkt_num);
233         }
234 
235         trace!("{} {:?}", trace_id, self);
236     }
237 
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)238     fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
239         (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
240     }
241 
on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> Result<()>242     pub fn on_ack_received(
243         &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
244         epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
245         trace_id: &str,
246     ) -> Result<()> {
247         let largest_acked = ranges.last().unwrap();
248 
249         // If the largest packet number acked exceeds any packet number we have
250         // sent, then the ACK is obviously invalid, so there's no need to
251         // continue further.
252         if largest_acked > self.largest_sent_pkt[epoch] {
253             if cfg!(feature = "fuzzing") {
254                 return Ok(());
255             }
256 
257             return Err(Error::InvalidPacket);
258         }
259 
260         if self.largest_acked_pkt[epoch] == std::u64::MAX {
261             self.largest_acked_pkt[epoch] = largest_acked;
262         } else {
263             self.largest_acked_pkt[epoch] =
264                 cmp::max(self.largest_acked_pkt[epoch], largest_acked);
265         }
266 
267         let mut has_ack_eliciting = false;
268 
269         let mut largest_newly_acked_pkt_num = 0;
270         let mut largest_newly_acked_sent_time = now;
271 
272         let mut newly_acked = Vec::new();
273 
274         // Detect and mark acked packets, without removing them from the sent
275         // packets list.
276         for r in ranges.iter() {
277             let lowest_acked = r.start;
278             let largest_acked = r.end - 1;
279 
280             let unacked_iter = self.sent[epoch]
281                 .iter_mut()
282                 // Skip packets that precede the lowest acked packet in the block.
283                 .skip_while(|p| p.pkt_num < lowest_acked)
284                 // Skip packets that follow the largest acked packet in the block.
285                 .take_while(|p| p.pkt_num <= largest_acked)
286                 // Skip packets that have already been acked or lost.
287                 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
288 
289             for unacked in unacked_iter {
290                 unacked.time_acked = Some(now);
291 
292                 if unacked.ack_eliciting {
293                     has_ack_eliciting = true;
294                 }
295 
296                 largest_newly_acked_pkt_num = unacked.pkt_num;
297                 largest_newly_acked_sent_time = unacked.time_sent;
298 
299                 self.acked[epoch].append(&mut unacked.frames);
300 
301                 if unacked.in_flight {
302                     self.in_flight_count[epoch] =
303                         self.in_flight_count[epoch].saturating_sub(1);
304 
305                     self.delivery_rate.on_packet_acked(&unacked, now);
306                 }
307 
308                 newly_acked.push(Acked {
309                     pkt_num: unacked.pkt_num,
310 
311                     time_sent: unacked.time_sent,
312 
313                     size: unacked.size,
314                 });
315 
316                 trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
317             }
318         }
319 
320         self.delivery_rate.estimate();
321 
322         if newly_acked.is_empty() {
323             return Ok(());
324         }
325 
326         if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
327             let latest_rtt = now - largest_newly_acked_sent_time;
328 
329             let ack_delay = if epoch == packet::EPOCH_APPLICATION {
330                 Duration::from_micros(ack_delay)
331             } else {
332                 Duration::from_micros(0)
333             };
334 
335             self.update_rtt(latest_rtt, ack_delay, now);
336         }
337 
338         // Detect and mark lost packets without removing them from the sent
339         // packets list.
340         self.detect_lost_packets(epoch, now, trace_id);
341 
342         self.on_packets_acked(newly_acked, epoch, now);
343 
344         self.pto_count = 0;
345 
346         self.set_loss_detection_timer(handshake_status, now);
347 
348         self.drain_packets(epoch);
349 
350         trace!("{} {:?}", trace_id, self);
351 
352         Ok(())
353     }
354 
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )355     pub fn on_loss_detection_timeout(
356         &mut self, handshake_status: HandshakeStatus, now: Instant,
357         trace_id: &str,
358     ) {
359         let (earliest_loss_time, epoch) = self.loss_time_and_space();
360 
361         if earliest_loss_time.is_some() {
362             // Time threshold loss detection.
363             self.detect_lost_packets(epoch, now, trace_id);
364 
365             self.set_loss_detection_timer(handshake_status, now);
366 
367             trace!("{} {:?}", trace_id, self);
368             return;
369         }
370 
371         let epoch = if self.bytes_in_flight > 0 {
372             // Send new data if available, else retransmit old data. If neither
373             // is available, send a single PING frame.
374             let (_, e) = self.pto_time_and_space(handshake_status, now);
375 
376             e
377         } else {
378             // Client sends an anti-deadlock packet: Initial is padded to earn
379             // more anti-amplification credit, a Handshake packet proves address
380             // ownership.
381             if handshake_status.has_handshake_keys {
382                 packet::EPOCH_HANDSHAKE
383             } else {
384                 packet::EPOCH_INITIAL
385             }
386         };
387 
388         self.pto_count += 1;
389 
390         self.loss_probes[epoch] =
391             cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
392 
393         let unacked_iter = self.sent[epoch]
394             .iter_mut()
395             // Skip packets that have already been acked or lost, and packets
396             // that don't contain either CRYPTO or STREAM frames.
397             .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
398             // Only return as many packets as the number of probe packets that
399             // will be sent.
400             .take(self.loss_probes[epoch]);
401 
402         // Retransmit the frames from the oldest sent packets on PTO. However
403         // the packets are not actually declared lost (so there is no effect to
404         // congestion control), we just reschedule the data they carried.
405         //
406         // This will also trigger sending an ACK and retransmitting frames like
407         // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
408         // to CRYPTO and STREAM, if the original packet carried them.
409         for unacked in unacked_iter {
410             self.lost[epoch].extend_from_slice(&unacked.frames);
411         }
412 
413         self.set_loss_detection_timer(handshake_status, now);
414 
415         trace!("{} {:?}", trace_id, self);
416     }
417 
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )418     pub fn on_pkt_num_space_discarded(
419         &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
420         now: Instant,
421     ) {
422         let unacked_bytes = self.sent[epoch]
423             .iter()
424             .filter(|p| {
425                 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
426             })
427             .fold(0, |acc, p| acc + p.size);
428 
429         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
430 
431         self.sent[epoch].clear();
432         self.lost[epoch].clear();
433         self.acked[epoch].clear();
434 
435         self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
436         self.loss_time[epoch] = None;
437         self.loss_probes[epoch] = 0;
438         self.in_flight_count[epoch] = 0;
439 
440         self.set_loss_detection_timer(handshake_status, now);
441     }
442 
loss_detection_timer(&self) -> Option<Instant>443     pub fn loss_detection_timer(&self) -> Option<Instant> {
444         self.loss_detection_timer
445     }
446 
cwnd(&self) -> usize447     pub fn cwnd(&self) -> usize {
448         self.congestion_window
449     }
450 
cwnd_available(&self) -> usize451     pub fn cwnd_available(&self) -> usize {
452         // Ignore cwnd when sending probe packets.
453         if self.loss_probes.iter().any(|&x| x > 0) {
454             return std::usize::MAX;
455         }
456 
457         self.congestion_window.saturating_sub(self.bytes_in_flight)
458     }
459 
rtt(&self) -> Duration460     pub fn rtt(&self) -> Duration {
461         self.smoothed_rtt.unwrap_or(INITIAL_RTT)
462     }
463 
pto(&self) -> Duration464     pub fn pto(&self) -> Duration {
465         self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
466     }
467 
delivery_rate(&self) -> u64468     pub fn delivery_rate(&self) -> u64 {
469         self.delivery_rate.delivery_rate()
470     }
471 
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )472     fn update_rtt(
473         &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
474     ) {
475         self.latest_rtt = latest_rtt;
476 
477         match self.smoothed_rtt {
478             // First RTT sample.
479             None => {
480                 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
481 
482                 self.smoothed_rtt = Some(latest_rtt);
483 
484                 self.rttvar = latest_rtt / 2;
485             },
486 
487             Some(srtt) => {
488                 self.min_rtt =
489                     self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
490 
491                 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
492 
493                 // Adjust for ack delay if plausible.
494                 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
495                     latest_rtt - ack_delay
496                 } else {
497                     latest_rtt
498                 };
499 
500                 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
501                     sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
502 
503                 self.smoothed_rtt = Some(
504                     srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
505                 );
506             },
507         }
508     }
509 
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)510     fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
511         let mut epoch = packet::EPOCH_INITIAL;
512         let mut time = self.loss_time[epoch];
513 
514         // Iterate over all packet number spaces starting from Handshake.
515         #[allow(clippy::needless_range_loop)]
516         for e in packet::EPOCH_HANDSHAKE..packet::EPOCH_COUNT {
517             let new_time = self.loss_time[e];
518 
519             if time.is_none() || new_time < time {
520                 time = new_time;
521                 epoch = e;
522             }
523         }
524 
525         (time, epoch)
526     }
527 
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)528     fn pto_time_and_space(
529         &self, handshake_status: HandshakeStatus, now: Instant,
530     ) -> (Option<Instant>, packet::Epoch) {
531         let mut duration = self.pto() * 2_u32.pow(self.pto_count);
532 
533         // Arm PTO from now when there are no inflight packets.
534         if self.bytes_in_flight == 0 {
535             if handshake_status.has_handshake_keys {
536                 return (Some(now + duration), packet::EPOCH_HANDSHAKE);
537             } else {
538                 return (Some(now + duration), packet::EPOCH_INITIAL);
539             }
540         }
541 
542         let mut pto_timeout = None;
543         let mut pto_space = packet::EPOCH_INITIAL;
544 
545         // Iterate over all packet number spaces.
546         for e in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
547             if self.in_flight_count[e] == 0 {
548                 continue;
549             }
550 
551             if e == packet::EPOCH_APPLICATION {
552                 // Skip Application Data until handshake completes.
553                 if !handshake_status.completed {
554                     return (pto_timeout, pto_space);
555                 }
556 
557                 // Include max_ack_delay and backoff for Application Data.
558                 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
559             }
560 
561             let new_time =
562                 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
563 
564             if pto_timeout.is_none() || new_time < pto_timeout {
565                 pto_timeout = new_time;
566                 pto_space = e;
567             }
568         }
569 
570         (pto_timeout, pto_space)
571     }
572 
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )573     fn set_loss_detection_timer(
574         &mut self, handshake_status: HandshakeStatus, now: Instant,
575     ) {
576         let (earliest_loss_time, _) = self.loss_time_and_space();
577 
578         if earliest_loss_time.is_some() {
579             // Time threshold loss detection.
580             self.loss_detection_timer = earliest_loss_time;
581             return;
582         }
583 
584         if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
585             self.loss_detection_timer = None;
586             return;
587         }
588 
589         // PTO timer.
590         let (timeout, _) = self.pto_time_and_space(handshake_status, now);
591         self.loss_detection_timer = timeout;
592     }
593 
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, )594     fn detect_lost_packets(
595         &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
596     ) {
597         let largest_acked = self.largest_acked_pkt[epoch];
598 
599         self.loss_time[epoch] = None;
600 
601         let loss_delay =
602             cmp::max(self.latest_rtt, self.rtt()).mul_f64(TIME_THRESHOLD);
603 
604         // Minimum time of kGranularity before packets are deemed lost.
605         let loss_delay = cmp::max(loss_delay, GRANULARITY);
606 
607         // Packets sent before this time are deemed lost.
608         let lost_send_time = now - loss_delay;
609 
610         let mut lost_bytes = 0;
611 
612         let mut largest_lost_pkt = None;
613 
614         let unacked_iter = self.sent[epoch]
615             .iter_mut()
616             // Skip packets that follow the largest acked packet.
617             .take_while(|p| p.pkt_num <= largest_acked)
618             // Skip packets that have already been acked or lost.
619             .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
620 
621         for unacked in unacked_iter {
622             // Mark packet as lost, or set time when it should be marked.
623             if unacked.time_sent <= lost_send_time ||
624                 largest_acked >= unacked.pkt_num + PACKET_THRESHOLD
625             {
626                 self.lost[epoch].append(&mut unacked.frames);
627 
628                 unacked.time_lost = Some(now);
629 
630                 if unacked.in_flight {
631                     lost_bytes += unacked.size;
632 
633                     // Frames have already been removed from the packet, so
634                     // cloning the whole packet should be relatively cheap.
635                     largest_lost_pkt = Some(unacked.clone());
636 
637                     self.in_flight_count[epoch] =
638                         self.in_flight_count[epoch].saturating_sub(1);
639 
640                     trace!(
641                         "{} packet {} lost on epoch {}",
642                         trace_id,
643                         unacked.pkt_num,
644                         epoch
645                     );
646                 }
647 
648                 self.lost_count += 1;
649             } else {
650                 let loss_time = match self.loss_time[epoch] {
651                     None => unacked.time_sent + loss_delay,
652 
653                     Some(loss_time) =>
654                         cmp::min(loss_time, unacked.time_sent + loss_delay),
655                 };
656 
657                 self.loss_time[epoch] = Some(loss_time);
658             }
659         }
660 
661         if let Some(pkt) = largest_lost_pkt {
662             self.on_packets_lost(lost_bytes, &pkt, epoch, now);
663         }
664 
665         self.drain_packets(epoch);
666     }
667 
drain_packets(&mut self, epoch: packet::Epoch)668     fn drain_packets(&mut self, epoch: packet::Epoch) {
669         let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
670 
671         // In order to avoid removing elements from the middle of the list
672         // (which would require copying other elements to compact the list),
673         // we only remove a contiguous range of elements from the start of the
674         // list.
675         //
676         // This means that acked or lost elements coming after this will not
677         // be removed at this point, but their removal is delayed for a later
678         // time, once the gaps have been filled.
679 
680         // First, find the first element that is neither acked nor lost.
681         for (i, pkt) in self.sent[epoch].iter().enumerate() {
682             if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
683                 lowest_non_expired_pkt_index = i;
684                 break;
685             }
686         }
687 
688         // Then remove elements up to the previously found index.
689         self.sent[epoch].drain(..lowest_non_expired_pkt_index);
690     }
691 
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )692     fn on_packets_acked(
693         &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
694     ) {
695         for pkt in acked {
696             (self.cc_ops.on_packet_acked)(self, &pkt, epoch, now);
697         }
698     }
699 
in_congestion_recovery(&self, sent_time: Instant) -> bool700     fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
701         match self.congestion_recovery_start_time {
702             Some(congestion_recovery_start_time) =>
703                 sent_time <= congestion_recovery_start_time,
704 
705             None => false,
706         }
707     }
708 
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool709     fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
710         let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
711 
712         // TODO: properly detect persistent congestion
713         false
714     }
715 
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )716     fn on_packets_lost(
717         &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
718         epoch: packet::Epoch, now: Instant,
719     ) {
720         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
721 
722         self.congestion_event(largest_lost_pkt.time_sent, epoch, now);
723 
724         if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
725             self.collapse_cwnd();
726         }
727     }
728 
congestion_event( &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant, )729     fn congestion_event(
730         &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant,
731     ) {
732         (self.cc_ops.congestion_event)(self, time_sent, epoch, now);
733     }
734 
collapse_cwnd(&mut self)735     fn collapse_cwnd(&mut self) {
736         (self.cc_ops.collapse_cwnd)(self);
737     }
738 
rate_check_app_limited(&mut self)739     pub fn rate_check_app_limited(&mut self) {
740         if self.app_limited {
741             self.delivery_rate.check_app_limited(self.bytes_in_flight)
742         }
743     }
744 
hystart_on_packet_acked( &mut self, packet: &Acked, now: Instant, ) -> (usize, usize)745     fn hystart_on_packet_acked(
746         &mut self, packet: &Acked, now: Instant,
747     ) -> (usize, usize) {
748         self.hystart.on_packet_acked(
749             packet,
750             self.latest_rtt,
751             self.congestion_window,
752             self.ssthresh,
753             now,
754         )
755     }
756 
update_app_limited(&mut self, v: bool)757     pub fn update_app_limited(&mut self, v: bool) {
758         self.app_limited = v;
759     }
760 
app_limited(&mut self) -> bool761     pub fn app_limited(&mut self) -> bool {
762         self.app_limited
763     }
764 
/// Builds a qlog `metrics_updated` event from the current state.
///
/// RTT values are reported in whole milliseconds; fields that are not
/// tracked here are left out.
#[cfg(feature = "qlog")]
pub fn to_qlog(&self) -> qlog::event::Event {
    let millis = |d: Duration| Some(d.as_millis() as u64);

    // QVis can't use all these fields and they can be large.
    qlog::event::Event::metrics_updated(
        millis(self.min_rtt),
        millis(self.rtt()),
        millis(self.latest_rtt),
        millis(self.rttvar),
        None, // delay
        None, // probe_count
        Some(self.cwnd() as u64),
        Some(self.bytes_in_flight as u64),
        None, // ssthresh
        None, // packets_in_flight
        None, // in_recovery
        None, // pacing_rate
    )
}
783 }
784 
/// Available congestion control algorithms.
///
/// This enum lists the congestion control algorithms that are currently
/// available. The discriminants are fixed and the layout is `repr(C)`.
///
/// Equality is total, so the type implements `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
    /// Reno congestion control algorithm. `reno` in a string form.
    Reno  = 0,
    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
    CUBIC = 1,
}
797 
798 impl FromStr for CongestionControlAlgorithm {
799     type Err = crate::Error;
800 
801     /// Converts a string to `CongestionControlAlgorithm`.
802     ///
803     /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>804     fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
805         match name {
806             "reno" => Ok(CongestionControlAlgorithm::Reno),
807             "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
808 
809             _ => Err(crate::Error::CongestionControl),
810         }
811     }
812 }
813 
814 pub struct CongestionControlOps {
815     pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
816 
817     pub on_packet_acked:
818         fn(r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant),
819 
820     pub congestion_event: fn(
821         r: &mut Recovery,
822         time_sent: Instant,
823         epoch: packet::Epoch,
824         now: Instant,
825     ),
826 
827     pub collapse_cwnd: fn(r: &mut Recovery),
828 }
829 
830 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self831     fn from(algo: CongestionControlAlgorithm) -> Self {
832         match algo {
833             CongestionControlAlgorithm::Reno => &reno::RENO,
834             CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
835         }
836     }
837 }
838 
839 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result840     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
841         match self.loss_detection_timer {
842             Some(v) => {
843                 let now = Instant::now();
844 
845                 if v > now {
846                     let d = v.duration_since(now);
847                     write!(f, "timer={:?} ", d)?;
848                 } else {
849                     write!(f, "timer=exp ")?;
850                 }
851             },
852 
853             None => {
854                 write!(f, "timer=none ")?;
855             },
856         };
857 
858         write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
859         write!(f, "srtt={:?} ", self.smoothed_rtt)?;
860         write!(f, "min_rtt={:?} ", self.min_rtt)?;
861         write!(f, "rttvar={:?} ", self.rttvar)?;
862         write!(f, "loss_time={:?} ", self.loss_time)?;
863         write!(f, "loss_probes={:?} ", self.loss_probes)?;
864         write!(f, "cwnd={} ", self.congestion_window)?;
865         write!(f, "ssthresh={} ", self.ssthresh)?;
866         write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
867         write!(f, "app_limited={} ", self.app_limited)?;
868         write!(
869             f,
870             "congestion_recovery_start_time={:?} ",
871             self.congestion_recovery_start_time
872         )?;
873         write!(f, "{:?} ", self.delivery_rate)?;
874 
875         if self.hystart.enabled() {
876             write!(f, "hystart={:?} ", self.hystart)?;
877         }
878 
879         Ok(())
880     }
881 }
882 
/// Bookkeeping record for one sent packet, as handed to
/// `Recovery::on_packet_sent()` (see the tests in this file).
883 #[derive(Clone)]
884 pub struct Sent {
    /// Packet number of this packet.
885     pub pkt_num: u64,
886 
    /// Frames associated with the packet.
    /// NOTE(review): presumably the frames it carried, kept so they can be
    /// retransmitted on loss; confirm against the users of this field.
887     pub frames: Vec<frame::Frame>,
888 
    /// Time recorded as the moment the packet was sent.
889     pub time_sent: Instant,
890 
    /// NOTE(review): presumably set once the packet is acknowledged; the
    /// tests construct packets with `None`.
891     pub time_acked: Option<Instant>,
892 
    /// NOTE(review): presumably set once the packet is declared lost; the
    /// tests construct packets with `None`.
893     pub time_lost: Option<Instant>,
894 
    /// Size of the packet. NOTE(review): presumably in bytes; the tests
    /// expect `bytes_in_flight` to grow by this amount per in-flight packet.
895     pub size: usize,
896 
    /// Whether the packet is ack-eliciting.
897     pub ack_eliciting: bool,
898 
    /// Whether the packet counts as in flight.
899     pub in_flight: bool,
900 
    // The four fields below are printed by the `Debug` impl next to the
    // packet number. NOTE(review): they look like the per-packet snapshot
    // used for delivery rate estimation (`delivery_rate` module); confirm.
901     pub delivered: usize,
902 
903     pub delivered_time: Instant,
904 
905     pub recent_delivered_packet_sent_time: Instant,
906 
907     pub is_app_limited: bool,
908 
    /// NOTE(review): presumably whether the packet carries application
    /// (stream) data; confirm against where it is set.
909     pub has_data: bool,
910 }
911 
912 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result913     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
914         write!(f, "pkt_num={:?} ", self.pkt_num)?;
915         write!(f, "pkt_sent_time={:?} ", self.time_sent.elapsed())?;
916         write!(f, "pkt_size={:?} ", self.size)?;
917         write!(f, "delivered={:?} ", self.delivered)?;
918         write!(f, "delivered_time={:?} ", self.delivered_time.elapsed())?;
919         write!(
920             f,
921             "recent_delivered_packet_sent_time={:?} ",
922             self.recent_delivered_packet_sent_time.elapsed()
923         )?;
924         write!(f, "is_app_limited={} ", self.is_app_limited)?;
925         write!(f, "has_data={} ", self.has_data)?;
926 
927         Ok(())
928     }
929 }
930 
/// Summary of one packet, passed by reference to the `on_packet_acked` hook
/// of `CongestionControlOps`.
931 #[derive(Clone)]
932 pub struct Acked {
    /// Packet number of the packet.
933     pub pkt_num: u64,
934 
    /// Time the packet was sent.
935     pub time_sent: Instant,
936 
    /// Size of the packet. NOTE(review): presumably in bytes, mirroring
    /// `Sent::size`; confirm.
937     pub size: usize,
938 }
939 
/// Snapshot of handshake progress supplied by the caller to
/// `Recovery::on_packet_sent()`, `on_ack_received()` and
/// `on_loss_detection_timeout()` (see the tests in this file).
940 #[derive(Clone, Copy, Debug)]
941 pub struct HandshakeStatus {
    /// Whether handshake keys are available.
942     pub has_handshake_keys: bool,
943 
    /// Whether the peer has verified our address.
    /// NOTE(review): direction inferred from the field name; confirm.
944     pub peer_verified_address: bool,
945 
    /// Whether the handshake has completed.
946     pub completed: bool,
947 }
948 
#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default: a fully established handshake, with every flag
    /// set to `true`.
    fn default() -> HandshakeStatus {
        let established = true;

        Self {
            has_handshake_keys: established,
            peer_verified_address: established,
            completed: established,
        }
    }
}
961 
/// Returns the absolute difference between two durations, i.e.
/// `|lhs - rhs|`, without underflowing when `rhs` is the larger one.
fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
    // Order the operands first so the subtraction can never go negative.
    let (longer, shorter) = if lhs > rhs { (lhs, rhs) } else { (rhs, lhs) };

    longer - shorter
}
969 
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a `Recovery` that uses the Reno congestion controller.
    fn reno_recovery() -> Recovery {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

        Recovery::new(&cfg)
    }

    /// Builds the packet record shared by all loss tests: 1000 bytes,
    /// ack-eliciting, in flight, no frames, with every timestamp set to
    /// `now`.
    fn test_packet(pkt_num: u64, now: Instant) -> Sent {
        Sent {
            pkt_num,
            frames: vec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 1000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            recent_delivered_packet_sent_time: now,
            is_app_limited: false,
            has_data: false,
        }
    }

    /// Sends one test packet per packet number in `pkt_nums` on the
    /// application epoch, checking after each send that exactly one more
    /// packet is tracked and 1000 more bytes are in flight.
    fn send_packets(
        r: &mut Recovery, pkt_nums: std::ops::Range<u64>, now: Instant,
    ) {
        for pkt_num in pkt_nums {
            let sent_before = r.sent[packet::EPOCH_APPLICATION].len();
            let in_flight_before = r.bytes_in_flight;

            r.on_packet_sent(
                test_packet(pkt_num, now),
                packet::EPOCH_APPLICATION,
                HandshakeStatus::default(),
                now,
                "",
            );

            assert_eq!(
                r.sent[packet::EPOCH_APPLICATION].len(),
                sent_before + 1
            );
            assert_eq!(r.bytes_in_flight, in_flight_before + 1000);
        }
    }

    /// Starts every loss test: checks that nothing is tracked yet, then
    /// sends packets 0 through 3 and checks the resulting totals.
    fn send_initial_flight(r: &mut Recovery, now: Instant) {
        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);

        send_packets(r, 0..4, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
        assert_eq!(r.bytes_in_flight, 4000);
    }

    /// Delivers an ACK for `acked` on the application epoch (ack delay 25)
    /// and checks that it is processed successfully.
    fn receive_ack(r: &mut Recovery, acked: &ranges::RangeSet, now: Instant) {
        assert_eq!(
            r.on_ack_received(
                acked,
                25,
                packet::EPOCH_APPLICATION,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok(())
        );
    }

    #[test]
    fn lookup_cc_algo_ok() {
        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
        assert_eq!(algo, CongestionControlAlgorithm::Reno);
    }

    #[test]
    fn lookup_cc_algo_bad() {
        assert_eq!(
            CongestionControlAlgorithm::from_str("???"),
            Err(Error::CongestionControl)
        );
    }

    #[test]
    fn collapse_cwnd() {
        let mut r = reno_recovery();

        // cwnd will be reset.
        r.collapse_cwnd();
        assert_eq!(r.cwnd(), MINIMUM_WINDOW);
    }

    #[test]
    fn loss_on_pto() {
        let mut r = reno_recovery();

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_flight(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // Only the first 2 packets are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);

        receive_ack(&mut r, &acked, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
        assert_eq!(r.bytes_in_flight, 2000);
        assert_eq!(r.lost_count, 0);

        // Wait until loss detection timer expires.
        now = r.loss_detection_timer().unwrap();

        // PTO.
        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
        assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 1);
        assert_eq!(r.lost_count, 0);
        assert_eq!(r.pto_count, 1);

        // Send the two probe packets.
        send_packets(&mut r, 4..6, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
        assert_eq!(r.bytes_in_flight, 4000);
        assert_eq!(r.lost_count, 0);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // PTO packets are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(4..6);

        receive_ack(&mut r, &acked, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
        assert_eq!(r.bytes_in_flight, 0);

        // The two packets that were never acked are now counted as lost.
        assert_eq!(r.lost_count, 2);
    }

    #[test]
    fn loss_on_timer() {
        let mut r = reno_recovery();

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_flight(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // Only the first 2 packets and the last one are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);
        acked.insert(3..4);

        receive_ack(&mut r, &acked, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
        assert_eq!(r.bytes_in_flight, 1000);
        assert_eq!(r.lost_count, 0);

        // Wait until loss detection timer expires.
        now = r.loss_detection_timer().unwrap();

        // Packet is declared lost.
        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
        assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 0);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
        assert_eq!(r.bytes_in_flight, 0);

        assert_eq!(r.lost_count, 1);
    }

    #[test]
    fn loss_on_reordering() {
        let mut r = reno_recovery();

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_flight(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // ACKs are reordered.
        let mut acked = ranges::RangeSet::default();
        acked.insert(2..4);

        receive_ack(&mut r, &acked, now);

        now += Duration::from_millis(10);

        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);

        receive_ack(&mut r, &acked, now);

        assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
        assert_eq!(r.bytes_in_flight, 0);

        // Spurious loss.
        assert_eq!(r.lost_count, 1);
    }
}
1538 
1539 mod cubic;
1540 mod delivery_rate;
1541 mod hystart;
1542 mod reno;
1543