use std::{
    cmp::Ordering,
    collections::{BinaryHeap, binary_heap::PeekMut},
    mem,
};

use bytes::{Buf, Bytes, BytesMut};

use crate::range_set::RangeSet;

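/// Helper to assemble out-of-order stream frames into a readable stream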
#[derive(Debug, Default)]
pub(super) struct Assembler {
    state: State,
    data: BinaryHeap<Buffer>,
    /// Total number of buffered bytes, including any duplicates held in ordered mode
    buffered: usize,
    /// Estimated number of bytes allocated to back the buffered chunks; never less than `buffered`
    allocated: usize,
    /// Number of bytes consumed by the application; with only ordered reads, this is the length of
    /// the contiguous prefix of the stream that has been read
    bytes_read: u64,
    /// One past the highest stream offset received so far
    end: u64,
}

impl Assembler {
    pub(super) fn new() -> Self {
        Self::default()
    }

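    /// Reset to the initial state, reusing the existing heap allocation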
    pub(super) fn reinit(&mut self) {
        let old_data = mem::take(&mut self.data);
        *self = Self::default();
        self.data = old_data;
        self.data.clear();
    }

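    /// Declare whether subsequent reads will be ordered or unordered
    ///
    /// Switching back to ordered reads after an unordered read is not permitted.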
    pub(super) fn ensure_ordering(&mut self, ordered: bool) -> Result<(), IllegalOrderedRead> {
        if ordered && !self.state.is_ordered() {
            return Err(IllegalOrderedRead);
        } else if !ordered && self.state.is_ordered() {
            // Discard duplicate data so unordered reads never yield the same bytes twice
            if !self.data.is_empty() {
                self.defragment();
            }
            let mut recvd = RangeSet::new();
            recvd.insert(0..self.bytes_read);
            for chunk in &self.data {
                recvd.insert(chunk.offset..chunk.offset + chunk.bytes.len() as u64);
            }
            self.state = State::Unordered { recvd };
        }
        Ok(())
    }

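    /// Return the next assembled chunk, containing at most `max_length` bytes
    ///
    /// In ordered mode this only yields data contiguous with what has already been read;
    /// in unordered mode it yields whichever chunk currently has the lowest offset.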
    pub(super) fn read(&mut self, max_length: usize, ordered: bool) -> Option<Chunk> {
        loop {
            let mut chunk = self.data.peek_mut()?;

            if ordered {
                if chunk.offset > self.bytes_read {
                    // The next chunk starts beyond the read index; data is still missing
                    return None;
                } else if (chunk.offset + chunk.bytes.len() as u64) <= self.bytes_read {
                    // The read index is already past this chunk; discard it
                    self.buffered -= chunk.bytes.len();
                    self.allocated -= chunk.allocation_size;
                    PeekMut::pop(chunk);
                    continue;
                }

                // Trim any already-read prefix off the chunk
                let start = (self.bytes_read - chunk.offset) as usize;
                if start > 0 {
                    chunk.bytes.advance(start);
                    chunk.offset += start as u64;
                    self.buffered -= start;
                }
            }

            return Some(if max_length < chunk.bytes.len() {
                self.bytes_read += max_length as u64;
                let offset = chunk.offset;
                chunk.offset += max_length as u64;
                self.buffered -= max_length;
                Chunk::new(offset, chunk.bytes.split_to(max_length))
            } else {
                self.bytes_read += chunk.bytes.len() as u64;
                self.buffered -= chunk.bytes.len();
                self.allocated -= chunk.allocation_size;
                let chunk = PeekMut::pop(chunk);
                Chunk::new(chunk.offset, chunk.bytes)
            });
        }
    }

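    /// Copy fragmented chunk data into contiguous buffers, merging adjacent chunks
    ///
    /// This releases the original (potentially much larger) allocations backing small chunks.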
    fn defragment(&mut self) {
        let new = BinaryHeap::with_capacity(self.data.len());
        let old = mem::replace(&mut self.data, new);
        let mut buffers = old.into_sorted_vec();
        self.buffered = 0;
        let mut fragmented_buffered = 0;
        let mut offset = 0;
        // First pass, in ascending offset order: trim overlaps and tally how much data
        // still lives in fragmented buffers
        for chunk in buffers.iter_mut().rev() {
            chunk.try_mark_defragment(offset);
            let size = chunk.bytes.len();
            offset = chunk.offset + size as u64;
            self.buffered += size;
            if !chunk.defragmented {
                fragmented_buffered += size;
            }
        }
        self.allocated = self.buffered;
        // Second pass: copy fragmented chunks into contiguous buffers
        let mut buffer = BytesMut::with_capacity(fragmented_buffered);
        let mut offset = 0;
        for chunk in buffers.into_iter().rev() {
            if chunk.defragmented {
                // Already compact; keep the chunk as-is
                if !chunk.bytes.is_empty() {
                    self.data.push(chunk);
                }
                continue;
            }
            if chunk.offset != offset + (buffer.len() as u64) {
                // Discontinuity: flush the accumulated bytes and start a new run
                if !buffer.is_empty() {
                    self.data
                        .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
                }
                offset = chunk.offset;
            }
            buffer.extend_from_slice(&chunk.bytes);
        }
        if !buffer.is_empty() {
            self.data
                .push(Buffer::new_defragmented(offset, buffer.split().freeze()));
        }
    }

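    /// Buffer a frame of stream data received at `offset`
    ///
    /// `allocation_size` is the size of the allocation backing `bytes` and is used to
    /// track how much memory the buffered data is actually holding on to.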
    pub(super) fn insert(&mut self, mut offset: u64, mut bytes: Bytes, allocation_size: usize) {
        debug_assert!(
            bytes.len() <= allocation_size,
            "allocation_size less than bytes.len(): {:?} < {:?}",
            allocation_size,
            bytes.len()
        );
        self.end = self.end.max(offset + bytes.len() as u64);
        if let State::Unordered { ref mut recvd } = self.state {
            // Discard the portions of the frame that duplicate already-received data
            for duplicate in recvd.replace(offset..offset + bytes.len() as u64) {
                if duplicate.start > offset {
                    let buffer = Buffer::new(
                        offset,
                        bytes.split_to((duplicate.start - offset) as usize),
                        allocation_size,
                    );
                    self.buffered += buffer.bytes.len();
                    self.allocated += buffer.allocation_size;
                    self.data.push(buffer);
                    offset = duplicate.start;
                }
                bytes.advance((duplicate.end - offset) as usize);
                offset = duplicate.end;
            }
        } else if offset < self.bytes_read {
            if (offset + bytes.len() as u64) <= self.bytes_read {
                // The frame lies entirely before the read index; nothing new here
                return;
            } else {
                // Drop the already-read prefix
                let diff = self.bytes_read - offset;
                offset += diff;
                bytes.advance(diff as usize);
            }
        }

        if bytes.is_empty() {
            return;
        }
        let buffer = Buffer::new(offset, bytes, allocation_size);
        self.buffered += buffer.bytes.len();
        self.allocated += buffer.allocation_size;
        self.data.push(buffer);
        // `buffered` may count duplicate bytes, so cap it at the span that remains unread
        let buffered = self.buffered.min((self.end - self.bytes_read) as usize);
        let over_allocation = self.allocated - buffered;
        // Defragment only when over-allocation exceeds both a 32 KiB floor and 1.5x the
        // useful buffered data, so it stays rare outside pathological fragmentation
        let threshold = 32768.max(buffered * 3 / 2);
        if over_allocation > threshold {
            self.defragment()
        }
    }

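    /// Number of bytes consumed by the application so far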
    pub(super) fn bytes_read(&self) -> u64 {
        self.bytes_read
    }

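    /// Discard all buffered data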
    pub(super) fn clear(&mut self) {
        self.data.clear();
        self.buffered = 0;
        self.allocated = 0;
    }
}

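/// A chunk of data read from the receive stream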
#[derive(Debug, PartialEq, Eq)]
pub struct Chunk {
    /// The offset in the stream
    pub offset: u64,
    /// The contents of the chunk
    pub bytes: Bytes,
}

impl Chunk {
    fn new(offset: u64, bytes: Bytes) -> Self {
        Self { offset, bytes }
    }
}

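/// A buffered chunk of received stream data, ordered so the heap yields the lowest offset first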
#[derive(Debug, Eq)]
struct Buffer {
    offset: u64,
    bytes: Bytes,
    /// Size of the allocation backing `bytes`
    allocation_size: usize,
    /// Whether the buffer is compact, i.e. wastes little of its backing allocation
    defragmented: bool,
}

impl Buffer {
    /// Construct a buffer for newly received, possibly fragmented data
    fn new(offset: u64, bytes: Bytes, allocation_size: usize) -> Self {
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: false,
        }
    }

    /// Construct a buffer whose allocation is exactly the bytes it holds
    fn new_defragmented(offset: u64, bytes: Bytes) -> Self {
        let allocation_size = bytes.len();
        Self {
            offset,
            bytes,
            allocation_size,
            defragmented: true,
        }
    }

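    /// Discard any data before `offset` and mark the buffer as defragmented if it already
    /// makes good use of its allocation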
    fn try_mark_defragment(&mut self, offset: u64) {
        let duplicate = offset.saturating_sub(self.offset) as usize;
        self.offset = self.offset.max(offset);
        if duplicate >= self.bytes.len() {
            // Everything in this buffer is already covered by earlier data
            self.bytes = Bytes::new();
            self.defragmented = true;
            self.allocation_size = 0;
            return;
        }
        self.bytes.advance(duplicate);
        // A buffer that uses most of its allocation is treated as already defragmented
        self.defragmented = self.defragmented || self.bytes.len() * 6 / 5 >= self.allocation_size;
        if self.defragmented {
            // Defragmented buffers must not contribute to the over-allocation estimate
            self.allocation_size = self.bytes.len();
        }
    }
}

impl Ord for Buffer {
    // Reverse the offset comparison so the binary max-heap yields the lowest offset first;
    // ties are broken in favor of the longer chunk
    fn cmp(&self, other: &Self) -> Ordering {
        self.offset
            .cmp(&other.offset)
            .reverse()
            .then(self.bytes.len().cmp(&other.bytes.len()))
    }
}

impl PartialOrd for Buffer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Buffer {
    fn eq(&self, other: &Self) -> bool {
        (self.offset, self.bytes.len()) == (other.offset, other.bytes.len())
    }
}

#[derive(Debug)]
enum State {
    Ordered,
    Unordered {
        /// Stream offsets that have been received from the peer, including those already
        /// consumed by the application
        recvd: RangeSet,
    },
}

impl State {
    fn is_ordered(&self) -> bool {
        matches!(self, Self::Ordered)
    }
}

impl Default for State {
    fn default() -> Self {
        Self::Ordered
    }
}

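/// Error indicating that an ordered read was requested on a stream that has already been read
/// in unordered mode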
#[derive(Debug)]
pub struct IllegalOrderedRead;

#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::assert_matches;

    #[test]
    fn assemble_ordered() {
        let mut x = Assembler::new();
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 1), Some(ref y) if &y[..] == b"1");
        assert_matches!(next(&mut x, 3), Some(ref y) if &y[..] == b"23");
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        x.insert(6, Bytes::from_static(b"789"), 3);
        x.insert(9, Bytes::from_static(b"10"), 2);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"789");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"10");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_unordered() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"456"), 3);
        assert_matches!(next(&mut x, 32), None);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_duplicate_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contained_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_contains_compact() {
        let mut x = Assembler::new();
        x.insert(1, Bytes::from_static(b"234"), 3);
        x.insert(0, Bytes::from_static(b"12345"), 5);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"12345");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 3);
        x.insert(1, Bytes::from_static(b"234"), 3);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123");
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"4");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_overlapping_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"123"), 4);
        x.insert(1, Bytes::from_static(b"234"), 4);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_complex_compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1"), 1);
        x.insert(2, Bytes::from_static(b"3"), 1);
        x.insert(4, Bytes::from_static(b"5"), 1);
        x.insert(0, Bytes::from_static(b"123456"), 6);
        x.defragment();
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"123456");
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn assemble_old() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), Some(ref y) if &y[..] == b"1234");
        x.insert(0, Bytes::from_static(b"1234"), 4);
        assert_matches!(next(&mut x, 32), None);
    }

    #[test]
    fn compact() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(9, Bytes::from_static(b"jkl"), 4);
        x.insert(12, Bytes::from_static(b"mno"), 4);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abcdef"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jklmno"))
        );
    }

    #[test]
    fn defrag_with_missing_prefix() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        x.defragment();
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
    }

    #[test]
    fn defrag_read_chunk() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 4);
        x.insert(0, Bytes::from_static(b"abc"), 4);
        x.insert(7, Bytes::from_static(b"hij"), 4);
        x.insert(11, Bytes::from_static(b"lmn"), 4);
        x.defragment();
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"abcdef");
        x.insert(5, Bytes::from_static(b"fghijklmn"), 9);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"ghijklmn");
        x.insert(13, Bytes::from_static(b"nopq"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"opq");
        x.insert(15, Bytes::from_static(b"pqrs"), 4);
        assert_matches!(x.read(usize::MAX, true), Some(ref y) if &y.bytes[..] == b"rs");
        assert_matches!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn unordered_happy_path() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"abc"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn unordered_dedup() {
        let mut x = Assembler::new();
        x.ensure_ordering(false).unwrap();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(3, Bytes::from_static(b"def"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(0, Bytes::from_static(b"abcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(0, Bytes::from_static(b"a"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(1, Bytes::from_static(b"bc"))
        );
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(6, Bytes::from_static(b"ghi"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(9, Bytes::from_static(b"jkl"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            next_unordered(&mut x),
            Chunk::new(12, Bytes::from_static(b"mno"))
        );
        assert_eq!(x.read(usize::MAX, false), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, false), None);
    }

    #[test]
    fn chunks_dedup() {
        let mut x = Assembler::new();
        x.insert(3, Bytes::from_static(b"def"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(0, Bytes::from_static(b"a"), 1);
        x.insert(1, Bytes::from_static(b"bcdefghi"), 9);
        x.insert(0, Bytes::from_static(b"abcd"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abcd")))
        );
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(4, Bytes::from_static(b"efghi")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(8, Bytes::from_static(b"ijkl"), 4);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(9, Bytes::from_static(b"jkl")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(12, Bytes::from_static(b"mno"), 3);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(12, Bytes::from_static(b"mno")))
        );
        assert_eq!(x.read(usize::MAX, true), None);
        x.insert(2, Bytes::from_static(b"cde"), 3);
        assert_eq!(x.read(usize::MAX, true), None);
    }

    #[test]
    fn ordered_eager_discard() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        assert_eq!(x.data.len(), 1);
        assert_eq!(
            x.read(usize::MAX, true),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        x.insert(0, Bytes::from_static(b"ab"), 2);
        assert_eq!(x.data.len(), 0);
        x.insert(2, Bytes::from_static(b"cd"), 2);
        assert_eq!(
            x.data.peek(),
            Some(&Buffer::new(3, Bytes::from_static(b"d"), 2))
        );
    }

    #[test]
    fn ordered_insert_unordered_read() {
        let mut x = Assembler::new();
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.insert(0, Bytes::from_static(b"abc"), 3);
        x.ensure_ordering(false).unwrap();
        assert_eq!(
            x.read(3, false),
            Some(Chunk::new(0, Bytes::from_static(b"abc")))
        );
        assert_eq!(x.read(3, false), None);
    }

    fn next_unordered(x: &mut Assembler) -> Chunk {
        x.read(usize::MAX, false).unwrap()
    }

    fn next(x: &mut Assembler, size: usize) -> Option<Bytes> {
        x.read(size, true).map(|chunk| chunk.bytes)
    }
}