• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2019, Cloudflare, Inc.
2 // All rights reserved.
3 //
4 // Redistribution and use in source and binary forms, with or without
5 // modification, are permitted provided that the following conditions are
6 // met:
7 //
8 //     * Redistributions of source code must retain the above copyright notice,
9 //       this list of conditions and the following disclaimer.
10 //
11 //     * Redistributions in binary form must reproduce the above copyright
12 //       notice, this list of conditions and the following disclaimer in the
13 //       documentation and/or other materials provided with the distribution.
14 //
15 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16 // IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17 // THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 
27 //! CUBIC Congestion Control
28 //!
29 //! This implementation is based on the following RFC:
30 //! https://tools.ietf.org/html/rfc8312
31 //!
32 //! Note that Slow Start can use HyStart++ when enabled.
33 
34 use std::cmp;
35 
36 use std::time::Duration;
37 use std::time::Instant;
38 
39 use crate::packet;
40 use crate::recovery;
41 use crate::recovery::reno;
42 
43 use crate::recovery::Acked;
44 use crate::recovery::CongestionControlOps;
45 use crate::recovery::Recovery;
46 
47 pub static CUBIC: CongestionControlOps = CongestionControlOps {
48     on_packet_sent,
49     on_packet_acked,
50     congestion_event,
51     collapse_cwnd,
52 };
53 
54 /// CUBIC Constants.
55 ///
56 /// These are recommended value in RFC8312.
57 const BETA_CUBIC: f64 = 0.7;
58 
59 const C: f64 = 0.4;
60 
61 /// CUBIC State Variables.
62 ///
63 /// We need to keep those variables across the connection.
64 /// k, w_max, w_last_max is described in the RFC.
65 #[derive(Debug, Default)]
66 pub struct State {
67     k: f64,
68 
69     w_max: f64,
70 
71     w_last_max: f64,
72 
73     // Used in CUBIC fix (see on_packet_sent())
74     last_sent_time: Option<Instant>,
75 
76     // Store cwnd increment during congestion avoidance.
77     cwnd_inc: usize,
78 }
79 
80 /// CUBIC Functions.
81 ///
82 /// Note that these calculations are based on a count of cwnd as bytes,
83 /// not packets.
84 /// Unit of t (duration) and RTT are based on seconds (f64).
85 impl State {
86     // K = cbrt(w_max * (1 - beta_cubic) / C) (Eq. 2)
cubic_k(&self) -> f6487     fn cubic_k(&self) -> f64 {
88         let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
89         libm::cbrt(w_max * (1.0 - BETA_CUBIC) / C)
90     }
91 
92     // W_cubic(t) = C * (t - K)^3 - w_max (Eq. 1)
w_cubic(&self, t: Duration) -> f6493     fn w_cubic(&self, t: Duration) -> f64 {
94         let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
95 
96         (C * (t.as_secs_f64() - self.k).powi(3) + w_max) *
97             recovery::MAX_DATAGRAM_SIZE as f64
98     }
99 
100     // W_est(t) = w_max * beta_cubic + 3 * (1 - beta_cubic) / (1 + beta_cubic) *
101     // (t / RTT) (Eq. 4)
w_est(&self, t: Duration, rtt: Duration) -> f64102     fn w_est(&self, t: Duration, rtt: Duration) -> f64 {
103         let w_max = self.w_max / recovery::MAX_DATAGRAM_SIZE as f64;
104         (w_max * BETA_CUBIC +
105             3.0 * (1.0 - BETA_CUBIC) / (1.0 + BETA_CUBIC) * t.as_secs_f64() /
106                 rtt.as_secs_f64()) *
107             recovery::MAX_DATAGRAM_SIZE as f64
108     }
109 }
110 
collapse_cwnd(r: &mut Recovery)111 fn collapse_cwnd(r: &mut Recovery) {
112     let cubic = &mut r.cubic_state;
113 
114     r.congestion_recovery_start_time = None;
115 
116     cubic.w_last_max = r.congestion_window as f64;
117     cubic.w_max = cubic.w_last_max;
118 
119     // 4.7 Timeout - reduce ssthresh based on BETA_CUBIC
120     r.ssthresh = (r.congestion_window as f64 * BETA_CUBIC) as usize;
121     r.ssthresh = cmp::max(r.ssthresh, recovery::MINIMUM_WINDOW);
122 
123     cubic.cwnd_inc = 0;
124 
125     reno::collapse_cwnd(r);
126 }
127 
on_packet_sent(r: &mut Recovery, sent_bytes: usize, now: Instant)128 fn on_packet_sent(r: &mut Recovery, sent_bytes: usize, now: Instant) {
129     // See https://github.com/torvalds/linux/commit/30927520dbae297182990bb21d08762bcc35ce1d
130     // First transmit when no packets in flight
131     let cubic = &mut r.cubic_state;
132 
133     if let Some(last_sent_time) = cubic.last_sent_time {
134         if r.bytes_in_flight == 0 {
135             let delta = now - last_sent_time;
136 
137             // We were application limited (idle) for a while.
138             // Shift epoch start to keep cwnd growth to cubic curve.
139             if let Some(recovery_start_time) = r.congestion_recovery_start_time {
140                 if delta.as_nanos() > 0 {
141                     r.congestion_recovery_start_time =
142                         Some(recovery_start_time + delta);
143                 }
144             }
145         }
146     }
147 
148     cubic.last_sent_time = Some(now);
149 
150     reno::on_packet_sent(r, sent_bytes, now);
151 }
152 
on_packet_acked( r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant, )153 fn on_packet_acked(
154     r: &mut Recovery, packet: &Acked, epoch: packet::Epoch, now: Instant,
155 ) {
156     let in_congestion_recovery = r.in_congestion_recovery(packet.time_sent);
157 
158     r.bytes_in_flight = r.bytes_in_flight.saturating_sub(packet.size);
159 
160     if in_congestion_recovery {
161         return;
162     }
163 
164     if r.app_limited {
165         return;
166     }
167 
168     if r.congestion_window < r.ssthresh {
169         // Slow start.
170         if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
171             let (cwnd, ssthresh) = r.hystart_on_packet_acked(packet, now);
172 
173             r.congestion_window = cwnd;
174             r.ssthresh = ssthresh;
175         } else {
176             // Reno Slow Start.
177             r.congestion_window += packet.size;
178         }
179     } else {
180         // Congestion avoidance.
181         let ca_start_time;
182 
183         // In LSS, use lss_start_time instead of congestion_recovery_start_time.
184         if r.hystart.enabled() &&
185             epoch == packet::EPOCH_APPLICATION &&
186             r.hystart.lss_start_time().is_some()
187         {
188             ca_start_time = r.hystart.lss_start_time().unwrap();
189 
190             // Reset w_max and k when LSS started.
191             if r.cubic_state.w_max == 0.0 {
192                 r.cubic_state.w_max = r.congestion_window as f64;
193                 r.cubic_state.k = 0.0;
194             }
195         } else {
196             match r.congestion_recovery_start_time {
197                 Some(t) => ca_start_time = t,
198                 None => {
199                     // When we come here without congestion_event() triggered,
200                     // initialize congestion_recovery_start_time, w_max and k.
201                     ca_start_time = now;
202                     r.congestion_recovery_start_time = Some(now);
203 
204                     r.cubic_state.w_max = r.congestion_window as f64;
205                     r.cubic_state.k = 0.0;
206                 },
207             }
208         }
209 
210         let t = now - ca_start_time;
211 
212         // w_cubic(t + rtt)
213         let w_cubic = r.cubic_state.w_cubic(t + r.min_rtt);
214 
215         // w_est(t)
216         let w_est = r.cubic_state.w_est(t, r.min_rtt);
217 
218         let mut cubic_cwnd = r.congestion_window;
219 
220         if w_cubic < w_est {
221             // TCP friendly region.
222             cubic_cwnd = cmp::max(cubic_cwnd, w_est as usize);
223         } else if cubic_cwnd < w_cubic as usize {
224             // Concave region or convex region use same increment.
225             let cubic_inc = (w_cubic - cubic_cwnd as f64) / cubic_cwnd as f64 *
226                 recovery::MAX_DATAGRAM_SIZE as f64;
227 
228             cubic_cwnd += cubic_inc as usize;
229         }
230 
231         // When in Limited Slow Start, take the max of CA cwnd and
232         // LSS cwnd.
233         if r.hystart.enabled() &&
234             epoch == packet::EPOCH_APPLICATION &&
235             r.hystart.lss_start_time().is_some()
236         {
237             let (lss_cwnd, _) = r.hystart_on_packet_acked(packet, now);
238 
239             cubic_cwnd = cmp::max(cubic_cwnd, lss_cwnd);
240         }
241 
242         // Update the increment and increase cwnd by MSS.
243         r.cubic_state.cwnd_inc += cubic_cwnd - r.congestion_window;
244 
245         // cwnd_inc can be more than 1 MSS in the late stage of max probing.
246         // however QUIC recovery draft 7.4 (Congestion Avoidance) limits
247         // the increase of cwnd to 1 max packet size per cwnd acknowledged.
248         if r.cubic_state.cwnd_inc >= recovery::MAX_DATAGRAM_SIZE {
249             r.congestion_window += recovery::MAX_DATAGRAM_SIZE;
250             r.cubic_state.cwnd_inc -= recovery::MAX_DATAGRAM_SIZE;
251         }
252     }
253 }
254 
congestion_event( r: &mut Recovery, time_sent: Instant, epoch: packet::Epoch, now: Instant, )255 fn congestion_event(
256     r: &mut Recovery, time_sent: Instant, epoch: packet::Epoch, now: Instant,
257 ) {
258     let in_congestion_recovery = r.in_congestion_recovery(time_sent);
259 
260     // Start a new congestion event if packet was sent after the
261     // start of the previous congestion recovery period.
262     if !in_congestion_recovery {
263         r.congestion_recovery_start_time = Some(now);
264 
265         // Fast convergence
266         if r.cubic_state.w_max < r.cubic_state.w_last_max {
267             r.cubic_state.w_last_max = r.cubic_state.w_max;
268             r.cubic_state.w_max =
269                 r.cubic_state.w_max as f64 * (1.0 + BETA_CUBIC) / 2.0;
270         } else {
271             r.cubic_state.w_last_max = r.cubic_state.w_max;
272         }
273 
274         r.cubic_state.w_max = r.congestion_window as f64;
275         r.ssthresh = (r.cubic_state.w_max * BETA_CUBIC) as usize;
276         r.ssthresh = cmp::max(r.ssthresh, recovery::MINIMUM_WINDOW);
277         r.congestion_window = r.ssthresh;
278         r.cubic_state.k = r.cubic_state.cubic_k();
279 
280         r.cubic_state.cwnd_inc =
281             (r.cubic_state.cwnd_inc as f64 * BETA_CUBIC) as usize;
282 
283         if r.hystart.enabled() && epoch == packet::EPOCH_APPLICATION {
284             r.hystart.congestion_event();
285         }
286     }
287 }
288 
289 #[cfg(test)]
290 mod tests {
291     use super::*;
292     use crate::recovery::hystart;
293 
294     #[test]
cubic_init()295     fn cubic_init() {
296         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
297         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
298 
299         let r = Recovery::new(&cfg);
300 
301         assert!(r.cwnd() > 0);
302         assert_eq!(r.bytes_in_flight, 0);
303     }
304 
305     #[test]
cubic_send()306     fn cubic_send() {
307         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
308         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
309 
310         let mut r = Recovery::new(&cfg);
311 
312         r.on_packet_sent_cc(1000, Instant::now());
313 
314         assert_eq!(r.bytes_in_flight, 1000);
315     }
316 
317     #[test]
cubic_slow_start()318     fn cubic_slow_start() {
319         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
320         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
321 
322         let mut r = Recovery::new(&cfg);
323         let now = Instant::now();
324 
325         let p = recovery::Sent {
326             pkt_num: 0,
327             frames: vec![],
328             time_sent: now,
329             time_acked: None,
330             time_lost: None,
331             size: 5000,
332             ack_eliciting: true,
333             in_flight: true,
334             delivered: 0,
335             delivered_time: now,
336             recent_delivered_packet_sent_time: now,
337             is_app_limited: false,
338             has_data: false,
339         };
340 
341         // Send 5k x 4 = 20k, higher than default cwnd(~15k)
342         // to become no longer app limited
343         r.on_packet_sent_cc(p.size, now);
344         r.on_packet_sent_cc(p.size, now);
345         r.on_packet_sent_cc(p.size, now);
346         r.on_packet_sent_cc(p.size, now);
347 
348         let cwnd_prev = r.cwnd();
349 
350         let acked = vec![Acked {
351             pkt_num: p.pkt_num,
352             time_sent: p.time_sent,
353             size: p.size,
354         }];
355 
356         r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now);
357 
358         // Check if cwnd increased by packet size (slow start)
359         assert_eq!(r.cwnd(), cwnd_prev + p.size);
360     }
361 
362     #[test]
cubic_congestion_event()363     fn cubic_congestion_event() {
364         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
365         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
366 
367         let mut r = Recovery::new(&cfg);
368         let now = Instant::now();
369         let prev_cwnd = r.cwnd();
370 
371         r.congestion_event(now, packet::EPOCH_APPLICATION, now);
372 
373         // In CUBIC, after congestion event, cwnd will be reduced by (1 -
374         // CUBIC_BETA)
375         assert_eq!(prev_cwnd as f64 * BETA_CUBIC, r.cwnd() as f64);
376     }
377 
378     #[test]
cubic_congestion_avoidance()379     fn cubic_congestion_avoidance() {
380         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
381         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
382 
383         let mut r = Recovery::new(&cfg);
384         let now = Instant::now();
385         let prev_cwnd = r.cwnd();
386 
387         // Fill up bytes_in_flight to avoid app_limited=true
388         r.on_packet_sent_cc(20000, now);
389 
390         // Trigger congestion event to update ssthresh
391         r.congestion_event(now, packet::EPOCH_APPLICATION, now);
392 
393         // After congestion event, cwnd will be reduced.
394         let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize;
395         assert_eq!(r.cwnd(), cur_cwnd);
396 
397         let rtt = Duration::from_millis(100);
398 
399         let acked = vec![Acked {
400             pkt_num: 0,
401             // To exit from recovery
402             time_sent: now + rtt,
403             size: 8000,
404         }];
405 
406         // Ack more than cwnd bytes with rtt=100ms
407         r.update_rtt(rtt, Duration::from_millis(0), now);
408         r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now + rtt * 3);
409 
410         // After acking more than cwnd, expect cwnd increased by MSS
411         assert_eq!(r.cwnd(), cur_cwnd + recovery::MAX_DATAGRAM_SIZE);
412     }
413 
414     #[test]
cubic_collapse_cwnd_and_restart()415     fn cubic_collapse_cwnd_and_restart() {
416         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
417         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
418 
419         let mut r = Recovery::new(&cfg);
420         let now = Instant::now();
421 
422         // Fill up bytes_in_flight to avoid app_limited=true
423         r.on_packet_sent_cc(30000, now);
424 
425         // Trigger congestion event to update ssthresh
426         r.congestion_event(now, packet::EPOCH_APPLICATION, now);
427 
428         // After persistent congestion, cwnd should be MINIMUM_WINDOW
429         r.collapse_cwnd();
430         assert_eq!(r.cwnd(), recovery::MINIMUM_WINDOW);
431 
432         let acked = vec![Acked {
433             pkt_num: 0,
434             // To exit from recovery
435             time_sent: now + Duration::from_millis(1),
436             size: 10000,
437         }];
438 
439         // rtt = 100ms
440         let rtt = Duration::from_millis(100);
441         std::thread::sleep(rtt);
442 
443         // Ack 10000 x 2 to exit from slow start
444         r.on_packets_acked(acked.clone(), packet::EPOCH_APPLICATION, now);
445         std::thread::sleep(rtt);
446 
447         // This will make CC into congestion avoidance mode
448         r.on_packets_acked(acked, packet::EPOCH_APPLICATION, now);
449 
450         assert_eq!(r.cwnd(), recovery::MINIMUM_WINDOW + 10000);
451     }
452 
453     #[test]
cubic_hystart_limited_slow_start()454     fn cubic_hystart_limited_slow_start() {
455         let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap();
456         cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::CUBIC);
457         cfg.enable_hystart(true);
458 
459         let mut r = Recovery::new(&cfg);
460         let now = Instant::now();
461         let pkt_num = 0;
462         let epoch = packet::EPOCH_APPLICATION;
463 
464         let p = recovery::Sent {
465             pkt_num: 0,
466             frames: vec![],
467             time_sent: now,
468             time_acked: None,
469             time_lost: None,
470             size: recovery::MAX_DATAGRAM_SIZE,
471             ack_eliciting: true,
472             in_flight: true,
473             delivered: 0,
474             delivered_time: now,
475             recent_delivered_packet_sent_time: now,
476             is_app_limited: false,
477             has_data: false,
478         };
479 
480         // 1st round.
481         let n_rtt_sample = hystart::N_RTT_SAMPLE;
482         let pkts_1st_round = n_rtt_sample as u64;
483         r.hystart.start_round(pkt_num);
484 
485         let rtt_1st = 50;
486 
487         // Send 1st round packets.
488         for _ in 0..n_rtt_sample {
489             r.on_packet_sent_cc(p.size, now);
490         }
491 
492         // Receving Acks.
493         let now = now + Duration::from_millis(rtt_1st);
494         for _ in 0..n_rtt_sample {
495             r.update_rtt(
496                 Duration::from_millis(rtt_1st),
497                 Duration::from_millis(0),
498                 now,
499             );
500 
501             let acked = vec![Acked {
502                 pkt_num: p.pkt_num,
503                 time_sent: p.time_sent,
504                 size: p.size,
505             }];
506 
507             r.on_packets_acked(acked, epoch, now);
508         }
509 
510         // Not in LSS yet.
511         assert_eq!(r.hystart.lss_start_time().is_some(), false);
512 
513         // 2nd round.
514         r.hystart.start_round(pkts_1st_round * 2 + 1);
515 
516         let mut rtt_2nd = 100;
517         let now = now + Duration::from_millis(rtt_2nd);
518 
519         // Send 2nd round packets.
520         for _ in 0..n_rtt_sample + 1 {
521             r.on_packet_sent_cc(p.size, now);
522         }
523 
524         // Receving Acks.
525         // Last ack will cause to exit to LSS.
526         let mut cwnd_prev = r.cwnd();
527 
528         for _ in 0..n_rtt_sample + 1 {
529             cwnd_prev = r.cwnd();
530             r.update_rtt(
531                 Duration::from_millis(rtt_2nd),
532                 Duration::from_millis(0),
533                 now,
534             );
535 
536             let acked = vec![Acked {
537                 pkt_num: p.pkt_num,
538                 time_sent: p.time_sent,
539                 size: p.size,
540             }];
541 
542             r.on_packets_acked(acked, epoch, now);
543 
544             // Keep increasing RTT so that hystart exits to LSS.
545             rtt_2nd += 4;
546         }
547 
548         // Now we are in LSS.
549         assert_eq!(r.hystart.lss_start_time().is_some(), true);
550         assert_eq!(r.cwnd(), cwnd_prev);
551 
552         // Ack'ing more packet to increase cwnd by 1 MSS
553         for _ in 0..3 {
554             let acked = vec![Acked {
555                 pkt_num: p.pkt_num,
556                 time_sent: p.time_sent,
557                 size: p.size,
558             }];
559             r.on_packets_acked(acked, epoch, now);
560         }
561 
562         assert_eq!(r.cwnd(), cwnd_prev + recovery::MAX_DATAGRAM_SIZE);
563     }
564 }
565