• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 use std::cmp;
28 
29 use std::str::FromStr;
30 
31 use std::time::Duration;
32 use std::time::Instant;
33 
34 use std::collections::VecDeque;
35 
36 use crate::Config;
37 use crate::Result;
38 
39 use crate::frame;
40 use crate::minmax;
41 use crate::packet;
42 use crate::ranges;
43 
44 #[cfg(feature = "qlog")]
45 use qlog::events::EventData;
46 
47 use smallvec::SmallVec;
48 
// Loss Recovery

// Starting value of the packet reordering threshold (`Recovery::pkt_thresh`).
const INITIAL_PACKET_THRESHOLD: u64 = 3;

// Upper bound applied when the packet reordering threshold is raised after a
// spurious loss.
const MAX_PACKET_THRESHOLD: u64 = 20;

// Starting value of the time reordering threshold (`Recovery::time_thresh`),
// expressed as a multiplier of the RTT.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

// Lower bound used for the loss delay and for the variance term of the PTO.
const GRANULARITY: Duration = Duration::from_millis(1);

// RTT assumed until the first RTT sample is taken; also seeds `rttvar`.
const INITIAL_RTT: Duration = Duration::from_millis(333);

// NOTE(review): not referenced in this part of the file; presumably the
// number of PTO periods that establishes persistent congestion -- confirm.
const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;

// Window over which the running minimum RTT is tracked.
const RTT_WINDOW: Duration = Duration::from_secs(300);

// Maximum number of probe packets scheduled per PTO expiration.
const MAX_PTO_PROBES_COUNT: usize = 2;

// Congestion Control

// Initial congestion window, in full-sized datagrams.
const INITIAL_WINDOW_PACKETS: usize = 10;

// NOTE(review): not referenced in this part of the file; presumably the
// minimum congestion window in datagrams -- confirm.
const MINIMUM_WINDOW_PACKETS: usize = 2;

// NOTE(review): not referenced in this part of the file; presumably the
// multiplicative decrease applied to the window on loss -- confirm.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;

// Factor applied to `cwnd / srtt` when deriving the default pacing rate.
const PACING_MULTIPLIER: f64 = 1.25;

// How many non ACK eliciting packets we send before including a PING to solicit
// an ACK.
pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
78 
/// Loss detection, RTT estimation and congestion control state.
///
/// Per-epoch values are stored in arrays indexed by `packet::Epoch`.
pub struct Recovery {
    /// Deadline of the loss detection timer, or `None` when it is not armed.
    loss_detection_timer: Option<Instant>,

    /// Number of PTO expirations since newly acked packets were last
    /// processed; used as the exponent of the PTO backoff.
    pto_count: u32,

    /// Per epoch, when the last in-flight ack-eliciting packet was sent.
    time_of_last_sent_ack_eliciting_pkt:
        [Option<Instant>; packet::Epoch::count()],

    /// Per epoch, the largest acknowledged packet number; `u64::MAX` until
    /// the first ACK is processed.
    largest_acked_pkt: [u64; packet::Epoch::count()],

    /// Per epoch, the largest packet number sent so far.
    largest_sent_pkt: [u64; packet::Epoch::count()],

    /// Most recent RTT sample.
    latest_rtt: Duration,

    /// Smoothed RTT; `None` until the first RTT sample is taken.
    smoothed_rtt: Option<Duration>,

    /// RTT variation estimate.
    rttvar: Duration,

    /// Windowed filter from which `min_rtt` is derived.
    minmax_filter: minmax::Minmax<Duration>,

    /// Minimum RTT reported by `minmax_filter`; zero until the first sample.
    min_rtt: Duration,

    /// Upper bound applied to the peer-reported ACK delay.
    pub max_ack_delay: Duration,

    /// Per epoch, the time at which the next packet will be declared lost by
    /// the time threshold, if any.
    loss_time: [Option<Instant>; packet::Epoch::count()],

    /// Per epoch, the sent packets still being tracked.
    sent: [VecDeque<Sent>; packet::Epoch::count()],

    /// Per epoch, frames from lost (or PTO-probed) packets awaiting
    /// retransmission.
    pub lost: [Vec<frame::Frame>; packet::Epoch::count()],

    /// Per epoch, frames from newly acknowledged packets.
    pub acked: [Vec<frame::Frame>; packet::Epoch::count()],

    /// Total number of packets declared lost.
    pub lost_count: usize,

    /// Number of packets acknowledged after having been declared lost.
    pub lost_spurious_count: usize,

    /// Per epoch, the number of probe packets scheduled by the last PTO.
    pub loss_probes: [usize; packet::Epoch::count()],

    /// Per epoch, the number of in-flight packets not yet acked or lost.
    in_flight_count: [usize; packet::Epoch::count()],

    // NOTE(review): maintained via `update_app_limited()`, which is defined
    // outside this part of the file.
    app_limited: bool,

    /// Delivery rate estimator.
    delivery_rate: delivery_rate::Rate,

    /// Current packet reordering threshold.
    pkt_thresh: u64,

    /// Current time reordering threshold, as a multiplier of the RTT.
    time_thresh: f64,

    // Congestion control.
    cc_ops: &'static CongestionControlOps,

    /// Congestion window, in bytes.
    congestion_window: usize,

    bytes_in_flight: usize,

    /// Slow start threshold; `usize::MAX` until set.
    ssthresh: usize,

    // NOTE(review): presumably bytes acked during slow start -- confirm.
    bytes_acked_sl: usize,

    // NOTE(review): presumably bytes acked during congestion avoidance --
    // confirm.
    bytes_acked_ca: usize,

    /// Total bytes passed to `on_packet_sent()`.
    bytes_sent: usize,

    pub bytes_lost: u64,

    congestion_recovery_start_time: Option<Instant>,

    /// Maximum UDP payload size used for window and pacing computations.
    max_datagram_size: usize,

    cubic_state: cubic::State,

    // HyStart++.
    hystart: hystart::Hystart,

    // Pacing.
    pub pacer: pacer::Pacer,

    // RFC6937 PRR.
    prr: prr::PRR,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    // The maximum size of a data aggregate scheduled and
    // transmitted together.
    send_quantum: usize,

    // BBR state.
    bbr_state: bbr::State,

