1use std::any::Any;
2use std::fmt::Debug;
3use std::sync::Arc;
4
5use rand::{Rng, SeedableRng};
6
7use crate::congestion::bbr::bw_estimation::BandwidthEstimation;
8use crate::congestion::bbr::min_max::MinMax;
9use crate::connection::RttEstimator;
10use crate::{Duration, Instant};
11
12use super::{BASE_DATAGRAM_SIZE, Controller, ControllerFactory};
13
14mod bw_estimation;
15mod min_max;
16
/// BBR (Bottleneck Bandwidth and Round-trip propagation time) congestion
/// controller state. Drives the `Mode` state machine (Startup -> Drain ->
/// ProbeBw, with periodic ProbeRtt excursions) from ack and loss feedback.
#[derive(Debug, Clone)]
pub struct Bbr {
    /// Shared configuration (currently just the initial window).
    config: Arc<BbrConfig>,
    /// Current datagram size in bytes; updated by `on_mtu_update`.
    current_mtu: u64,
    /// Estimator for the bottleneck bandwidth (max filter over delivery rate).
    max_bandwidth: BandwidthEstimation,
    /// Total bytes acknowledged over the connection's lifetime.
    acked_bytes: u64,
    /// Current state-machine mode.
    mode: Mode,
    /// Bytes lost since the last `on_end_acks` batch.
    loss_state: LossState,
    /// Current loss-recovery phase.
    recovery_state: RecoveryState,
    /// Window cap applied while in recovery (see `calculate_recovery_window`).
    recovery_window: u64,
    /// Set once Startup observes bandwidth growth stalling.
    is_at_full_bandwidth: bool,
    /// Gain currently applied to the pacing rate.
    pacing_gain: f32,
    /// Pacing gain used during Startup.
    high_gain: f32,
    /// Pacing gain used during Drain (reciprocal of `high_gain`).
    drain_gain: f32,
    /// Gain currently applied when sizing the congestion window.
    cwnd_gain: f32,
    /// Window gain used during Startup and Drain.
    high_cwnd_gain: f32,
    /// When the current ProbeBw gain-cycle phase began.
    last_cycle_start: Option<Instant>,
    /// Index into `K_PACING_GAIN` for the current ProbeBw phase.
    current_cycle_offset: u8,
    /// Initial congestion window in bytes.
    init_cwnd: u64,
    /// Floor for the congestion window, derived from the MTU.
    min_cwnd: u64,
    /// Bytes in flight at the end of the previous ack batch.
    prev_in_flight_count: u64,
    /// Earliest time ProbeRtt may end; `None` until in-flight has drained.
    exit_probe_rtt_at: Option<Instant>,
    /// When ProbeRtt was last entered (drives min-RTT staleness).
    probe_rtt_last_started_at: Option<Instant>,
    /// Minimum RTT observed (kept fresh via ProbeRtt).
    min_rtt: Duration,
    /// Suppresses an immediate ProbeRtt right after quiescence.
    exiting_quiescence: bool,
    /// Current pacing rate (same units as the bandwidth estimate).
    pacing_rate: u64,
    /// Largest packet number acknowledged so far.
    max_acked_packet_number: u64,
    /// Largest packet number sent so far.
    max_sent_packet_number: u64,
    /// Acks beyond this packet number end recovery.
    end_recovery_at_packet_number: u64,
    /// Congestion window in bytes.
    cwnd: u64,
    /// Packet number marking the end of the current round trip.
    current_round_trip_end_packet_number: u64,
    /// Number of completed round trips.
    round_count: u64,
    /// Bandwidth estimate at the end of the previous round (Startup growth check).
    bw_at_last_round: u64,
    /// Consecutive rounds without a 25% bandwidth gain.
    round_wo_bw_gain: u64,
    /// Tracks ack aggregation to pad the window.
    ack_aggregation: AckAggregationState,
    /// RNG used to pick a random ProbeBw cycle starting offset.
    random_number_generator: rand::rngs::StdRng,
}
60
61impl Bbr {
62 pub fn new(config: Arc<BbrConfig>, current_mtu: u16) -> Self {
64 let initial_window = config.initial_window;
65 Self {
66 config,
67 current_mtu: current_mtu as u64,
68 max_bandwidth: BandwidthEstimation::default(),
69 acked_bytes: 0,
70 mode: Mode::Startup,
71 loss_state: Default::default(),
72 recovery_state: RecoveryState::NotInRecovery,
73 recovery_window: 0,
74 is_at_full_bandwidth: false,
75 pacing_gain: K_DEFAULT_HIGH_GAIN,
76 high_gain: K_DEFAULT_HIGH_GAIN,
77 drain_gain: 1.0 / K_DEFAULT_HIGH_GAIN,
78 cwnd_gain: K_DEFAULT_HIGH_GAIN,
79 high_cwnd_gain: K_DEFAULT_HIGH_GAIN,
80 last_cycle_start: None,
81 current_cycle_offset: 0,
82 init_cwnd: initial_window,
83 min_cwnd: calculate_min_window(current_mtu as u64),
84 prev_in_flight_count: 0,
85 exit_probe_rtt_at: None,
86 probe_rtt_last_started_at: None,
87 min_rtt: Default::default(),
88 exiting_quiescence: false,
89 pacing_rate: 0,
90 max_acked_packet_number: 0,
91 max_sent_packet_number: 0,
92 end_recovery_at_packet_number: 0,
93 cwnd: initial_window,
94 current_round_trip_end_packet_number: 0,
95 round_count: 0,
96 bw_at_last_round: 0,
97 round_wo_bw_gain: 0,
98 ack_aggregation: AckAggregationState::default(),
99 random_number_generator: rand::rngs::StdRng::from_os_rng(),
100 }
101 }
102
103 fn enter_startup_mode(&mut self) {
104 self.mode = Mode::Startup;
105 self.pacing_gain = self.high_gain;
106 self.cwnd_gain = self.high_cwnd_gain;
107 }
108
109 fn enter_probe_bandwidth_mode(&mut self, now: Instant) {
110 self.mode = Mode::ProbeBw;
111 self.cwnd_gain = K_DERIVED_HIGH_CWNDGAIN;
112 self.last_cycle_start = Some(now);
113 let mut rand_index = self
117 .random_number_generator
118 .random_range(0..K_PACING_GAIN.len() as u8 - 1);
119 if rand_index >= 1 {
120 rand_index += 1;
121 }
122 self.current_cycle_offset = rand_index;
123 self.pacing_gain = K_PACING_GAIN[rand_index as usize];
124 }
125
126 fn update_recovery_state(&mut self, is_round_start: bool) {
127 if self.loss_state.has_losses() {
129 self.end_recovery_at_packet_number = self.max_sent_packet_number;
130 }
131 match self.recovery_state {
132 RecoveryState::NotInRecovery if self.loss_state.has_losses() => {
134 self.recovery_state = RecoveryState::Conservation;
135 self.recovery_window = 0;
138 self.current_round_trip_end_packet_number = self.max_sent_packet_number;
141 }
142 RecoveryState::Growth | RecoveryState::Conservation => {
143 if self.recovery_state == RecoveryState::Conservation && is_round_start {
144 self.recovery_state = RecoveryState::Growth;
145 }
146 if !self.loss_state.has_losses()
148 && self.max_acked_packet_number > self.end_recovery_at_packet_number
149 {
150 self.recovery_state = RecoveryState::NotInRecovery;
151 }
152 }
153 _ => {}
154 }
155 }
156
157 fn update_gain_cycle_phase(&mut self, now: Instant, in_flight: u64) {
158 let mut should_advance_gain_cycling = self
160 .last_cycle_start
161 .map(|last_cycle_start| now.duration_since(last_cycle_start) > self.min_rtt)
162 .unwrap_or(false);
163 if self.pacing_gain > 1.0
169 && !self.loss_state.has_losses()
170 && self.prev_in_flight_count < self.get_target_cwnd(self.pacing_gain)
171 {
172 should_advance_gain_cycling = false;
173 }
174
175 if self.pacing_gain < 1.0 && in_flight <= self.get_target_cwnd(1.0) {
181 should_advance_gain_cycling = true;
182 }
183
184 if should_advance_gain_cycling {
185 self.current_cycle_offset = (self.current_cycle_offset + 1) % K_PACING_GAIN.len() as u8;
186 self.last_cycle_start = Some(now);
187 if DRAIN_TO_TARGET
190 && self.pacing_gain < 1.0
191 && (K_PACING_GAIN[self.current_cycle_offset as usize] - 1.0).abs() < f32::EPSILON
192 && in_flight > self.get_target_cwnd(1.0)
193 {
194 return;
195 }
196 self.pacing_gain = K_PACING_GAIN[self.current_cycle_offset as usize];
197 }
198 }
199
200 fn maybe_exit_startup_or_drain(&mut self, now: Instant, in_flight: u64) {
201 if self.mode == Mode::Startup && self.is_at_full_bandwidth {
202 self.mode = Mode::Drain;
203 self.pacing_gain = self.drain_gain;
204 self.cwnd_gain = self.high_cwnd_gain;
205 }
206 if self.mode == Mode::Drain && in_flight <= self.get_target_cwnd(1.0) {
207 self.enter_probe_bandwidth_mode(now);
208 }
209 }
210
211 fn is_min_rtt_expired(&self, now: Instant, app_limited: bool) -> bool {
212 !app_limited
213 && self
214 .probe_rtt_last_started_at
215 .map(|last| now.saturating_duration_since(last) > Duration::from_secs(10))
216 .unwrap_or(true)
217 }
218
219 fn maybe_enter_or_exit_probe_rtt(
220 &mut self,
221 now: Instant,
222 is_round_start: bool,
223 bytes_in_flight: u64,
224 app_limited: bool,
225 ) {
226 let min_rtt_expired = self.is_min_rtt_expired(now, app_limited);
227 if min_rtt_expired && !self.exiting_quiescence && self.mode != Mode::ProbeRtt {
228 self.mode = Mode::ProbeRtt;
229 self.pacing_gain = 1.0;
230 self.exit_probe_rtt_at = None;
233 self.probe_rtt_last_started_at = Some(now);
234 }
235
236 if self.mode == Mode::ProbeRtt {
237 if self.exit_probe_rtt_at.is_none() {
238 if bytes_in_flight < self.get_probe_rtt_cwnd() + self.current_mtu {
243 const K_PROBE_RTT_TIME: Duration = Duration::from_millis(200);
244 self.exit_probe_rtt_at = Some(now + K_PROBE_RTT_TIME);
245 }
246 } else if is_round_start && now >= self.exit_probe_rtt_at.unwrap() {
247 if !self.is_at_full_bandwidth {
248 self.enter_startup_mode();
249 } else {
250 self.enter_probe_bandwidth_mode(now);
251 }
252 }
253 }
254
255 self.exiting_quiescence = false;
256 }
257
258 fn get_target_cwnd(&self, gain: f32) -> u64 {
259 let bw = self.max_bandwidth.get_estimate();
260 let bdp = self.min_rtt.as_micros() as u64 * bw;
261 let bdpf = bdp as f64;
262 let cwnd = ((gain as f64 * bdpf) / 1_000_000f64) as u64;
263 if cwnd == 0 {
265 return self.init_cwnd;
266 }
267 cwnd.max(self.min_cwnd)
268 }
269
270 fn get_probe_rtt_cwnd(&self) -> u64 {
271 const K_MODERATE_PROBE_RTT_MULTIPLIER: f32 = 0.75;
272 if PROBE_RTT_BASED_ON_BDP {
273 return self.get_target_cwnd(K_MODERATE_PROBE_RTT_MULTIPLIER);
274 }
275 self.min_cwnd
276 }
277
278 fn calculate_pacing_rate(&mut self) {
279 let bw = self.max_bandwidth.get_estimate();
280 if bw == 0 {
281 return;
282 }
283 let target_rate = (bw as f64 * self.pacing_gain as f64) as u64;
284 if self.is_at_full_bandwidth {
285 self.pacing_rate = target_rate;
286 return;
287 }
288
289 if self.pacing_rate == 0 && self.min_rtt.as_nanos() != 0 {
292 self.pacing_rate =
293 BandwidthEstimation::bw_from_delta(self.init_cwnd, self.min_rtt).unwrap();
294 return;
295 }
296
297 if self.pacing_rate < target_rate {
299 self.pacing_rate = target_rate;
300 }
301 }
302
303 fn calculate_cwnd(&mut self, bytes_acked: u64, excess_acked: u64) {
304 if self.mode == Mode::ProbeRtt {
305 return;
306 }
307 let mut target_window = self.get_target_cwnd(self.cwnd_gain);
308 if self.is_at_full_bandwidth {
309 target_window += self.ack_aggregation.max_ack_height.get();
311 } else {
312 target_window += excess_acked;
315 }
316 if self.is_at_full_bandwidth {
320 self.cwnd = target_window.min(self.cwnd + bytes_acked);
321 } else if (self.cwnd_gain < target_window as f32) || (self.acked_bytes < self.init_cwnd) {
322 self.cwnd += bytes_acked;
325 }
326
327 if self.cwnd < self.min_cwnd {
329 self.cwnd = self.min_cwnd;
330 }
331 }
332
333 fn calculate_recovery_window(&mut self, bytes_acked: u64, bytes_lost: u64, in_flight: u64) {
334 if !self.recovery_state.in_recovery() {
335 return;
336 }
337 if self.recovery_window == 0 {
339 self.recovery_window = self.min_cwnd.max(in_flight + bytes_acked);
340 return;
341 }
342
343 if self.recovery_window >= bytes_lost {
346 self.recovery_window -= bytes_lost;
347 } else {
348 self.recovery_window = self.current_mtu;
350 }
351 if self.recovery_state == RecoveryState::Growth {
354 self.recovery_window += bytes_acked;
355 }
356
357 self.recovery_window = self
360 .recovery_window
361 .max(in_flight + bytes_acked)
362 .max(self.min_cwnd);
363 }
364
365 fn check_if_full_bw_reached(&mut self, app_limited: bool) {
367 if app_limited {
368 return;
369 }
370 let target = (self.bw_at_last_round as f64 * K_STARTUP_GROWTH_TARGET as f64) as u64;
371 let bw = self.max_bandwidth.get_estimate();
372 if bw >= target {
373 self.bw_at_last_round = bw;
374 self.round_wo_bw_gain = 0;
375 self.ack_aggregation.max_ack_height.reset();
376 return;
377 }
378
379 self.round_wo_bw_gain += 1;
380 if self.round_wo_bw_gain >= K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP as u64
381 || (self.recovery_state.in_recovery())
382 {
383 self.is_at_full_bandwidth = true;
384 }
385 }
386}
387
388impl Controller for Bbr {
389 fn on_sent(&mut self, now: Instant, bytes: u64, last_packet_number: u64) {
390 self.max_sent_packet_number = last_packet_number;
391 self.max_bandwidth.on_sent(now, bytes);
392 }
393
394 fn on_ack(
395 &mut self,
396 now: Instant,
397 sent: Instant,
398 bytes: u64,
399 app_limited: bool,
400 rtt: &RttEstimator,
401 ) {
402 self.max_bandwidth
403 .on_ack(now, sent, bytes, self.round_count, app_limited);
404 self.acked_bytes += bytes;
405 if self.is_min_rtt_expired(now, app_limited) || self.min_rtt > rtt.min() {
406 self.min_rtt = rtt.min();
407 }
408 }
409
410 fn on_end_acks(
411 &mut self,
412 now: Instant,
413 in_flight: u64,
414 app_limited: bool,
415 largest_packet_num_acked: Option<u64>,
416 ) {
417 let bytes_acked = self.max_bandwidth.bytes_acked_this_window();
418 let excess_acked = self.ack_aggregation.update_ack_aggregation_bytes(
419 bytes_acked,
420 now,
421 self.round_count,
422 self.max_bandwidth.get_estimate(),
423 );
424 self.max_bandwidth.end_acks(self.round_count, app_limited);
425 if let Some(largest_acked_packet) = largest_packet_num_acked {
426 self.max_acked_packet_number = largest_acked_packet;
427 }
428
429 let mut is_round_start = false;
430 if bytes_acked > 0 {
431 is_round_start =
432 self.max_acked_packet_number > self.current_round_trip_end_packet_number;
433 if is_round_start {
434 self.current_round_trip_end_packet_number = self.max_sent_packet_number;
435 self.round_count += 1;
436 }
437 }
438
439 self.update_recovery_state(is_round_start);
440
441 if self.mode == Mode::ProbeBw {
442 self.update_gain_cycle_phase(now, in_flight);
443 }
444
445 if is_round_start && !self.is_at_full_bandwidth {
446 self.check_if_full_bw_reached(app_limited);
447 }
448
449 self.maybe_exit_startup_or_drain(now, in_flight);
450
451 self.maybe_enter_or_exit_probe_rtt(now, is_round_start, in_flight, app_limited);
452
453 self.calculate_pacing_rate();
455 self.calculate_cwnd(bytes_acked, excess_acked);
456 self.calculate_recovery_window(bytes_acked, self.loss_state.lost_bytes, in_flight);
457
458 self.prev_in_flight_count = in_flight;
459 self.loss_state.reset();
460 }
461
462 fn on_congestion_event(
463 &mut self,
464 _now: Instant,
465 _sent: Instant,
466 _is_persistent_congestion: bool,
467 lost_bytes: u64,
468 ) {
469 self.loss_state.lost_bytes += lost_bytes;
470 }
471
472 fn on_mtu_update(&mut self, new_mtu: u16) {
473 self.current_mtu = new_mtu as u64;
474 self.min_cwnd = calculate_min_window(self.current_mtu);
475 self.init_cwnd = self.config.initial_window.max(self.min_cwnd);
476 self.cwnd = self.cwnd.max(self.min_cwnd);
477 }
478
479 fn window(&self) -> u64 {
480 if self.mode == Mode::ProbeRtt {
481 return self.get_probe_rtt_cwnd();
482 } else if self.recovery_state.in_recovery() && self.mode != Mode::Startup {
483 return self.cwnd.min(self.recovery_window);
484 }
485 self.cwnd
486 }
487
488 fn clone_box(&self) -> Box<dyn Controller> {
489 Box::new(self.clone())
490 }
491
492 fn initial_window(&self) -> u64 {
493 self.config.initial_window
494 }
495
496 fn into_any(self: Box<Self>) -> Box<dyn Any> {
497 self
498 }
499}
500
/// Configuration for the [`Bbr`] congestion controller.
#[derive(Debug, Clone)]
pub struct BbrConfig {
    /// Initial congestion window, in bytes.
    initial_window: u64,
}
506
impl BbrConfig {
    /// Sets the initial congestion window, in bytes.
    ///
    /// Returns `&mut Self` so calls can be chained builder-style.
    pub fn initial_window(&mut self, value: u64) -> &mut Self {
        self.initial_window = value;
        self
    }
}
516
517impl Default for BbrConfig {
518 fn default() -> Self {
519 Self {
520 initial_window: K_MAX_INITIAL_CONGESTION_WINDOW * BASE_DATAGRAM_SIZE,
521 }
522 }
523}
524
525impl ControllerFactory for BbrConfig {
526 fn build(self: Arc<Self>, _now: Instant, current_mtu: u16) -> Box<dyn Controller> {
527 Box::new(Bbr::new(self, current_mtu))
528 }
529}
530
/// Tracks bursts of acks arriving faster than the bandwidth estimate would
/// predict, so the congestion window can be padded to absorb aggregation.
#[derive(Debug, Default, Copy, Clone)]
struct AckAggregationState {
    /// Windowed maximum of observed aggregation (bytes beyond the expected rate).
    max_ack_height: MinMax,
    /// Start of the current aggregation measurement epoch.
    aggregation_epoch_start_time: Option<Instant>,
    /// Bytes acked since the current epoch started.
    aggregation_epoch_bytes: u64,
}
537
538impl AckAggregationState {
539 fn update_ack_aggregation_bytes(
540 &mut self,
541 newly_acked_bytes: u64,
542 now: Instant,
543 round: u64,
544 max_bandwidth: u64,
545 ) -> u64 {
546 let expected_bytes_acked = max_bandwidth
549 * now
550 .saturating_duration_since(self.aggregation_epoch_start_time.unwrap_or(now))
551 .as_micros() as u64
552 / 1_000_000;
553
554 if self.aggregation_epoch_bytes <= expected_bytes_acked {
557 self.aggregation_epoch_bytes = newly_acked_bytes;
559 self.aggregation_epoch_start_time = Some(now);
560 return 0;
561 }
562
563 self.aggregation_epoch_bytes += newly_acked_bytes;
566 let diff = self.aggregation_epoch_bytes - expected_bytes_acked;
567 self.max_ack_height.update_max(round, diff);
568 diff
569 }
570}
571
/// The BBR state machine.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum Mode {
    /// Ramp up quickly using the high pacing/window gains until bandwidth
    /// growth stalls.
    Startup,
    /// Drain the queue built during `Startup` by pacing with a gain below 1.
    Drain,
    /// Steady state: cycle `pacing_gain` through `K_PACING_GAIN` to probe
    /// for more bandwidth.
    ProbeBw,
    /// Temporarily shrink the window to re-measure the minimum RTT.
    ProbeRtt,
}
585
/// Loss-recovery phases; transitions are driven by `update_recovery_state`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum RecoveryState {
    /// No recovery in progress; the window is not capped by `recovery_window`.
    NotInRecovery,
    /// First round after a loss: hold the recovery window roughly constant.
    Conservation,
    /// After a full round of conservation: grow the recovery window by bytes acked.
    Growth,
}
597
598impl RecoveryState {
599 pub(super) fn in_recovery(&self) -> bool {
600 !matches!(self, Self::NotInRecovery)
601 }
602}
603
/// Bytes lost since the previous ack batch (cleared by `reset` at the end of
/// each `on_end_acks`).
#[derive(Debug, Clone, Default)]
struct LossState {
    lost_bytes: u64,
}
608
609impl LossState {
610 pub(super) fn reset(&mut self) {
611 self.lost_bytes = 0;
612 }
613
614 pub(super) fn has_losses(&self) -> bool {
615 self.lost_bytes != 0
616 }
617}
618
/// Minimum congestion window: four datagrams of the current MTU.
fn calculate_min_window(current_mtu: u64) -> u64 {
    current_mtu * 4
}
622
/// Pacing/window gain used during Startup; ≈ 2/ln(2).
const K_DEFAULT_HIGH_GAIN: f32 = 2.885;
/// Window gain used while in ProbeBw (see `enter_probe_bandwidth_mode`).
const K_DERIVED_HIGH_CWNDGAIN: f32 = 2.0;
/// ProbeBw pacing-gain cycle: one probing phase (1.25), one draining phase
/// (0.75), then six cruise phases at 1.0.
const K_PACING_GAIN: [f32; 8] = [1.25, 0.75, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0];

/// Per-round bandwidth growth factor Startup must sustain to be considered
/// still growing (see `check_if_full_bw_reached`).
const K_STARTUP_GROWTH_TARGET: f32 = 1.25;
/// Rounds without that growth before Startup declares full bandwidth reached.
const K_ROUND_TRIPS_WITHOUT_GROWTH_BEFORE_EXITING_STARTUP: u8 = 3;

/// Default initial window, in datagrams of `BASE_DATAGRAM_SIZE` bytes.
const K_MAX_INITIAL_CONGESTION_WINDOW: u64 = 200;

/// When true, the ProbeRtt window is a fraction of the BDP rather than the
/// absolute minimum window (see `get_probe_rtt_cwnd`).
const PROBE_RTT_BASED_ON_BDP: bool = true;
/// When true, a ProbeBw drain phase holds its gain until in-flight actually
/// falls to the target (see `update_gain_cycle_phase`).
const DRAIN_TO_TARGET: bool = true;