quinn_proto/connection/
assembler.rs

use std::{
    cmp::Ordering,
    collections::{BinaryHeap, binary_heap::PeekMut},
    fmt, mem,
};

use bytes::{Buf, Bytes, BytesMut};

use crate::range_set::RangeSet;
10
11/// Helper to assemble unordered stream frames into an ordered stream
12#[derive(Debug, Default)]
13pub(super) struct Assembler {
14    state: State,
15    data: BinaryHeap<Buffer>,
16    /// Total number of buffered bytes, including duplicates in ordered mode.
17    buffered: usize,
18    /// Estimated number of allocated bytes, will never be less than `buffered`.
19    allocated: usize,
20    /// Number of bytes read by the application. When only ordered reads have been used, this is the
21    /// length of the contiguous prefix of the stream which has been consumed by the application,
22    /// aka the stream offset.
23    bytes_read: u64,
24    end: u64,
25}
26
27impl Assembler {
28    pub(super) fn new() -> Self {
29        Self::default()
30    }
31
32    /// Reset to the initial state
33    pub(super) fn reinit(&mut self) {
34        let old_data = mem::take(&mut self.data);
35        *self = Self::default();
36        self.data = old_data;
37        self.data.clear();
38    }
39
40    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
41        if ordered && !self.state.is_ordered() {
42            return Err(IllegalOrderedRead);
43        } else if !ordered && self.state.is_ordered() {
44            // Enter unordered mode
45            if !self.data.is_empty() {
46                // Get rid of possible duplicates
47                self.defragment();
48            }
49            let mut recvd = RangeSet::new();
50            recvd.insert(0..self.bytes_read);
51            for chunk in &self.data {
52                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
53            }
54            self.state = State::Unordered { recvd };
55        }
56        Ok(())
57    }
58
59    /// Get the the next chunk
60    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
61        loop {
62            let mut chunk = self.data.peek_mut()?;
63
64            if ordered {
65                if chunk.offset > self.bytes_read {
66                    // Next chunk is after current read index
67                    return None;
68                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
69                    // Next chunk is useless as the read index is beyond its end
70                    self.buffered -= chunk.bytes.len();
71                    self.allocated -= chunk.allocation_size;
72                    PeekMut::pop(chunk);
73                    continue;
74                }
75
76                // Determine `start` and `len` of the slice of useful data in chunk
77                let start = (self.bytes_read - chunk.offset) as usize;
78                if start > 0 {
79                    chunk.bytes.advance(start);
80                    chunk.offset += start as u64;
81                    self.buffered -= start;
82                }
83            }
84
85            return Some(if max_length < chunk.bytes.len() {
86                self.bytes_read += max_length as u64;
87                let offset = chunk.offset;
88                chunk.offset += max_length as u64;
89                self.buffered -= max_length;
90                Chunk::new(offset, chunk.bytes.split_to(max_length))
91            } else {
92                self.bytes_read += chunk.bytes.len() as u64;
93                self.buffered -= chunk.bytes.len();
94                self.allocated -= chunk.allocation_size;
95                let chunk = PeekMut::pop(chunk);
96                Chunk::new(chunk.offset, chunk.bytes)
97            });
98        }
99    }
100
101    /// Copy fragmented chunk data to new chunks backed by a single buffer
102    ///
103    /// This makes sure we're not unnecessarily holding on to many larger allocations.
104    /// We merge contiguous chunks in the process of doing so.
105    fn defragment(&mut self) {
106        let new = BinaryHeap::with_capacity(self.data.len());
107        let old = mem::replace(&mut self.data, new);
108        let mut buffers = old.into_sorted_vec();
109        self.buffered = 0;
110        let mut fragmented_buffered = 0;
111        let mut offset = 0;
112        for chunk in buffers.iter_mut().rev() {
113            chunk.try_mark_defragment(offset);
114            let size = chunk.bytes.len();
115            offset = chunk.offset + size as u64;
116            self.buffered += size;
117            if !chunk.defragmented {
118                fragmented_buffered += size;
119            }
120        }
121        self.allocated = self.buffered;
122        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
123        let mut offset = 0;
124        for chunk in buffers.into_iter().rev() {
125            if chunk.defragmented {
126                // bytes might be empty after try_mark_defragment
127                if !chunk.bytes.is_empty() {
128                    self.data.push(chunk);
129                }
130                continue;
131            }
132            // Overlap is resolved by try_mark_defragment
133            if chunk.offset != offset + (buffer.len() as u64) {
134                if !buffer.is_empty() {
135                    self.data
136                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
137                }
138                offset = chunk.offset;
139            }
140            buffer.extend_from_slice(&chunk.bytes);
141        }
142        if !buffer.is_empty() {
143            self.data
144                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
145        }
146    }
147
148    // Note: If a packet contains many frames from the same stream, the estimated over-allocation
149    // will be much higher because we are counting the same allocation multiple times.
150    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
151        debug_assert!(
152            bytes.len() <= allocation_size,
153            "allocation_size less than bytes.len(): {:?} < {:?}",
154            allocation_size,
155            bytes.len()
156        );
157        self.end = self.end.max(offset + bytes.len() as u64);
158        if let State::Unordered { ref mut recvd } = self.state {
159            // Discard duplicate data
160            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
161                if duplicate.start > offset {
162                    let buffer = Buffer::new(
163                        offset,
164                        bytes.split_to((duplicate.start - offset) as usize),
165                        allocation_size,
166                    );
167                    self.buffered += buffer.bytes.len();
168                    self.allocated += buffer.allocation_size;
169                    self.data.push(buffer);
170                    offset = duplicate.start;
171                }
172                bytes.advance((duplicate.end - offset) as usize);
173                offset = duplicate.end;
174            }
175        } else if offset < self.bytes_read {
176            if (offset + bytes.len() as u64) <= self.bytes_read {
177                return;
178            } else {
179                let diff = self.bytes_read - offset;
180                offset += diff;
181                bytes.advance(diff as usize);
182            }
183        }
184
185        if bytes.is_empty() {
186            return;
187        }
188        let buffer = Buffer::new(offset, bytes, allocation_size);
189        self.buffered += buffer.bytes.len();
190        self.allocated += buffer.allocation_size;
191        self.data.push(buffer);
192        // `self.buffered` also counts duplicate bytes, therefore we use
193        // `self.end - self.bytes_read` as an upper bound of buffered unique
194        // bytes. This will cause a defragmentation if the amount of duplicate
195        // bytes exceedes a proportion of the receive window size.
196        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
197        let over_allocation = self.allocated - buffered;
198        // Rationale: on the one hand, we want to defragment rarely, ideally never
199        // in non-pathological scenarios. However, a pathological or malicious
200        // peer could send us one-byte frames, and since we use reference-counted
201        // buffers in order to prevent copying, this could result in keeping a lot
202        // of memory allocated. This limits over-allocation in proportion to the
203        // buffered data. The constants are chosen somewhat arbitrarily and try to
204        // balance between defragmentation overhead and over-allocation.
205        let threshold = 32768.max(buffered * 3 / 2);
206        if over_allocation > threshold {
207            self.defragment()
208        }
209    }
210
211    /// Number of bytes consumed by the application
212    pub(super) fn bytes_read(&self) -> u64 {
213        self.bytes_read
214    }
215
216    /// Discard all buffered data
217    pub(super) fn clear(&mut self) {
218        self.data.clear();
219        self.buffered = 0;
220        self.allocated = 0;
221    }
222}
223
224/// A chunk of data from the receive stream
225#[derive(Debug, PartialEq, Eq)]
226pub struct Chunk {
227    /// The offset in the stream
228    pub offset: u64,
229    /// The contents of the chunk
230    pub bytes: Bytes,
231}
232
233impl Chunk {
234    fn new(offset: u64, bytes: Bytes) -> Self {
235        Self { offset, bytes }
236    }
237}
238
239#[derive(Debug, Eq)]
240struct Buffer {
241    offset: u64,
242    bytes: Bytes,
243    /// Size of the allocation behind `bytes`, if `defragmented == false`.
244    /// Otherwise this will be set to `bytes.len()` by `try_mark_defragment`.
245    /// Will never be less than `bytes.len()`.
246    allocation_size: usize,
247    defragmented: bool,
248}
249
250impl Buffer {
251    /// Constructs a new fragmented Buffer
252    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
253        Self {
254            offset,
255            bytes,
256            allocation_size,
257            defragmented: false,
258        }
259    }
260
261    /// Constructs a new defragmented Buffer
262    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
263        let allocation_size = bytes.len();
264        Self {
265            offset,
266            bytes,
267            allocation_size,
268            defragmented: true,
269        }
270    }
271
272    /// Discards data before `offset` and flags `self` as defragmented if it has good utilization
273    fn try_mark_defragment(&mut self, offset: u64) {
274        let duplicate = offset.saturating_sub(self.offset) as usize;
275        self.offset = self.offset.max(offset);
276        if duplicate >= self.bytes.len() {
277            // All bytes are duplicate
278            self.bytes = Bytes::new();
279            self.defragmented = true;
280            self.allocation_size = 0;
281            return;
282        }
283        self.bytes.advance(duplicate);
284        // Make sure that fragmented buffers with high utilization become defragmented and
285        // defragmented buffers remain defragmented
286        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
287        if self.defragmented {
288            // Make sure that defragmented buffers do not contribute to over-allocation
289            self.allocation_size = self.bytes.len();
290        }
291    }
292}
293
294impl Ord for Buffer {
295    // Invert ordering based on offset (max-heap, min offset first),
296    // prioritize longer chunks at the same offset.
297    fn cmp(&self, other: &Self) -> Ordering {
298        self.offset
299            .cmp(&other.offset)
300            .reverse()
301            .then(self.bytes.len().cmp(&other.bytes.len()))
302    }
303}
304
305impl PartialOrd for Buffer {
306    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
307        Some(self.cmp(other))
308    }
309}
310
311impl PartialEq for Buffer {
312    fn eq(&self, other: &Self) -> bool {
313        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
314    }
315}
316
317#[derive(Debug)]
318enum State {
319    Ordered,
320    Unordered {
321        /// The set of offsets that have been received from the peer, including portions not yet
322        /// read by the application.
323        recvd: RangeSet,
324    },
325}
326
327impl State {
328    fn is_ordered(&self) -> bool {
329        matches!(self, Self::Ordered)
330    }
331}
332
333impl Default for State {
334    fn default() -> Self {
335        Self::Ordered
336    }
337}
338
/// Error indicating that an ordered read was performed on a stream after an unordered read
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IllegalOrderedRead;

