quinn_proto/connection/
paths.rs

1use std::{cmp, net::SocketAddr};
2
3use tracing::trace;
4
5use super::{
6    mtud::MtuDiscovery,
7    pacing::Pacer,
8    spaces::{PacketSpace, SentPacket},
9};
10use crate::{Duration, Instant, TIMER_GRANULARITY, TransportConfig, congestion, packet::SpaceId};
11
12/// Description of a particular network path
13pub(super) struct PathData {
14    pub(super) remote: SocketAddr,
15    pub(super) rtt: RttEstimator,
16    /// Whether we're enabling ECN on outgoing packets
17    pub(super) sending_ecn: bool,
18    /// Congestion controller state
19    pub(super) congestion: Box<dyn congestion::Controller>,
20    /// Pacing state
21    pub(super) pacing: Pacer,
22    pub(super) challenge: Option<u64>,
23    pub(super) challenge_pending: bool,
24    /// Whether we're certain the peer can both send and receive on this address
25    ///
26    /// Initially equal to `use_stateless_retry` for servers, and becomes false again on every
27    /// migration. Always true for clients.
28    pub(super) validated: bool,
29    /// Total size of all UDP datagrams sent on this path
30    pub(super) total_sent: u64,
31    /// Total size of all UDP datagrams received on this path
32    pub(super) total_recvd: u64,
33    /// The state of the MTU discovery process
34    pub(super) mtud: MtuDiscovery,
35    /// Packet number of the first packet sent after an RTT sample was collected on this path
36    ///
37    /// Used in persistent congestion determination.
38    pub(super) first_packet_after_rtt_sample: Option<(SpaceId, u64)>,
39    pub(super) in_flight: InFlight,
40    /// Number of the first packet sent on this path
41    ///
42    /// Used to determine whether a packet was sent on an earlier path. Insufficient to determine if
43    /// a packet was sent on a later path.
44    first_packet: Option<u64>,
45}
46
47impl PathData {
48    pub(super) fn new(
49        remote: SocketAddr,
50        allow_mtud: bool,
51        peer_max_udp_payload_size: Option<u16>,
52        now: Instant,
53        config: &TransportConfig,
54    ) -> Self {
55        let congestion = config
56            .congestion_controller_factory
57            .clone()
58            .build(now, config.get_initial_mtu());
59        Self {
60            remote,
61            rtt: RttEstimator::new(config.initial_rtt),
62            sending_ecn: true,
63            pacing: Pacer::new(
64                config.initial_rtt,
65                congestion.initial_window(),
66                config.get_initial_mtu(),
67                now,
68            ),
69            congestion,
70            challenge: None,
71            challenge_pending: false,
72            validated: false,
73            total_sent: 0,
74            total_recvd: 0,
75            mtud: config
76                .mtu_discovery_config
77                .as_ref()
78                .filter(|_| allow_mtud)
79                .map_or(
80                    MtuDiscovery::disabled(config.get_initial_mtu(), config.min_mtu),
81                    |mtud_config| {
82                        MtuDiscovery::new(
83                            config.get_initial_mtu(),
84                            config.min_mtu,
85                            peer_max_udp_payload_size,
86                            mtud_config.clone(),
87                        )
88                    },
89                ),
90            first_packet_after_rtt_sample: None,
91            in_flight: InFlight::new(),
92            first_packet: None,
93        }
94    }
95
96    pub(super) fn from_previous(remote: SocketAddr, prev: &Self, now: Instant) -> Self {
97        let congestion = prev.congestion.clone_box();
98        let smoothed_rtt = prev.rtt.get();
99        Self {
100            remote,
101            rtt: prev.rtt,
102            pacing: Pacer::new(smoothed_rtt, congestion.window(), prev.current_mtu(), now),
103            sending_ecn: true,
104            congestion,
105            challenge: None,
106            challenge_pending: false,
107            validated: false,
108            total_sent: 0,
109            total_recvd: 0,
110            mtud: prev.mtud.clone(),
111            first_packet_after_rtt_sample: prev.first_packet_after_rtt_sample,
112            in_flight: InFlight::new(),
113            first_packet: None,
114        }
115    }
116
117    /// Resets RTT, congestion control and MTU states.
118    ///
119    /// This is useful when it is known the underlying path has changed.
120    pub(super) fn reset(&mut self, now: Instant, config: &TransportConfig) {
121        self.rtt = RttEstimator::new(config.initial_rtt);
122        self.congestion = config
123            .congestion_controller_factory
124            .clone()
125            .build(now, config.get_initial_mtu());
126        self.mtud.reset(config.get_initial_mtu(), config.min_mtu);
127    }
128
129    /// Indicates whether we're a server that hasn't validated the peer's address and hasn't
130    /// received enough data from the peer to permit sending `bytes_to_send` additional bytes
131    pub(super) fn anti_amplification_blocked(&self, bytes_to_send: u64) -> bool {
132        !self.validated && self.total_recvd * 3 < self.total_sent + bytes_to_send
133    }
134
135    /// Returns the path's current MTU
136    pub(super) fn current_mtu(&self) -> u16 {
137        self.mtud.current_mtu()
138    }
139
140    /// Account for transmission of `packet` with number `pn` in `space`
141    pub(super) fn sent(&mut self, pn: u64, packet: SentPacket, space: &mut PacketSpace) {
142        self.in_flight.insert(&packet);
143        if self.first_packet.is_none() {
144            self.first_packet = Some(pn);
145        }
146        self.in_flight.bytes -= space.sent(pn, packet);
147    }
148
149    /// Remove `packet` with number `pn` from this path's congestion control counters, or return
150    /// `false` if `pn` was sent before this path was established.
151    pub(super) fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) -> bool {
152        if self.first_packet.map_or(true, |first| first > pn) {
153            return false;
154        }
155        self.in_flight.remove(packet);
156        true
157    }
158}
159
160/// RTT estimation for a particular network path
161#[derive(Copy, Clone)]
162pub struct RttEstimator {
163    /// The most recent RTT measurement made when receiving an ack for a previously unacked packet
164    latest: Duration,
165    /// The smoothed RTT of the connection, computed as described in RFC6298
166    smoothed: Option<Duration>,
167    /// The RTT variance, computed as described in RFC6298
168    var: Duration,
169    /// The minimum RTT seen in the connection, ignoring ack delay.
170    min: Duration,
171}
172
173impl RttEstimator {
174    fn new(initial_rtt: Duration) -> Self {
175        Self {
176            latest: initial_rtt,
177            smoothed: None,
178            var: initial_rtt / 2,
179            min: initial_rtt,
180        }
181    }
182
183    /// The current best RTT estimation.
184    pub fn get(&self) -> Duration {
185        self.smoothed.unwrap_or(self.latest)
186    }
187
188    /// Conservative estimate of RTT
189    ///
190    /// Takes the maximum of smoothed and latest RTT, as recommended
191    /// in 6.1.2 of the recovery spec (draft 29).
192    pub fn conservative(&self) -> Duration {
193        self.get().max(self.latest)
194    }
195
196    /// Minimum RTT registered so far for this estimator.
197    pub fn min(&self) -> Duration {
198        self.min
199    }
200
201    // PTO computed as described in RFC9002#6.2.1
202    pub(crate) fn pto_base(&self) -> Duration {
203        self.get() + cmp::max(4 * self.var, TIMER_GRANULARITY)
204    }
205
206    pub(crate) fn update(&mut self, ack_delay: Duration, rtt: Duration) {
207        self.latest = rtt;
208        // min_rtt ignores ack delay.
209        self.min = cmp::min(self.min, self.latest);
210        // Based on RFC6298.
211        if let Some(smoothed) = self.smoothed {
212            let adjusted_rtt = if self.min + ack_delay <= self.latest {
213                self.latest - ack_delay
214            } else {
215                self.latest
216            };
217            let var_sample = if smoothed > adjusted_rtt {
218                smoothed - adjusted_rtt
219            } else {
220                adjusted_rtt - smoothed
221            };
222            self.var = (3 * self.var + var_sample) / 4;
223            self.smoothed = Some((7 * smoothed + adjusted_rtt) / 8);
224        } else {
225            self.smoothed = Some(self.latest);
226            self.var = self.latest / 2;
227            self.min = self.latest;
228        }
229    }
230}
231
232#[derive(Default)]
233pub(crate) struct PathResponses {
234    pending: Vec<PathResponse>,
235}
236
237impl PathResponses {
238    pub(crate) fn push(&mut self, packet: u64, token: u64, remote: SocketAddr) {
239        /// Arbitrary permissive limit to prevent abuse
240        const MAX_PATH_RESPONSES: usize = 16;
241        let response = PathResponse {
242            packet,
243            token,
244            remote,
245        };
246        let existing = self.pending.iter_mut().find(|x| x.remote == remote);
247        if let Some(existing) = existing {
248            // Update a queued response
249            if existing.packet <= packet {
250                *existing = response;
251            }
252            return;
253        }
254        if self.pending.len() < MAX_PATH_RESPONSES {
255            self.pending.push(response);
256        } else {
257            // We don't expect to ever hit this with well-behaved peers, so we don't bother dropping
258            // older challenges.
259            trace!("ignoring excessive PATH_CHALLENGE");
260        }
261    }
262
263    pub(crate) fn pop_off_path(&mut self, remote: SocketAddr) -> Option<(u64, SocketAddr)> {
264        let response = *self.pending.last()?;
265        if response.remote == remote {
266            // We don't bother searching further because we expect that the on-path response will
267            // get drained in the immediate future by a call to `pop_on_path`
268            return None;
269        }
270        self.pending.pop();
271        Some((response.token, response.remote))
272    }
273
274    pub(crate) fn pop_on_path(&mut self, remote: SocketAddr) -> Option<u64> {
275        let response = *self.pending.last()?;
276        if response.remote != remote {
277            // We don't bother searching further because we expect that the off-path response will
278            // get drained in the immediate future by a call to `pop_off_path`
279            return None;
280        }
281        self.pending.pop();
282        Some(response.token)
283    }
284
285    pub(crate) fn is_empty(&self) -> bool {
286        self.pending.is_empty()
287    }
288}
289
290#[derive(Copy, Clone)]
291struct PathResponse {
292    /// The packet number the corresponding PATH_CHALLENGE was received in
293    packet: u64,
294    token: u64,
295    /// The address the corresponding PATH_CHALLENGE was received from
296    remote: SocketAddr,
297}
298
299/// Summary statistics of packets that have been sent on a particular path, but which have not yet
300/// been acked or deemed lost
301pub(super) struct InFlight {
302    /// Sum of the sizes of all sent packets considered "in flight" by congestion control
303    ///
304    /// The size does not include IP or UDP overhead. Packets only containing ACK frames do not
305    /// count towards this to ensure congestion control does not impede congestion feedback.
306    pub(super) bytes: u64,
307    /// Number of packets in flight containing frames other than ACK and PADDING
308    ///
309    /// This can be 0 even when bytes is not 0 because PADDING frames cause a packet to be
310    /// considered "in flight" by congestion control. However, if this is nonzero, bytes will always
311    /// also be nonzero.
312    pub(super) ack_eliciting: u64,
313}
314
315impl InFlight {
316    fn new() -> Self {
317        Self {
318            bytes: 0,
319            ack_eliciting: 0,
320        }
321    }
322
323    fn insert(&mut self, packet: &SentPacket) {
324        self.bytes += u64::from(packet.size);
325        self.ack_eliciting += u64::from(packet.ack_eliciting);
326    }
327
328    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
329    fn remove(&mut self, packet: &SentPacket) {
330        self.bytes -= u64::from(packet.size);
331        self.ack_eliciting -= u64::from(packet.ack_eliciting);
332    }
333}