quinn_proto/connection/streams/
send.rs1use bytes::Bytes;
2use thiserror::Error;
3
4use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
5
6#[derive(Debug)]
7pub(super) struct Send {
8 pub(super) max_data: u64,
9 pub(super) state: SendState,
10 pub(super) pending: SendBuffer,
11 pub(super) priority: i32,
12 pub(super) fin_pending: bool,
14 pub(super) connection_blocked: bool,
16 pub(super) stop_reason: Option<VarInt>,
18}
19
20impl Send {
21 pub(super) fn new(max_data: VarInt) -> Box<Self> {
22 Box::new(Self {
23 max_data: max_data.into(),
24 state: SendState::Ready,
25 pending: SendBuffer::new(),
26 priority: 0,
27 fin_pending: false,
28 connection_blocked: false,
29 stop_reason: None,
30 })
31 }
32
33 pub(super) fn is_reset(&self) -> bool {
35 matches!(self.state, SendState::ResetSent)
36 }
37
38 pub(super) fn finish(&mut self) -> Result<(), FinishError> {
39 if let Some(error_code) = self.stop_reason {
40 Err(FinishError::Stopped(error_code))
41 } else if self.state == SendState::Ready {
42 self.state = SendState::DataSent {
43 finish_acked: false,
44 };
45 self.fin_pending = true;
46 Ok(())
47 } else {
48 Err(FinishError::ClosedStream)
49 }
50 }
51
52 pub(super) fn write<S: BytesSource>(
53 &mut self,
54 source: &mut S,
55 limit: u64,
56 ) -> Result<Written, WriteError> {
57 if !self.is_writable() {
58 return Err(WriteError::ClosedStream);
59 }
60 if let Some(error_code) = self.stop_reason {
61 return Err(WriteError::Stopped(error_code));
62 }
63 let budget = self.max_data - self.pending.offset();
64 if budget == 0 {
65 return Err(WriteError::Blocked);
66 }
67 let mut limit = limit.min(budget) as usize;
68
69 let mut result = Written::default();
70 loop {
71 let (chunk, chunks_consumed) = source.pop_chunk(limit);
72 result.chunks += chunks_consumed;
73 result.bytes += chunk.len();
74
75 if chunk.is_empty() {
76 break;
77 }
78
79 limit -= chunk.len();
80 self.pending.write(chunk);
81 }
82
83 Ok(result)
84 }
85
86 pub(super) fn reset(&mut self) {
88 use SendState::*;
89 if let DataSent { .. } | Ready = self.state {
90 self.state = ResetSent;
91 }
92 }
93
94 pub(super) fn try_stop(&mut self, error_code: VarInt) -> bool {
99 if self.stop_reason.is_none() {
100 self.stop_reason = Some(error_code);
101 true
102 } else {
103 false
104 }
105 }
106
107 pub(super) fn ack(&mut self, frame: frame::StreamMeta) -> bool {
109 self.pending.ack(frame.offsets);
110 match self.state {
111 SendState::DataSent {
112 ref mut finish_acked,
113 } => {
114 *finish_acked |= frame.fin;
115 *finish_acked && self.pending.is_fully_acked()
116 }
117 _ => false,
118 }
119 }
120
121 pub(super) fn increase_max_data(&mut self, offset: u64) -> bool {
125 if offset <= self.max_data || self.state != SendState::Ready {
126 return false;
127 }
128 let was_blocked = self.pending.offset() == self.max_data;
129 self.max_data = offset;
130 was_blocked
131 }
132
133 pub(super) fn offset(&self) -> u64 {
134 self.pending.offset()
135 }
136
137 pub(super) fn is_pending(&self) -> bool {
138 self.pending.has_unsent_data() || self.fin_pending
139 }
140
141 pub(super) fn is_writable(&self) -> bool {
142 matches!(self.state, SendState::Ready)
143 }
144}
145
146pub(crate) struct BytesArray<'a> {
151 chunks: &'a mut [Bytes],
153 consumed: usize,
155}
156
157impl<'a> BytesArray<'a> {
158 pub(crate) fn from_chunks(chunks: &'a mut [Bytes]) -> Self {
159 Self {
160 chunks,
161 consumed: 0,
162 }
163 }
164}
165
166impl BytesSource for BytesArray<'_> {
167 fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
168 let mut chunks_consumed = 0;
171
172 while self.consumed < self.chunks.len() {
173 let chunk = &mut self.chunks[self.consumed];
174
175 if chunk.len() <= limit {
176 let chunk = std::mem::take(chunk);
177 self.consumed += 1;
178 chunks_consumed += 1;
179 if chunk.is_empty() {
180 continue;
181 }
182 return (chunk, chunks_consumed);
183 } else if limit > 0 {
184 let chunk = chunk.split_to(limit);
185 return (chunk, chunks_consumed);
186 } else {
187 break;
188 }
189 }
190
191 (Bytes::new(), chunks_consumed)
192 }
193}
194
195pub(crate) struct ByteSlice<'a> {
201 data: &'a [u8],
203}
204
205impl<'a> ByteSlice<'a> {
206 pub(crate) fn from_slice(data: &'a [u8]) -> Self {
207 Self { data }
208 }
209}
210
211impl BytesSource for ByteSlice<'_> {
212 fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
213 let limit = limit.min(self.data.len());
214 if limit == 0 {
215 return (Bytes::new(), 0);
216 }
217
218 let chunk = Bytes::from(self.data[..limit].to_owned());
219 self.data = &self.data[chunk.len()..];
220
221 let chunks_consumed = usize::from(self.data.is_empty());
222 (chunk, chunks_consumed)
223 }
224}
225
226pub trait BytesSource {
231 fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
244}
245
246#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
248pub struct Written {
249 pub bytes: usize,
251 pub chunks: usize,
255}
256
257#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
259pub enum WriteError {
260 #[error("unable to accept further writes")]
267 Blocked,
268 #[error("stopped by peer: code {0}")]
275 Stopped(VarInt),
276 #[error("closed stream")]
278 ClosedStream,
279}
280
281#[derive(Debug, Copy, Clone, Eq, PartialEq)]
282pub(super) enum SendState {
283 Ready,
285 DataSent { finish_acked: bool },
287 ResetSent,
289}
290
291#[derive(Debug, Error, Clone, PartialEq, Eq)]
293pub enum FinishError {
294 #[error("stopped by peer: code {0}")]
301 Stopped(VarInt),
302 #[error("closed stream")]
304 ClosedStream,
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310
311 #[test]
312 fn bytes_array() {
313 let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
314 for limit in 0..full.len() {
315 let mut chunks = [
316 Bytes::from_static(b""),
317 Bytes::from_static(b"Hello "),
318 Bytes::from_static(b"Wo"),
319 Bytes::from_static(b""),
320 Bytes::from_static(b"r"),
321 Bytes::from_static(b"ld"),
322 Bytes::from_static(b""),
323 Bytes::from_static(b" 12345678"),
324 Bytes::from_static(b"9 ABCDE"),
325 Bytes::from_static(b"F"),
326 Bytes::from_static(b"GHJIJKLMNOPQRSTUVWXYZ"),
327 ];
328 let num_chunks = chunks.len();
329 let last_chunk_len = chunks[chunks.len() - 1].len();
330
331 let mut array = BytesArray::from_chunks(&mut chunks);
332
333 let mut buf = Vec::new();
334 let mut chunks_popped = 0;
335 let mut chunks_consumed = 0;
336 let mut remaining = limit;
337 loop {
338 let (chunk, consumed) = array.pop_chunk(remaining);
339 chunks_consumed += consumed;
340
341 if !chunk.is_empty() {
342 buf.extend_from_slice(&chunk);
343 remaining -= chunk.len();
344 chunks_popped += 1;
345 } else {
346 break;
347 }
348 }
349
350 assert_eq!(&buf[..], &full[..limit]);
351
352 if limit == full.len() {
353 assert_eq!(chunks_consumed, num_chunks);
355 assert_eq!(chunks_consumed, chunks_popped + 3);
357 } else if limit > full.len() - last_chunk_len {
358 assert_eq!(chunks_consumed, num_chunks - 1);
360 assert_eq!(chunks_consumed, chunks_popped + 2);
361 }
362 }
363 }
364
365 #[test]
366 fn byte_slice() {
367 let full = b"Hello World 123456789 ABCDEFGHJIJKLMNOPQRSTUVWXYZ".to_owned();
368 for limit in 0..full.len() {
369 let mut array = ByteSlice::from_slice(&full[..]);
370
371 let mut buf = Vec::new();
372 let mut chunks_popped = 0;
373 let mut chunks_consumed = 0;
374 let mut remaining = limit;
375 loop {
376 let (chunk, consumed) = array.pop_chunk(remaining);
377 chunks_consumed += consumed;
378
379 if !chunk.is_empty() {
380 buf.extend_from_slice(&chunk);
381 remaining -= chunk.len();
382 chunks_popped += 1;
383 } else {
384 break;
385 }
386 }
387
388 assert_eq!(&buf[..], &full[..limit]);
389 if limit != 0 {
390 assert_eq!(chunks_popped, 1);
391 } else {
392 assert_eq!(chunks_popped, 0);
393 }
394
395 if limit == full.len() {
396 assert_eq!(chunks_consumed, 1);
397 } else {
398 assert_eq!(chunks_consumed, 0);
399 }
400 }
401 }
402}