quinn_proto/connection/
datagrams.rs

1use std::collections::VecDeque;
2
3use bytes::Bytes;
4use thiserror::Error;
5use tracing::{debug, trace};
6
7use super::Connection;
8use crate::{
9    TransportError,
10    frame::{Datagram, FrameStruct},
11};
12
/// API to control datagram traffic
///
/// A short-lived handle that holds an exclusive borrow of a [`Connection`] for `'a`; its methods
/// read and mutate that connection's datagram queues through the borrow.
pub struct Datagrams<'a> {
    /// Connection whose datagram state and configuration this handle operates on
    pub(super) conn: &'a mut Connection,
}
17
18impl Datagrams<'_> {
19    /// Queue an unreliable, unordered datagram for immediate transmission
20    ///
21    /// If `drop` is true, previously queued datagrams which are still unsent may be discarded to
22    /// make space for this datagram, in order of oldest to newest. If `drop` is false, and there
23    /// isn't enough space due to previously queued datagrams, this function will return
24    /// `SendDatagramError::Blocked`. `Event::DatagramsUnblocked` will be emitted once datagrams
25    /// have been sent.
26    ///
27    /// Returns `Err` iff a `len`-byte datagram cannot currently be sent.
28    pub fn send(&mut self, data: Bytes, drop: bool) -> Result<(), SendDatagramError> {
29        if self.conn.config.datagram_receive_buffer_size.is_none() {
30            return Err(SendDatagramError::Disabled);
31        }
32        let max = self
33            .max_size()
34            .ok_or(SendDatagramError::UnsupportedByPeer)?;
35        if data.len() > max {
36            return Err(SendDatagramError::TooLarge);
37        }
38        if drop {
39            while self.conn.datagrams.outgoing_total > self.conn.config.datagram_send_buffer_size {
40                let prev = self
41                    .conn
42                    .datagrams
43                    .outgoing
44                    .pop_front()
45                    .expect("datagrams.outgoing_total desynchronized");
46                trace!(len = prev.data.len(), "dropping outgoing datagram");
47                self.conn.datagrams.outgoing_total -= prev.data.len();
48            }
49        } else if self.conn.datagrams.outgoing_total + data.len()
50            > self.conn.config.datagram_send_buffer_size
51        {
52            self.conn.datagrams.send_blocked = true;
53            return Err(SendDatagramError::Blocked(data));
54        }
55        self.conn.datagrams.outgoing_total += data.len();
56        self.conn.datagrams.outgoing.push_back(Datagram { data });
57        Ok(())
58    }
59
60    /// Compute the maximum size of datagrams that may passed to `send_datagram`
61    ///
62    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
63    ///
64    /// This may change over the lifetime of a connection according to variation in the path MTU
65    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
66    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
67    ///
68    /// Not necessarily the maximum size of received datagrams.
69    pub fn max_size(&self) -> Option<usize> {
70        // We use the conservative overhead bound for any packet number, reducing the budget by at
71        // most 3 bytes, so that PN size fluctuations don't cause users sending maximum-size
72        // datagrams to suffer avoidable packet loss.
73        let max_size = self.conn.path.current_mtu() as usize
74            - self.conn.predict_1rtt_overhead(None)
75            - Datagram::SIZE_BOUND;
76        let limit = self
77            .conn
78            .peer_params
79            .max_datagram_frame_size?
80            .into_inner()
81            .saturating_sub(Datagram::SIZE_BOUND as u64);
82        Some(limit.min(max_size as u64) as usize)
83    }
84
85    /// Receive an unreliable, unordered datagram
86    pub fn recv(&mut self) -> Option<Bytes> {
87        self.conn.datagrams.recv()
88    }
89
90    /// Bytes available in the outgoing datagram buffer
91    ///
92    /// When greater than zero, [`send`](Self::send)ing a datagram of at most this size is
93    /// guaranteed not to cause older datagrams to be dropped.
94    pub fn send_buffer_space(&self) -> usize {
95        self.conn
96            .config
97            .datagram_send_buffer_size
98            .saturating_sub(self.conn.datagrams.outgoing_total)
99    }
100}
101
/// Per-connection bookkeeping for datagrams in both directions
#[derive(Default)]
pub(super) struct DatagramState {
    /// Number of bytes of datagrams that have been received by the local transport but not
    /// delivered to the application
    pub(super) recv_buffered: usize,
    /// Received datagrams awaiting delivery; pushed at the back, popped from the front
    pub(super) incoming: VecDeque<Datagram>,
    /// Datagrams queued for transmission; pushed at the back, popped from the front
    pub(super) outgoing: VecDeque<Datagram>,
    /// Sum of the payload lengths of every datagram currently in `outgoing`
    pub(super) outgoing_total: usize,
    /// Set when a non-dropping send was rejected because the send buffer was full
    // NOTE(review): the code that clears this flag is not in this file — confirm where it resets
    pub(super) send_blocked: bool,
}
112
113impl DatagramState {
114    pub(super) fn received(
115        &mut self,
116        datagram: Datagram,
117        window: &Option<usize>,
118    ) -> Result<bool, TransportError> {
119        let window = match window {
120            None => {
121                return Err(TransportError::PROTOCOL_VIOLATION(
122                    "unexpected DATAGRAM frame",
123                ));
124            }
125            Some(x) => *x,
126        };
127
128        if datagram.data.len() > window {
129            return Err(TransportError::PROTOCOL_VIOLATION("oversized datagram"));
130        }
131
132        let was_empty = self.recv_buffered == 0;
133        while datagram.data.len() + self.recv_buffered > window {
134            debug!("dropping stale datagram");
135            self.recv();
136        }
137
138        self.recv_buffered += datagram.data.len();
139        self.incoming.push_back(datagram);
140        Ok(was_empty)
141    }
142
143    /// Discard outgoing datagrams with a payload larger than `max_payload` bytes
144    ///
145    /// Used to ensure that reductions in MTU don't get us stuck in a state where we have a datagram
146    /// queued but can't send it.
147    pub(super) fn drop_oversized(&mut self, max_payload: usize) {
148        self.outgoing.retain(|datagram| {
149            let result = datagram.data.len() < max_payload;
150            if !result {
151                trace!(
152                    "dropping {} byte datagram violating {} byte limit",
153                    datagram.data.len(),
154                    max_payload
155                );
156                self.outgoing_total -= datagram.data.len();
157            }
158            result
159        });
160    }
161
162    /// Attempt to write a datagram frame into `buf`, consuming it from `self.outgoing`
163    ///
164    /// Returns whether a frame was written. At most `max_size` bytes will be written, including
165    /// framing.
166    pub(super) fn write(&mut self, buf: &mut Vec<u8>, max_size: usize) -> bool {
167        let datagram = match self.outgoing.pop_front() {
168            Some(x) => x,
169            None => return false,
170        };
171
172        if buf.len() + datagram.size(true) > max_size {
173            // Future work: we could be more clever about cramming small datagrams into
174            // mostly-full packets when a larger one is queued first
175            self.outgoing.push_front(datagram);
176            return false;
177        }
178
179        trace!(len = datagram.data.len(), "DATAGRAM");
180
181        self.outgoing_total -= datagram.data.len();
182        datagram.encode(true, buf);
183        true
184    }
185
186    pub(super) fn recv(&mut self) -> Option<Bytes> {
187        let x = self.incoming.pop_front()?.data;
188        self.recv_buffered -= x.len();
189        Some(x)
190    }
191}
192
/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// Send would block
    ///
    /// Carries the rejected datagram payload back to the caller so it is not lost.
    #[error("datagram send blocked")]
    Blocked(Bytes),
}
211}