1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 use std::cmp;
28
29 use std::str::FromStr;
30
31 use std::time::Duration;
32 use std::time::Instant;
33
34 use std::collections::VecDeque;
35
36 use crate::Config;
37 use crate::Error;
38 use crate::Result;
39
40 use crate::frame;
41 use crate::minmax;
42 use crate::packet;
43 use crate::ranges;
44
45 // Loss Recovery
46 const PACKET_THRESHOLD: u64 = 3;
47
48 const TIME_THRESHOLD: f64 = 9.0 / 8.0;
49
50 const GRANULARITY: Duration = Duration::from_millis(1);
51
52 const INITIAL_RTT: Duration = Duration::from_millis(333);
53
54 const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;
55
56 const RTT_WINDOW: Duration = Duration::from_secs(300);
57
58 const MAX_PTO_PROBES_COUNT: usize = 2;
59
60 // Congestion Control
61 const INITIAL_WINDOW_PACKETS: usize = 10;
62
63 const MINIMUM_WINDOW_PACKETS: usize = 2;
64
65 const LOSS_REDUCTION_FACTOR: f64 = 0.5;
66
67 pub struct Recovery {
68 loss_detection_timer: Option<Instant>,
69
70 pto_count: u32,
71
72 time_of_last_sent_ack_eliciting_pkt: [Option<Instant>; packet::EPOCH_COUNT],
73
74 largest_acked_pkt: [u64; packet::EPOCH_COUNT],
75
76 largest_sent_pkt: [u64; packet::EPOCH_COUNT],
77
78 latest_rtt: Duration,
79
80 smoothed_rtt: Option<Duration>,
81
82 rttvar: Duration,
83
84 minmax_filter: minmax::Minmax<Duration>,
85
86 min_rtt: Duration,
87
88 pub max_ack_delay: Duration,
89
90 loss_time: [Option<Instant>; packet::EPOCH_COUNT],
91
92 sent: [VecDeque<Sent>; packet::EPOCH_COUNT],
93
94 pub lost: [Vec<frame::Frame>; packet::EPOCH_COUNT],
95
96 pub acked: [Vec<frame::Frame>; packet::EPOCH_COUNT],
97
98 pub lost_count: usize,
99
100 pub loss_probes: [usize; packet::EPOCH_COUNT],
101
102 in_flight_count: [usize; packet::EPOCH_COUNT],
103
104 app_limited: bool,
105
106 delivery_rate: delivery_rate::Rate,
107
108 // Congestion control.
109 cc_ops: &'static CongestionControlOps,
110
111 congestion_window: usize,
112
113 bytes_in_flight: usize,
114
115 ssthresh: usize,
116
117 bytes_acked_sl: usize,
118
119 bytes_acked_ca: usize,
120
121 bytes_sent: usize,
122
123 congestion_recovery_start_time: Option<Instant>,
124
125 max_datagram_size: usize,
126
127 cubic_state: cubic::State,
128
129 // HyStart++.
130 hystart: hystart::Hystart,
131
132 // Pacing.
133 pacing_rate: u64,
134
135 last_packet_scheduled_time: Option<Instant>,
136 }
137
138 impl Recovery {
new(config: &Config) -> Self139 pub fn new(config: &Config) -> Self {
140 Recovery {
141 loss_detection_timer: None,
142
143 pto_count: 0,
144
145 time_of_last_sent_ack_eliciting_pkt: [None; packet::EPOCH_COUNT],
146
147 largest_acked_pkt: [std::u64::MAX; packet::EPOCH_COUNT],
148
149 largest_sent_pkt: [0; packet::EPOCH_COUNT],
150
151 latest_rtt: Duration::new(0, 0),
152
153 // This field should be initialized to `INITIAL_RTT` for the initial
154 // PTO calculation, but it also needs to be an `Option` to track
155 // whether any RTT sample was received, so the initial value is
156 // handled by the `rtt()` method instead.
157 smoothed_rtt: None,
158
159 minmax_filter: minmax::Minmax::new(Duration::new(0, 0)),
160
161 min_rtt: Duration::new(0, 0),
162
163 rttvar: INITIAL_RTT / 2,
164
165 max_ack_delay: Duration::new(0, 0),
166
167 loss_time: [None; packet::EPOCH_COUNT],
168
169 sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
170
171 lost: [Vec::new(), Vec::new(), Vec::new()],
172
173 acked: [Vec::new(), Vec::new(), Vec::new()],
174
175 lost_count: 0,
176
177 loss_probes: [0; packet::EPOCH_COUNT],
178
179 in_flight_count: [0; packet::EPOCH_COUNT],
180
181 congestion_window: config.max_send_udp_payload_size *
182 INITIAL_WINDOW_PACKETS,
183
184 bytes_in_flight: 0,
185
186 ssthresh: std::usize::MAX,
187
188 bytes_acked_sl: 0,
189
190 bytes_acked_ca: 0,
191
192 bytes_sent: 0,
193
194 congestion_recovery_start_time: None,
195
196 max_datagram_size: config.max_send_udp_payload_size,
197
198 cc_ops: config.cc_algorithm.into(),
199
200 delivery_rate: delivery_rate::Rate::default(),
201
202 cubic_state: cubic::State::default(),
203
204 app_limited: false,
205
206 hystart: hystart::Hystart::new(config.hystart),
207
208 pacing_rate: 0,
209
210 last_packet_scheduled_time: None,
211 }
212 }
213
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )214 pub fn on_packet_sent(
215 &mut self, mut pkt: Sent, epoch: packet::Epoch,
216 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
217 ) {
218 let ack_eliciting = pkt.ack_eliciting;
219 let in_flight = pkt.in_flight;
220 let sent_bytes = pkt.size;
221 let pkt_num = pkt.pkt_num;
222
223 self.delivery_rate.on_packet_sent(&mut pkt, now);
224
225 self.largest_sent_pkt[epoch] =
226 cmp::max(self.largest_sent_pkt[epoch], pkt_num);
227
228 self.sent[epoch].push_back(pkt);
229
230 if in_flight {
231 if ack_eliciting {
232 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
233 }
234
235 self.in_flight_count[epoch] += 1;
236
237 self.app_limited =
238 (self.bytes_in_flight + sent_bytes) < self.congestion_window;
239
240 self.on_packet_sent_cc(sent_bytes, now);
241
242 self.set_loss_detection_timer(handshake_status, now);
243 }
244
245 // HyStart++: Start of the round in a slow start.
246 if self.hystart.enabled() &&
247 epoch == packet::EPOCH_APPLICATION &&
248 self.congestion_window < self.ssthresh
249 {
250 self.hystart.start_round(pkt_num);
251 }
252
253 // Pacing: Set the pacing rate if CC doesn't do its own.
254 if !(self.cc_ops.has_custom_pacing)() {
255 if let Some(srtt) = self.smoothed_rtt {
256 let rate = (self.congestion_window as u64 * 1000000) /
257 srtt.as_micros() as u64;
258 self.set_pacing_rate(rate);
259 }
260 }
261
262 self.schedule_next_packet(epoch, now, sent_bytes);
263
264 self.bytes_sent += sent_bytes;
265 trace!("{} {:?}", trace_id, self);
266 }
267
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)268 fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
269 (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
270 }
271
set_pacing_rate(&mut self, rate: u64)272 pub fn set_pacing_rate(&mut self, rate: u64) {
273 if rate != 0 {
274 self.pacing_rate = rate;
275 }
276 }
277
get_packet_send_time(&self) -> Option<Instant>278 pub fn get_packet_send_time(&self) -> Option<Instant> {
279 self.last_packet_scheduled_time
280 }
281
schedule_next_packet( &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize, )282 fn schedule_next_packet(
283 &mut self, epoch: packet::Epoch, now: Instant, packet_size: usize,
284 ) {
285 // Don't pace in any of these cases:
286 // * Packet epoch is not EPOCH_APPLICATION.
287 // * Packet contains only ACK frames.
288 // * The start of the connection.
289 if epoch != packet::EPOCH_APPLICATION ||
290 packet_size == 0 ||
291 self.bytes_sent <= self.congestion_window ||
292 self.pacing_rate == 0
293 {
294 self.last_packet_scheduled_time = Some(now);
295 return;
296 }
297
298 self.last_packet_scheduled_time = match self.last_packet_scheduled_time {
299 Some(last_scheduled_time) => {
300 let interval: u64 =
301 (packet_size as u64 * 1000000) / self.pacing_rate;
302 let interval = Duration::from_micros(interval);
303 let next_schedule_time = last_scheduled_time + interval;
304 Some(cmp::max(now, next_schedule_time))
305 },
306
307 None => Some(now),
308 };
309 }
310
on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> Result<()>311 pub fn on_ack_received(
312 &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
313 epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
314 trace_id: &str,
315 ) -> Result<()> {
316 let largest_acked = ranges.last().unwrap();
317
318 // If the largest packet number acked exceeds any packet number we have
319 // sent, then the ACK is obviously invalid, so there's no need to
320 // continue further.
321 if largest_acked > self.largest_sent_pkt[epoch] {
322 if cfg!(feature = "fuzzing") {
323 return Ok(());
324 }
325
326 return Err(Error::InvalidPacket);
327 }
328
329 if self.largest_acked_pkt[epoch] == std::u64::MAX {
330 self.largest_acked_pkt[epoch] = largest_acked;
331 } else {
332 self.largest_acked_pkt[epoch] =
333 cmp::max(self.largest_acked_pkt[epoch], largest_acked);
334 }
335
336 let mut has_ack_eliciting = false;
337
338 let mut largest_newly_acked_pkt_num = 0;
339 let mut largest_newly_acked_sent_time = now;
340
341 let mut newly_acked = Vec::new();
342
343 // Detect and mark acked packets, without removing them from the sent
344 // packets list.
345 for r in ranges.iter() {
346 let lowest_acked = r.start;
347 let largest_acked = r.end - 1;
348
349 let unacked_iter = self.sent[epoch]
350 .iter_mut()
351 // Skip packets that precede the lowest acked packet in the block.
352 .skip_while(|p| p.pkt_num < lowest_acked)
353 // Skip packets that follow the largest acked packet in the block.
354 .take_while(|p| p.pkt_num <= largest_acked)
355 // Skip packets that have already been acked or lost.
356 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
357
358 for unacked in unacked_iter {
359 unacked.time_acked = Some(now);
360
361 if unacked.ack_eliciting {
362 has_ack_eliciting = true;
363 }
364
365 largest_newly_acked_pkt_num = unacked.pkt_num;
366 largest_newly_acked_sent_time = unacked.time_sent;
367
368 self.acked[epoch].append(&mut unacked.frames);
369
370 if unacked.in_flight {
371 self.in_flight_count[epoch] =
372 self.in_flight_count[epoch].saturating_sub(1);
373
374 self.delivery_rate.on_packet_acked(&unacked, now);
375 }
376
377 newly_acked.push(Acked {
378 pkt_num: unacked.pkt_num,
379
380 time_sent: unacked.time_sent,
381
382 size: unacked.size,
383 });
384
385 trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
386 }
387 }
388
389 self.delivery_rate.estimate();
390
391 if newly_acked.is_empty() {
392 return Ok(());
393 }
394
395 if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
396 let latest_rtt = now - largest_newly_acked_sent_time;
397
398 let ack_delay = if epoch == packet::EPOCH_APPLICATION {
399 Duration::from_micros(ack_delay)
400 } else {
401 Duration::from_micros(0)
402 };
403
404 self.update_rtt(latest_rtt, ack_delay, now);
405 }
406
407 // Detect and mark lost packets without removing them from the sent
408 // packets list.
409 self.detect_lost_packets(epoch, now, trace_id);
410
411 self.on_packets_acked(newly_acked, epoch, now);
412
413 self.pto_count = 0;
414
415 self.set_loss_detection_timer(handshake_status, now);
416
417 self.drain_packets(epoch);
418
419 Ok(())
420 }
421
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )422 pub fn on_loss_detection_timeout(
423 &mut self, handshake_status: HandshakeStatus, now: Instant,
424 trace_id: &str,
425 ) {
426 let (earliest_loss_time, epoch) = self.loss_time_and_space();
427
428 if earliest_loss_time.is_some() {
429 // Time threshold loss detection.
430 self.detect_lost_packets(epoch, now, trace_id);
431
432 self.set_loss_detection_timer(handshake_status, now);
433
434 trace!("{} {:?}", trace_id, self);
435 return;
436 }
437
438 let epoch = if self.bytes_in_flight > 0 {
439 // Send new data if available, else retransmit old data. If neither
440 // is available, send a single PING frame.
441 let (_, e) = self.pto_time_and_space(handshake_status, now);
442
443 e
444 } else {
445 // Client sends an anti-deadlock packet: Initial is padded to earn
446 // more anti-amplification credit, a Handshake packet proves address
447 // ownership.
448 if handshake_status.has_handshake_keys {
449 packet::EPOCH_HANDSHAKE
450 } else {
451 packet::EPOCH_INITIAL
452 }
453 };
454
455 self.pto_count += 1;
456
457 self.loss_probes[epoch] =
458 cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
459
460 let unacked_iter = self.sent[epoch]
461 .iter_mut()
462 // Skip packets that have already been acked or lost, and packets
463 // that don't contain either CRYPTO or STREAM frames.
464 .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
465 // Only return as many packets as the number of probe packets that
466 // will be sent.
467 .take(self.loss_probes[epoch]);
468
469 // Retransmit the frames from the oldest sent packets on PTO. However
470 // the packets are not actually declared lost (so there is no effect to
471 // congestion control), we just reschedule the data they carried.
472 //
473 // This will also trigger sending an ACK and retransmitting frames like
474 // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
475 // to CRYPTO and STREAM, if the original packet carried them.
476 for unacked in unacked_iter {
477 self.lost[epoch].extend_from_slice(&unacked.frames);
478 }
479
480 self.set_loss_detection_timer(handshake_status, now);
481
482 trace!("{} {:?}", trace_id, self);
483 }
484
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )485 pub fn on_pkt_num_space_discarded(
486 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
487 now: Instant,
488 ) {
489 let unacked_bytes = self.sent[epoch]
490 .iter()
491 .filter(|p| {
492 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
493 })
494 .fold(0, |acc, p| acc + p.size);
495
496 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
497
498 self.sent[epoch].clear();
499 self.lost[epoch].clear();
500 self.acked[epoch].clear();
501
502 self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
503 self.loss_time[epoch] = None;
504 self.loss_probes[epoch] = 0;
505 self.in_flight_count[epoch] = 0;
506
507 self.set_loss_detection_timer(handshake_status, now);
508 }
509
loss_detection_timer(&self) -> Option<Instant>510 pub fn loss_detection_timer(&self) -> Option<Instant> {
511 self.loss_detection_timer
512 }
513
cwnd(&self) -> usize514 pub fn cwnd(&self) -> usize {
515 self.congestion_window
516 }
517
cwnd_available(&self) -> usize518 pub fn cwnd_available(&self) -> usize {
519 // Ignore cwnd when sending probe packets.
520 if self.loss_probes.iter().any(|&x| x > 0) {
521 return std::usize::MAX;
522 }
523
524 self.congestion_window.saturating_sub(self.bytes_in_flight)
525 }
526
rtt(&self) -> Duration527 pub fn rtt(&self) -> Duration {
528 self.smoothed_rtt.unwrap_or(INITIAL_RTT)
529 }
530
pto(&self) -> Duration531 pub fn pto(&self) -> Duration {
532 self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
533 }
534
delivery_rate(&self) -> u64535 pub fn delivery_rate(&self) -> u64 {
536 self.delivery_rate.delivery_rate()
537 }
538
max_datagram_size(&self) -> usize539 pub fn max_datagram_size(&self) -> usize {
540 self.max_datagram_size
541 }
542
update_max_datagram_size(&mut self, new_max_datagram_size: usize)543 pub fn update_max_datagram_size(&mut self, new_max_datagram_size: usize) {
544 let max_datagram_size =
545 cmp::min(self.max_datagram_size, new_max_datagram_size);
546
547 // Congestion Window is updated only when it's not updated already.
548 if self.congestion_window ==
549 self.max_datagram_size * INITIAL_WINDOW_PACKETS
550 {
551 self.congestion_window = max_datagram_size * INITIAL_WINDOW_PACKETS;
552 }
553
554 self.max_datagram_size = max_datagram_size;
555 }
556
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )557 fn update_rtt(
558 &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
559 ) {
560 self.latest_rtt = latest_rtt;
561
562 match self.smoothed_rtt {
563 // First RTT sample.
564 None => {
565 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
566
567 self.smoothed_rtt = Some(latest_rtt);
568
569 self.rttvar = latest_rtt / 2;
570 },
571
572 Some(srtt) => {
573 self.min_rtt =
574 self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
575
576 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
577
578 // Adjust for ack delay if plausible.
579 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
580 latest_rtt - ack_delay
581 } else {
582 latest_rtt
583 };
584
585 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
586 sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
587
588 self.smoothed_rtt = Some(
589 srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
590 );
591 },
592 }
593 }
594
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)595 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
596 let mut epoch = packet::EPOCH_INITIAL;
597 let mut time = self.loss_time[epoch];
598
599 // Iterate over all packet number spaces starting from Handshake.
600 #[allow(clippy::needless_range_loop)]
601 for e in packet::EPOCH_HANDSHAKE..packet::EPOCH_COUNT {
602 let new_time = self.loss_time[e];
603
604 if time.is_none() || new_time < time {
605 time = new_time;
606 epoch = e;
607 }
608 }
609
610 (time, epoch)
611 }
612
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)613 fn pto_time_and_space(
614 &self, handshake_status: HandshakeStatus, now: Instant,
615 ) -> (Option<Instant>, packet::Epoch) {
616 let mut duration = self.pto() * 2_u32.pow(self.pto_count);
617
618 // Arm PTO from now when there are no inflight packets.
619 if self.bytes_in_flight == 0 {
620 if handshake_status.has_handshake_keys {
621 return (Some(now + duration), packet::EPOCH_HANDSHAKE);
622 } else {
623 return (Some(now + duration), packet::EPOCH_INITIAL);
624 }
625 }
626
627 let mut pto_timeout = None;
628 let mut pto_space = packet::EPOCH_INITIAL;
629
630 // Iterate over all packet number spaces.
631 for e in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
632 if self.in_flight_count[e] == 0 {
633 continue;
634 }
635
636 if e == packet::EPOCH_APPLICATION {
637 // Skip Application Data until handshake completes.
638 if !handshake_status.completed {
639 return (pto_timeout, pto_space);
640 }
641
642 // Include max_ack_delay and backoff for Application Data.
643 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
644 }
645
646 let new_time =
647 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
648
649 if pto_timeout.is_none() || new_time < pto_timeout {
650 pto_timeout = new_time;
651 pto_space = e;
652 }
653 }
654
655 (pto_timeout, pto_space)
656 }
657
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )658 fn set_loss_detection_timer(
659 &mut self, handshake_status: HandshakeStatus, now: Instant,
660 ) {
661 let (earliest_loss_time, _) = self.loss_time_and_space();
662
663 if earliest_loss_time.is_some() {
664 // Time threshold loss detection.
665 self.loss_detection_timer = earliest_loss_time;
666 return;
667 }
668
669 if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
670 self.loss_detection_timer = None;
671 return;
672 }
673
674 // PTO timer.
675 let (timeout, _) = self.pto_time_and_space(handshake_status, now);
676 self.loss_detection_timer = timeout;
677 }
678
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, )679 fn detect_lost_packets(
680 &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
681 ) {
682 let largest_acked = self.largest_acked_pkt[epoch];
683
684 self.loss_time[epoch] = None;
685
686 let loss_delay =
687 cmp::max(self.latest_rtt, self.rtt()).mul_f64(TIME_THRESHOLD);
688
689 // Minimum time of kGranularity before packets are deemed lost.
690 let loss_delay = cmp::max(loss_delay, GRANULARITY);
691
692 // Packets sent before this time are deemed lost.
693 let lost_send_time = now - loss_delay;
694
695 let mut lost_bytes = 0;
696
697 let mut largest_lost_pkt = None;
698
699 let unacked_iter = self.sent[epoch]
700 .iter_mut()
701 // Skip packets that follow the largest acked packet.
702 .take_while(|p| p.pkt_num <= largest_acked)
703 // Skip packets that have already been acked or lost.
704 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
705
706 for unacked in unacked_iter {
707 // Mark packet as lost, or set time when it should be marked.
708 if unacked.time_sent <= lost_send_time ||
709 largest_acked >= unacked.pkt_num + PACKET_THRESHOLD
710 {
711 self.lost[epoch].append(&mut unacked.frames);
712
713 unacked.time_lost = Some(now);
714
715 if unacked.in_flight {
716 lost_bytes += unacked.size;
717
718 // Frames have already been removed from the packet, so
719 // cloning the whole packet should be relatively cheap.
720 largest_lost_pkt = Some(unacked.clone());
721
722 self.in_flight_count[epoch] =
723 self.in_flight_count[epoch].saturating_sub(1);
724
725 trace!(
726 "{} packet {} lost on epoch {}",
727 trace_id,
728 unacked.pkt_num,
729 epoch
730 );
731 }
732
733 self.lost_count += 1;
734 } else {
735 let loss_time = match self.loss_time[epoch] {
736 None => unacked.time_sent + loss_delay,
737
738 Some(loss_time) =>
739 cmp::min(loss_time, unacked.time_sent + loss_delay),
740 };
741
742 self.loss_time[epoch] = Some(loss_time);
743 }
744 }
745
746 if let Some(pkt) = largest_lost_pkt {
747 self.on_packets_lost(lost_bytes, &pkt, epoch, now);
748 }
749
750 self.drain_packets(epoch);
751 }
752
drain_packets(&mut self, epoch: packet::Epoch)753 fn drain_packets(&mut self, epoch: packet::Epoch) {
754 let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
755
756 // In order to avoid removing elements from the middle of the list
757 // (which would require copying other elements to compact the list),
758 // we only remove a contiguous range of elements from the start of the
759 // list.
760 //
761 // This means that acked or lost elements coming after this will not
762 // be removed at this point, but their removal is delayed for a later
763 // time, once the gaps have been filled.
764
765 // First, find the first element that is neither acked nor lost.
766 for (i, pkt) in self.sent[epoch].iter().enumerate() {
767 if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
768 lowest_non_expired_pkt_index = i;
769 break;
770 }
771 }
772
773 // Then remove elements up to the previously found index.
774 self.sent[epoch].drain(..lowest_non_expired_pkt_index);
775 }
776
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )777 fn on_packets_acked(
778 &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
779 ) {
780 for pkt in acked {
781 (self.cc_ops.on_packet_acked)(self, &pkt, epoch, now);
782 }
783 }
784
in_congestion_recovery(&self, sent_time: Instant) -> bool785 fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
786 match self.congestion_recovery_start_time {
787 Some(congestion_recovery_start_time) =>
788 sent_time <= congestion_recovery_start_time,
789
790 None => false,
791 }
792 }
793
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool794 fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
795 let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
796
797 // TODO: properly detect persistent congestion
798 false
799 }
800
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )801 fn on_packets_lost(
802 &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
803 epoch: packet::Epoch, now: Instant,
804 ) {
805 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
806
807 self.congestion_event(largest_lost_pkt.time_sent, epoch, now);
808
809 if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
810 self.collapse_cwnd();
811 }
812 }
813
congestion_event( &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant, )814 fn congestion_event(
815 &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant,
816 ) {
817 if !self.in_congestion_recovery(time_sent) {
818 (self.cc_ops.checkpoint)(self);
819 }
820
821 (self.cc_ops.congestion_event)(self, time_sent, epoch, now);
822 }
823
collapse_cwnd(&mut self)824 fn collapse_cwnd(&mut self) {
825 (self.cc_ops.collapse_cwnd)(self);
826 }
827
rate_check_app_limited(&mut self)828 pub fn rate_check_app_limited(&mut self) {
829 if self.app_limited {
830 self.delivery_rate.check_app_limited(self.bytes_in_flight)
831 }
832 }
833
update_app_limited(&mut self, v: bool)834 pub fn update_app_limited(&mut self, v: bool) {
835 self.app_limited = v;
836 }
837
app_limited(&mut self) -> bool838 pub fn app_limited(&mut self) -> bool {
839 self.app_limited
840 }
841
842 #[cfg(feature = "qlog")]
to_qlog(&self) -> qlog::event::Event843 pub fn to_qlog(&self) -> qlog::event::Event {
844 // QVis can't use all these fields and they can be large.
845 qlog::event::Event::metrics_updated(
846 Some(self.min_rtt.as_millis() as u64),
847 Some(self.rtt().as_millis() as u64),
848 Some(self.latest_rtt.as_millis() as u64),
849 Some(self.rttvar.as_millis() as u64),
850 None, // delay
851 None, // probe_count
852 Some(self.cwnd() as u64),
853 Some(self.bytes_in_flight as u64),
854 None, // ssthresh
855 None, // packets_in_flight
856 None, // in_recovery
857 None, // pacing_rate
858 )
859 }
860 }
861
862 /// Available congestion control algorithms.
863 ///
864 /// This enum provides currently available list of congestion control
865 /// algorithms.
866 #[derive(Debug, Copy, Clone, PartialEq)]
867 #[repr(C)]
868 pub enum CongestionControlAlgorithm {
869 /// Reno congestion control algorithm. `reno` in a string form.
870 Reno = 0,
871 /// CUBIC congestion control algorithm (default). `cubic` in a string form.
872 CUBIC = 1,
873 }
874
875 impl FromStr for CongestionControlAlgorithm {
876 type Err = crate::Error;
877
878 /// Converts a string to `CongestionControlAlgorithm`.
879 ///
880 /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>881 fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
882 match name {
883 "reno" => Ok(CongestionControlAlgorithm::Reno),
884 "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
885
886 _ => Err(crate::Error::CongestionControl),
887 }
888 }
889 }
890
891 pub struct CongestionControlOps {
892 pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
893
894 pub on_packet_acked:
895 fn(r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant),
896
897 pub congestion_event: fn(
898 r: &mut Recovery,
899 time_sent: Instant,
900 epoch: packet::Epoch,
901 now: Instant,
902 ),
903
904 pub collapse_cwnd: fn(r: &mut Recovery),
905
906 pub checkpoint: fn(r: &mut Recovery),
907
908 pub rollback: fn(r: &mut Recovery),
909
910 pub has_custom_pacing: fn() -> bool,
911 }
912
913 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self914 fn from(algo: CongestionControlAlgorithm) -> Self {
915 match algo {
916 CongestionControlAlgorithm::Reno => &reno::RENO,
917 CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
918 }
919 }
920 }
921
922 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result923 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
924 match self.loss_detection_timer {
925 Some(v) => {
926 let now = Instant::now();
927
928 if v > now {
929 let d = v.duration_since(now);
930 write!(f, "timer={:?} ", d)?;
931 } else {
932 write!(f, "timer=exp ")?;
933 }
934 },
935
936 None => {
937 write!(f, "timer=none ")?;
938 },
939 };
940
941 write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
942 write!(f, "srtt={:?} ", self.smoothed_rtt)?;
943 write!(f, "min_rtt={:?} ", self.min_rtt)?;
944 write!(f, "rttvar={:?} ", self.rttvar)?;
945 write!(f, "loss_time={:?} ", self.loss_time)?;
946 write!(f, "loss_probes={:?} ", self.loss_probes)?;
947 write!(f, "cwnd={} ", self.congestion_window)?;
948 write!(f, "ssthresh={} ", self.ssthresh)?;
949 write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
950 write!(f, "app_limited={} ", self.app_limited)?;
951 write!(
952 f,
953 "congestion_recovery_start_time={:?} ",
954 self.congestion_recovery_start_time
955 )?;
956 write!(f, "{:?} ", self.delivery_rate)?;
957 write!(f, "pacing_rate={:?} ", self.pacing_rate)?;
958 write!(
959 f,
960 "last_packet_scheduled_time={:?} ",
961 self.last_packet_scheduled_time
962 )?;
963
964 if self.hystart.enabled() {
965 write!(f, "hystart={:?} ", self.hystart)?;
966 }
967
968 Ok(())
969 }
970 }
971
972 #[derive(Clone)]
973 pub struct Sent {
974 pub pkt_num: u64,
975
976 pub frames: Vec<frame::Frame>,
977
978 pub time_sent: Instant,
979
980 pub time_acked: Option<Instant>,
981
982 pub time_lost: Option<Instant>,
983
984 pub size: usize,
985
986 pub ack_eliciting: bool,
987
988 pub in_flight: bool,
989
990 pub delivered: usize,
991
992 pub delivered_time: Instant,
993
994 pub recent_delivered_packet_sent_time: Instant,
995
996 pub is_app_limited: bool,
997
998 pub has_data: bool,
999 }
1000
1001 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result1002 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1003 write!(f, "pkt_num={:?} ", self.pkt_num)?;
1004 write!(f, "pkt_sent_time={:?} ", self.time_sent.elapsed())?;
1005 write!(f, "pkt_size={:?} ", self.size)?;
1006 write!(f, "delivered={:?} ", self.delivered)?;
1007 write!(f, "delivered_time={:?} ", self.delivered_time.elapsed())?;
1008 write!(
1009 f,
1010 "recent_delivered_packet_sent_time={:?} ",
1011 self.recent_delivered_packet_sent_time.elapsed()
1012 )?;
1013 write!(f, "is_app_limited={} ", self.is_app_limited)?;
1014 write!(f, "has_data={} ", self.has_data)?;
1015
1016 Ok(())
1017 }
1018 }
1019
1020 #[derive(Clone)]
1021 pub struct Acked {
1022 pub pkt_num: u64,
1023
1024 pub time_sent: Instant,
1025
1026 pub size: usize,
1027 }
1028
1029 #[derive(Clone, Copy, Debug)]
1030 pub struct HandshakeStatus {
1031 pub has_handshake_keys: bool,
1032
1033 pub peer_verified_address: bool,
1034
1035 pub completed: bool,
1036 }
1037
1038 #[cfg(test)]
1039 impl Default for HandshakeStatus {
default() -> HandshakeStatus1040 fn default() -> HandshakeStatus {
1041 HandshakeStatus {
1042 has_handshake_keys: true,
1043
1044 peer_verified_address: true,
1045
1046 completed: true,
1047 }
1048 }
1049 }
1050
sub_abs(lhs: Duration, rhs: Duration) -> Duration1051 fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
1052 if lhs > rhs {
1053 lhs - rhs
1054 } else {
1055 rhs - lhs
1056 }
1057 }
1058
1059 #[cfg(test)]
1060 mod tests {
1061 use super::*;
1062
1063 #[test]
lookup_cc_algo_ok()1064 fn lookup_cc_algo_ok() {
1065 let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
1066 assert_eq!(algo, CongestionControlAlgorithm::Reno);
1067 }
1068
1069 #[test]
lookup_cc_algo_bad()1070 fn lookup_cc_algo_bad() {
1071 assert_eq!(
1072 CongestionControlAlgorithm::from_str("???"),
1073 Err(Error::CongestionControl)
1074 );
1075 }
1076
1077 #[test]
collapse_cwnd()1078 fn collapse_cwnd() {
1079 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1080 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1081
1082 let mut r = Recovery::new(&cfg);
1083
1084 // cwnd will be reset.
1085 r.collapse_cwnd();
1086 assert_eq!(r.cwnd(), r.max_datagram_size * MINIMUM_WINDOW_PACKETS);
1087 }
1088
1089 #[test]
loss_on_pto()1090 fn loss_on_pto() {
1091 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1092 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1093
1094 let mut r = Recovery::new(&cfg);
1095
1096 let mut now = Instant::now();
1097
1098 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1099
1100 // Start by sending a few packets.
1101 let p = Sent {
1102 pkt_num: 0,
1103 frames: vec![],
1104 time_sent: now,
1105 time_acked: None,
1106 time_lost: None,
1107 size: 1000,
1108 ack_eliciting: true,
1109 in_flight: true,
1110 delivered: 0,
1111 delivered_time: now,
1112 recent_delivered_packet_sent_time: now,
1113 is_app_limited: false,
1114 has_data: false,
1115 };
1116
1117 r.on_packet_sent(
1118 p,
1119 packet::EPOCH_APPLICATION,
1120 HandshakeStatus::default(),
1121 now,
1122 "",
1123 );
1124 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1125 assert_eq!(r.bytes_in_flight, 1000);
1126
1127 let p = Sent {
1128 pkt_num: 1,
1129 frames: vec![],
1130 time_sent: now,
1131 time_acked: None,
1132 time_lost: None,
1133 size: 1000,
1134 ack_eliciting: true,
1135 in_flight: true,
1136 delivered: 0,
1137 delivered_time: now,
1138 recent_delivered_packet_sent_time: now,
1139 is_app_limited: false,
1140 has_data: false,
1141 };
1142
1143 r.on_packet_sent(
1144 p,
1145 packet::EPOCH_APPLICATION,
1146 HandshakeStatus::default(),
1147 now,
1148 "",
1149 );
1150 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1151 assert_eq!(r.bytes_in_flight, 2000);
1152
1153 let p = Sent {
1154 pkt_num: 2,
1155 frames: vec![],
1156 time_sent: now,
1157 time_acked: None,
1158 time_lost: None,
1159 size: 1000,
1160 ack_eliciting: true,
1161 in_flight: true,
1162 delivered: 0,
1163 delivered_time: now,
1164 recent_delivered_packet_sent_time: now,
1165 is_app_limited: false,
1166 has_data: false,
1167 };
1168
1169 r.on_packet_sent(
1170 p,
1171 packet::EPOCH_APPLICATION,
1172 HandshakeStatus::default(),
1173 now,
1174 "",
1175 );
1176 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1177 assert_eq!(r.bytes_in_flight, 3000);
1178
1179 let p = Sent {
1180 pkt_num: 3,
1181 frames: vec![],
1182 time_sent: now,
1183 time_acked: None,
1184 time_lost: None,
1185 size: 1000,
1186 ack_eliciting: true,
1187 in_flight: true,
1188 delivered: 0,
1189 delivered_time: now,
1190 recent_delivered_packet_sent_time: now,
1191 is_app_limited: false,
1192 has_data: false,
1193 };
1194
1195 r.on_packet_sent(
1196 p,
1197 packet::EPOCH_APPLICATION,
1198 HandshakeStatus::default(),
1199 now,
1200 "",
1201 );
1202 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1203 assert_eq!(r.bytes_in_flight, 4000);
1204
1205 // Wait for 10ms.
1206 now += Duration::from_millis(10);
1207
1208 // Only the first 2 packets are acked.
1209 let mut acked = ranges::RangeSet::default();
1210 acked.insert(0..2);
1211
1212 assert_eq!(
1213 r.on_ack_received(
1214 &acked,
1215 25,
1216 packet::EPOCH_APPLICATION,
1217 HandshakeStatus::default(),
1218 now,
1219 ""
1220 ),
1221 Ok(())
1222 );
1223
1224 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1225 assert_eq!(r.bytes_in_flight, 2000);
1226 assert_eq!(r.lost_count, 0);
1227
1228 // Wait until loss detection timer expires.
1229 now = r.loss_detection_timer().unwrap();
1230
1231 // PTO.
1232 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1233 assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 1);
1234 assert_eq!(r.lost_count, 0);
1235 assert_eq!(r.pto_count, 1);
1236
1237 let p = Sent {
1238 pkt_num: 4,
1239 frames: vec![],
1240 time_sent: now,
1241 time_acked: None,
1242 time_lost: None,
1243 size: 1000,
1244 ack_eliciting: true,
1245 in_flight: true,
1246 delivered: 0,
1247 delivered_time: now,
1248 recent_delivered_packet_sent_time: now,
1249 is_app_limited: false,
1250 has_data: false,
1251 };
1252
1253 r.on_packet_sent(
1254 p,
1255 packet::EPOCH_APPLICATION,
1256 HandshakeStatus::default(),
1257 now,
1258 "",
1259 );
1260 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1261 assert_eq!(r.bytes_in_flight, 3000);
1262
1263 let p = Sent {
1264 pkt_num: 5,
1265 frames: vec![],
1266 time_sent: now,
1267 time_acked: None,
1268 time_lost: None,
1269 size: 1000,
1270 ack_eliciting: true,
1271 in_flight: true,
1272 delivered: 0,
1273 delivered_time: now,
1274 recent_delivered_packet_sent_time: now,
1275 is_app_limited: false,
1276 has_data: false,
1277 };
1278
1279 r.on_packet_sent(
1280 p,
1281 packet::EPOCH_APPLICATION,
1282 HandshakeStatus::default(),
1283 now,
1284 "",
1285 );
1286 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1287 assert_eq!(r.bytes_in_flight, 4000);
1288 assert_eq!(r.lost_count, 0);
1289
1290 // Wait for 10ms.
1291 now += Duration::from_millis(10);
1292
1293 // PTO packets are acked.
1294 let mut acked = ranges::RangeSet::default();
1295 acked.insert(4..6);
1296
1297 assert_eq!(
1298 r.on_ack_received(
1299 &acked,
1300 25,
1301 packet::EPOCH_APPLICATION,
1302 HandshakeStatus::default(),
1303 now,
1304 ""
1305 ),
1306 Ok(())
1307 );
1308
1309 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1310 assert_eq!(r.bytes_in_flight, 0);
1311
1312 assert_eq!(r.lost_count, 2);
1313 }
1314
1315 #[test]
loss_on_timer()1316 fn loss_on_timer() {
1317 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1318 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1319
1320 let mut r = Recovery::new(&cfg);
1321
1322 let mut now = Instant::now();
1323
1324 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1325
1326 // Start by sending a few packets.
1327 let p = Sent {
1328 pkt_num: 0,
1329 frames: vec![],
1330 time_sent: now,
1331 time_acked: None,
1332 time_lost: None,
1333 size: 1000,
1334 ack_eliciting: true,
1335 in_flight: true,
1336 delivered: 0,
1337 delivered_time: now,
1338 recent_delivered_packet_sent_time: now,
1339 is_app_limited: false,
1340 has_data: false,
1341 };
1342
1343 r.on_packet_sent(
1344 p,
1345 packet::EPOCH_APPLICATION,
1346 HandshakeStatus::default(),
1347 now,
1348 "",
1349 );
1350 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1351 assert_eq!(r.bytes_in_flight, 1000);
1352
1353 let p = Sent {
1354 pkt_num: 1,
1355 frames: vec![],
1356 time_sent: now,
1357 time_acked: None,
1358 time_lost: None,
1359 size: 1000,
1360 ack_eliciting: true,
1361 in_flight: true,
1362 delivered: 0,
1363 delivered_time: now,
1364 recent_delivered_packet_sent_time: now,
1365 is_app_limited: false,
1366 has_data: false,
1367 };
1368
1369 r.on_packet_sent(
1370 p,
1371 packet::EPOCH_APPLICATION,
1372 HandshakeStatus::default(),
1373 now,
1374 "",
1375 );
1376 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1377 assert_eq!(r.bytes_in_flight, 2000);
1378
1379 let p = Sent {
1380 pkt_num: 2,
1381 frames: vec![],
1382 time_sent: now,
1383 time_acked: None,
1384 time_lost: None,
1385 size: 1000,
1386 ack_eliciting: true,
1387 in_flight: true,
1388 delivered: 0,
1389 delivered_time: now,
1390 recent_delivered_packet_sent_time: now,
1391 is_app_limited: false,
1392 has_data: false,
1393 };
1394
1395 r.on_packet_sent(
1396 p,
1397 packet::EPOCH_APPLICATION,
1398 HandshakeStatus::default(),
1399 now,
1400 "",
1401 );
1402 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1403 assert_eq!(r.bytes_in_flight, 3000);
1404
1405 let p = Sent {
1406 pkt_num: 3,
1407 frames: vec![],
1408 time_sent: now,
1409 time_acked: None,
1410 time_lost: None,
1411 size: 1000,
1412 ack_eliciting: true,
1413 in_flight: true,
1414 delivered: 0,
1415 delivered_time: now,
1416 recent_delivered_packet_sent_time: now,
1417 is_app_limited: false,
1418 has_data: false,
1419 };
1420
1421 r.on_packet_sent(
1422 p,
1423 packet::EPOCH_APPLICATION,
1424 HandshakeStatus::default(),
1425 now,
1426 "",
1427 );
1428 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1429 assert_eq!(r.bytes_in_flight, 4000);
1430
1431 // Wait for 10ms.
1432 now += Duration::from_millis(10);
1433
1434 // Only the first 2 packets and the last one are acked.
1435 let mut acked = ranges::RangeSet::default();
1436 acked.insert(0..2);
1437 acked.insert(3..4);
1438
1439 assert_eq!(
1440 r.on_ack_received(
1441 &acked,
1442 25,
1443 packet::EPOCH_APPLICATION,
1444 HandshakeStatus::default(),
1445 now,
1446 ""
1447 ),
1448 Ok(())
1449 );
1450
1451 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1452 assert_eq!(r.bytes_in_flight, 1000);
1453 assert_eq!(r.lost_count, 0);
1454
1455 // Wait until loss detection timer expires.
1456 now = r.loss_detection_timer().unwrap();
1457
1458 // Packet is declared lost.
1459 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1460 assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 0);
1461
1462 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1463 assert_eq!(r.bytes_in_flight, 0);
1464
1465 assert_eq!(r.lost_count, 1);
1466 }
1467
1468 #[test]
loss_on_reordering()1469 fn loss_on_reordering() {
1470 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1471 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1472
1473 let mut r = Recovery::new(&cfg);
1474
1475 let mut now = Instant::now();
1476
1477 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1478
1479 // Start by sending a few packets.
1480 let p = Sent {
1481 pkt_num: 0,
1482 frames: vec![],
1483 time_sent: now,
1484 time_acked: None,
1485 time_lost: None,
1486 size: 1000,
1487 ack_eliciting: true,
1488 in_flight: true,
1489 delivered: 0,
1490 delivered_time: now,
1491 recent_delivered_packet_sent_time: now,
1492 is_app_limited: false,
1493 has_data: false,
1494 };
1495
1496 r.on_packet_sent(
1497 p,
1498 packet::EPOCH_APPLICATION,
1499 HandshakeStatus::default(),
1500 now,
1501 "",
1502 );
1503 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1504 assert_eq!(r.bytes_in_flight, 1000);
1505
1506 let p = Sent {
1507 pkt_num: 1,
1508 frames: vec![],
1509 time_sent: now,
1510 time_acked: None,
1511 time_lost: None,
1512 size: 1000,
1513 ack_eliciting: true,
1514 in_flight: true,
1515 delivered: 0,
1516 delivered_time: now,
1517 recent_delivered_packet_sent_time: now,
1518 is_app_limited: false,
1519 has_data: false,
1520 };
1521
1522 r.on_packet_sent(
1523 p,
1524 packet::EPOCH_APPLICATION,
1525 HandshakeStatus::default(),
1526 now,
1527 "",
1528 );
1529 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1530 assert_eq!(r.bytes_in_flight, 2000);
1531
1532 let p = Sent {
1533 pkt_num: 2,
1534 frames: vec![],
1535 time_sent: now,
1536 time_acked: None,
1537 time_lost: None,
1538 size: 1000,
1539 ack_eliciting: true,
1540 in_flight: true,
1541 delivered: 0,
1542 delivered_time: now,
1543 recent_delivered_packet_sent_time: now,
1544 is_app_limited: false,
1545 has_data: false,
1546 };
1547
1548 r.on_packet_sent(
1549 p,
1550 packet::EPOCH_APPLICATION,
1551 HandshakeStatus::default(),
1552 now,
1553 "",
1554 );
1555 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1556 assert_eq!(r.bytes_in_flight, 3000);
1557
1558 let p = Sent {
1559 pkt_num: 3,
1560 frames: vec![],
1561 time_sent: now,
1562 time_acked: None,
1563 time_lost: None,
1564 size: 1000,
1565 ack_eliciting: true,
1566 in_flight: true,
1567 delivered: 0,
1568 delivered_time: now,
1569 recent_delivered_packet_sent_time: now,
1570 is_app_limited: false,
1571 has_data: false,
1572 };
1573
1574 r.on_packet_sent(
1575 p,
1576 packet::EPOCH_APPLICATION,
1577 HandshakeStatus::default(),
1578 now,
1579 "",
1580 );
1581 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1582 assert_eq!(r.bytes_in_flight, 4000);
1583
1584 // Wait for 10ms.
1585 now += Duration::from_millis(10);
1586
1587 // ACKs are reordered.
1588 let mut acked = ranges::RangeSet::default();
1589 acked.insert(2..4);
1590
1591 assert_eq!(
1592 r.on_ack_received(
1593 &acked,
1594 25,
1595 packet::EPOCH_APPLICATION,
1596 HandshakeStatus::default(),
1597 now,
1598 ""
1599 ),
1600 Ok(())
1601 );
1602
1603 now += Duration::from_millis(10);
1604
1605 let mut acked = ranges::RangeSet::default();
1606 acked.insert(0..2);
1607
1608 assert_eq!(
1609 r.on_ack_received(
1610 &acked,
1611 25,
1612 packet::EPOCH_APPLICATION,
1613 HandshakeStatus::default(),
1614 now,
1615 ""
1616 ),
1617 Ok(())
1618 );
1619
1620 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1621 assert_eq!(r.bytes_in_flight, 0);
1622
1623 // Spurious loss.
1624 assert_eq!(r.lost_count, 1);
1625 }
1626
1627 #[test]
pacing()1628 fn pacing() {
1629 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1630 cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC);
1631
1632 let mut r = Recovery::new(&cfg);
1633
1634 let mut now = Instant::now();
1635
1636 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1637
1638 // send out first packet.
1639 let p = Sent {
1640 pkt_num: 0,
1641 frames: vec![],
1642 time_sent: now,
1643 time_acked: None,
1644 time_lost: None,
1645 size: 6500,
1646 ack_eliciting: true,
1647 in_flight: true,
1648 delivered: 0,
1649 delivered_time: now,
1650 recent_delivered_packet_sent_time: now,
1651 is_app_limited: false,
1652 has_data: false,
1653 };
1654
1655 r.on_packet_sent(
1656 p,
1657 packet::EPOCH_APPLICATION,
1658 HandshakeStatus::default(),
1659 now,
1660 "",
1661 );
1662
1663 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1664 assert_eq!(r.bytes_in_flight, 6500);
1665
1666 // First packet will be sent out immidiately.
1667 assert_eq!(r.pacing_rate, 0);
1668 assert_eq!(r.get_packet_send_time().unwrap(), now);
1669
1670 // Wait 50ms for ACK.
1671 now += Duration::from_millis(50);
1672
1673 let mut acked = ranges::RangeSet::default();
1674 acked.insert(0..1);
1675
1676 assert_eq!(
1677 r.on_ack_received(
1678 &acked,
1679 10,
1680 packet::EPOCH_APPLICATION,
1681 HandshakeStatus::default(),
1682 now,
1683 ""
1684 ),
1685 Ok(())
1686 );
1687
1688 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1689 assert_eq!(r.bytes_in_flight, 0);
1690 assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
1691
1692 // Send out second packet.
1693 let p = Sent {
1694 pkt_num: 1,
1695 frames: vec![],
1696 time_sent: now,
1697 time_acked: None,
1698 time_lost: None,
1699 size: 6500,
1700 ack_eliciting: true,
1701 in_flight: true,
1702 delivered: 0,
1703 delivered_time: now,
1704 recent_delivered_packet_sent_time: now,
1705 is_app_limited: false,
1706 has_data: false,
1707 };
1708
1709 r.on_packet_sent(
1710 p,
1711 packet::EPOCH_APPLICATION,
1712 HandshakeStatus::default(),
1713 now,
1714 "",
1715 );
1716
1717 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1718 assert_eq!(r.bytes_in_flight, 6500);
1719
1720 // Pacing is not done during intial phase of connection.
1721 assert_eq!(r.get_packet_send_time().unwrap(), now);
1722
1723 // Send the third packet out.
1724 let p = Sent {
1725 pkt_num: 2,
1726 frames: vec![],
1727 time_sent: now,
1728 time_acked: None,
1729 time_lost: None,
1730 size: 6500,
1731 ack_eliciting: true,
1732 in_flight: true,
1733 delivered: 0,
1734 delivered_time: now,
1735 recent_delivered_packet_sent_time: now,
1736 is_app_limited: false,
1737 has_data: false,
1738 };
1739
1740 r.on_packet_sent(
1741 p,
1742 packet::EPOCH_APPLICATION,
1743 HandshakeStatus::default(),
1744 now,
1745 "",
1746 );
1747
1748 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1749 assert_eq!(r.bytes_in_flight, 13000);
1750 assert_eq!(r.smoothed_rtt.unwrap(), Duration::from_millis(50));
1751
1752 // We pace this outgoing packet. as all conditions for pacing
1753 // are passed.
1754 assert_eq!(r.pacing_rate, (12000.0 / 0.05) as u64);
1755 assert_eq!(
1756 r.get_packet_send_time().unwrap(),
1757 now + Duration::from_micros(
1758 (6500 * 1000000) / (12000.0 / 0.05) as u64
1759 )
1760 );
1761 }
1762 }
1763
1764 mod cubic;
1765 mod delivery_rate;
1766 mod hystart;
1767 mod reno;
1768