1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 use std::cmp;
28
29 use std::str::FromStr;
30
31 use std::time::Duration;
32 use std::time::Instant;
33
34 use std::collections::VecDeque;
35
36 use crate::Config;
37 use crate::Result;
38
39 use crate::frame;
40 use crate::minmax;
41 use crate::packet;
42 use crate::ranges;
43
44 #[cfg(feature = "qlog")]
45 use qlog::events::EventData;
46
47 use smallvec::SmallVec;
48
// Loss Recovery

/// Initial packet reordering threshold: an unacknowledged packet is declared
/// lost once a packet at least this many numbers later has been acknowledged
/// (see `detect_lost_packets`). The live value in `Recovery::pkt_thresh` may
/// grow when spurious losses are detected.
const INITIAL_PACKET_THRESHOLD: u64 = 3;

/// Upper bound for the adaptive packet reordering threshold.
const MAX_PACKET_THRESHOLD: u64 = 20;

/// Initial time reordering threshold, as a multiplier applied to
/// `max(latest_rtt, rtt())` to obtain the loss delay.
const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0;

/// Timer granularity: lower bound for the loss delay and for the variance
/// term of the PTO period.
const GRANULARITY: Duration = Duration::from_millis(1);

/// RTT assumed before the first sample is taken (returned by `rtt()`); it
/// also seeds the initial `rttvar` as `INITIAL_RTT / 2`.
const INITIAL_RTT: Duration = Duration::from_millis(333);

/// Multiple of the PTO period that defines persistent congestion. Persistent
/// congestion detection itself is still a TODO (see
/// `in_persistent_congestion`).
const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;

/// Window over which the minimum RTT filter tracks its minimum.
const RTT_WINDOW: Duration = Duration::from_secs(300);

/// Maximum number of probe packets requested on a single PTO.
const MAX_PTO_PROBES_COUNT: usize = 2;

// Congestion Control

/// Initial congestion window, in multiples of the maximum datagram size.
const INITIAL_WINDOW_PACKETS: usize = 10;

// NOTE(review): not referenced in this part of the file; presumably the
// minimum congestion window (in datagrams) used by the CC modules — confirm.
const MINIMUM_WINDOW_PACKETS: usize = 2;

// NOTE(review): not referenced in this part of the file; presumably the
// multiplicative window decrease applied on loss by the CC modules — confirm.
const LOSS_REDUCTION_FACTOR: f64 = 0.5;

/// Gain applied to `congestion_window / smoothed_rtt` when computing the
/// pacing rate for congestion controllers without custom pacing.
const PACING_MULTIPLIER: f64 = 1.25;

// How many non ACK eliciting packets we send before including a PING to solicit
// an ACK.
pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24;
78
/// Loss detection, RTT estimation, congestion control and pacing state.
///
/// Per packet number space state is stored in arrays indexed by
/// `packet::Epoch`.
pub struct Recovery {
    /// Deadline at which the loss detection timer fires, or `None` when the
    /// timer is disarmed.
    loss_detection_timer: Option<Instant>,

    /// Number of PTO expirations since the last ACK was processed; used as
    /// the exponent of the PTO backoff.
    pto_count: u32,

    /// Per space, when the most recent ack-eliciting in-flight packet was
    /// sent.
    time_of_last_sent_ack_eliciting_pkt:
        [Option<Instant>; packet::Epoch::count()],

    /// Per space, the largest acknowledged packet number; `u64::MAX` until
    /// the first ACK is processed in that space.
    largest_acked_pkt: [u64; packet::Epoch::count()],

    /// Per space, the largest packet number sent so far.
    largest_sent_pkt: [u64; packet::Epoch::count()],

    /// Most recent RTT sample.
    latest_rtt: Duration,

    /// Smoothed RTT; `None` until the first RTT sample (`rtt()` falls back
    /// to `INITIAL_RTT` meanwhile).
    smoothed_rtt: Option<Duration>,

    /// RTT variation estimate.
    rttvar: Duration,

    /// Windowed minimum filter over RTT samples (window `RTT_WINDOW`).
    minmax_filter: minmax::Minmax<Duration>,

    /// Current windowed minimum RTT; zero until the first sample.
    min_rtt: Duration,

    /// Maximum ACK delay: caps the peer-reported ACK delay in RTT updates and
    /// is added to the Application space PTO period.
    pub max_ack_delay: Duration,

    /// Per space, the time at which the earliest not-yet-lost packet crosses
    /// the time reordering threshold, if any.
    loss_time: [Option<Instant>; packet::Epoch::count()],

    /// Per space, sent packets in send order. Acked and lost entries stay in
    /// the queue until they can be drained from its front.
    sent: [VecDeque<Sent>; packet::Epoch::count()],

    /// Per space, frames from lost packets (and from PTO probes) that need to
    /// be retransmitted.
    pub lost: [Vec<frame::Frame>; packet::Epoch::count()],

    /// Per space, frames carried by newly acknowledged packets.
    pub acked: [Vec<frame::Frame>; packet::Epoch::count()],

    /// Total number of packets declared lost.
    pub lost_count: usize,

    /// Number of packets acknowledged after they had been declared lost.
    pub lost_spurious_count: usize,

    /// Per space, number of PTO probe packets requested.
    /// NOTE(review): presumably decremented by the sender as probes go out.
    pub loss_probes: [usize; packet::Epoch::count()],

    /// Per space, number of in-flight packets that are neither acked nor lost.
    in_flight_count: [usize; packet::Epoch::count()],

    /// App-limited flag; see `update_app_limited`.
    app_limited: bool,

    /// Delivery rate estimator.
    delivery_rate: delivery_rate::Rate,

    /// Packet reordering threshold; grows (up to `MAX_PACKET_THRESHOLD`)
    /// when spurious losses are detected.
    pkt_thresh: u64,

    /// Time reordering threshold, as a multiple of the RTT.
    time_thresh: f64,

    // Congestion control.
    /// Algorithm-specific congestion control hooks.
    cc_ops: &'static CongestionControlOps,

    /// Congestion window, in bytes.
    congestion_window: usize,

    /// Bytes currently in flight.
    bytes_in_flight: usize,

    /// Slow start threshold, in bytes; `usize::MAX` means unbounded.
    ssthresh: usize,

    // NOTE(review): not used in this part of the file; presumably byte
    // counters used by the CC modules in slow start / congestion avoidance.
    bytes_acked_sl: usize,

    bytes_acked_ca: usize,

    /// Total bytes sent, in or out of flight.
    bytes_sent: usize,

    /// Total bytes of in-flight packets declared lost.
    pub bytes_lost: u64,

    /// Start of the current congestion recovery period, if any.
    congestion_recovery_start_time: Option<Instant>,

    /// Maximum datagram size, in bytes.
    max_datagram_size: usize,

    /// CUBIC-specific state.
    cubic_state: cubic::State,

    // HyStart++.
    hystart: hystart::Hystart,

    // Pacing.
    pub pacer: pacer::Pacer,

    // RFC6937 PRR.
    prr: prr::PRR,

    #[cfg(feature = "qlog")]
    qlog_metrics: QlogMetrics,

    // The maximum size of a data aggregate scheduled and
    // transmitted together.
    send_quantum: usize,

    // BBR state.
    bbr_state: bbr::State,

