quinn_proto/congestion/bbr/
mod.rs

1use std::any::Any;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use rand::{Rng, SeedableRng};
6
7use crate::congestion::bbr::bw_estimation::BandwidthEstimation;
8use crate::congestion::bbr::min_max::MinMax;
9use crate::connection::RttEstimator;
10use crate::{Duration, Instant};
11
12use super::{BASE_DATAGRAM_SIZE, Controller, ControllerFactory};
13
14mod bw_estimation;
15mod min_max;
16
/// Experimental! Use at your own risk.
///
/// Aims for reduced buffer bloat and improved performance over high bandwidth-delay product networks.
/// Based on google's quiche implementation <https://source.chromium.org/chromium/chromium/src/+/master:net/third_party/quiche/src/quic/core/congestion_control/bbr_sender.cc>
/// of BBR <https://datatracker.ietf.org/doc/html/draft-cardwell-iccrg-bbr-congestion-control>.
/// More discussion and links at <https://groups.google.com/g/bbr-dev>.
#[derive(Debug, Clone)]
pub struct Bbr {
    // Shared configuration (initial window size).
    config: Arc<BbrConfig>,
    // Current maximum datagram size, in bytes.
    current_mtu: u64,
    // Estimator for the bottleneck bandwidth.
    max_bandwidth: BandwidthEstimation,
    // Total bytes acknowledged over the connection's lifetime.
    acked_bytes: u64,
    // Current phase of the BBR state machine.
    mode: Mode,
    // Bytes lost since the last ack-processing pass.
    loss_state: LossState,
    // Whether (and how) the window is currently limited by loss recovery.
    recovery_state: RecoveryState,
    // Window used instead of `cwnd` while in recovery (0 = not yet seeded).
    recovery_window: u64,
    // Set once startup stops producing bandwidth growth.
    is_at_full_bandwidth: bool,
    // Multiplier currently applied to the bandwidth estimate when pacing.
    pacing_gain: f32,
    // Pacing gain used during Startup.
    high_gain: f32,
    // Pacing gain used during Drain (reciprocal of `high_gain`).
    drain_gain: f32,
    // Multiplier applied to the BDP when sizing the congestion window.
    cwnd_gain: f32,
    // CWND gain used during Startup and Drain.
    high_cwnd_gain: f32,
    // When the current ProbeBw gain-cycle phase began.
    last_cycle_start: Option<Instant>,
    // Index into `K_PACING_GAIN` for the current ProbeBw phase.
    current_cycle_offset: u8,
    // Initial congestion window, in bytes.
    init_cwnd: u64,
    // Lower bound for the congestion window, in bytes.
    min_cwnd: u64,
    // Bytes in flight as of the previous ack-processing pass.
    prev_in_flight_count: u64,
    // Scheduled time to leave ProbeRtt (None until in-flight is small enough).
    exit_probe_rtt_at: Option<Instant>,
    // When ProbeRtt was last entered; used to expire the min-RTT sample.
    probe_rtt_last_started_at: Option<Instant>,
    // Smallest RTT observed; kept fresh by periodically entering ProbeRtt.
    min_rtt: Duration,
    // Suppresses ProbeRtt entry for one pass; cleared on every pass.
    exiting_quiescence: bool,
    // Current pacing rate, in the same units as the bandwidth estimate.
    pacing_rate: u64,
    // Highest packet number acknowledged so far.
    max_acked_packet_number: u64,
    // Highest packet number sent so far.
    max_sent_packet_number: u64,
    // Recovery ends once a packet beyond this number is acknowledged.
    end_recovery_at_packet_number: u64,
    // Congestion window, in bytes.
    cwnd: u64,
    // Acking a packet past this number starts a new round trip.
    current_round_trip_end_packet_number: u64,
    // Number of completed round trips.
    round_count: u64,
    // Bandwidth estimate at the end of the previous round (startup growth check).
    bw_at_last_round: u64,
    // Consecutive rounds without meaningful bandwidth growth.
    round_wo_bw_gain: u64,
    // Tracks acks that arrive in bursts faster than the estimated bandwidth.
    ack_aggregation: AckAggregationState,
    // RNG used to randomize the ProbeBw cycle's starting phase.
    random_number_generator: rand::rngs::StdRng,
}
60
61impl Bbr {
62    /// Construct a state using the given `config` and current time `now`
63    pub fn new(config: Arc<BbrConfig>, current_mtu: u16) -> Self {
64        let initial_window = config.initial_window;
65        Self {
66            config,
67            current_mtu: current_mtu as u64,
68            max_bandwidth: BandwidthEstimation::default(),
69            acked_bytes: 0,
70            mode: Mode::Startup,
71            loss_state: Default::default(),
72            recovery_state: RecoveryState::NotInRecovery,
73            recovery_window: 0,
74            is_at_full_bandwidth: false,
75            pacing_gain: K_DEFAULT_HIGH_GAIN,
76            high_gain: K_DEFAULT_HIGH_GAIN,
77            drain_gain: 1.0 / K_DEFAULT_HIGH_GAIN,
78            cwnd_gain: K_DEFAULT_HIGH_GAIN,
79            high_cwnd_gain: K_DEFAULT_HIGH_GAIN,
80            last_cycle_start: None,
81            current_cycle_offset: 0,
82            init_cwnd: initial_window,
83            min_cwnd: calculate_min_window(current_mtu as u64),
84            prev_in_flight_count: 0,
85            exit_probe_rtt_at: None,
86            probe_rtt_last_started_at: None,
87            min_rtt: Default::default(),
88            exiting_quiescence: false,
89            pacing_rate: 0,
90            max_acked_packet_number: 0,
91            max_sent_packet_number: 0,
92            end_recovery_at_packet_number: 0,
93            cwnd: initial_window,
94            current_round_trip_end_packet_number: 0,
95            round_count: 0,
96            bw_at_last_round: 0,
97            round_wo_bw_gain: 0,
98            ack_aggregation: AckAggregationState::default(),
99            random_number_generator: rand::rngs::StdRng::from_os_rng(),
100        }
101    }
102
103    fn enter_startup_mode(&mut self) {
104        self.mode = Mode::Startup;
105        self.pacing_gain = self.high_gain;
106        self.cwnd_gain = self.high_cwnd_gain;
107    }
108
109    fn enter_probe_bandwidth_mode(&mut self, now: Instant) {
110        self.mode = Mode::ProbeBw;
111        self.cwnd_gain = K_DERIVED_HIGH_CWNDGAIN;
112        self.last_cycle_start = Some(now);
113        // Pick a random offset for the gain cycle out of {0, 2..7} range. 1 is
114        // excluded because in that case increased gain and decreased gain would not
115        // follow each other.
116        let mut rand_index = self
117            .random_number_generator
118            .random_range(0..K_PACING_GAIN.len() as u8 - 1);
119        if rand_index >= 1 {
120            rand_index += 1;
121        }
122        self.current_cycle_offset = rand_index;
123        self.pacing_gain = K_PACING_GAIN[rand_index as usize];
124    }
125
126    fn update_recovery_state(&mut self, is_round_start: bool) {
127        // Exit recovery when there are no losses for a round.
128        if self.loss_state.has_losses() {
129            self.end_recovery_at_packet_number = self.max_sent_packet_number;
130        }
131        match self.recovery_state {
132            // Enter conservation on the first loss.
133            RecoveryState::NotInRecovery if self.loss_state.has_losses() => {
134                self.recovery_state = RecoveryState::Conservation;
135                // This will cause the |recovery_window| to be set to the
136                // correct value in CalculateRecoveryWindow().
137                self.recovery_window = 0;
138                // Since the conservation phase is meant to be lasting for a whole
139                // round, extend the current round as if it were started right now.
140                self.current_round_trip_end_packet_number = self.max_sent_packet_number;
141            }
142            RecoveryState::Growth | RecoveryState::Conservation => {
143                if self.recovery_state == RecoveryState::Conservation && is_round_start {
144                    self.recovery_state = RecoveryState::Growth;
145                }
146                // Exit recovery if appropriate.
147                if !self.loss_state.has_losses()
148                    && self.max_acked_packet_number > self.end_recovery_at_packet_number
149                {
150                    self.recovery_state = RecoveryState::NotInRecovery;
151                }
152            }
153            _ => {}
154        }
155    }
156
157    fn update_gain_cycle_phase(&mut self, now: Instant, in_flight: u64) {
158        // In most cases, the cycle is advanced after an RTT passes.
159        let mut should_advance_gain_cycling = self
160            .last_cycle_start
161            .map(|last_cycle_start| now.duration_since(last_cycle_start) > self.min_rtt)
162            .unwrap_or(false);
163        // If the pacing gain is above 1.0, the connection is trying to probe the
164        // bandwidth by increasing the number of bytes in flight to at least
165        // pacing_gain * BDP.  Make sure that it actually reaches the target, as
166        // long as there are no losses suggesting that the buffers are not able to
167        // hold that much.
168        if self.pacing_gain > 1.0
169            && !self.loss_state.has_losses()
170            && self.prev_in_flight_count < self.get_target_cwnd(self.pacing_gain)
171        {
172            should_advance_gain_cycling = false;
173        }
174
175        // If pacing gain is below 1.0, the connection is trying to drain the extra
176        // queue which could have been incurred by probing prior to it.  If the
177        // number of bytes in flight falls down to the estimated BDP value earlier,
178        // conclude that the queue has been successfully drained and exit this cycle
179        // early.
180        if self.pacing_gain < 1.0 && in_flight <= self.get_target_cwnd(1.0) {
181            should_advance_gain_cycling = true;
182        }
183
184        if should_advance_gain_cycling {
185            self.current_cycle_offset = (self.current_cycle_offset + 1) % K_PACING_GAIN.len() as u8;
186            self.last_cycle_start = Some(now);
187            // Stay in low gain mode until the target BDP is hit.  Low gain mode
188            // will be exited immediately when the target BDP is achieved.
189            if DRAIN_TO_TARGET
190                && self.pacing_gain < 1.0
191                && (K_PACING_GAIN[self.current_cycle_offset as usize] - 1.0).abs() < f32::EPSILON
192                && in_flight > self.get_target_cwnd(1.0)
193            {
194                return;
195            }
196            self.pacing_gain = K_PACING_GAIN[self.current_cycle_offset as usize];
197        }
198    }
199
200    fn maybe_exit_startup_or_drain(&mut self, now: Instant, in_flight: u64) {
201        if self.mode == Mode::Startup && self.is_at_full_bandwidth {
202            self.mode = Mode::Drain;
203            self.pacing_gain = self.drain_gain;
204            self.cwnd_gain = self.high_cwnd_gain;
205        }
206        if self.mode == Mode::Drain && in_flight <= self.get_target_cwnd(1.0) {
207            self.enter_probe_bandwidth_mode(now);
208        }
209    }
210
211    fn is_min_rtt_expired(&self, now: Instant, app_limited: bool) -> bool {
212        !app_limited
213            && self
214                .probe_rtt_last_started_at
215                .map(|last| now.saturating_duration_since(last) > Duration::from_secs(10))
216                .unwrap_or(true)
217    }
218
219    fn maybe_enter_or_exit_probe_rtt(
220        &mut self,
221        now: Instant,
222        is_round_start: bool,
223        bytes_in_flight: u64,
224        app_limited: bool,
225    ) {
226        let min_rtt_expired = self.is_min_rtt_expired(now, app_limited);
227        if min_rtt_expired && !self.exiting_quiescence && self.mode != Mode::ProbeRtt {
228            self.mode = Mode::ProbeRtt;
229            self.pacing_gain = 1.0;
230            // Do not decide on the time to exit ProbeRtt until the
231            // |bytes_in_flight| is at the target small value.
232            self.exit_probe_rtt_at = None;
233            self.probe_rtt_last_started_at = Some(now);
234        }
235
236        if self.mode == Mode::ProbeRtt {
237            if self.exit_probe_rtt_at.is_none() {
238                // If the window has reached the appropriate size, schedule exiting
239                // ProbeRtt.  The CWND during ProbeRtt is
240                // kMinimumCongestionWindow, but we allow an extra packet since QUIC
241                // checks CWND before sending a packet.
242                if bytes_in_flight < self.get_probe_rtt_cwnd() + self.current_mtu {
243                    const K_PROBE_RTT_TIME: Duration = Duration::from_millis(200);
244                    self.exit_probe_rtt_at = Some(now + K_PROBE_RTT_TIME);
245                }
246            } else if is_round_start && now >= self.exit_probe_rtt_at.unwrap() {
247                if !self.is_at_full_bandwidth {
248                    self.enter_startup_mode();
249                } else {
250                    self.enter_probe_bandwidth_mode(now);
251                }
252            }
253        }
254
255        self.exiting_quiescence = false;
256    }
257
258    fn get_target_cwnd(&self, gain: f32) -> u64 {
259        let bw = self.max_bandwidth.get_estimate();
260        let bdp = self.min_rtt.as_micros() as u64 * bw;
261        let bdpf = bdp as f64;
262        let cwnd = ((gain as f64 * bdpf) / 1_000_000f64) as u64;
263        // BDP estimate will be zero if no bandwidth samples are available yet.
264        if cwnd == 0 {
265            return self.init_cwnd;
266        }
267        cwnd.max(self.min_cwnd)
268    }
269
270    fn get_probe_rtt_cwnd(&self) -> u64 {
271        const K_MODERATE_PROBE_RTT_MULTIPLIER: f32 = 0.75;
272        if PROBE_RTT_BASED_ON_BDP {
273            return self.get_target_cwnd(K_MODERATE_PROBE_RTT_MULTIPLIER);
274        }
275        self.min_cwnd
276    }
277
278    fn calculate_pacing_rate(&mut self) {
279        let bw = self.max_bandwidth.get_estimate();
280        if bw == 0 {
281            return;
282        }
283        let target_rate = (bw as f64 * self.pacing_gain as f64) as u64;
284        if self.is_at_full_bandwidth {
285            self.pacing_rate = target_rate;
286            return;
287        }
288
289        // Pace at the rate of initial_window / RTT as soon as RTT measurements are
290        // available.
291        if self.pacing_rate == 0 && self.min_rtt.as_nanos() != 0 {
292            self.pacing_rate =
293                BandwidthEstimation::bw_from_delta(self.init_cwnd, self.min_rtt).unwrap();
294            return;
295        }
296
297        // Do not decrease the pacing rate during startup.
298        if self.pacing_rate < target_rate {
299            self.pacing_rate = target_rate;
300        }
301    }
302
303    fn calculate_cwnd(&mut self, bytes_acked: u64, excess_acked: u64) {
304        if self.mode == Mode::ProbeRtt {
305            return;
306        }
307        let mut target_window = self.get_target_cwnd(self.cwnd_gain);
308        if self.is_at_full_bandwidth {
309            // Add the max recently measured ack aggregation to CWND.
310            target_window += self.ack_aggregation.max_ack_height.get();
311        } else {
312            // Add the most recent excess acked.  Because CWND never decreases in
313            // STARTUP, this will automatically create a very localized max filter.
314            target_window += excess_acked;
315        }
316        // Instead of immediately setting the target CWND as the new one, BBR grows
317        // the CWND towards |target_window| by only increasing it |bytes_acked| at a
318        // time.
319        if self.is_at_full_bandwidth {
320            self.cwnd = target_window.min(self.cwnd + bytes_acked);
321        } else if (self.cwnd_gain < target_window as f32) || (self.acked_bytes < self.init_cwnd) {
322            // If the connection is not yet out of startup phase, do not decrease
323            // the window.
324            self.cwnd += bytes_acked;
325        }
326
327        // Enforce the limits on the congestion window.
328        if self.cwnd < self.min_cwnd {
329            self.cwnd = self.min_cwnd;
330        }
331    }
332
333    fn calculate_recovery_window(&mut self, bytes_acked: u64, bytes_lost: u64, in_flight: u64) {
334        if !self.recovery_state.in_recovery() {
335            return;
336        }
337        // Set up the initial recovery window.
338        if self.recovery_window == 0 {
339            self.recovery_window = self.min_cwnd.max(in_flight + bytes_acked);
340            return;
341        }
342
343        // Remove losses from the recovery window, while accounting for a potential
344        // integer underflow.
345        if self.recovery_window >= bytes_lost {
346            self.recovery_window -= bytes_lost;
347        } else {
348            // k_max_segment_size = current_mtu
349            self.recovery_window = self.current_mtu;
350        }
351        // In CONSERVATION mode, just subtracting losses is sufficient.  In GROWTH,
352        // release additional |bytes_acked| to achieve a slow-start-like behavior.
353        if self.recovery_state == RecoveryState::Growth {
354            self.recovery_window += bytes_acked;
355        }
356
357        // Sanity checks.  Ensure that we always allow to send at least an MSS or
358        // |bytes_acked| in response, whichever is larger.
359        self.recovery_window = self
360            .recovery_window
361            .max(in_flight + bytes_acked)
362            .max(self.min_cwnd);
363    }
364
365    /// <https://datatracker.ietf.org/doc/html/draft-cardwell-iccrg-bbr-congestion-control#section-4.3.2.2>
366    fn check_if_full_bw_reached(&mut self, app_limited: bool) {
367        if app_limited {
368            return;
369        }
370        let target = (self.bw_at_last_round as f64 * K_STARTUP_GROWTH_TARGET as f64) as u64;
371        let bw = self.max_bandwidth.get_estimate();
372        if bw >= target {
373            self.bw_at_last_round = bw;
374            self.round_wo_bw_gain = 0;
375            self.ack_aggregation.max_ack_height.reset();
376            return;
377        }
378
379        self.round_wo_bw_gain += 1;
380        if self.round_wo_bw_gain >= K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP as u64
381            || (self.recovery_state.in_recovery())
382        {
383            self.is_at_full_bandwidth = true;
384        }
385    }
386}
387
388impl Controller for Bbr {
389    fn on_sent(&mut self, now: Instant, bytes: u64, last_packet_number: u64) {
390        self.max_sent_packet_number = last_packet_number;
391        self.max_bandwidth.on_sent(now, bytes);
392    }
393
394    fn on_ack(
395        &mut self,
396        now: Instant,
397        sent: Instant,
398        bytes: u64,
399        app_limited: bool,
400        rtt: &RttEstimator,
401    ) {
402        self.max_bandwidth
403            .on_ack(now, sent, bytes, self.round_count, app_limited);
404        self.acked_bytes += bytes;
405        if self.is_min_rtt_expired(now, app_limited) || self.min_rtt > rtt.min() {
406            self.min_rtt = rtt.min();
407        }
408    }
409
410    fn on_end_acks(
411        &mut self,
412        now: Instant,
413        in_flight: u64,
414        app_limited: bool,
415        largest_packet_num_acked: Option<u64>,
416    ) {
417        let bytes_acked = self.max_bandwidth.bytes_acked_this_window();
418        let excess_acked = self.ack_aggregation.update_ack_aggregation_bytes(
419            bytes_acked,
420            now,
421            self.round_count,
422            self.max_bandwidth.get_estimate(),
423        );
424        self.max_bandwidth.end_acks(self.round_count, app_limited);
425        if let Some(largest_acked_packet) = largest_packet_num_acked {
426            self.max_acked_packet_number = largest_acked_packet;
427        }
428
429        let mut is_round_start = false;
430        if bytes_acked > 0 {
431            is_round_start =
432                self.max_acked_packet_number > self.current_round_trip_end_packet_number;
433            if is_round_start {
434                self.current_round_trip_end_packet_number = self.max_sent_packet_number;
435                self.round_count += 1;
436            }
437        }
438
439        self.update_recovery_state(is_round_start);
440
441        if self.mode == Mode::ProbeBw {
442            self.update_gain_cycle_phase(now, in_flight);
443        }
444
445        if is_round_start && !self.is_at_full_bandwidth {
446            self.check_if_full_bw_reached(app_limited);
447        }
448
449        self.maybe_exit_startup_or_drain(now, in_flight);
450
451        self.maybe_enter_or_exit_probe_rtt(now, is_round_start, in_flight, app_limited);
452
453        // After the model is updated, recalculate the pacing rate and congestion window.
454        self.calculate_pacing_rate();
455        self.calculate_cwnd(bytes_acked, excess_acked);
456        self.calculate_recovery_window(bytes_acked, self.loss_state.lost_bytes, in_flight);
457
458        self.prev_in_flight_count = in_flight;
459        self.loss_state.reset();
460    }
461
    fn on_congestion_event(
        &mut self,
        _now: Instant,
        _sent: Instant,
        _is_persistent_congestion: bool,
        lost_bytes: u64,
    ) {
        // Losses are only accumulated here; they are consumed (and the counter
        // reset) by the next `on_end_acks` pass.
        self.loss_state.lost_bytes += lost_bytes;
    }
471
472    fn on_mtu_update(&mut self, new_mtu: u16) {
473        self.current_mtu = new_mtu as u64;
474        self.min_cwnd = calculate_min_window(self.current_mtu);
475        self.init_cwnd = self.config.initial_window.max(self.min_cwnd);
476        self.cwnd = self.cwnd.max(self.min_cwnd);
477    }
478
479    fn window(&self) -> u64 {
480        if self.mode == Mode::ProbeRtt {
481            return self.get_probe_rtt_cwnd();
482        } else if self.recovery_state.in_recovery() && self.mode != Mode::Startup {
483            return self.cwnd.min(self.recovery_window);
484        }
485        self.cwnd
486    }
487
    // Deep-copies the whole controller state (Bbr derives `Clone`).
    fn clone_box(&self) -> Box<dyn Controller> {
        Box::new(self.clone())
    }
491
    // Returns the configured value, not the (possibly MTU-clamped) `init_cwnd`.
    fn initial_window(&self) -> u64 {
        self.config.initial_window
    }
495
    // Upcast to `Any` so callers can downcast back to the concrete `Bbr`.
    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
499}
500
/// Configuration for the [`Bbr`] congestion controller
#[derive(Debug, Clone)]
pub struct BbrConfig {
    // Initial congestion window, in bytes.
    initial_window: u64,
}
506
impl BbrConfig {
    /// Set the default limit on the amount of outstanding data in bytes.
    ///
    /// Recommended value: `min(10 * max_datagram_size, max(2 * max_datagram_size, 14720))`
    pub fn initial_window(&mut self, value: u64) -> &mut Self {
        self.initial_window = value;
        self
    }
}
516
517impl Default for BbrConfig {
518    fn default() -> Self {
519        Self {
520            initial_window: K_MAX_INITIAL_CONGESTION_WINDOW * BASE_DATAGRAM_SIZE,
521        }
522    }
523}
524
impl ControllerFactory for BbrConfig {
    // Create a boxed `Bbr` controller driven by this shared configuration.
    fn build(self: Arc<Self>, _now: Instant, current_mtu: u16) -> Box<dyn Controller> {
        Box::new(Bbr::new(self, current_mtu))
    }
}
530
// Tracks bursts of acks arriving faster than the estimated bandwidth predicts,
// so the congestion window can absorb ack aggregation (e.g. stretch acks).
#[derive(Debug, Default, Copy, Clone)]
struct AckAggregationState {
    // Windowed maximum of the measured aggregation excess.
    max_ack_height: MinMax,
    // When the current aggregation measurement epoch began.
    aggregation_epoch_start_time: Option<Instant>,
    // Bytes acknowledged since the epoch started.
    aggregation_epoch_bytes: u64,
}
537
538impl AckAggregationState {
539    fn update_ack_aggregation_bytes(
540        &mut self,
541        newly_acked_bytes: u64,
542        now: Instant,
543        round: u64,
544        max_bandwidth: u64,
545    ) -> u64 {
546        // Compute how many bytes are expected to be delivered, assuming max
547        // bandwidth is correct.
548        let expected_bytes_acked = max_bandwidth
549            * now
550                .saturating_duration_since(self.aggregation_epoch_start_time.unwrap_or(now))
551                .as_micros() as u64
552            / 1_000_000;
553
554        // Reset the current aggregation epoch as soon as the ack arrival rate is
555        // less than or equal to the max bandwidth.
556        if self.aggregation_epoch_bytes <= expected_bytes_acked {
557            // Reset to start measuring a new aggregation epoch.
558            self.aggregation_epoch_bytes = newly_acked_bytes;
559            self.aggregation_epoch_start_time = Some(now);
560            return 0;
561        }
562
563        // Compute how many extra bytes were delivered vs max bandwidth.
564        // Include the bytes most recently acknowledged to account for stretch acks.
565        self.aggregation_epoch_bytes += newly_acked_bytes;
566        let diff = self.aggregation_epoch_bytes - expected_bytes_acked;
567        self.max_ack_height.update_max(round, diff);
568        diff
569    }
570}
571
// Phase of the BBR state machine.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Mode {
    // Startup phase of the connection: grow aggressively until the bandwidth
    // estimate stops improving.
    Startup,
    // After achieving the highest possible bandwidth during the startup, lower
    // the pacing rate in order to drain the queue.
    Drain,
    // Cruising mode: cycle the pacing gain around 1.0 to track the available
    // bandwidth.
    ProbeBw,
    // Temporarily slow down sending in order to empty the buffer and measure
    // the real minimum RTT.
    ProbeRtt,
}
585
// Indicates how the congestion control limits the amount of bytes in flight.
// Transitions are driven by `Bbr::update_recovery_state`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum RecoveryState {
    // Do not limit.
    NotInRecovery,
    // Allow an extra outstanding byte for each byte acknowledged.
    Conservation,
    // Allow two extra outstanding bytes for each byte acknowledged (slow
    // start).
    Growth,
}
597
598impl RecoveryState {
599    pub(super) fn in_recovery(&self) -> bool {
600        !matches!(self, Self::NotInRecovery)
601    }
602}
603
// Bytes reported lost since the last `reset`, accumulated by
// `on_congestion_event` and consumed once per ack-processing pass.
#[derive(Debug, Clone, Default)]
struct LossState {
    lost_bytes: u64,
}
608
609impl LossState {
610    pub(super) fn reset(&mut self) {
611        self.lost_bytes = 0;
612    }
613
614    pub(super) fn has_losses(&self) -> bool {
615        self.lost_bytes != 0
616    }
617}
618
/// Minimum congestion window: four segments at the current MTU.
fn calculate_min_window(current_mtu: u64) -> u64 {
    current_mtu * 4
}
622
// The gain used for the STARTUP, equal to 2/ln(2).
const K_DEFAULT_HIGH_GAIN: f32 = 2.885;
// The newly derived CWND gain for STARTUP, 2.
const K_DERIVED_HIGH_CWNDGAIN: f32 = 2.0;
// The cycle of gains used during the ProbeBw stage: one probing phase (1.25),
// one draining phase (0.75), then six cruising phases at 1.0.
const K_PACING_GAIN: [f32; 8] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];

// Bandwidth must grow by at least this factor in a round for startup to be
// considered still making progress.
const K_STARTUP_GROWTH_TARGET: f32 = 1.25;
// Rounds without such growth before startup is declared over.
const K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP: u8 = 3;

// Do not allow initial congestion window to be greater than 200 packets.
const K_MAX_INITIAL_CONGESTION_WINDOW: u64 = 200;

// Derive the ProbeRtt window from the BDP (0.75 * BDP) instead of using the
// absolute minimum window.
const PROBE_RTT_BASED_ON_BDP: bool = true;
// Hold a low-gain ProbeBw phase until in-flight has drained to the target BDP.
const DRAIN_TO_TARGET: bool = true;