quinn_proto/
frame.rs

1use std::{
2    fmt::{self, Write},
3    mem,
4    ops::{Range, RangeInclusive},
5};
6
7use bytes::{Buf, BufMut, Bytes};
8use tinyvec::TinyVec;
9
10use crate::{
11    Dir, MAX_CID_SIZE, RESET_TOKEN_SIZE, ResetToken, StreamId, TransportError, TransportErrorCode,
12    VarInt,
13    coding::{self, BufExt, BufMutExt, UnexpectedEnd},
14    range_set::ArrayRangeSet,
15    shared::{ConnectionId, EcnCodepoint},
16};
17
18#[cfg(feature = "arbitrary")]
19use arbitrary::Arbitrary;
20
21/// A QUIC frame type
22#[derive(Copy, Clone, Eq, PartialEq)]
23pub struct FrameType(u64);
24
25impl FrameType {
26    fn stream(self) -> Option<StreamInfo> {
27        if STREAM_TYS.contains(&self.0) {
28            Some(StreamInfo(self.0 as u8))
29        } else {
30            None
31        }
32    }
33    fn datagram(self) -> Option<DatagramInfo> {
34        if DATAGRAM_TYS.contains(&self.0) {
35            Some(DatagramInfo(self.0 as u8))
36        } else {
37            None
38        }
39    }
40}
41
42impl coding::Codec for FrameType {
43    fn decode<B: Buf>(buf: &mut B) -> coding::Result<Self> {
44        Ok(Self(buf.get_var()?))
45    }
46    fn encode<B: BufMut>(&self, buf: &mut B) {
47        buf.write_var(self.0);
48    }
49}
50
/// Frame types that declare a constant size bound
pub(crate) trait FrameStruct {
    /// Smallest number of bytes this type of frame is guaranteed to fit within.
    const SIZE_BOUND: usize;
}
55
/// Declares the frame types that have a single fixed value
///
/// For every `NAME = value,` entry this generates an associated `FrameType` constant, and it
/// emits `Debug` and `Display` impls for `FrameType` that print listed types by name.
macro_rules! frame_types {
    {$($ident:ident = $code:expr,)*} => {
        impl FrameType {
            $(pub(crate) const $ident: FrameType = FrameType($code);)*
        }

        impl fmt::Debug for FrameType {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                match self.0 {
                    $($code => f.write_str(stringify!($ident)),)*
                    // Anything not listed is shown as its raw hexadecimal value
                    other => write!(f, "Type({:02x})", other)
                }
            }
        }

        impl fmt::Display for FrameType {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                match self.0 {
                    $($code => f.write_str(stringify!($ident)),)*
                    // Ranged types have no single constant, so they are named here
                    ty if STREAM_TYS.contains(&ty) => f.write_str("STREAM"),
                    ty if DATAGRAM_TYS.contains(&ty) => f.write_str("DATAGRAM"),
                    other => write!(f, "<unknown {:02x}>", other),
                }
            }
        }
    }
}
83
/// Flag bits taken from the low byte of a STREAM frame type
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct StreamInfo(u8);

impl StreamInfo {
    /// Mask of the FIN flag
    const FIN_BIT: u8 = 0x01;
    /// Mask of the flag signalling an explicit length field
    const LEN_BIT: u8 = 0x02;
    /// Mask of the flag signalling an explicit offset field
    const OFF_BIT: u8 = 0x04;

    /// Whether the FIN bit (`0x01`) is set
    fn fin(self) -> bool {
        (self.0 & Self::FIN_BIT) == Self::FIN_BIT
    }

    /// Whether the length bit (`0x02`) is set
    fn len(self) -> bool {
        (self.0 & Self::LEN_BIT) == Self::LEN_BIT
    }

    /// Whether the offset bit (`0x04`) is set
    fn off(self) -> bool {
        (self.0 & Self::OFF_BIT) == Self::OFF_BIT
    }
}
98
/// Flag bits taken from the low byte of a DATAGRAM frame type
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct DatagramInfo(u8);

impl DatagramInfo {
    /// Mask of the flag signalling an explicit length field
    const LEN_BIT: u8 = 0x01;