    /// Number of consecutive non-ack-eliciting packets sent since the last
    /// ack-eliciting one.
    outstanding_non_ack_eliciting: usize,
}
172
/// Parameters used to construct a `Recovery` instance.
pub struct RecoveryConfig {
    /// Maximum UDP payload size to send; used as the initial maximum datagram
    /// size and to size the initial congestion window.
    max_send_udp_payload_size: usize,
    /// Maximum ACK delay used for RTT adjustment and the PTO period.
    /// `from_config` sets it to zero. NOTE(review): presumably overwritten by
    /// the caller once the peer's value is known — confirm.
    pub max_ack_delay: Duration,
    /// Congestion control algorithm hooks.
    cc_ops: &'static CongestionControlOps,
    /// Whether HyStart++ is enabled.
    hystart: bool,
    /// Whether pacing is enabled.
    pacing: bool,
}
180
181 impl RecoveryConfig {
from_config(config: &Config) -> Self182 pub fn from_config(config: &Config) -> Self {
183 Self {
184 max_send_udp_payload_size: config.max_send_udp_payload_size,
185 max_ack_delay: Duration::ZERO,
186 cc_ops: config.cc_algorithm.into(),
187 hystart: config.hystart,
188 pacing: config.pacing,
189 }
190 }
191 }
192
impl Recovery {

/// Creates a `Recovery` instance from an explicit `RecoveryConfig`.
///
/// The congestion window, the pacer's initial budget and the send quantum
/// all start at `INITIAL_WINDOW_PACKETS` datagrams of the configured size.
/// RTT estimation starts from `INITIAL_RTT`: `smoothed_rtt` stays `None`
/// until the first sample (so `rtt()` can substitute `INITIAL_RTT` while
/// still telling whether a sample exists), and `rttvar` is `INITIAL_RTT / 2`.
pub fn new_with_config(recovery_config: &RecoveryConfig) -> Self {
    const SPACES: usize = packet::Epoch::count();

    let max_datagram_size = recovery_config.max_send_udp_payload_size;
    let initial_cwnd = max_datagram_size * INITIAL_WINDOW_PACKETS;

    let pacer = pacer::Pacer::new(
        recovery_config.pacing,
        initial_cwnd,
        0,
        max_datagram_size,
    );

    Recovery {
        // Loss detection.
        loss_detection_timer: None,
        pto_count: 0,
        time_of_last_sent_ack_eliciting_pkt: [None; SPACES],
        // `u64::MAX` means "nothing acknowledged yet" in that space.
        largest_acked_pkt: [u64::MAX; SPACES],
        largest_sent_pkt: [0; SPACES],
        loss_time: [None; SPACES],
        sent: Default::default(),
        lost: Default::default(),
        acked: Default::default(),
        lost_count: 0,
        lost_spurious_count: 0,
        loss_probes: [0; SPACES],
        in_flight_count: [0; SPACES],
        pkt_thresh: INITIAL_PACKET_THRESHOLD,
        time_thresh: INITIAL_TIME_THRESHOLD,

        // RTT estimation.
        latest_rtt: Duration::ZERO,
        smoothed_rtt: None,
        rttvar: INITIAL_RTT / 2,
        minmax_filter: minmax::Minmax::new(Duration::ZERO),
        min_rtt: Duration::ZERO,
        max_ack_delay: recovery_config.max_ack_delay,

        // Congestion control.
        cc_ops: recovery_config.cc_ops,
        congestion_window: initial_cwnd,
        bytes_in_flight: 0,
        ssthresh: usize::MAX,
        bytes_acked_sl: 0,
        bytes_acked_ca: 0,
        bytes_sent: 0,
        bytes_lost: 0,
        congestion_recovery_start_time: None,
        max_datagram_size,
        app_limited: false,
        delivery_rate: delivery_rate::Rate::default(),
        cubic_state: cubic::State::default(),
        hystart: hystart::Hystart::new(recovery_config.hystart),
        prr: prr::PRR::default(),
        bbr_state: bbr::State::new(),

        // Pacing.
        pacer,
        send_quantum: initial_cwnd,

        #[cfg(feature = "qlog")]
        qlog_metrics: QlogMetrics::default(),

        outstanding_non_ack_eliciting: 0,
    }
}
291
new(config: &Config) -> Self292 pub fn new(config: &Config) -> Self {
293 Self::new_with_config(&RecoveryConfig::from_config(config))
294 }
295
on_init(&mut self)296 pub fn on_init(&mut self) {
297 (self.cc_ops.on_init)(self);
298 }
299
reset(&mut self)300 pub fn reset(&mut self) {
301 self.congestion_window = self.max_datagram_size * INITIAL_WINDOW_PACKETS;
302 self.in_flight_count = [0; packet::Epoch::count()];
303 self.congestion_recovery_start_time = None;
304 self.ssthresh = usize::MAX;
305 (self.cc_ops.reset)(self);
306 self.hystart.reset();
307 self.prr = prr::PRR::default();
308 }
309
310 /// Returns whether or not we should elicit an ACK even if we wouldn't
311 /// otherwise have constructed an ACK eliciting packet.
should_elicit_ack(&self, epoch: packet::Epoch) -> bool312 pub fn should_elicit_ack(&self, epoch: packet::Epoch) -> bool {
313 self.loss_probes[epoch] > 0 ||
314 self.outstanding_non_ack_eliciting >=
315 MAX_OUTSTANDING_NON_ACK_ELICITING
316 }
317
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )318 pub fn on_packet_sent(
319 &mut self, mut pkt: Sent, epoch: packet::Epoch,
320 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
321 ) {
322 let ack_eliciting = pkt.ack_eliciting;
323 let in_flight = pkt.in_flight;
324 let sent_bytes = pkt.size;
325 let pkt_num = pkt.pkt_num;
326
327 if ack_eliciting {
328 self.outstanding_non_ack_eliciting = 0;
329 } else {
330 self.outstanding_non_ack_eliciting += 1;
331 }
332
333 self.largest_sent_pkt[epoch] =
334 cmp::max(self.largest_sent_pkt[epoch], pkt_num);
335
336 if in_flight {
337 if ack_eliciting {
338 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
339 }
340
341 self.in_flight_count[epoch] += 1;
342
343 self.update_app_limited(
344 (self.bytes_in_flight + sent_bytes) < self.congestion_window,
345 );
346
347 self.on_packet_sent_cc(sent_bytes, now);
348
349 self.prr.on_packet_sent(sent_bytes);
350
351 self.set_loss_detection_timer(handshake_status, now);
352 }
353
354 // HyStart++: Start of the round in a slow start.
355 if self.hystart.enabled() &&
356 epoch == packet::Epoch::Application &&
357 self.congestion_window < self.ssthresh
358 {
359 self.hystart.start_round(pkt_num);
360 }
361
362 // Pacing: Set the pacing rate if CC doesn't do its own.
363 if !(self.cc_ops.has_custom_pacing)() {
364 if let Some(srtt) = self.smoothed_rtt {
365 let rate = PACING_MULTIPLIER * self.congestion_window as f64 /
366 srtt.as_secs_f64();
367 self.set_pacing_rate(rate as u64, now);
368 }
369 }
370
371 self.schedule_next_packet(epoch, now, sent_bytes);
372
373 pkt.time_sent = self.get_packet_send_time();
374
375 // bytes_in_flight is already updated. Use previous value.
376 self.delivery_rate
377 .on_packet_sent(&mut pkt, self.bytes_in_flight - sent_bytes);
378
379 self.sent[epoch].push_back(pkt);
380
381 self.bytes_sent += sent_bytes;
382 trace!("{} {:?}", trace_id, self);
383 }
384
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)385 fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
386 (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
387 }
388
set_pacing_rate(&mut self, rate: u64, now: Instant)389 pub fn set_pacing_rate(&mut self, rate: u64, now: Instant) {
390 self.pacer.update(self.send_quantum, rate, now);
391 }
392
get_packet_send_time(&self) -> Instant393 pub fn get_packet_send_time(&self) -> Instant {
394 self.pacer.next_time()
395 }
396
schedule_next_packet( &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, )397 fn schedule_next_packet(
398 &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
399 ) {
400 // Don't pace in any of these cases:
401 // * Packet contains no data.
402 // * Packet epoch is not Epoch::Application.
403 // * The congestion window is within initcwnd.
404
405 let is_app = epoch == packet::Epoch::Application;
406
407 let in_initcwnd =
408 self.bytes_sent < self.max_datagram_size * INITIAL_WINDOW_PACKETS;
409
410 let sent_bytes = if !self.pacer.enabled() || !is_app || in_initcwnd {
411 0
412 } else {
413 packet_size
414 };
415
416 self.pacer.send(sent_bytes, now);
417 }
418
/// Processes an ACK frame received in packet number space `epoch`.
///
/// `ranges` holds the acknowledged packet numbers as half-open ranges (the
/// inclusive upper bound is taken as `r.end - 1`). `ack_delay` is the
/// peer-reported delay, interpreted as microseconds, and is only applied to
/// RTT samples in the Application epoch.
///
/// The method:
/// - marks newly acknowledged packets;
/// - detects packets that were declared lost but are now acknowledged
///   (spurious losses), widens the reordering thresholds and may roll back
///   the congestion window;
/// - takes an RTT sample when appropriate;
/// - runs loss detection and feeds newly acked packets to the congestion
///   controller;
/// - resets `pto_count`, re-arms the loss detection timer and drains
///   processed packets.
///
/// Returns `Ok((lost_packets, lost_bytes))` from loss detection, or
/// `Ok((0, 0))` when nothing new was acknowledged. No `Err` is produced in
/// this body.
///
/// Assumes `ranges` is non-empty: `ranges.last()` is unwrapped.
pub fn on_ack_received(
    &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
    epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
    trace_id: &str,
) -> Result<(usize, usize)> {
    let largest_acked = ranges.last().unwrap();

    // While quiche used to consider ACK frames acknowledging packet numbers
    // larger than the largest sent one as invalid, this is not true anymore
    // if we consider a single packet number space and multiple paths. The
    // simplest example is the case where the host sends a probing packet on
    // a validating path, then receives an acknowledgment for that packet on
    // the active one.

    // `u64::MAX` is the "nothing acked yet" sentinel, so it must be replaced
    // outright rather than folded through `max`.
    if self.largest_acked_pkt[epoch] == u64::MAX {
        self.largest_acked_pkt[epoch] = largest_acked;
    } else {
        self.largest_acked_pkt[epoch] =
            cmp::max(self.largest_acked_pkt[epoch], largest_acked);
    }

    let mut has_ack_eliciting = false;

    let mut largest_newly_acked_pkt_num = 0;
    let mut largest_newly_acked_sent_time = now;

    let mut newly_acked = Vec::new();

    let mut undo_cwnd = false;

    let max_rtt = cmp::max(self.latest_rtt, self.rtt());

    // Detect and mark acked packets, without removing them from the sent
    // packets list.
    for r in ranges.iter() {
        let lowest_acked_in_block = r.start;
        let largest_acked_in_block = r.end - 1;

        let unacked_iter = self.sent[epoch]
            .iter_mut()
            // Skip packets that precede the lowest acked packet in the block.
            .skip_while(|p| p.pkt_num < lowest_acked_in_block)
            // Skip packets that follow the largest acked packet in the block.
            .take_while(|p| p.pkt_num <= largest_acked_in_block)
            // Skip packets that have already been acked. Packets already
            // declared lost are deliberately kept so that a late ACK for them
            // is detected as a spurious loss below.
            .filter(|p| p.time_acked.is_none());

        for unacked in unacked_iter {
            unacked.time_acked = Some(now);

            // Check if acked packet was already declared lost.
            if unacked.time_lost.is_some() {
                // Calculate new packet reordering threshold: the distance
                // from the largest acked packet, capped at
                // MAX_PACKET_THRESHOLD, and never lowering the threshold.
                let pkt_thresh =
                    self.largest_acked_pkt[epoch] - unacked.pkt_num + 1;
                let pkt_thresh = cmp::min(MAX_PACKET_THRESHOLD, pkt_thresh);

                self.pkt_thresh = cmp::max(self.pkt_thresh, pkt_thresh);

                // Calculate new time reordering threshold.
                let loss_delay = max_rtt.mul_f64(self.time_thresh);

                // unacked.time_sent can be in the future due to
                // pacing.
                if now.saturating_duration_since(unacked.time_sent) >
                    loss_delay
                {
                    // TODO: do time threshold update
                    self.time_thresh = 5_f64 / 4_f64;
                }

                // The congestion response to this loss was unwarranted; ask
                // the CC hook to roll it back (after the loop).
                if unacked.in_flight {
                    undo_cwnd = true;
                }

                // `in_flight_count` was already decremented when the packet
                // was declared lost, so it is not touched again here.
                self.lost_spurious_count += 1;
                continue;
            }

            if unacked.ack_eliciting {
                has_ack_eliciting = true;
            }

            // Ranges are visited in ascending order, so the last assignment
            // is the largest newly acked packet.
            largest_newly_acked_pkt_num = unacked.pkt_num;
            largest_newly_acked_sent_time = unacked.time_sent;

            self.acked[epoch].extend(unacked.frames.drain(..));

            if unacked.in_flight {
                self.in_flight_count[epoch] =
                    self.in_flight_count[epoch].saturating_sub(1);
            }

            newly_acked.push(Acked {
                pkt_num: unacked.pkt_num,

                time_sent: unacked.time_sent,

                size: unacked.size,

                rtt: now.saturating_duration_since(unacked.time_sent),

                delivered: unacked.delivered,

                delivered_time: unacked.delivered_time,

                first_sent_time: unacked.first_sent_time,

                is_app_limited: unacked.is_app_limited,
            });

            trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
        }
    }

    // Undo congestion window update.
    if undo_cwnd {
        (self.cc_ops.rollback)(self);
    }

    if newly_acked.is_empty() {
        return Ok((0, 0));
    }

    // Take an RTT sample only if the largest acknowledged packet was newly
    // acked and at least one newly acked packet was ack-eliciting.
    if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
        // The packet's sent time could be in the future if pacing is used
        // and the network has a very short RTT.
        let latest_rtt =
            now.saturating_duration_since(largest_newly_acked_sent_time);

        let ack_delay = if epoch == packet::Epoch::Application {
            Duration::from_micros(ack_delay)
        } else {
            Duration::from_micros(0)
        };

        // Don't update srtt if rtt is zero.
        if !latest_rtt.is_zero() {
            self.update_rtt(latest_rtt, ack_delay, now);
        }
    }

    // Detect and mark lost packets without removing them from the sent
    // packets list.
    let (lost_packets, lost_bytes) =
        self.detect_lost_packets(epoch, now, trace_id);

    self.on_packets_acked(newly_acked, epoch, now);

    self.pto_count = 0;

    self.set_loss_detection_timer(handshake_status, now);

    self.drain_packets(epoch, now);

    Ok((lost_packets, lost_bytes))
}
576
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> (usize, usize)577 pub fn on_loss_detection_timeout(
578 &mut self, handshake_status: HandshakeStatus, now: Instant,
579 trace_id: &str,
580 ) -> (usize, usize) {
581 let (earliest_loss_time, epoch) = self.loss_time_and_space();
582
583 if earliest_loss_time.is_some() {
584 // Time threshold loss detection.
585 let (lost_packets, lost_bytes) =
586 self.detect_lost_packets(epoch, now, trace_id);
587
588 self.set_loss_detection_timer(handshake_status, now);
589
590 trace!("{} {:?}", trace_id, self);
591 return (lost_packets, lost_bytes);
592 }
593
594 let epoch = if self.bytes_in_flight > 0 {
595 // Send new data if available, else retransmit old data. If neither
596 // is available, send a single PING frame.
597 let (_, e) = self.pto_time_and_space(handshake_status, now);
598
599 e
600 } else {
601 // Client sends an anti-deadlock packet: Initial is padded to earn
602 // more anti-amplification credit, a Handshake packet proves address
603 // ownership.
604 if handshake_status.has_handshake_keys {
605 packet::Epoch::Handshake
606 } else {
607 packet::Epoch::Initial
608 }
609 };
610
611 self.pto_count += 1;
612
613 self.loss_probes[epoch] =
614 cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
615
616 let unacked_iter = self.sent[epoch]
617 .iter_mut()
618 // Skip packets that have already been acked or lost, and packets
619 // that don't contain either CRYPTO or STREAM frames.
620 .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
621 // Only return as many packets as the number of probe packets that
622 // will be sent.
623 .take(self.loss_probes[epoch]);
624
625 // Retransmit the frames from the oldest sent packets on PTO. However
626 // the packets are not actually declared lost (so there is no effect to
627 // congestion control), we just reschedule the data they carried.
628 //
629 // This will also trigger sending an ACK and retransmitting frames like
630 // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
631 // to CRYPTO and STREAM, if the original packet carried them.
632 for unacked in unacked_iter {
633 self.lost[epoch].extend_from_slice(&unacked.frames);
634 }
635
636 self.set_loss_detection_timer(handshake_status, now);
637
638 trace!("{} {:?}", trace_id, self);
639
640 (0, 0)
641 }
642
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )643 pub fn on_pkt_num_space_discarded(
644 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
645 now: Instant,
646 ) {
647 let unacked_bytes = self.sent[epoch]
648 .iter()
649 .filter(|p| {
650 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
651 })
652 .fold(0, |acc, p| acc + p.size);
653
654 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
655
656 self.sent[epoch].clear();
657 self.lost[epoch].clear();
658 self.acked[epoch].clear();
659
660 self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
661 self.loss_time[epoch] = None;
662 self.loss_probes[epoch] = 0;
663 self.in_flight_count[epoch] = 0;
664
665 self.set_loss_detection_timer(handshake_status, now);
666 }
667
loss_detection_timer(&self) -> Option<Instant>668 pub fn loss_detection_timer(&self) -> Option<Instant> {
669 self.loss_detection_timer
670 }
671
cwnd(&self) -> usize672 pub fn cwnd(&self) -> usize {
673 self.congestion_window
674 }
675
cwnd_available(&self) -> usize676 pub fn cwnd_available(&self) -> usize {
677 // Ignore cwnd when sending probe packets.
678 if self.loss_probes.iter().any(|&x| x > 0) {
679 return usize::MAX;
680 }
681
682 // Open more space (snd_cnt) for PRR when allowed.
683 self.congestion_window.saturating_sub(self.bytes_in_flight) +
684 self.prr.snd_cnt
685 }
686
rtt(&self) -> Duration687 pub fn rtt(&self) -> Duration {
688 self.smoothed_rtt.unwrap_or(INITIAL_RTT)
689 }
690
min_rtt(&self) -> Option<Duration>691 pub fn min_rtt(&self) -> Option<Duration> {
692 if self.min_rtt == Duration::ZERO {
693 return None;
694 }
695
696 Some(self.min_rtt)
697 }
698
rttvar(&self) -> Duration699 pub fn rttvar(&self) -> Duration {
700 self.rttvar
701 }
702
pto(&self) -> Duration703 pub fn pto(&self) -> Duration {
704 self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
705 }
706
delivery_rate(&self) -> u64707 pub fn delivery_rate(&self) -> u64 {
708 self.delivery_rate.sample_delivery_rate()
709 }
710
max_datagram_size(&self) -> usize711 pub fn max_datagram_size(&self) -> usize {
712 self.max_datagram_size
713 }
714
update_max_datagram_size(&mut self, new_max_datagram_size: usize)715 pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
716 let max_datagram_size =
717 cmp::min(self.max_datagram_size, new_max_datagram_size);
718
719 // Update cwnd if it hasn't been updated yet.
720 if self.congestion_window ==
721 self.max_datagram_size * INITIAL_WINDOW_PACKETS
722 {
723 self.congestion_window = max_datagram_size * INITIAL_WINDOW_PACKETS;
724 }
725
726 self.pacer = pacer::Pacer::new(
727 self.pacer.enabled(),
728 self.congestion_window,
729 0,
730 max_datagram_size,
731 );
732
733 self.max_datagram_size = max_datagram_size;
734 }
735
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )736 fn update_rtt(
737 &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
738 ) {
739 self.latest_rtt = latest_rtt;
740
741 match self.smoothed_rtt {
742 // First RTT sample.
743 None => {
744 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
745
746 self.smoothed_rtt = Some(latest_rtt);
747
748 self.rttvar = latest_rtt / 2;
749 },
750
751 Some(srtt) => {
752 self.min_rtt =
753 self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
754
755 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
756
757 // Adjust for ack delay if plausible.
758 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
759 latest_rtt - ack_delay
760 } else {
761 latest_rtt
762 };
763
764 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
765 sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
766
767 self.smoothed_rtt = Some(
768 srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
769 );
770 },
771 }
772 }
773
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)774 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
775 let mut epoch = packet::Epoch::Initial;
776 let mut time = self.loss_time[epoch];
777
778 // Iterate over all packet number spaces starting from Handshake.
779 for &e in packet::Epoch::epochs(
780 packet::Epoch::Handshake..=packet::Epoch::Application,
781 ) {
782 let new_time = self.loss_time[e];
783
784 if time.is_none() || new_time < time {
785 time = new_time;
786 epoch = e;
787 }
788 }
789
790 (time, epoch)
791 }
792
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)793 fn pto_time_and_space(
794 &self, handshake_status: HandshakeStatus, now: Instant,
795 ) -> (Option<Instant>, packet::Epoch) {
796 let mut duration = self.pto() * 2_u32.pow(self.pto_count);
797
798 // Arm PTO from now when there are no inflight packets.
799 if self.bytes_in_flight == 0 {
800 if handshake_status.has_handshake_keys {
801 return (Some(now + duration), packet::Epoch::Handshake);
802 } else {
803 return (Some(now + duration), packet::Epoch::Initial);
804 }
805 }
806
807 let mut pto_timeout = None;
808 let mut pto_space = packet::Epoch::Initial;
809
810 // Iterate over all packet number spaces.
811 for &e in packet::Epoch::epochs(
812 packet::Epoch::Initial..=packet::Epoch::Application,
813 ) {
814 if self.in_flight_count[e] == 0 {
815 continue;
816 }
817
818 if e == packet::Epoch::Application {
819 // Skip Application Data until handshake completes.
820 if !handshake_status.completed {
821 return (pto_timeout, pto_space);
822 }
823
824 // Include max_ack_delay and backoff for Application Data.
825 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
826 }
827
828 let new_time =
829 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
830
831 if pto_timeout.is_none() || new_time < pto_timeout {
832 pto_timeout = new_time;
833 pto_space = e;
834 }
835 }
836
837 (pto_timeout, pto_space)
838 }
839
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )840 fn set_loss_detection_timer(
841 &mut self, handshake_status: HandshakeStatus, now: Instant,
842 ) {
843 let (earliest_loss_time, _) = self.loss_time_and_space();
844
845 if earliest_loss_time.is_some() {
846 // Time threshold loss detection.
847 self.loss_detection_timer = earliest_loss_time;
848 return;
849 }
850
851 if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
852 self.loss_detection_timer = None;
853 return;
854 }
855
856 // PTO timer.
857 let (timeout, _) = self.pto_time_and_space(handshake_status, now);
858 self.loss_detection_timer = timeout;
859 }
860
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, ) -> (usize, usize)861 fn detect_lost_packets(
862 &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
863 ) -> (usize, usize) {
864 let largest_acked = self.largest_acked_pkt[epoch];
865
866 self.loss_time[epoch] = None;
867
868 let loss_delay =
869 cmp::max(self.latest_rtt, self.rtt()).mul_f64(self.time_thresh);
870
871 // Minimum time of kGranularity before packets are deemed lost.
872 let loss_delay = cmp::max(loss_delay, GRANULARITY);
873
874 // Packets sent before this time are deemed lost.
875 let lost_send_time = now.checked_sub(loss_delay).unwrap();
876
877 let mut lost_packets = 0;
878 let mut lost_bytes = 0;
879
880 let mut largest_lost_pkt = None;
881
882 let unacked_iter = self.sent[epoch]
883 .iter_mut()
884 // Skip packets that follow the largest acked packet.
885 .take_while(|p| p.pkt_num <= largest_acked)
886 // Skip packets that have already been acked or lost.
887 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
888
889 for unacked in unacked_iter {
890 // Mark packet as lost, or set time when it should be marked.
891 if unacked.time_sent <= lost_send_time ||
892 largest_acked >= unacked.pkt_num + self.pkt_thresh
893 {
894 self.lost[epoch].extend(unacked.frames.drain(..));
895
896 unacked.time_lost = Some(now);
897
898 if unacked.in_flight {
899 lost_bytes += unacked.size;
900
901 // Frames have already been removed from the packet, so
902 // cloning the whole packet should be relatively cheap.
903 largest_lost_pkt = Some(unacked.clone());
904
905 self.in_flight_count[epoch] =
906 self.in_flight_count[epoch].saturating_sub(1);
907
908 trace!(
909 "{} packet {} lost on epoch {}",
910 trace_id,
911 unacked.pkt_num,
912 epoch
913 );
914 }
915
916 lost_packets += 1;
917 self.lost_count += 1;
918 } else {
919 let loss_time = match self.loss_time[epoch] {
920 None => unacked.time_sent + loss_delay,
921
922 Some(loss_time) =>
923 cmp::min(loss_time, unacked.time_sent + loss_delay),
924 };
925
926 self.loss_time[epoch] = Some(loss_time);
927 }
928 }
929
930 self.bytes_lost += lost_bytes as u64;
931
932 if let Some(pkt) = largest_lost_pkt {
933 self.on_packets_lost(lost_bytes, &pkt, epoch, now);
934 }
935
936 self.drain_packets(epoch, now);
937
938 (lost_packets, lost_bytes)
939 }
940
drain_packets(&mut self, epoch: packet::Epoch, now: Instant)941 fn drain_packets(&mut self, epoch: packet::Epoch, now: Instant) {
942 let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
943
944 // In order to avoid removing elements from the middle of the list
945 // (which would require copying other elements to compact the list),
946 // we only remove a contiguous range of elements from the start of the
947 // list.
948 //
949 // This means that acked or lost elements coming after this will not
950 // be removed at this point, but their removal is delayed for a later
951 // time, once the gaps have been filled.
952
953 // First, find the first element that is neither acked nor lost.
954 for (i, pkt) in self.sent[epoch].iter().enumerate() {
955 if let Some(time_lost) = pkt.time_lost {
956 if time_lost + self.rtt() > now {
957 lowest_non_expired_pkt_index = i;
958 break;
959 }
960 }
961
962 if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
963 lowest_non_expired_pkt_index = i;
964 break;
965 }
966 }
967
968 // Then remove elements up to the previously found index.
969 self.sent[epoch].drain(..lowest_non_expired_pkt_index);
970 }
971
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )972 fn on_packets_acked(
973 &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
974 ) {
975 // Update delivery rate sample per acked packet.
976 for pkt in &acked {
977 self.delivery_rate.update_rate_sample(pkt, now);
978 }
979
980 // Fill in a rate sample.
981 self.delivery_rate.generate_rate_sample(self.min_rtt);
982
983 // Call congestion control hooks.
984 (self.cc_ops.on_packets_acked)(self, &acked, epoch, now);
985 }
986
in_congestion_recovery(&self, sent_time: Instant) -> bool987 fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
988 match self.congestion_recovery_start_time {
989 Some(congestion_recovery_start_time) =>
990 sent_time <= congestion_recovery_start_time,
991
992 None => false,
993 }
994 }
995
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool996 fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
997 let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
998
999 // TODO: properly detect persistent congestion
1000 false
1001 }
1002
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )1003 fn on_packets_lost(
1004 &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
1005 epoch: packet::Epoch, now: Instant,
1006 ) {
1007 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
1008
1009 self.congestion_event(lost_bytes, largest_lost_pkt.time_sent, epoch, now);
1010
1011 if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
1012 self.collapse_cwnd();
1013 }
1014 }
1015
congestion_event( &mut self, lost_bytes: usize, time_sent: Instant, epoch: packet::Epoch, now: Instant, )1016 fn congestion_event(
1017 &mut self, lost_bytes: usize, time_sent: Instant, epoch: packet::Epoch,
1018 now: Instant,
1019 ) {
1020 if !self.in_congestion_recovery(time_sent) {
1021 (self.cc_ops.checkpoint)(self);
1022 }
1023
1024 (self.cc_ops.congestion_event)(self, lost_bytes, time_sent, epoch, now);
1025 }
1026
collapse_cwnd(&mut self)1027 fn collapse_cwnd(&mut self) {
1028 (self.cc_ops.collapse_cwnd)(self);
1029 }
1030
update_app_limited(&mut self, v: bool)1031 pub fn update_app_limited(&mut self, v: bool) {
1032 self.app_limited = v;
1033 }
1034
app_limited(&self) -> bool1035 pub fn app_limited(&self) -> bool {
1036 self.app_limited
1037 }
1038
delivery_rate_update_app_limited(&mut self, v: bool)1039 pub fn delivery_rate_update_app_limited(&mut self, v: bool) {
1040 self.delivery_rate.update_app_limited(v);
1041 }
1042
/// Snapshots the current recovery metrics and returns a qlog
/// `MetricsUpdated` event.
///
/// Returns `None` when none of the tracked metrics changed since the
/// previous snapshot. The diffing is done by `QlogMetrics::maybe_update`.
#[cfg(feature = "qlog")]
pub fn maybe_qlog(&mut self) -> Option<EventData> {
    // Read every metric up front, in the same order as the fields below.
    let min_rtt = self.min_rtt;
    let smoothed_rtt = self.rtt();
    let latest_rtt = self.latest_rtt;
    let rttvar = self.rttvar;
    let cwnd = self.cwnd() as u64;
    let bytes_in_flight = self.bytes_in_flight as u64;
    let ssthresh = self.ssthresh as u64;
    let pacing_rate = self.pacer.rate();

    self.qlog_metrics.maybe_update(QlogMetrics {
        min_rtt,
        smoothed_rtt,
        latest_rtt,
        rttvar,
        cwnd,
        bytes_in_flight,
        ssthresh,
        pacing_rate,
    })
}
1058
/// Returns the stored `send_quantum` value.
// NOTE(review): presumably the amount of data the sender may emit in a
// single burst — confirm against where `send_quantum` is assigned.
pub fn send_quantum(&self) -> usize {
    self.send_quantum
}
1062 }
1063
/// Available congestion control algorithms.
///
/// This enum lists the congestion control algorithms that are currently
/// available. Each variant can be parsed from its lowercase name through
/// `FromStr` (for example `"cubic"`).
// NOTE(review): `#[repr(C)]` plus explicit discriminants suggests these
// values cross an FFI boundary — confirm before reordering or renumbering.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum CongestionControlAlgorithm {
    /// Reno congestion control algorithm. `reno` in a string form.
    Reno = 0,
    /// CUBIC congestion control algorithm (default). `cubic` in a string form.
    CUBIC = 1,
    /// BBR congestion control algorithm. `bbr` in a string form.
    BBR = 2,
}
1078
1079 impl FromStr for CongestionControlAlgorithm {
1080 type Err = crate::Error;
1081
1082 /// Converts a string to `CongestionControlAlgorithm`.
1083 ///
1084 /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>1085 fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
1086 match name {
1087 "reno" => Ok(CongestionControlAlgorithm::Reno),
1088 "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
1089 "bbr" => Ok(CongestionControlAlgorithm::BBR),
1090
1091 _ => Err(crate::Error::CongestionControl),
1092 }
1093 }
1094 }
1095
/// Table of hooks that a congestion control algorithm supplies to
/// `Recovery`.
///
/// Each algorithm exposes a `&'static` instance of this table (see the
/// `From<CongestionControlAlgorithm>` impl). `Recovery` dispatches to it
/// through its `cc_ops` field.
pub struct CongestionControlOps {
    // NOTE(review): the call sites of `on_init`, `reset`, `on_packet_sent`
    // and `on_packets_acked` are outside this chunk. Their roles are inferred
    // from names and signatures only — confirm.
    pub on_init: fn(r: &mut Recovery),

