1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 // * Redistributions of source code must retain the above copyright notice,
9 // this list of conditions and the following disclaimer.
10 //
11 // * Redistributions in binary form must reproduce the above copyright
12 // notice, this list of conditions and the following disclaimer in the
13 // documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27 //! CUBIC Congestion Control
28 //!
29 //! This implementation is based on the following RFC:
30 //! https://tools.ietf.org/html/rfc8312
31 //!
32 //! Note that Slow Start can use HyStart++ when enabled.
33
34 use std::cmp;
35
36 use std::time::Duration;
37 use std::time::Instant;
38
39 use crate::packet;
40 use crate::recovery;
41 use crate::recovery::reno;
42
43 use crate::recovery::Acked;
44 use crate::recovery::CongestionControlOps;
45 use crate::recovery::Recovery;
46
/// Hook table that plugs the CUBIC implementation into the recovery
/// module's congestion-control dispatch (see `CongestionControlOps`).
pub static CUBIC: CongestionControlOps = CongestionControlOps {
    on_packet_sent,
    on_packet_acked,
    congestion_event,
    collapse_cwnd,
};
53
/// CUBIC Constants.
///
/// These are the recommended values in RFC8312.
// Multiplicative decrease factor applied to cwnd on a congestion event
// (beta_cubic in RFC8312 Section 4.5).
const BETA_CUBIC: f64 = 0.7;

// Scaling constant of the cubic growth function (C in RFC8312 Eq. 1).
const C: f64 = 0.4;
60
/// CUBIC State Variables.
///
/// We need to keep those variables across the connection.
/// k, w_max and w_last_max are described in RFC8312.
#[derive(Debug, Default)]
pub struct State {
    // Time period (seconds) the cubic function takes to grow the window
    // back to w_max (K in RFC8312 Eq. 2); compared against elapsed time
    // in seconds by w_cubic().
    k: f64,

    // Window size (bytes) just before the last reduction; set from
    // congestion_window on congestion events.
    w_max: f64,

    // w_max remembered from the previous congestion event, used for the
    // fast convergence heuristic (RFC8312 Section 4.6).
    w_last_max: f64,

    // Used in CUBIC fix (see on_packet_sent())
    last_sent_time: Option<Instant>,