    /// How many non-ack-eliciting packets have been sent since the last
    /// ack-eliciting one.
    outstanding_non_ack_eliciting: usize,
}
172 
/// Subset of the connection configuration needed to build a [`Recovery`].
pub struct RecoveryConfig {
    /// Maximum UDP payload size; sizes the initial congestion window.
    max_send_udp_payload_size: usize,
    /// Upper bound applied to the peer-reported ACK delay.
    pub max_ack_delay: Duration,
    /// Callbacks of the selected congestion control algorithm.
    cc_ops: &'static CongestionControlOps,
    /// Whether HyStart++ is enabled.
    hystart: bool,
    /// Whether pacing is enabled.
    pacing: bool,
}
180 
181 impl RecoveryConfig {
from_config(config: &Config) -> Self182     pub fn from_config(config: &Config) -> Self {
183         Self {
184             max_send_udp_payload_size: config.max_send_udp_payload_size,
185             max_ack_delay: Duration::ZERO,
186             cc_ops: config.cc_algorithm.into(),
187             hystart: config.hystart,
188             pacing: config.pacing,
189         }
190     }
191 }
192 
193 impl Recovery {
new_with_config(recovery_config: &RecoveryConfig) -> Self194     pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
195         let initial_congestion_window =
196             recovery_config.max_send_udp_payload_size * INITIAL_WINDOW_PACKETS;
197 
198         Recovery {
199             loss_detection_timer: None,
200 
201             pto_count: 0,
202 
203             time_of_last_sent_ack_eliciting_pkt: [None; packet::Epoch::count()],
204 
205             largest_acked_pkt: [u64::MAX; packet::Epoch::count()],
206 
207             largest_sent_pkt: [0; packet::Epoch::count()],
208 
209             latest_rtt: Duration::ZERO,
210 
211             // This field should be initialized to `INITIAL_RTT` for the initial
212             // PTO calculation, but it also needs to be an `Option` to track
213             // whether any RTT sample was received, so the initial value is
214             // handled by the `rtt()` method instead.
215             smoothed_rtt: None,
216 
217             minmax_filter: minmax::Minmax::new(Duration::ZERO),
218 
219             min_rtt: Duration::ZERO,
220 
221             rttvar: INITIAL_RTT / 2,
222 
223             max_ack_delay: recovery_config.max_ack_delay,
224 
225             loss_time: [None; packet::Epoch::count()],
226 
227             sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
228 
229             lost: [Vec::new(), Vec::new(), Vec::new()],
230 
231             acked: [Vec::new(), Vec::new(), Vec::new()],
232 
233             lost_count: 0,
234             lost_spurious_count: 0,
235 
236             loss_probes: [0; packet::Epoch::count()],
237 
238             in_flight_count: [0; packet::Epoch::count()],
239 
240             congestion_window: initial_congestion_window,
241 
242             pkt_thresh: INITIAL_PACKET_THRESHOLD,
243 
244             time_thresh: INITIAL_TIME_THRESHOLD,
245 
246             bytes_in_flight: 0,
247 
248             ssthresh: usize::MAX,
249 
250             bytes_acked_sl: 0,
251 
252             bytes_acked_ca: 0,
253 
254             bytes_sent: 0,
255 
256             bytes_lost: 0,
257 
258             congestion_recovery_start_time: None,
259 
260             max_datagram_size: recovery_config.max_send_udp_payload_size,
261 
262             cc_ops: recovery_config.cc_ops,
263 
264             delivery_rate: delivery_rate::Rate::default(),
265 
266             cubic_state: cubic::State::default(),
267 
268             app_limited: false,
269 
270             hystart: hystart::Hystart::new(recovery_config.hystart),
271 
272             pacer: pacer::Pacer::new(
273                 recovery_config.pacing,
274                 initial_congestion_window,
275                 0,
276                 recovery_config.max_send_udp_payload_size,
277             ),
278 
279             prr: prr::PRR::default(),
280 
281             send_quantum: initial_congestion_window,
282 
283             #[cfg(feature = "qlog")]
284             qlog_metrics: QlogMetrics::default(),
285 
286             bbr_state: bbr::State::new(),
287 
288             outstanding_non_ack_eliciting: 0,
289         }
290     }
291 
new(config: &Config) -> Self292     pub fn new(config: &Config) -> Self {
293         Self::new_with_config(&RecoveryConfig::from_config(config))
294     }
295 
on_init(&mut self)296     pub fn on_init(&mut self) {
297         (self.cc_ops.on_init)(self);
298     }
299 
reset(&mut self)300     pub fn reset(&mut self) {
301         self.congestion_window = self.max_datagram_size * INITIAL_WINDOW_PACKETS;
302         self.in_flight_count = [0; packet::Epoch::count()];
303         self.congestion_recovery_start_time = None;
304         self.ssthresh = usize::MAX;
305         (self.cc_ops.reset)(self);
306         self.hystart.reset();
307         self.prr = prr::PRR::default();
308     }
309 
310     /// Returns whether or not we should elicit an ACK even if we wouldn't
311     /// otherwise have constructed an ACK eliciting packet.
should_elicit_ack(&self, epoch: packet::Epoch) -> bool312     pub fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
313         self.loss_probes[epoch] > 0 ||
314             self.outstanding_non_ack_eliciting >=
315                 MAX_OUTSTANDING_NON_ACK_ELICITING
316     }
317 
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )318     pub fn on_packet_sent(
319         &mut self, mut pkt: Sent, epoch: packet::Epoch,
320         handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
321     ) {
322         let ack_eliciting = pkt.ack_eliciting;
323         let in_flight = pkt.in_flight;
324         let sent_bytes = pkt.size;
325         let pkt_num = pkt.pkt_num;
326 
327         if ack_eliciting {
328             self.outstanding_non_ack_eliciting = 0;
329         } else {
330             self.outstanding_non_ack_eliciting += 1;
331         }
332 
333         self.largest_sent_pkt[epoch] =
334             cmp::max(self.largest_sent_pkt[epoch], pkt_num);
335 
336         if in_flight {
337             if ack_eliciting {
338                 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
339             }
340 
341             self.in_flight_count[epoch] += 1;
342 
343             self.update_app_limited(
344                 (self.bytes_in_flight + sent_bytes) < self.congestion_window,
345             );
346 
347             self.on_packet_sent_cc(sent_bytes, now);
348 
349             self.prr.on_packet_sent(sent_bytes);
350 
351             self.set_loss_detection_timer(handshake_status, now);
352         }
353 
354         // HyStart++: Start of the round in a slow start.
355         if self.hystart.enabled() &&
356             epoch == packet::Epoch::Application &&
357             self.congestion_window < self.ssthresh
358         {
359             self.hystart.start_round(pkt_num);
360         }
361 
362         // Pacing: Set the pacing rate if CC doesn't do its own.
363         if !(self.cc_ops.has_custom_pacing)() {
364             if let Some(srtt) = self.smoothed_rtt {
365                 let rate = PACING_MULTIPLIER * self.congestion_window as f64 /
366                     srtt.as_secs_f64();
367                 self.set_pacing_rate(rate as u64, now);
368             }
369         }
370 
371         self.schedule_next_packet(epoch, now, sent_bytes);
372 
373         pkt.time_sent = self.get_packet_send_time();
374 
375         // bytes_in_flight is already updated. Use previous value.
376         self.delivery_rate
377             .on_packet_sent(&mut pkt, self.bytes_in_flight - sent_bytes);
378 
379         self.sent[epoch].push_back(pkt);
380 
381         self.bytes_sent += sent_bytes;
382         trace!("{} {:?}", trace_id, self);
383     }
384 
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)385     fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
386         (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
387     }
388 
set_pacing_rate(&mut self, rate: u64, now: Instant)389     pub fn set_pacing_rate(&mut self, rate: u64, now: Instant) {
390         self.pacer.update(self.send_quantum, rate, now);
391     }
392 
    /// Returns the pacer's next send time.
    pub fn get_packet_send_time(&self) -> Instant {
        self.pacer.next_time()
    }