    pub reset: fn(r: &mut Recovery),

    pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),

    pub on_packets_acked: fn(
        r: &mut Recovery,
        packets: &[Acked],
        epoch: packet::Epoch,
        now: Instant,
    ),

    /// Invoked by `Recovery::congestion_event` (reached, e.g., from
    /// `on_packets_lost`) with the lost byte count and the send time of the
    /// packet that triggered the event.
    pub congestion_event: fn(
        r: &mut Recovery,
        lost_bytes: usize,
        time_sent: Instant,
        epoch: packet::Epoch,
        now: Instant,
    ),

    /// Invoked by `Recovery::collapse_cwnd`.
    pub collapse_cwnd: fn(r: &mut Recovery),

    /// Invoked by `Recovery::congestion_event` before `congestion_event`,
    /// but only when the lost packet was not sent during the current
    /// congestion recovery period.
    pub checkpoint: fn(r: &mut Recovery),

    // NOTE(review): presumably restores the state saved by `checkpoint`.
    // The meaning of the returned bool is not visible here — confirm.
    pub rollback: fn(r: &mut Recovery) -> bool,

    // NOTE(review): presumably reports whether the algorithm does its own
    // pacing — confirm at the call site.
    pub has_custom_pacing: fn() -> bool,

    /// Appends algorithm-specific state to the end of `Recovery`'s `Debug`
    /// output.
    pub debug_fmt:
        fn(r: &Recovery, formatter: &mut std::fmt::Formatter) -> std::fmt::Result,
}
1129
1130 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self1131 fn from(algo: CongestionControlAlgorithm) -> Self {
1132 match algo {
1133 CongestionControlAlgorithm::Reno => &reno::RENO,
1134 CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
1135 CongestionControlAlgorithm::BBR => &bbr::BBR,
1136 }
1137 }
1138 }
1139
1140 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1141 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1142 match self.loss_detection_timer {
1143 Some(v) => {
1144 let now = Instant::now();
1145
1146 if v > now {
1147 let d = v.duration_since(now);
1148 write!(f, "timer={d:?} ")?;
1149 } else {
1150 write!(f, "timer=exp ")?;
1151 }
1152 },
1153
1154 None => {
1155 write!(f, "timer=none ")?;
1156 },
1157 };
1158
1159 write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
1160 write!(f, "srtt={:?} ", self.smoothed_rtt)?;
1161 write!(f, "min_rtt={:?} ", self.min_rtt)?;
1162 write!(f, "rttvar={:?} ", self.rttvar)?;
1163 write!(f, "loss_time={:?} ", self.loss_time)?;
1164 write!(f, "loss_probes={:?} ", self.loss_probes)?;
1165 write!(f, "cwnd={} ", self.congestion_window)?;
1166 write!(f, "ssthresh={} ", self.ssthresh)?;
1167 write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
1168 write!(f, "app_limited={} ", self.app_limited)?;
1169 write!(
1170 f,
1171 "congestion_recovery_start_time={:?} ",
1172 self.congestion_recovery_start_time
1173 )?;
1174 write!(f, "{:?} ", self.delivery_rate)?;
1175 write!(f, "pacer={:?} ", self.pacer)?;
1176
1177 if self.hystart.enabled() {
1178 write!(f, "hystart={:?} ", self.hystart)?;
1179 }
1180
1181 // CC-specific debug info
1182 (self.cc_ops.debug_fmt)(self, f)?;
1183
1184 Ok(())
1185 }
1186 }
1187
/// Bookkeeping for a packet recorded through `Recovery::on_packet_sent`.
#[derive(Clone)]
pub struct Sent {
    /// Packet number.
    pub pkt_num: u64,