    /// Whether the length bit (`0x01`) is set
    fn len(self) -> bool {
        (self.0 & Self::LEN_BIT) == Self::LEN_BIT
    }
}
107
// Frame types identified by exactly one value. STREAM and DATAGRAM frames each cover a range of
// values and are therefore described by `STREAM_TYS` / `DATAGRAM_TYS` below; the bare `// STREAM`
// and `// DATAGRAM` markers only note where those ranges sit relative to the listed values.
frame_types! {
    PADDING = 0x00,
    PING = 0x01,
    ACK = 0x02,
    ACK_ECN = 0x03,
    RESET_STREAM = 0x04,
    STOP_SENDING = 0x05,
    CRYPTO = 0x06,
    NEW_TOKEN = 0x07,
    // STREAM (0x08..=0x0f, see `STREAM_TYS`)
    MAX_DATA = 0x10,
    MAX_STREAM_DATA = 0x11,
    MAX_STREAMS_BIDI = 0x12,
    MAX_STREAMS_UNI = 0x13,
    DATA_BLOCKED = 0x14,
    STREAM_DATA_BLOCKED = 0x15,
    STREAMS_BLOCKED_BIDI = 0x16,
    STREAMS_BLOCKED_UNI = 0x17,
    NEW_CONNECTION_ID = 0x18,
    RETIRE_CONNECTION_ID = 0x19,
    PATH_CHALLENGE = 0x1a,
    PATH_RESPONSE = 0x1b,
    CONNECTION_CLOSE = 0x1c,
    APPLICATION_CLOSE = 0x1d,
    HANDSHAKE_DONE = 0x1e,
    // ACK Frequency
    ACK_FREQUENCY = 0xaf,
    IMMEDIATE_ACK = 0x1f,
    // DATAGRAM (0x30..=0x31, see `DATAGRAM_TYS`)
}