396 
schedule_next_packet( &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, )397     fn schedule_next_packet(
398         &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
399     ) {
400         // Don't pace in any of these cases:
401         //   * Packet contains no data.
402         //   * Packet epoch is not Epoch::Application.
403         //   * The congestion window is within initcwnd.
404 
405         let is_app = epoch == packet::Epoch::Application;
406 
407         let in_initcwnd =
408             self.bytes_sent < self.max_datagram_size * INITIAL_WINDOW_PACKETS;
409 
410         let sent_bytes = if !self.pacer.enabled() || !is_app || in_initcwnd {
411             0
412         } else {
413             packet_size
414         };
415 
416         self.pacer.send(sent_bytes, now);
417     }
418 
    /// Processes an ACK frame received for `epoch`.
    ///
    /// Marks the acknowledged packets, detects spurious losses, takes an RTT
    /// sample when possible, runs loss detection and re-arms the loss
    /// detection timer.
    ///
    /// * `ranges` - the acknowledged packet number ranges; must not be empty.
    /// * `ack_delay` - the peer-reported ACK delay, in microseconds.
    ///
    /// Returns `(lost_packets, lost_bytes)` as reported by loss detection, or
    /// `(0, 0)` when the frame did not newly acknowledge any packet.
    ///
    /// # Panics
    ///
    /// Panics if `ranges` is empty.
    pub fn on_ack_received(
        &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
        epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
        trace_id: &str,
    ) -> Result<(usize, usize)> {
        let largest_acked = ranges.last().unwrap();

        // While quiche used to consider ACK frames acknowledging packet numbers
        // larger than the largest sent one as invalid, this is not true anymore
        // if we consider a single packet number space and multiple paths. The
        // simplest example is the case where the host sends a probing packet on
        // a validating path, then receives an acknowledgment for that packet on
        // the active one.

        // `u64::MAX` marks that no ACK was processed yet for this epoch.
        if self.largest_acked_pkt[epoch] == u64::MAX {
            self.largest_acked_pkt[epoch] = largest_acked;
        } else {
            self.largest_acked_pkt[epoch] =
                cmp::max(self.largest_acked_pkt[epoch], largest_acked);
        }

        let mut has_ack_eliciting = false;

        let mut largest_newly_acked_pkt_num = 0;
        let mut largest_newly_acked_sent_time = now;

        let mut newly_acked = Vec::new();

        let mut undo_cwnd = false;

        // Captured before any RTT update so that the spurious loss check uses
        // the estimate that was in effect when the packet was declared lost.
        let max_rtt = cmp::max(self.latest_rtt, self.rtt());

        // Detect and mark acked packets, without removing them from the sent
        // packets list.
        for r in ranges.iter() {
            let lowest_acked_in_block = r.start;
            let largest_acked_in_block = r.end - 1;

            let unacked_iter = self.sent[epoch]
                .iter_mut()
                // Skip packets that precede the lowest acked packet in the block.
                .skip_while(|p| p.pkt_num < lowest_acked_in_block)
                // Skip packets that follow the largest acked packet in the block.
                .take_while(|p| p.pkt_num <= largest_acked_in_block)
                // Skip packets that have already been acked or lost.
                .filter(|p| p.time_acked.is_none());

            for unacked in unacked_iter {
                unacked.time_acked = Some(now);

                // Check if acked packet was already declared lost.
                if unacked.time_lost.is_some() {
                    // Calculate new packet reordering threshold.
                    let pkt_thresh =
                        self.largest_acked_pkt[epoch] - unacked.pkt_num + 1;
                    let pkt_thresh = cmp::min(MAX_PACKET_THRESHOLD, pkt_thresh);

                    self.pkt_thresh = cmp::max(self.pkt_thresh, pkt_thresh);

                    // Calculate new time reordering threshold.
                    let loss_delay = max_rtt.mul_f64(self.time_thresh);

                    // unacked.time_sent can be in the future due to
                    // pacing.
                    if now.saturating_duration_since(unacked.time_sent) >
                        loss_delay
                    {
                        // TODO: do time threshold update
                        self.time_thresh = 5_f64 / 4_f64;
                    }

                    if unacked.in_flight {
                        undo_cwnd = true;
                    }

                    // A spuriously lost packet is counted, but it is not
                    // treated as newly acked.
                    self.lost_spurious_count += 1;
                    continue;
                }

                if unacked.ack_eliciting {
                    has_ack_eliciting = true;
                }

                // Overwritten on every newly acked packet, so after the loop
                // these hold the values of the last one visited.
                largest_newly_acked_pkt_num = unacked.pkt_num;
                largest_newly_acked_sent_time = unacked.time_sent;

                self.acked[epoch].extend(unacked.frames.drain(..));

                if unacked.in_flight {
                    self.in_flight_count[epoch] =
                        self.in_flight_count[epoch].saturating_sub(1);
                }

                newly_acked.push(Acked {
                    pkt_num: unacked.pkt_num,

                    time_sent: unacked.time_sent,

                    size: unacked.size,

                    rtt: now.saturating_duration_since(unacked.time_sent),

                    delivered: unacked.delivered,

                    delivered_time: unacked.delivered_time,

                    first_sent_time: unacked.first_sent_time,

                    is_app_limited: unacked.is_app_limited,
                });

                trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
            }
        }

        // Undo congestion window update.
        if undo_cwnd {
            (self.cc_ops.rollback)(self);
        }

        if newly_acked.is_empty() {
            return Ok((0, 0));
        }

        // Only take an RTT sample when the largest acknowledged packet is
        // newly acked and at least one newly acked packet was ack-eliciting.
        if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
            // The packet's sent time could be in the future if pacing is used
            // and the network has a very short RTT.
            let latest_rtt =
                now.saturating_duration_since(largest_newly_acked_sent_time);

            // The reported ACK delay is only used for the Application epoch.
            let ack_delay = if epoch == packet::Epoch::Application {
                Duration::from_micros(ack_delay)
            } else {
                Duration::from_micros(0)
            };

            // Don't update srtt if rtt is zero.
            if !latest_rtt.is_zero() {
                self.update_rtt(latest_rtt, ack_delay, now);
            }
        }

        // Detect and mark lost packets without removing them from the sent
        // packets list.
        let (lost_packets, lost_bytes) =
            self.detect_lost_packets(epoch, now, trace_id);

        self.on_packets_acked(newly_acked, epoch, now);

        // Newly acked packets reset the PTO backoff.
        self.pto_count = 0;

        self.set_loss_detection_timer(handshake_status, now);

        self.drain_packets(epoch, now);

        Ok((lost_packets, lost_bytes))
    }
576 
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> (usize, usize)577     pub fn on_loss_detection_timeout(
578         &mut self, handshake_status: HandshakeStatus, now: Instant,
579         trace_id: &str,
580     ) -> (usize, usize) {
581         let (earliest_loss_time, epoch) = self.loss_time_and_space();
582 
583         if earliest_loss_time.is_some() {
584             // Time threshold loss detection.
585             let (lost_packets, lost_bytes) =
586                 self.detect_lost_packets(epoch, now, trace_id);
587 
588             self.set_loss_detection_timer(handshake_status, now);
589 
590             trace!("{} {:?}", trace_id, self);
591             return (lost_packets, lost_bytes);
592         }
593 
594         let epoch = if self.bytes_in_flight > 0 {
595             // Send new data if available, else retransmit old data. If neither
596             // is available, send a single PING frame.
597             let (_, e) = self.pto_time_and_space(handshake_status, now);
598 
599             e
600         } else {
601             // Client sends an anti-deadlock packet: Initial is padded to earn
602             // more anti-amplification credit, a Handshake packet proves address
603             // ownership.
604             if handshake_status.has_handshake_keys {
605                 packet::Epoch::Handshake
606             } else {
607                 packet::Epoch::Initial
608             }
609         };
610 
611         self.pto_count += 1;
612 
613         self.loss_probes[epoch] =
614             cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
615 
616         let unacked_iter = self.sent[epoch]
617             .iter_mut()
618             // Skip packets that have already been acked or lost, and packets
619             // that don't contain either CRYPTO or STREAM frames.
620             .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
621             // Only return as many packets as the number of probe packets that
622             // will be sent.
623             .take(self.loss_probes[epoch]);
624 
625         // Retransmit the frames from the oldest sent packets on PTO. However
626         // the packets are not actually declared lost (so there is no effect to
627         // congestion control), we just reschedule the data they carried.
628         //
629         // This will also trigger sending an ACK and retransmitting frames like
630         // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
631         // to CRYPTO and STREAM, if the original packet carried them.
632         for unacked in unacked_iter {
633             self.lost[epoch].extend_from_slice(&unacked.frames);
634         }
635 
636         self.set_loss_detection_timer(handshake_status, now);
637 
638         trace!("{} {:?}", trace_id, self);
639 
640         (0, 0)
641     }
642 
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )643     pub fn on_pkt_num_space_discarded(
644         &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
645         now: Instant,
646     ) {
647         let unacked_bytes = self.sent[epoch]
648             .iter()
649             .filter(|p| {
650                 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
651             })
652             .fold(0, |acc, p| acc + p.size);
653 
654         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
655 
656         self.sent[epoch].clear();
657         self.lost[epoch].clear();
658         self.acked[epoch].clear();
659 
660         self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
661         self.loss_time[epoch] = None;
662         self.loss_probes[epoch] = 0;
663         self.in_flight_count[epoch] = 0;
664 
665         self.set_loss_detection_timer(handshake_status, now);
666     }
667 
    /// Returns the deadline of the loss detection timer, or `None` when the
    /// timer is not armed.
    pub fn loss_detection_timer(&self) -> Option<Instant> {
        self.loss_detection_timer
    }