    /// Frames carried by the packet.
    pub frames: SmallVec<[frame::Frame; 1]>,

    /// When the packet was sent. If this is the largest lost packet, this
    /// value is the send time handed to the congestion controller.
    pub time_sent: Instant,

    // NOTE(review): presumably filled in once the packet is acknowledged /
    // declared lost — the writers are outside this chunk; confirm.
    pub time_acked: Option<Instant>,

    pub time_lost: Option<Instant>,

    /// Size of the packet, as counted toward `bytes_in_flight` while the
    /// packet is in flight.
    pub size: usize,

    /// Whether the packet is ack-eliciting.
    pub ack_eliciting: bool,

    /// Whether the packet counts as in flight.
    pub in_flight: bool,

    // NOTE(review): the following four fields look like delivery-rate
    // sampling state captured at send time (cf. the `delivery_rate` module)
    // — confirm.
    pub delivered: usize,

    pub delivered_time: Instant,

    pub first_sent_time: Instant,

    pub is_app_limited: bool,

    // NOTE(review): exact meaning of "data" here is not visible — confirm.
    pub has_data: bool,
}
1216
1217 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1218 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1219 write!(f, "pkt_num={:?} ", self.pkt_num)?;
1220 write!(f, "pkt_sent_time={:?} ", self.time_sent)?;
1221 write!(f, "pkt_size={:?} ", self.size)?;
1222 write!(f, "delivered={:?} ", self.delivered)?;
1223 write!(f, "delivered_time={:?} ", self.delivered_time)?;
1224 write!(f, "first_sent_time={:?} ", self.first_sent_time)?;
1225 write!(f, "is_app_limited={} ", self.is_app_limited)?;
1226 write!(f, "has_data={} ", self.has_data)?;
1227
1228 Ok(())
1229 }
1230 }
1231
/// Per-packet information handed to the congestion controller's
/// `on_packets_acked` hook (which receives a `&[Acked]`).
#[derive(Clone)]
pub struct Acked {
    /// Packet number of the acknowledged packet.
    pub pkt_num: u64,