/// Frame type values denoting STREAM frames; the low three bits are the flags read by `StreamInfo`
const STREAM_TYS: RangeInclusive<u64> = RangeInclusive::new(0x08, 0x0f);
/// Frame type values denoting DATAGRAM frames; the low bit is the flag read by `DatagramInfo`
const DATAGRAM_TYS: RangeInclusive<u64> = RangeInclusive::new(0x30, 0x31);
141
142#[derive(Debug)]
143pub(crate) enum Frame {
144    Padding,
145    Ping,
146    Ack(Ack),
147    ResetStream(ResetStream),
148    StopSending(StopSending),
149    Crypto(Crypto),
150    NewToken(NewToken),
151    Stream(Stream),
152    MaxData(VarInt),
153    MaxStreamData { id: StreamId, offset: u64 },
154    MaxStreams { dir: Dir, count: u64 },
155    DataBlocked { offset: u64 },
156    StreamDataBlocked { id: StreamId, offset: u64 },
157    StreamsBlocked { dir: Dir, limit: u64 },
158    NewConnectionId(NewConnectionId),
159    RetireConnectionId { sequence: u64 },
160    PathChallenge(u64),
161    PathResponse(u64),
162    Close(Close),
163    Datagram(Datagram),
164    AckFrequency(AckFrequency),
165    ImmediateAck,
166    HandshakeDone,
167}
168
169impl Frame {
170    pub(crate) fn ty(&self) -> FrameType {
171        use Frame::*;
172        match *self {
173            Padding => FrameType::PADDING,
174            ResetStream(_) => FrameType::RESET_STREAM,
175            Close(self::Close::Connection(_)) => FrameType::CONNECTION_CLOSE,
176            Close(self::Close::Application(_)) => FrameType::APPLICATION_CLOSE,
177            MaxData(_) => FrameType::MAX_DATA,
178            MaxStreamData { .. } => FrameType::MAX_STREAM_DATA,
179            MaxStreams { dir: Dir::Bi, .. } => FrameType::MAX_STREAMS_BIDI,
180            MaxStreams { dir: Dir::Uni, .. } => FrameType::MAX_STREAMS_UNI,
181            Ping => FrameType::PING,
182            DataBlocked { .. } => FrameType::DATA_BLOCKED,
183            StreamDataBlocked { .. } => FrameType::STREAM_DATA_BLOCKED,
184            StreamsBlocked { dir: Dir::Bi, .. } => FrameType::STREAMS_BLOCKED_BIDI,
185            StreamsBlocked { dir: Dir::Uni, .. } => FrameType::STREAMS_BLOCKED_UNI,
186            StopSending { .. } => FrameType::STOP_SENDING,
187            RetireConnectionId { .. } => FrameType::RETIRE_CONNECTION_ID,
188            Ack(_) => FrameType::ACK,
189            Stream(ref x) => {
190                let mut ty = *STREAM_TYS.start();
191                if x.fin {
192                    ty |= 0x01;
193                }
194                if x.offset != 0 {
195                    ty |= 0x04;
196                }
197                FrameType(ty)
198            }
199            PathChallenge(_) => FrameType::PATH_CHALLENGE,
200            PathResponse(_) => FrameType::PATH_RESPONSE,
201            NewConnectionId { .. } => FrameType::NEW_CONNECTION_ID,
202            Crypto(_) => FrameType::CRYPTO,
203            NewToken(_) => FrameType::NEW_TOKEN,
204            Datagram(_) => FrameType(*DATAGRAM_TYS.start()),
205            AckFrequency(_) => FrameType::ACK_FREQUENCY,
206            ImmediateAck => FrameType::IMMEDIATE_ACK,
207            HandshakeDone => FrameType::HANDSHAKE_DONE,
208        }
209    }
210
211    pub(crate) fn is_ack_eliciting(&self) -> bool {
212        !matches!(*self, Self::Ack(_) | Self::Padding | Self::Close(_))
213    }
214}
215
216#[derive(Clone, Debug)]
217pub enum Close {
218    Connection(ConnectionClose),
219    Application(ApplicationClose),
220}
221
222impl Close {
223    pub(crate) fn encode<W: BufMut>(&self, out: &mut W, max_len: usize) {
224        match *self {
225            Self::Connection(ref x) => x.encode(out, max_len),
226            Self::Application(ref x) => x.encode(out, max_len),
227        }
228    }
229
230    pub(crate) fn is_transport_layer(&self) -> bool {
231        matches!(*self, Self::Connection(_))
232    }
233}
234
235impl From<TransportError> for Close {
236    fn from(x: TransportError) -> Self {
237        Self::Connection(x.into())
238    }
239}
240impl From<ConnectionClose> for Close {
241    fn from(x: ConnectionClose) -> Self {
242        Self::Connection(x)
243    }
244}
245impl From<ApplicationClose> for Close {
246    fn from(x: ApplicationClose) -> Self {
247        Self::Application(x)
248    }
249}
250
251/// Reason given by the transport for closing the connection
252#[derive(Debug, Clone, PartialEq, Eq)]
253pub struct ConnectionClose {
254    /// Class of error as encoded in the specification
255    pub error_code: TransportErrorCode,
256    /// Type of frame that caused the close
257    pub frame_type: Option<FrameType>,
258    /// Human-readable reason for the close
259    pub reason: Bytes,
260}
261
262impl fmt::Display for ConnectionClose {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        self.error_code.fmt(f)?;
265        if !self.reason.as_ref().is_empty() {
266            f.write_str(": ")?;
267            f.write_str(&String::from_utf8_lossy(&self.reason))?;
268        }
269        Ok(())
270    }
271}
272
273impl From<TransportError> for ConnectionClose {
274    fn from(x: TransportError) -> Self {
275        Self {
276            error_code: x.code,
277            frame_type: x.frame,
278            reason: x.reason.into(),
279        }
280    }
281}
282
283impl FrameStruct for ConnectionClose {
284    const SIZE_BOUND: usize = 1 + 8 + 8 + 8;
285}
286
287impl ConnectionClose {
288    pub(crate) fn encode<W: BufMut>(&self, out: &mut W, max_len: usize) {
289        out.write(FrameType::CONNECTION_CLOSE); // 1 byte
290        out.write(self.error_code); // <= 8 bytes
291        let ty = self.frame_type.map_or(0, |x| x.0);
292        out.write_var(ty); // <= 8 bytes
293        let max_len = max_len
294            - 3
295            - VarInt::from_u64(ty).unwrap().size()
296            - VarInt::from_u64(self.reason.len() as u64).unwrap().size();
297        let actual_len = self.reason.len().min(max_len);
298        out.write_var(actual_len as u64); // <= 8 bytes
299        out.put_slice(&self.reason[0..actual_len]); // whatever's left
300    }
301}
302
303/// Reason given by an application for closing the connection
304#[derive(Debug, Clone, PartialEq, Eq)]
305pub struct ApplicationClose {
306    /// Application-specific reason code
307    pub error_code: VarInt,
308    /// Human-readable reason for the close
309    pub reason: Bytes,
310}
311
312impl fmt::Display for ApplicationClose {
313    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314        if !self.reason.as_ref().is_empty() {
315            f.write_str(&String::from_utf8_lossy(&self.reason))?;
316            f.write_str(" (code ")?;
317            self.error_code.fmt(f)?;
318            f.write_str(")")?;
319        } else {
320            self.error_code.fmt(f)?;
321        }
322        Ok(())
323    }
324}
325
326impl FrameStruct for ApplicationClose {
327    const SIZE_BOUND: usize = 1 + 8 + 8;
328}
329
330impl ApplicationClose {
331    pub(crate) fn encode<W: BufMut>(&self, out: &mut W, max_len: usize) {
332        out.write(FrameType::APPLICATION_CLOSE); // 1 byte
333        out.write(self.error_code); // <= 8 bytes
334        let max_len = max_len - 3 - VarInt::from_u64(self.reason.len() as u64).unwrap().size();
335        let actual_len = self.reason.len().min(max_len);
336        out.write_var(actual_len as u64); // <= 8 bytes
337        out.put_slice(&self.reason[0..actual_len]); // whatever's left
338    }
339}
340
341#[derive(Clone, Eq, PartialEq)]
342pub struct Ack {
343    pub largest: u64,
344    pub delay: u64,
345    pub additional: Bytes,
346    pub ecn: Option<EcnCounts>,
347}
348
349impl fmt::Debug for Ack {
350    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
351        let mut ranges = "[".to_string();
352        let mut first = true;
353        for range in self.iter() {
354            if !first {
355                ranges.push(',');
356            }
357            write!(ranges, "{range:?}").unwrap();
358            first = false;
359        }
360        ranges.push(']');
361
362        f.debug_struct("Ack")
363            .field("largest", &self.largest)
364            .field("delay", &self.delay)
365            .field("ecn", &self.ecn)
366            .field("ranges", &ranges)
367            .finish()
368    }
369}
370
371impl<'a> IntoIterator for &'a Ack {
372    type Item = RangeInclusive<u64>;
373    type IntoIter = AckIter<'a>;
374
375    fn into_iter(self) -> AckIter<'a> {
376        AckIter::new(self.largest, &self.additional[..])
377    }
378}
379
380impl Ack {
381    pub fn encode<W: BufMut>(
382        delay: u64,
383        ranges: &ArrayRangeSet,
384        ecn: Option<&EcnCounts>,
385        buf: &mut W,
386    ) {
387        let mut rest = ranges.iter().rev();
388        let first = rest.next().unwrap();
389        let largest = first.end - 1;
390        let first_size = first.end - first.start;
391        buf.write(if ecn.is_some() {
392            FrameType::ACK_ECN
393        } else {
394            FrameType::ACK
395        });
396        buf.write_var(largest);
397        buf.write_var(delay);
398        buf.write_var(ranges.len() as u64 - 1);
399        buf.write_var(first_size - 1);
400        let mut prev = first.start;
401        for block in rest {
402            let size = block.end - block.start;
403            buf.write_var(prev - block.end - 1);
404            buf.write_var(size - 1);
405            prev = block.start;
406        }
407        if let Some(x) = ecn {
408            x.encode(buf)
409        }
410    }
411
412    pub fn iter(&self) -> AckIter<'_> {
413        self.into_iter()
414    }
415}
416
417#[derive(Debug, Copy, Clone, Eq, PartialEq)]
418pub struct EcnCounts {
419    pub ect0: u64,
420    pub ect1: u64,
421    pub ce: u64,
422}
423
424impl std::ops::AddAssign<EcnCodepoint> for EcnCounts {
425    fn add_assign(&mut self, rhs: EcnCodepoint) {
426        match rhs {
427            EcnCodepoint::Ect0 => {
428                self.ect0 += 1;
429            }
430            EcnCodepoint::Ect1 => {
431                self.ect1 += 1;
432            }
433            EcnCodepoint::Ce => {
434                self.ce += 1;
435            }
436        }
437    }
438}
439
440impl EcnCounts {
441    pub const ZERO: Self = Self {
442        ect0: 0,
443        ect1: 0,
444        ce: 0,
445    };
446
447    pub fn encode<W: BufMut>(&self, out: &mut W) {
448        out.write_var(self.ect0);
449        out.write_var(self.ect1);
450        out.write_var(self.ce);
451    }
452}
453
454#[derive(Debug, Clone)]
455pub(crate) struct Stream {
456    pub(crate) id: StreamId,
457    pub(crate) offset: u64,
458    pub(crate) fin: bool,
459    pub(crate) data: Bytes,
460}
461
462impl FrameStruct for Stream {
463    const SIZE_BOUND: usize = 1 + 8 + 8 + 8;
464}
465
466/// Metadata from a stream frame
467#[derive(Debug, Clone)]
468pub(crate) struct StreamMeta {
469    pub(crate) id: StreamId,
470    pub(crate) offsets: Range<u64>,
471    pub(crate) fin: bool,
472}
473
474// This manual implementation exists because `Default` is not implemented for `StreamId`
475impl Default for StreamMeta {
476    fn default() -> Self {
477        Self {
478            id: StreamId(0),
479            offsets: 0..0,
480            fin: false,
481        }
482    }
483}
484
485impl StreamMeta {
486    pub(crate) fn encode<W: BufMut>(&self, length: bool, out: &mut W) {
487        let mut ty = *STREAM_TYS.start();
488        if self.offsets.start != 0 {
489            ty |= 0x04;
490        }
491        if length {
492            ty |= 0x02;
493        }
494        if self.fin {
495            ty |= 0x01;
496        }
497        out.write_var(ty); // 1 byte
498        out.write(self.id); // <=8 bytes
499        if self.offsets.start != 0 {
500            out.write_var(self.offsets.start); // <=8 bytes
501        }
502        if length {
503            out.write_var(self.offsets.end - self.offsets.start); // <=8 bytes
504        }
505    }
506}
507
508/// A vector of [`StreamMeta`] with optimization for the single element case
509pub(crate) type StreamMetaVec = TinyVec<[StreamMeta; 1]>;
510
511#[derive(Debug, Clone)]
512pub(crate) struct Crypto {
513    pub(crate) offset: u64,
514    pub(crate) data: Bytes,
515}
516
517impl Crypto {
518    pub(crate) const SIZE_BOUND: usize = 17;
519
520    pub(crate) fn encode<W: BufMut>(&self, out: &mut W) {
521        out.write(FrameType::CRYPTO);
522        out.write_var(self.offset);
523        out.write_var(self.data.len() as u64);
524        out.put_slice(&self.data);
525    }
526}
527
528#[derive(Debug, Clone)]
529pub(crate) struct NewToken {
530    pub(crate) token: Bytes,
531}
532
533impl NewToken {
534    pub(crate) fn encode<W: BufMut>(&self, out: &mut W) {
535        out.write(FrameType::NEW_TOKEN);
536        out.write_var(self.token.len() as u64);
537        out.put_slice(&self.token);
538    }
539
540    pub(crate) fn size(&self) -> usize {
541        1 + VarInt::from_u64(self.token.len() as u64).unwrap().size() + self.token.len()
542    }
543}
544
545pub(crate) struct Iter {
546    bytes: Bytes,
547    last_ty: Option<FrameType>,
548}
549
550impl Iter {
551    pub(crate) fn new(payload: Bytes) -> Result<Self, TransportError> {
552        if payload.is_empty() {
553            // "An endpoint MUST treat receipt of a packet containing no frames as a
554            // connection error of type PROTOCOL_VIOLATION."
555            // https://www.rfc-editor.org/rfc/rfc9000.html#name-frames-and-frame-types
556            return Err(TransportError::PROTOCOL_VIOLATION(
557                "packet payload is empty",
558            ));
559        }
560
561        Ok(Self {
562            bytes: payload,
563            last_ty: None,
564        })
565    }
566
567    fn take_len(&mut self) -> Result<Bytes, UnexpectedEnd> {
568        let len = self.bytes.get_var()?;
569        if len > self.bytes.remaining() as u64 {
570            return Err(UnexpectedEnd);
571        }
572        Ok(self.bytes.split_to(len as usize))
573    }
574
575    fn try_next(&mut self) -> Result<Frame, IterErr> {
576        let ty = self.bytes.get::<FrameType>()?;
577        self.last_ty = Some(ty);
578        Ok(match ty {
579            FrameType::PADDING => Frame::Padding,
580            FrameType::RESET_STREAM => Frame::ResetStream(ResetStream {
581                id: self.bytes.get()?,
582                error_code: self.bytes.get()?,
583                final_offset: self.bytes.get()?,
584            }),
585            FrameType::CONNECTION_CLOSE => Frame::Close(Close::Connection(ConnectionClose {
586                error_code: self.bytes.get()?,
587                frame_type: {
588                    let x = self.bytes.get_var()?;
589                    if x == 0 { None } else { Some(FrameType(x)) }
590                },
591                reason: self.take_len()?,
592            })),
593            FrameType::APPLICATION_CLOSE => Frame::Close(Close::Application(ApplicationClose {
594                error_code: self.bytes.get()?,
595                reason: self.take_len()?,
596            })),
597            FrameType::MAX_DATA => Frame::MaxData(self.bytes.get()?),
598            FrameType::MAX_STREAM_DATA => Frame::MaxStreamData {
599                id: self.bytes.get()?,
600                offset: self.bytes.get_var()?,
601            },
602            FrameType::MAX_STREAMS_BIDI => Frame::MaxStreams {
603                dir: Dir::Bi,
604                count: self.bytes.get_var()?,
605            },
606            FrameType::MAX_STREAMS_UNI => Frame::MaxStreams {
607                dir: Dir::Uni,
608                count: self.bytes.get_var()?,
609            },
610            FrameType::PING => Frame::Ping,
611            FrameType::DATA_BLOCKED => Frame::DataBlocked {
612                offset: self.bytes.get_var()?,
613            },
614            FrameType::STREAM_DATA_BLOCKED => Frame::StreamDataBlocked {
615                id: self.bytes.get()?,
616                offset: self.bytes.get_var()?,
617            },
618            FrameType::STREAMS_BLOCKED_BIDI => Frame::StreamsBlocked {
619                dir: Dir::Bi,
620                limit: self.bytes.get_var()?,
621            },
622            FrameType::STREAMS_BLOCKED_UNI => Frame::StreamsBlocked {
623                dir: Dir::Uni,
624                limit: self.bytes.get_var()?,
625            },
626            FrameType::STOP_SENDING => Frame::StopSending(StopSending {
627                id: self.bytes.get()?,
628                error_code: self.bytes.get()?,
629            }),
630            FrameType::RETIRE_CONNECTION_ID => Frame::RetireConnectionId {
631                sequence: self.bytes.get_var()?,
632            },
633            FrameType::ACK | FrameType::ACK_ECN => {
634                let largest = self.bytes.get_var()?;
635                let delay = self.bytes.get_var()?;
636                let extra_blocks = self.bytes.get_var()? as usize;
637                let n = scan_ack_blocks(&self.bytes, largest, extra_blocks)?;
638                Frame::Ack(Ack {
639                    delay,
640                    largest,
641                    additional: self.bytes.split_to(n),
642                    ecn: if ty != FrameType::ACK_ECN {
643                        None
644                    } else {
645                        Some(EcnCounts {
646                            ect0: self.bytes.get_var()?,
647                            ect1: self.bytes.get_var()?,
648                            ce: self.bytes.get_var()?,
649                        })
650                    },
651                })
652            }
653            FrameType::PATH_CHALLENGE => Frame::PathChallenge(self.bytes.get()?),
654            FrameType::PATH_RESPONSE => Frame::PathResponse(self.bytes.get()?),
655            FrameType::NEW_CONNECTION_ID => {
656                let sequence = self.bytes.get_var()?;
657                let retire_prior_to = self.bytes.get_var()?;
658                if retire_prior_to > sequence {
659                    return Err(IterErr::Malformed);
660                }
661                let length = self.bytes.get::<u8>()? as usize;
662                if length > MAX_CID_SIZE || length == 0 {
663                    return Err(IterErr::Malformed);
664                }
665                if length > self.bytes.remaining() {
666                    return Err(IterErr::UnexpectedEnd);
667                }
668                let mut stage = [0; MAX_CID_SIZE];
669                self.bytes.copy_to_slice(&mut stage[0..length]);
670                let id = ConnectionId::new(&stage[..length]);
671                if self.bytes.remaining() < 16 {
672                    return Err(IterErr::UnexpectedEnd);
673                }
674                let mut reset_token = [0; RESET_TOKEN_SIZE];
675                self.bytes.copy_to_slice(&mut reset_token);
676                Frame::NewConnectionId(NewConnectionId {
677                    sequence,
678                    retire_prior_to,
679                    id,
680                    reset_token: reset_token.into(),
681                })
682            }
683            FrameType::CRYPTO => Frame::Crypto(Crypto {
684                offset: self.bytes.get_var()?,
685                data: self.take_len()?,
686            }),
687            FrameType::NEW_TOKEN => Frame::NewToken(NewToken {
688                token: self.take_len()?,
689            }),
690            FrameType::HANDSHAKE_DONE => Frame::HandshakeDone,
691            FrameType::ACK_FREQUENCY => Frame::AckFrequency(AckFrequency {
692                sequence: self.bytes.get()?,
693                ack_eliciting_threshold: self.bytes.get()?,
694                request_max_ack_delay: self.bytes.get()?,
695                reordering_threshold: self.bytes.get()?,
696            }),
697            FrameType::IMMEDIATE_ACK => Frame::ImmediateAck,
698            _ => {
699                if let Some(s) = ty.stream() {
700                    Frame::Stream(Stream {
701                        id: self.bytes.get()?,
702                        offset: if s.off() { self.bytes.get_var()? } else { 0 },
703                        fin: s.fin(),
704                        data: if s.len() {
705                            self.take_len()?
706                        } else {
707                            self.take_remaining()
708                        },
709                    })
710                } else if let Some(d) = ty.datagram() {
711                    Frame::Datagram(Datagram {
712                        data: if d.len() {
713                            self.take_len()?
714                        } else {
715                            self.take_remaining()
716                        },
717                    })
718                } else {
719                    return Err(IterErr::InvalidFrameId);
720                }
721            }
722        })
723    }
724
725    fn take_remaining(&mut self) -> Bytes {
726        mem::take(&mut self.bytes)
727    }
728}
729
730impl Iterator for Iter {
731    type Item = Result<Frame, InvalidFrame>;
732    fn next(&mut self) -> Option<Self::Item> {
733        if !self.bytes.has_remaining() {
734            return None;
735        }
736        match self.try_next() {
737            Ok(x) => Some(Ok(x)),
738            Err(e) => {
739                // Corrupt frame, skip it and everything that follows
740                self.bytes.clear();
741                Some(Err(InvalidFrame {
742                    ty: self.last_ty,
743                    reason: e.reason(),
744                }))
745            }
746        }
747    }
748}
749
750#[derive(Debug)]
751pub(crate) struct InvalidFrame {
752    pub(crate) ty: Option<FrameType>,
753    pub(crate) reason: &'static str,
754}
755
756impl From<InvalidFrame> for TransportError {
757    fn from(err: InvalidFrame) -> Self {
758        let mut te = Self::FRAME_ENCODING_ERROR(err.reason);
759        te.frame = err.ty;
760        te
761    }
762}
763
764/// Validate exactly `n` ACK ranges in `buf` and return the number of bytes they cover
765fn scan_ack_blocks(mut buf: &[u8], largest: u64, n: usize) -> Result<usize, IterErr> {
766    let total_len = buf.remaining();
767    let first_block = buf.get_var()?;
768    let mut smallest = largest.checked_sub(first_block).ok_or(IterErr::Malformed)?;
769    for _ in 0..n {
770        let gap = buf.get_var()?;
771        smallest = smallest.checked_sub(gap + 2).ok_or(IterErr::Malformed)?;
772        let block = buf.get_var()?;
773        smallest = smallest.checked_sub(block).ok_or(IterErr::Malformed)?;
774    }
775    Ok(total_len - buf.remaining())
776}
777
/// Ways in which decoding a single frame can fail
enum IterErr {
    /// The payload ended before the frame was complete
    UnexpectedEnd,
    /// The frame type is not one the decoder recognizes
    InvalidFrameId,
    /// A field held a value that is not permitted
    Malformed,
}