671 
    /// Returns the current congestion window.
    pub fn cwnd(&self) -> usize {
        self.congestion_window
    }
675 
cwnd_available(&self) -> usize676     pub fn cwnd_available(&self) -> usize {
677         // Ignore cwnd when sending probe packets.
678         if self.loss_probes.iter().any(|&x| x > 0) {
679             return usize::MAX;
680         }
681 
682         // Open more space (snd_cnt) for PRR when allowed.
683         self.congestion_window.saturating_sub(self.bytes_in_flight) +
684             self.prr.snd_cnt
685     }
686 
rtt(&self) -> Duration687     pub fn rtt(&self) -> Duration {
688         self.smoothed_rtt.unwrap_or(INITIAL_RTT)
689     }
690 
min_rtt(&self) -> Option<Duration>691     pub fn min_rtt(&self) -> Option<Duration> {
692         if self.min_rtt == Duration::ZERO {
693             return None;
694         }
695 
696         Some(self.min_rtt)
697     }
698 
    /// Returns the current RTT variation estimate.
    pub fn rttvar(&self) -> Duration {
        self.rttvar
    }
702 
pto(&self) -> Duration703     pub fn pto(&self) -> Duration {
704         self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
705     }
706 
    /// Returns the delivery rate sample reported by the delivery rate
    /// estimator.
    pub fn delivery_rate(&self) -> u64 {
        self.delivery_rate.sample_delivery_rate()
    }
710 
    /// Returns the maximum datagram size currently in use.
    pub fn max_datagram_size(&self) -> usize {
        self.max_datagram_size
    }
