quinn_proto/connection/
mod.rs

1use std::{
2    cmp,
3    collections::VecDeque,
4    convert::TryFrom,
5    fmt, io, mem,
6    net::{IpAddr, SocketAddr},
7    sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
13use thiserror::Error;
14use tracing::{debug, error, trace, trace_span, warn};
15
16use crate::{
17    Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
18    MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
19    TransportErrorCode, VarInt,
20    cid_generator::ConnectionIdGenerator,
21    cid_queue::CidQueue,
22    coding::BufMutExt,
23    config::{ServerConfig, TransportConfig},
24    crypto::{self, KeyPair, Keys, PacketKey},
25    frame::{self, Close, Datagram, FrameStruct, NewToken},
26    packet::{
27        FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
28        PacketNumber, PartialDecode, SpaceId,
29    },
30    range_set::ArrayRangeSet,
31    shared::{
32        ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
33        EndpointEvent, EndpointEventInner,
34    },
35    token::{ResetToken, Token, TokenPayload},
36    transport_parameters::TransportParameters,
37};
38
39mod ack_frequency;
40use ack_frequency::AckFrequencyState;
41
42mod assembler;
43pub use assembler::Chunk;
44
45mod cid_state;
46use cid_state::CidState;
47
48mod datagrams;
49use datagrams::DatagramState;
50pub use datagrams::{Datagrams, SendDatagramError};
51
52mod mtud;
53mod pacing;
54
55mod packet_builder;
56use packet_builder::PacketBuilder;
57
58mod packet_crypto;
59use packet_crypto::{PrevCrypto, ZeroRttCrypto};
60
61mod paths;
62pub use paths::RttEstimator;
63use paths::{PathData, PathResponses};
64
65mod send_buffer;
66
67mod spaces;
68#[cfg(fuzzing)]
69pub use spaces::Retransmits;
70#[cfg(not(fuzzing))]
71use spaces::Retransmits;
72use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
73
74mod stats;
75pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
76
77mod streams;
78#[cfg(fuzzing)]
79pub use streams::StreamsState;
80#[cfg(not(fuzzing))]
81use streams::StreamsState;
82pub use streams::{
83    BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
84    SendStream, ShouldTransmit, StreamEvent, Streams, WriteError, Written,
85};
86
87mod timer;
88use crate::congestion::Controller;
89use timer::{Timer, TimerTable};
90
91/// Protocol state and logic for a single QUIC connection
92///
93/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
94/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
95/// expects timeouts through various methods. A number of simple getter methods are exposed
96/// to allow callers to inspect some of the connection state.
97///
98/// `Connection` has roughly 4 types of methods:
99///
100/// - A. Simple getters, taking `&self`
101/// - B. Handlers for incoming events from the network or system, named `handle_*`.
102/// - C. State machine mutators, for incoming commands from the application. For convenience we
103///   refer to this as "performing I/O" below, however as per the design of this library none of the
104///   functions actually perform system-level I/O. For example, [`read`](RecvStream::read) and
105///   [`write`](SendStream::write), but also things like [`reset`](SendStream::reset).
106/// - D. Polling functions for outgoing events or actions for the caller to
107///   take, named `poll_*`.
108///
109/// The simplest way to use this API correctly is to call (B) and (C) whenever
110/// appropriate, then after each of those calls, as soon as feasible call all
111/// polling methods (D) and deal with their outputs appropriately, e.g. by
112/// passing it to the application or by making a system-level I/O call. You
113/// should call the polling functions in this order:
114///
115/// 1. [`poll_transmit`](Self::poll_transmit)
116/// 2. [`poll_timeout`](Self::poll_timeout)
117/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
118/// 4. [`poll`](Self::poll)
119///
120/// Currently the only actual dependency is from (2) to (1), however additional
121/// dependencies may be added in future, so the above order is recommended.
122///
123/// (A) may be called whenever desired.
124///
125/// Care should be made to ensure that the input events represent monotonically
126/// increasing time. Specifically, calling [`handle_timeout`](Self::handle_timeout)
127/// with events of the same [`Instant`] may be interleaved in any order with a
128/// call to [`handle_event`](Self::handle_event) at that same instant; however
129/// events or timeouts with different instants must not be interleaved.
130pub struct Connection {
131    endpoint_config: Arc<EndpointConfig>,
132    config: Arc<TransportConfig>,
133    rng: StdRng,
134    crypto: Box<dyn crypto::Session>,
135    /// The CID we initially chose, for use during the handshake
136    handshake_cid: ConnectionId,
137    /// The CID the peer initially chose, for use during the handshake
138    rem_handshake_cid: ConnectionId,
139    /// The "real" local IP address which was was used to receive the initial packet.
140    /// This is only populated for the server case, and if known
141    local_ip: Option<IpAddr>,
142    path: PathData,
143    /// Whether MTU detection is supported in this environment
144    allow_mtud: bool,
145    prev_path: Option<(ConnectionId, PathData)>,
146    state: State,
147    side: ConnectionSide,
148    /// Whether or not 0-RTT was enabled during the handshake. Does not imply acceptance.
149    zero_rtt_enabled: bool,
150    /// Set if 0-RTT is supported, then cleared when no longer needed.
151    zero_rtt_crypto: Option<ZeroRttCrypto>,
152    key_phase: bool,
153    /// How many packets are in the current key phase. Used only for `Data` space.
154    key_phase_size: u64,
155    /// Transport parameters set by the peer
156    peer_params: TransportParameters,
157    /// Source ConnectionId of the first packet received from the peer
158    orig_rem_cid: ConnectionId,
159    /// Destination ConnectionId sent by the client on the first Initial
160    initial_dst_cid: ConnectionId,
161    /// The value that the server included in the Source Connection ID field of a Retry packet, if
162    /// one was received
163    retry_src_cid: Option<ConnectionId>,
164    /// Total number of outgoing packets that have been deemed lost
165    lost_packets: u64,
166    events: VecDeque<Event>,
167    endpoint_events: VecDeque<EndpointEventInner>,
168    /// Whether the spin bit is in use for this connection
169    spin_enabled: bool,
170    /// Outgoing spin bit state
171    spin: bool,
172    /// Packet number spaces: initial, handshake, 1-RTT
173    spaces: [PacketSpace; 3],
174    /// Highest usable packet number space
175    highest_space: SpaceId,
176    /// 1-RTT keys used prior to a key update
177    prev_crypto: Option<PrevCrypto>,
178    /// 1-RTT keys to be used for the next key update
179    ///
180    /// These are generated in advance to prevent timing attacks and/or DoS by third-party attackers
181    /// spoofing key updates.
182    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
183    accepted_0rtt: bool,
184    /// Whether the idle timer should be reset the next time an ack-eliciting packet is transmitted.
185    permit_idle_reset: bool,
186    /// Negotiated idle timeout
187    idle_timeout: Option<Duration>,
188    timers: TimerTable,
189    /// Number of packets received which could not be authenticated
190    authentication_failures: u64,
191    /// Why the connection was lost, if it has been
192    error: Option<ConnectionError>,
193    /// Identifies Data-space packet numbers to skip. Not used in earlier spaces.
194    packet_number_filter: PacketNumberFilter,
195
196    //
197    // Queued non-retransmittable 1-RTT data
198    //
199    /// Responses to PATH_CHALLENGE frames
200    path_responses: PathResponses,
201    close: bool,
202
203    //
204    // ACK frequency
205    //
206    ack_frequency: AckFrequencyState,
207
208    //
209    // Loss Detection
210    //
211    /// The number of times a PTO has been sent without receiving an ack.
212    pto_count: u32,
213
214    //
215    // Congestion Control
216    //
217    /// Whether the most recently received packet had an ECN codepoint set
218    receiving_ecn: bool,
219    /// Number of packets authenticated
220    total_authed_packets: u64,
221    /// Whether the last `poll_transmit` call yielded no data because there was
222    /// no outgoing application data.
223    app_limited: bool,
224
225    streams: StreamsState,
226    /// Surplus remote CIDs for future use on new paths
227    rem_cids: CidQueue,
228    // Attributes of CIDs generated by local peer
229    local_cid_state: CidState,
230    /// State of the unreliable datagram extension
231    datagrams: DatagramState,
232    /// Connection level statistics
233    stats: ConnectionStats,
234    /// QUIC version used for the connection.
235    version: u32,
236}
237
238impl Connection {
239    pub(crate) fn new(
240        endpoint_config: Arc<EndpointConfig>,
241        config: Arc<TransportConfig>,
242        init_cid: ConnectionId,
243        loc_cid: ConnectionId,
244        rem_cid: ConnectionId,
245        remote: SocketAddr,
246        local_ip: Option<IpAddr>,
247        crypto: Box<dyn crypto::Session>,
248        cid_gen: &dyn ConnectionIdGenerator,
249        now: Instant,
250        version: u32,
251        allow_mtud: bool,
252        rng_seed: [u8; 32],
253        side_args: SideArgs,
254    ) -> Self {
255        let pref_addr_cid = side_args.pref_addr_cid();
256        let path_validated = side_args.path_validated();
257        let connection_side = ConnectionSide::from(side_args);
258        let side = connection_side.side();
259        let initial_space = PacketSpace {
260            crypto: Some(crypto.initial_keys(&init_cid, side)),
261            ..PacketSpace::new(now)
262        };
263        let state = State::Handshake(state::Handshake {
264            rem_cid_set: side.is_server(),
265            expected_token: Bytes::new(),
266            client_hello: None,
267        });
268        let mut rng = StdRng::from_seed(rng_seed);
269        let mut this = Self {
270            endpoint_config,
271            crypto,
272            handshake_cid: loc_cid,
273            rem_handshake_cid: rem_cid,
274            local_cid_state: CidState::new(
275                cid_gen.cid_len(),
276                cid_gen.cid_lifetime(),
277                now,
278                if pref_addr_cid.is_some() { 2 } else { 1 },
279            ),
280            path: PathData::new(remote, allow_mtud, None, now, &config),
281            allow_mtud,
282            local_ip,
283            prev_path: None,
284            state,
285            side: connection_side,
286            zero_rtt_enabled: false,
287            zero_rtt_crypto: None,
288            key_phase: false,
289            // A small initial key phase size ensures peers that don't handle key updates correctly
290            // fail sooner rather than later. It's okay for both peers to do this, as the first one
291            // to perform an update will reset the other's key phase size in `update_keys`, and a
292            // simultaneous key update by both is just like a regular key update with a really fast
293            // response. Inspired by quic-go's similar behavior of performing the first key update
294            // at the 100th short-header packet.
295            key_phase_size: rng.random_range(10..1000),
296            peer_params: TransportParameters::default(),
297            orig_rem_cid: rem_cid,
298            initial_dst_cid: init_cid,
299            retry_src_cid: None,
300            lost_packets: 0,
301            events: VecDeque::new(),
302            endpoint_events: VecDeque::new(),
303            spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
304            spin: false,
305            spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
306            highest_space: SpaceId::Initial,
307            prev_crypto: None,
308            next_crypto: None,
309            accepted_0rtt: false,
310            permit_idle_reset: true,
311            idle_timeout: match config.max_idle_timeout {
312                None | Some(VarInt(0)) => None,
313                Some(dur) => Some(Duration::from_millis(dur.0)),
314            },
315            timers: TimerTable::default(),
316            authentication_failures: 0,
317            error: None,
318            #[cfg(test)]
319            packet_number_filter: match config.deterministic_packet_numbers {
320                false => PacketNumberFilter::new(&mut rng),
321                true => PacketNumberFilter::disabled(),
322            },
323            #[cfg(not(test))]
324            packet_number_filter: PacketNumberFilter::new(&mut rng),
325
326            path_responses: PathResponses::default(),
327            close: false,
328
329            ack_frequency: AckFrequencyState::new(get_max_ack_delay(
330                &TransportParameters::default(),
331            )),
332
333            pto_count: 0,
334
335            app_limited: false,
336            receiving_ecn: false,
337            total_authed_packets: 0,
338
339            streams: StreamsState::new(
340                side,
341                config.max_concurrent_uni_streams,
342                config.max_concurrent_bidi_streams,
343                config.send_window,
344                config.receive_window,
345                config.stream_receive_window,
346            ),
347            datagrams: DatagramState::default(),
348            config,
349            rem_cids: CidQueue::new(rem_cid),
350            rng,
351            stats: ConnectionStats::default(),
352            version,
353        };
354        if path_validated {
355            this.on_path_validated();
356        }
357        if side.is_client() {
358            // Kick off the connection
359            this.write_crypto();
360            this.init_0rtt();
361        }
362        this
363    }
364
365    /// Returns the next time at which `handle_timeout` should be called
366    ///
367    /// The value returned may change after:
368    /// - the application performed some I/O on the connection
369    /// - a call was made to `handle_event`
370    /// - a call to `poll_transmit` returned `Some`
371    /// - a call was made to `handle_timeout`
372    #[must_use]
373    pub fn poll_timeout(&mut self) -> Option<Instant> {
374        self.timers.next_timeout()
375    }
376
377    /// Returns application-facing events
378    ///
379    /// Connections should be polled for events after:
380    /// - a call was made to `handle_event`
381    /// - a call was made to `handle_timeout`
382    #[must_use]
383    pub fn poll(&mut self) -> Option<Event> {
384        if let Some(x) = self.events.pop_front() {
385            return Some(x);
386        }
387
388        if let Some(event) = self.streams.poll() {
389            return Some(Event::Stream(event));
390        }
391
392        if let Some(err) = self.error.take() {
393            return Some(Event::ConnectionLost { reason: err });
394        }
395
396        None
397    }
398
399    /// Return endpoint-facing events
400    #[must_use]
401    pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
402        self.endpoint_events.pop_front().map(EndpointEvent)
403    }
404
405    /// Provide control over streams
406    #[must_use]
407    pub fn streams(&mut self) -> Streams<'_> {
408        Streams {
409            state: &mut self.streams,
410            conn_state: &self.state,
411        }
412    }
413
414    /// Provide control over streams
415    #[must_use]
416    pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
417        assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
418        RecvStream {
419            id,
420            state: &mut self.streams,
421            pending: &mut self.spaces[SpaceId::Data].pending,
422        }
423    }
424
425    /// Provide control over streams
426    #[must_use]
427    pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
428        assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
429        SendStream {
430            id,
431            state: &mut self.streams,
432            pending: &mut self.spaces[SpaceId::Data].pending,
433            conn_state: &self.state,
434        }
435    }
436
437    /// Returns packets to transmit
438    ///
439    /// Connections should be polled for transmit after:
440    /// - the application performed some I/O on the connection
441    /// - a call was made to `handle_event`
442    /// - a call was made to `handle_timeout`
443    ///
444    /// `max_datagrams` specifies how many datagrams can be returned inside a
445    /// single Transmit using GSO. This must be at least 1.
446    #[must_use]
447    pub fn poll_transmit(
448        &mut self,
449        now: Instant,
450        max_datagrams: usize,
451        buf: &mut Vec<u8>,
452    ) -> Option<Transmit> {
453        assert!(max_datagrams != 0);
454        let max_datagrams = match self.config.enable_segmentation_offload {
455            false => 1,
456            true => max_datagrams,
457        };
458
459        let mut num_datagrams = 0;
460        // Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
461        // packets, this can be earlier than the start of the current QUIC packet.
462        let mut datagram_start = 0;
463        let mut segment_size = usize::from(self.path.current_mtu());
464
465        if let Some(challenge) = self.send_path_challenge(now, buf) {
466            return Some(challenge);
467        }
468
469        // If we need to send a probe, make sure we have something to send.
470        for space in SpaceId::iter() {
471            let request_immediate_ack =
472                space == SpaceId::Data && self.peer_supports_ack_frequency();
473            self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
474        }
475
476        // Check whether we need to send a close message
477        let close = match self.state {
478            State::Drained => {
479                self.app_limited = true;
480                return None;
481            }
482            State::Draining | State::Closed(_) => {
483                // self.close is only reset once the associated packet had been
484                // encoded successfully
485                if !self.close {
486                    self.app_limited = true;
487                    return None;
488                }
489                true
490            }
491            _ => false,
492        };
493
494        // Check whether we need to send an ACK_FREQUENCY frame
495        if let Some(config) = &self.config.ack_frequency_config {
496            self.spaces[SpaceId::Data].pending.ack_frequency = self
497                .ack_frequency
498                .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
499                && self.highest_space == SpaceId::Data
500                && self.peer_supports_ack_frequency();
501        }
502
503        // Reserving capacity can provide more capacity than we asked for. However, we are not
504        // allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
505        // separately.
506        let mut buf_capacity = 0;
507
508        let mut coalesce = true;
509        let mut builder_storage: Option<PacketBuilder> = None;
510        let mut sent_frames = None;
511        let mut pad_datagram = false;
512        let mut congestion_blocked = false;
513
514        // Iterate over all spaces and find data to send
515        let mut space_idx = 0;
516        let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
517        // This loop will potentially spend multiple iterations in the same `SpaceId`,
518        // so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
519        while space_idx < spaces.len() {
520            let space_id = spaces[space_idx];
521            // Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
522            // be able to send an individual frame at least this large in the next 1-RTT
523            // packet. This could be generalized to support every space, but it's only needed to
524            // handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
525            // don't account for coalesced packets potentially occupying space because frames can
526            // always spill into the next datagram.
527            let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
528            let frame_space_1rtt =
529                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
530
531            // Is there data or a close message to send in this space?
532            let can_send = self.space_can_send(space_id, frame_space_1rtt);
533            if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
534                space_idx += 1;
535                continue;
536            }
537
538            let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
539                || self.spaces[space_id].ping_pending
540                || self.spaces[space_id].immediate_ack_pending;
541            if space_id == SpaceId::Data {
542                ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
543            }
544
545            // Can we append more data into the current buffer?
546            // It is not safe to assume that `buf.len()` is the end of the data,
547            // since the last packet might not have been finished.
548            let buf_end = if let Some(builder) = &builder_storage {
549                buf.len().max(builder.min_size) + builder.tag_len
550            } else {
551                buf.len()
552            };
553
554            let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
555                crypto.packet.local.tag_len()
556            } else if space_id == SpaceId::Data {
557                self.zero_rtt_crypto.as_ref().expect(
558                    "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
559                ).packet.tag_len()
560            } else {
561                unreachable!("tried to send {:?} packet without keys", space_id)
562            };
563            if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
564                // We need to send 1 more datagram and extend the buffer for that.
565
566                // Is 1 more datagram allowed?
567                if num_datagrams >= max_datagrams {
568                    // No more datagrams allowed
569                    break;
570                }
571
572                // Anti-amplification is only based on `total_sent`, which gets
573                // updated at the end of this method. Therefore we pass the amount
574                // of bytes for datagrams that are already created, as well as 1 byte
575                // for starting another datagram. If there is any anti-amplification
576                // budget left, we always allow a full MTU to be sent
577                // (see https://github.com/quinn-rs/quinn/issues/1082)
578                if self
579                    .path
580                    .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
581                {
582                    trace!("blocked by anti-amplification");
583                    break;
584                }
585
586                // Congestion control and pacing checks
587                // Tail loss probes must not be blocked by congestion, or a deadlock could arise
588                if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
589                    // Assume the current packet will get padded to fill the segment
590                    let untracked_bytes = if let Some(builder) = &builder_storage {
591                        buf_capacity - builder.partial_encode.start
592                    } else {
593                        0
594                    } as u64;
595                    debug_assert!(untracked_bytes <= segment_size as u64);
596
597                    let bytes_to_send = segment_size as u64 + untracked_bytes;
598                    if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
599                        space_idx += 1;
600                        congestion_blocked = true;
601                        // We continue instead of breaking here in order to avoid
602                        // blocking loss probes queued for higher spaces.
603                        trace!("blocked by congestion control");
604                        continue;
605                    }
606
607                    // Check whether the next datagram is blocked by pacing
608                    let smoothed_rtt = self.path.rtt.get();
609                    if let Some(delay) = self.path.pacing.delay(
610                        smoothed_rtt,
611                        bytes_to_send,
612                        self.path.current_mtu(),
613                        self.path.congestion.window(),
614                        now,
615                    ) {
616                        self.timers.set(Timer::Pacing, delay);
617                        congestion_blocked = true;
618                        // Loss probes should be subject to pacing, even though
619                        // they are not congestion controlled.
620                        trace!("blocked by pacing");
621                        break;
622                    }
623                }
624
625                // Finish current packet
626                if let Some(mut builder) = builder_storage.take() {
627                    if pad_datagram {
628                        builder.pad_to(MIN_INITIAL_SIZE);
629                    }
630
631                    if num_datagrams > 1 {
632                        // If too many padding bytes would be required to continue the GSO batch
633                        // after this packet, end the GSO batch here. Ensures that fixed-size frames
634                        // with heterogeneous sizes (e.g. application datagrams) won't inadvertently
635                        // waste large amounts of bandwidth. The exact threshold is a bit arbitrary
636                        // and might benefit from further tuning, though there's no universally
637                        // optimal value.
638                        //
639                        // Additionally, if this datagram is a loss probe and `segment_size` is
640                        // larger than `INITIAL_MTU`, then padding it to `segment_size` to continue
641                        // the GSO batch would risk failure to recover from a reduction in path
642                        // MTU. Loss probes are the only packets for which we might grow
643                        // `buf_capacity` by less than `segment_size`.
644                        const MAX_PADDING: usize = 16;
645                        let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
646                            - datagram_start
647                            + builder.tag_len;
648                        if packet_len_unpadded + MAX_PADDING < segment_size
649                            || datagram_start + segment_size > buf_capacity
650                        {
651                            trace!(
652                                "GSO truncated by demand for {} padding bytes or loss probe",
653                                segment_size - packet_len_unpadded
654                            );
655                            builder_storage = Some(builder);
656                            break;
657                        }
658
659                        // Pad the current datagram to GSO segment size so it can be included in the
660                        // GSO batch.
661                        builder.pad_to(segment_size as u16);
662                    }
663
664                    builder.finish_and_track(now, self, sent_frames.take(), buf);
665
666                    if num_datagrams == 1 {
667                        // Set the segment size for this GSO batch to the size of the first UDP
668                        // datagram in the batch. Larger data that cannot be fragmented
669                        // (e.g. application datagrams) will be included in a future batch. When
670                        // sending large enough volumes of data for GSO to be useful, we expect
671                        // packet sizes to usually be consistent, e.g. populated by max-size STREAM
672                        // frames or uniformly sized datagrams.
673                        segment_size = buf.len();
674                        // Clip the unused capacity out of the buffer so future packets don't
675                        // overrun
676                        buf_capacity = buf.len();
677
678                        // Check whether the data we planned to send will fit in the reduced segment
679                        // size. If not, bail out and leave it for the next GSO batch so we don't
680                        // end up trying to send an empty packet. We can't easily compute the right
681                        // segment size before the original call to `space_can_send`, because at
682                        // that time we haven't determined whether we're going to coalesce with the
683                        // first datagram or potentially pad it to `MIN_INITIAL_SIZE`.
684                        if space_id == SpaceId::Data {
685                            let frame_space_1rtt =
686                                segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
687                            if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
688                                break;
689                            }
690                        }
691                    }
692                }
693
694                // Allocate space for another datagram
695                let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
696                    0 => segment_size,
697                    _ => {
698                        self.spaces[space_id].loss_probes -= 1;
699                        // Clamp the datagram to at most the minimum MTU to ensure that loss probes
700                        // can get through and enable recovery even if the path MTU has shrank
701                        // unexpectedly.
702                        std::cmp::min(segment_size, usize::from(INITIAL_MTU))
703                    }
704                };
705                buf_capacity += next_datagram_size_limit;
706                if buf.capacity() < buf_capacity {
707                    // We reserve the maximum space for sending `max_datagrams` upfront
708                    // to avoid any reallocations if more datagrams have to be appended later on.
709                    // Benchmarks have shown shown a 5-10% throughput improvement
710                    // compared to continuously resizing the datagram buffer.
711                    // While this will lead to over-allocation for small transmits
712                    // (e.g. purely containing ACKs), modern memory allocators
713                    // (e.g. mimalloc and jemalloc) will pool certain allocation sizes
714                    // and therefore this is still rather efficient.
715                    buf.reserve(max_datagrams * segment_size);
716                }
717                num_datagrams += 1;
718                coalesce = true;
719                pad_datagram = false;
720                datagram_start = buf.len();
721
722                debug_assert_eq!(
723                    datagram_start % segment_size,
724                    0,
725                    "datagrams in a GSO batch must be aligned to the segment size"
726                );
727            } else {
728                // We can append/coalesce the next packet into the current
729                // datagram.
730                // Finish current packet without adding extra padding
731                if let Some(builder) = builder_storage.take() {
732                    builder.finish_and_track(now, self, sent_frames.take(), buf);
733                }
734            }
735
736            debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);
737
738            //
739            // From here on, we've determined that a packet will definitely be sent.
740            //
741
742            if self.spaces[SpaceId::Initial].crypto.is_some()
743                && space_id == SpaceId::Handshake
744                && self.side.is_client()
745            {
746                // A client stops both sending and processing Initial packets when it
747                // sends its first Handshake packet.
748                self.discard_space(now, SpaceId::Initial);
749            }
750            if let Some(ref mut prev) = self.prev_crypto {
751                prev.update_unacked = false;
752            }
753
754            debug_assert!(
755                builder_storage.is_none() && sent_frames.is_none(),
756                "Previous packet must have been finished"
757            );
758
759            let builder = builder_storage.insert(PacketBuilder::new(
760                now,
761                space_id,
762                self.rem_cids.active(),
763                buf,
764                buf_capacity,
765                datagram_start,
766                ack_eliciting,
767                self,
768            )?);
769            coalesce = coalesce && !builder.short_header;
770
771            // https://tools.ietf.org/html/draft-ietf-quic-transport-34#section-14.1
772            pad_datagram |=
773                space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);
774
775            if close {
776                trace!("sending CONNECTION_CLOSE");
777                // Encode ACKs before the ConnectionClose message, to give the receiver
778                // a better approximate on what data has been processed. This is
779                // especially important with ack delay, since the peer might not
780                // have gotten any other ACK for the data earlier on.
781                if !self.spaces[space_id].pending_acks.ranges().is_empty() {
782                    Self::populate_acks(
783                        now,
784                        self.receiving_ecn,
785                        &mut SentFrames::default(),
786                        &mut self.spaces[space_id],
787                        buf,
788                        &mut self.stats,
789                    );
790                }
791
792                // Since there only 64 ACK frames there will always be enough space
793                // to encode the ConnectionClose frame too. However we still have the
794                // check here to prevent crashes if something changes.
795                debug_assert!(
796                    buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
797                    "ACKs should leave space for ConnectionClose"
798                );
799                if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
800                    let max_frame_size = builder.max_size - buf.len();
801                    match self.state {
802                        State::Closed(state::Closed { ref reason }) => {
803                            if space_id == SpaceId::Data || reason.is_transport_layer() {
804                                reason.encode(buf, max_frame_size)
805                            } else {
806                                frame::ConnectionClose {
807                                    error_code: TransportErrorCode::APPLICATION_ERROR,
808                                    frame_type: None,
809                                    reason: Bytes::new(),
810                                }
811                                .encode(buf, max_frame_size)
812                            }
813                        }
814                        State::Draining => frame::ConnectionClose {
815                            error_code: TransportErrorCode::NO_ERROR,
816                            frame_type: None,
817                            reason: Bytes::new(),
818                        }
819                        .encode(buf, max_frame_size),
820                        _ => unreachable!(
821                            "tried to make a close packet when the connection wasn't closed"
822                        ),
823                    }
824                }
825                if space_id == self.highest_space {
826                    // Don't send another close packet
827                    self.close = false;
828                    // `CONNECTION_CLOSE` is the final packet
829                    break;
830                } else {
831                    // Send a close frame in every possible space for robustness, per RFC9000
832                    // "Immediate Close during the Handshake". Don't bother trying to send anything
833                    // else.
834                    space_idx += 1;
835                    continue;
836                }
837            }
838
839            // Send an off-path PATH_RESPONSE. Prioritized over on-path data to ensure that path
840            // validation can occur while the link is saturated.
841            if space_id == SpaceId::Data && num_datagrams == 1 {
842                if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
843                    // `unwrap` guaranteed to succeed because `builder_storage` was populated just
844                    // above.
845                    let mut builder = builder_storage.take().unwrap();
846                    trace!("PATH_RESPONSE {:08x} (off-path)", token);
847                    buf.write(frame::FrameType::PATH_RESPONSE);
848                    buf.write(token);
849                    self.stats.frame_tx.path_response += 1;
850                    builder.pad_to(MIN_INITIAL_SIZE);
851                    builder.finish_and_track(
852                        now,
853                        self,
854                        Some(SentFrames {
855                            non_retransmits: true,
856                            ..SentFrames::default()
857                        }),
858                        buf,
859                    );
860                    self.stats.udp_tx.on_sent(1, buf.len());
861                    return Some(Transmit {
862                        destination: remote,
863                        size: buf.len(),
864                        ecn: None,
865                        segment_size: None,
866                        src_ip: self.local_ip,
867                    });
868                }
869            }
870
871            let sent =
872                self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);
873
874            // ACK-only packets should only be sent when explicitly allowed. If we write them due to
875            // any other reason, there is a bug which leads to one component announcing write
876            // readiness while not writing any data. This degrades performance. The condition is
877            // only checked if the full MTU is available and when potentially large fixed-size
878            // frames aren't queued, so that lack of space in the datagram isn't the reason for just
879            // writing ACKs.
880            debug_assert!(
881                !(sent.is_ack_only(&self.streams)
882                    && !can_send.acks
883                    && can_send.other
884                    && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
885                    && self.datagrams.outgoing.is_empty()),
886                "SendableFrames was {can_send:?}, but only ACKs have been written"
887            );
888            pad_datagram |= sent.requires_padding;
889
890            if sent.largest_acked.is_some() {
891                self.spaces[space_id].pending_acks.acks_sent();
892                self.timers.stop(Timer::MaxAckDelay);
893            }
894
895            // Keep information about the packet around until it gets finalized
896            sent_frames = Some(sent);
897
898            // Don't increment space_idx.
899            // We stay in the current space and check if there is more data to send.
900        }
901
902        // Finish the last packet
903        if let Some(mut builder) = builder_storage {
904            if pad_datagram {
905                builder.pad_to(MIN_INITIAL_SIZE);
906            }
907            let last_packet_number = builder.exact_number;
908            builder.finish_and_track(now, self, sent_frames, buf);
909            self.path
910                .congestion
911                .on_sent(now, buf.len() as u64, last_packet_number);
912        }
913
914        self.app_limited = buf.is_empty() && !congestion_blocked;
915
916        // Send MTU probe if necessary
917        if buf.is_empty() && self.state.is_established() {
918            let space_id = SpaceId::Data;
919            let probe_size = self
920                .path
921                .mtud
922                .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;
923
924            let buf_capacity = probe_size as usize;
925            buf.reserve(buf_capacity);
926
927            let mut builder = PacketBuilder::new(
928                now,
929                space_id,
930                self.rem_cids.active(),
931                buf,
932                buf_capacity,
933                0,
934                true,
935                self,
936            )?;
937
938            // We implement MTU probes as ping packets padded up to the probe size
939            buf.write(frame::FrameType::PING);
940            self.stats.frame_tx.ping += 1;
941
942            // If supported by the peer, we want no delays to the probe's ACK
943            if self.peer_supports_ack_frequency() {
944                buf.write(frame::FrameType::IMMEDIATE_ACK);
945                self.stats.frame_tx.immediate_ack += 1;
946            }
947
948            builder.pad_to(probe_size);
949            let sent_frames = SentFrames {
950                non_retransmits: true,
951                ..Default::default()
952            };
953            builder.finish_and_track(now, self, Some(sent_frames), buf);
954
955            self.stats.path.sent_plpmtud_probes += 1;
956            num_datagrams = 1;
957
958            trace!(?probe_size, "writing MTUD probe");
959        }
960
961        if buf.is_empty() {
962            return None;
963        }
964
965        trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
966        self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);
967
968        self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());
969
970        Some(Transmit {
971            destination: self.path.remote,
972            size: buf.len(),
973            ecn: if self.path.sending_ecn {
974                Some(EcnCodepoint::Ect0)
975            } else {
976                None
977            },
978            segment_size: match num_datagrams {
979                1 => None,
980                _ => Some(segment_size),
981            },
982            src_ip: self.local_ip,
983        })
984    }
985
986    /// Send PATH_CHALLENGE for a previous path if necessary
987    fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
988        let (prev_cid, prev_path) = self.prev_path.as_mut()?;
989        if !prev_path.challenge_pending {
990            return None;
991        }
992        prev_path.challenge_pending = false;
993        let token = prev_path
994            .challenge
995            .expect("previous path challenge pending without token");
996        let destination = prev_path.remote;
997        debug_assert_eq!(
998            self.highest_space,
999            SpaceId::Data,
1000            "PATH_CHALLENGE queued without 1-RTT keys"
1001        );
1002        buf.reserve(MIN_INITIAL_SIZE as usize);
1003
1004        let buf_capacity = buf.capacity();
1005
1006        // Use the previous CID to avoid linking the new path with the previous path. We
1007        // don't bother accounting for possible retirement of that prev_cid because this is
1008        // sent once, immediately after migration, when the CID is known to be valid. Even
1009        // if a post-migration packet caused the CID to be retired, it's fair to pretend
1010        // this is sent first.
1011        let mut builder = PacketBuilder::new(
1012            now,
1013            SpaceId::Data,
1014            *prev_cid,
1015            buf,
1016            buf_capacity,
1017            0,
1018            false,
1019            self,
1020        )?;
1021        trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1022        buf.write(frame::FrameType::PATH_CHALLENGE);
1023        buf.write(token);
1024        self.stats.frame_tx.path_challenge += 1;
1025
1026        // An endpoint MUST expand datagrams that contain a PATH_CHALLENGE frame
1027        // to at least the smallest allowed maximum datagram size of 1200 bytes,
1028        // unless the anti-amplification limit for the path does not permit
1029        // sending a datagram of this size
1030        builder.pad_to(MIN_INITIAL_SIZE);
1031
1032        builder.finish(self, buf);
1033        self.stats.udp_tx.on_sent(1, buf.len());
1034
1035        Some(Transmit {
1036            destination,
1037            size: buf.len(),
1038            ecn: None,
1039            segment_size: None,
1040            src_ip: self.local_ip,
1041        })
1042    }
1043
1044    /// Indicate what types of frames are ready to send for the given space
1045    fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1046        if self.spaces[space_id].crypto.is_none()
1047            && (space_id != SpaceId::Data
1048                || self.zero_rtt_crypto.is_none()
1049                || self.side.is_server())
1050        {
1051            // No keys available for this space
1052            return SendableFrames::empty();
1053        }
1054        let mut can_send = self.spaces[space_id].can_send(&self.streams);
1055        if space_id == SpaceId::Data {
1056            can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1057        }
1058        can_send
1059    }
1060
1061    /// Process `ConnectionEvent`s generated by the associated `Endpoint`
1062    ///
1063    /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
1064    /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
1065    /// extracted through the relevant methods.
1066    pub fn handle_event(&mut self, event: ConnectionEvent) {
1067        use ConnectionEventInner::*;
1068        match event.0 {
1069            Datagram(DatagramConnectionEvent {
1070                now,
1071                remote,
1072                ecn,
1073                first_decode,
1074                remaining,
1075            }) => {
1076                // If this packet could initiate a migration and we're a client or a server that
1077                // forbids migration, drop the datagram. This could be relaxed to heuristically
1078                // permit NAT-rebinding-like migration.
1079                if remote != self.path.remote && !self.side.remote_may_migrate() {
1080                    trace!("discarding packet from unrecognized peer {}", remote);
1081                    return;
1082                }
1083
1084                let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1085
1086                self.stats.udp_rx.datagrams += 1;
1087                self.stats.udp_rx.bytes += first_decode.len() as u64;
1088                let data_len = first_decode.len();
1089
1090                self.handle_decode(now, remote, ecn, first_decode);
1091                // The current `path` might have changed inside `handle_decode`,
1092                // since the packet could have triggered a migration. Make sure
1093                // the data received is accounted for the most recent path by accessing
1094                // `path` after `handle_decode`.
1095                self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1096
1097                if let Some(data) = remaining {
1098                    self.stats.udp_rx.bytes += data.len() as u64;
1099                    self.handle_coalesced(now, remote, ecn, data);
1100                }
1101
1102                if was_anti_amplification_blocked {
1103                    // A prior attempt to set the loss detection timer may have failed due to
1104                    // anti-amplification, so ensure it's set now. Prevents a handshake deadlock if
1105                    // the server's first flight is lost.
1106                    self.set_loss_detection_timer(now);
1107                }
1108            }
1109            NewIdentifiers(ids, now) => {
1110                self.local_cid_state.new_cids(&ids, now);
1111                ids.into_iter().rev().for_each(|frame| {
1112                    self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1113                });
1114                // Update Timer::PushNewCid
1115                if self
1116                    .timers
1117                    .get(Timer::PushNewCid)
1118                    .map_or(true, |x| x <= now)
1119                {
1120                    self.reset_cid_retirement();
1121                }
1122            }
1123        }
1124    }
1125
1126    /// Process timer expirations
1127    ///
1128    /// Executes protocol logic, potentially preparing signals (including application `Event`s,
1129    /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
1130    /// methods.
1131    ///
1132    /// It is most efficient to call this immediately after the system clock reaches the latest
1133    /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
1134    /// no-op and therefore are safe.
1135    pub fn handle_timeout(&mut self, now: Instant) {
1136        for &timer in &Timer::VALUES {
1137            if !self.timers.is_expired(timer, now) {
1138                continue;
1139            }
1140            self.timers.stop(timer);
1141            trace!(timer = ?timer, "timeout");
1142            match timer {
1143                Timer::Close => {
1144                    self.state = State::Drained;
1145                    self.endpoint_events.push_back(EndpointEventInner::Drained);
1146                }
1147                Timer::Idle => {
1148                    self.kill(ConnectionError::TimedOut);
1149                }
1150                Timer::KeepAlive => {
1151                    trace!("sending keep-alive");
1152                    self.ping();
1153                }
1154                Timer::LossDetection => {
1155                    self.on_loss_detection_timeout(now);
1156                }
1157                Timer::KeyDiscard => {
1158                    self.zero_rtt_crypto = None;
1159                    self.prev_crypto = None;
1160                }
1161                Timer::PathValidation => {
1162                    debug!("path validation failed");
1163                    if let Some((_, prev)) = self.prev_path.take() {
1164                        self.path = prev;
1165                    }
1166                    self.path.challenge = None;
1167                    self.path.challenge_pending = false;
1168                }
1169                Timer::Pacing => trace!("pacing timer expired"),
1170                Timer::PushNewCid => {
1171                    // Update `retire_prior_to` field in NEW_CONNECTION_ID frame
1172                    let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1173                    if !self.state.is_closed() {
1174                        trace!(
1175                            "push a new cid to peer RETIRE_PRIOR_TO field {}",
1176                            self.local_cid_state.retire_prior_to()
1177                        );
1178                        self.endpoint_events
1179                            .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1180                    }
1181                }
1182                Timer::MaxAckDelay => {
1183                    trace!("max ack delay reached");
1184                    // This timer is only armed in the Data space
1185                    self.spaces[SpaceId::Data]
1186                        .pending_acks
1187                        .on_max_ack_delay_timeout()
1188                }
1189            }
1190        }
1191    }
1192
1193    /// Close a connection immediately
1194    ///
1195    /// This does not ensure delivery of outstanding data. It is the application's responsibility to
1196    /// call this only when all important communications have been completed, e.g. by calling
1197    /// [`SendStream::finish`] on outstanding streams and waiting for the corresponding
1198    /// [`StreamEvent::Finished`] event.
1199    ///
1200    /// If [`Streams::send_streams`] returns 0, all outstanding stream data has been
1201    /// delivered. There may still be data from the peer that has not been received.
1202    ///
1203    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
1204    pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1205        self.close_inner(
1206            now,
1207            Close::Application(frame::ApplicationClose { error_code, reason }),
1208        )
1209    }
1210
1211    fn close_inner(&mut self, now: Instant, reason: Close) {
1212        let was_closed = self.state.is_closed();
1213        if !was_closed {
1214            self.close_common();
1215            self.set_close_timer(now);
1216            self.close = true;
1217            self.state = State::Closed(state::Closed { reason });
1218        }
1219    }
1220
1221    /// Control datagrams
1222    pub fn datagrams(&mut self) -> Datagrams<'_> {
1223        Datagrams { conn: self }
1224    }
1225
1226    /// Returns connection statistics
1227    pub fn stats(&self) -> ConnectionStats {
1228        let mut stats = self.stats;
1229        stats.path.rtt = self.path.rtt.get();
1230        stats.path.cwnd = self.path.congestion.window();
1231        stats.path.current_mtu = self.path.mtud.current_mtu();
1232
1233        stats
1234    }
1235
1236    /// Ping the remote endpoint
1237    ///
1238    /// Causes an ACK-eliciting packet to be transmitted.
1239    pub fn ping(&mut self) {
1240        self.spaces[self.highest_space].ping_pending = true;
1241    }
1242
1243    /// Update traffic keys spontaneously
1244    ///
1245    /// This can be useful for testing key updates, as they otherwise only happen infrequently.
1246    pub fn force_key_update(&mut self) {
1247        self.update_keys(None, false);
1248    }
1249
1250    // Compatibility wrapper for quinn < 0.11.7. Remove for 0.12.
1251    #[doc(hidden)]
1252    #[deprecated]
1253    pub fn initiate_key_update(&mut self) {
1254        self.force_key_update();
1255    }
1256
1257    /// Get a session reference
1258    pub fn crypto_session(&self) -> &dyn crypto::Session {
1259        &*self.crypto
1260    }
1261
1262    /// Whether the connection is in the process of being established
1263    ///
1264    /// If this returns `false`, the connection may be either established or closed, signaled by the
1265    /// emission of a `Connected` or `ConnectionLost` message respectively.
1266    pub fn is_handshaking(&self) -> bool {
1267        self.state.is_handshake()
1268    }
1269
1270    /// Whether the connection is closed
1271    ///
1272    /// Closed connections cannot transport any further data. A connection becomes closed when
1273    /// either peer application intentionally closes it, or when either transport layer detects an
1274    /// error such as a time-out or certificate validation failure.
1275    ///
1276    /// A `ConnectionLost` event is emitted with details when the connection becomes closed.
1277    pub fn is_closed(&self) -> bool {
1278        self.state.is_closed()
1279    }
1280
1281    /// Whether there is no longer any need to keep the connection around
1282    ///
1283    /// Closed connections become drained after a brief timeout to absorb any remaining in-flight
1284    /// packets from the peer. All drained connections have been closed.
1285    pub fn is_drained(&self) -> bool {
1286        self.state.is_drained()
1287    }
1288
1289    /// For clients, if the peer accepted the 0-RTT data packets
1290    ///
1291    /// The value is meaningless until after the handshake completes.
1292    pub fn accepted_0rtt(&self) -> bool {
1293        self.accepted_0rtt
1294    }
1295
1296    /// Whether 0-RTT is/was possible during the handshake
1297    pub fn has_0rtt(&self) -> bool {
1298        self.zero_rtt_enabled
1299    }
1300
1301    /// Whether there are any pending retransmits
1302    pub fn has_pending_retransmits(&self) -> bool {
1303        !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1304    }
1305
1306    /// Look up whether we're the client or server of this Connection
1307    pub fn side(&self) -> Side {
1308        self.side.side()
1309    }
1310
1311    /// The latest socket address for this connection's peer
1312    pub fn remote_address(&self) -> SocketAddr {
1313        self.path.remote
1314    }
1315
1316    /// The local IP address which was used when the peer established
1317    /// the connection
1318    ///
1319    /// This can be different from the address the endpoint is bound to, in case
1320    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
1321    ///
1322    /// This will return `None` for clients, or when no `local_ip` was passed to
1323    /// [`Endpoint::handle()`](crate::Endpoint::handle) for the datagrams establishing this
1324    /// connection.
1325    pub fn local_ip(&self) -> Option<IpAddr> {
1326        self.local_ip
1327    }
1328
1329    /// Current best estimate of this connection's latency (round-trip-time)
1330    pub fn rtt(&self) -> Duration {
1331        self.path.rtt.get()
1332    }
1333
1334    /// Current state of this connection's congestion controller, for debugging purposes
1335    pub fn congestion_state(&self) -> &dyn Controller {
1336        self.path.congestion.as_ref()
1337    }
1338
1339    /// Resets path-specific settings.
1340    ///
1341    /// This will force-reset several subsystems related to a specific network path.
1342    /// Currently this is the congestion controller, round-trip estimator, and the MTU
1343    /// discovery.
1344    ///
1345    /// This is useful when it is known the underlying network path has changed and the old
1346    /// state of these subsystems is no longer valid or optimal. In this case it might be
1347    /// faster or reduce loss to settle on optimal values by restarting from the initial
1348    /// configuration in the [`TransportConfig`].
1349    pub fn path_changed(&mut self, now: Instant) {
1350        self.path.reset(now, &self.config);
1351    }
1352
1353    /// Modify the number of remotely initiated streams that may be concurrently open
1354    ///
1355    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
1356    /// `count`s increase both minimum and worst-case memory consumption.
1357    pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1358        self.streams.set_max_concurrent(dir, count);
1359        // If the limit was reduced, then a flow control update previously deemed insignificant may
1360        // now be significant.
1361        let pending = &mut self.spaces[SpaceId::Data].pending;
1362        self.streams.queue_max_stream_id(pending);
1363    }
1364
1365    /// Current number of remotely initiated streams that may be concurrently open
1366    ///
1367    /// If the target for this limit is reduced using [`set_max_concurrent_streams`](Self::set_max_concurrent_streams),
1368    /// it will not change immediately, even if fewer streams are open. Instead, it will
1369    /// decrement by one for each time a remotely initiated stream of matching directionality is closed.
1370    pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1371        self.streams.max_concurrent(dir)
1372    }
1373
1374    /// See [`TransportConfig::receive_window()`]
1375    pub fn set_receive_window(&mut self, receive_window: VarInt) {
1376        if self.streams.set_receive_window(receive_window) {
1377            self.spaces[SpaceId::Data].pending.max_data = true;
1378        }
1379    }
1380
1381    fn on_ack_received(
1382        &mut self,
1383        now: Instant,
1384        space: SpaceId,
1385        ack: frame::Ack,
1386    ) -> Result<(), TransportError> {
1387        if ack.largest >= self.spaces[space].next_packet_number {
1388            return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
1389        }
1390        let new_largest = {
1391            let space = &mut self.spaces[space];
1392            if space
1393                .largest_acked_packet
1394                .map_or(true, |pn| ack.largest > pn)
1395            {
1396                space.largest_acked_packet = Some(ack.largest);
1397                if let Some(info) = space.sent_packets.get(&ack.largest) {
1398                    // This should always succeed, but a misbehaving peer might ACK a packet we
1399                    // haven't sent. At worst, that will result in us spuriously reducing the
1400                    // congestion window.
1401                    space.largest_acked_packet_sent = info.time_sent;
1402                }
1403                true
1404            } else {
1405                false
1406            }
1407        };
1408
1409        // Avoid DoS from unreasonably huge ack ranges by filtering out just the new acks.
1410        let mut newly_acked = ArrayRangeSet::new();
1411        for range in ack.iter() {
1412            self.packet_number_filter.check_ack(space, range.clone())?;
1413            for (&pn, _) in self.spaces[space].sent_packets.range(range) {
1414                newly_acked.insert_one(pn);
1415            }
1416        }
1417
1418        if newly_acked.is_empty() {
1419            return Ok(());
1420        }
1421
1422        let mut ack_eliciting_acked = false;
1423        for packet in newly_acked.elts() {
1424            if let Some(info) = self.spaces[space].take(packet) {
1425                if let Some(acked) = info.largest_acked {
1426                    // Assume ACKs for all packets below the largest acknowledged in `packet` have
1427                    // been received. This can cause the peer to spuriously retransmit if some of
1428                    // our earlier ACKs were lost, but allows for simpler state tracking. See
1429                    // discussion at
1430                    // https://www.rfc-editor.org/rfc/rfc9000.html#name-limiting-ranges-by-tracking
1431                    self.spaces[space].pending_acks.subtract_below(acked);
1432                }
1433                ack_eliciting_acked |= info.ack_eliciting;
1434
1435                // Notify MTU discovery that a packet was acked, because it might be an MTU probe
1436                let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
1437                if mtu_updated {
1438                    self.path
1439                        .congestion
1440                        .on_mtu_update(self.path.mtud.current_mtu());
1441                }
1442
1443                // Notify ack frequency that a packet was acked, because it might contain an ACK_FREQUENCY frame
1444                self.ack_frequency.on_acked(packet);
1445
1446                self.on_packet_acked(now, packet, info);
1447            }
1448        }
1449
1450        self.path.congestion.on_end_acks(
1451            now,
1452            self.path.in_flight.bytes,
1453            self.app_limited,
1454            self.spaces[space].largest_acked_packet,
1455        );
1456
1457        if new_largest && ack_eliciting_acked {
1458            let ack_delay = if space != SpaceId::Data {
1459                Duration::from_micros(0)
1460            } else {
1461                cmp::min(
1462                    self.ack_frequency.peer_max_ack_delay,
1463                    Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
1464                )
1465            };
1466            let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
1467            self.path.rtt.update(ack_delay, rtt);
1468            if self.path.first_packet_after_rtt_sample.is_none() {
1469                self.path.first_packet_after_rtt_sample =
1470                    Some((space, self.spaces[space].next_packet_number));
1471            }
1472        }
1473
1474        // Must be called before crypto/pto_count are clobbered
1475        self.detect_lost_packets(now, space, true);
1476
1477        if self.peer_completed_address_validation() {
1478            self.pto_count = 0;
1479        }
1480
1481        // Explicit congestion notification
1482        if self.path.sending_ecn {
1483            if let Some(ecn) = ack.ecn {
1484                // We only examine ECN counters from ACKs that we are certain we received in transmit
1485                // order, allowing us to compute an increase in ECN counts to compare against the number
1486                // of newly acked packets that remains well-defined in the presence of arbitrary packet
1487                // reordering.
1488                if new_largest {
1489                    let sent = self.spaces[space].largest_acked_packet_sent;
1490                    self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
1491                }
1492            } else {
1493                // We always start out sending ECN, so any ack that doesn't acknowledge it disables it.
1494                debug!("ECN not acknowledged by peer");
1495                self.path.sending_ecn = false;
1496            }
1497        }
1498
1499        self.set_loss_detection_timer(now);
1500        Ok(())
1501    }
1502
1503    /// Process a new ECN block from an in-order ACK
1504    fn process_ecn(
1505        &mut self,
1506        now: Instant,
1507        space: SpaceId,
1508        newly_acked: u64,
1509        ecn: frame::EcnCounts,
1510        largest_sent_time: Instant,
1511    ) {
1512        match self.spaces[space].detect_ecn(newly_acked, ecn) {
1513            Err(e) => {
1514                debug!("halting ECN due to verification failure: {}", e);
1515                self.path.sending_ecn = false;
1516                // Wipe out the existing value because it might be garbage and could interfere with
1517                // future attempts to use ECN on new paths.
1518                self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1519            }
1520            Ok(false) => {}
1521            Ok(true) => {
1522                self.stats.path.congestion_events += 1;
1523                self.path
1524                    .congestion
1525                    .on_congestion_event(now, largest_sent_time, false, 0);
1526            }
1527        }
1528    }
1529
1530    // Not timing-aware, so it's safe to call this for inferred acks, such as arise from
1531    // high-latency handshakes
1532    fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1533        self.remove_in_flight(pn, &info);
1534        if info.ack_eliciting && self.path.challenge.is_none() {
1535            // Only pass ACKs to the congestion controller if we are not validating the current
1536            // path, so as to ignore any ACKs from older paths still coming in.
1537            self.path.congestion.on_ack(
1538                now,
1539                info.time_sent,
1540                info.size.into(),
1541                self.app_limited,
1542                &self.path.rtt,
1543            );
1544        }
1545
1546        // Update state for confirmed delivery of frames
1547        if let Some(retransmits) = info.retransmits.get() {
1548            for (id, _) in retransmits.reset_stream.iter() {
1549                self.streams.reset_acked(*id);
1550            }
1551        }
1552
1553        for frame in info.stream_frames {
1554            self.streams.received_ack_of(frame);
1555        }
1556    }
1557
1558    fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1559        let start = if self.zero_rtt_crypto.is_some() {
1560            now
1561        } else {
1562            self.prev_crypto
1563                .as_ref()
1564                .expect("no previous keys")
1565                .end_packet
1566                .as_ref()
1567                .expect("update not acknowledged yet")
1568                .1
1569        };
1570        self.timers
1571            .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1572    }
1573
1574    fn on_loss_detection_timeout(&mut self, now: Instant) {
1575        if let Some((_, pn_space)) = self.loss_time_and_space() {
1576            // Time threshold loss Detection
1577            self.detect_lost_packets(now, pn_space, false);
1578            self.set_loss_detection_timer(now);
1579            return;
1580        }
1581
1582        let (_, space) = match self.pto_time_and_space(now) {
1583            Some(x) => x,
1584            None => {
1585                error!("PTO expired while unset");
1586                return;
1587            }
1588        };
1589        trace!(
1590            in_flight = self.path.in_flight.bytes,
1591            count = self.pto_count,
1592            ?space,
1593            "PTO fired"
1594        );
1595
1596        let count = match self.path.in_flight.ack_eliciting {
1597            // A PTO when we're not expecting any ACKs must be due to handshake anti-amplification
1598            // deadlock preventions
1599            0 => {
1600                debug_assert!(!self.peer_completed_address_validation());
1601                1
1602            }
1603            // Conventional loss probe
1604            _ => 2,
1605        };
1606        self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1607        self.pto_count = self.pto_count.saturating_add(1);
1608        self.set_loss_detection_timer(now);
1609    }
1610
1611    fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
1612        let mut lost_packets = Vec::<u64>::new();
1613        let mut lost_mtu_probe = None;
1614        let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
1615        let rtt = self.path.rtt.conservative();
1616        let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);
1617
1618        // Packets sent before this time are deemed lost.
1619        let lost_send_time = now.checked_sub(loss_delay).unwrap();
1620        let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
1621        let packet_threshold = self.config.packet_threshold as u64;
1622        let mut size_of_lost_packets = 0u64;
1623
1624        // InPersistentCongestion: Determine if all packets in the time period before the newest
1625        // lost packet, including the edges, are marked lost. PTO computation must always
1626        // include max ACK delay, i.e. operate as if in Data space (see RFC9001 §7.6.1).
1627        let congestion_period =
1628            self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
1629        let mut persistent_congestion_start: Option<Instant> = None;
1630        let mut prev_packet = None;
1631        let mut in_persistent_congestion = false;
1632
1633        let space = &mut self.spaces[pn_space];
1634        space.loss_time = None;
1635
1636        for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
1637            if prev_packet != Some(packet.wrapping_sub(1)) {
1638                // An intervening packet was acknowledged
1639                persistent_congestion_start = None;
1640            }
1641
1642            if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
1643            {
1644                if Some(packet) == in_flight_mtu_probe {
1645                    // Lost MTU probes are not included in `lost_packets`, because they should not
1646                    // trigger a congestion control response
1647                    lost_mtu_probe = in_flight_mtu_probe;
1648                } else {
1649                    lost_packets.push(packet);
1650                    size_of_lost_packets += info.size as u64;
1651                    if info.ack_eliciting && due_to_ack {
1652                        match persistent_congestion_start {
1653                            // Two ACK-eliciting packets lost more than congestion_period apart, with no
1654                            // ACKed packets in between
1655                            Some(start) if info.time_sent - start > congestion_period => {
1656                                in_persistent_congestion = true;
1657                            }
1658                            // Persistent congestion must start after the first RTT sample
1659                            None if self
1660                                .path
1661                                .first_packet_after_rtt_sample
1662                                .is_some_and(|x| x < (pn_space, packet)) =>
1663                            {
1664                                persistent_congestion_start = Some(info.time_sent);
1665                            }
1666                            _ => {}
1667                        }
1668                    }
1669                }
1670            } else {
1671                let next_loss_time = info.time_sent + loss_delay;
1672                space.loss_time = Some(
1673                    space
1674                        .loss_time
1675                        .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
1676                );
1677                persistent_congestion_start = None;
1678            }
1679
1680            prev_packet = Some(packet);
1681        }
1682
1683        // OnPacketsLost
1684        if let Some(largest_lost) = lost_packets.last().cloned() {
1685            let old_bytes_in_flight = self.path.in_flight.bytes;
1686            let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
1687            self.lost_packets += lost_packets.len() as u64;
1688            self.stats.path.lost_packets += lost_packets.len() as u64;
1689            self.stats.path.lost_bytes += size_of_lost_packets;
1690            trace!(
1691                "packets lost: {:?}, bytes lost: {}",
1692                lost_packets, size_of_lost_packets
1693            );
1694
1695            for &packet in &lost_packets {
1696                let info = self.spaces[pn_space].take(packet).unwrap(); // safe: lost_packets is populated just above
1697                self.remove_in_flight(packet, &info);
1698                for frame in info.stream_frames {
1699                    self.streams.retransmit(frame);
1700                }
1701                self.spaces[pn_space].pending |= info.retransmits;
1702                self.path.mtud.on_non_probe_lost(packet, info.size);
1703            }
1704
1705            if self.path.mtud.black_hole_detected(now) {
1706                self.stats.path.black_holes_detected += 1;
1707                self.path
1708                    .congestion
1709                    .on_mtu_update(self.path.mtud.current_mtu());
1710                if let Some(max_datagram_size) = self.datagrams().max_size() {
1711                    self.datagrams.drop_oversized(max_datagram_size);
1712                }
1713            }
1714
1715            // Don't apply congestion penalty for lost ack-only packets
1716            let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;
1717
1718            if lost_ack_eliciting {
1719                self.stats.path.congestion_events += 1;
1720                self.path.congestion.on_congestion_event(
1721                    now,
1722                    largest_lost_sent,
1723                    in_persistent_congestion,
1724                    size_of_lost_packets,
1725                );
1726            }
1727        }
1728
1729        // Handle a lost MTU probe
1730        if let Some(packet) = lost_mtu_probe {
1731            let info = self.spaces[SpaceId::Data].take(packet).unwrap(); // safe: lost_mtu_probe is omitted from lost_packets, and therefore must not have been removed yet
1732            self.remove_in_flight(packet, &info);
1733            self.path.mtud.on_probe_lost();
1734            self.stats.path.lost_plpmtud_probes += 1;
1735        }
1736    }
1737
1738    fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1739        SpaceId::iter()
1740            .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1741            .min_by_key(|&(time, _)| time)
1742    }
1743
1744    fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1745        let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1746        let mut duration = self.path.rtt.pto_base() * backoff;
1747
1748        if self.path.in_flight.ack_eliciting == 0 {
1749            debug_assert!(!self.peer_completed_address_validation());
1750            let space = match self.highest_space {
1751                SpaceId::Handshake => SpaceId::Handshake,
1752                _ => SpaceId::Initial,
1753            };
1754            return Some((now + duration, space));
1755        }
1756
1757        let mut result = None;
1758        for space in SpaceId::iter() {
1759            if self.spaces[space].in_flight == 0 {
1760                continue;
1761            }
1762            if space == SpaceId::Data {
1763                // Skip ApplicationData until handshake completes.
1764                if self.is_handshaking() {
1765                    return result;
1766                }
1767                // Include max_ack_delay and backoff for ApplicationData.
1768                duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1769            }
1770            let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1771                Some(time) => time,
1772                None => continue,
1773            };
1774            let pto = last_ack_eliciting + duration;
1775            if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1776                result = Some((pto, space));
1777            }
1778        }
1779        result
1780    }
1781
1782    fn peer_completed_address_validation(&self) -> bool {
1783        if self.side.is_server() || self.state.is_closed() {
1784            return true;
1785        }
1786        // The server is guaranteed to have validated our address if any of our handshake or 1-RTT
1787        // packets are acknowledged or we've seen HANDSHAKE_DONE and discarded handshake keys.
1788        self.spaces[SpaceId::Handshake]
1789            .largest_acked_packet
1790            .is_some()
1791            || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1792            || (self.spaces[SpaceId::Data].crypto.is_some()
1793                && self.spaces[SpaceId::Handshake].crypto.is_none())
1794    }
1795
1796    fn set_loss_detection_timer(&mut self, now: Instant) {
1797        if self.state.is_closed() {
1798            // No loss detection takes place on closed connections, and `close_common` already
1799            // stopped time timer. Ensure we don't restart it inadvertently, e.g. in response to a
1800            // reordered packet being handled by state-insensitive code.
1801            return;
1802        }
1803
1804        if let Some((loss_time, _)) = self.loss_time_and_space() {
1805            // Time threshold loss detection.
1806            self.timers.set(Timer::LossDetection, loss_time);
1807            return;
1808        }
1809
1810        if self.path.anti_amplification_blocked(1) {
1811            // We wouldn't be able to send anything, so don't bother.
1812            self.timers.stop(Timer::LossDetection);
1813            return;
1814        }
1815
1816        if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1817            // There is nothing to detect lost, so no timer is set. However, the client needs to arm
1818            // the timer if the server might be blocked by the anti-amplification limit.
1819            self.timers.stop(Timer::LossDetection);
1820            return;
1821        }
1822
1823        // Determine which PN space to arm PTO for.
1824        // Calculate PTO duration
1825        if let Some((timeout, _)) = self.pto_time_and_space(now) {
1826            self.timers.set(Timer::LossDetection, timeout);
1827        } else {
1828            self.timers.stop(Timer::LossDetection);
1829        }
1830    }
1831
1832    /// Probe Timeout
1833    fn pto(&self, space: SpaceId) -> Duration {
1834        let max_ack_delay = match space {
1835            SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1836            SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1837        };
1838        self.path.rtt.pto_base() + max_ack_delay
1839    }
1840
1841    fn on_packet_authenticated(
1842        &mut self,
1843        now: Instant,
1844        space_id: SpaceId,
1845        ecn: Option<EcnCodepoint>,
1846        packet: Option<u64>,
1847        spin: bool,
1848        is_1rtt: bool,
1849    ) {
1850        self.total_authed_packets += 1;
1851        self.reset_keep_alive(now);
1852        self.reset_idle_timeout(now, space_id);
1853        self.permit_idle_reset = true;
1854        self.receiving_ecn |= ecn.is_some();
1855        if let Some(x) = ecn {
1856            let space = &mut self.spaces[space_id];
1857            space.ecn_counters += x;
1858
1859            if x.is_ce() {
1860                space.pending_acks.set_immediate_ack_required();
1861            }
1862        }
1863
1864        let packet = match packet {
1865            Some(x) => x,
1866            None => return,
1867        };
1868        if self.side.is_server() {
1869            if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1870                // A server stops sending and processing Initial packets when it receives its first Handshake packet.
1871                self.discard_space(now, SpaceId::Initial);
1872            }
1873            if self.zero_rtt_crypto.is_some() && is_1rtt {
1874                // Discard 0-RTT keys soon after receiving a 1-RTT packet
1875                self.set_key_discard_timer(now, space_id)
1876            }
1877        }
1878        let space = &mut self.spaces[space_id];
1879        space.pending_acks.insert_one(packet, now);
1880        if packet >= space.rx_packet {
1881            space.rx_packet = packet;
1882            // Update outgoing spin bit, inverting iff we're the client
1883            self.spin = self.side.is_client() ^ spin;
1884        }
1885    }
1886
1887    fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1888        let timeout = match self.idle_timeout {
1889            None => return,
1890            Some(dur) => dur,
1891        };
1892        if self.state.is_closed() {
1893            self.timers.stop(Timer::Idle);
1894            return;
1895        }
1896        let dt = cmp::max(timeout, 3 * self.pto(space));
1897        self.timers.set(Timer::Idle, now + dt);
1898    }
1899
1900    fn reset_keep_alive(&mut self, now: Instant) {
1901        let interval = match self.config.keep_alive_interval {
1902            Some(x) if self.state.is_established() => x,
1903            _ => return,
1904        };
1905        self.timers.set(Timer::KeepAlive, now + interval);
1906    }
1907
1908    fn reset_cid_retirement(&mut self) {
1909        if let Some(t) = self.local_cid_state.next_timeout() {
1910            self.timers.set(Timer::PushNewCid, t);
1911        }
1912    }
1913
1914    /// Handle the already-decrypted first packet from the client
1915    ///
1916    /// Decrypting the first packet in the `Endpoint` allows stateless packet handling to be more
1917    /// efficient.
1918    pub(crate) fn handle_first_packet(
1919        &mut self,
1920        now: Instant,
1921        remote: SocketAddr,
1922        ecn: Option<EcnCodepoint>,
1923        packet_number: u64,
1924        packet: InitialPacket,
1925        remaining: Option<BytesMut>,
1926    ) -> Result<(), ConnectionError> {
1927        let span = trace_span!("first recv");
1928        let _guard = span.enter();
1929        debug_assert!(self.side.is_server());
1930        let len = packet.header_data.len() + packet.payload.len();
1931        self.path.total_recvd = len as u64;
1932
1933        match self.state {
1934            State::Handshake(ref mut state) => {
1935                state.expected_token = packet.header.token.clone();
1936            }
1937            _ => unreachable!("first packet must be delivered in Handshake state"),
1938        }
1939
1940        self.on_packet_authenticated(
1941            now,
1942            SpaceId::Initial,
1943            ecn,
1944            Some(packet_number),
1945            false,
1946            false,
1947        );
1948
1949        self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1950        if let Some(data) = remaining {
1951            self.handle_coalesced(now, remote, ecn, data);
1952        }
1953        Ok(())
1954    }
1955
1956    fn init_0rtt(&mut self) {
1957        let (header, packet) = match self.crypto.early_crypto() {
1958            Some(x) => x,
1959            None => return,
1960        };
1961        if self.side.is_client() {
1962            match self.crypto.transport_parameters() {
1963                Ok(params) => {
1964                    let params = params
1965                        .expect("crypto layer didn't supply transport parameters with ticket");
1966                    // Certain values must not be cached
1967                    let params = TransportParameters {
1968                        initial_src_cid: None,
1969                        original_dst_cid: None,
1970                        preferred_address: None,
1971                        retry_src_cid: None,
1972                        stateless_reset_token: None,
1973                        min_ack_delay: None,
1974                        ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1975                        max_ack_delay: TransportParameters::default().max_ack_delay,
1976                        ..params
1977                    };
1978                    self.set_peer_params(params);
1979                }
1980                Err(e) => {
1981                    error!("session ticket has malformed transport parameters: {}", e);
1982                    return;
1983                }
1984            }
1985        }
1986        trace!("0-RTT enabled");
1987        self.zero_rtt_enabled = true;
1988        self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1989    }
1990
1991    fn read_crypto(
1992        &mut self,
1993        space: SpaceId,
1994        crypto: &frame::Crypto,
1995        payload_len: usize,
1996    ) -> Result<(), TransportError> {
1997        let expected = if !self.state.is_handshake() {
1998            SpaceId::Data
1999        } else if self.highest_space == SpaceId::Initial {
2000            SpaceId::Initial
2001        } else {
2002            // On the server, self.highest_space can be Data after receiving the client's first
2003            // flight, but we expect Handshake CRYPTO until the handshake is complete.
2004            SpaceId::Handshake
2005        };
2006        // We can't decrypt Handshake packets when highest_space is Initial, CRYPTO frames in 0-RTT
2007        // packets are illegal, and we don't process 1-RTT packets until the handshake is
2008        // complete. Therefore, we will never see CRYPTO data from a later-than-expected space.
2009        debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2010
2011        let end = crypto.offset + crypto.data.len() as u64;
2012        if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2013            warn!(
2014                "received new {:?} CRYPTO data when expecting {:?}",
2015                space, expected
2016            );
2017            return Err(TransportError::PROTOCOL_VIOLATION(
2018                "new data at unexpected encryption level",
2019            ));
2020        }
2021
2022        let space = &mut self.spaces[space];
2023        let max = end.saturating_sub(space.crypto_stream.bytes_read());
2024        if max > self.config.crypto_buffer_size as u64 {
2025            return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2026        }
2027
2028        space
2029            .crypto_stream
2030            .insert(crypto.offset, crypto.data.clone(), payload_len);
2031        while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2032            trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2033            if self.crypto.read_handshake(&chunk.bytes)? {
2034                self.events.push_back(Event::HandshakeDataReady);
2035            }
2036        }
2037
2038        Ok(())
2039    }
2040
2041    fn write_crypto(&mut self) {
2042        loop {
2043            let space = self.highest_space;
2044            let mut outgoing = Vec::new();
2045            if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2046                match space {
2047                    SpaceId::Initial => {
2048                        self.upgrade_crypto(SpaceId::Handshake, crypto);
2049                    }
2050                    SpaceId::Handshake => {
2051                        self.upgrade_crypto(SpaceId::Data, crypto);
2052                    }
2053                    _ => unreachable!("got updated secrets during 1-RTT"),
2054                }
2055            }
2056            if outgoing.is_empty() {
2057                if space == self.highest_space {
2058                    break;
2059                } else {
2060                    // Keys updated, check for more data to send
2061                    continue;
2062                }
2063            }
2064            let offset = self.spaces[space].crypto_offset;
2065            let outgoing = Bytes::from(outgoing);
2066            if let State::Handshake(ref mut state) = self.state {
2067                if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2068                    state.client_hello = Some(outgoing.clone());
2069                }
2070            }
2071            self.spaces[space].crypto_offset += outgoing.len() as u64;
2072            trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2073            self.spaces[space].pending.crypto.push_back(frame::Crypto {
2074                offset,
2075                data: outgoing,
2076            });
2077        }
2078    }
2079
2080    /// Switch to stronger cryptography during handshake
2081    fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2082        debug_assert!(
2083            self.spaces[space].crypto.is_none(),
2084            "already reached packet space {space:?}"
2085        );
2086        trace!("{:?} keys ready", space);
2087        if space == SpaceId::Data {
2088            // Precompute the first key update
2089            self.next_crypto = Some(
2090                self.crypto
2091                    .next_1rtt_keys()
2092                    .expect("handshake should be complete"),
2093            );
2094        }
2095
2096        self.spaces[space].crypto = Some(crypto);
2097        debug_assert!(space as usize > self.highest_space as usize);
2098        self.highest_space = space;
2099        if space == SpaceId::Data && self.side.is_client() {
2100            // Discard 0-RTT keys because 1-RTT keys are available.
2101            self.zero_rtt_crypto = None;
2102        }
2103    }
2104
2105    fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2106        debug_assert!(space_id != SpaceId::Data);
2107        trace!("discarding {:?} keys", space_id);
2108        if space_id == SpaceId::Initial {
2109            // No longer needed
2110            if let ConnectionSide::Client { token, .. } = &mut self.side {
2111                *token = Bytes::new();
2112            }
2113        }
2114        let space = &mut self.spaces[space_id];
2115        space.crypto = None;
2116        space.time_of_last_ack_eliciting_packet = None;
2117        space.loss_time = None;
2118        space.in_flight = 0;
2119        let sent_packets = mem::take(&mut space.sent_packets);
2120        for (pn, packet) in sent_packets.into_iter() {
2121            self.remove_in_flight(pn, &packet);
2122        }
2123        self.set_loss_detection_timer(now)
2124    }
2125
2126    fn handle_coalesced(
2127        &mut self,
2128        now: Instant,
2129        remote: SocketAddr,
2130        ecn: Option<EcnCodepoint>,
2131        data: BytesMut,
2132    ) {
2133        self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2134        let mut remaining = Some(data);
2135        while let Some(data) = remaining {
2136            match PartialDecode::new(
2137                data,
2138                &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2139                &[self.version],
2140                self.endpoint_config.grease_quic_bit,
2141            ) {
2142                Ok((partial_decode, rest)) => {
2143                    remaining = rest;
2144                    self.handle_decode(now, remote, ecn, partial_decode);
2145                }
2146                Err(e) => {
2147                    trace!("malformed header: {}", e);
2148                    return;
2149                }
2150            }
2151        }
2152    }
2153
2154    fn handle_decode(
2155        &mut self,
2156        now: Instant,
2157        remote: SocketAddr,
2158        ecn: Option<EcnCodepoint>,
2159        partial_decode: PartialDecode,
2160    ) {
2161        if let Some(decoded) = packet_crypto::unprotect_header(
2162            partial_decode,
2163            &self.spaces,
2164            self.zero_rtt_crypto.as_ref(),
2165            self.peer_params.stateless_reset_token,
2166        ) {
2167            self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2168        }
2169    }
2170
2171    fn handle_packet(
2172        &mut self,
2173        now: Instant,
2174        remote: SocketAddr,
2175        ecn: Option<EcnCodepoint>,
2176        packet: Option<Packet>,
2177        stateless_reset: bool,
2178    ) {
2179        self.stats.udp_rx.ios += 1;
2180        if let Some(ref packet) = packet {
2181            trace!(
2182                "got {:?} packet ({} bytes) from {} using id {}",
2183                packet.header.space(),
2184                packet.payload.len() + packet.header_data.len(),
2185                remote,
2186                packet.header.dst_cid(),
2187            );
2188        }
2189
2190        if self.is_handshaking() && remote != self.path.remote {
2191            debug!("discarding packet with unexpected remote during handshake");
2192            return;
2193        }
2194
2195        let was_closed = self.state.is_closed();
2196        let was_drained = self.state.is_drained();
2197
2198        let decrypted = match packet {
2199            None => Err(None),
2200            Some(mut packet) => self
2201                .decrypt_packet(now, &mut packet)
2202                .map(move |number| (packet, number)),
2203        };
2204        let result = match decrypted {
2205            _ if stateless_reset => {
2206                debug!("got stateless reset");
2207                Err(ConnectionError::Reset)
2208            }
2209            Err(Some(e)) => {
2210                warn!("illegal packet: {}", e);
2211                Err(e.into())
2212            }
2213            Err(None) => {
2214                debug!("failed to authenticate packet");
2215                self.authentication_failures += 1;
2216                let integrity_limit = self.spaces[self.highest_space]
2217                    .crypto
2218                    .as_ref()
2219                    .unwrap()
2220                    .packet
2221                    .local
2222                    .integrity_limit();
2223                if self.authentication_failures > integrity_limit {
2224                    Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
2225                } else {
2226                    return;
2227                }
2228            }
2229            Ok((packet, number)) => {
2230                let span = match number {
2231                    Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
2232                    None => trace_span!("recv", space = ?packet.header.space()),
2233                };
2234                let _guard = span.enter();
2235
2236                let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
2237                if number.is_some_and(is_duplicate) {
2238                    debug!("discarding possible duplicate packet");
2239                    return;
2240                } else if self.state.is_handshake() && packet.header.is_short() {
2241                    // TODO: SHOULD buffer these to improve reordering tolerance.
2242                    trace!("dropping short packet during handshake");
2243                    return;
2244                } else {
2245                    if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
2246                        if let State::Handshake(ref hs) = self.state {
2247                            if self.side.is_server() && token != &hs.expected_token {
2248                                // Clients must send the same retry token in every Initial. Initial
2249                                // packets can be spoofed, so we discard rather than killing the
2250                                // connection.
2251                                warn!("discarding Initial with invalid retry token");
2252                                return;
2253                            }
2254                        }
2255                    }
2256
2257                    if !self.state.is_closed() {
2258                        let spin = match packet.header {
2259                            Header::Short { spin, .. } => spin,
2260                            _ => false,
2261                        };
2262                        self.on_packet_authenticated(
2263                            now,
2264                            packet.header.space(),
2265                            ecn,
2266                            number,
2267                            spin,
2268                            packet.header.is_1rtt(),
2269                        );
2270                    }
2271                    self.process_decrypted_packet(now, remote, number, packet)
2272                }
2273            }
2274        };
2275
2276        // State transitions for error cases
2277        if let Err(conn_err) = result {
2278            self.error = Some(conn_err.clone());
2279            self.state = match conn_err {
2280                ConnectionError::ApplicationClosed(reason) => State::closed(reason),
2281                ConnectionError::ConnectionClosed(reason) => State::closed(reason),
2282                ConnectionError::Reset
2283                | ConnectionError::TransportError(TransportError {
2284                    code: TransportErrorCode::AEAD_LIMIT_REACHED,
2285                    ..
2286                }) => State::Drained,
2287                ConnectionError::TimedOut => {
2288                    unreachable!("timeouts aren't generated by packet processing");
2289                }
2290                ConnectionError::TransportError(err) => {
2291                    debug!("closing connection due to transport error: {}", err);
2292                    State::closed(err)
2293                }
2294                ConnectionError::VersionMismatch => State::Draining,
2295                ConnectionError::LocallyClosed => {
2296                    unreachable!("LocallyClosed isn't generated by packet processing");
2297                }
2298                ConnectionError::CidsExhausted => {
2299                    unreachable!("CidsExhausted isn't generated by packet processing");
2300                }
2301            };
2302        }
2303
2304        if !was_closed && self.state.is_closed() {
2305            self.close_common();
2306            if !self.state.is_drained() {
2307                self.set_close_timer(now);
2308            }
2309        }
2310        if !was_drained && self.state.is_drained() {
2311            self.endpoint_events.push_back(EndpointEventInner::Drained);
2312            // Close timer may have been started previously, e.g. if we sent a close and got a
2313            // stateless reset in response
2314            self.timers.stop(Timer::Close);
2315        }
2316
2317        // Transmit CONNECTION_CLOSE if necessary
2318        if let State::Closed(_) = self.state {
2319            self.close = remote == self.path.remote;
2320        }
2321    }
2322
2323    fn process_decrypted_packet(
2324        &mut self,
2325        now: Instant,
2326        remote: SocketAddr,
2327        number: Option<u64>,
2328        packet: Packet,
2329    ) -> Result<(), ConnectionError> {
2330        let state = match self.state {
2331            State::Established => {
2332                match packet.header.space() {
2333                    SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
2334                    _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
2335                    _ => {
2336                        trace!("discarding unexpected pre-handshake packet");
2337                    }
2338                }
2339                return Ok(());
2340            }
2341            State::Closed(_) => {
2342                for result in frame::Iter::new(packet.payload.freeze())? {
2343                    let frame = match result {
2344                        Ok(frame) => frame,
2345                        Err(err) => {
2346                            debug!("frame decoding error: {err:?}");
2347                            continue;
2348                        }
2349                    };
2350
2351                    if let Frame::Padding = frame {
2352                        continue;
2353                    };
2354
2355                    self.stats.frame_rx.record(&frame);
2356
2357                    if let Frame::Close(_) = frame {
2358                        trace!("draining");
2359                        self.state = State::Draining;
2360                        break;
2361                    }
2362                }
2363                return Ok(());
2364            }
2365            State::Draining | State::Drained => return Ok(()),
2366            State::Handshake(ref mut state) => state,
2367        };
2368
2369        match packet.header {
2370            Header::Retry {
2371                src_cid: rem_cid, ..
2372            } => {
2373                if self.side.is_server() {
2374                    return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
2375                }
2376
2377                if self.total_authed_packets > 1
2378                            || packet.payload.len() <= 16 // token + 16 byte tag
2379                            || !self.crypto.is_valid_retry(
2380                                &self.rem_cids.active(),
2381                                &packet.header_data,
2382                                &packet.payload,
2383                            )
2384                {
2385                    trace!("discarding invalid Retry");
2386                    // - After the client has received and processed an Initial or Retry
2387                    //   packet from the server, it MUST discard any subsequent Retry
2388                    //   packets that it receives.
2389                    // - A client MUST discard a Retry packet with a zero-length Retry Token
2390                    //   field.
2391                    // - Clients MUST discard Retry packets that have a Retry Integrity Tag
2392                    //   that cannot be validated
2393                    return Ok(());
2394                }
2395
2396                trace!("retrying with CID {}", rem_cid);
2397                let client_hello = state.client_hello.take().unwrap();
2398                self.retry_src_cid = Some(rem_cid);
2399                self.rem_cids.update_initial_cid(rem_cid);
2400                self.rem_handshake_cid = rem_cid;
2401
2402                let space = &mut self.spaces[SpaceId::Initial];
2403                if let Some(info) = space.take(0) {
2404                    self.on_packet_acked(now, 0, info);
2405                };
2406
2407                self.discard_space(now, SpaceId::Initial); // Make sure we clean up after any retransmitted Initials
2408                self.spaces[SpaceId::Initial] = PacketSpace {
2409                    crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
2410                    next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
2411                    crypto_offset: client_hello.len() as u64,
2412                    ..PacketSpace::new(now)
2413                };
2414                self.spaces[SpaceId::Initial]
2415                    .pending
2416                    .crypto
2417                    .push_back(frame::Crypto {
2418                        offset: 0,
2419                        data: client_hello,
2420                    });
2421
2422                // Retransmit all 0-RTT data
2423                let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2424                for (pn, info) in zero_rtt {
2425                    self.remove_in_flight(pn, &info);
2426                    self.spaces[SpaceId::Data].pending |= info.retransmits;
2427                }
2428                self.streams.retransmit_all_for_0rtt();
2429
2430                let token_len = packet.payload.len() - 16;
2431                let ConnectionSide::Client { ref mut token, .. } = self.side else {
2432                    unreachable!("we already short-circuited if we're server");
2433                };
2434                *token = packet.payload.freeze().split_to(token_len);
2435                self.state = State::Handshake(state::Handshake {
2436                    expected_token: Bytes::new(),
2437                    rem_cid_set: false,
2438                    client_hello: None,
2439                });
2440                Ok(())
2441            }
2442            Header::Long {
2443                ty: LongType::Handshake,
2444                src_cid: rem_cid,
2445                ..
2446            } => {
2447                if rem_cid != self.rem_handshake_cid {
2448                    debug!(
2449                        "discarding packet with mismatched remote CID: {} != {}",
2450                        self.rem_handshake_cid, rem_cid
2451                    );
2452                    return Ok(());
2453                }
2454                self.on_path_validated();
2455
2456                self.process_early_payload(now, packet)?;
2457                if self.state.is_closed() {
2458                    return Ok(());
2459                }
2460
2461                if self.crypto.is_handshaking() {
2462                    trace!("handshake ongoing");
2463                    return Ok(());
2464                }
2465
2466                if self.side.is_client() {
2467                    // Client-only because server params were set from the client's Initial
2468                    let params =
2469                        self.crypto
2470                            .transport_parameters()?
2471                            .ok_or_else(|| TransportError {
2472                                code: TransportErrorCode::crypto(0x6d),
2473                                frame: None,
2474                                reason: "transport parameters missing".into(),
2475                            })?;
2476
2477                    if self.has_0rtt() {
2478                        if !self.crypto.early_data_accepted().unwrap() {
2479                            debug_assert!(self.side.is_client());
2480                            debug!("0-RTT rejected");
2481                            self.accepted_0rtt = false;
2482                            self.streams.zero_rtt_rejected();
2483
2484                            // Discard already-queued frames
2485                            self.spaces[SpaceId::Data].pending = Retransmits::default();
2486
2487                            // Discard 0-RTT packets
2488                            let sent_packets =
2489                                mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
2490                            for (pn, packet) in sent_packets {
2491                                self.remove_in_flight(pn, &packet);
2492                            }
2493                        } else {
2494                            self.accepted_0rtt = true;
2495                            params.validate_resumption_from(&self.peer_params)?;
2496                        }
2497                    }
2498                    if let Some(token) = params.stateless_reset_token {
2499                        self.endpoint_events
2500                            .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
2501                    }
2502                    self.handle_peer_params(params)?;
2503                    self.issue_first_cids(now);
2504                } else {
2505                    // Server-only
2506                    self.spaces[SpaceId::Data].pending.handshake_done = true;
2507                    self.discard_space(now, SpaceId::Handshake);
2508                }
2509
2510                self.events.push_back(Event::Connected);
2511                self.state = State::Established;
2512                trace!("established");
2513                Ok(())
2514            }
2515            Header::Initial(InitialHeader {
2516                src_cid: rem_cid, ..
2517            }) => {
2518                if !state.rem_cid_set {
2519                    trace!("switching remote CID to {}", rem_cid);
2520                    let mut state = state.clone();
2521                    self.rem_cids.update_initial_cid(rem_cid);
2522                    self.rem_handshake_cid = rem_cid;
2523                    self.orig_rem_cid = rem_cid;
2524                    state.rem_cid_set = true;
2525                    self.state = State::Handshake(state);
2526                } else if rem_cid != self.rem_handshake_cid {
2527                    debug!(
2528                        "discarding packet with mismatched remote CID: {} != {}",
2529                        self.rem_handshake_cid, rem_cid
2530                    );
2531                    return Ok(());
2532                }
2533
2534                let starting_space = self.highest_space;
2535                self.process_early_payload(now, packet)?;
2536
2537                if self.side.is_server()
2538                    && starting_space == SpaceId::Initial
2539                    && self.highest_space != SpaceId::Initial
2540                {
2541                    let params =
2542                        self.crypto
2543                            .transport_parameters()?
2544                            .ok_or_else(|| TransportError {
2545                                code: TransportErrorCode::crypto(0x6d),
2546                                frame: None,
2547                                reason: "transport parameters missing".into(),
2548                            })?;
2549                    self.handle_peer_params(params)?;
2550                    self.issue_first_cids(now);
2551                    self.init_0rtt();
2552                }
2553                Ok(())
2554            }
2555            Header::Long {
2556                ty: LongType::ZeroRtt,
2557                ..
2558            } => {
2559                self.process_payload(now, remote, number.unwrap(), packet)?;
2560                Ok(())
2561            }
2562            Header::VersionNegotiate { .. } => {
2563                if self.total_authed_packets > 1 {
2564                    return Ok(());
2565                }
2566                let supported = packet
2567                    .payload
2568                    .chunks(4)
2569                    .any(|x| match <[u8; 4]>::try_from(x) {
2570                        Ok(version) => self.version == u32::from_be_bytes(version),
2571                        Err(_) => false,
2572                    });
2573                if supported {
2574                    return Ok(());
2575                }
2576                debug!("remote doesn't support our version");
2577                Err(ConnectionError::VersionMismatch)
2578            }
2579            Header::Short { .. } => unreachable!(
2580                "short packets received during handshake are discarded in handle_packet"
2581            ),
2582        }
2583    }
2584
2585    /// Process an Initial or Handshake packet payload
2586    fn process_early_payload(
2587        &mut self,
2588        now: Instant,
2589        packet: Packet,
2590    ) -> Result<(), TransportError> {
2591        debug_assert_ne!(packet.header.space(), SpaceId::Data);
2592        let payload_len = packet.payload.len();
2593        let mut ack_eliciting = false;
2594        for result in frame::Iter::new(packet.payload.freeze())? {
2595            let frame = result?;
2596            let span = match frame {
2597                Frame::Padding => continue,
2598                _ => Some(trace_span!("frame", ty = %frame.ty())),
2599            };
2600
2601            self.stats.frame_rx.record(&frame);
2602
2603            let _guard = span.as_ref().map(|x| x.enter());
2604            ack_eliciting |= frame.is_ack_eliciting();
2605
2606            // Process frames
2607            match frame {
2608                Frame::Padding | Frame::Ping => {}
2609                Frame::Crypto(frame) => {
2610                    self.read_crypto(packet.header.space(), &frame, payload_len)?;
2611                }
2612                Frame::Ack(ack) => {
2613                    self.on_ack_received(now, packet.header.space(), ack)?;
2614                }
2615                Frame::Close(reason) => {
2616                    self.error = Some(reason.into());
2617                    self.state = State::Draining;
2618                    return Ok(());
2619                }
2620                _ => {
2621                    let mut err =
2622                        TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2623                    err.frame = Some(frame.ty());
2624                    return Err(err);
2625                }
2626            }
2627        }
2628
2629        if ack_eliciting {
2630            // In the initial and handshake spaces, ACKs must be sent immediately
2631            self.spaces[packet.header.space()]
2632                .pending_acks
2633                .set_immediate_ack_required();
2634        }
2635
2636        self.write_crypto();
2637        Ok(())
2638    }
2639
2640    fn process_payload(
2641        &mut self,
2642        now: Instant,
2643        remote: SocketAddr,
2644        number: u64,
2645        packet: Packet,
2646    ) -> Result<(), TransportError> {
2647        let payload = packet.payload.freeze();
2648        let mut is_probing_packet = true;
2649        let mut close = None;
2650        let payload_len = payload.len();
2651        let mut ack_eliciting = false;
2652        for result in frame::Iter::new(payload)? {
2653            let frame = result?;
2654            let span = match frame {
2655                Frame::Padding => continue,
2656                _ => Some(trace_span!("frame", ty = %frame.ty())),
2657            };
2658
2659            self.stats.frame_rx.record(&frame);
2660            // Crypto, Stream and Datagram frames are special cased in order no pollute
2661            // the log with payload data
2662            match &frame {
2663                Frame::Crypto(f) => {
2664                    trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2665                }
2666                Frame::Stream(f) => {
2667                    trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2668                }
2669                Frame::Datagram(f) => {
2670                    trace!(len = f.data.len(), "got datagram frame");
2671                }
2672                f => {
2673                    trace!("got frame {:?}", f);
2674                }
2675            }
2676
2677            let _guard = span.as_ref().map(|x| x.enter());
2678            if packet.header.is_0rtt() {
2679                match frame {
2680                    Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2681                        return Err(TransportError::PROTOCOL_VIOLATION(
2682                            "illegal frame type in 0-RTT",
2683                        ));
2684                    }
2685                    _ => {}
2686                }
2687            }
2688            ack_eliciting |= frame.is_ack_eliciting();
2689
2690            // Check whether this could be a probing packet
2691            match frame {
2692                Frame::Padding
2693                | Frame::PathChallenge(_)
2694                | Frame::PathResponse(_)
2695                | Frame::NewConnectionId(_) => {}
2696                _ => {
2697                    is_probing_packet = false;
2698                }
2699            }
2700            match frame {
2701                Frame::Crypto(frame) => {
2702                    self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2703                }
2704                Frame::Stream(frame) => {
2705                    if self.streams.received(frame, payload_len)?.should_transmit() {
2706                        self.spaces[SpaceId::Data].pending.max_data = true;
2707                    }
2708                }
2709                Frame::Ack(ack) => {
2710                    self.on_ack_received(now, SpaceId::Data, ack)?;
2711                }
2712                Frame::Padding | Frame::Ping => {}
2713                Frame::Close(reason) => {
2714                    close = Some(reason);
2715                }
2716                Frame::PathChallenge(token) => {
2717                    self.path_responses.push(number, token, remote);
2718                    if remote == self.path.remote {
2719                        // PATH_CHALLENGE on active path, possible off-path packet forwarding
2720                        // attack. Send a non-probing packet to recover the active path.
2721                        match self.peer_supports_ack_frequency() {
2722                            true => self.immediate_ack(),
2723                            false => self.ping(),
2724                        }
2725                    }
2726                }
2727                Frame::PathResponse(token) => {
2728                    if self.path.challenge == Some(token) && remote == self.path.remote {
2729                        trace!("new path validated");
2730                        self.timers.stop(Timer::PathValidation);
2731                        self.path.challenge = None;
2732                        self.path.validated = true;
2733                        if let Some((_, ref mut prev_path)) = self.prev_path {
2734                            prev_path.challenge = None;
2735                            prev_path.challenge_pending = false;
2736                        }
2737                    } else {
2738                        debug!(token, "ignoring invalid PATH_RESPONSE");
2739                    }
2740                }
2741                Frame::MaxData(bytes) => {
2742                    self.streams.received_max_data(bytes);
2743                }
2744                Frame::MaxStreamData { id, offset } => {
2745                    self.streams.received_max_stream_data(id, offset)?;
2746                }
2747                Frame::MaxStreams { dir, count } => {
2748                    self.streams.received_max_streams(dir, count)?;
2749                }
2750                Frame::ResetStream(frame) => {
2751                    if self.streams.received_reset(frame)?.should_transmit() {
2752                        self.spaces[SpaceId::Data].pending.max_data = true;
2753                    }
2754                }
2755                Frame::DataBlocked { offset } => {
2756                    debug!(offset, "peer claims to be blocked at connection level");
2757                }
2758                Frame::StreamDataBlocked { id, offset } => {
2759                    if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2760                        debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2761                        return Err(TransportError::STREAM_STATE_ERROR(
2762                            "STREAM_DATA_BLOCKED on send-only stream",
2763                        ));
2764                    }
2765                    debug!(
2766                        stream = %id,
2767                        offset, "peer claims to be blocked at stream level"
2768                    );
2769                }
2770                Frame::StreamsBlocked { dir, limit } => {
2771                    if limit > MAX_STREAM_COUNT {
2772                        return Err(TransportError::FRAME_ENCODING_ERROR(
2773                            "unrepresentable stream limit",
2774                        ));
2775                    }
2776                    debug!(
2777                        "peer claims to be blocked opening more than {} {} streams",
2778                        limit, dir
2779                    );
2780                }
2781                Frame::StopSending(frame::StopSending { id, error_code }) => {
2782                    if id.initiator() != self.side.side() {
2783                        if id.dir() == Dir::Uni {
2784                            debug!("got STOP_SENDING on recv-only {}", id);
2785                            return Err(TransportError::STREAM_STATE_ERROR(
2786                                "STOP_SENDING on recv-only stream",
2787                            ));
2788                        }
2789                    } else if self.streams.is_local_unopened(id) {
2790                        return Err(TransportError::STREAM_STATE_ERROR(
2791                            "STOP_SENDING on unopened stream",
2792                        ));
2793                    }
2794                    self.streams.received_stop_sending(id, error_code);
2795                }
2796                Frame::RetireConnectionId { sequence } => {
2797                    let allow_more_cids = self
2798                        .local_cid_state
2799                        .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2800                    self.endpoint_events
2801                        .push_back(EndpointEventInner::RetireConnectionId(
2802                            now,
2803                            sequence,
2804                            allow_more_cids,
2805                        ));
2806                }
2807                Frame::NewConnectionId(frame) => {
2808                    trace!(
2809                        sequence = frame.sequence,
2810                        id = %frame.id,
2811                        retire_prior_to = frame.retire_prior_to,
2812                    );
2813                    if self.rem_cids.active().is_empty() {
2814                        return Err(TransportError::PROTOCOL_VIOLATION(
2815                            "NEW_CONNECTION_ID when CIDs aren't in use",
2816                        ));
2817                    }
2818                    if frame.retire_prior_to > frame.sequence {
2819                        return Err(TransportError::PROTOCOL_VIOLATION(
2820                            "NEW_CONNECTION_ID retiring unissued CIDs",
2821                        ));
2822                    }
2823
2824                    use crate::cid_queue::InsertError;
2825                    match self.rem_cids.insert(frame) {
2826                        Ok(None) => {}
2827                        Ok(Some((retired, reset_token))) => {
2828                            let pending_retired =
2829                                &mut self.spaces[SpaceId::Data].pending.retire_cids;
2830                            /// Ensure `pending_retired` cannot grow without bound. Limit is
2831                            /// somewhat arbitrary but very permissive.
2832                            const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2833                            // We don't bother counting in-flight frames because those are bounded
2834                            // by congestion control.
2835                            if (pending_retired.len() as u64)
2836                                .saturating_add(retired.end.saturating_sub(retired.start))
2837                                > MAX_PENDING_RETIRED_CIDS
2838                            {
2839                                return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2840                                    "queued too many retired CIDs",
2841                                ));
2842                            }
2843                            pending_retired.extend(retired);
2844                            self.set_reset_token(reset_token);
2845                        }
2846                        Err(InsertError::ExceedsLimit) => {
2847                            return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2848                        }
2849                        Err(InsertError::Retired) => {
2850                            trace!("discarding already-retired");
2851                            // RETIRE_CONNECTION_ID might not have been previously sent if e.g. a
2852                            // range of connection IDs larger than the active connection ID limit
2853                            // was retired all at once via retire_prior_to.
2854                            self.spaces[SpaceId::Data]
2855                                .pending
2856                                .retire_cids
2857                                .push(frame.sequence);
2858                            continue;
2859                        }
2860                    };
2861
2862                    if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2863                        // We're a server still using the initial remote CID for the client, so
2864                        // let's switch immediately to enable clientside stateless resets.
2865                        self.update_rem_cid();
2866                    }
2867                }
2868                Frame::NewToken(NewToken { token }) => {
2869                    let ConnectionSide::Client {
2870                        token_store,
2871                        server_name,
2872                        ..
2873                    } = &self.side
2874                    else {
2875                        return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2876                    };
2877                    if token.is_empty() {
2878                        return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2879                    }
2880                    trace!("got new token");
2881                    token_store.insert(server_name, token);
2882                }
2883                Frame::Datagram(datagram) => {
2884                    if self
2885                        .datagrams
2886                        .received(datagram, &self.config.datagram_receive_buffer_size)?
2887                    {
2888                        self.events.push_back(Event::DatagramReceived);
2889                    }
2890                }
2891                Frame::AckFrequency(ack_frequency) => {
2892                    // This frame can only be sent in the Data space
2893                    let space = &mut self.spaces[SpaceId::Data];
2894
2895                    if !self
2896                        .ack_frequency
2897                        .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2898                    {
2899                        // The AckFrequency frame is stale (we have already received a more recent one)
2900                        continue;
2901                    }
2902
2903                    // Our `max_ack_delay` has been updated, so we may need to adjust its associated
2904                    // timeout
2905                    if let Some(timeout) = space
2906                        .pending_acks
2907                        .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2908                    {
2909                        self.timers.set(Timer::MaxAckDelay, timeout);
2910                    }
2911                }
2912                Frame::ImmediateAck => {
2913                    // This frame can only be sent in the Data space
2914                    self.spaces[SpaceId::Data]
2915                        .pending_acks
2916                        .set_immediate_ack_required();
2917                }
2918                Frame::HandshakeDone => {
2919                    if self.side.is_server() {
2920                        return Err(TransportError::PROTOCOL_VIOLATION(
2921                            "client sent HANDSHAKE_DONE",
2922                        ));
2923                    }
2924                    if self.spaces[SpaceId::Handshake].crypto.is_some() {
2925                        self.discard_space(now, SpaceId::Handshake);
2926                    }
2927                }
2928            }
2929        }
2930
2931        let space = &mut self.spaces[SpaceId::Data];
2932        if space
2933            .pending_acks
2934            .packet_received(now, number, ack_eliciting, &space.dedup)
2935        {
2936            self.timers
2937                .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2938        }
2939
2940        // Issue stream ID credit due to ACKs of outgoing finish/resets and incoming finish/resets
2941        // on stopped streams. Incoming finishes/resets on open streams are not handled here as they
2942        // are only freed, and hence only issue credit, once the application has been notified
2943        // during a read on the stream.
2944        let pending = &mut self.spaces[SpaceId::Data].pending;
2945        self.streams.queue_max_stream_id(pending);
2946
2947        if let Some(reason) = close {
2948            self.error = Some(reason.into());
2949            self.state = State::Draining;
2950            self.close = true;
2951        }
2952
2953        if remote != self.path.remote
2954            && !is_probing_packet
2955            && number == self.spaces[SpaceId::Data].rx_packet
2956        {
2957            let ConnectionSide::Server { ref server_config } = self.side else {
2958                panic!("packets from unknown remote should be dropped by clients");
2959            };
2960            debug_assert!(
2961                server_config.migration,
2962                "migration-initiating packets should have been dropped immediately"
2963            );
2964            self.migrate(now, remote);
2965            // Break linkability, if possible
2966            self.update_rem_cid();
2967            self.spin = false;
2968        }
2969
2970        Ok(())
2971    }
2972
2973    fn migrate(&mut self, now: Instant, remote: SocketAddr) {
2974        trace!(%remote, "migration initiated");
2975        // Reset rtt/congestion state for new path unless it looks like a NAT rebinding.
2976        // Note that the congestion window will not grow until validation terminates. Helps mitigate
2977        // amplification attacks performed by spoofing source addresses.
2978        let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
2979            PathData::from_previous(remote, &self.path, now)
2980        } else {
2981            let peer_max_udp_payload_size =
2982                u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
2983                    .unwrap_or(u16::MAX);
2984            PathData::new(
2985                remote,
2986                self.allow_mtud,
2987                Some(peer_max_udp_payload_size),
2988                now,
2989                &self.config,
2990            )
2991        };
2992        new_path.challenge = Some(self.rng.random());
2993        new_path.challenge_pending = true;
2994        let prev_pto = self.pto(SpaceId::Data);
2995
2996        let mut prev = mem::replace(&mut self.path, new_path);
2997        // Don't clobber the original path if the previous one hasn't been validated yet
2998        if prev.challenge.is_none() {
2999            prev.challenge = Some(self.rng.random());
3000            prev.challenge_pending = true;
3001            // We haven't updated the remote CID yet, this captures the remote CID we were using on
3002            // the previous path.
3003            self.prev_path = Some((self.rem_cids.active(), prev));
3004        }
3005
3006        self.timers.set(
3007            Timer::PathValidation,
3008            now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3009        );
3010    }
3011
3012    /// Handle a change in the local address, i.e. an active migration
3013    pub fn local_address_changed(&mut self) {
3014        self.update_rem_cid();
3015        self.ping();
3016    }
3017
3018    /// Switch to a previously unused remote connection ID, if possible
3019    fn update_rem_cid(&mut self) {
3020        let (reset_token, retired) = match self.rem_cids.next() {
3021            Some(x) => x,
3022            None => return,
3023        };
3024
3025        // Retire the current remote CID and any CIDs we had to skip.
3026        self.spaces[SpaceId::Data]
3027            .pending
3028            .retire_cids
3029            .extend(retired);
3030        self.set_reset_token(reset_token);
3031    }
3032
3033    fn set_reset_token(&mut self, reset_token: ResetToken) {
3034        self.endpoint_events
3035            .push_back(EndpointEventInner::ResetToken(
3036                self.path.remote,
3037                reset_token,
3038            ));
3039        self.peer_params.stateless_reset_token = Some(reset_token);
3040    }
3041
3042    /// Issue an initial set of connection IDs to the peer upon connection
3043    fn issue_first_cids(&mut self, now: Instant) {
3044        if self.local_cid_state.cid_len() == 0 {
3045            return;
3046        }
3047
3048        // Subtract 1 to account for the CID we supplied while handshaking
3049        let n = self.peer_params.issue_cids_limit() - 1;
3050        self.endpoint_events
3051            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3052    }
3053
3054    fn populate_packet(
3055        &mut self,
3056        now: Instant,
3057        space_id: SpaceId,
3058        buf: &mut Vec<u8>,
3059        max_size: usize,
3060        pn: u64,
3061    ) -> SentFrames {
3062        let mut sent = SentFrames::default();
3063        let space = &mut self.spaces[space_id];
3064        let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
3065        space.pending_acks.maybe_ack_non_eliciting();
3066
3067        // HANDSHAKE_DONE
3068        if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
3069            buf.write(frame::FrameType::HANDSHAKE_DONE);
3070            sent.retransmits.get_or_create().handshake_done = true;
3071            // This is just a u8 counter and the frame is typically just sent once
3072            self.stats.frame_tx.handshake_done =
3073                self.stats.frame_tx.handshake_done.saturating_add(1);
3074        }
3075
3076        // PING
3077        if mem::replace(&mut space.ping_pending, false) {
3078            trace!("PING");
3079            buf.write(frame::FrameType::PING);
3080            sent.non_retransmits = true;
3081            self.stats.frame_tx.ping += 1;
3082        }
3083
3084        // IMMEDIATE_ACK
3085        if mem::replace(&mut space.immediate_ack_pending, false) {
3086            trace!("IMMEDIATE_ACK");
3087            buf.write(frame::FrameType::IMMEDIATE_ACK);
3088            sent.non_retransmits = true;
3089            self.stats.frame_tx.immediate_ack += 1;
3090        }
3091
3092        // ACK
3093        if space.pending_acks.can_send() {
3094            Self::populate_acks(
3095                now,
3096                self.receiving_ecn,
3097                &mut sent,
3098                space,
3099                buf,
3100                &mut self.stats,
3101            );
3102        }
3103
3104        // ACK_FREQUENCY
3105        if mem::replace(&mut space.pending.ack_frequency, false) {
3106            let sequence_number = self.ack_frequency.next_sequence_number();
3107
3108            // Safe to unwrap because this is always provided when ACK frequency is enabled
3109            let config = self.config.ack_frequency_config.as_ref().unwrap();
3110
3111            // Ensure the delay is within bounds to avoid a PROTOCOL_VIOLATION error
3112            let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
3113                self.path.rtt.get(),
3114                config,
3115                &self.peer_params,
3116            );
3117
3118            trace!(?max_ack_delay, "ACK_FREQUENCY");
3119
3120            frame::AckFrequency {
3121                sequence: sequence_number,
3122                ack_eliciting_threshold: config.ack_eliciting_threshold,
3123                request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
3124                reordering_threshold: config.reordering_threshold,
3125            }
3126            .encode(buf);
3127
3128            sent.retransmits.get_or_create().ack_frequency = true;
3129
3130            self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
3131            self.stats.frame_tx.ack_frequency += 1;
3132        }
3133
3134        // PATH_CHALLENGE
3135        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3136            // Transmit challenges with every outgoing frame on an unvalidated path
3137            if let Some(token) = self.path.challenge {
3138                // But only send a packet solely for that purpose at most once
3139                self.path.challenge_pending = false;
3140                sent.non_retransmits = true;
3141                sent.requires_padding = true;
3142                trace!("PATH_CHALLENGE {:08x}", token);
3143                buf.write(frame::FrameType::PATH_CHALLENGE);
3144                buf.write(token);
3145                self.stats.frame_tx.path_challenge += 1;
3146            }
3147        }
3148
3149        // PATH_RESPONSE
3150        if buf.len() + 9 < max_size && space_id == SpaceId::Data {
3151            if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
3152                sent.non_retransmits = true;
3153                sent.requires_padding = true;
3154                trace!("PATH_RESPONSE {:08x}", token);
3155                buf.write(frame::FrameType::PATH_RESPONSE);
3156                buf.write(token);
3157                self.stats.frame_tx.path_response += 1;
3158            }
3159        }
3160
3161        // CRYPTO
3162        while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
3163            let mut frame = match space.pending.crypto.pop_front() {
3164                Some(x) => x,
3165                None => break,
3166            };
3167
3168            // Calculate the maximum amount of crypto data we can store in the buffer.
3169            // Since the offset is known, we can reserve the exact size required to encode it.
3170            // For length we reserve 2bytes which allows to encode up to 2^14,
3171            // which is more than what fits into normally sized QUIC frames.
3172            let max_crypto_data_size = max_size
3173                - buf.len()
3174                - 1 // Frame Type
3175                - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
3176                - 2; // Maximum encoded length for frame size, given we send less than 2^14 bytes
3177
3178            let len = frame
3179                .data
3180                .len()
3181                .min(2usize.pow(14) - 1)
3182                .min(max_crypto_data_size);
3183
3184            let data = frame.data.split_to(len);
3185            let truncated = frame::Crypto {
3186                offset: frame.offset,
3187                data,
3188            };
3189            trace!(
3190                "CRYPTO: off {} len {}",
3191                truncated.offset,
3192                truncated.data.len()
3193            );
3194            truncated.encode(buf);
3195            self.stats.frame_tx.crypto += 1;
3196            sent.retransmits.get_or_create().crypto.push_back(truncated);
3197            if !frame.data.is_empty() {
3198                frame.offset += len as u64;
3199                space.pending.crypto.push_front(frame);
3200            }
3201        }
3202
3203        if space_id == SpaceId::Data {
3204            self.streams.write_control_frames(
3205                buf,
3206                &mut space.pending,
3207                &mut sent.retransmits,
3208                &mut self.stats.frame_tx,
3209                max_size,
3210            );
3211        }
3212
3213        // NEW_CONNECTION_ID
3214        while buf.len() + 44 < max_size {
3215            let issued = match space.pending.new_cids.pop() {
3216                Some(x) => x,
3217                None => break,
3218            };
3219            trace!(
3220                sequence = issued.sequence,
3221                id = %issued.id,
3222                "NEW_CONNECTION_ID"
3223            );
3224            frame::NewConnectionId {
3225                sequence: issued.sequence,
3226                retire_prior_to: self.local_cid_state.retire_prior_to(),
3227                id: issued.id,
3228                reset_token: issued.reset_token,
3229            }
3230            .encode(buf);
3231            sent.retransmits.get_or_create().new_cids.push(issued);
3232            self.stats.frame_tx.new_connection_id += 1;
3233        }
3234
3235        // RETIRE_CONNECTION_ID
3236        while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
3237            let seq = match space.pending.retire_cids.pop() {
3238                Some(x) => x,
3239                None => break,
3240            };
3241            trace!(sequence = seq, "RETIRE_CONNECTION_ID");
3242            buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
3243            buf.write_var(seq);
3244            sent.retransmits.get_or_create().retire_cids.push(seq);
3245            self.stats.frame_tx.retire_connection_id += 1;
3246        }
3247
3248        // DATAGRAM
3249        let mut sent_datagrams = false;
3250        while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
3251            match self.datagrams.write(buf, max_size) {
3252                true => {
3253                    sent_datagrams = true;
3254                    sent.non_retransmits = true;
3255                    self.stats.frame_tx.datagram += 1;
3256                }
3257                false => break,
3258            }
3259        }
3260        if self.datagrams.send_blocked && sent_datagrams {
3261            self.events.push_back(Event::DatagramsUnblocked);
3262            self.datagrams.send_blocked = false;
3263        }
3264
3265        // NEW_TOKEN
3266        while let Some(remote_addr) = space.pending.new_tokens.pop() {
3267            debug_assert_eq!(space_id, SpaceId::Data);
3268            let ConnectionSide::Server { server_config } = &self.side else {
3269                panic!("NEW_TOKEN frames should not be enqueued by clients");
3270            };
3271
3272            if remote_addr != self.path.remote {
3273                // NEW_TOKEN frames contain tokens bound to a client's IP address, and are only
3274                // useful if used from the same IP address.  Thus, we abandon enqueued NEW_TOKEN
3275                // frames upon an path change. Instead, when the new path becomes validated,
3276                // NEW_TOKEN frames may be enqueued for the new path instead.
3277                continue;
3278            }
3279
3280            let token = Token::new(
3281                TokenPayload::Validation {
3282                    ip: remote_addr.ip(),
3283                    issued: server_config.time_source.now(),
3284                },
3285                &mut self.rng,
3286            );
3287            let new_token = NewToken {
3288                token: token.encode(&*server_config.token_key).into(),
3289            };
3290
3291            if buf.len() + new_token.size() >= max_size {
3292                space.pending.new_tokens.push(remote_addr);
3293                break;
3294            }
3295
3296            new_token.encode(buf);
3297            sent.retransmits
3298                .get_or_create()
3299                .new_tokens
3300                .push(remote_addr);
3301            self.stats.frame_tx.new_token += 1;
3302        }
3303
3304        // STREAM
3305        if space_id == SpaceId::Data {
3306            sent.stream_frames =
3307                self.streams
3308                    .write_stream_frames(buf, max_size, self.config.send_fairness);
3309            self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
3310        }
3311
3312        sent
3313    }
3314
3315    /// Write pending ACKs into a buffer
3316    ///
3317    /// This method assumes ACKs are pending, and should only be called if
3318    /// `!PendingAcks::ranges().is_empty()` returns `true`.
3319    fn populate_acks(
3320        now: Instant,
3321        receiving_ecn: bool,
3322        sent: &mut SentFrames,
3323        space: &mut PacketSpace,
3324        buf: &mut Vec<u8>,
3325        stats: &mut ConnectionStats,
3326    ) {
3327        debug_assert!(!space.pending_acks.ranges().is_empty());
3328
3329        // 0-RTT packets must never carry acks (which would have to be of handshake packets)
3330        debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3331        let ecn = if receiving_ecn {
3332            Some(&space.ecn_counters)
3333        } else {
3334            None
3335        };
3336        sent.largest_acked = space.pending_acks.ranges().max();
3337
3338        let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3339
3340        // TODO: This should come from `TransportConfig` if that gets configurable.
3341        let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3342        let delay = delay_micros >> ack_delay_exp.into_inner();
3343
3344        trace!(
3345            "ACK {:?}, Delay = {}us",
3346            space.pending_acks.ranges(),
3347            delay_micros
3348        );
3349
3350        frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3351        stats.frame_tx.acks += 1;
3352    }
3353
3354    fn close_common(&mut self) {
3355        trace!("connection closed");
3356        for &timer in &Timer::VALUES {
3357            self.timers.stop(timer);
3358        }
3359    }
3360
3361    fn set_close_timer(&mut self, now: Instant) {
3362        self.timers
3363            .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3364    }
3365
3366    /// Handle transport parameters received from the peer
3367    fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3368        if Some(self.orig_rem_cid) != params.initial_src_cid
3369            || (self.side.is_client()
3370                && (Some(self.initial_dst_cid) != params.original_dst_cid
3371                    || self.retry_src_cid != params.retry_src_cid))
3372        {
3373            return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3374                "CID authentication failure",
3375            ));
3376        }
3377
3378        self.set_peer_params(params);
3379
3380        Ok(())
3381    }
3382
3383    fn set_peer_params(&mut self, params: TransportParameters) {
3384        self.streams.set_params(&params);
3385        self.idle_timeout =
3386            negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3387        trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3388        if let Some(ref info) = params.preferred_address {
3389            self.rem_cids.insert(frame::NewConnectionId {
3390                sequence: 1,
3391                id: info.connection_id,
3392                reset_token: info.stateless_reset_token,
3393                retire_prior_to: 0,
3394            }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3395        }
3396        self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(&params);
3397        self.peer_params = params;
3398        self.path.mtud.on_peer_max_udp_payload_size_received(
3399            u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3400        );
3401    }
3402
3403    fn decrypt_packet(
3404        &mut self,
3405        now: Instant,
3406        packet: &mut Packet,
3407    ) -> Result<Option<u64>, Option<TransportError>> {
3408        let result = packet_crypto::decrypt_packet_body(
3409            packet,
3410            &self.spaces,
3411            self.zero_rtt_crypto.as_ref(),
3412            self.key_phase,
3413            self.prev_crypto.as_ref(),
3414            self.next_crypto.as_ref(),
3415        )?;
3416
3417        let result = match result {
3418            Some(r) => r,
3419            None => return Ok(None),
3420        };
3421
3422        if result.outgoing_key_update_acked {
3423            if let Some(prev) = self.prev_crypto.as_mut() {
3424                prev.end_packet = Some((result.number, now));
3425                self.set_key_discard_timer(now, packet.header.space());
3426            }
3427        }
3428
3429        if result.incoming_key_update {
3430            trace!("key update authenticated");
3431            self.update_keys(Some((result.number, now)), true);
3432            self.set_key_discard_timer(now, packet.header.space());
3433        }
3434
3435        Ok(Some(result.number))
3436    }
3437
3438    fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3439        trace!("executing key update");
3440        // Generate keys for the key phase after the one we're switching to, store them in
3441        // `next_crypto`, make the contents of `next_crypto` current, and move the current keys into
3442        // `prev_crypto`.
3443        let new = self
3444            .crypto
3445            .next_1rtt_keys()
3446            .expect("only called for `Data` packets");
3447        self.key_phase_size = new
3448            .local
3449            .confidentiality_limit()
3450            .saturating_sub(KEY_UPDATE_MARGIN);
3451        let old = mem::replace(
3452            &mut self.spaces[SpaceId::Data]
3453                .crypto
3454                .as_mut()
3455                .unwrap() // safe because update_keys() can only be triggered by short packets
3456                .packet,
3457            mem::replace(self.next_crypto.as_mut().unwrap(), new),
3458        );
3459        self.spaces[SpaceId::Data].sent_with_keys = 0;
3460        self.prev_crypto = Some(PrevCrypto {
3461            crypto: old,
3462            end_packet,
3463            update_unacked: remote,
3464        });
3465        self.key_phase = !self.key_phase;
3466    }
3467
3468    fn peer_supports_ack_frequency(&self) -> bool {
3469        self.peer_params.min_ack_delay.is_some()
3470    }
3471
3472    /// Send an IMMEDIATE_ACK frame to the remote endpoint
3473    ///
3474    /// According to the spec, this will result in an error if the remote endpoint does not support
3475    /// the Acknowledgement Frequency extension
3476    pub(crate) fn immediate_ack(&mut self) {
3477        self.spaces[self.highest_space].immediate_ack_pending = true;
3478    }
3479
3480    /// Decodes a packet, returning its decrypted payload, so it can be inspected in tests
3481    #[cfg(test)]
3482    pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
3483        let (first_decode, remaining) = match &event.0 {
3484            ConnectionEventInner::Datagram(DatagramConnectionEvent {
3485                first_decode,
3486                remaining,
3487                ..
3488            }) => (first_decode, remaining),
3489            _ => return None,
3490        };
3491
3492        if remaining.is_some() {
3493            panic!("Packets should never be coalesced in tests");
3494        }
3495
3496        let decrypted_header = packet_crypto::unprotect_header(
3497            first_decode.clone(),
3498            &self.spaces,
3499            self.zero_rtt_crypto.as_ref(),
3500            self.peer_params.stateless_reset_token,
3501        )?;
3502
3503        let mut packet = decrypted_header.packet?;
3504        packet_crypto::decrypt_packet_body(
3505            &mut packet,
3506            &self.spaces,
3507            self.zero_rtt_crypto.as_ref(),
3508            self.key_phase,
3509            self.prev_crypto.as_ref(),
3510            self.next_crypto.as_ref(),
3511        )
3512        .ok()?;
3513
3514        Some(packet.payload.to_vec())
3515    }
3516
3517    /// The number of bytes of packets containing retransmittable frames that have not been
3518    /// acknowledged or declared lost.
3519    #[cfg(test)]
3520    pub(crate) fn bytes_in_flight(&self) -> u64 {
3521        self.path.in_flight.bytes
3522    }
3523
3524    /// Number of bytes worth of non-ack-only packets that may be sent
3525    #[cfg(test)]
3526    pub(crate) fn congestion_window(&self) -> u64 {
3527        self.path
3528            .congestion
3529            .window()
3530            .saturating_sub(self.path.in_flight.bytes)
3531    }
3532
3533    /// Whether no timers but keepalive, idle, rtt, pushnewcid, and key discard are running
3534    #[cfg(test)]
3535    pub(crate) fn is_idle(&self) -> bool {
3536        Timer::VALUES
3537            .iter()
3538            .filter(|&&t| !matches!(t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
3539            .filter_map(|&t| Some((t, self.timers.get(t)?)))
3540            .min_by_key(|&(_, time)| time)
3541            .map_or(true, |(timer, _)| timer == Timer::Idle)
3542    }
3543
3544    /// Total number of outgoing packets that have been deemed lost
3545    #[cfg(test)]
3546    pub(crate) fn lost_packets(&self) -> u64 {
3547        self.lost_packets
3548    }
3549
3550    /// Whether explicit congestion notification is in use on outgoing packets.
3551    #[cfg(test)]
3552    pub(crate) fn using_ecn(&self) -> bool {
3553        self.path.sending_ecn
3554    }
3555
3556    /// The number of received bytes in the current path
3557    #[cfg(test)]
3558    pub(crate) fn total_recvd(&self) -> u64 {
3559        self.path.total_recvd
3560    }
3561
3562    #[cfg(test)]
3563    pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
3564        self.local_cid_state.active_seq()
3565    }
3566
3567    /// Instruct the peer to replace previously issued CIDs by sending a NEW_CONNECTION_ID frame
3568    /// with updated `retire_prior_to` field set to `v`
3569    #[cfg(test)]
3570    pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
3571        let n = self.local_cid_state.assign_retire_seq(v);
3572        self.endpoint_events
3573            .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3574    }
3575
3576    /// Check the current active remote CID sequence
3577    #[cfg(test)]
3578    pub(crate) fn active_rem_cid_seq(&self) -> u64 {
3579        self.rem_cids.active_seq()
3580    }
3581
3582    /// Returns the detected maximum udp payload size for the current path
3583    #[cfg(test)]
3584    pub(crate) fn path_mtu(&self) -> u16 {
3585        self.path.current_mtu()
3586    }
3587
3588    /// Whether we have 1-RTT data to send
3589    ///
3590    /// See also `self.space(SpaceId::Data).can_send()`
3591    fn can_send_1rtt(&self, max_size: usize) -> bool {
3592        self.streams.can_send_stream_data()
3593            || self.path.challenge_pending
3594            || self
3595                .prev_path
3596                .as_ref()
3597                .is_some_and(|(_, x)| x.challenge_pending)
3598            || !self.path_responses.is_empty()
3599            || self
3600                .datagrams
3601                .outgoing
3602                .front()
3603                .is_some_and(|x| x.size(true) <= max_size)
3604    }
3605
3606    /// Update counters to account for a packet becoming acknowledged, lost, or abandoned
3607    fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3608        // Visit known paths from newest to oldest to find the one `pn` was sent on
3609        for path in [&mut self.path]
3610            .into_iter()
3611            .chain(self.prev_path.as_mut().map(|(_, data)| data))
3612        {
3613            if path.remove_in_flight(pn, packet) {
3614                return;
3615            }
3616        }
3617    }
3618
3619    /// Terminate the connection instantly, without sending a close packet
3620    fn kill(&mut self, reason: ConnectionError) {
3621        self.close_common();
3622        self.error = Some(reason);
3623        self.state = State::Drained;
3624        self.endpoint_events.push_back(EndpointEventInner::Drained);
3625    }
3626
3627    /// Storage size required for the largest packet known to be supported by the current path
3628    ///
3629    /// Buffers passed to [`Connection::poll_transmit`] should be at least this large.
3630    pub fn current_mtu(&self) -> u16 {
3631        self.path.current_mtu()
3632    }
3633
3634    /// Size of non-frame data for a 1-RTT packet
3635    ///
3636    /// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
3637    /// frames. Changes if the length of the remote connection ID changes, which is expected to be
3638    /// rare. If `pn` is specified, may additionally change unpredictably due to variations in
3639    /// latency and packet loss.
3640    fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3641        let pn_len = match pn {
3642            Some(pn) => PacketNumber::new(
3643                pn,
3644                self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3645            )
3646            .len(),
3647            // Upper bound
3648            None => 4,
3649        };
3650
3651        // 1 byte for flags
3652        1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3653    }
3654
3655    fn tag_len_1rtt(&self) -> usize {
3656        let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3657            Some(crypto) => Some(&*crypto.packet.local),
3658            None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3659        };
3660        // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
3661        // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
3662        // but that would needlessly prevent sending datagrams during 0-RTT.
3663        key.map_or(16, |x| x.tag_len())
3664    }
3665
3666    /// Mark the path as validated, and enqueue NEW_TOKEN frames to be sent as appropriate
3667    fn on_path_validated(&mut self) {
3668        self.path.validated = true;
3669        let ConnectionSide::Server { server_config } = &self.side else {
3670            return;
3671        };
3672        let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3673        new_tokens.clear();
3674        for _ in 0..server_config.validation_token.sent {
3675            new_tokens.push(self.path.remote);
3676        }
3677    }
3678}
3679
3680impl fmt::Debug for Connection {
3681    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3682        f.debug_struct("Connection")
3683            .field("handshake_cid", &self.handshake_cid)
3684            .finish()
3685    }
3686}
3687
3688/// Fields of `Connection` specific to it being client-side or server-side
3689enum ConnectionSide {
3690    Client {
3691        /// Sent in every outgoing Initial packet. Always empty after Initial keys are discarded
3692        token: Bytes,
3693        token_store: Arc<dyn TokenStore>,
3694        server_name: String,
3695    },
3696    Server {
3697        server_config: Arc<ServerConfig>,
3698    },
3699}
3700
3701impl ConnectionSide {
3702    fn remote_may_migrate(&self) -> bool {
3703        match self {
3704            Self::Server { server_config } => server_config.migration,
3705            Self::Client { .. } => false,
3706        }
3707    }
3708
3709    fn is_client(&self) -> bool {
3710        self.side().is_client()
3711    }
3712
3713    fn is_server(&self) -> bool {
3714        self.side().is_server()
3715    }
3716
3717    fn side(&self) -> Side {
3718        match *self {
3719            Self::Client { .. } => Side::Client,
3720            Self::Server { .. } => Side::Server,
3721        }
3722    }
3723}
3724
3725impl From<SideArgs> for ConnectionSide {
3726    fn from(side: SideArgs) -> Self {
3727        match side {
3728            SideArgs::Client {
3729                token_store,
3730                server_name,
3731            } => Self::Client {
3732                token: token_store.take(&server_name).unwrap_or_default(),
3733                token_store,
3734                server_name,
3735            },
3736            SideArgs::Server {
3737                server_config,
3738                pref_addr_cid: _,
3739                path_validated: _,
3740            } => Self::Server { server_config },
3741        }
3742    }
3743}
3744
3745/// Parameters to `Connection::new` specific to it being client-side or server-side
3746pub(crate) enum SideArgs {
3747    Client {
3748        token_store: Arc<dyn TokenStore>,
3749        server_name: String,
3750    },
3751    Server {
3752        server_config: Arc<ServerConfig>,
3753        pref_addr_cid: Option<ConnectionId>,
3754        path_validated: bool,
3755    },
3756}
3757
3758impl SideArgs {
3759    pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3760        match *self {
3761            Self::Client { .. } => None,
3762            Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3763        }
3764    }
3765
3766    pub(crate) fn path_validated(&self) -> bool {
3767        match *self {
3768            Self::Client { .. } => true,
3769            Self::Server { path_validated, .. } => path_validated,
3770        }
3771    }
3772
3773    pub(crate) fn side(&self) -> Side {
3774        match *self {
3775            Self::Client { .. } => Side::Client,
3776            Self::Server { .. } => Side::Server,
3777        }
3778    }
3779}
3780
3781/// Reasons why a connection might be lost
3782#[derive(Debug, Error, Clone, PartialEq, Eq)]
3783pub enum ConnectionError {
3784    /// The peer doesn't implement any supported version
3785    #[error("peer doesn't implement any supported version")]
3786    VersionMismatch,
3787    /// The peer violated the QUIC specification as understood by this implementation
3788    #[error(transparent)]
3789    TransportError(#[from] TransportError),
3790    /// The peer's QUIC stack aborted the connection automatically
3791    #[error("aborted by peer: {0}")]
3792    ConnectionClosed(frame::ConnectionClose),
3793    /// The peer closed the connection
3794    #[error("closed by peer: {0}")]
3795    ApplicationClosed(frame::ApplicationClose),
3796    /// The peer is unable to continue processing this connection, usually due to having restarted
3797    #[error("reset by peer")]
3798    Reset,
3799    /// Communication with the peer has lapsed for longer than the negotiated idle timeout
3800    ///
3801    /// If neither side is sending keep-alives, a connection will time out after a long enough idle
3802    /// period even if the peer is still reachable. See also [`TransportConfig::max_idle_timeout()`]
3803    /// and [`TransportConfig::keep_alive_interval()`].
3804    #[error("timed out")]
3805    TimedOut,
3806    /// The local application closed the connection
3807    #[error("closed")]
3808    LocallyClosed,
3809    /// The connection could not be created because not enough of the CID space is available
3810    ///
3811    /// Try using longer connection IDs.
3812    #[error("CIDs exhausted")]
3813    CidsExhausted,
3814}
3815
3816impl From<Close> for ConnectionError {
3817    fn from(x: Close) -> Self {
3818        match x {
3819            Close::Connection(reason) => Self::ConnectionClosed(reason),
3820            Close::Application(reason) => Self::ApplicationClosed(reason),
3821        }
3822    }
3823}
3824
3825// For compatibility with API consumers
3826impl From<ConnectionError> for io::Error {
3827    fn from(x: ConnectionError) -> Self {
3828        use ConnectionError::*;
3829        let kind = match x {
3830            TimedOut => io::ErrorKind::TimedOut,
3831            Reset => io::ErrorKind::ConnectionReset,
3832            ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3833            TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3834                io::ErrorKind::Other
3835            }
3836        };
3837        Self::new(kind, x)
3838    }
3839}
3840
3841#[allow(unreachable_pub)] // fuzzing only
3842#[derive(Clone)]
3843pub enum State {
3844    Handshake(state::Handshake),
3845    Established,
3846    Closed(state::Closed),
3847    Draining,
3848    /// Waiting for application to call close so we can dispose of the resources
3849    Drained,
3850}
3851
3852impl State {
3853    fn closed<R: Into<Close>>(reason: R) -> Self {
3854        Self::Closed(state::Closed {
3855            reason: reason.into(),
3856        })
3857    }
3858
3859    fn is_handshake(&self) -> bool {
3860        matches!(*self, Self::Handshake(_))
3861    }
3862
3863    fn is_established(&self) -> bool {
3864        matches!(*self, Self::Established)
3865    }
3866
3867    fn is_closed(&self) -> bool {
3868        matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3869    }
3870
3871    fn is_drained(&self) -> bool {
3872        matches!(*self, Self::Drained)
3873    }
3874}
3875
3876mod state {
3877    use super::*;
3878
3879    #[allow(unreachable_pub)] // fuzzing only
3880    #[derive(Clone)]
3881    pub struct Handshake {
3882        /// Whether the remote CID has been set by the peer yet
3883        ///
3884        /// Always set for servers
3885        pub(super) rem_cid_set: bool,
3886        /// Stateless retry token received in the first Initial by a server.
3887        ///
3888        /// Must be present in every Initial. Always empty for clients.
3889        pub(super) expected_token: Bytes,
3890        /// First cryptographic message
3891        ///
3892        /// Only set for clients
3893        pub(super) client_hello: Option<Bytes>,
3894    }
3895
3896    #[allow(unreachable_pub)] // fuzzing only
3897    #[derive(Clone)]
3898    pub struct Closed {
3899        pub(super) reason: Close,
3900    }
3901}
3902
3903/// Events of interest to the application
3904#[derive(Debug)]
3905pub enum Event {
3906    /// The connection's handshake data is ready
3907    HandshakeDataReady,
3908    /// The connection was successfully established
3909    Connected,
3910    /// The connection was lost
3911    ///
3912    /// Emitted if the peer closes the connection or an error is encountered.
3913    ConnectionLost {
3914        /// Reason that the connection was closed
3915        reason: ConnectionError,
3916    },
3917    /// Stream events
3918    Stream(StreamEvent),
3919    /// One or more application datagrams have been received
3920    DatagramReceived,
3921    /// One or more application datagrams have been sent after blocking
3922    DatagramsUnblocked,
3923}
3924
3925fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
3926    if x > y { x - y } else { Duration::ZERO }
3927}
3928
3929fn get_max_ack_delay(params: &TransportParameters) -> Duration {
3930    Duration::from_micros(params.max_ack_delay.0 * 1000)
3931}
3932
3933// Prevents overflow and improves behavior in extreme circumstances
3934const MAX_BACKOFF_EXPONENT: u32 = 16;
3935
3936/// Minimal remaining size to allow packet coalescing, excluding cryptographic tag
3937///
3938/// This must be at least as large as the header for a well-formed empty packet to be coalesced,
3939/// plus some space for frames. We only care about handshake headers because short header packets
3940/// necessarily have smaller headers, and initial packets are only ever the first packet in a
3941/// datagram (because we coalesce in ascending packet space order and the only reason to split a
3942/// packet is when packet space changes).
3943const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;
3944
3945/// Largest amount of space that could be occupied by a Handshake or 0-RTT packet's header
3946///
3947/// Excludes packet-type-specific fields such as packet number or Initial token
3948// https://www.rfc-editor.org/rfc/rfc9000.html#name-0-rtt: flags + version + dcid len + dcid +
3949// scid len + scid + length + pn
3950const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
3951    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;
3952
3953/// Perform key updates this many packets before the AEAD confidentiality limit.
3954///
3955/// Chosen arbitrarily, intended to be large enough to prevent spurious connection loss.
3956const KEY_UPDATE_MARGIN: u64 = 10_000;
3957
3958#[derive(Default)]
3959struct SentFrames {
3960    retransmits: ThinRetransmits,
3961    largest_acked: Option<u64>,
3962    stream_frames: StreamMetaVec,
3963    /// Whether the packet contains non-retransmittable frames (like datagrams)
3964    non_retransmits: bool,
3965    requires_padding: bool,
3966}
3967
3968impl SentFrames {
3969    /// Returns whether the packet contains only ACKs
3970    fn is_ack_only(&self, streams: &StreamsState) -> bool {
3971        self.largest_acked.is_some()
3972            && !self.non_retransmits
3973            && self.stream_frames.is_empty()
3974            && self.retransmits.is_empty(streams)
3975    }
3976}
3977
3978/// Compute the negotiated idle timeout based on local and remote max_idle_timeout transport parameters.
3979///
3980/// According to the definition of max_idle_timeout, a value of `0` means the timeout is disabled; see <https://www.rfc-editor.org/rfc/rfc9000#section-18.2-4.4.1.>
3981///
3982/// According to the negotiation procedure, either the minimum of the timeouts or one specified is used as the negotiated value; see <https://www.rfc-editor.org/rfc/rfc9000#section-10.1-2.>
3983///
3984/// Returns the negotiated idle timeout as a `Duration`, or `None` when both endpoints have opted out of idle timeout.
3985fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
3986    match (x, y) {
3987        (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
3988        (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
3989        (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
3990        (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
3991    }
3992}
3993
3994#[cfg(test)]
3995mod tests {
3996    use super::*;
3997
3998    #[test]
3999    fn negotiate_max_idle_timeout_commutative() {
4000        let test_params = [
4001            (None, None, None),
4002            (None, Some(VarInt(0)), None),
4003            (None, Some(VarInt(2)), Some(Duration::from_millis(2))),
4004            (Some(VarInt(0)), Some(VarInt(0)), None),
4005            (
4006                Some(VarInt(2)),
4007                Some(VarInt(0)),
4008                Some(Duration::from_millis(2)),
4009            ),
4010            (
4011                Some(VarInt(1)),
4012                Some(VarInt(4)),
4013                Some(Duration::from_millis(1)),
4014            ),
4015        ];
4016
4017        for (left, right, result) in test_params {
4018            assert_eq!(negotiate_max_idle_timeout(left, right), result);
4019            assert_eq!(negotiate_max_idle_timeout(right, left), result);
4020        }
4021    }
4022}