    // Store cwnd increment during congestion avoidance.
    cwnd_inc: usize,
}
79
80 /// CUBIC Functions.
81 ///
82 /// Note that these calculations are based on a count of cwnd as bytes,
83 /// not packets.
84 /// Unit of t (duration) and RTT are based on seconds (f64).
85 impl State {
86 // K = cbrt(w_max * (1 - beta_cubic) / C) (Eq. 2)
cubic_k(&self) -> f6487 fn cubic_k(&self) -> f64 {
88 let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
89 libm::cbrt(w_max * (1.0 - BETA_CUBIC) / C)
90 }
91
92 // W_cubic(t) = C * (t - K)^3 - w_max (Eq. 1)
w_cubic(&self, t: Duration) -> f6493 fn w_cubic(&self, t: Duration) -> f64 {
94 let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
95
96 (C * (t.as_secs_f64() - self.k).powi(3) + w_max) *
97 recovery::MAX_DATAGRAM_SIZE as f64
98 }
99
100 // W_est(t) = w_max * beta_cubic + 3 * (1 - beta_cubic) / (1 + beta_cubic) *
101 // (t / RTT) (Eq. 4)
w_est(&self, t: Duration, rtt: Duration) -> f64102 fn w_est(&self, t: Duration, rtt: Duration) -> f64 {
103 let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
104 (w_max * BETA_CUBIC +
105 3.0 * (1.0 - BETA_CUBIC) / (1.0 + BETA_CUBIC) * t.as_secs_f64() /
106 rtt.as_secs_f64()) *
107 recovery::MAX_DATAGRAM_SIZE as f64
108 }
109 }
110
collapse_cwnd(r: &mut Recovery)111 fn collapse_cwnd(r: &mut Recovery) {
112 let cubic = &mut r.cubic_state;
113
114 r.congestion_recovery_start_time = None;
115
116 cubic.w_last_max = r.congestion_window as f64;
117 cubic.w_max = cubic.w_last_max;
118
119 // 4.7 Timeout - reduce ssthresh based on BETA_CUBIC
120 r.ssthresh = (r.congestion_window as f64 * BETA_CUBIC) as usize;
121 r.ssthresh = cmp::max(r.ssthresh, recovery::MINIMUM_WINDOW);
122
123 cubic.cwnd_inc = 0;
124
125 reno::collapse_cwnd(r);
126 }
127
on_packet_sent(r: &mut Recovery, sent_bytes: usize, now: Instant)128 fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, now: Instant) {
129 // See https://github.com/torvalds/linux/commit/30927520dbae297182990bb21d08762bcc35ce1d
130 // First transmit when no packets in flight
131 let cubic = &mut r.cubic_state;
132
133 if let Some(last_sent_time) = cubic.last_sent_time {
134 if r.bytes_in_flight == 0 {
135 let delta = now - last_sent_time;
136
137 // We were application limited (idle) for a while.
138 // Shift epoch start to keep cwnd growth to cubic curve.
139 if let Some(recovery_start_time) = r.congestion_recovery_start_time {
140 if delta.as_nanos() > 0 {
141 r.congestion_recovery_start_time =
142 Some(recovery_start_time + delta);
143 }
144 }
145 }
146 }
147
148 cubic.last_sent_time = Some(now);
149
150 reno::on_packet_sent(r, sent_bytes, now);
151 }
152
/// Per-ACK handler: grows cwnd per RFC8312 (slow start, TCP-friendly,
/// concave/convex regions), with HyStart++ adjustments when enabled.
fn on_packet_acked(
    r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant,
) {
    let in_congestion_recovery = r.in_congestion_recovery(packet.time_sent);

    r.bytes_in_flight = r.bytes_in_flight.saturating_sub(packet.size);

    // Don't grow cwnd for packets sent before the current recovery
    // period started.
    if in_congestion_recovery {
        return;
    }

    // Don't grow cwnd while application limited: the ACKs don't reflect
    // actual path capacity.
    if r.app_limited {
        return;
    }

    if r.congestion_window < r.ssthresh {
        // Slow start.
        if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
            // HyStart++ may cap growth / move us to Limited Slow Start.
            let (cwnd, ssthresh) = r.hystart_on_packet_acked(packet, now);

            r.congestion_window = cwnd;
            r.ssthresh = ssthresh;
        } else {
            // Reno Slow Start.
            r.congestion_window += packet.size;
        }
    } else {
        // Congestion avoidance.
        let ca_start_time;

        // In LSS, use lss_start_time instead of congestion_recovery_start_time.
        if r.hystart.enabled() &&
            epoch == packet::EPOCH_APPLICATION &&
            r.hystart.lss_start_time().is_some()
        {
            ca_start_time = r.hystart.lss_start_time().unwrap();

            // Reset w_max and k when LSS started.
            if r.cubic_state.w_max == 0.0 {
                r.cubic_state.w_max = r.congestion_window as f64;
                r.cubic_state.k = 0.0;
            }
        } else {
            match r.congestion_recovery_start_time {
                Some(t) => ca_start_time = t,
                None => {
                    // When we come here without congestion_event() triggered,
                    // initialize congestion_recovery_start_time, w_max and k.
                    ca_start_time = now;
                    r.congestion_recovery_start_time = Some(now);

                    r.cubic_state.w_max = r.congestion_window as f64;
                    r.cubic_state.k = 0.0;
                },
            }
        }

        // Time elapsed since the start of the congestion-avoidance epoch.
        let t = now - ca_start_time;

        // w_cubic(t + rtt)
        let w_cubic = r.cubic_state.w_cubic(t + r.min_rtt);

        // w_est(t)
        let w_est = r.cubic_state.w_est(t, r.min_rtt);

        let mut cubic_cwnd = r.congestion_window;

        if w_cubic < w_est {
            // TCP friendly region.
            cubic_cwnd = cmp::max(cubic_cwnd, w_est as usize);
        } else if cubic_cwnd < w_cubic as usize {
            // Concave region or convex region use same increment.
            let cubic_inc = (w_cubic - cubic_cwnd as f64) / cubic_cwnd as f64 *
                recovery::MAX_DATAGRAM_SIZE as f64;

            cubic_cwnd += cubic_inc as usize;
        }

        // When in Limited Slow Start, take the max of CA cwnd and
        // LSS cwnd.
        if r.hystart.enabled() &&
            epoch == packet::EPOCH_APPLICATION &&
            r.hystart.lss_start_time().is_some()
        {
            let (lss_cwnd, _) = r.hystart_on_packet_acked(packet, now);

            cubic_cwnd = cmp::max(cubic_cwnd, lss_cwnd);
        }

        // Update the increment and increase cwnd by MSS.
        // Note: cubic_cwnd >= congestion_window here since it only ever
        // grows above it, so this subtraction cannot underflow.
        r.cubic_state.cwnd_inc += cubic_cwnd - r.congestion_window;

        // cwnd_inc can be more than 1 MSS in the late stage of max probing.
        // however QUIC recovery draft 7.4 (Congestion Avoidance) limits
        // the increase of cwnd to 1 max packet size per cwnd acknowledged.
        if r.cubic_state.cwnd_inc >= recovery::MAX_DATAGRAM_SIZE {
            r.congestion_window += recovery::MAX_DATAGRAM_SIZE;
            r.cubic_state.cwnd_inc -= recovery::MAX_DATAGRAM_SIZE;
        }
    }
}
254
congestion_event( r: &mut Recovery, time_sent: Instant, epoch: packet::Epoch, now: Instant, )255 fn congestion_event(
256 r: &mut Recovery, time_sent: Instant, epoch: packet::Epoch, now: Instant,
257 ) {
258 let in_congestion_recovery = r.in_congestion_recovery(time_sent);
259
260 // Start a new congestion event if packet was sent after the
261 // start of the previous congestion recovery period.
262 if !in_congestion_recovery {
263 r.congestion_recovery_start_time = Some(now);
264
265 // Fast convergence
266 if r.cubic_state.w_max < r.cubic_state.w_last_max {
267 r.cubic_state.w_last_max = r.cubic_state.w_max;
268 r.cubic_state.w_max =
269 r.cubic_state.w_max as f64 * (1.0 + BETA_CUBIC) / 2.0;
270 } else {
271 r.cubic_state.w_last_max = r.cubic_state.w_max;
272 }
273
274 r.cubic_state.w_max = r.congestion_window as f64;
275 r.ssthresh = (r.cubic_state.w_max * BETA_CUBIC) as usize;
276 r.ssthresh = cmp::max(r.ssthresh, recovery::MINIMUM_WINDOW);
277 r.congestion_window = r.ssthresh;
278 r.cubic_state.k = r.cubic_state.cubic_k();
279
280 r.cubic_state.cwnd_inc =
281 (r.cubic_state.cwnd_inc as f64 * BETA_CUBIC) as usize;
282
283 if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
284 r.hystart.congestion_event();
285 }
286 }
287 }
288
#[cfg(test)]
mod tests {
    use super::*;
    use crate::recovery::hystart;

