quinn_proto/connection/streams/
recv.rs

1use std::collections::hash_map::Entry;
2use std::mem;
3
4use thiserror::Error;
5use tracing::debug;
6
7use super::state::get_or_insert_recv;
8use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
9use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
10use crate::connection::streams::state::StreamRecv;
11use crate::{TransportError, VarInt, frame};
12
13#[derive(Debug, Default)]
14pub(super) struct Recv {
15    // NB: when adding or removing fields, remember to update `reinit`.
16    state: RecvState,
17    pub(super) assembler: Assembler,
18    sent_max_stream_data: u64,
19    pub(super) end: u64,
20    pub(super) stopped: bool,
21}
22
23impl Recv {
24    pub(super) fn new(initial_max_data: u64) -> Box<Self> {
25        Box::new(Self {
26            state: RecvState::default(),
27            assembler: Assembler::new(),
28            sent_max_stream_data: initial_max_data,
29            end: 0,
30            stopped: false,
31        })
32    }
33
34    /// Reset to the initial state
35    pub(super) fn reinit(&mut self, initial_max_data: u64) {
36        self.state = RecvState::default();
37        self.assembler.reinit();
38        self.sent_max_stream_data = initial_max_data;
39        self.end = 0;
40        self.stopped = false;
41    }
42
43    /// Process a STREAM frame
44    ///
45    /// Return value is `(number_of_new_bytes_ingested, stream_is_closed)`
46    pub(super) fn ingest(
47        &mut self,
48        frame: frame::Stream,
49        payload_len: usize,
50        received: u64,
51        max_data: u64,
52    ) -> Result<(u64, bool), TransportError> {
53        let end = frame.offset + frame.data.len() as u64;
54        if end >= 2u64.pow(62) {
55            return Err(TransportError::FLOW_CONTROL_ERROR(
56                "maximum stream offset too large",
57            ));
58        }
59
60        if let Some(final_offset) = self.final_offset() {
61            if end > final_offset || (frame.fin && end != final_offset) {
62                debug!(end, final_offset, "final size error");
63                return Err(TransportError::FINAL_SIZE_ERROR(""));
64            }
65        }
66
67        let new_bytes = self.credit_consumed_by(end, received, max_data)?;
68
69        // Stopped streams don't need to wait for the actual data, they just need to know
70        // how much there was.
71        if frame.fin && !self.stopped {
72            if let RecvState::Recv { ref mut size } = self.state {
73                *size = Some(end);
74            }
75        }
76
77        self.end = self.end.max(end);
78        // Don't bother storing data or releasing stream-level flow control credit if the stream's
79        // already stopped
80        if !self.stopped {
81            self.assembler.insert(frame.offset, frame.data, payload_len);
82        }
83
84        Ok((new_bytes, frame.fin && self.stopped))
85    }
86
87    pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
88        if self.stopped {
89            return Err(ClosedStream { _private: () });
90        }
91
92        self.stopped = true;
93        self.assembler.clear();
94        // Issue flow control credit for unread data
95        let read_credits = self.end - self.assembler.bytes_read();
96        // This may send a spurious STOP_SENDING if we've already received all data, but it's a bit
97        // fiddly to distinguish that from the case where we've received a FIN but are missing some
98        // data that the peer might still be trying to retransmit, in which case a STOP_SENDING is
99        // still useful.
100        Ok((read_credits, ShouldTransmit(self.is_receiving())))
101    }
102
103    /// Returns the window that should be advertised in a `MAX_STREAM_DATA` frame
104    ///
105    /// The method returns a tuple which consists of the window that should be
106    /// announced, as well as a boolean parameter which indicates if a new
107    /// transmission of the value is recommended. If the boolean value is
108    /// `false` the new window should only be transmitted if a previous transmission
109    /// had failed.
110    pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
111        let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
112
113        // Only announce a window update if it's significant enough
114        // to make it worthwhile sending a MAX_STREAM_DATA frame.
115        // We use here a fraction of the configured stream receive window to make
116        // the decision, and accommodate for streams using bigger windows requiring
117        // less updates. A fixed size would also work - but it would need to be
118        // smaller than `stream_receive_window` in order to make sure the stream
119        // does not get stuck.
120        let diff = max_stream_data - self.sent_max_stream_data;
121        let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
122        (max_stream_data, ShouldTransmit(transmit))
123    }
124
125    /// Records that a `MAX_STREAM_DATA` announcing a certain window was sent
126    ///
127    /// This will suppress enqueuing further `MAX_STREAM_DATA` frames unless
128    /// either the previous transmission was not acknowledged or the window
129    /// further increased.
130    pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
131        if sent_value > self.sent_max_stream_data {
132            self.sent_max_stream_data = sent_value;
133        }
134    }
135
136    /// Whether the total amount of data that the peer will send on this stream is unknown
137    ///
138    /// True until we've received either a reset or the final frame.
139    ///
140    /// Implies that the sender might benefit from stream-level flow control updates, and we might
141    /// need to issue connection-level flow control updates due to flow control budget use by this
142    /// stream in the future, even if it's been stopped.
143    pub(super) fn final_offset_unknown(&self) -> bool {
144        matches!(self.state, RecvState::Recv { size: None })
145    }
146
147    /// Whether stream-level flow control updates should be sent for this stream
148    pub(super) fn can_send_flow_control(&self) -> bool {
149        // Stream-level flow control is redundant if the sender has already sent the whole stream,
150        // and moot if we no longer want data on this stream.
151        self.final_offset_unknown() && !self.stopped
152    }
153
154    /// Whether data is still being accepted from the peer
155    pub(super) fn is_receiving(&self) -> bool {
156        matches!(self.state, RecvState::Recv { .. })
157    }
158
159    fn final_offset(&self) -> Option<u64> {
160        match self.state {
161            RecvState::Recv { size } => size,
162            RecvState::ResetRecvd { size, .. } => Some(size),
163        }
164    }
165
166    /// Returns `false` iff the reset was redundant
167    pub(super) fn reset(
168        &mut self,
169        error_code: VarInt,
170        final_offset: VarInt,
171        received: u64,
172        max_data: u64,
173    ) -> Result<bool, TransportError> {
174        // Validate final_offset
175        if let Some(offset) = self.final_offset() {
176            if offset != final_offset.into_inner() {
177                return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
178            }
179        } else if self.end > u64::from(final_offset) {
180            return Err(TransportError::FINAL_SIZE_ERROR(
181                "lower than high water mark",
182            ));
183        }
184        self.credit_consumed_by(final_offset.into(), received, max_data)?;
185
186        if matches!(self.state, RecvState::ResetRecvd { .. }) {
187            return Ok(false);
188        }
189        self.state = RecvState::ResetRecvd {
190            size: final_offset.into(),
191            error_code,
192        };
193        // Nuke buffers so that future reads fail immediately, which ensures future reads don't
194        // issue flow control credit redundant to that already issued. We could instead special-case
195        // reset streams during read, but it's unclear if there's any benefit to retaining data for
196        // reset streams.
197        self.assembler.clear();
198        Ok(true)
199    }
200
201    pub(super) fn reset_code(&self) -> Option<VarInt> {
202        match self.state {
203            RecvState::ResetRecvd { error_code, .. } => Some(error_code),
204            _ => None,
205        }
206    }
207
208    /// Compute the amount of flow control credit consumed, or return an error if more was consumed
209    /// than issued
210    fn credit_consumed_by(
211        &self,
212        offset: u64,
213        received: u64,
214        max_data: u64,
215    ) -> Result<u64, TransportError> {
216        let prev_end = self.end;
217        let new_bytes = offset.saturating_sub(prev_end);
218        if offset > self.sent_max_stream_data || received + new_bytes > max_data {
219            debug!(
220                received,
221                new_bytes,
222                max_data,
223                offset,
224                stream_max_data = self.sent_max_stream_data,
225                "flow control error"
226            );
227            return Err(TransportError::FLOW_CONTROL_ERROR(""));
228        }
229
230        Ok(new_bytes)
231    }
232}
233
234/// Chunks returned from [`RecvStream::read()`][crate::RecvStream::read].
235///
236/// ### Note: Finalization Needed
237/// Bytes read from the stream are not released from the congestion window until
238/// either [`Self::finalize()`] is called, or this type is dropped.
239///
240/// It is recommended that you call [`Self::finalize()`] because it returns a flag
241/// telling you whether reading from the stream has resulted in the need to transmit a packet.
242///
243/// If this type is leaked, the stream will remain blocked on the remote peer until
244/// another read from the stream is done.
245pub struct Chunks<'a> {
246    id: StreamId,
247    ordered: bool,
248    streams: &'a mut StreamsState,
249    pending: &'a mut Retransmits,
250    state: ChunksState,
251    read: u64,
252}
253
254impl<'a> Chunks<'a> {
255    pub(super) fn new(
256        id: StreamId,
257        ordered: bool,
258        streams: &'a mut StreamsState,
259        pending: &'a mut Retransmits,
260    ) -> Result<Self, ReadableError> {
261        let mut entry = match streams.recv.entry(id) {
262            Entry::Occupied(entry) => entry,
263            Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
264        };
265
266        let mut recv =
267            match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
268                true => return Err(ReadableError::ClosedStream),
269                false => entry.remove().unwrap().into_inner(), // this can't fail due to the previous get_or_insert_with
270            };
271
272        recv.assembler.ensure_ordering(ordered)?;
273        Ok(Self {
274            id,
275            ordered,
276            streams,
277            pending,
278            state: ChunksState::Readable(recv),
279            read: 0,
280        })
281    }
282
283    /// Next
284    ///
285    /// Should call finalize() when done calling this.
286    pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
287        let rs = match self.state {
288            ChunksState::Readable(ref mut rs) => rs,
289            ChunksState::Reset(error_code) => {
290                return Err(ReadError::Reset(error_code));
291            }
292            ChunksState::Finished => {
293                return Ok(None);
294            }
295            ChunksState::Finalized => panic!("must not call next() after finalize()"),
296        };
297
298        if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
299            self.read += chunk.bytes.len() as u64;
300            return Ok(Some(chunk));
301        }
302
303        match rs.state {
304            RecvState::ResetRecvd { error_code, .. } => {
305                debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
306                let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
307                // At this point if we have `rs` self.state must be `ChunksState::Readable`
308                let recv = match state {
309                    ChunksState::Readable(recv) => StreamRecv::Open(recv),
310                    _ => unreachable!("state must be ChunkState::Readable"),
311                };
312                self.streams.stream_recv_freed(self.id, recv);
313                Err(ReadError::Reset(error_code))
314            }
315            RecvState::Recv { size } => {
316                if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
317                    let state = mem::replace(&mut self.state, ChunksState::Finished);
318                    // At this point if we have `rs` self.state must be `ChunksState::Readable`
319                    let recv = match state {
320                        ChunksState::Readable(recv) => StreamRecv::Open(recv),
321                        _ => unreachable!("state must be ChunkState::Readable"),
322                    };
323                    self.streams.stream_recv_freed(self.id, recv);
324                    Ok(None)
325                } else {
326                    // We don't need a distinct `ChunksState` variant for a blocked stream because
327                    // retrying a read harmlessly re-traces our steps back to returning
328                    // `Err(Blocked)` again. The buffers can't refill and the stream's own state
329                    // can't change so long as this `Chunks` exists.
330                    Err(ReadError::Blocked)
331                }
332            }
333        }
334    }
335
336    /// Mark the read data as consumed from the stream.
337    ///
338    /// The number of read bytes will be released from the congestion window,
339    /// allowing the remote peer to send more data if it was previously blocked.
340    ///
341    /// If [`ShouldTransmit::should_transmit()`] returns `true`,
342    /// a packet needs to be sent to the peer informing them that the stream is unblocked.
343    /// This means that you should call [`Connection::poll_transmit()`][crate::Connection::poll_transmit]
344    /// and send the returned packet as soon as is reasonable, to unblock the remote peer.
345    pub fn finalize(mut self) -> ShouldTransmit {
346        self.finalize_inner()
347    }
348
349    fn finalize_inner(&mut self) -> ShouldTransmit {
350        let state = mem::replace(&mut self.state, ChunksState::Finalized);
351        if let ChunksState::Finalized = state {
352            // Noop on repeated calls
353            return ShouldTransmit(false);
354        }
355
356        // We issue additional stream ID credit after the application is notified that a previously
357        // open stream has finished or been reset and we've therefore disposed of its state, as
358        // recorded by `stream_freed` calls in `next`.
359        let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
360
361        // If the stream hasn't finished, we may need to issue stream-level flow control credit
362        if let ChunksState::Readable(mut rs) = state {
363            let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
364            should_transmit |= max_stream_data.0;
365            if max_stream_data.0 {
366                self.pending.max_stream_data.insert(self.id);
367            }
368            // Return the stream to storage for future use
369            self.streams
370                .recv
371                .insert(self.id, Some(StreamRecv::Open(rs)));
372        }
373
374        // Issue connection-level flow control credit for any data we read regardless of state
375        let max_data = self.streams.add_read_credits(self.read);
376        self.pending.max_data |= max_data.0;
377        should_transmit |= max_data.0;
378        ShouldTransmit(should_transmit)
379    }
380}
381
382impl Drop for Chunks<'_> {
383    fn drop(&mut self) {
384        let _ = self.finalize_inner();
385    }
386}
387
388enum ChunksState {
389    Readable(Box<Recv>),
390    Reset(VarInt),
391    Finished,
392    Finalized,
393}
394
395/// Errors triggered when reading from a recv stream
396#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
397pub enum ReadError {
398    /// No more data is currently available on this stream.
399    ///
400    /// If more data on this stream is received from the peer, an `Event::StreamReadable` will be
401    /// generated for this stream, indicating that retrying the read might succeed.
402    #[error("blocked")]
403    Blocked,
404    /// The peer abandoned transmitting data on this stream.
405    ///
406    /// Carries an application-defined error code.
407    #[error("reset by peer: code {0}")]
408    Reset(VarInt),
409}
410
411/// Errors triggered when opening a recv stream for reading
412#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
413pub enum ReadableError {
414    /// The stream has not been opened or was already stopped, finished, or reset
415    #[error("closed stream")]
416    ClosedStream,
417    /// Attempted an ordered read following an unordered read
418    ///
419    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
420    /// stream which cannot be recovered, making further ordered reads impossible.
421    #[error("ordered read after unordered read")]
422    IllegalOrderedRead,
423}
424
425impl From<IllegalOrderedRead> for ReadableError {
426    fn from(_: IllegalOrderedRead) -> Self {
427        Self::IllegalOrderedRead
428    }
429}
430
431#[derive(Debug, Copy, Clone, Eq, PartialEq)]
432enum RecvState {
433    Recv { size: Option<u64> },
434    ResetRecvd { size: u64, error_code: VarInt },
435}
436
437impl Default for RecvState {
438    fn default() -> Self {
439        Self::Recv { size: None }
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use bytes::Bytes;
446
447    use crate::{Dir, Side};
448
449    use super::*;
450
451    #[test]
452    fn reordered_frames_while_stopped() {
453        const INITIAL_BYTES: u64 = 3;
454        const INITIAL_OFFSET: u64 = 3;
455        const RECV_WINDOW: u64 = 8;
456        let mut s = Recv::new(RECV_WINDOW);
457        let mut data_recvd = 0;
458        // Receive bytes 3..6
459        let (new_bytes, is_closed) = s
460            .ingest(
461                frame::Stream {
462                    id: StreamId::new(Side::Client, Dir::Uni, 0),
463                    offset: INITIAL_OFFSET,
464                    fin: false,
465                    data: Bytes::from_static(&[0; INITIAL_BYTES as usize]),
466                },
467                123,
468                data_recvd,
469                data_recvd + 1024,
470            )
471            .unwrap();
472        data_recvd += new_bytes;
473        assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
474        assert!(!is_closed);
475
476        let (credits, transmit) = s.stop().unwrap();
477        assert!(transmit.should_transmit());
478        assert_eq!(
479            credits,
480            INITIAL_OFFSET + INITIAL_BYTES,
481            "full connection flow control credit is issued by stop"
482        );
483
484        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
485        assert!(!transmit.should_transmit());
486        assert_eq!(
487            max_stream_data, RECV_WINDOW,
488            "stream flow control credit isn't issued by stop"
489        );
490
491        // Receive byte 7
492        let (new_bytes, is_closed) = s
493            .ingest(
494                frame::Stream {
495                    id: StreamId::new(Side::Client, Dir::Uni, 0),
496                    offset: RECV_WINDOW - 1,
497                    fin: false,
498                    data: Bytes::from_static(&[0; 1]),
499                },
500                123,
501                data_recvd,
502                data_recvd + 1024,
503            )
504            .unwrap();
505        data_recvd += new_bytes;
506        assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
507        assert!(!is_closed);
508
509        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
510        assert!(!transmit.should_transmit());
511        assert_eq!(
512            max_stream_data, RECV_WINDOW,
513            "stream flow control credit isn't issued after stop"
514        );
515
516        // Receive bytes 0..3
517        let (new_bytes, is_closed) = s
518            .ingest(
519                frame::Stream {
520                    id: StreamId::new(Side::Client, Dir::Uni, 0),
521                    offset: 0,
522                    fin: false,
523                    data: Bytes::from_static(&[0; INITIAL_OFFSET as usize]),
524                },
525                123,
526                data_recvd,
527                data_recvd + 1024,
528            )
529            .unwrap();
530        assert_eq!(
531            new_bytes, 0,
532            "reordered frames don't issue connection-level flow control for stopped streams"
533        );
534        assert!(!is_closed);
535
536        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
537        assert!(!transmit.should_transmit());
538        assert_eq!(
539            max_stream_data, RECV_WINDOW,
540            "stream flow control credit isn't issued after stop"
541        );
542    }
543}