impl fmt::Display for IllegalOrderedRead {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ordered read after unordered read")
    }
}

impl std::error::Error for IllegalOrderedRead {}
342
#[cfg(test)]
mod test {
    use super::*;

    /// Shorthand for a `Bytes` backed by static data
    fn buf(data: &'static [u8]) -> Bytes {
        Bytes::from_static(data)
    }

    /// Shorthand for the chunk expected at `offset` with contents `data`
    fn chunk_at(offset: u64, data: &'static [u8]) -> Chunk {
        Chunk::new(offset, buf(data))
    }

    /// Unordered read without a length limit; panics if nothing is buffered
    fn next_unordered(asm: &mut Assembler) -> Chunk {
        asm.read(usize::MAX, false).unwrap()
    }

    /// Ordered read of at most `size` bytes, returning only the data
    fn next(asm: &mut Assembler, size: usize) -> Option<Bytes> {
        asm.read(size, true).map(|c| c.bytes)
    }

    #[test]
    fn assemble_ordered() {
        let mut asm = Assembler::new();
        assert_eq!(next(&mut asm, 32), None);
        asm.insert(0, buf(b"123"), 3);
        assert_eq!(next(&mut asm, 1), Some(buf(b"1")));
        assert_eq!(next(&mut asm, 3), Some(buf(b"23")));
        asm.insert(3, buf(b"456"), 3);
        assert_eq!(next(&mut asm, 32), Some(buf(b"456")));
        asm.insert(6, buf(b"789"), 3);
        asm.insert(9, buf(b"10"), 2);
        assert_eq!(next(&mut asm, 32), Some(buf(b"789")));
        assert_eq!(next(&mut asm, 32), Some(buf(b"10")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_unordered() {
        let mut asm = Assembler::new();
        asm.ensure_ordering(false).unwrap();
        asm.insert(3, buf(b"456"), 3);
        assert_eq!(next(&mut asm, 32), None);
        asm.insert(0, buf(b"123"), 3);
        assert_eq!(next(&mut asm, 32), Some(buf(b"123")));
        assert_eq!(next(&mut asm, 32), Some(buf(b"456")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_duplicate() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"123"), 3);
        asm.insert(0, buf(b"123"), 3);
        assert_eq!(next(&mut asm, 32), Some(buf(b"123")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_duplicate_compact() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"123"), 3);
        asm.insert(0, buf(b"123"), 3);
        asm.defragment();
        assert_eq!(next(&mut asm, 32), Some(buf(b"123")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_contained() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"12345"), 5);
        asm.insert(1, buf(b"234"), 3);
        assert_eq!(next(&mut asm, 32), Some(buf(b"12345")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"12345"), 5);
        asm.insert(1, buf(b"234"), 3);
        asm.defragment();
        assert_eq!(next(&mut asm, 32), Some(buf(b"12345")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_contains() {
        let mut asm = Assembler::new();
        asm.insert(1, buf(b"234"), 3);
        asm.insert(0, buf(b"12345"), 5);
        assert_eq!(next(&mut asm, 32), Some(buf(b"12345")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut asm = Assembler::new();
        asm.insert(1, buf(b"234"), 3);
        asm.insert(0, buf(b"12345"), 5);
        asm.defragment();
        assert_eq!(next(&mut asm, 32), Some(buf(b"12345")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_overlapping() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"123"), 3);
        asm.insert(1, buf(b"234"), 3);
        assert_eq!(next(&mut asm, 32), Some(buf(b"123")));
        assert_eq!(next(&mut asm, 32), Some(buf(b"4")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_overlapping_compact() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"123"), 4);
        asm.insert(1, buf(b"234"), 4);
        asm.defragment();
        assert_eq!(next(&mut asm, 32), Some(buf(b"1234")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_complex() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"1"), 1);
        asm.insert(2, buf(b"3"), 1);
        asm.insert(4, buf(b"5"), 1);
        asm.insert(0, buf(b"123456"), 6);
        assert_eq!(next(&mut asm, 32), Some(buf(b"123456")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"1"), 1);
        asm.insert(2, buf(b"3"), 1);
        asm.insert(4, buf(b"5"), 1);
        asm.insert(0, buf(b"123456"), 6);
        asm.defragment();
        assert_eq!(next(&mut asm, 32), Some(buf(b"123456")));
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn assemble_old() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"1234"), 4);
        assert_eq!(next(&mut asm, 32), Some(buf(b"1234")));
        asm.insert(0, buf(b"1234"), 4);
        assert_eq!(next(&mut asm, 32), None);
    }

    #[test]
    fn compact() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"abc"), 4);
        asm.insert(3, buf(b"def"), 4);
        asm.insert(9, buf(b"jkl"), 4);
        asm.insert(12, buf(b"mno"), 4);
        asm.defragment();
        assert_eq!(next_unordered(&mut asm), chunk_at(0, b"abcdef"));
        assert_eq!(next_unordered(&mut asm), chunk_at(9, b"jklmno"));
    }

    #[test]
    fn defrag_with_missing_prefix() {
        let mut asm = Assembler::new();
        asm.insert(3, buf(b"def"), 3);
        asm.defragment();
        assert_eq!(next_unordered(&mut asm), chunk_at(3, b"def"));
    }

    #[test]
    fn defrag_read_chunk() {
        let mut asm = Assembler::new();
        asm.insert(3, buf(b"def"), 4);
        asm.insert(0, buf(b"abc"), 4);
        asm.insert(7, buf(b"hij"), 4);
        asm.insert(11, buf(b"lmn"), 4);
        asm.defragment();
        assert_eq!(next(&mut asm, usize::MAX), Some(buf(b"abcdef")));
        asm.insert(5, buf(b"fghijklmn"), 9);
        assert_eq!(next(&mut asm, usize::MAX), Some(buf(b"ghijklmn")));
        asm.insert(13, buf(b"nopq"), 4);
        assert_eq!(next(&mut asm, usize::MAX), Some(buf(b"opq")));
        asm.insert(15, buf(b"pqrs"), 4);
        assert_eq!(next(&mut asm, usize::MAX), Some(buf(b"rs")));
        assert_eq!(next(&mut asm, usize::MAX), None);
    }

    #[test]
    fn unordered_happy_path() {
        let mut asm = Assembler::new();
        asm.ensure_ordering(false).unwrap();
        asm.insert(0, buf(b"abc"), 3);
        assert_eq!(next_unordered(&mut asm), chunk_at(0, b"abc"));
        assert_eq!(asm.read(usize::MAX, false), None);
        asm.insert(3, buf(b"def"), 3);
        assert_eq!(next_unordered(&mut asm), chunk_at(3, b"def"));
        assert_eq!(asm.read(usize::MAX, false), None);
    }

    #[test]
    fn unordered_dedup() {
        let mut asm = Assembler::new();
        asm.ensure_ordering(false).unwrap();
        asm.insert(3, buf(b"def"), 3);
        assert_eq!(next_unordered(&mut asm), chunk_at(3, b"def"));
        assert_eq!(asm.read(usize::MAX, false), None);
        asm.insert(0, buf(b"a"), 1);
        asm.insert(0, buf(b"abcdefghi"), 9);
        asm.insert(0, buf(b"abcd"), 4);
        assert_eq!(next_unordered(&mut asm), chunk_at(0, b"a"));
        assert_eq!(next_unordered(&mut asm), chunk_at(1, b"bc"));
        assert_eq!(next_unordered(&mut asm), chunk_at(6, b"ghi"));
        assert_eq!(asm.read(usize::MAX, false), None);
        asm.insert(8, buf(b"ijkl"), 4);
        assert_eq!(next_unordered(&mut asm), chunk_at(9, b"jkl"));
        assert_eq!(asm.read(usize::MAX, false), None);
        asm.insert(12, buf(b"mno"), 3);
        assert_eq!(next_unordered(&mut asm), chunk_at(12, b"mno"));
        assert_eq!(asm.read(usize::MAX, false), None);
        asm.insert(2, buf(b"cde"), 3);
        assert_eq!(asm.read(usize::MAX, false), None);
    }

    #[test]
    fn chunks_dedup() {
        let mut asm = Assembler::new();
        asm.insert(3, buf(b"def"), 3);
        assert_eq!(asm.read(usize::MAX, true), None);
        asm.insert(0, buf(b"a"), 1);
        asm.insert(1, buf(b"bcdefghi"), 9);
        asm.insert(0, buf(b"abcd"), 4);
        assert_eq!(asm.read(usize::MAX, true), Some(chunk_at(0, b"abcd")));
        assert_eq!(asm.read(usize::MAX, true), Some(chunk_at(4, b"efghi")));
        assert_eq!(asm.read(usize::MAX, true), None);
        asm.insert(8, buf(b"ijkl"), 4);
        assert_eq!(asm.read(usize::MAX, true), Some(chunk_at(9, b"jkl")));
        assert_eq!(asm.read(usize::MAX, true), None);
        asm.insert(12, buf(b"mno"), 3);
        assert_eq!(asm.read(usize::MAX, true), Some(chunk_at(12, b"mno")));
        assert_eq!(asm.read(usize::MAX, true), None);
        asm.insert(2, buf(b"cde"), 3);
        assert_eq!(asm.read(usize::MAX, true), None);
    }

    #[test]
    fn ordered_eager_discard() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"abc"), 3);
        assert_eq!(asm.data.len(), 1);
        assert_eq!(asm.read(usize::MAX, true), Some(chunk_at(0, b"abc")));
        // Data lying entirely before the read index must not be buffered at all
        asm.insert(0, buf(b"ab"), 2);
        assert_eq!(asm.data.len(), 0);
        // Data straddling the read index is trimmed on insertion
        asm.insert(2, buf(b"cd"), 2);
        assert_eq!(asm.data.peek(), Some(&Buffer::new(3, buf(b"d"), 2)));
    }

    #[test]
    fn ordered_insert_unordered_read() {
        let mut asm = Assembler::new();
        asm.insert(0, buf(b"abc"), 3);
        asm.insert(0, buf(b"abc"), 3);
        asm.ensure_ordering(false).unwrap();
        assert_eq!(asm.read(3, false), Some(chunk_at(0, b"abc")));
        assert_eq!(asm.read(3, false), None);
    }
}