quinn_proto/connection/streams/
send.rs

1use bytes::Bytes;
2use thiserror::Error;
3
4use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
5
/// State for the transmit half of a stream
#[derive(Debug)]
pub(super) struct Send {
    /// Stream-level flow control limit: the highest offset the peer permits us to send up to
    pub(super) max_data: u64,
    /// Current phase of the send-side state machine (see `SendState`)
    pub(super) state: SendState,
    /// Outgoing data buffered for (re)transmission, tracking sent/acked ranges
    pub(super) pending: SendBuffer,
    /// Relative transmit priority; interpretation is up to the stream scheduler in `Streams`
    pub(super) priority: i32,
    /// Whether a frame containing a FIN bit must be transmitted, even if we don't have any new data
    pub(super) fin_pending: bool,
    /// Whether this stream is in the `connection_blocked` list of `Streams`
    pub(super) connection_blocked: bool,
    /// The reason the peer wants us to stop, if `STOP_SENDING` was received
    pub(super) stop_reason: Option<VarInt>,
}
19
20impl Send {
21    pub(super) fn new(max_data: VarInt) -> Box<Self> {
22        Box::new(Self {
23            max_data: max_data.into(),
24            state: SendState::Ready,
25            pending: SendBuffer::new(),
26            priority: 0,
27            fin_pending: false,
28            connection_blocked: false,
29            stop_reason: None,
30        })
31    }
32
33    /// Whether the stream has been reset
34    pub(super) fn is_reset(&self) -> bool {
35        matches!(self.state, SendState::ResetSent)
36    }
37
38    pub(super) fn finish(&mut self) -> Result<(), FinishError> {
39        if let Some(error_code) = self.stop_reason {
40            Err(FinishError::Stopped(error_code))
41        } else if self.state == SendState::Ready {
42            self.state = SendState::DataSent {
43                finish_acked: false,
44            };
45            self.fin_pending = true;
46            Ok(())
47        } else {
48            Err(FinishError::ClosedStream)
49        }
50    }
51
52    pub(super) fn write<S: BytesSource>(
53        &mut self,
54        source: &mut S,
55        limit: u64,
56    ) -> Result<Written, WriteError> {
57        if !self.is_writable() {
58            return Err(WriteError::ClosedStream);
59        }
60        if let Some(error_code) = self.stop_reason {
61            return Err(WriteError::Stopped(error_code));
62        }
63        let budget = self.max_data - self.pending.offset();
64        if budget == 0 {
65            return Err(WriteError::Blocked);
66        }
67        let mut limit = limit.min(budget) as usize;
68
69        let mut result = Written::default();
70        loop {
71            let (chunk, chunks_consumed) = source.pop_chunk(limit);
72            result.chunks += chunks_consumed;
73            result.bytes += chunk.len();
74
75            if chunk.is_empty() {
76                break;
77            }
78
79            limit -= chunk.len();
80            self.pending.write(chunk);
81        }
82
83        Ok(result)
84    }
85
86    /// Update stream state due to a reset sent by the local application
87    pub(super) fn reset(&mut self) {
88        use SendState::*;
89        if let DataSent { .. } | Ready = self.state {
90            self.state = ResetSent;
91        }
92    }
93
94    /// Handle STOP_SENDING
95    ///
96    /// Returns true if the stream was stopped due to this frame, and false
97    /// if it had been stopped before
98    pub(super) fn try_stop(&mut self, error_code: VarInt) -> bool {
99        if self.stop_reason.is_none() {
100            self.stop_reason = Some(error_code);
101            true
102        } else {
103            false
104        }
105    }
106
107    /// Returns whether the stream has been finished and all data has been acknowledged by the peer
108    pub(super) fn ack(&mut self, frame: frame::StreamMeta) -> bool {
109        self.pending.ack(frame.offsets);
110        match self.state {
111            SendState::DataSent {
112                ref mut finish_acked,
113            } => {
114                *finish_acked |= frame.fin;
115                *finish_acked && self.pending.is_fully_acked()
116            }
117            _ => false,
118        }
119    }
120
121    /// Handle increase to stream-level flow control limit
122    ///
123    /// Returns whether the stream was unblocked
124    pub(super) fn increase_max_data(&mut self, offset: u64) -> bool {
125        if offset <= self.max_data || self.state != SendState::Ready {
126            return false;
127        }
128        let was_blocked = self.pending.offset() == self.max_data;
129        self.max_data = offset;
130        was_blocked
131    }
132
133    pub(super) fn offset(&self) -> u64 {
134        self.pending.offset()
135    }
136
137    pub(super) fn is_pending(&self) -> bool {
138        self.pending.has_unsent_data() || self.fin_pending
139    }
140
141    pub(super) fn is_writable(&self) -> bool {
142        matches!(self.state, SendState::Ready)
143    }
144}
145
/// A [`BytesSource`] implementation for `&'a mut [Bytes]`
///
/// The type allows to dequeue [`Bytes`] chunks from an array of chunks, up to
/// a configured limit.
pub(crate) struct BytesArray<'a> {
    /// The wrapped slice of `Bytes`
    chunks: &'a mut [Bytes],
    /// The amount of chunks consumed from this source; chunks before this
    /// index have already been handed out (or were empty and skipped)
    consumed: usize,
}
156
157impl<'a> BytesArray<'a> {
158    pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
159        Self {
160            chunks,
161            consumed: 0,
162        }
163    }
164}
165
166impl BytesSource for BytesArray<'_> {
167    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
168        // The loop exists to skip empty chunks while still marking them as
169        // consumed
170        let mut chunks_consumed = 0;
171
172        while self.consumed < self.chunks.len() {
173            let chunk = &mut self.chunks[self.consumed];
174
175            if chunk.len() <= limit {
176                let chunk = std::mem::take(chunk);
177                self.consumed += 1;
178                chunks_consumed += 1;
179                if chunk.is_empty() {
180                    continue;
181                }
182                return (chunk, chunks_consumed);
183            } else if limit > 0 {
184                let chunk = chunk.split_to(limit);
185                return (chunk, chunks_consumed);
186            } else {
187                break;
188            }
189        }
190
191        (Bytes::new(), chunks_consumed)
192    }
193}
194
/// A [`BytesSource`] implementation for `&[u8]`
///
/// The type allows to dequeue a single [`Bytes`] chunk, which will be lazily
/// created from a reference. This allows to defer the allocation until it is
/// known how much data needs to be copied.
pub(crate) struct ByteSlice<'a> {
    /// The wrapped byte slice; shrinks from the front as data is popped
    data: &'a [u8],
}
204
205impl<'a> ByteSlice<'a> {
206    pub(crate) fn from_slice(data: &'a [u8]) -> Self {
207        Self { data }
208    }
209}
210
211impl BytesSource for ByteSlice<'_> {
212    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
213        let limit = limit.min(self.data.len());
214        if limit == 0 {
215            return (Bytes::new(), 0);
216        }
217
218        let chunk = Bytes::from(self.data[..limit].to_owned());
219        self.data = &self.data[chunk.len()..];
220
221        let chunks_consumed = usize::from(self.data.is_empty());
222        (chunk, chunks_consumed)
223    }
224}
225
/// A source of one or more buffers which can be converted into `Bytes` buffers on demand
///
/// The purpose of this data type is to defer conversion as long as possible,
/// so that no heap allocation is required in case no data is writable.
pub trait BytesSource {
    /// Returns the next chunk from the source of owned chunks.
    ///
    /// This method will consume parts of the source.
    /// Calling it will yield `Bytes` elements up to the configured `limit`.
    ///
    /// The method returns a tuple:
    /// - The first item is the yielded `Bytes` element. The element will be
    ///   empty if the limit is zero or no more data is available.
    /// - The second item returns how many complete chunks inside the source
    ///   had been consumed. This can be less than 1, if a chunk inside the
    ///   source had been truncated in order to adhere to the limit. It can also
    ///   be more than 1, if zero-length chunks had been skipped.
    fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
}
245
/// Indicates how many bytes and chunks had been transferred in a write operation
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub struct Written {
    /// The amount of bytes which had been written
    pub bytes: usize,
    /// The amount of full chunks which had been written
    ///
    /// If a chunk was only partially written, it will not be counted by this field.
    pub chunks: usize,
}
256
/// Errors triggered while writing to a send stream
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum WriteError {
    /// The peer is not able to accept additional data, or the connection is congested.
    ///
    /// If the peer issues additional flow control credit, a [`StreamEvent::Writable`] event will
    /// be generated, indicating that retrying the write might succeed.
    ///
    /// [`StreamEvent::Writable`]: crate::StreamEvent::Writable
    #[error("unable to accept further writes")]
    Blocked,
    /// The peer is no longer accepting data on this stream, and it has been implicitly reset. The
    /// stream cannot be finished or further written to.
    ///
    /// Carries an application-defined error code.
    #[error("stopped by peer: code {0}")]
    Stopped(VarInt),
    /// The stream has not been opened or has already been finished or reset
    #[error("closed stream")]
    ClosedStream,
}
280
/// Send-side stream state machine
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(super) enum SendState {
    /// Sending new data
    Ready,
    /// Stream was finished; now sending retransmits only.
    /// `finish_acked` records whether the peer has acknowledged the FIN.
    DataSent { finish_acked: bool },
    /// Sent RESET
    ResetSent,
}
290
/// Reasons why attempting to finish a stream might fail
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum FinishError {
    /// The peer is no longer accepting data on this stream. No
    /// [`StreamEvent::Finished`] event will be emitted for this stream.
    ///
    /// Carries an application-defined error code.
    ///
    /// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
    #[error("stopped by peer: code {0}")]
    Stopped(VarInt),
    /// The stream has not been opened or was already finished or reset
    #[error("closed stream")]
    ClosedStream,
}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises `BytesArray::pop_chunk` for every possible byte limit,
    /// verifying both the yielded data and the consumed-chunk accounting
    /// (the fixture deliberately includes three empty chunks that must be
    /// skipped but still counted as consumed).
    #[test]
    fn bytes_array() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        for limit in 0..full.len() {
            let mut chunks = [
                Bytes::from_static(b""),
                Bytes::from_static(b"Hello "),
                Bytes::from_static(b"Wo"),
                Bytes::from_static(b""),
                Bytes::from_static(b"r"),
                Bytes::from_static(b"ld"),
                Bytes::from_static(b""),
                Bytes::from_static(b" 12345678"),
                Bytes::from_static(b"9 ABCDE"),
                Bytes::from_static(b"F"),
                Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
            ];
            let num_chunks = chunks.len();
            let last_chunk_len = chunks[chunks.len() - 1].len();

            let mut array = BytesArray::from_chunks(&mut chunks);

            // Drain the source into `buf` until it yields an empty chunk
            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            // Exactly the first `limit` bytes must have been yielded, in order
            assert_eq!(&buf[..], &full[..limit]);

            if limit == full.len() {
                // Full consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks);
                // Since there are empty chunks, we consume more than there are popped
                assert_eq!(chunks_consumed, chunks_popped + 3);
            } else if limit > full.len() - last_chunk_len {
                // Partial consumption of the last chunk
                assert_eq!(chunks_consumed, num_chunks - 1);
                assert_eq!(chunks_consumed, chunks_popped + 2);
            }
        }
    }

    /// Exercises `ByteSlice::pop_chunk` for every possible byte limit; the
    /// single backing slice should be yielded as one chunk and only counted
    /// as consumed once fully drained.
    #[test]
    fn byte_slice() {
        let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
        for limit in 0..full.len() {
            let mut array = ByteSlice::from_slice(&full[..]);

            // Drain the source into `buf` until it yields an empty chunk
            let mut buf = Vec::new();
            let mut chunks_popped = 0;
            let mut chunks_consumed = 0;
            let mut remaining = limit;
            loop {
                let (chunk, consumed) = array.pop_chunk(remaining);
                chunks_consumed += consumed;

                if !chunk.is_empty() {
                    buf.extend_from_slice(&chunk);
                    remaining -= chunk.len();
                    chunks_popped += 1;
                } else {
                    break;
                }
            }

            assert_eq!(&buf[..], &full[..limit]);
            if limit != 0 {
                assert_eq!(chunks_popped, 1);
            } else {
                assert_eq!(chunks_popped, 0);
            }

            if limit == full.len() {
                assert_eq!(chunks_consumed, 1);
            } else {
                assert_eq!(chunks_consumed, 0);
            }
        }
    }
}
402}