quinn/
recv_stream.rs

1use std::{
2    future::{Future, poll_fn},
3    io,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use bytes::Bytes;
9use proto::{Chunk, Chunks, ClosedStream, ConnectionError, ReadableError, StreamId};
10use thiserror::Error;
11use tokio::io::ReadBuf;
12
13use crate::{VarInt, connection::ConnectionRef};
14
15/// A stream that can only be used to receive data
16///
17/// `stop(0)` is implicitly called on drop unless:
18/// - A variant of [`ReadError`] has been yielded by a read call
19/// - [`stop()`] was called explicitly
20///
21/// # Cancellation
22///
23/// A `read` method is said to be *cancel-safe* when dropping its future before the future becomes
24/// ready cannot lead to loss of stream data. This is true of methods which succeed immediately when
25/// any progress is made, and is not true of methods which might need to perform multiple reads
26/// internally before succeeding. Each `read` method documents whether it is cancel-safe.
27///
28/// # Common issues
29///
30/// ## Data never received on a locally-opened stream
31///
32/// Peers are not notified of streams until they or a later-numbered stream are used to send
33/// data. If a bidirectional stream is locally opened but never used to send, then the peer may
34/// never see it. Application protocols should always arrange for the endpoint which will first
35/// transmit on a stream to be the endpoint responsible for opening it.
36///
37/// ## Data never received on a remotely-opened stream
38///
39/// Verify that the stream you are receiving is the same one that the server is sending on, e.g. by
40/// logging the [`id`] of each. Streams are always accepted in the same order as they are created,
41/// i.e. ascending order by [`StreamId`]. For example, even if a sender first transmits on
42/// bidirectional stream 1, the first stream yielded by [`Connection::accept_bi`] on the receiver
43/// will be bidirectional stream 0.
44///
45/// [`ReadError`]: crate::ReadError
46/// [`stop()`]: RecvStream::stop
47/// [`SendStream::finish`]: crate::SendStream::finish
48/// [`WriteError::Stopped`]: crate::WriteError::Stopped
49/// [`id`]: RecvStream::id
50/// [`Connection::accept_bi`]: crate::Connection::accept_bi
51#[derive(Debug)]
52pub struct RecvStream {
53    conn: ConnectionRef,
54    stream: StreamId,
55    is_0rtt: bool,
56    all_data_read: bool,
57    reset: Option<VarInt>,
58}
59
60impl RecvStream {
61    pub(crate) fn new(conn: ConnectionRef, stream: StreamId, is_0rtt: bool) -> Self {
62        Self {
63            conn,
64            stream,
65            is_0rtt,
66            all_data_read: false,
67            reset: None,
68        }
69    }
70
71    /// Read data contiguously from the stream.
72    ///
73    /// Yields the number of bytes read into `buf` on success, or `None` if the stream was finished.
74    ///
75    /// This operation is cancel-safe.
76    pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
77        Read {
78            stream: self,
79            buf: ReadBuf::new(buf),
80        }
81        .await
82    }
83
84    /// Read an exact number of bytes contiguously from the stream.
85    ///
86    /// See [`read()`] for details. This operation is *not* cancel-safe.
87    ///
88    /// [`read()`]: RecvStream::read
89    pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ReadExactError> {
90        ReadExact {
91            stream: self,
92            buf: ReadBuf::new(buf),
93        }
94        .await
95    }
96
97    /// Attempts to read from the stream into the provided buffer
98    ///
99    /// On success, returns `Poll::Ready(Ok(num_bytes_read))` and places data into `buf`. If this
100    /// returns zero bytes read (and `buf` has a non-zero length), that indicates that the remote
101    /// side has [`finish`]ed the stream and the local side has already read all bytes.
102    ///
103    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
104    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
105    /// closed.
106    ///
107    /// [`finish`]: crate::SendStream::finish
108    pub fn poll_read(
109        &mut self,
110        cx: &mut Context,
111        buf: &mut [u8],
112    ) -> Poll<Result<usize, ReadError>> {
113        let mut buf = ReadBuf::new(buf);
114        ready!(self.poll_read_buf(cx, &mut buf))?;
115        Poll::Ready(Ok(buf.filled().len()))
116    }
117
118    /// Attempts to read from the stream into the provided buffer, which may be uninitialized
119    ///
120    /// On success, returns `Poll::Ready(Ok(()))` and places data into the unfilled portion of
121    /// `buf`. If this does not write any bytes to `buf` (and `buf.remaining()` is non-zero), that
122    /// indicates that the remote side has [`finish`]ed the stream and the local side has already
123    /// read all bytes.
124    ///
125    /// If no data is available for reading, this returns `Poll::Pending` and arranges for the
126    /// current task (via `cx.waker()`) to be notified when the stream becomes readable or is
127    /// closed.
128    ///
129    /// [`finish`]: crate::SendStream::finish
130    pub fn poll_read_buf(
131        &mut self,
132        cx: &mut Context,
133        buf: &mut ReadBuf<'_>,
134    ) -> Poll<Result<(), ReadError>> {
135        if buf.remaining() == 0 {
136            return Poll::Ready(Ok(()));
137        }
138
139        self.poll_read_generic(cx, true, |chunks| {
140            let mut read = false;
141            loop {
142                if buf.remaining() == 0 {
143                    // We know `read` is `true` because `buf.remaining()` was not 0 before
144                    return ReadStatus::Readable(());
145                }
146
147                match chunks.next(buf.remaining()) {
148                    Ok(Some(chunk)) => {
149                        buf.put_slice(&chunk.bytes);
150                        read = true;
151                    }
152                    res => return (if read { Some(()) } else { None }, res.err()).into(),
153                }
154            }
155        })
156        .map(|res| res.map(|_| ()))
157    }
158
159    /// Read the next segment of data
160    ///
161    /// Yields `None` if the stream was finished. Otherwise, yields a segment of data and its
162    /// offset in the stream. If `ordered` is `true`, the chunk's offset will be immediately after
163    /// the last data yielded by `read()` or `read_chunk()`. If `ordered` is `false`, segments may
164    /// be received in any order, and the `Chunk`'s `offset` field can be used to determine
165    /// ordering in the caller. Unordered reads are less prone to head-of-line blocking within a
166    /// stream, but require the application to manage reassembling the original data.
167    ///
168    /// Slightly more efficient than `read` due to not copying. Chunk boundaries do not correspond
169    /// to peer writes, and hence cannot be used as framing.
170    ///
171    /// This operation is cancel-safe.
172    pub async fn read_chunk(
173        &mut self,
174        max_length: usize,
175        ordered: bool,
176    ) -> Result<Option<Chunk>, ReadError> {
177        ReadChunk {
178            stream: self,
179            max_length,
180            ordered,
181        }
182        .await
183    }
184
185    /// Attempts to read a chunk from the stream.
186    ///
187    /// On success, returns `Poll::Ready(Ok(Some(chunk)))`. If `Poll::Ready(Ok(None))`
188    /// is returned, it implies that EOF has been reached.
189    ///
190    /// If no data is available for reading, the method returns `Poll::Pending`
191    /// and arranges for the current task (via cx.waker()) to receive a notification
192    /// when the stream becomes readable or is closed.
193    fn poll_read_chunk(
194        &mut self,
195        cx: &mut Context,
196        max_length: usize,
197        ordered: bool,
198    ) -> Poll<Result<Option<Chunk>, ReadError>> {
199        self.poll_read_generic(cx, ordered, |chunks| match chunks.next(max_length) {
200            Ok(Some(chunk)) => ReadStatus::Readable(chunk),
201            res => (None, res.err()).into(),
202        })
203    }
204
205    /// Read the next segments of data
206    ///
207    /// Fills `bufs` with the segments of data beginning immediately after the
208    /// last data yielded by `read` or `read_chunk`, or `None` if the stream was
209    /// finished.
210    ///
211    /// Slightly more efficient than `read` due to not copying. Chunk boundaries
212    /// do not correspond to peer writes, and hence cannot be used as framing.
213    ///
214    /// This operation is cancel-safe.
215    pub async fn read_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Option<usize>, ReadError> {
216        ReadChunks { stream: self, bufs }.await
217    }
218
219    /// Foundation of [`Self::read_chunks`]
220    fn poll_read_chunks(
221        &mut self,
222        cx: &mut Context,
223        bufs: &mut [Bytes],
224    ) -> Poll<Result<Option<usize>, ReadError>> {
225        if bufs.is_empty() {
226            return Poll::Ready(Ok(Some(0)));
227        }
228
229        self.poll_read_generic(cx, true, |chunks| {
230            let mut read = 0;
231            loop {
232                if read >= bufs.len() {
233                    // We know `read > 0` because `bufs` cannot be empty here
234                    return ReadStatus::Readable(read);
235                }
236
237                match chunks.next(usize::MAX) {
238                    Ok(Some(chunk)) => {
239                        bufs[read] = chunk.bytes;
240                        read += 1;
241                    }
242                    res => return (if read == 0 { None } else { Some(read) }, res.err()).into(),
243                }
244            }
245        })
246    }
247
248    /// Convenience method to read all remaining data into a buffer
249    ///
250    /// Fails with [`ReadToEndError::TooLong`] on reading more than `size_limit` bytes, discarding
251    /// all data read. Uses unordered reads to be more efficient than using `AsyncRead` would
252    /// allow. `size_limit` should be set to limit worst-case memory use.
253    ///
254    /// If unordered reads have already been made, the resulting buffer may have gaps containing
255    /// arbitrary data.
256    ///
257    /// This operation is *not* cancel-safe.
258    ///
259    /// [`ReadToEndError::TooLong`]: crate::ReadToEndError::TooLong
260    pub async fn read_to_end(&mut self, size_limit: usize) -> Result<Vec<u8>, ReadToEndError> {
261        ReadToEnd {
262            stream: self,
263            size_limit,
264            read: Vec::new(),
265            start: u64::MAX,
266            end: 0,
267        }
268        .await
269    }
270
271    /// Stop accepting data
272    ///
273    /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
274    /// attempts to operate on a stream will yield `ClosedStream` errors.
275    pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
276        let mut conn = self.conn.state.lock("RecvStream::stop");
277        if self.is_0rtt && conn.check_0rtt().is_err() {
278            return Ok(());
279        }
280        conn.inner.recv_stream(self.stream).stop(error_code)?;
281        conn.wake();
282        self.all_data_read = true;
283        Ok(())
284    }
285
286    /// Check if this stream has been opened during 0-RTT.
287    ///
288    /// In which case any non-idempotent request should be considered dangerous at the application
289    /// level. Because read data is subject to replay attacks.
290    pub fn is_0rtt(&self) -> bool {
291        self.is_0rtt
292    }
293
294    /// Get the identity of this stream
295    pub fn id(&self) -> StreamId {
296        self.stream
297    }
298
299    /// Completes when the stream has been reset by the peer or otherwise closed
300    ///
301    /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
302    /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
303    /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
304    /// which it is no longer meaningful for the stream to be reset.
305    ///
306    /// This operation is cancel-safe.
307    pub async fn received_reset(&mut self) -> Result<Option<VarInt>, ResetError> {
308        poll_fn(|cx| {
309            let mut conn = self.conn.state.lock("RecvStream::reset");
310            if self.is_0rtt && conn.check_0rtt().is_err() {
311                return Poll::Ready(Err(ResetError::ZeroRttRejected));
312            }
313
314            if let Some(code) = self.reset {
315                return Poll::Ready(Ok(Some(code)));
316            }
317
318            match conn.inner.recv_stream(self.stream).received_reset() {
319                Err(_) => Poll::Ready(Ok(None)),
320                Ok(Some(error_code)) => {
321                    // Stream state has just now been freed, so the connection may need to issue new
322                    // stream ID flow control credit
323                    conn.wake();
324                    Poll::Ready(Ok(Some(error_code)))
325                }
326                Ok(None) => {
327                    if let Some(e) = &conn.error {
328                        return Poll::Ready(Err(e.clone().into()));
329                    }
330                    // Resets always notify readers, since a reset is an immediate read error. We
331                    // could introduce a dedicated channel to reduce the risk of spurious wakeups,
332                    // but that increased complexity is probably not justified, as an application
333                    // that is expecting a reset is not likely to receive large amounts of data.
334                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
335                    Poll::Pending
336                }
337            }
338        })
339        .await
340    }
341
342    /// Handle common logic related to reading out of a receive stream
343    ///
344    /// This takes an `FnMut` closure that takes care of the actual reading process, matching
345    /// the detailed read semantics for the calling function with a particular return type.
346    /// The closure can read from the passed `&mut Chunks` and has to return the status after
347    /// reading: the amount of data read, and the status after the final read call.
348    fn poll_read_generic<T, U>(
349        &mut self,
350        cx: &mut Context,
351        ordered: bool,
352        mut read_fn: T,
353    ) -> Poll<Result<Option<U>, ReadError>>
354    where
355        T: FnMut(&mut Chunks) -> ReadStatus<U>,
356    {
357        use proto::ReadError::*;
358        if self.all_data_read {
359            return Poll::Ready(Ok(None));
360        }
361
362        let mut conn = self.conn.state.lock("RecvStream::poll_read");
363        if self.is_0rtt {
364            conn.check_0rtt().map_err(|()| ReadError::ZeroRttRejected)?;
365        }
366
367        // If we stored an error during a previous call, return it now. This can happen if a
368        // `read_fn` both wants to return data and also returns an error in its final stream status.
369        let status = match self.reset {
370            Some(code) => ReadStatus::Failed(None, Reset(code)),
371            None => {
372                let mut recv = conn.inner.recv_stream(self.stream);
373                let mut chunks = recv.read(ordered)?;
374                let status = read_fn(&mut chunks);
375                if chunks.finalize().should_transmit() {
376                    conn.wake();
377                }
378                status
379            }
380        };
381
382        match status {
383            ReadStatus::Readable(read) => Poll::Ready(Ok(Some(read))),
384            ReadStatus::Finished(read) => {
385                self.all_data_read = true;
386                Poll::Ready(Ok(read))
387            }
388            ReadStatus::Failed(read, Blocked) => match read {
389                Some(val) => Poll::Ready(Ok(Some(val))),
390                None => {
391                    if let Some(ref x) = conn.error {
392                        return Poll::Ready(Err(ReadError::ConnectionLost(x.clone())));
393                    }
394                    conn.blocked_readers.insert(self.stream, cx.waker().clone());
395                    Poll::Pending
396                }
397            },
398            ReadStatus::Failed(read, Reset(error_code)) => match read {
399                None => {
400                    self.all_data_read = true;
401                    self.reset = Some(error_code);
402                    Poll::Ready(Err(ReadError::Reset(error_code)))
403                }
404                done => {
405                    self.reset = Some(error_code);
406                    Poll::Ready(Ok(done))
407                }
408            },
409        }
410    }
411}
412
413enum ReadStatus<T> {
414    Readable(T),
415    Finished(Option<T>),
416    Failed(Option<T>, proto::ReadError),
417}
418
419impl<T> From<(Option<T>, Option<proto::ReadError>)> for ReadStatus<T> {
420    fn from(status: (Option<T>, Option<proto::ReadError>)) -> Self {
421        match status {
422            (read, None) => Self::Finished(read),
423            (read, Some(e)) => Self::Failed(read, e),
424        }
425    }
426}
427
428/// Future produced by [`RecvStream::read_to_end()`].
429///
430/// [`RecvStream::read_to_end()`]: crate::RecvStream::read_to_end
431struct ReadToEnd<'a> {
432    stream: &'a mut RecvStream,
433    read: Vec<(Bytes, u64)>,
434    start: u64,
435    end: u64,
436    size_limit: usize,
437}
438
439impl Future for ReadToEnd<'_> {
440    type Output = Result<Vec<u8>, ReadToEndError>;
441    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
442        loop {
443            match ready!(self.stream.poll_read_chunk(cx, usize::MAX, false))? {
444                Some(chunk) => {
445                    self.start = self.start.min(chunk.offset);
446                    let end = chunk.bytes.len() as u64 + chunk.offset;
447                    if (end - self.start) > self.size_limit as u64 {
448                        return Poll::Ready(Err(ReadToEndError::TooLong));
449                    }
450                    self.end = self.end.max(end);
451                    self.read.push((chunk.bytes, chunk.offset));
452                }
453                None => {
454                    if self.end == 0 {
455                        // Never received anything
456                        return Poll::Ready(Ok(Vec::new()));
457                    }
458                    let start = self.start;
459                    let mut buffer = vec![0; (self.end - start) as usize];
460                    for (data, offset) in self.read.drain(..) {
461                        let offset = (offset - start) as usize;
462                        buffer[offset..offset + data.len()].copy_from_slice(&data);
463                    }
464                    return Poll::Ready(Ok(buffer));
465                }
466            }
467        }
468    }
469}
470
471/// Errors from [`RecvStream::read_to_end`]
472#[derive(Debug, Error, Clone, PartialEq, Eq)]
473pub enum ReadToEndError {
474    /// An error occurred during reading
475    #[error("read error: {0}")]
476    Read(#[from] ReadError),
477    /// The stream is larger than the user-supplied limit
478    #[error("stream too long")]
479    TooLong,
480}
481
482#[cfg(feature = "futures-io")]
483impl futures_io::AsyncRead for RecvStream {
484    fn poll_read(
485        self: Pin<&mut Self>,
486        cx: &mut Context,
487        buf: &mut [u8],
488    ) -> Poll<io::Result<usize>> {
489        let mut buf = ReadBuf::new(buf);
490        ready!(Self::poll_read_buf(self.get_mut(), cx, &mut buf))?;
491        Poll::Ready(Ok(buf.filled().len()))
492    }
493}
494
495impl tokio::io::AsyncRead for RecvStream {
496    fn poll_read(
497        self: Pin<&mut Self>,
498        cx: &mut Context<'_>,
499        buf: &mut ReadBuf<'_>,
500    ) -> Poll<io::Result<()>> {
501        ready!(Self::poll_read_buf(self.get_mut(), cx, buf))?;
502        Poll::Ready(Ok(()))
503    }
504}
505
506impl Drop for RecvStream {
507    fn drop(&mut self) {
508        let mut conn = self.conn.state.lock("RecvStream::drop");
509
510        // clean up any previously registered wakers
511        conn.blocked_readers.remove(&self.stream);
512
513        if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) {
514            return;
515        }
516        if !self.all_data_read {
517            // Ignore ClosedStream errors
518            let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
519            conn.wake();
520        }
521    }
522}
523
524/// Errors that arise from reading from a stream.
525#[derive(Debug, Error, Clone, PartialEq, Eq)]
526pub enum ReadError {
527    /// The peer abandoned transmitting data on this stream
528    ///
529    /// Carries an application-defined error code.
530    #[error("stream reset by peer: error {0}")]
531    Reset(VarInt),
532    /// The connection was lost
533    #[error("connection lost")]
534    ConnectionLost(#[from] ConnectionError),
535    /// The stream has already been stopped, finished, or reset
536    #[error("closed stream")]
537    ClosedStream,
538    /// Attempted an ordered read following an unordered read
539    ///
540    /// Performing an unordered read allows discontinuities to arise in the receive buffer of a
541    /// stream which cannot be recovered, making further ordered reads impossible.
542    #[error("ordered read after unordered read")]
543    IllegalOrderedRead,
544    /// This was a 0-RTT stream and the server rejected it
545    ///
546    /// Can only occur on clients for 0-RTT streams, which can be opened using
547    /// [`Connecting::into_0rtt()`].
548    ///
549    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
550    #[error("0-RTT rejected")]
551    ZeroRttRejected,
552}
553
554impl From<ReadableError> for ReadError {
555    fn from(e: ReadableError) -> Self {
556        match e {
557            ReadableError::ClosedStream => Self::ClosedStream,
558            ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
559        }
560    }
561}
562
563impl From<ResetError> for ReadError {
564    fn from(e: ResetError) -> Self {
565        match e {
566            ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
567            ResetError::ZeroRttRejected => Self::ZeroRttRejected,
568        }
569    }
570}
571
572impl From<ReadError> for io::Error {
573    fn from(x: ReadError) -> Self {
574        use ReadError::*;
575        let kind = match x {
576            Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
577            ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
578            IllegalOrderedRead => io::ErrorKind::InvalidInput,
579        };
580        Self::new(kind, x)
581    }
582}
583
584/// Errors that arise while waiting for a stream to be reset
585#[derive(Debug, Error, Clone, PartialEq, Eq)]
586pub enum ResetError {
587    /// The connection was lost
588    #[error("connection lost")]
589    ConnectionLost(#[from] ConnectionError),
590    /// This was a 0-RTT stream and the server rejected it
591    ///
592    /// Can only occur on clients for 0-RTT streams, which can be opened using
593    /// [`Connecting::into_0rtt()`].
594    ///
595    /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
596    #[error("0-RTT rejected")]
597    ZeroRttRejected,
598}
599
600impl From<ResetError> for io::Error {
601    fn from(x: ResetError) -> Self {
602        use ResetError::*;
603        let kind = match x {
604            ZeroRttRejected => io::ErrorKind::ConnectionReset,
605            ConnectionLost(_) => io::ErrorKind::NotConnected,
606        };
607        Self::new(kind, x)
608    }
609}
610
611/// Future produced by [`RecvStream::read()`].
612///
613/// [`RecvStream::read()`]: crate::RecvStream::read
614struct Read<'a> {
615    stream: &'a mut RecvStream,
616    buf: ReadBuf<'a>,
617}
618
619impl Future for Read<'_> {
620    type Output = Result<Option<usize>, ReadError>;
621
622    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
623        let this = self.get_mut();
624        ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
625        match this.buf.filled().len() {
626            0 if this.buf.capacity() != 0 => Poll::Ready(Ok(None)),
627            n => Poll::Ready(Ok(Some(n))),
628        }
629    }
630}
631
632/// Future produced by [`RecvStream::read_exact()`].
633///
634/// [`RecvStream::read_exact()`]: crate::RecvStream::read_exact
635struct ReadExact<'a> {
636    stream: &'a mut RecvStream,
637    buf: ReadBuf<'a>,
638}
639
640impl Future for ReadExact<'_> {
641    type Output = Result<(), ReadExactError>;
642    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
643        let this = self.get_mut();
644        let mut remaining = this.buf.remaining();
645        while remaining > 0 {
646            ready!(this.stream.poll_read_buf(cx, &mut this.buf))?;
647            let new = this.buf.remaining();
648            if new == remaining {
649                return Poll::Ready(Err(ReadExactError::FinishedEarly(this.buf.filled().len())));
650            }
651            remaining = new;
652        }
653        Poll::Ready(Ok(()))
654    }
655}
656
657/// Errors that arise from reading from a stream.
658#[derive(Debug, Error, Clone, PartialEq, Eq)]
659pub enum ReadExactError {
660    /// The stream finished before all bytes were read
661    #[error("stream finished early ({0} bytes read)")]
662    FinishedEarly(usize),
663    /// A read error occurred
664    #[error(transparent)]
665    ReadError(#[from] ReadError),
666}
667
668/// Future produced by [`RecvStream::read_chunk()`].
669///
670/// [`RecvStream::read_chunk()`]: crate::RecvStream::read_chunk
671struct ReadChunk<'a> {
672    stream: &'a mut RecvStream,
673    max_length: usize,
674    ordered: bool,
675}
676
677impl Future for ReadChunk<'_> {
678    type Output = Result<Option<Chunk>, ReadError>;
679    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
680        let (max_length, ordered) = (self.max_length, self.ordered);
681        self.stream.poll_read_chunk(cx, max_length, ordered)
682    }
683}
684
685/// Future produced by [`RecvStream::read_chunks()`].
686///
687/// [`RecvStream::read_chunks()`]: crate::RecvStream::read_chunks
688struct ReadChunks<'a> {
689    stream: &'a mut RecvStream,
690    bufs: &'a mut [Bytes],
691}
692
693impl Future for ReadChunks<'_> {
694    type Output = Result<Option<usize>, ReadError>;
695    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
696        let this = self.get_mut();
697        this.stream.poll_read_chunks(cx, this.bufs)
698    }
699}