714 
update_max_datagram_size(&mut self, new_max_datagram_size: usize)715     pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
716         let max_datagram_size =
717             cmp::min(self.max_datagram_size, new_max_datagram_size);
718 
719         // Update cwnd if it hasn't been updated yet.
720         if self.congestion_window ==
721             self.max_datagram_size * INITIAL_WINDOW_PACKETS
722         {
723             self.congestion_window = max_datagram_size * INITIAL_WINDOW_PACKETS;
724         }
725 
726         self.pacer = pacer::Pacer::new(
727             self.pacer.enabled(),
728             self.congestion_window,
729             0,
730             max_datagram_size,
731         );
732 
733         self.max_datagram_size = max_datagram_size;
734     }
735 
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )736     fn update_rtt(
737         &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
738     ) {
739         self.latest_rtt = latest_rtt;
740 
741         match self.smoothed_rtt {
742             // First RTT sample.
743             None => {
744                 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
745 
746                 self.smoothed_rtt = Some(latest_rtt);
747 
748                 self.rttvar = latest_rtt / 2;
749             },
750 
751             Some(srtt) => {
752                 self.min_rtt =
753                     self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
754 
755                 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
756 
757                 // Adjust for ack delay if plausible.
758                 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
759                     latest_rtt - ack_delay
760                 } else {
761                     latest_rtt
762                 };
763 
764                 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
765                     sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
766 
767                 self.smoothed_rtt = Some(
768                     srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
769                 );
770             },
771         }
772     }
773 
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)774     fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
775         let mut epoch = packet::Epoch::Initial;
776         let mut time = self.loss_time[epoch];
777 
778         // Iterate over all packet number spaces starting from Handshake.
779         for &e in packet::Epoch::epochs(
780             packet::Epoch::Handshake..=packet::Epoch::Application,
781         ) {
782             let new_time = self.loss_time[e];
783 
784             if time.is_none() || new_time < time {
785                 time = new_time;
786                 epoch = e;
787             }
788         }
789 
790         (time, epoch)
791     }
792 
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)793     fn pto_time_and_space(
794         &self, handshake_status: HandshakeStatus, now: Instant,
795     ) -> (Option<Instant>, packet::Epoch) {
796         let mut duration = self.pto() * 2_u32.pow(self.pto_count);
797 
798         // Arm PTO from now when there are no inflight packets.
799         if self.bytes_in_flight == 0 {
800             if handshake_status.has_handshake_keys {
801                 return (Some(now + duration), packet::Epoch::Handshake);
802             } else {
803                 return (Some(now + duration), packet::Epoch::Initial);
804             }
805         }
806 
807         let mut pto_timeout = None;
808         let mut pto_space = packet::Epoch::Initial;
809 
810         // Iterate over all packet number spaces.
811         for &e in packet::Epoch::epochs(
812             packet::Epoch::Initial..=packet::Epoch::Application,
813         ) {
814             if self.in_flight_count[e] == 0 {
815                 continue;
816             }
817 
818             if e == packet::Epoch::Application {
819                 // Skip Application Data until handshake completes.
820                 if !handshake_status.completed {
821                     return (pto_timeout, pto_space);
822                 }
823 
824                 // Include max_ack_delay and backoff for Application Data.
825                 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
826             }
827 
828             let new_time =
829                 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
830 
831             if pto_timeout.is_none() || new_time < pto_timeout {
832                 pto_timeout = new_time;
833                 pto_space = e;
834             }
835         }
836 
837         (pto_timeout, pto_space)
838     }
839 
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )840     fn set_loss_detection_timer(
841         &mut self, handshake_status: HandshakeStatus, now: Instant,
842     ) {
843         let (earliest_loss_time, _) = self.loss_time_and_space();
844 
845         if earliest_loss_time.is_some() {
846             // Time threshold loss detection.
847             self.loss_detection_timer = earliest_loss_time;
848             return;
849         }
850 
851         if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
852             self.loss_detection_timer = None;
853             return;
854         }
855 
856         // PTO timer.
857         let (timeout, _) = self.pto_time_and_space(handshake_status, now);
858         self.loss_detection_timer = timeout;
859     }
860 
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, ) -> (usize, usize)861     fn detect_lost_packets(
862         &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
863     ) -> (usize, usize) {
864         let largest_acked = self.largest_acked_pkt[epoch];
865 
866         self.loss_time[epoch] = None;
867 
868         let loss_delay =
869             cmp::max(self.latest_rtt, self.rtt()).mul_f64(self.time_thresh);
870 
871         // Minimum time of kGranularity before packets are deemed lost.
872         let loss_delay = cmp::max(loss_delay, GRANULARITY);
873 
874         // Packets sent before this time are deemed lost.
875         let lost_send_time = now.checked_sub(loss_delay).unwrap();
876 
877         let mut lost_packets = 0;
878         let mut lost_bytes = 0;
879 
880         let mut largest_lost_pkt = None;
881 
882         let unacked_iter = self.sent[epoch]
883             .iter_mut()
884             // Skip packets that follow the largest acked packet.
885             .take_while(|p| p.pkt_num <= largest_acked)
886             // Skip packets that have already been acked or lost.
887             .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
888 
889         for unacked in unacked_iter {
890             // Mark packet as lost, or set time when it should be marked.
891             if unacked.time_sent <= lost_send_time ||
892                 largest_acked >= unacked.pkt_num + self.pkt_thresh
893             {
894                 self.lost[epoch].extend(unacked.frames.drain(..));
895 
896                 unacked.time_lost = Some(now);
897 
898                 if unacked.in_flight {
899                     lost_bytes += unacked.size;
900 
901                     // Frames have already been removed from the packet, so
902                     // cloning the whole packet should be relatively cheap.
903                     largest_lost_pkt = Some(unacked.clone());
904 
905                     self.in_flight_count[epoch] =
906                         self.in_flight_count[epoch].saturating_sub(1);
907 
908                     trace!(
909                         "{} packet {} lost on epoch {}",
910                         trace_id,
911                         unacked.pkt_num,
912                         epoch
913                     );
914                 }
915 
916                 lost_packets += 1;
917                 self.lost_count += 1;
918             } else {
919                 let loss_time = match self.loss_time[epoch] {
920                     None => unacked.time_sent + loss_delay,
921 
922                     Some(loss_time) =>
923                         cmp::min(loss_time, unacked.time_sent + loss_delay),
924                 };
925 
926                 self.loss_time[epoch] = Some(loss_time);
927             }
928         }
929 
930         self.bytes_lost += lost_bytes as u64;
931 
932         if let Some(pkt) = largest_lost_pkt {
933             self.on_packets_lost(lost_bytes, &pkt, epoch, now);
934         }
935 
936         self.drain_packets(epoch, now);
937 
938         (lost_packets, lost_bytes)
939     }
940 
drain_packets(&mut self, epoch: packet::Epoch, now: Instant)941     fn drain_packets(&mut self, epoch: packet::Epoch, now: Instant) {
942         let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
943 
944         // In order to avoid removing elements from the middle of the list
945         // (which would require copying other elements to compact the list),
946         // we only remove a contiguous range of elements from the start of the
947         // list.
948         //
949         // This means that acked or lost elements coming after this will not
950         // be removed at this point, but their removal is delayed for a later
951         // time, once the gaps have been filled.
952 
953         // First, find the first element that is neither acked nor lost.
954         for (i, pkt) in self.sent[epoch].iter().enumerate() {
955             if let Some(time_lost) = pkt.time_lost {
956                 if time_lost + self.rtt() > now {
957                     lowest_non_expired_pkt_index = i;
958                     break;
959                 }
960             }
961 
962             if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
963                 lowest_non_expired_pkt_index = i;
964                 break;
965             }
966         }
967 
968         // Then remove elements up to the previously found index.
969         self.sent[epoch].drain(..lowest_non_expired_pkt_index);
970     }
971 
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )972     fn on_packets_acked(
973         &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
974     ) {
975         // Update delivery rate sample per acked packet.
976         for pkt in &acked {
977             self.delivery_rate.update_rate_sample(pkt, now);
978         }
979 
980         // Fill in a rate sample.
981         self.delivery_rate.generate_rate_sample(self.min_rtt);
982 
983         // Call congestion control hooks.
984         (self.cc_ops.on_packets_acked)(self, &acked, epoch, now);
985     }
986 
in_congestion_recovery(&self, sent_time: Instant) -> bool987     fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
988         match self.congestion_recovery_start_time {
989             Some(congestion_recovery_start_time) =>
990                 sent_time <= congestion_recovery_start_time,
991 
992             None => false,
993         }
994     }
995 
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool996     fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
997         let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
998 
999         // TODO: properly detect persistent congestion
1000         false
1001     }
1002 
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )1003     fn on_packets_lost(
1004         &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
1005         epoch: packet::Epoch, now: Instant,
1006     ) {
1007         self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
1008 
1009         self.congestion_event(lost_bytes, largest_lost_pkt.time_sent, epoch, now);
1010 
1011         if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
1012             self.collapse_cwnd();
1013         }
1014     }
1015 
congestion_event( &mut self, lost_bytes: usize, time_sent: Instant, epoch: packet::Epoch, now: Instant, )1016     fn congestion_event(
1017         &mut self, lost_bytes: usize, time_sent: Instant, epoch: packet::Epoch,
1018         now: Instant,
1019     ) {
1020         if !self.in_congestion_recovery(time_sent) {
1021             (self.cc_ops.checkpoint)(self);
1022         }
1023 
1024         (self.cc_ops.congestion_event)(self, lost_bytes, time_sent, epoch, now);
1025     }
1026 
collapse_cwnd(&mut self)1027     fn collapse_cwnd(&mut self) {
1028         (self.cc_ops.collapse_cwnd)(self);
1029     }
1030 
update_app_limited(&mut self, v: bool)1031     pub fn update_app_limited(&mut self, v: bool) {
1032         self.app_limited = v;
1033     }
1034 
app_limited(&self) -> bool1035     pub fn app_limited(&self) -> bool {
1036         self.app_limited
1037     }
1038 
delivery_rate_update_app_limited(&mut self, v: bool)1039     pub fn delivery_rate_update_app_limited(&mut self, v: bool) {
1040         self.delivery_rate.update_app_limited(v);
1041     }
1042 
1043     #[cfg(feature = "qlog")]
maybe_qlog(&mut self) -> Option<EventData>1044     pub fn maybe_qlog(&mut self) -> Option<EventData> {
1045         let qlog_metrics = QlogMetrics {
1046             min_rtt: self.min_rtt,
1047             smoothed_rtt: self.rtt(),
1048             latest_rtt: self.latest_rtt,
1049             rttvar: self.rttvar,
1050             cwnd: self.cwnd() as u64,
1051             bytes_in_flight: self.bytes_in_flight as u64,
1052             ssthresh: self.ssthresh as u64,
1053             pacing_rate: self.pacer.rate(),
1054         };
1055 
1056         self.qlog_metrics.maybe_update(qlog_metrics)
1057     }
1058 
send_quantum(&self) -> usize1059     pub fn send_quantum(&self) -> usize {
1060         self.send_quantum
1061     }
1062 }
1063 
/// Available congestion control algorithms.
///
/// Lists the congestion control algorithms that can currently be selected.
/// Each variant also has a lowercase string form accepted by the `FromStr`
/// implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
    /// Reno congestion control algorithm. `reno` in a string form.
    Reno = 0,

    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
    CUBIC = 1,

    /// BBR congestion control algorithm. `bbr` in a string form.
    BBR = 2,
}
1078 
1079 impl FromStr for CongestionControlAlgorithm {
1080     type Err = crate::Error;
1081 
1082     /// Converts a string to `CongestionControlAlgorithm`.
1083     ///
1084     /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>1085     fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
1086         match name {
1087             "reno" => Ok(CongestionControlAlgorithm::Reno),
1088             "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
1089             "bbr" => Ok(CongestionControlAlgorithm::BBR),
1090 
1091             _ => Err(crate::Error::CongestionControl),
1092         }
1093     }
1094 }
1095 
1096 pub struct CongestionControlOps {
1097     pub on_init: fn(r: &mut Recovery),
1098 
1099     pub reset: fn(r: &mut Recovery),
1100 
1101     pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
1102 
1103     pub on_packets_acked: fn(
1104         r: &mut Recovery,
1105         packets: &[Acked],
1106         epoch: packet::Epoch,
1107         now: Instant,
1108     ),
1109 
1110     pub congestion_event: fn(
1111         r: &mut Recovery,
1112         lost_bytes: usize,
1113         time_sent: Instant,
1114         epoch: packet::Epoch,
1115         now: Instant,
1116     ),
1117 
1118     pub collapse_cwnd: fn(r: &mut Recovery),
1119 
1120     pub checkpoint: fn(r: &mut Recovery),
1121 
1122     pub rollback: fn(r: &mut Recovery) -> bool,
1123 
1124     pub has_custom_pacing: fn() -> bool,
1125 
1126     pub debug_fmt:
1127         fn(r: &Recovery, formatter: &mut std::fmt::Formatter) -> std::fmt::Result,
1128 }
1129 
1130 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self1131     fn from(algo: CongestionControlAlgorithm) -> Self {
1132         match algo {
1133             CongestionControlAlgorithm::Reno => &reno::RENO,
1134             CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
1135             CongestionControlAlgorithm::BBR => &bbr::BBR,
1136         }
1137     }
1138 }
1139 
1140 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1141     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1142         match self.loss_detection_timer {
1143             Some(v) => {
1144                 let now = Instant::now();
1145 
1146                 if v > now {
1147                     let d = v.duration_since(now);
1148                     write!(f, "timer={d:?} ")?;
1149                 } else {
1150                     write!(f, "timer=exp ")?;
1151                 }
1152             },
1153 
1154             None => {
1155                 write!(f, "timer=none ")?;
1156             },
1157         };
1158 
1159         write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
1160         write!(f, "srtt={:?} ", self.smoothed_rtt)?;
1161         write!(f, "min_rtt={:?} ", self.min_rtt)?;
1162         write!(f, "rttvar={:?} ", self.rttvar)?;
1163         write!(f, "loss_time={:?} ", self.loss_time)?;
1164         write!(f, "loss_probes={:?} ", self.loss_probes)?;
1165         write!(f, "cwnd={} ", self.congestion_window)?;
1166         write!(f, "ssthresh={} ", self.ssthresh)?;
1167         write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
1168         write!(f, "app_limited={} ", self.app_limited)?;
1169         write!(
1170             f,
1171             "congestion_recovery_start_time={:?} ",
1172             self.congestion_recovery_start_time
1173         )?;
1174         write!(f, "{:?} ", self.delivery_rate)?;
1175         write!(f, "pacer={:?} ", self.pacer)?;
1176 
1177         if self.hystart.enabled() {
1178             write!(f, "hystart={:?} ", self.hystart)?;
1179         }
1180 
1181         // CC-specific debug info
1182         (self.cc_ops.debug_fmt)(self, f)?;
1183 
1184         Ok(())
1185     }
1186 }
1187 
1188 #[derive(Clone)]
1189 pub struct Sent {
1190     pub pkt_num: u64,
1191 
1192     pub frames: SmallVec<[frame::Frame; 1]>,
1193 
1194     pub time_sent: Instant,
1195 
1196     pub time_acked: Option<Instant>,
1197 
1198     pub time_lost: Option<Instant>,
1199 
1200     pub size: usize,
1201 
1202     pub ack_eliciting: bool,
1203 
1204     pub in_flight: bool,
1205 
1206     pub delivered: usize,
1207 
1208     pub delivered_time: Instant,
1209 
1210     pub first_sent_time: Instant,
1211 
1212     pub is_app_limited: bool,
1213 
1214     pub has_data: bool,
1215 }
1216 
1217 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1218     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1219         write!(f, "pkt_num={:?} ", self.pkt_num)?;
1220         write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
1221         write!(f, "pkt_size={:?} ", self.size)?;
1222         write!(f, "delivered={:?} ", self.delivered)?;
1223         write!(f, "delivered_time={:?} ", self.delivered_time)?;
1224         write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
1225         write!(f, "is_app_limited={} ", self.is_app_limited)?;
1226         write!(f, "has_data={} ", self.has_data)?;
1227 
1228         Ok(())
1229     }
1230 }
1231 
/// Summary of an acked packet, as passed to the delivery rate estimator and
/// to the congestion controller's `on_packets_acked` hook.
#[derive(Clone)]
pub struct Acked {
    /// Packet number.
    pub pkt_num: u64,
    /// Time at which the packet was sent.
    pub time_sent: Instant,
    /// Packet size in bytes.
    pub size: usize,
    /// RTT value associated with this packet.
    pub rtt: Duration,
    /// Delivery accounting value (same-named field exists on `Sent`).
    pub delivered: usize,
    /// Delivery timestamp (same-named field exists on `Sent`).
    pub delivered_time: Instant,
    /// First-sent timestamp (same-named field exists on `Sent`).
    pub first_sent_time: Instant,
    /// Application-limited flag (same-named field exists on `Sent`).
    pub is_app_limited: bool,
}
1250 
/// Handshake progress flags consulted by the loss recovery logic.
#[derive(Debug, Clone, Copy)]
pub struct HandshakeStatus {
    /// Whether handshake keys are available.
    pub has_handshake_keys: bool,
    /// Whether the peer has verified our address. The loss detection timer
    /// is only disarmed on an empty flight when this is set.
    pub peer_verified_address: bool,
    /// Whether the handshake has completed. Application Data is not
    /// considered for PTO before that.
    pub completed: bool,
}
1259 
#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default: a fully completed handshake (every flag set).
    fn default() -> Self {
        Self {
            has_handshake_keys: true,
            peer_verified_address: true,
            completed: true,
        }
    }
}
1272 
/// Returns the absolute difference between two durations.
fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
    match lhs.checked_sub(rhs) {
        // `lhs >= rhs`: the plain difference is representable.
        Some(diff) => diff,

        // `lhs < rhs`: subtract the other way around.
        None => rhs - lhs,
    }
}
1280 
// Logging every qlog metric on every recovery event is unnecessary. Only the
// MetricsUpdated fields we care about are logged, and only when their value
// changes. To make that possible, QlogMetrics remembers the most recently
// reported value of each field.
#[cfg(feature = "qlog")]
#[derive(Default)]
struct QlogMetrics {
    // RTT estimates.
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,