    // A fresh CUBIC recovery state starts with a non-zero cwnd and
    // nothing in flight.
    #[test]
    fn cubic_init() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let r = Recovery::new(&cfg);

        assert!(r.cwnd() > 0);
        assert_eq!(r.bytes_in_flight, 0);
    }

    // Sending accounts bytes into bytes_in_flight.
    #[test]
    fn cubic_send() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let mut r = Recovery::new(&cfg);

        r.on_packet_sent_cc(1000, Instant::now());

        assert_eq!(r.bytes_in_flight, 1000);
    }

    // In slow start, cwnd grows by the acked packet size.
    #[test]
    fn cubic_slow_start() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let mut r = Recovery::new(&cfg);
        let now = Instant::now();

        let p = recovery::Sent {
            pkt_num: 0,
            frames: vec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: 5000,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            recent_delivered_packet_sent_time: now,
            is_app_limited: false,
            has_data: false,
        };

        // Send 5k x 4 = 20k, higher than default cwnd(~15k)
        // to become no longer app limited
        r.on_packet_sent_cc(p.size, now);
        r.on_packet_sent_cc(p.size, now);
        r.on_packet_sent_cc(p.size, now);
        r.on_packet_sent_cc(p.size, now);

        let cwnd_prev = r.cwnd();

        let acked = vec![Acked {
            pkt_num: p.pkt_num,
            time_sent: p.time_sent,
            size: p.size,
        }];

        r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now);

        // Check if cwnd increased by packet size (slow start)
        assert_eq!(r.cwnd(), cwnd_prev + p.size);
    }

    // A congestion event multiplies cwnd by BETA_CUBIC.
    #[test]
    fn cubic_congestion_event() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let mut r = Recovery::new(&cfg);
        let now = Instant::now();
        let prev_cwnd = r.cwnd();

        r.congestion_event(now, packet::EPOCH_APPLICATION, now);

        // In CUBIC, after congestion event, cwnd will be reduced to
        // BETA_CUBIC of its previous value.
        assert_eq!(prev_cwnd as f64 * BETA_CUBIC, r.cwnd() as f64);
    }

    // After exiting recovery, congestion avoidance grows cwnd by at most
    // one MSS per cwnd worth of acknowledged bytes.
    #[test]
    fn cubic_congestion_avoidance() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let mut r = Recovery::new(&cfg);
        let now = Instant::now();
        let prev_cwnd = r.cwnd();

        // Fill up bytes_in_flight to avoid app_limited=true
        r.on_packet_sent_cc(20000, now);

        // Trigger congestion event to update ssthresh
        r.congestion_event(now, packet::EPOCH_APPLICATION, now);

        // After congestion event, cwnd will be reduced.
        let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize;
        assert_eq!(r.cwnd(), cur_cwnd);

        let rtt = Duration::from_millis(100);

        let acked = vec![Acked {
            pkt_num: 0,
            // To exit from recovery
            time_sent: now + rtt,
            size: 8000,
        }];

        // Ack more than cwnd bytes with rtt=100ms
        r.update_rtt(rtt, Duration::from_millis(0), now);
        r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now + rtt * 3);

        // After acking more than cwnd, expect cwnd increased by MSS
        assert_eq!(r.cwnd(), cur_cwnd + recovery::MAX_DATAGRAM_SIZE);
    }

    // Persistent congestion collapses cwnd to the minimum, then slow
    // start grows it again from acked bytes.
    #[test]
    fn cubic_collapse_cwnd_and_restart() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);

        let mut r = Recovery::new(&cfg);
        let now = Instant::now();

        // Fill up bytes_in_flight to avoid app_limited=true
        r.on_packet_sent_cc(30000, now);

        // Trigger congestion event to update ssthresh
        r.congestion_event(now, packet::EPOCH_APPLICATION, now);

        // After persistent congestion, cwnd should be MINIMUM_WINDOW
        r.collapse_cwnd();
        assert_eq!(r.cwnd(), recovery::MINIMUM_WINDOW);

        let acked = vec![Acked {
            pkt_num: 0,
            // To exit from recovery
            time_sent: now + Duration::from_millis(1),
            size: 10000,
        }];

        // rtt = 100ms
        let rtt = Duration::from_millis(100);
        std::thread::sleep(rtt);

        // Ack 10000 x 2 to exit from slow start
        r.on_packets_acked(acked.clone(), packet::EPOCH_APPLICATION, now);
        std::thread::sleep(rtt);

        // This will make CC into congestion avoidance mode
        r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now);

        assert_eq!(r.cwnd(), recovery::MINIMUM_WINDOW + 10000);
    }

    // Drives HyStart++ through two rounds of increasing RTT samples so
    // it exits into Limited Slow Start, then checks LSS-limited growth.
    #[test]
    fn cubic_hystart_limited_slow_start() {
        let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
        cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
        cfg.enable_hystart(true);

        let mut r = Recovery::new(&cfg);
        let now = Instant::now();
        let pkt_num = 0;
        let epoch = packet::EPOCH_APPLICATION;

        let p = recovery::Sent {
            pkt_num: 0,
            frames: vec![],
            time_sent: now,
            time_acked: None,
            time_lost: None,
            size: recovery::MAX_DATAGRAM_SIZE,
            ack_eliciting: true,
            in_flight: true,
            delivered: 0,
            delivered_time: now,
            recent_delivered_packet_sent_time: now,
            is_app_limited: false,
            has_data: false,
        };

        // 1st round.
        let n_rtt_sample = hystart::N_RTT_SAMPLE;
        let pkts_1st_round = n_rtt_sample as u64;
        r.hystart.start_round(pkt_num);

        let rtt_1st = 50;

        // Send 1st round packets.
        for _ in 0..n_rtt_sample {
            r.on_packet_sent_cc(p.size, now);
        }

        // Receiving Acks.
        let now = now + Duration::from_millis(rtt_1st);
        for _ in 0..n_rtt_sample {
            r.update_rtt(
                Duration::from_millis(rtt_1st),
                Duration::from_millis(0),
                now,
            );

            let acked = vec![Acked {
                pkt_num: p.pkt_num,
                time_sent: p.time_sent,
                size: p.size,
            }];

            r.on_packets_acked(acked, epoch, now);
        }

        // Not in LSS yet.
        assert_eq!(r.hystart.lss_start_time().is_some(), false);

        // 2nd round.
        r.hystart.start_round(pkts_1st_round * 2 + 1);

        let mut rtt_2nd = 100;
        let now = now + Duration::from_millis(rtt_2nd);

        // Send 2nd round packets.
        for _ in 0..n_rtt_sample + 1 {
            r.on_packet_sent_cc(p.size, now);
        }

        // Receiving Acks.
        // Last ack will cause to exit to LSS.
        let mut cwnd_prev = r.cwnd();

        for _ in 0..n_rtt_sample + 1 {
            cwnd_prev = r.cwnd();
            r.update_rtt(
                Duration::from_millis(rtt_2nd),
                Duration::from_millis(0),
                now,
            );

            let acked = vec![Acked {
                pkt_num: p.pkt_num,
                time_sent: p.time_sent,
                size: p.size,
            }];

            r.on_packets_acked(acked, epoch, now);

            // Keep increasing RTT so that hystart exits to LSS.
            rtt_2nd += 4;
        }

        // Now we are in LSS.
        assert_eq!(r.hystart.lss_start_time().is_some(), true);
        assert_eq!(r.cwnd(), cwnd_prev);

        // Ack'ing more packets to increase cwnd by 1 MSS
        for _ in 0..3 {
            let acked = vec![Acked {
                pkt_num: p.pkt_num,
                time_sent: p.time_sent,
                size: p.size,
            }];
            r.on_packets_acked(acked, epoch, now);
        }

        assert_eq!(r.cwnd(), cwnd_prev + recovery::MAX_DATAGRAM_SIZE);
    }
}
565