1use std::collections::hash_map::Entry;
2use std::mem;
3
4use thiserror::Error;
5use tracing::debug;
6
7use super::state::get_or_insert_recv;
8use super::{ClosedStream, Retransmits, ShouldTransmit, StreamId, StreamsState};
9use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
10use crate::connection::streams::state::StreamRecv;
11use crate::{TransportError, VarInt, frame};
12
/// Receive-side state of a single stream.
#[derive(Debug, Default)]
pub(super) struct Recv {
    // NB: `reinit` resets every field; keep it in sync when adding or removing fields.
    /// Whether the stream is still receiving or has been reset, plus the final size once known.
    state: RecvState,
    /// Buffer that ingested frame data is inserted into and that chunks are read back from.
    pub(super) assembler: Assembler,
    /// Stream-level flow control limit recorded as sent to the peer; an ingested offset beyond
    /// it is treated as a flow control error.
    sent_max_stream_data: u64,
    /// Highest end offset of any frame ingested so far.
    pub(super) end: u64,
    /// Set by `stop`; once set, incoming data is no longer stored.
    pub(super) stopped: bool,
}
22
23impl Recv {
24 pub(super) fn new(initial_max_data: u64) -> Box<Self> {
25 Box::new(Self {
26 state: RecvState::default(),
27 assembler: Assembler::new(),
28 sent_max_stream_data: initial_max_data,
29 end: 0,
30 stopped: false,
31 })
32 }
33
34 pub(super) fn reinit(&mut self, initial_max_data: u64) {
36 self.state = RecvState::default();
37 self.assembler.reinit();
38 self.sent_max_stream_data = initial_max_data;
39 self.end = 0;
40 self.stopped = false;
41 }
42
43 pub(super) fn ingest(
47 &mut self,
48 frame: frame::Stream,
49 payload_len: usize,
50 received: u64,
51 max_data: u64,
52 ) -> Result<(u64, bool), TransportError> {
53 let end = frame.offset + frame.data.len() as u64;
54 if end >= 2u64.pow(62) {
55 return Err(TransportError::FLOW_CONTROL_ERROR(
56 "maximum stream offset too large",
57 ));
58 }
59
60 if let Some(final_offset) = self.final_offset() {
61 if end > final_offset || (frame.fin && end != final_offset) {
62 debug!(end, final_offset, "final size error");
63 return Err(TransportError::FINAL_SIZE_ERROR(""));
64 }
65 }
66
67 let new_bytes = self.credit_consumed_by(end, received, max_data)?;
68
69 if frame.fin && !self.stopped {
72 if let RecvState::Recv { ref mut size } = self.state {
73 *size = Some(end);
74 }
75 }
76
77 self.end = self.end.max(end);
78 if !self.stopped {
81 self.assembler.insert(frame.offset, frame.data, payload_len);
82 }
83
84 Ok((new_bytes, frame.fin && self.stopped))
85 }
86
87 pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
88 if self.stopped {
89 return Err(ClosedStream { _private: () });
90 }
91
92 self.stopped = true;
93 self.assembler.clear();
94 let read_credits = self.end - self.assembler.bytes_read();
96 Ok((read_credits, ShouldTransmit(self.is_receiving())))
101 }
102
103 pub(super) fn max_stream_data(&mut self, stream_receive_window: u64) -> (u64, ShouldTransmit) {
111 let max_stream_data = self.assembler.bytes_read() + stream_receive_window;
112
113 let diff = max_stream_data - self.sent_max_stream_data;
121 let transmit = self.can_send_flow_control() && diff >= (stream_receive_window / 8);
122 (max_stream_data, ShouldTransmit(transmit))
123 }
124
125 pub(super) fn record_sent_max_stream_data(&mut self, sent_value: u64) {
131 if sent_value > self.sent_max_stream_data {
132 self.sent_max_stream_data = sent_value;
133 }
134 }
135
136 pub(super) fn final_offset_unknown(&self) -> bool {
144 matches!(self.state, RecvState::Recv { size: None })
145 }
146
147 pub(super) fn can_send_flow_control(&self) -> bool {
149 self.final_offset_unknown() && !self.stopped
152 }
153
154 pub(super) fn is_receiving(&self) -> bool {
156 matches!(self.state, RecvState::Recv { .. })
157 }
158
159 fn final_offset(&self) -> Option<u64> {
160 match self.state {
161 RecvState::Recv { size } => size,
162 RecvState::ResetRecvd { size, .. } => Some(size),
163 }
164 }
165
166 pub(super) fn reset(
168 &mut self,
169 error_code: VarInt,
170 final_offset: VarInt,
171 received: u64,
172 max_data: u64,
173 ) -> Result<bool, TransportError> {
174 if let Some(offset) = self.final_offset() {
176 if offset != final_offset.into_inner() {
177 return Err(TransportError::FINAL_SIZE_ERROR("inconsistent value"));
178 }
179 } else if self.end > u64::from(final_offset) {
180 return Err(TransportError::FINAL_SIZE_ERROR(
181 "lower than high water mark",
182 ));
183 }
184 self.credit_consumed_by(final_offset.into(), received, max_data)?;
185
186 if matches!(self.state, RecvState::ResetRecvd { .. }) {
187 return Ok(false);
188 }
189 self.state = RecvState::ResetRecvd {
190 size: final_offset.into(),
191 error_code,
192 };
193 self.assembler.clear();
198 Ok(true)
199 }
200
201 pub(super) fn reset_code(&self) -> Option<VarInt> {
202 match self.state {
203 RecvState::ResetRecvd { error_code, .. } => Some(error_code),
204 _ => None,
205 }
206 }
207
208 fn credit_consumed_by(
211 &self,
212 offset: u64,
213 received: u64,
214 max_data: u64,
215 ) -> Result<u64, TransportError> {
216 let prev_end = self.end;
217 let new_bytes = offset.saturating_sub(prev_end);
218 if offset > self.sent_max_stream_data || received + new_bytes > max_data {
219 debug!(
220 received,
221 new_bytes,
222 max_data,
223 offset,
224 stream_max_data = self.sent_max_stream_data,
225 "flow control error"
226 );
227 return Err(TransportError::FLOW_CONTROL_ERROR(""));
228 }
229
230 Ok(new_bytes)
231 }
232}
233
/// Reader that hands out chunks of data received on one stream.
///
/// While it is readable, the stream's `Recv` state is held here rather than in
/// `StreamsState`; it is handed back when the reader is finalized or dropped.
pub struct Chunks<'a> {
    /// Stream being read.
    id: StreamId,
    /// Passed to the assembler on every read to select ordered or unordered reads.
    ordered: bool,
    /// Stream bookkeeping that the receive state is returned to.
    streams: &'a mut StreamsState,
    /// Where flow control updates that become due are queued.
    pending: &'a mut Retransmits,
    /// Current phase of this reader.
    state: ChunksState,
    /// Total bytes returned by `next` so far; credited back to the connection on finalize.
    read: u64,
}
253
254impl<'a> Chunks<'a> {
255 pub(super) fn new(
256 id: StreamId,
257 ordered: bool,
258 streams: &'a mut StreamsState,
259 pending: &'a mut Retransmits,
260 ) -> Result<Self, ReadableError> {
261 let mut entry = match streams.recv.entry(id) {
262 Entry::Occupied(entry) => entry,
263 Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
264 };
265
266 let mut recv =
267 match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
268 true => return Err(ReadableError::ClosedStream),
269 false => entry.remove().unwrap().into_inner(), };
271
272 recv.assembler.ensure_ordering(ordered)?;
273 Ok(Self {
274 id,
275 ordered,
276 streams,
277 pending,
278 state: ChunksState::Readable(recv),
279 read: 0,
280 })
281 }
282
283 pub fn next(&mut self, max_length: usize) -> Result<Option<Chunk>, ReadError> {
287 let rs = match self.state {
288 ChunksState::Readable(ref mut rs) => rs,
289 ChunksState::Reset(error_code) => {
290 return Err(ReadError::Reset(error_code));
291 }
292 ChunksState::Finished => {
293 return Ok(None);
294 }
295 ChunksState::Finalized => panic!("must not call next() after finalize()"),
296 };
297
298 if let Some(chunk) = rs.assembler.read(max_length, self.ordered) {
299 self.read += chunk.bytes.len() as u64;
300 return Ok(Some(chunk));
301 }
302
303 match rs.state {
304 RecvState::ResetRecvd { error_code, .. } => {
305 debug_assert_eq!(self.read, 0, "reset streams have empty buffers");
306 let state = mem::replace(&mut self.state, ChunksState::Reset(error_code));
307 let recv = match state {
309 ChunksState::Readable(recv) => StreamRecv::Open(recv),
310 _ => unreachable!("state must be ChunkState::Readable"),
311 };
312 self.streams.stream_recv_freed(self.id, recv);
313 Err(ReadError::Reset(error_code))
314 }
315 RecvState::Recv { size } => {
316 if size == Some(rs.end) && rs.assembler.bytes_read() == rs.end {
317 let state = mem::replace(&mut self.state, ChunksState::Finished);
318 let recv = match state {
320 ChunksState::Readable(recv) => StreamRecv::Open(recv),
321 _ => unreachable!("state must be ChunkState::Readable"),
322 };
323 self.streams.stream_recv_freed(self.id, recv);
324 Ok(None)
325 } else {
326 Err(ReadError::Blocked)
331 }
332 }
333 }
334 }
335
336 pub fn finalize(mut self) -> ShouldTransmit {
346 self.finalize_inner()
347 }
348
349 fn finalize_inner(&mut self) -> ShouldTransmit {
350 let state = mem::replace(&mut self.state, ChunksState::Finalized);
351 if let ChunksState::Finalized = state {
352 return ShouldTransmit(false);
354 }
355
356 let mut should_transmit = self.streams.queue_max_stream_id(self.pending);
360
361 if let ChunksState::Readable(mut rs) = state {
363 let (_, max_stream_data) = rs.max_stream_data(self.streams.stream_receive_window);
364 should_transmit |= max_stream_data.0;
365 if max_stream_data.0 {
366 self.pending.max_stream_data.insert(self.id);
367 }
368 self.streams
370 .recv
371 .insert(self.id, Some(StreamRecv::Open(rs)));
372 }
373
374 let max_data = self.streams.add_read_credits(self.read);
376 self.pending.max_data |= max_data.0;
377 should_transmit |= max_data.0;
378 ShouldTransmit(should_transmit)
379 }
380}
381
382impl Drop for Chunks<'_> {
383 fn drop(&mut self) {
384 let _ = self.finalize_inner();
385 }
386}
387
/// Phase of a `Chunks` reader.
enum ChunksState {
    /// The stream's receive state is held by the reader and may yield more data.
    Readable(Box<Recv>),
    /// The peer reset the stream with this error code; the receive state has been released.
    Reset(VarInt),
    /// All data up to the final size has been read; the receive state has been released.
    Finished,
    /// Finalization has already run.
    Finalized,
}
394
/// Errors returned when reading the next chunk from a stream.
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum ReadError {
    /// No data is available right now and the stream has neither finished nor been reset.
    #[error("blocked")]
    Blocked,
    /// The peer reset the stream; carries the error code from the reset.
    #[error("reset by peer: code {0}")]
    Reset(VarInt),
}
410
/// Errors returned when starting to read from a stream.
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum ReadableError {
    /// There is no receive state for the stream, or the stream has been stopped.
    #[error("closed stream")]
    ClosedStream,
    /// An ordered read was requested after an unordered read on the same stream.
    #[error("ordered read after unordered read")]
    IllegalOrderedRead,
}
424
425impl From<IllegalOrderedRead> for ReadableError {
426 fn from(_: IllegalOrderedRead) -> Self {
427 Self::IllegalOrderedRead
428 }
429}
430
/// Whether a stream is still receiving data or has been reset by the peer.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum RecvState {
    /// Not reset; `size` is the final size once a FIN has been recorded.
    Recv { size: Option<u64> },
    /// Reset by the peer with `error_code`; `size` is the final size carried by the reset.
    ResetRecvd { size: u64, error_code: VarInt },
}
436
437impl Default for RecvState {
438 fn default() -> Self {
439 Self::Recv { size: None }
440 }
441}
442
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use crate::{Dir, Side};

    use super::*;

    /// Ingests a non-FIN frame of `len` zero bytes at `offset` and returns what `ingest`
    /// reports, panicking if the frame is rejected.
    fn ingest_zeros(recv: &mut Recv, offset: u64, len: u64, data_recvd: u64) -> (u64, bool) {
        let stream_frame = frame::Stream {
            id: StreamId::new(Side::Client, Dir::Uni, 0),
            offset,
            fin: false,
            data: Bytes::from(vec![0u8; len as usize]),
        };
        recv.ingest(stream_frame, 123, data_recvd, data_recvd + 1024)
            .unwrap()
    }

    /// Frames that arrive out of order after a stream was stopped must neither grant
    /// stream-level credit nor be counted twice at the connection level.
    #[test]
    fn reordered_frames_while_stopped() {
        const INITIAL_BYTES: u64 = 3;
        const INITIAL_OFFSET: u64 = 3;
        const RECV_WINDOW: u64 = 8;
        let mut s = Recv::new(RECV_WINDOW);
        let mut data_recvd = 0;

        // Bytes 3..6 arrive first, leaving a gap at the start of the stream.
        let (new_bytes, is_closed) =
            ingest_zeros(&mut s, INITIAL_OFFSET, INITIAL_BYTES, data_recvd);
        data_recvd += new_bytes;
        assert_eq!(new_bytes, INITIAL_OFFSET + INITIAL_BYTES);
        assert!(!is_closed);

        let (credits, transmit) = s.stop().unwrap();
        assert!(transmit.should_transmit());
        assert_eq!(
            credits,
            INITIAL_OFFSET + INITIAL_BYTES,
            "full connection flow control credit is issued by stop"
        );

        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
        assert!(!transmit.should_transmit());
        assert_eq!(
            max_stream_data, RECV_WINDOW,
            "stream flow control credit isn't issued by stop"
        );

        // The last byte of the window arrives next.
        let (new_bytes, is_closed) = ingest_zeros(&mut s, RECV_WINDOW - 1, 1, data_recvd);
        data_recvd += new_bytes;
        assert_eq!(new_bytes, RECV_WINDOW - (INITIAL_OFFSET + INITIAL_BYTES));
        assert!(!is_closed);

        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
        assert!(!transmit.should_transmit());
        assert_eq!(
            max_stream_data, RECV_WINDOW,
            "stream flow control credit isn't issued after stop"
        );

        // Finally the gap at the start, bytes 0..3, is filled.
        let (new_bytes, is_closed) = ingest_zeros(&mut s, 0, INITIAL_OFFSET, data_recvd);
        assert_eq!(
            new_bytes, 0,
            "reordered frames don't issue connection-level flow control for stopped streams"
        );
        assert!(!is_closed);

        let (max_stream_data, transmit) = s.max_stream_data(RECV_WINDOW);
        assert!(!transmit.should_transmit());
        assert_eq!(
            max_stream_data, RECV_WINDOW,
            "stream flow control credit isn't issued after stop"
        );
    }
}