    // Congestion control state.
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: u64,
    pacing_rate: u64,
}
1297 
#[cfg(feature = "qlog")]
impl QlogMetrics {
    // Builds a qlog event when `latest` differs from the stored metrics.
    //
    // Each field is compared individually and the stored value is refreshed
    // when it changed. A MetricsUpdated event is produced only if at least
    // one field changed; it carries the latest value of the changed fields
    // and `None` for all the others. RTT values are reported in
    // milliseconds.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        // Stores `new` in `slot` and returns it, but only when it differs
        // from the value previously held.
        fn refresh<T: Copy + PartialEq>(slot: &mut T, new: T) -> Option<T> {
            if *slot == new {
                return None;
            }

            *slot = new;
            Some(new)
        }

        fn to_millis(d: Duration) -> f32 {
            d.as_secs_f32() * 1000.0
        }

        let min_rtt = refresh(&mut self.min_rtt, latest.min_rtt).map(to_millis);
        let smoothed_rtt = refresh(&mut self.smoothed_rtt, latest.smoothed_rtt)
            .map(to_millis);
        let latest_rtt =
            refresh(&mut self.latest_rtt, latest.latest_rtt).map(to_millis);
        let rtt_variance = refresh(&mut self.rttvar, latest.rttvar).map(to_millis);

