1 // Copyright (C) 2018-2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 use std::cmp;
28
29 use std::str::FromStr;
30
31 use std::time::Duration;
32 use std::time::Instant;
33
34 use std::collections::VecDeque;
35
36 use crate::Config;
37 use crate::Error;
38 use crate::Result;
39
40 use crate::frame;
41 use crate::minmax;
42 use crate::packet;
43 use crate::ranges;
44
45 // Loss Recovery
46 const PACKET_THRESHOLD: u64 = 3;
47
48 const TIME_THRESHOLD: f64 = 9.0 / 8.0;
49
50 const GRANULARITY: Duration = Duration::from_millis(1);
51
52 const INITIAL_RTT: Duration = Duration::from_millis(333);
53
54 const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3;
55
56 const RTT_WINDOW: Duration = Duration::from_secs(300);
57
58 const MAX_PTO_PROBES_COUNT: usize = 2;
59
60 // Congestion Control
61 const INITIAL_WINDOW_PACKETS: usize = 10;
62
63 const INITIAL_WINDOW: usize = INITIAL_WINDOW_PACKETS * MAX_DATAGRAM_SIZE;
64
65 const MINIMUM_WINDOW: usize = 2 * MAX_DATAGRAM_SIZE;
66
67 const MAX_DATAGRAM_SIZE: usize = 1452;
68
69 const LOSS_REDUCTION_FACTOR: f64 = 0.5;
70
71 pub struct Recovery {
72 loss_detection_timer: Option<Instant>,
73
74 pto_count: u32,
75
76 time_of_last_sent_ack_eliciting_pkt: [Option<Instant>; packet::EPOCH_COUNT],
77
78 largest_acked_pkt: [u64; packet::EPOCH_COUNT],
79
80 largest_sent_pkt: [u64; packet::EPOCH_COUNT],
81
82 latest_rtt: Duration,
83
84 smoothed_rtt: Option<Duration>,
85
86 rttvar: Duration,
87
88 minmax_filter: minmax::Minmax<Duration>,
89
90 min_rtt: Duration,
91
92 pub max_ack_delay: Duration,
93
94 loss_time: [Option<Instant>; packet::EPOCH_COUNT],
95
96 sent: [VecDeque<Sent>; packet::EPOCH_COUNT],
97
98 pub lost: [Vec<frame::Frame>; packet::EPOCH_COUNT],
99
100 pub acked: [Vec<frame::Frame>; packet::EPOCH_COUNT],
101
102 pub lost_count: usize,
103
104 pub loss_probes: [usize; packet::EPOCH_COUNT],
105
106 in_flight_count: [usize; packet::EPOCH_COUNT],
107
108 app_limited: bool,
109
110 delivery_rate: delivery_rate::Rate,
111
112 // Congestion control.
113 cc_ops: &'static CongestionControlOps,
114
115 congestion_window: usize,
116
117 bytes_in_flight: usize,
118
119 ssthresh: usize,
120
121 bytes_acked: usize,
122
123 congestion_recovery_start_time: Option<Instant>,
124
125 cubic_state: cubic::State,
126
127 // HyStart++.
128 hystart: hystart::Hystart,
129 }
130
131 impl Recovery {
new(config: &Config) -> Self132 pub fn new(config: &Config) -> Self {
133 Recovery {
134 loss_detection_timer: None,
135
136 pto_count: 0,
137
138 time_of_last_sent_ack_eliciting_pkt: [None; packet::EPOCH_COUNT],
139
140 largest_acked_pkt: [std::u64::MAX; packet::EPOCH_COUNT],
141
142 largest_sent_pkt: [0; packet::EPOCH_COUNT],
143
144 latest_rtt: Duration::new(0, 0),
145
146 // This field should be initialized to `INITIAL_RTT` for the initial
147 // PTO calculation, but it also needs to be an `Option` to track
148 // whether any RTT sample was received, so the initial value is
149 // handled by the `rtt()` method instead.
150 smoothed_rtt: None,
151
152 minmax_filter: minmax::Minmax::new(Duration::new(0, 0)),
153
154 min_rtt: Duration::new(0, 0),
155
156 rttvar: INITIAL_RTT / 2,
157
158 max_ack_delay: Duration::new(0, 0),
159
160 loss_time: [None; packet::EPOCH_COUNT],
161
162 sent: [VecDeque::new(), VecDeque::new(), VecDeque::new()],
163
164 lost: [Vec::new(), Vec::new(), Vec::new()],
165
166 acked: [Vec::new(), Vec::new(), Vec::new()],
167
168 lost_count: 0,
169
170 loss_probes: [0; packet::EPOCH_COUNT],
171
172 in_flight_count: [0; packet::EPOCH_COUNT],
173
174 congestion_window: INITIAL_WINDOW,
175
176 bytes_in_flight: 0,
177
178 ssthresh: std::usize::MAX,
179
180 bytes_acked: 0,
181
182 congestion_recovery_start_time: None,
183
184 cc_ops: config.cc_algorithm.into(),
185
186 delivery_rate: delivery_rate::Rate::default(),
187
188 cubic_state: cubic::State::default(),
189
190 app_limited: false,
191
192 hystart: hystart::Hystart::new(config.hystart),
193 }
194 }
195
on_packet_sent( &mut self, mut pkt: Sent, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )196 pub fn on_packet_sent(
197 &mut self, mut pkt: Sent, epoch: packet::Epoch,
198 handshake_status: HandshakeStatus, now: Instant, trace_id: &str,
199 ) {
200 let ack_eliciting = pkt.ack_eliciting;
201 let in_flight = pkt.in_flight;
202 let sent_bytes = pkt.size;
203 let pkt_num = pkt.pkt_num;
204
205 self.delivery_rate.on_packet_sent(&mut pkt, now);
206
207 self.largest_sent_pkt[epoch] =
208 cmp::max(self.largest_sent_pkt[epoch], pkt_num);
209
210 self.sent[epoch].push_back(pkt);
211
212 if in_flight {
213 if ack_eliciting {
214 self.time_of_last_sent_ack_eliciting_pkt[epoch] = Some(now);
215 }
216
217 self.in_flight_count[epoch] += 1;
218
219 self.app_limited =
220 (self.bytes_in_flight + sent_bytes) < self.congestion_window;
221
222 self.on_packet_sent_cc(sent_bytes, now);
223
224 self.set_loss_detection_timer(handshake_status, now);
225 }
226
227 // HyStart++: Start of the round in a slow start.
228 if self.hystart.enabled() &&
229 epoch == packet::EPOCH_APPLICATION &&
230 self.congestion_window < self.ssthresh
231 {
232 self.hystart.start_round(pkt_num);
233 }
234
235 trace!("{} {:?}", trace_id, self);
236 }
237
on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant)238 fn on_packet_sent_cc(&mut self, sent_bytes: usize, now: Instant) {
239 (self.cc_ops.on_packet_sent)(self, sent_bytes, now);
240 }
241
on_ack_received( &mut self, ranges: &ranges::RangeSet, ack_delay: u64, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, ) -> Result<()>242 pub fn on_ack_received(
243 &mut self, ranges: &ranges::RangeSet, ack_delay: u64,
244 epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant,
245 trace_id: &str,
246 ) -> Result<()> {
247 let largest_acked = ranges.last().unwrap();
248
249 // If the largest packet number acked exceeds any packet number we have
250 // sent, then the ACK is obviously invalid, so there's no need to
251 // continue further.
252 if largest_acked > self.largest_sent_pkt[epoch] {
253 if cfg!(feature = "fuzzing") {
254 return Ok(());
255 }
256
257 return Err(Error::InvalidPacket);
258 }
259
260 if self.largest_acked_pkt[epoch] == std::u64::MAX {
261 self.largest_acked_pkt[epoch] = largest_acked;
262 } else {
263 self.largest_acked_pkt[epoch] =
264 cmp::max(self.largest_acked_pkt[epoch], largest_acked);
265 }
266
267 let mut has_ack_eliciting = false;
268
269 let mut largest_newly_acked_pkt_num = 0;
270 let mut largest_newly_acked_sent_time = now;
271
272 let mut newly_acked = Vec::new();
273
274 // Detect and mark acked packets, without removing them from the sent
275 // packets list.
276 for r in ranges.iter() {
277 let lowest_acked = r.start;
278 let largest_acked = r.end - 1;
279
280 let unacked_iter = self.sent[epoch]
281 .iter_mut()
282 // Skip packets that precede the lowest acked packet in the block.
283 .skip_while(|p| p.pkt_num < lowest_acked)
284 // Skip packets that follow the largest acked packet in the block.
285 .take_while(|p| p.pkt_num <= largest_acked)
286 // Skip packets that have already been acked or lost.
287 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
288
289 for unacked in unacked_iter {
290 unacked.time_acked = Some(now);
291
292 if unacked.ack_eliciting {
293 has_ack_eliciting = true;
294 }
295
296 largest_newly_acked_pkt_num = unacked.pkt_num;
297 largest_newly_acked_sent_time = unacked.time_sent;
298
299 self.acked[epoch].append(&mut unacked.frames);
300
301 if unacked.in_flight {
302 self.in_flight_count[epoch] =
303 self.in_flight_count[epoch].saturating_sub(1);
304
305 self.delivery_rate.on_packet_acked(&unacked, now);
306 }
307
308 newly_acked.push(Acked {
309 pkt_num: unacked.pkt_num,
310
311 time_sent: unacked.time_sent,
312
313 size: unacked.size,
314 });
315
316 trace!("{} packet newly acked {}", trace_id, unacked.pkt_num);
317 }
318 }
319
320 self.delivery_rate.estimate();
321
322 if newly_acked.is_empty() {
323 return Ok(());
324 }
325
326 if largest_newly_acked_pkt_num == largest_acked && has_ack_eliciting {
327 let latest_rtt = now - largest_newly_acked_sent_time;
328
329 let ack_delay = if epoch == packet::EPOCH_APPLICATION {
330 Duration::from_micros(ack_delay)
331 } else {
332 Duration::from_micros(0)
333 };
334
335 self.update_rtt(latest_rtt, ack_delay, now);
336 }
337
338 // Detect and mark lost packets without removing them from the sent
339 // packets list.
340 self.detect_lost_packets(epoch, now, trace_id);
341
342 self.on_packets_acked(newly_acked, epoch, now);
343
344 self.pto_count = 0;
345
346 self.set_loss_detection_timer(handshake_status, now);
347
348 self.drain_packets(epoch);
349
350 trace!("{} {:?}", trace_id, self);
351
352 Ok(())
353 }
354
on_loss_detection_timeout( &mut self, handshake_status: HandshakeStatus, now: Instant, trace_id: &str, )355 pub fn on_loss_detection_timeout(
356 &mut self, handshake_status: HandshakeStatus, now: Instant,
357 trace_id: &str,
358 ) {
359 let (earliest_loss_time, epoch) = self.loss_time_and_space();
360
361 if earliest_loss_time.is_some() {
362 // Time threshold loss detection.
363 self.detect_lost_packets(epoch, now, trace_id);
364
365 self.set_loss_detection_timer(handshake_status, now);
366
367 trace!("{} {:?}", trace_id, self);
368 return;
369 }
370
371 let epoch = if self.bytes_in_flight > 0 {
372 // Send new data if available, else retransmit old data. If neither
373 // is available, send a single PING frame.
374 let (_, e) = self.pto_time_and_space(handshake_status, now);
375
376 e
377 } else {
378 // Client sends an anti-deadlock packet: Initial is padded to earn
379 // more anti-amplification credit, a Handshake packet proves address
380 // ownership.
381 if handshake_status.has_handshake_keys {
382 packet::EPOCH_HANDSHAKE
383 } else {
384 packet::EPOCH_INITIAL
385 }
386 };
387
388 self.pto_count += 1;
389
390 self.loss_probes[epoch] =
391 cmp::min(self.pto_count as usize, MAX_PTO_PROBES_COUNT);
392
393 let unacked_iter = self.sent[epoch]
394 .iter_mut()
395 // Skip packets that have already been acked or lost, and packets
396 // that don't contain either CRYPTO or STREAM frames.
397 .filter(|p| p.has_data && p.time_acked.is_none() && p.time_lost.is_none())
398 // Only return as many packets as the number of probe packets that
399 // will be sent.
400 .take(self.loss_probes[epoch]);
401
402 // Retransmit the frames from the oldest sent packets on PTO. However
403 // the packets are not actually declared lost (so there is no effect to
404 // congestion control), we just reschedule the data they carried.
405 //
406 // This will also trigger sending an ACK and retransmitting frames like
407 // HANDSHAKE_DONE and MAX_DATA / MAX_STREAM_DATA as well, in addition
408 // to CRYPTO and STREAM, if the original packet carried them.
409 for unacked in unacked_iter {
410 self.lost[epoch].extend_from_slice(&unacked.frames);
411 }
412
413 self.set_loss_detection_timer(handshake_status, now);
414
415 trace!("{} {:?}", trace_id, self);
416 }
417
on_pkt_num_space_discarded( &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus, now: Instant, )418 pub fn on_pkt_num_space_discarded(
419 &mut self, epoch: packet::Epoch, handshake_status: HandshakeStatus,
420 now: Instant,
421 ) {
422 let unacked_bytes = self.sent[epoch]
423 .iter()
424 .filter(|p| {
425 p.in_flight && p.time_acked.is_none() && p.time_lost.is_none()
426 })
427 .fold(0, |acc, p| acc + p.size);
428
429 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked_bytes);
430
431 self.sent[epoch].clear();
432 self.lost[epoch].clear();
433 self.acked[epoch].clear();
434
435 self.time_of_last_sent_ack_eliciting_pkt[epoch] = None;
436 self.loss_time[epoch] = None;
437 self.loss_probes[epoch] = 0;
438 self.in_flight_count[epoch] = 0;
439
440 self.set_loss_detection_timer(handshake_status, now);
441 }
442
loss_detection_timer(&self) -> Option<Instant>443 pub fn loss_detection_timer(&self) -> Option<Instant> {
444 self.loss_detection_timer
445 }
446
cwnd(&self) -> usize447 pub fn cwnd(&self) -> usize {
448 self.congestion_window
449 }
450
cwnd_available(&self) -> usize451 pub fn cwnd_available(&self) -> usize {
452 // Ignore cwnd when sending probe packets.
453 if self.loss_probes.iter().any(|&x| x > 0) {
454 return std::usize::MAX;
455 }
456
457 self.congestion_window.saturating_sub(self.bytes_in_flight)
458 }
459
rtt(&self) -> Duration460 pub fn rtt(&self) -> Duration {
461 self.smoothed_rtt.unwrap_or(INITIAL_RTT)
462 }
463
pto(&self) -> Duration464 pub fn pto(&self) -> Duration {
465 self.rtt() + cmp::max(self.rttvar * 4, GRANULARITY)
466 }
467
delivery_rate(&self) -> u64468 pub fn delivery_rate(&self) -> u64 {
469 self.delivery_rate.delivery_rate()
470 }
471
update_rtt( &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant, )472 fn update_rtt(
473 &mut self, latest_rtt: Duration, ack_delay: Duration, now: Instant,
474 ) {
475 self.latest_rtt = latest_rtt;
476
477 match self.smoothed_rtt {
478 // First RTT sample.
479 None => {
480 self.min_rtt = self.minmax_filter.reset(now, latest_rtt);
481
482 self.smoothed_rtt = Some(latest_rtt);
483
484 self.rttvar = latest_rtt / 2;
485 },
486
487 Some(srtt) => {
488 self.min_rtt =
489 self.minmax_filter.running_min(RTT_WINDOW, now, latest_rtt);
490
491 let ack_delay = cmp::min(self.max_ack_delay, ack_delay);
492
493 // Adjust for ack delay if plausible.
494 let adjusted_rtt = if latest_rtt > self.min_rtt + ack_delay {
495 latest_rtt - ack_delay
496 } else {
497 latest_rtt
498 };
499
500 self.rttvar = self.rttvar.mul_f64(3.0 / 4.0) +
501 sub_abs(srtt, adjusted_rtt).mul_f64(1.0 / 4.0);
502
503 self.smoothed_rtt = Some(
504 srtt.mul_f64(7.0 / 8.0) + adjusted_rtt.mul_f64(1.0 / 8.0),
505 );
506 },
507 }
508 }
509
loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch)510 fn loss_time_and_space(&self) -> (Option<Instant>, packet::Epoch) {
511 let mut epoch = packet::EPOCH_INITIAL;
512 let mut time = self.loss_time[epoch];
513
514 // Iterate over all packet number spaces starting from Handshake.
515 #[allow(clippy::needless_range_loop)]
516 for e in packet::EPOCH_HANDSHAKE..packet::EPOCH_COUNT {
517 let new_time = self.loss_time[e];
518
519 if time.is_none() || new_time < time {
520 time = new_time;
521 epoch = e;
522 }
523 }
524
525 (time, epoch)
526 }
527
pto_time_and_space( &self, handshake_status: HandshakeStatus, now: Instant, ) -> (Option<Instant>, packet::Epoch)528 fn pto_time_and_space(
529 &self, handshake_status: HandshakeStatus, now: Instant,
530 ) -> (Option<Instant>, packet::Epoch) {
531 let mut duration = self.pto() * 2_u32.pow(self.pto_count);
532
533 // Arm PTO from now when there are no inflight packets.
534 if self.bytes_in_flight == 0 {
535 if handshake_status.has_handshake_keys {
536 return (Some(now + duration), packet::EPOCH_HANDSHAKE);
537 } else {
538 return (Some(now + duration), packet::EPOCH_INITIAL);
539 }
540 }
541
542 let mut pto_timeout = None;
543 let mut pto_space = packet::EPOCH_INITIAL;
544
545 // Iterate over all packet number spaces.
546 for e in packet::EPOCH_INITIAL..packet::EPOCH_COUNT {
547 if self.in_flight_count[e] == 0 {
548 continue;
549 }
550
551 if e == packet::EPOCH_APPLICATION {
552 // Skip Application Data until handshake completes.
553 if !handshake_status.completed {
554 return (pto_timeout, pto_space);
555 }
556
557 // Include max_ack_delay and backoff for Application Data.
558 duration += self.max_ack_delay * 2_u32.pow(self.pto_count);
559 }
560
561 let new_time =
562 self.time_of_last_sent_ack_eliciting_pkt[e].map(|t| t + duration);
563
564 if pto_timeout.is_none() || new_time < pto_timeout {
565 pto_timeout = new_time;
566 pto_space = e;
567 }
568 }
569
570 (pto_timeout, pto_space)
571 }
572
set_loss_detection_timer( &mut self, handshake_status: HandshakeStatus, now: Instant, )573 fn set_loss_detection_timer(
574 &mut self, handshake_status: HandshakeStatus, now: Instant,
575 ) {
576 let (earliest_loss_time, _) = self.loss_time_and_space();
577
578 if earliest_loss_time.is_some() {
579 // Time threshold loss detection.
580 self.loss_detection_timer = earliest_loss_time;
581 return;
582 }
583
584 if self.bytes_in_flight == 0 && handshake_status.peer_verified_address {
585 self.loss_detection_timer = None;
586 return;
587 }
588
589 // PTO timer.
590 let (timeout, _) = self.pto_time_and_space(handshake_status, now);
591 self.loss_detection_timer = timeout;
592 }
593
detect_lost_packets( &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str, )594 fn detect_lost_packets(
595 &mut self, epoch: packet::Epoch, now: Instant, trace_id: &str,
596 ) {
597 let largest_acked = self.largest_acked_pkt[epoch];
598
599 self.loss_time[epoch] = None;
600
601 let loss_delay =
602 cmp::max(self.latest_rtt, self.rtt()).mul_f64(TIME_THRESHOLD);
603
604 // Minimum time of kGranularity before packets are deemed lost.
605 let loss_delay = cmp::max(loss_delay, GRANULARITY);
606
607 // Packets sent before this time are deemed lost.
608 let lost_send_time = now - loss_delay;
609
610 let mut lost_bytes = 0;
611
612 let mut largest_lost_pkt = None;
613
614 let unacked_iter = self.sent[epoch]
615 .iter_mut()
616 // Skip packets that follow the largest acked packet.
617 .take_while(|p| p.pkt_num <= largest_acked)
618 // Skip packets that have already been acked or lost.
619 .filter(|p| p.time_acked.is_none() && p.time_lost.is_none());
620
621 for unacked in unacked_iter {
622 // Mark packet as lost, or set time when it should be marked.
623 if unacked.time_sent <= lost_send_time ||
624 largest_acked >= unacked.pkt_num + PACKET_THRESHOLD
625 {
626 self.lost[epoch].append(&mut unacked.frames);
627
628 unacked.time_lost = Some(now);
629
630 if unacked.in_flight {
631 lost_bytes += unacked.size;
632
633 // Frames have already been removed from the packet, so
634 // cloning the whole packet should be relatively cheap.
635 largest_lost_pkt = Some(unacked.clone());
636
637 self.in_flight_count[epoch] =
638 self.in_flight_count[epoch].saturating_sub(1);
639
640 trace!(
641 "{} packet {} lost on epoch {}",
642 trace_id,
643 unacked.pkt_num,
644 epoch
645 );
646 }
647
648 self.lost_count += 1;
649 } else {
650 let loss_time = match self.loss_time[epoch] {
651 None => unacked.time_sent + loss_delay,
652
653 Some(loss_time) =>
654 cmp::min(loss_time, unacked.time_sent + loss_delay),
655 };
656
657 self.loss_time[epoch] = Some(loss_time);
658 }
659 }
660
661 if let Some(pkt) = largest_lost_pkt {
662 self.on_packets_lost(lost_bytes, &pkt, epoch, now);
663 }
664
665 self.drain_packets(epoch);
666 }
667
drain_packets(&mut self, epoch: packet::Epoch)668 fn drain_packets(&mut self, epoch: packet::Epoch) {
669 let mut lowest_non_expired_pkt_index = self.sent[epoch].len();
670
671 // In order to avoid removing elements from the middle of the list
672 // (which would require copying other elements to compact the list),
673 // we only remove a contiguous range of elements from the start of the
674 // list.
675 //
676 // This means that acked or lost elements coming after this will not
677 // be removed at this point, but their removal is delayed for a later
678 // time, once the gaps have been filled.
679
680 // First, find the first element that is neither acked nor lost.
681 for (i, pkt) in self.sent[epoch].iter().enumerate() {
682 if pkt.time_acked.is_none() && pkt.time_lost.is_none() {
683 lowest_non_expired_pkt_index = i;
684 break;
685 }
686 }
687
688 // Then remove elements up to the previously found index.
689 self.sent[epoch].drain(..lowest_non_expired_pkt_index);
690 }
691
on_packets_acked( &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant, )692 fn on_packets_acked(
693 &mut self, acked: Vec<Acked>, epoch: packet::Epoch, now: Instant,
694 ) {
695 for pkt in acked {
696 (self.cc_ops.on_packet_acked)(self, &pkt, epoch, now);
697 }
698 }
699
in_congestion_recovery(&self, sent_time: Instant) -> bool700 fn in_congestion_recovery(&self, sent_time: Instant) -> bool {
701 match self.congestion_recovery_start_time {
702 Some(congestion_recovery_start_time) =>
703 sent_time <= congestion_recovery_start_time,
704
705 None => false,
706 }
707 }
708
in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool709 fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool {
710 let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD;
711
712 // TODO: properly detect persistent congestion
713 false
714 }
715
on_packets_lost( &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, epoch: packet::Epoch, now: Instant, )716 fn on_packets_lost(
717 &mut self, lost_bytes: usize, largest_lost_pkt: &Sent,
718 epoch: packet::Epoch, now: Instant,
719 ) {
720 self.bytes_in_flight = self.bytes_in_flight.saturating_sub(lost_bytes);
721
722 self.congestion_event(largest_lost_pkt.time_sent, epoch, now);
723
724 if self.in_persistent_congestion(largest_lost_pkt.pkt_num) {
725 self.collapse_cwnd();
726 }
727 }
728
congestion_event( &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant, )729 fn congestion_event(
730 &mut self, time_sent: Instant, epoch: packet::Epoch, now: Instant,
731 ) {
732 (self.cc_ops.congestion_event)(self, time_sent, epoch, now);
733 }
734
collapse_cwnd(&mut self)735 fn collapse_cwnd(&mut self) {
736 (self.cc_ops.collapse_cwnd)(self);
737 }
738
rate_check_app_limited(&mut self)739 pub fn rate_check_app_limited(&mut self) {
740 if self.app_limited {
741 self.delivery_rate.check_app_limited(self.bytes_in_flight)
742 }
743 }
744
hystart_on_packet_acked( &mut self, packet: &Acked, now: Instant, ) -> (usize, usize)745 fn hystart_on_packet_acked(
746 &mut self, packet: &Acked, now: Instant,
747 ) -> (usize, usize) {
748 self.hystart.on_packet_acked(
749 packet,
750 self.latest_rtt,
751 self.congestion_window,
752 self.ssthresh,
753 now,
754 )
755 }
756
update_app_limited(&mut self, v: bool)757 pub fn update_app_limited(&mut self, v: bool) {
758 self.app_limited = v;
759 }
760
app_limited(&mut self) -> bool761 pub fn app_limited(&mut self) -> bool {
762 self.app_limited
763 }
764
765 #[cfg(feature = "qlog")]
to_qlog(&self) -> qlog::event::Event766 pub fn to_qlog(&self) -> qlog::event::Event {
767 // QVis can't use all these fields and they can be large.
768 qlog::event::Event::metrics_updated(
769 Some(self.min_rtt.as_millis() as u64),
770 Some(self.rtt().as_millis() as u64),
771 Some(self.latest_rtt.as_millis() as u64),
772 Some(self.rttvar.as_millis() as u64),
773 None, // delay
774 None, // probe_count
775 Some(self.cwnd() as u64),
776 Some(self.bytes_in_flight as u64),
777 None, // ssthresh
778 None, // packets_in_flight
779 None, // in_recovery
780 None, // pacing_rate
781 )
782 }
783 }
784
785 /// Available congestion control algorithms.
786 ///
787 /// This enum provides currently available list of congestion control
788 /// algorithms.
789 #[derive(Debug, Copy, Clone, PartialEq)]
790 #[repr(C)]
791 pub enum CongestionControlAlgorithm {
792 /// Reno congestion control algorithm. `reno` in a string form.
793 Reno = 0,
794 /// CUBIC congestion control algorithm (default). `cubic` in a string form.
795 CUBIC = 1,
796 }
797
798 impl FromStr for CongestionControlAlgorithm {
799 type Err = crate::Error;
800
801 /// Converts a string to `CongestionControlAlgorithm`.
802 ///
803 /// If `name` is not valid, `Error::CongestionControl` is returned.
from_str(name: &str) -> std::result::Result<Self, Self::Err>804 fn from_str(name: &str) -> std::result::Result<Self, Self::Err> {
805 match name {
806 "reno" => Ok(CongestionControlAlgorithm::Reno),
807 "cubic" => Ok(CongestionControlAlgorithm::CUBIC),
808
809 _ => Err(crate::Error::CongestionControl),
810 }
811 }
812 }
813
814 pub struct CongestionControlOps {
815 pub on_packet_sent: fn(r: &mut Recovery, sent_bytes: usize, now: Instant),
816
817 pub on_packet_acked:
818 fn(r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant),
819
820 pub congestion_event: fn(
821 r: &mut Recovery,
822 time_sent: Instant,
823 epoch: packet::Epoch,
824 now: Instant,
825 ),
826
827 pub collapse_cwnd: fn(r: &mut Recovery),
828 }
829
830 impl From<CongestionControlAlgorithm> for &'static CongestionControlOps {
from(algo: CongestionControlAlgorithm) -> Self831 fn from(algo: CongestionControlAlgorithm) -> Self {
832 match algo {
833 CongestionControlAlgorithm::Reno => &reno::RENO,
834 CongestionControlAlgorithm::CUBIC => &cubic::CUBIC,
835 }
836 }
837 }
838
839 impl std::fmt::Debug for Recovery {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result840 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
841 match self.loss_detection_timer {
842 Some(v) => {
843 let now = Instant::now();
844
845 if v > now {
846 let d = v.duration_since(now);
847 write!(f, "timer={:?} ", d)?;
848 } else {
849 write!(f, "timer=exp ")?;
850 }
851 },
852
853 None => {
854 write!(f, "timer=none ")?;
855 },
856 };
857
858 write!(f, "latest_rtt={:?} ", self.latest_rtt)?;
859 write!(f, "srtt={:?} ", self.smoothed_rtt)?;
860 write!(f, "min_rtt={:?} ", self.min_rtt)?;
861 write!(f, "rttvar={:?} ", self.rttvar)?;
862 write!(f, "loss_time={:?} ", self.loss_time)?;
863 write!(f, "loss_probes={:?} ", self.loss_probes)?;
864 write!(f, "cwnd={} ", self.congestion_window)?;
865 write!(f, "ssthresh={} ", self.ssthresh)?;
866 write!(f, "bytes_in_flight={} ", self.bytes_in_flight)?;
867 write!(f, "app_limited={} ", self.app_limited)?;
868 write!(
869 f,
870 "congestion_recovery_start_time={:?} ",
871 self.congestion_recovery_start_time
872 )?;
873 write!(f, "{:?} ", self.delivery_rate)?;
874
875 if self.hystart.enabled() {
876 write!(f, "hystart={:?} ", self.hystart)?;
877 }
878
879 Ok(())
880 }
881 }
882
883 #[derive(Clone)]
884 pub struct Sent {
885 pub pkt_num: u64,
886
887 pub frames: Vec<frame::Frame>,
888
889 pub time_sent: Instant,
890
891 pub time_acked: Option<Instant>,
892
893 pub time_lost: Option<Instant>,
894
895 pub size: usize,
896
897 pub ack_eliciting: bool,
898
899 pub in_flight: bool,
900
901 pub delivered: usize,
902
903 pub delivered_time: Instant,
904
905 pub recent_delivered_packet_sent_time: Instant,
906
907 pub is_app_limited: bool,
908
909 pub has_data: bool,
910 }
911
912 impl std::fmt::Debug for Sent {
fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result913 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
914 write!(f, "pkt_num={:?} ", self.pkt_num)?;
915 write!(f, "pkt_sent_time={:?} ", self.time_sent.elapsed())?;
916 write!(f, "pkt_size={:?} ", self.size)?;
917 write!(f, "delivered={:?} ", self.delivered)?;
918 write!(f, "delivered_time={:?} ", self.delivered_time.elapsed())?;
919 write!(
920 f,
921 "recent_delivered_packet_sent_time={:?} ",
922 self.recent_delivered_packet_sent_time.elapsed()
923 )?;
924 write!(f, "is_app_limited={} ", self.is_app_limited)?;
925 write!(f, "has_data={} ", self.has_data)?;
926
927 Ok(())
928 }
929 }
930
931 #[derive(Clone)]
932 pub struct Acked {
933 pub pkt_num: u64,
934
935 pub time_sent: Instant,
936
937 pub size: usize,
938 }
939
940 #[derive(Clone, Copy, Debug)]
941 pub struct HandshakeStatus {
942 pub has_handshake_keys: bool,
943
944 pub peer_verified_address: bool,
945
946 pub completed: bool,
947 }
948
949 #[cfg(test)]
950 impl Default for HandshakeStatus {
default() -> HandshakeStatus951 fn default() -> HandshakeStatus {
952 HandshakeStatus {
953 has_handshake_keys: true,
954
955 peer_verified_address: true,
956
957 completed: true,
958 }
959 }
960 }
961
sub_abs(lhs: Duration, rhs: Duration) -> Duration962 fn sub_abs(lhs: Duration, rhs: Duration) -> Duration {
963 if lhs > rhs {
964 lhs - rhs
965 } else {
966 rhs - lhs
967 }
968 }
969
970 #[cfg(test)]
971 mod tests {
972 use super::*;
973
974 #[test]
lookup_cc_algo_ok()975 fn lookup_cc_algo_ok() {
976 let algo = CongestionControlAlgorithm::from_str("reno").unwrap();
977 assert_eq!(algo, CongestionControlAlgorithm::Reno);
978 }
979
980 #[test]
lookup_cc_algo_bad()981 fn lookup_cc_algo_bad() {
982 assert_eq!(
983 CongestionControlAlgorithm::from_str("???"),
984 Err(Error::CongestionControl)
985 );
986 }
987
988 #[test]
collapse_cwnd()989 fn collapse_cwnd() {
990 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
991 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
992
993 let mut r = Recovery::new(&cfg);
994
995 // cwnd will be reset.
996 r.collapse_cwnd();
997 assert_eq!(r.cwnd(), MINIMUM_WINDOW);
998 }
999
1000 #[test]
loss_on_pto()1001 fn loss_on_pto() {
1002 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1003 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1004
1005 let mut r = Recovery::new(&cfg);
1006
1007 let mut now = Instant::now();
1008
1009 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1010
1011 // Start by sending a few packets.
1012 let p = Sent {
1013 pkt_num: 0,
1014 frames: vec![],
1015 time_sent: now,
1016 time_acked: None,
1017 time_lost: None,
1018 size: 1000,
1019 ack_eliciting: true,
1020 in_flight: true,
1021 delivered: 0,
1022 delivered_time: now,
1023 recent_delivered_packet_sent_time: now,
1024 is_app_limited: false,
1025 has_data: false,
1026 };
1027
1028 r.on_packet_sent(
1029 p,
1030 packet::EPOCH_APPLICATION,
1031 HandshakeStatus::default(),
1032 now,
1033 "",
1034 );
1035 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1036 assert_eq!(r.bytes_in_flight, 1000);
1037
1038 let p = Sent {
1039 pkt_num: 1,
1040 frames: vec![],
1041 time_sent: now,
1042 time_acked: None,
1043 time_lost: None,
1044 size: 1000,
1045 ack_eliciting: true,
1046 in_flight: true,
1047 delivered: 0,
1048 delivered_time: now,
1049 recent_delivered_packet_sent_time: now,
1050 is_app_limited: false,
1051 has_data: false,
1052 };
1053
1054 r.on_packet_sent(
1055 p,
1056 packet::EPOCH_APPLICATION,
1057 HandshakeStatus::default(),
1058 now,
1059 "",
1060 );
1061 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1062 assert_eq!(r.bytes_in_flight, 2000);
1063
1064 let p = Sent {
1065 pkt_num: 2,
1066 frames: vec![],
1067 time_sent: now,
1068 time_acked: None,
1069 time_lost: None,
1070 size: 1000,
1071 ack_eliciting: true,
1072 in_flight: true,
1073 delivered: 0,
1074 delivered_time: now,
1075 recent_delivered_packet_sent_time: now,
1076 is_app_limited: false,
1077 has_data: false,
1078 };
1079
1080 r.on_packet_sent(
1081 p,
1082 packet::EPOCH_APPLICATION,
1083 HandshakeStatus::default(),
1084 now,
1085 "",
1086 );
1087 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1088 assert_eq!(r.bytes_in_flight, 3000);
1089
1090 let p = Sent {
1091 pkt_num: 3,
1092 frames: vec![],
1093 time_sent: now,
1094 time_acked: None,
1095 time_lost: None,
1096 size: 1000,
1097 ack_eliciting: true,
1098 in_flight: true,
1099 delivered: 0,
1100 delivered_time: now,
1101 recent_delivered_packet_sent_time: now,
1102 is_app_limited: false,
1103 has_data: false,
1104 };
1105
1106 r.on_packet_sent(
1107 p,
1108 packet::EPOCH_APPLICATION,
1109 HandshakeStatus::default(),
1110 now,
1111 "",
1112 );
1113 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1114 assert_eq!(r.bytes_in_flight, 4000);
1115
1116 // Wait for 10ms.
1117 now += Duration::from_millis(10);
1118
1119 // Only the first 2 packets are acked.
1120 let mut acked = ranges::RangeSet::default();
1121 acked.insert(0..2);
1122
1123 assert_eq!(
1124 r.on_ack_received(
1125 &acked,
1126 25,
1127 packet::EPOCH_APPLICATION,
1128 HandshakeStatus::default(),
1129 now,
1130 ""
1131 ),
1132 Ok(())
1133 );
1134
1135 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1136 assert_eq!(r.bytes_in_flight, 2000);
1137 assert_eq!(r.lost_count, 0);
1138
1139 // Wait until loss detection timer expires.
1140 now = r.loss_detection_timer().unwrap();
1141
1142 // PTO.
1143 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1144 assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 1);
1145 assert_eq!(r.lost_count, 0);
1146 assert_eq!(r.pto_count, 1);
1147
1148 let p = Sent {
1149 pkt_num: 4,
1150 frames: vec![],
1151 time_sent: now,
1152 time_acked: None,
1153 time_lost: None,
1154 size: 1000,
1155 ack_eliciting: true,
1156 in_flight: true,
1157 delivered: 0,
1158 delivered_time: now,
1159 recent_delivered_packet_sent_time: now,
1160 is_app_limited: false,
1161 has_data: false,
1162 };
1163
1164 r.on_packet_sent(
1165 p,
1166 packet::EPOCH_APPLICATION,
1167 HandshakeStatus::default(),
1168 now,
1169 "",
1170 );
1171 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1172 assert_eq!(r.bytes_in_flight, 3000);
1173
1174 let p = Sent {
1175 pkt_num: 5,
1176 frames: vec![],
1177 time_sent: now,
1178 time_acked: None,
1179 time_lost: None,
1180 size: 1000,
1181 ack_eliciting: true,
1182 in_flight: true,
1183 delivered: 0,
1184 delivered_time: now,
1185 recent_delivered_packet_sent_time: now,
1186 is_app_limited: false,
1187 has_data: false,
1188 };
1189
1190 r.on_packet_sent(
1191 p,
1192 packet::EPOCH_APPLICATION,
1193 HandshakeStatus::default(),
1194 now,
1195 "",
1196 );
1197 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1198 assert_eq!(r.bytes_in_flight, 4000);
1199 assert_eq!(r.lost_count, 0);
1200
1201 // Wait for 10ms.
1202 now += Duration::from_millis(10);
1203
1204 // PTO packets are acked.
1205 let mut acked = ranges::RangeSet::default();
1206 acked.insert(4..6);
1207
1208 assert_eq!(
1209 r.on_ack_received(
1210 &acked,
1211 25,
1212 packet::EPOCH_APPLICATION,
1213 HandshakeStatus::default(),
1214 now,
1215 ""
1216 ),
1217 Ok(())
1218 );
1219
1220 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1221 assert_eq!(r.bytes_in_flight, 0);
1222
1223 assert_eq!(r.lost_count, 2);
1224 }
1225
1226 #[test]
loss_on_timer()1227 fn loss_on_timer() {
1228 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1229 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1230
1231 let mut r = Recovery::new(&cfg);
1232
1233 let mut now = Instant::now();
1234
1235 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1236
1237 // Start by sending a few packets.
1238 let p = Sent {
1239 pkt_num: 0,
1240 frames: vec![],
1241 time_sent: now,
1242 time_acked: None,
1243 time_lost: None,
1244 size: 1000,
1245 ack_eliciting: true,
1246 in_flight: true,
1247 delivered: 0,
1248 delivered_time: now,
1249 recent_delivered_packet_sent_time: now,
1250 is_app_limited: false,
1251 has_data: false,
1252 };
1253
1254 r.on_packet_sent(
1255 p,
1256 packet::EPOCH_APPLICATION,
1257 HandshakeStatus::default(),
1258 now,
1259 "",
1260 );
1261 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1262 assert_eq!(r.bytes_in_flight, 1000);
1263
1264 let p = Sent {
1265 pkt_num: 1,
1266 frames: vec![],
1267 time_sent: now,
1268 time_acked: None,
1269 time_lost: None,
1270 size: 1000,
1271 ack_eliciting: true,
1272 in_flight: true,
1273 delivered: 0,
1274 delivered_time: now,
1275 recent_delivered_packet_sent_time: now,
1276 is_app_limited: false,
1277 has_data: false,
1278 };
1279
1280 r.on_packet_sent(
1281 p,
1282 packet::EPOCH_APPLICATION,
1283 HandshakeStatus::default(),
1284 now,
1285 "",
1286 );
1287 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1288 assert_eq!(r.bytes_in_flight, 2000);
1289
1290 let p = Sent {
1291 pkt_num: 2,
1292 frames: vec![],
1293 time_sent: now,
1294 time_acked: None,
1295 time_lost: None,
1296 size: 1000,
1297 ack_eliciting: true,
1298 in_flight: true,
1299 delivered: 0,
1300 delivered_time: now,
1301 recent_delivered_packet_sent_time: now,
1302 is_app_limited: false,
1303 has_data: false,
1304 };
1305
1306 r.on_packet_sent(
1307 p,
1308 packet::EPOCH_APPLICATION,
1309 HandshakeStatus::default(),
1310 now,
1311 "",
1312 );
1313 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1314 assert_eq!(r.bytes_in_flight, 3000);
1315
1316 let p = Sent {
1317 pkt_num: 3,
1318 frames: vec![],
1319 time_sent: now,
1320 time_acked: None,
1321 time_lost: None,
1322 size: 1000,
1323 ack_eliciting: true,
1324 in_flight: true,
1325 delivered: 0,
1326 delivered_time: now,
1327 recent_delivered_packet_sent_time: now,
1328 is_app_limited: false,
1329 has_data: false,
1330 };
1331
1332 r.on_packet_sent(
1333 p,
1334 packet::EPOCH_APPLICATION,
1335 HandshakeStatus::default(),
1336 now,
1337 "",
1338 );
1339 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1340 assert_eq!(r.bytes_in_flight, 4000);
1341
1342 // Wait for 10ms.
1343 now += Duration::from_millis(10);
1344
1345 // Only the first 2 packets and the last one are acked.
1346 let mut acked = ranges::RangeSet::default();
1347 acked.insert(0..2);
1348 acked.insert(3..4);
1349
1350 assert_eq!(
1351 r.on_ack_received(
1352 &acked,
1353 25,
1354 packet::EPOCH_APPLICATION,
1355 HandshakeStatus::default(),
1356 now,
1357 ""
1358 ),
1359 Ok(())
1360 );
1361
1362 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1363 assert_eq!(r.bytes_in_flight, 1000);
1364 assert_eq!(r.lost_count, 0);
1365
1366 // Wait until loss detection timer expires.
1367 now = r.loss_detection_timer().unwrap();
1368
1369 // Packet is declared lost.
1370 r.on_loss_detection_timeout(HandshakeStatus::default(), now, "");
1371 assert_eq!(r.loss_probes[packet::EPOCH_APPLICATION], 0);
1372
1373 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1374 assert_eq!(r.bytes_in_flight, 0);
1375
1376 assert_eq!(r.lost_count, 1);
1377 }
1378
1379 #[test]
loss_on_reordering()1380 fn loss_on_reordering() {
1381 let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
1382 cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno);
1383
1384 let mut r = Recovery::new(&cfg);
1385
1386 let mut now = Instant::now();
1387
1388 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1389
1390 // Start by sending a few packets.
1391 let p = Sent {
1392 pkt_num: 0,
1393 frames: vec![],
1394 time_sent: now,
1395 time_acked: None,
1396 time_lost: None,
1397 size: 1000,
1398 ack_eliciting: true,
1399 in_flight: true,
1400 delivered: 0,
1401 delivered_time: now,
1402 recent_delivered_packet_sent_time: now,
1403 is_app_limited: false,
1404 has_data: false,
1405 };
1406
1407 r.on_packet_sent(
1408 p,
1409 packet::EPOCH_APPLICATION,
1410 HandshakeStatus::default(),
1411 now,
1412 "",
1413 );
1414 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 1);
1415 assert_eq!(r.bytes_in_flight, 1000);
1416
1417 let p = Sent {
1418 pkt_num: 1,
1419 frames: vec![],
1420 time_sent: now,
1421 time_acked: None,
1422 time_lost: None,
1423 size: 1000,
1424 ack_eliciting: true,
1425 in_flight: true,
1426 delivered: 0,
1427 delivered_time: now,
1428 recent_delivered_packet_sent_time: now,
1429 is_app_limited: false,
1430 has_data: false,
1431 };
1432
1433 r.on_packet_sent(
1434 p,
1435 packet::EPOCH_APPLICATION,
1436 HandshakeStatus::default(),
1437 now,
1438 "",
1439 );
1440 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 2);
1441 assert_eq!(r.bytes_in_flight, 2000);
1442
1443 let p = Sent {
1444 pkt_num: 2,
1445 frames: vec![],
1446 time_sent: now,
1447 time_acked: None,
1448 time_lost: None,
1449 size: 1000,
1450 ack_eliciting: true,
1451 in_flight: true,
1452 delivered: 0,
1453 delivered_time: now,
1454 recent_delivered_packet_sent_time: now,
1455 is_app_limited: false,
1456 has_data: false,
1457 };
1458
1459 r.on_packet_sent(
1460 p,
1461 packet::EPOCH_APPLICATION,
1462 HandshakeStatus::default(),
1463 now,
1464 "",
1465 );
1466 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 3);
1467 assert_eq!(r.bytes_in_flight, 3000);
1468
1469 let p = Sent {
1470 pkt_num: 3,
1471 frames: vec![],
1472 time_sent: now,
1473 time_acked: None,
1474 time_lost: None,
1475 size: 1000,
1476 ack_eliciting: true,
1477 in_flight: true,
1478 delivered: 0,
1479 delivered_time: now,
1480 recent_delivered_packet_sent_time: now,
1481 is_app_limited: false,
1482 has_data: false,
1483 };
1484
1485 r.on_packet_sent(
1486 p,
1487 packet::EPOCH_APPLICATION,
1488 HandshakeStatus::default(),
1489 now,
1490 "",
1491 );
1492 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 4);
1493 assert_eq!(r.bytes_in_flight, 4000);
1494
1495 // Wait for 10ms.
1496 now += Duration::from_millis(10);
1497
1498 // ACKs are reordered.
1499 let mut acked = ranges::RangeSet::default();
1500 acked.insert(2..4);
1501
1502 assert_eq!(
1503 r.on_ack_received(
1504 &acked,
1505 25,
1506 packet::EPOCH_APPLICATION,
1507 HandshakeStatus::default(),
1508 now,
1509 ""
1510 ),
1511 Ok(())
1512 );
1513
1514 now += Duration::from_millis(10);
1515
1516 let mut acked = ranges::RangeSet::default();
1517 acked.insert(0..2);
1518
1519 assert_eq!(
1520 r.on_ack_received(
1521 &acked,
1522 25,
1523 packet::EPOCH_APPLICATION,
1524 HandshakeStatus::default(),
1525 now,
1526 ""
1527 ),
1528 Ok(())
1529 );
1530
1531 assert_eq!(r.sent[packet::EPOCH_APPLICATION].len(), 0);
1532 assert_eq!(r.bytes_in_flight, 0);
1533
1534 // Spurious loss.
1535 assert_eq!(r.lost_count, 1);
1536 }
1537 }
1538
1539 mod cubic;
1540 mod delivery_rate;
1541 mod hystart;
1542 mod reno;
1543