    /// When the packet was sent.
    pub time_sent: Instant,

    /// Size of the packet.
    pub size: usize,

    // NOTE(review): presumably the RTT sample associated with this packet —
    // confirm where `Acked` values are built (outside this chunk).
    pub rtt: Duration,

    // NOTE(review): these mirror the same-named fields on `Sent` and
    // presumably feed delivery-rate estimation — confirm.
    pub delivered: usize,

    pub delivered_time: Instant,

    pub first_sent_time: Instant,

    pub is_app_limited: bool,
}
1250
/// Snapshot of handshake progress passed into recovery entry points such
/// as `on_packet_sent`, `on_ack_received` and `on_loss_detection_timeout`.
// NOTE(review): field meanings below are inferred from their names; how
// recovery uses them is outside this chunk — confirm.
#[derive(Clone, Copy, Debug)]
pub struct HandshakeStatus {
    // Whether handshake keys are available.
    pub has_handshake_keys: bool,

    // Whether the peer's address has been verified.
    pub peer_verified_address: bool,

    // Whether the handshake has completed.
    pub completed: bool,
}
1259
#[cfg(test)]
impl Default for HandshakeStatus {
    /// Test-only default in which every handshake flag is `true`.
    fn default() -> HandshakeStatus {
        let all_done = true;

        Self {
            has_handshake_keys: all_done,
            peer_verified_address: all_done,
            completed: all_done,
        }
    }
}
1272
/// Returns the absolute difference between two durations.
///
/// `Duration` is unsigned, and subtracting a larger value panics. So the
/// smaller operand is always subtracted from the larger one.
fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
    match lhs.checked_sub(rhs) {
        Some(diff) => diff,
        // `rhs` is strictly larger here, so this cannot underflow.
        None => rhs - lhs,
    }
}
1280
// We don't need to log all qlog metrics every time there is a recovery event.
// Instead, we log only the MetricsUpdated event fields we care about, and only
// when they change. To support this, the QlogMetrics structure keeps a running
// snapshot of the most recently logged values.
#[derive(Default)]
#[cfg(feature = "qlog")]
struct QlogMetrics {
    // RTT metrics. They are emitted as f32 milliseconds in `maybe_update`.
    min_rtt: Duration,
    smoothed_rtt: Duration,
    latest_rtt: Duration,
    rttvar: Duration,
    // Values copied from `Recovery` and cast to u64 in `maybe_qlog`.
    cwnd: u64,
    bytes_in_flight: u64,
    ssthresh: u64,
    // Value of the pacer's `rate()`.
    pacing_rate: u64,
}
1297
#[cfg(feature = "qlog")]
impl QlogMetrics {
    // Make a qlog event if the latest instance of QlogMetrics is different.
    //
    // Each field of `latest` is compared with the stored value. A field that
    // changed is written back into `self` and reported in the event with
    // its new value. An unchanged field is reported as `None`. If no field
    // changed, no event is produced.
    fn maybe_update(&mut self, latest: Self) -> Option<EventData> {
        // Stores `new` into `slot` and returns it if it differs from the
        // stored value; otherwise leaves `slot` alone and returns `None`.
        fn refresh<T: PartialEq + Copy>(slot: &mut T, new: T) -> Option<T> {
            if *slot == new {
                None
            } else {
                *slot = new;
                Some(new)
            }
        }

        // RTT-like metrics are reported as fractional milliseconds.
        let to_ms = |d: Duration| d.as_secs_f32() * 1000.0;

        let min_rtt = refresh(&mut self.min_rtt, latest.min_rtt).map(to_ms);
        let smoothed_rtt =
            refresh(&mut self.smoothed_rtt, latest.smoothed_rtt).map(to_ms);
        let latest_rtt =
            refresh(&mut self.latest_rtt, latest.latest_rtt).map(to_ms);
        let rtt_variance = refresh(&mut self.rttvar, latest.rttvar).map(to_ms);
        let congestion_window = refresh(&mut self.cwnd, latest.cwnd);
        let bytes_in_flight =
            refresh(&mut self.bytes_in_flight, latest.bytes_in_flight);
        let ssthresh = refresh(&mut self.ssthresh, latest.ssthresh);
        let pacing_rate = refresh(&mut self.pacing_rate, latest.pacing_rate);

        let changed = min_rtt.is_some() ||
            smoothed_rtt.is_some() ||
            latest_rtt.is_some() ||
            rtt_variance.is_some() ||
            congestion_window.is_some() ||
            bytes_in_flight.is_some() ||
            ssthresh.is_some() ||
            pacing_rate.is_some();

        if !changed {
            return None;
        }

        // QVis can't use all these fields and they can be large.
        // `pto_count` and `packets_in_flight` are not tracked here and are
        // always left unset.
        Some(EventData::MetricsUpdated(
            qlog::events::quic::MetricsUpdated {
                min_rtt,
                smoothed_rtt,
                latest_rtt,
                rtt_variance,
                pto_count: None,
                congestion_window,
                bytes_in_flight,
                ssthresh,
                packets_in_flight: None,
                pacing_rate,
            },
        ))
    }
}
1394
#[cfg(test)]
mod tests {
    use super::*;
    use smallvec::smallvec;