impl IterErr {
    /// Short static description of the error
    fn reason(&self) -> &'static str {
        match self {
            Self::UnexpectedEnd => "unexpected end",
            Self::InvalidFrameId => "invalid frame ID",
            Self::Malformed => "malformed",
        }
    }
}
794
795impl From<UnexpectedEnd> for IterErr {
796    fn from(_: UnexpectedEnd) -> Self {
797        Self::UnexpectedEnd
798    }
799}
800
801#[derive(Debug, Clone)]
802pub struct AckIter<'a> {
803    largest: u64,
804    data: &'a [u8],
805}
806
807impl<'a> AckIter<'a> {
808    fn new(largest: u64, data: &'a [u8]) -> Self {
809        Self { largest, data }
810    }
811}
812
813impl Iterator for AckIter<'_> {
814    type Item = RangeInclusive<u64>;
815    fn next(&mut self) -> Option<RangeInclusive<u64>> {
816        if !self.data.has_remaining() {
817            return None;
818        }
819        let block = self.data.get_var().unwrap();
820        let largest = self.largest;
821        if let Ok(gap) = self.data.get_var() {
822            self.largest -= block + gap + 2;
823        }
824        Some(largest - block..=largest)
825    }
826}
827
828#[allow(unreachable_pub)] // fuzzing only
829#[cfg_attr(feature = "arbitrary", derive(Arbitrary))]
830#[derive(Debug, Copy, Clone)]
831pub struct ResetStream {
832    pub(crate) id: StreamId,
833    pub(crate) error_code: VarInt,
834    pub(crate) final_offset: VarInt,
835}
836
837impl FrameStruct for ResetStream {
838    const SIZE_BOUND: usize = 1 + 8 + 8 + 8;
839}
840
841impl ResetStream {
842    pub(crate) fn encode<W: BufMut>(&self, out: &mut W) {
843        out.write(FrameType::RESET_STREAM); // 1 byte
844        out.write(self.id); // <= 8 bytes
845        out.write(self.error_code); // <= 8 bytes
846        out.write(self.final_offset); // <= 8 bytes
847    }
848}
849
850#[derive(Debug, Copy, Clone)]
851pub(crate) struct StopSending {
852    pub(crate) id: StreamId,
853    pub(crate) error_code: VarInt,
854}
855
856impl FrameStruct for StopSending {
857    const SIZE_BOUND: usize = 1 + 8 + 8;
858}
859
860impl StopSending {
861    pub(crate) fn encode<W: BufMut>(&self, out: &mut W) {
862        out.write(FrameType::STOP_SENDING); // 1 byte
863        out.write(self.id); // <= 8 bytes
864        out.write(self.error_code) // <= 8 bytes
865    }
866}
867
868#[derive(Debug, Copy, Clone)]
869pub(crate) struct NewConnectionId {
870    pub(crate) sequence: u64,
871    pub(crate) retire_prior_to: u64,
872    pub(crate) id: ConnectionId,
873    pub(crate) reset_token: ResetToken,
874}
875
876impl NewConnectionId {
877    pub(crate) fn encode<W: BufMut>(&self, out: &mut W) {
878        out.write(FrameType::NEW_CONNECTION_ID);
879        out.write_var(self.sequence);
880        out.write_var(self.retire_prior_to);
881        out.write(self.id.len() as u8);
882        out.put_slice(&self.id);
883        out.put_slice(&self.reset_token);
884    }
885}
886
887/// Smallest number of bytes this type of frame is guaranteed to fit within.
888pub(crate) const RETIRE_CONNECTION_ID_SIZE_BOUND: usize = 9;
889
890/// An unreliable datagram
891#[derive(Debug, Clone)]
892pub struct Datagram {
893    /// Payload
894    pub data: Bytes,
895}
896
897impl FrameStruct for Datagram {
898    const SIZE_BOUND: usize = 1 + 8;
899}
900
901impl Datagram {
902    pub(crate) fn encode(&self, length: bool, out: &mut Vec<u8>) {
903        out.write(FrameType(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte
904        if length {
905            // Safe to unwrap because we check length sanity before queueing datagrams
906            out.write(VarInt::from_u64(self.data.len() as u64).unwrap()); // <= 8 bytes
907        }
908        out.extend_from_slice(&self.data);
909    }
910
911    pub(crate) fn size(&self, length: bool) -> usize {
912        1 + if length {
913            VarInt::from_u64(self.data.len() as u64).unwrap().size()
914        } else {
915            0
916        } + self.data.len()
917    }
918}
919
920#[derive(Debug, Copy, Clone, PartialEq, Eq)]
921pub(crate) struct AckFrequency {
922    pub(crate) sequence: VarInt,
923    pub(crate) ack_eliciting_threshold: VarInt,
924    pub(crate) request_max_ack_delay: VarInt,
925    pub(crate) reordering_threshold: VarInt,
926}
927
928impl AckFrequency {
929    pub(crate) fn encode<W: BufMut>(&self, buf: &mut W) {
930        buf.write(FrameType::ACK_FREQUENCY);
931        buf.write(self.sequence);
932        buf.write(self.ack_eliciting_threshold);
933        buf.write(self.request_max_ack_delay);
934        buf.write(self.reordering_threshold);
935    }
936}
937
#[cfg(test)]
mod test {
    use super::*;
    use crate::coding::Codec;
    use assert_matches::assert_matches;

    /// Decodes every frame in `buf`, panicking on an empty payload or any invalid frame
    fn frames(buf: Vec<u8>) -> Vec<Frame> {
        let iter = Iter::new(Bytes::from(buf)).unwrap();
        let decoded: Result<Vec<_>, _> = iter.collect();
        decoded.unwrap()
    }

    /// An ACK frame with ECN counts round-trips through encode and decode
    #[test]
    fn ack_coding() {
        const PACKETS: &[u64] = &[1, 2, 3, 5, 10, 11, 14];
        const ECN: EcnCounts = EcnCounts {
            ect0: 42,
            ect1: 24,
            ce: 12,
        };

        let mut ranges = ArrayRangeSet::new();
        for &packet in PACKETS {
            ranges.insert(packet..packet + 1);
        }

        let mut buf = Vec::new();
        Ack::encode(42, &ranges, Some(&ECN), &mut buf);

        let frames = frames(buf);
        assert_eq!(frames.len(), 1);
        match &frames[0] {
            Frame::Ack(ack) => {
                // Ranges come back highest first, so sort before comparing
                let mut packets: Vec<u64> = ack.iter().flatten().collect();
                packets.sort_unstable();
                assert_eq!(&packets[..], PACKETS);
                assert_eq!(ack.ecn, Some(ECN));
            }
            x => panic!("incorrect frame {x:?}"),
        }
    }

    /// An ACK_FREQUENCY frame round-trips through encode and decode
    #[test]
    fn ack_frequency_coding() {
        let original = AckFrequency {
            sequence: VarInt(42),
            ack_eliciting_threshold: VarInt(20),
            request_max_ack_delay: VarInt(50_000),
            reordering_threshold: VarInt(1),
        };

        let mut buf = Vec::new();
        original.encode(&mut buf);

        let frames = frames(buf);
        assert_eq!(frames.len(), 1);
        match &frames[0] {
            Frame::AckFrequency(decoded) => assert_eq!(decoded, &original),
            x => panic!("incorrect frame {x:?}"),
        }
    }

    /// A bare IMMEDIATE_ACK frame type decodes to `Frame::ImmediateAck`
    #[test]
    fn immediate_ack_coding() {
        let mut buf = Vec::new();
        FrameType::IMMEDIATE_ACK.encode(&mut buf);

        let frames = frames(buf);
        assert_eq!(frames.len(), 1);
        assert_matches!(&frames[0], Frame::ImmediateAck);
    }
}