        let congestion_window = refresh(&mut self.cwnd, latest.cwnd);
        let bytes_in_flight =
            refresh(&mut self.bytes_in_flight, latest.bytes_in_flight);
        let ssthresh = refresh(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate = refresh(&mut self.pacing_rate, latest.pacing_rate);

        let anything_changed = min_rtt.is_some() ||
            smoothed_rtt.is_some() ||
            latest_rtt.is_some() ||
            rtt_variance.is_some() ||
            congestion_window.is_some() ||
            bytes_in_flight.is_some() ||
            ssthresh.is_some() ||
            pacing_rate.is_some();

        if !anything_changed {
            return None;
        }

        // QVis can't use all these fields and they can be large.
        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt,
                smoothed_rtt,
                latest_rtt,
                rtt_variance,
                pto_count: None,
                congestion_window,
                bytes_in_flight,
                ssthresh,
                packets_in_flight: None,
                pacing_rate,
            },
        ))
    }
}
1394 
1395 #[cfg(test)]
1396 mod tests {
1397     use super::*;
1398     use smallvec::smallvec;
1399 
1400     #[test]
lookup_cc_algo_ok()1401     fn lookup_cc_algo_ok() {
1402         let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
1403         assert_eq!(algo, CongestionControlAlgorithm::Reno);
1404     }
1405 
1406     #[test]
lookup_cc_algo_bad()1407     fn lookup_cc_algo_bad() {
1408         assert_eq!(
1409             CongestionControlAlgorithm::from_str("???"),
1410             Err(crate::Error::CongestionControl)
1411         );
1412     }
1413 
1414     #[test]
collapse_cwnd()1415     fn collapse_cwnd() {
1416         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1417         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1418 
1419         let mut r = Recovery::new(&cfg);
1420 
1421         // cwnd will be reset.
1422         r.collapse_cwnd();
1423         assert_eq!(r.cwnd(), r.max_datagram_size * MINIMUM_WINDOW_PACKETS);
1424     }
1425 
1426     #[test]
loss_on_pto()1427     fn loss_on_pto() {
1428         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1429         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1430 
1431         let mut r = Recovery::new(&cfg);
1432 
1433         let mut now = Instant::now();
1434 
1435         assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
1436 
1437         // Start by sending a few packets.
1438         let p = Sent {
1439             pkt_num: 0,
1440             frames: smallvec![],
1441             time_sent: now,
1442             time_acked: None,
1443             time_lost: None,
1444             size: 1000,
1445             ack_eliciting: true,
1446             in_flight: true,
1447             delivered: 0,
1448             delivered_time: now,
1449             first_sent_time: now,
1450             is_app_limited: false,
1451             has_data: false,
1452         };
1453 
1454         r.on_packet_sent(
1455             p,
1456             packet::Epoch::Application,
1457             HandshakeStatus::default(),
1458             now,
1459             "",
1460         );
1461         assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
1462         assert_eq!(r.bytes_in_flight, 1000);
1463 
1464         let p = Sent {
1465             pkt_num: 1,
1466             frames: smallvec![],
1467             time_sent: now,
1468             time_acked: None,
1469             time_lost: None,
1470             size: 1000,
1471             ack_eliciting: true,
1472             in_flight: true,
1473             delivered: 0,
1474             delivered_time: now,
1475             first_sent_time: now,
1476             is_app_limited: false,
1477             has_data: false,
1478         };
1479 
1480         r.on_packet_sent(
1481             p,
1482             packet::Epoch::Application,
1483             HandshakeStatus::default(),
1484             now,
1485             "",
1486         );
1487         assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
1488         assert_eq!(r.bytes_in_flight, 2000);
1489 
1490         let p = Sent {
1491             pkt_num: 2,
1492             frames: smallvec![],
1493             time_sent: now,
1494             time_acked: None,
1495             time_lost: None,
1496             size: 1000,
1497             ack_eliciting: true,
1498             in_flight: true,
1499             delivered: 0,
1500             delivered_time: now,
1501             first_sent_time: now,
1502             is_app_limited: false,
1503             has_data: false,
1504         };
1505 
1506         r.on_packet_sent(
1507             p,
1508             packet::Epoch::Application,
1509             HandshakeStatus::default(),
1510             now,
1511             "",
1512         );
1513         assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
1514         assert_eq!(r.bytes_in_flight, 3000);
1515 
1516         let p = Sent {
1517             pkt_num: 3,
1518             frames: smallvec![],
1519             time_sent: now,
1520             time_acked: None,
1521             time_lost: None,
1522             size: 1000,
1523             ack_eliciting: true,
1524             in_flight: true,
1525             delivered: 0,
1526             delivered_time: now,
1527             first_sent_time: now,
1528             is_app_limited: false,
1529             has_data: false,
1530         };
1531 
1532         r.on_packet_sent(
1533             p,
1534             packet::Epoch::Application,
1535             HandshakeStatus::default(),
1536             now,
1537             "",
1538         );
1539         assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
1540         assert_eq!(r.bytes_in_flight, 4000);
1541 
1542         // Wait for 10ms.
1543         now += Duration::from_millis(10);
1544 
1545         // Only the first 2 packets are acked.
1546         let mut acked = ranges::RangeSet::default();
1547         acked.insert(0..2);
1548 
1549         assert_eq!(
1550             r.on_ack_received(
1551                 &acked,
1552                 25,
1553                 packet::Epoch::Application,
1554                 HandshakeStatus::default(),
1555                 now,
1556                 ""
1557             ),
1558             Ok((0, 0))
1559         );
1560 
1561         assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
1562         assert_eq!(r.bytes_in_flight, 2000);
1563         assert_eq!(r.lost_count, 0);
1564 
1565         // Wait until loss detection timer expires.
1566         now = r.loss_detection_timer().unwrap();
1567 
1568         // PTO.
1569         r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1570         assert_eq!(r.loss_probes[packet::Epoch::Application], 1);
1571         assert_eq!(r.lost_count, 0);
1572         assert_eq!(r.pto_count, 1);
1573 
1574         let p = Sent {
1575             pkt_num: 4,
1576             frames: smallvec![],
1577             time_sent: now,
1578             time_acked: None,
1579             time_lost: None,
1580             size: 1000,
1581             ack_eliciting: true,
1582             in_flight: true,
1583             delivered: 0,
1584             delivered_time: now,
1585             first_sent_time: now,
1586             is_app_limited: false,
1587             has_data: false,
1588         };
1589 
1590         r.on_packet_sent(
1591             p,
1592             packet::Epoch::Application,
1593             HandshakeStatus::default(),
1594             now,
1595             "",
1596         );
1597         assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
1598         assert_eq!(r.bytes_in_flight, 3000);
1599 
1600         let p = Sent {
1601             pkt_num: 5,
1602             frames: smallvec![],
1603             time_sent: now,
1604             time_acked: None,
1605             time_lost: None,
1606             size: 1000,
1607             ack_eliciting: true,
1608             in_flight: true,
1609             delivered: 0,
1610             delivered_time: now,
1611             first_sent_time: now,
1612             is_app_limited: false,
1613             has_data: false,
1614         };
1615 
1616         r.on_packet_sent(
1617             p,
1618             packet::Epoch::Application,
1619             HandshakeStatus::default(),
1620             now,
1621             "",
1622         );
1623         assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
1624         assert_eq!(r.bytes_in_flight, 4000);
1625         assert_eq!(r.lost_count, 0);
1626 
1627         // Wait for 10ms.
1628         now += Duration::from_millis(10);
1629 
1630         // PTO packets are acked.
1631         let mut acked = ranges::RangeSet::default();
1632         acked.insert(4..6);
1633 
1634         assert_eq!(
1635             r.on_ack_received(
1636                 &acked,
1637                 25,
1638                 packet::Epoch::Application,
1639                 HandshakeStatus::default(),
1640                 now,
1641                 ""
1642             ),
1643             Ok((2, 2000))
1644         );
1645 
1646         assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
1647         assert_eq!(r.bytes_in_flight, 0);
1648 
1649         assert_eq!(r.lost_count, 2);
1650 
1651         // Wait 1 RTT.
1652         now += r.rtt();
1653 
1654         r.detect_lost_packets(packet::Epoch::Application, now, "");
1655 
1656         assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
1657     }
1658 
1659     #[test]
loss_on_timer()1660     fn loss_on_timer() {
1661         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1662         cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1663 
1664         let mut r = Recovery::new(&cfg);
1665 
1666         let mut now = Instant::now();
1667 
1668         assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
1669 
1670         // Start by sending a few packets.
1671         let p = Sent {
1672             pkt_num: 0,
1673             frames: smallvec![],
1674             time_sent: now,
1675             time_acked: None,
1676             time_lost: None,
1677             size: 1000,
1678             ack_eliciting: true,
1679             in_flight: true,
1680             delivered: 0,
1681             delivered_time: now,
1682             first_sent_time: now,
1683             is_app_limited: false,
1684             has_data: false,
1685         };
1686 
1687         r.on_packet_sent(
1688             p,
1689             packet::Epoch::Application,
1690             HandshakeStatus::default(),
1691             now,
1692             "",
1693         );
1694         assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
1695         assert_eq!(r.bytes_in_flight, 1000);
1696 
1697         let p = Sent {
1698             pkt_num: 1,
1699             frames: smallvec![],
1700             time_sent: now,
1701             time_acked: None,
1702             time_lost: None,
1703             size: 1000,
1704             ack_eliciting: true,
1705             in_flight: true,
1706             delivered: 0,
1707             delivered_time: now,
1708             first_sent_time: now,
1709             is_app_limited: false,
1710             has_data: false,
1711         };
1712 
1713         r.on_packet_sent(
1714             p,
1715             packet::Epoch::Application,
1716             HandshakeStatus::default(),
1717             now,
1718             "",
1719         );
1720         assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
1721         assert_eq!(r.bytes_in_flight, 2000);
1722 
1723         let p = Sent {
1724             pkt_num: 2,
1725             frames: smallvec![],
1726             time_sent: now,
1727             time_acked: None,
1728             time_lost: None,
1729             size: 1000,
1730             ack_eliciting: true,
1731             in_flight: true,
1732             delivered: 0,
1733             delivered_time: now,
1734             first_sent_time: now,
1735             is_app_limited: false,
1736             has_data: false,
1737         };
1738 
1739         r.on_packet_sent(
1740             p,
1741             packet::Epoch::Application,
1742             HandshakeStatus::default(),
1743             now,
1744             "",
1745         );
1746         assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
1747         assert_eq!(r.bytes_in_flight, 3000);
1748 
1749         let p = Sent {
1750             pkt_num: 3,
1751             frames: smallvec![],
1752             time_sent: now,
1753             time_acked: None,
1754             time_lost: None,
1755             size: 1000,
1756             ack_eliciting: true,
1757             in_flight: true,
1758             delivered: 0,
1759             delivered_time: now,
1760             first_sent_time: now,
1761             is_app_limited: false,
1762             has_data: false,
1763         };
1764 
1765         r.on_packet_sent(
1766             p,
1767             packet::Epoch::Application,
1768             HandshakeStatus::default(),
1769             now,
1770             "",
1771         );
1772         assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
1773         assert_eq!(r.bytes_in_flight, 4000);
1774 
1775         // Wait for 10ms.
1776         now += Duration::from_millis(10);
1777 
1778         // Only the first 2 packets and the last one are acked.
1779         let mut acked = ranges::RangeSet::default();
1780         acked.insert(0..2);
1781         acked.insert(3..4);
1782 
1783         assert_eq!(
1784             r.on_ack_received(
1785                 &acked,
1786                 25,
1787                 packet::Epoch::Application,
1788                 HandshakeStatus::default(),
1789                 now,
1790                 ""
1791             ),
1792             Ok((0, 0))
1793         );
1794 
1795         assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
1796         assert_eq!(r.bytes_in_flight, 1000);
1797         assert_eq!(r.lost_count, 0);
1798 
1799         // Wait until loss detection timer expires.
1800         now = r.loss_detection_timer().unwrap();
1801 
1802         // Packet is declared lost.
1803         r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1804         assert_eq!(r.loss_probes[packet::Epoch::Application], 0);
1805 
1806         assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
1807         assert_eq!(r.bytes_in_flight, 0);
1808 
1809         assert_eq!(r.lost_count, 1);
1810 
1811         // Wait 1 RTT.
1812         now += r.rtt();
1813 
1814         r.detect_lost_packets(packet::Epoch::Application, now, "");
1815 
1816         assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
1817     }
1818 
/// Exercises loss detection when acknowledgements arrive out of order.
///
/// Four 1000-byte packets are sent at the same instant. The ACK covering
/// packets 2 and 3 is delivered before the ACK covering packets 0 and 1.
/// The assertions check that the first ACK reports `(1, 1000)`, that the
/// late ACK turns that loss into a spurious one, and that the packet
/// threshold is raised from `INITIAL_PACKET_THRESHOLD` to 4.
#[test]
fn loss_on_reordering() {
    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    assert_eq!(r.sent[packet::Epoch::Application].len(), 0);

    // Start by sending a few packets. Each row is
    // (packet number, expected sent-queue length, expected bytes in flight)
    // right after that packet has been handed to the recovery module.
    let sends = [(0, 1, 1000), (1, 2, 2000), (2, 3, 3000), (3, 4, 4000)];

    for &(pkt_num, queued, in_flight_bytes) in sends.iter() {
        let pkt = Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 1000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            has_data: false,
        };

        r.on_packet_sent(
            pkt,
            packet::Epoch::Application,
            HandshakeStatus::default(),
            now,
            "",
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), queued);
        assert_eq!(r.bytes_in_flight, in_flight_bytes);
    }

    // Let 10ms pass before the first acknowledgement shows up.
    now += Duration::from_millis(10);

    // The ACK for the two newest packets arrives first (reordering).
    let mut newest_acked = ranges::RangeSet::default();
    newest_acked.insert(2..4);

    let first_ack = r.on_ack_received(
        &newest_acked,
        25,
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(first_ack, Ok((1, 1000)));

    now += Duration::from_millis(10);

    // Up to this point the packet threshold still has its initial value.
    assert_eq!(r.pkt_thresh, INITIAL_PACKET_THRESHOLD);

    // The delayed ACK for the two oldest packets finally arrives.
    let mut oldest_acked = ranges::RangeSet::default();
    oldest_acked.insert(0..2);

    let second_ack = r.on_ack_received(
        &oldest_acked,
        25,
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(second_ack, Ok((0, 0)));

    assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
    assert_eq!(r.bytes_in_flight, 0);

    // The earlier loss is now accounted for as spurious.
    assert_eq!(r.lost_count, 1);
    assert_eq!(r.lost_spurious_count, 1);

    // ... and the packet threshold has been bumped.
    assert_eq!(r.pkt_thresh, 4);

    // After one more RTT the sent queue is expected to be drained.
    now += r.rtt();

    r.detect_lost_packets(packet::Epoch::Application, now, "");

    assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
}
1990 
/// Exercises packet pacing with the CUBIC congestion controller.
///
/// The first packet (12000 bytes) and the packet sent right after the first
/// ACK are expected to go out immediately. Once two further packets have
/// been sent, the pacer rate must equal
/// `congestion_window * PACING_MULTIPLIER / 0.05` and the next send time
/// must be pushed into the future accordingly.
#[test]
fn pacing() {
    let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
    cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);

    let mut r = Recovery::new(&cfg);
    let mut now = Instant::now();

    // All packets in this test differ only in number, size and timestamp.
    let make_sent = |pkt_num, size, at: Instant| Sent {
        pkt_num,
        frames: smallvec![],
        time_sent: at,
        time_acked: None,
        time_lost: None,
        size,
        ack_eliciting: true,
        in_flight: true,
        delivered: 0,
        delivered_time: at,
        first_sent_time: at,
        is_app_limited: false,
        has_data: false,
    };

    assert_eq!(r.sent[packet::Epoch::Application].len(), 0);

    // Send out the first packet (a full initcwnd).
    r.on_packet_sent(
        make_sent(0, 12000, now),
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
    assert_eq!(r.bytes_in_flight, 12000);

    // No pacing rate is set yet, so the first packet leaves right away.
    assert_eq!(r.pacer.rate(), 0);
    assert_eq!(r.get_packet_send_time(), now);

    // The ACK for packet 0 comes back after 50ms.
    now += Duration::from_millis(50);

    let mut first_acked = ranges::RangeSet::default();
    first_acked.insert(0..1);

    let ack_outcome = r.on_ack_received(
        &first_acked,
        10,
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );
    assert_eq!(ack_outcome, Ok((0, 0)));

    assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
    assert_eq!(r.bytes_in_flight, 0);
    assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

    // The congestion window grew by one MSS (1200 bytes).
    assert_eq!(r.congestion_window, 12000 + 1200);

    // Send out the second packet.
    r.on_packet_sent(
        make_sent(1, 6000, now),
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
    assert_eq!(r.bytes_in_flight, 6000);

    // Pacing is not applied during the initial phase of the connection.
    assert_eq!(r.get_packet_send_time(), now);

    // Send out the third packet.
    r.on_packet_sent(
        make_sent(2, 6000, now),
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
    assert_eq!(r.bytes_in_flight, 12000);

    // Send out the fourth packet.
    r.on_packet_sent(
        make_sent(3, 1000, now),
        packet::Epoch::Application,
        HandshakeStatus::default(),
        now,
        "",
    );

    assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
    assert_eq!(r.bytes_in_flight, 13000);

    // From here on the outgoing packet is paced, as all conditions for
    // pacing are met. The divisor 0.05 is the 50ms smoothed RTT checked
    // above, expressed in seconds.
    let scaled_window = r.congestion_window as f64 * PACING_MULTIPLIER;
    let pacing_rate = (scaled_window / 0.05) as u64;
    assert_eq!(r.pacer.rate(), pacing_rate);

    // The next send time must lie 12000 bytes' worth of pacing time ahead.
    let next_send = r.get_packet_send_time();
    let pacing_delay = Duration::from_secs_f64(12000.0 / pacing_rate as f64);
    assert_eq!(next_send, now + pacing_delay);
}
2157 }
2158 
2159 mod bbr;
2160 mod cubic;
2161 mod delivery_rate;
2162 mod hystart;
2163 mod pacer;
2164 mod prr;
2165 mod reno;
2166