    /// Creates a `Recovery` that uses `algo` for congestion control, with
    /// the rest of the configuration left at `Config::new` defaults.
    fn new_recovery(algo: CongestionControlAlgorithm) -> Recovery {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(algo);

        Recovery::new(&cfg)
    }

    /// Builds an ack-eliciting, in-flight packet with no frames whose send
    /// and delivery timestamps are all `now`.
    fn test_sent(pkt_num: u64, size: usize, now: Instant) -> Sent {
        Sent {
            pkt_num,
            frames: smallvec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            first_sent_time: now,
            is_app_limited: false,
            has_data: false,
        }
    }

    /// Records `pkt` as sent at `now` in the Application epoch.
    fn send_app(r: &mut Recovery, pkt: Sent, now: Instant) {
        r.on_packet_sent(
            pkt,
            packet::Epoch::Application,
            HandshakeStatus::default(),
            now,
            "",
        );
    }

    /// Sends packets 0..4 (1000 bytes each) at `now` on an empty recovery.
    /// After each send, it checks the sent-queue length and bytes in flight.
    fn send_initial_burst(r: &mut Recovery, now: Instant) {
        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);

        for pkt_num in 0..4u64 {
            send_app(r, test_sent(pkt_num, 1000, now), now);

            let expected = pkt_num as usize + 1;
            assert_eq!(r.sent[packet::Epoch::Application].len(), expected);
            assert_eq!(r.bytes_in_flight, expected * 1000);
        }
    }

    #[test]
    fn lookup_cc_algo_ok() {
        let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
        assert_eq!(algo, CongestionControlAlgorithm::Reno);
    }

    #[test]
    fn lookup_cc_algo_bad() {
        assert_eq!(
            CongestionControlAlgorithm::from_str("???"),
            Err(crate::Error::CongestionControl)
        );
    }

    #[test]
    fn collapse_cwnd() {
        let mut r = new_recovery(CongestionControlAlgorithm::Reno);

        // cwnd will be reset.
        r.collapse_cwnd();
        assert_eq!(r.cwnd(), r.max_datagram_size * MINIMUM_WINDOW_PACKETS);
    }

    #[test]
    fn loss_on_pto() {
        let mut r = new_recovery(CongestionControlAlgorithm::Reno);

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_burst(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // Only the first 2 packets are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((0, 0))
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
        assert_eq!(r.bytes_in_flight, 2000);
        assert_eq!(r.lost_count, 0);

        // Wait until loss detection timer expires.
        now = r.loss_detection_timer().unwrap();

        // PTO.
        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
        assert_eq!(r.loss_probes[packet::Epoch::Application], 1);
        assert_eq!(r.lost_count, 0);
        assert_eq!(r.pto_count, 1);

        // Send two probe packets.
        send_app(&mut r, test_sent(4, 1000, now), now);
        assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
        assert_eq!(r.bytes_in_flight, 3000);

        send_app(&mut r, test_sent(5, 1000, now), now);
        assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
        assert_eq!(r.bytes_in_flight, 4000);
        assert_eq!(r.lost_count, 0);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // PTO packets are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(4..6);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((2, 2000))
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
        assert_eq!(r.bytes_in_flight, 0);

        assert_eq!(r.lost_count, 2);

        // Wait 1 RTT.
        now += r.rtt();

        r.detect_lost_packets(packet::Epoch::Application, now, "");

        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
    }

    #[test]
    fn loss_on_timer() {
        let mut r = new_recovery(CongestionControlAlgorithm::Reno);

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_burst(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // Only the first 2 packets and the last one are acked.
        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);
        acked.insert(3..4);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((0, 0))
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
        assert_eq!(r.bytes_in_flight, 1000);
        assert_eq!(r.lost_count, 0);

        // Wait until loss detection timer expires.
        now = r.loss_detection_timer().unwrap();

        // Packet is declared lost.
        r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
        assert_eq!(r.loss_probes[packet::Epoch::Application], 0);

        assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
        assert_eq!(r.bytes_in_flight, 0);

        assert_eq!(r.lost_count, 1);

        // Wait 1 RTT.
        now += r.rtt();

        r.detect_lost_packets(packet::Epoch::Application, now, "");

        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
    }

    #[test]
    fn loss_on_reordering() {
        let mut r = new_recovery(CongestionControlAlgorithm::Reno);

        let mut now = Instant::now();

        // Start by sending a few packets.
        send_initial_burst(&mut r, now);

        // Wait for 10ms.
        now += Duration::from_millis(10);

        // ACKs are reordered.
        let mut acked = ranges::RangeSet::default();
        acked.insert(2..4);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((1, 1000))
        );

        now += Duration::from_millis(10);

        let mut acked = ranges::RangeSet::default();
        acked.insert(0..2);

        assert_eq!(r.pkt_thresh, INITIAL_PACKET_THRESHOLD);

        assert_eq!(
            r.on_ack_received(
                &acked,
                25,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((0, 0))
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), 4);
        assert_eq!(r.bytes_in_flight, 0);

        // Spurious loss.
        assert_eq!(r.lost_count, 1);
        assert_eq!(r.lost_spurious_count, 1);

        // Packet threshold was increased.
        assert_eq!(r.pkt_thresh, 4);

        // Wait 1 RTT.
        now += r.rtt();

        r.detect_lost_packets(packet::Epoch::Application, now, "");

        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
    }

    #[test]
    fn pacing() {
        let mut r = new_recovery(CongestionControlAlgorithm::CUBIC);

        let mut now = Instant::now();

        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);

        // Send out the first packet (a full initcwnd).
        send_app(&mut r, test_sent(0, 12000, now), now);

        assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
        assert_eq!(r.bytes_in_flight, 12000);

        // First packet will be sent out immediately.
        assert_eq!(r.pacer.rate(), 0);
        assert_eq!(r.get_packet_send_time(), now);

        // Wait 50ms for ACK.
        now += Duration::from_millis(50);

        let mut acked = ranges::RangeSet::default();
        acked.insert(0..1);

        assert_eq!(
            r.on_ack_received(
                &acked,
                10,
                packet::Epoch::Application,
                HandshakeStatus::default(),
                now,
                ""
            ),
            Ok((0, 0))
        );

        assert_eq!(r.sent[packet::Epoch::Application].len(), 0);
        assert_eq!(r.bytes_in_flight, 0);
        assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));

        // 1 MSS increased.
        assert_eq!(r.congestion_window, 12000 + 1200);

        // Send out the second packet.
        send_app(&mut r, test_sent(1, 6000, now), now);

        assert_eq!(r.sent[packet::Epoch::Application].len(), 1);
        assert_eq!(r.bytes_in_flight, 6000);

        // Pacing is not done during initial phase of connection.
        assert_eq!(r.get_packet_send_time(), now);

        // Send the third packet out.
        send_app(&mut r, test_sent(2, 6000, now), now);

        assert_eq!(r.sent[packet::Epoch::Application].len(), 2);
        assert_eq!(r.bytes_in_flight, 12000);

        // Send the fourth packet out.
        send_app(&mut r, test_sent(3, 1000, now), now);

        assert_eq!(r.sent[packet::Epoch::Application].len(), 3);
        assert_eq!(r.bytes_in_flight, 13000);

        // This outgoing packet is paced, as all conditions for pacing are
        // met. 0.05 is the 50ms smoothed RTT expressed in seconds.
        let pacing_rate =
            (r.congestion_window as f64 * PACING_MULTIPLIER / 0.05) as u64;
        assert_eq!(r.pacer.rate(), pacing_rate);

        assert_eq!(
            r.get_packet_send_time(),
            now + Duration::from_secs_f64(12000.0 / pacing_rate as f64)
        );
    }
}
2158
2159 mod bbr;
2160 mod cubic;
2161 mod delivery_rate;
2162 mod hystart;
2163 mod pacer;
2164 mod prr;
2165 mod reno;
2166