1use std::{
2 cmp,
3 collections::VecDeque,
4 convert::TryFrom,
5 fmt, io, mem,
6 net::{IpAddr, SocketAddr},
7 sync::Arc,
8};
9
10use bytes::{Bytes, BytesMut};
11use frame::StreamMetaVec;
12use rand::{Rng, SeedableRng, rngs::StdRng};
13use thiserror::Error;
14use tracing::{debug, error, trace, trace_span, warn};
15
16use crate::{
17 Dir, Duration, EndpointConfig, Frame, INITIAL_MTU, Instant, MAX_CID_SIZE, MAX_STREAM_COUNT,
18 MIN_INITIAL_SIZE, Side, StreamId, TIMER_GRANULARITY, TokenStore, Transmit, TransportError,
19 TransportErrorCode, VarInt,
20 cid_generator::ConnectionIdGenerator,
21 cid_queue::CidQueue,
22 coding::BufMutExt,
23 config::{ServerConfig, TransportConfig},
24 crypto::{self, KeyPair, Keys, PacketKey},
25 frame::{self, Close, Datagram, FrameStruct, NewToken},
26 packet::{
27 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, LongType, Packet,
28 PacketNumber, PartialDecode, SpaceId,
29 },
30 range_set::ArrayRangeSet,
31 shared::{
32 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
33 EndpointEvent, EndpointEventInner,
34 },
35 token::{ResetToken, Token, TokenPayload},
36 transport_parameters::TransportParameters,
37};
38
39mod ack_frequency;
40use ack_frequency::AckFrequencyState;
41
42mod assembler;
43pub use assembler::Chunk;
44
45mod cid_state;
46use cid_state::CidState;
47
48mod datagrams;
49use datagrams::DatagramState;
50pub use datagrams::{Datagrams, SendDatagramError};
51
52mod mtud;
53mod pacing;
54
55mod packet_builder;
56use packet_builder::PacketBuilder;
57
58mod packet_crypto;
59use packet_crypto::{PrevCrypto, ZeroRttCrypto};
60
61mod paths;
62pub use paths::RttEstimator;
63use paths::{PathData, PathResponses};
64
65mod send_buffer;
66
67mod spaces;
68#[cfg(fuzzing)]
69pub use spaces::Retransmits;
70#[cfg(not(fuzzing))]
71use spaces::Retransmits;
72use spaces::{PacketNumberFilter, PacketSpace, SendableFrames, SentPacket, ThinRetransmits};
73
74mod stats;
75pub use stats::{ConnectionStats, FrameStats, PathStats, UdpStats};
76
77mod streams;
78#[cfg(fuzzing)]
79pub use streams::StreamsState;
80#[cfg(not(fuzzing))]
81use streams::StreamsState;
82pub use streams::{
83 BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
84 SendStream, ShouldTransmit, StreamEvent, Streams, WriteError, Written,
85};
86
87mod timer;
88use crate::congestion::Controller;
89use timer::{Timer, TimerTable};
90
/// Protocol state for a single connection.
///
/// The field comments below describe only what the constructor and the methods of
/// `impl Connection` visibly do with each field.
pub struct Connection {
    /// Endpoint-level configuration handed to `new`.
    endpoint_config: Arc<EndpointConfig>,
    /// Transport configuration handed to `new`; consulted for segmentation offload, ACK
    /// frequency, idle timeout and stream limits.
    config: Arc<TransportConfig>,
    /// Random number generator seeded from the `rng_seed` argument of `new`.
    rng: StdRng,
    /// Cryptographic session; exposed read-only through `crypto_session`.
    crypto: Box<dyn crypto::Session>,
    /// Initialised from the `loc_cid` argument of `new`.
    handshake_cid: ConnectionId,
    /// Initialised from the `rem_cid` argument of `new`.
    rem_handshake_cid: ConnectionId,
    /// Local IP reported by `local_ip()` and used as `src_ip` of outgoing `Transmit`s.
    local_ip: Option<IpAddr>,
    /// The path currently in use (remote address, RTT, congestion, pacing, MTU discovery).
    path: PathData,
    /// Value of the `allow_mtud` argument of `new`, also forwarded to `PathData::new`.
    allow_mtud: bool,
    /// Previous path and the connection ID used with it; restored as `path` when the
    /// `PathValidation` timer fires, and the target of `send_path_challenge`.
    prev_path: Option<(ConnectionId, PathData)>,
    /// Connection state machine; starts in `State::Handshake`.
    state: State,
    /// Which side of the connection this is, derived from `SideArgs`.
    side: ConnectionSide,
    /// Value reported by `has_0rtt()`.
    zero_rtt_enabled: bool,
    /// 0-RTT keys, if any; cleared when the `KeyDiscard` timer fires.
    zero_rtt_crypto: Option<ZeroRttCrypto>,
    key_phase: bool,
    /// Initialised to a random value in `10..1000`.
    key_phase_size: u64,
    /// Peer transport parameters; starts as `TransportParameters::default()`.
    peer_params: TransportParameters,
    /// Initialised from the `rem_cid` argument of `new`.
    orig_rem_cid: ConnectionId,
    /// Initialised from the `init_cid` argument of `new`.
    initial_dst_cid: ConnectionId,
    retry_src_cid: Option<ConnectionId>,
    lost_packets: u64,
    /// Application-facing events, drained front-first by `poll`.
    events: VecDeque<Event>,
    /// Endpoint-facing events, drained front-first by `poll_endpoint_events`.
    endpoint_events: VecDeque<EndpointEventInner>,
    /// Set at construction to `config.allow_spin` combined with a 7-in-8 random draw.
    spin_enabled: bool,
    spin: bool,
    /// One packet space per `SpaceId` (Initial, Handshake, Data), indexed by `SpaceId`.
    spaces: [PacketSpace; 3],
    /// Space targeted by `ping()`; a close packet sent in this space ends the close sequence.
    highest_space: SpaceId,
    /// Keys from before a key update; cleared when the `KeyDiscard` timer fires.
    prev_crypto: Option<PrevCrypto>,
    next_crypto: Option<KeyPair<Box<dyn PacketKey>>>,
    /// Value reported by `accepted_0rtt()`.
    accepted_0rtt: bool,
    permit_idle_reset: bool,
    /// Derived from `config.max_idle_timeout` (milliseconds); `None` when unset or zero.
    idle_timeout: Option<Duration>,
    /// Timer table polled by `poll_timeout` and serviced by `handle_timeout`.
    timers: TimerTable,
    authentication_failures: u64,
    /// Pending loss reason; taken by `poll` to emit `Event::ConnectionLost`.
    error: Option<ConnectionError>,
    /// Supplies the packet number peeked for Data-space size prediction and validates ACK ranges.
    packet_number_filter: PacketNumberFilter,

    /// Queue from which `poll_transmit` pops off-path PATH_RESPONSE tokens.
    path_responses: PathResponses,
    /// Whether a close packet still has to be written; set by `close_inner`, cleared by
    /// `poll_transmit` once the close was written in `highest_space`.
    close: bool,

    ack_frequency: AckFrequencyState,

    /// Incremented on each PTO expiry; reset on ACK once the peer completed address validation.
    pto_count: u32,

    /// Passed to `populate_acks` when writing ACK frames.
    receiving_ecn: bool,
    total_authed_packets: u64,
    /// Set by `poll_transmit`: true when nothing was written and sending was not blocked by
    /// congestion control or pacing. Passed on to the congestion controller.
    app_limited: bool,

    streams: StreamsState,
    /// Remote connection IDs; `active()` supplies the destination CID for outgoing packets.
    rem_cids: CidQueue,
    /// Bookkeeping for locally issued connection IDs.
    local_cid_state: CidState,
    datagrams: DatagramState,
    /// Counters returned (with live path values filled in) by `stats()`.
    stats: ConnectionStats,
    /// Value of the `version` argument of `new`.
    version: u32,
}
237
238impl Connection {
/// Builds a connection in the `Handshake` state.
///
/// Initial-space keys are derived from `init_cid` and the side implied by `side_args`.
/// If `side_args` reports the path as already validated, `on_path_validated` runs before
/// returning. On the client side the constructor also calls `write_crypto` and `init_0rtt`.
pub(crate) fn new(
    endpoint_config: Arc<EndpointConfig>,
    config: Arc<TransportConfig>,
    init_cid: ConnectionId,
    loc_cid: ConnectionId,
    rem_cid: ConnectionId,
    remote: SocketAddr,
    local_ip: Option<IpAddr>,
    crypto: Box<dyn crypto::Session>,
    cid_gen: &dyn ConnectionIdGenerator,
    now: Instant,
    version: u32,
    allow_mtud: bool,
    rng_seed: [u8; 32],
    side_args: SideArgs,
) -> Self {
    // These must be read before `side_args` is consumed by the conversion below.
    let pref_addr_cid = side_args.pref_addr_cid();
    let path_validated = side_args.path_validated();
    let connection_side = ConnectionSide::from(side_args);
    let side = connection_side.side();
    let initial_space = PacketSpace {
        crypto: Some(crypto.initial_keys(&init_cid, side)),
        ..PacketSpace::new(now)
    };
    let state = State::Handshake(state::Handshake {
        rem_cid_set: side.is_server(),
        expected_token: Bytes::new(),
        client_hello: None,
    });
    let mut rng = StdRng::from_seed(rng_seed);
    // NOTE: field initialisers run in the order written. The `rng` draws below
    // (`key_phase_size`, `spin_enabled`, `packet_number_filter`) therefore happen in a fixed
    // order, and `config` / `rng` are only moved into the struct after their last use.
    let mut this = Self {
        endpoint_config,
        crypto,
        handshake_cid: loc_cid,
        rem_handshake_cid: rem_cid,
        local_cid_state: CidState::new(
            cid_gen.cid_len(),
            cid_gen.cid_lifetime(),
            now,
            // The last argument is 2 when a preferred-address CID exists, otherwise 1.
            if pref_addr_cid.is_some() { 2 } else { 1 },
        ),
        path: PathData::new(remote, allow_mtud, None, now, &config),
        allow_mtud,
        local_ip,
        prev_path: None,
        state,
        side: connection_side,
        zero_rtt_enabled: false,
        zero_rtt_crypto: None,
        key_phase: false,
        // Randomised initial value in the half-open range 10..1000.
        key_phase_size: rng.random_range(10..1000),
        peer_params: TransportParameters::default(),
        orig_rem_cid: rem_cid,
        initial_dst_cid: init_cid,
        retry_src_cid: None,
        lost_packets: 0,
        events: VecDeque::new(),
        endpoint_events: VecDeque::new(),
        // Only enabled when the configuration allows it, and then only for 7 in 8 draws.
        spin_enabled: config.allow_spin && rng.random_ratio(7, 8),
        spin: false,
        spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
        highest_space: SpaceId::Initial,
        prev_crypto: None,
        next_crypto: None,
        accepted_0rtt: false,
        permit_idle_reset: true,
        // An unset or zero configured timeout means "no idle timeout"; otherwise the value
        // is interpreted as milliseconds.
        idle_timeout: match config.max_idle_timeout {
            None | Some(VarInt(0)) => None,
            Some(dur) => Some(Duration::from_millis(dur.0)),
        },
        timers: TimerTable::default(),
        authentication_failures: 0,
        error: None,
        // Test builds may disable the packet number filter via configuration.
        #[cfg(test)]
        packet_number_filter: match config.deterministic_packet_numbers {
            false => PacketNumberFilter::new(&mut rng),
            true => PacketNumberFilter::disabled(),
        },
        #[cfg(not(test))]
        packet_number_filter: PacketNumberFilter::new(&mut rng),

        path_responses: PathResponses::default(),
        close: false,

        // Seeded from default transport parameters, since the peer's are not known yet.
        ack_frequency: AckFrequencyState::new(get_max_ack_delay(
            &TransportParameters::default(),
        )),

        pto_count: 0,

        app_limited: false,
        receiving_ecn: false,
        total_authed_packets: 0,

        streams: StreamsState::new(
            side,
            config.max_concurrent_uni_streams,
            config.max_concurrent_bidi_streams,
            config.send_window,
            config.receive_window,
            config.stream_receive_window,
        ),
        datagrams: DatagramState::default(),
        config,
        rem_cids: CidQueue::new(rem_cid),
        rng,
        stats: ConnectionStats::default(),
        version,
    };
    if path_validated {
        this.on_path_validated();
    }
    if side.is_client() {
        // The client starts the exchange: queue the first crypto data, then set up 0-RTT.
        this.write_crypto();
        this.init_0rtt();
    }
    this
}
364
365 #[must_use]
373 pub fn poll_timeout(&mut self) -> Option<Instant> {
374 self.timers.next_timeout()
375 }
376
377 #[must_use]
383 pub fn poll(&mut self) -> Option<Event> {
384 if let Some(x) = self.events.pop_front() {
385 return Some(x);
386 }
387
388 if let Some(event) = self.streams.poll() {
389 return Some(Event::Stream(event));
390 }
391
392 if let Some(err) = self.error.take() {
393 return Some(Event::ConnectionLost { reason: err });
394 }
395
396 None
397 }
398
399 #[must_use]
401 pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
402 self.endpoint_events.pop_front().map(EndpointEvent)
403 }
404
405 #[must_use]
407 pub fn streams(&mut self) -> Streams<'_> {
408 Streams {
409 state: &mut self.streams,
410 conn_state: &self.state,
411 }
412 }
413
414 #[must_use]
416 pub fn recv_stream(&mut self, id: StreamId) -> RecvStream<'_> {
417 assert!(id.dir() == Dir::Bi || id.initiator() != self.side.side());
418 RecvStream {
419 id,
420 state: &mut self.streams,
421 pending: &mut self.spaces[SpaceId::Data].pending,
422 }
423 }
424
425 #[must_use]
427 pub fn send_stream(&mut self, id: StreamId) -> SendStream<'_> {
428 assert!(id.dir() == Dir::Bi || id.initiator() == self.side.side());
429 SendStream {
430 id,
431 state: &mut self.streams,
432 pending: &mut self.spaces[SpaceId::Data].pending,
433 conn_state: &self.state,
434 }
435 }
436
/// Writes outgoing packets into `buf` and returns a `Transmit` describing them.
///
/// Returns `None` when nothing was written (connection drained, close already sent,
/// nothing sendable, or a packet builder could not be created).
///
/// `max_datagrams` caps how many datagrams may be written in one call; it is forced to 1
/// when `enable_segmentation_offload` is off. When more than one datagram is written, the
/// returned `segment_size` is set to the size of the first datagram.
///
/// # Panics
///
/// Panics if `max_datagrams` is zero.
#[must_use]
pub fn poll_transmit(
    &mut self,
    now: Instant,
    max_datagrams: usize,
    buf: &mut Vec<u8>,
) -> Option<Transmit> {
    assert!(max_datagrams != 0);
    let max_datagrams = match self.config.enable_segmentation_offload {
        false => 1,
        true => max_datagrams,
    };

    let mut num_datagrams = 0;
    // Offset in `buf` at which the datagram currently being assembled begins.
    let mut datagram_start = 0;
    let mut segment_size = usize::from(self.path.current_mtu());

    // A pending PATH_CHALLENGE for the previous path is sent on its own, ahead of anything else.
    if let Some(challenge) = self.send_path_challenge(now, buf) {
        return Some(challenge);
    }

    // Give every space the chance to queue something for a pending probe.
    for space in SpaceId::iter() {
        let request_immediate_ack =
            space == SpaceId::Data && self.peer_supports_ack_frequency();
        self.spaces[space].maybe_queue_probe(request_immediate_ack, &self.streams);
    }

    // Decide whether this call is about sending a close packet.
    let close = match self.state {
        State::Drained => {
            self.app_limited = true;
            return None;
        }
        State::Draining | State::Closed(_) => {
            // `self.close` is cleared further down once the close packet has been written
            // in the highest space, so a second call sends nothing.
            if !self.close {
                self.app_limited = true;
                return None;
            }
            true
        }
        _ => false,
    };

    // Recompute whether an ACK_FREQUENCY frame should be queued in the Data space.
    if let Some(config) = &self.config.ack_frequency_config {
        self.spaces[SpaceId::Data].pending.ack_frequency = self
            .ack_frequency
            .should_send_ack_frequency(self.path.rtt.get(), config, &self.peer_params)
            && self.highest_space == SpaceId::Data
            && self.peer_supports_ack_frequency();
    }

    // Logical write limit within `buf`; tracked separately from `buf.capacity()`, which is
    // only grown via `reserve` below.
    let mut buf_capacity = 0;

    let mut coalesce = true;
    let mut builder_storage: Option<PacketBuilder> = None;
    let mut sent_frames = None;
    let mut pad_datagram = false;
    let mut congestion_blocked = false;

    let mut space_idx = 0;
    let spaces = [SpaceId::Initial, SpaceId::Handshake, SpaceId::Data];
    // The loop may run several iterations for the same space: `space_idx` only advances
    // when the current space has nothing (more) to send or is blocked.
    while space_idx < spaces.len() {
        let space_id = spaces[space_idx];
        let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
        // Bytes left for frames in a 1-RTT packet of the current segment size.
        let frame_space_1rtt =
            segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

        let can_send = self.space_can_send(space_id, frame_space_1rtt);
        // Skip the space unless it has data, or we are closing and it has keys.
        if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
            space_idx += 1;
            continue;
        }

        let mut ack_eliciting = !self.spaces[space_id].pending.is_empty(&self.streams)
            || self.spaces[space_id].ping_pending
            || self.spaces[space_id].immediate_ack_pending;
        if space_id == SpaceId::Data {
            ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
        }

        // End of the data written so far, accounting for an unfinished packet's minimum
        // size and authentication tag.
        let buf_end = if let Some(builder) = &builder_storage {
            buf.len().max(builder.min_size) + builder.tag_len
        } else {
            buf.len()
        };

        let tag_len = if let Some(ref crypto) = self.spaces[space_id].crypto {
            crypto.packet.local.tag_len()
        } else if space_id == SpaceId::Data {
            self.zero_rtt_crypto.as_ref().expect(
                "sending packets in the application data space requires known 0-RTT or 1-RTT keys",
            ).packet.tag_len()
        } else {
            unreachable!("tried to send {:?} packet without keys", space_id)
        };
        if !coalesce || buf_capacity - buf_end < MIN_PACKET_SPACE + tag_len {
            // The next packet does not fit into (or may not be coalesced with) the current
            // datagram, so a new datagram has to be started.
            if num_datagrams >= max_datagrams {
                break;
            }

            // The check covers the datagrams already written in this call plus one byte of
            // the next one; `total_sent` itself is only updated at the end of this method.
            if self
                .path
                .anti_amplification_blocked(segment_size as u64 * (num_datagrams as u64) + 1)
            {
                trace!("blocked by anti-amplification");
                break;
            }

            // Congestion control and pacing apply only to ack-eliciting packets, and only
            // when no loss probes are queued for this space.
            if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
                // Bytes of an unfinished packet that have not been accounted as in flight yet.
                let untracked_bytes = if let Some(builder) = &builder_storage {
                    buf_capacity - builder.partial_encode.start
                } else {
                    0
                } as u64;
                debug_assert!(untracked_bytes <= segment_size as u64);

                let bytes_to_send = segment_size as u64 + untracked_bytes;
                if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
                    // Move on to the next space rather than stopping, so later spaces are
                    // still examined.
                    space_idx += 1;
                    congestion_blocked = true;
                    trace!("blocked by congestion control");
                    continue;
                }

                let smoothed_rtt = self.path.rtt.get();
                if let Some(delay) = self.path.pacing.delay(
                    smoothed_rtt,
                    bytes_to_send,
                    self.path.current_mtu(),
                    self.path.congestion.window(),
                    now,
                ) {
                    self.timers.set(Timer::Pacing, delay);
                    congestion_blocked = true;
                    trace!("blocked by pacing");
                    break;
                }
            }

            // Finish the packet that is still open before starting the new datagram.
            if let Some(mut builder) = builder_storage.take() {
                if pad_datagram {
                    builder.pad_to(MIN_INITIAL_SIZE);
                }

                if num_datagrams > 1 {
                    // Every datagram after the first must be padded to `segment_size` to
                    // stay in the batch; stop the batch if that would need more than
                    // `MAX_PADDING` bytes or the segment would exceed the write limit.
                    const MAX_PADDING: usize = 16;
                    let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
                        - datagram_start
                        + builder.tag_len;
                    if packet_len_unpadded + MAX_PADDING < segment_size
                        || datagram_start + segment_size > buf_capacity
                    {
                        trace!(
                            "GSO truncated by demand for {} padding bytes or loss probe",
                            segment_size - packet_len_unpadded
                        );
                        // Put the builder back so it is finished after the loop.
                        builder_storage = Some(builder);
                        break;
                    }

                    builder.pad_to(segment_size as u16);
                }

                builder.finish_and_track(now, self, sent_frames.take(), buf);

                if num_datagrams == 1 {
                    // The first datagram's final size becomes the segment size for the
                    // rest of the batch, and the write limit is clipped to it.
                    segment_size = buf.len();
                    buf_capacity = buf.len();

                    if space_id == SpaceId::Data {
                        // Re-check sendability against the (possibly smaller) segment size.
                        let frame_space_1rtt =
                            segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));
                        if self.space_can_send(space_id, frame_space_1rtt).is_empty() {
                            break;
                        }
                    }
                }
            }

            // Size limit for the next datagram; a queued loss probe is consumed here and
            // limits the datagram to at most `INITIAL_MTU`.
            let next_datagram_size_limit = match self.spaces[space_id].loss_probes {
                0 => segment_size,
                _ => {
                    self.spaces[space_id].loss_probes -= 1;
                    std::cmp::min(segment_size, usize::from(INITIAL_MTU))
                }
            };
            buf_capacity += next_datagram_size_limit;
            if buf.capacity() < buf_capacity {
                // Reserve room for the whole batch at once rather than per datagram.
                buf.reserve(max_datagrams * segment_size);
            }
            num_datagrams += 1;
            coalesce = true;
            pad_datagram = false;
            datagram_start = buf.len();

            debug_assert_eq!(
                datagram_start % segment_size,
                0,
                "datagrams in a GSO batch must be aligned to the segment size"
            );
        } else {
            // The next packet is coalesced into the current datagram; finish the open
            // packet without extra padding.
            if let Some(builder) = builder_storage.take() {
                builder.finish_and_track(now, self, sent_frames.take(), buf);
            }
        }

        debug_assert!(buf_capacity - buf.len() >= MIN_PACKET_SPACE);

        // A client discards the Initial space when it is about to send in the Handshake space.
        if self.spaces[SpaceId::Initial].crypto.is_some()
            && space_id == SpaceId::Handshake
            && self.side.is_client()
        {
            self.discard_space(now, SpaceId::Initial);
        }
        if let Some(ref mut prev) = self.prev_crypto {
            prev.update_unacked = false;
        }

        debug_assert!(
            builder_storage.is_none() && sent_frames.is_none(),
            "Previous packet must have been finished"
        );

        // `?` makes the whole call return `None` if no builder can be created.
        let builder = builder_storage.insert(PacketBuilder::new(
            now,
            space_id,
            self.rem_cids.active(),
            buf,
            buf_capacity,
            datagram_start,
            ack_eliciting,
            self,
        )?);
        // Nothing is coalesced after a short-header packet.
        coalesce = coalesce && !builder.short_header;

        // Datagrams carrying Initial packets are padded when sent by a client, or when
        // the packet is ack-eliciting.
        pad_datagram |=
            space_id == SpaceId::Initial && (self.side.is_client() || ack_eliciting);

        if close {
            trace!("sending CONNECTION_CLOSE");
            // Pending ACKs are written ahead of the close frame; the `SentFrames` record
            // is a throwaway.
            if !self.spaces[space_id].pending_acks.ranges().is_empty() {
                Self::populate_acks(
                    now,
                    self.receiving_ecn,
                    &mut SentFrames::default(),
                    &mut self.spaces[space_id],
                    buf,
                    &mut self.stats,
                );
            }

            // Asserted in debug builds, and checked again at runtime so release builds
            // skip the close frame instead of overrunning the packet.
            debug_assert!(
                buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size,
                "ACKs should leave space for ConnectionClose"
            );
            if buf.len() + frame::ConnectionClose::SIZE_BOUND < builder.max_size {
                let max_frame_size = builder.max_size - buf.len();
                match self.state {
                    State::Closed(state::Closed { ref reason }) => {
                        // The real reason is written in the Data space or when it is a
                        // transport-layer reason; otherwise a generic APPLICATION_ERROR
                        // with an empty reason is sent.
                        if space_id == SpaceId::Data || reason.is_transport_layer() {
                            reason.encode(buf, max_frame_size)
                        } else {
                            frame::ConnectionClose {
                                error_code: TransportErrorCode::APPLICATION_ERROR,
                                frame_type: None,
                                reason: Bytes::new(),
                            }
                            .encode(buf, max_frame_size)
                        }
                    }
                    State::Draining => frame::ConnectionClose {
                        error_code: TransportErrorCode::NO_ERROR,
                        frame_type: None,
                        reason: Bytes::new(),
                    }
                    .encode(buf, max_frame_size),
                    _ => unreachable!(
                        "tried to make a close packet when the connection wasn't closed"
                    ),
                }
            }
            if space_id == self.highest_space {
                // Close has been written in the highest space: no further close packets.
                self.close = false;
                break;
            } else {
                // Also write a close packet in the next space.
                space_idx += 1;
                continue;
            }
        }

        // An off-path PATH_RESPONSE is sent alone in the first datagram, padded to
        // `MIN_INITIAL_SIZE`, to the address the response is queued for.
        if space_id == SpaceId::Data && num_datagrams == 1 {
            if let Some((token, remote)) = self.path_responses.pop_off_path(self.path.remote) {
                // `builder_storage` was filled by the `insert` call above.
                let mut builder = builder_storage.take().unwrap();
                trace!("PATH_RESPONSE {:08x} (off-path)", token);
                buf.write(frame::FrameType::PATH_RESPONSE);
                buf.write(token);
                self.stats.frame_tx.path_response += 1;
                builder.pad_to(MIN_INITIAL_SIZE);
                builder.finish_and_track(
                    now,
                    self,
                    Some(SentFrames {
                        non_retransmits: true,
                        ..SentFrames::default()
                    }),
                    buf,
                );
                self.stats.udp_tx.on_sent(1, buf.len());
                return Some(Transmit {
                    destination: remote,
                    size: buf.len(),
                    ecn: None,
                    segment_size: None,
                    src_ip: self.local_ip,
                });
            }
        }

        let sent =
            self.populate_packet(now, space_id, buf, builder.max_size, builder.exact_number);

        // Sanity check: a full-MTU packet must not end up ACK-only when non-ACK frames
        // were reported sendable, ACKs were not, and no datagrams are queued.
        debug_assert!(
            !(sent.is_ack_only(&self.streams)
                && !can_send.acks
                && can_send.other
                && (buf_capacity - builder.datagram_start) == self.path.current_mtu() as usize
                && self.datagrams.outgoing.is_empty()),
            "SendableFrames was {can_send:?}, but only ACKs have been written"
        );
        pad_datagram |= sent.requires_padding;

        if sent.largest_acked.is_some() {
            self.spaces[space_id].pending_acks.acks_sent();
            self.timers.stop(Timer::MaxAckDelay);
        }

        // Kept until the packet is finished in a later iteration or after the loop.
        sent_frames = Some(sent);

        // `space_idx` is intentionally left unchanged: the same space is examined again.
    }

    // Finish whatever packet is still open.
    if let Some(mut builder) = builder_storage {
        if pad_datagram {
            builder.pad_to(MIN_INITIAL_SIZE);
        }
        let last_packet_number = builder.exact_number;
        builder.finish_and_track(now, self, sent_frames, buf);
        self.path
            .congestion
            .on_sent(now, buf.len() as u64, last_packet_number);
    }

    self.app_limited = buf.is_empty() && !congestion_blocked;

    // With nothing else written on an established connection, an MTU probe may be sent.
    if buf.is_empty() && self.state.is_established() {
        let space_id = SpaceId::Data;
        // `?` returns `None` from the whole call when no probe is due.
        let probe_size = self
            .path
            .mtud
            .poll_transmit(now, self.packet_number_filter.peek(&self.spaces[space_id]))?;

        let buf_capacity = probe_size as usize;
        buf.reserve(buf_capacity);

        let mut builder = PacketBuilder::new(
            now,
            space_id,
            self.rem_cids.active(),
            buf,
            buf_capacity,
            0,
            true,
            self,
        )?;

        // The probe is a PING padded up to `probe_size`.
        buf.write(frame::FrameType::PING);
        self.stats.frame_tx.ping += 1;

        if self.peer_supports_ack_frequency() {
            buf.write(frame::FrameType::IMMEDIATE_ACK);
            self.stats.frame_tx.immediate_ack += 1;
        }

        builder.pad_to(probe_size);
        let sent_frames = SentFrames {
            non_retransmits: true,
            ..Default::default()
        };
        builder.finish_and_track(now, self, Some(sent_frames), buf);

        self.stats.path.sent_plpmtud_probes += 1;
        num_datagrams = 1;

        trace!(?probe_size, "writing MTUD probe");
    }

    if buf.is_empty() {
        return None;
    }

    trace!("sending {} bytes in {} datagrams", buf.len(), num_datagrams);
    self.path.total_sent = self.path.total_sent.saturating_add(buf.len() as u64);

    self.stats.udp_tx.on_sent(num_datagrams as u64, buf.len());

    Some(Transmit {
        destination: self.path.remote,
        size: buf.len(),
        ecn: if self.path.sending_ecn {
            Some(EcnCodepoint::Ect0)
        } else {
            None
        },
        // A single datagram needs no segment size.
        segment_size: match num_datagrams {
            1 => None,
            _ => Some(segment_size),
        },
        src_ip: self.local_ip,
    })
}
985
986 fn send_path_challenge(&mut self, now: Instant, buf: &mut Vec<u8>) -> Option<Transmit> {
988 let (prev_cid, prev_path) = self.prev_path.as_mut()?;
989 if !prev_path.challenge_pending {
990 return None;
991 }
992 prev_path.challenge_pending = false;
993 let token = prev_path
994 .challenge
995 .expect("previous path challenge pending without token");
996 let destination = prev_path.remote;
997 debug_assert_eq!(
998 self.highest_space,
999 SpaceId::Data,
1000 "PATH_CHALLENGE queued without 1-RTT keys"
1001 );
1002 buf.reserve(MIN_INITIAL_SIZE as usize);
1003
1004 let buf_capacity = buf.capacity();
1005
1006 let mut builder = PacketBuilder::new(
1012 now,
1013 SpaceId::Data,
1014 *prev_cid,
1015 buf,
1016 buf_capacity,
1017 0,
1018 false,
1019 self,
1020 )?;
1021 trace!("validating previous path with PATH_CHALLENGE {:08x}", token);
1022 buf.write(frame::FrameType::PATH_CHALLENGE);
1023 buf.write(token);
1024 self.stats.frame_tx.path_challenge += 1;
1025
1026 builder.pad_to(MIN_INITIAL_SIZE);
1031
1032 builder.finish(self, buf);
1033 self.stats.udp_tx.on_sent(1, buf.len());
1034
1035 Some(Transmit {
1036 destination,
1037 size: buf.len(),
1038 ecn: None,
1039 segment_size: None,
1040 src_ip: self.local_ip,
1041 })
1042 }
1043
1044 fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
1046 if self.spaces[space_id].crypto.is_none()
1047 && (space_id != SpaceId::Data
1048 || self.zero_rtt_crypto.is_none()
1049 || self.side.is_server())
1050 {
1051 return SendableFrames::empty();
1053 }
1054 let mut can_send = self.spaces[space_id].can_send(&self.streams);
1055 if space_id == SpaceId::Data {
1056 can_send.other |= self.can_send_1rtt(frame_space_1rtt);
1057 }
1058 can_send
1059 }
1060
1061 pub fn handle_event(&mut self, event: ConnectionEvent) {
1067 use ConnectionEventInner::*;
1068 match event.0 {
1069 Datagram(DatagramConnectionEvent {
1070 now,
1071 remote,
1072 ecn,
1073 first_decode,
1074 remaining,
1075 }) => {
1076 if remote != self.path.remote && !self.side.remote_may_migrate() {
1080 trace!("discarding packet from unrecognized peer {}", remote);
1081 return;
1082 }
1083
1084 let was_anti_amplification_blocked = self.path.anti_amplification_blocked(1);
1085
1086 self.stats.udp_rx.datagrams += 1;
1087 self.stats.udp_rx.bytes += first_decode.len() as u64;
1088 let data_len = first_decode.len();
1089
1090 self.handle_decode(now, remote, ecn, first_decode);
1091 self.path.total_recvd = self.path.total_recvd.saturating_add(data_len as u64);
1096
1097 if let Some(data) = remaining {
1098 self.stats.udp_rx.bytes += data.len() as u64;
1099 self.handle_coalesced(now, remote, ecn, data);
1100 }
1101
1102 if was_anti_amplification_blocked {
1103 self.set_loss_detection_timer(now);
1107 }
1108 }
1109 NewIdentifiers(ids, now) => {
1110 self.local_cid_state.new_cids(&ids, now);
1111 ids.into_iter().rev().for_each(|frame| {
1112 self.spaces[SpaceId::Data].pending.new_cids.push(frame);
1113 });
1114 if self
1116 .timers
1117 .get(Timer::PushNewCid)
1118 .map_or(true, |x| x <= now)
1119 {
1120 self.reset_cid_retirement();
1121 }
1122 }
1123 }
1124 }
1125
1126 pub fn handle_timeout(&mut self, now: Instant) {
1136 for &timer in &Timer::VALUES {
1137 if !self.timers.is_expired(timer, now) {
1138 continue;
1139 }
1140 self.timers.stop(timer);
1141 trace!(timer = ?timer, "timeout");
1142 match timer {
1143 Timer::Close => {
1144 self.state = State::Drained;
1145 self.endpoint_events.push_back(EndpointEventInner::Drained);
1146 }
1147 Timer::Idle => {
1148 self.kill(ConnectionError::TimedOut);
1149 }
1150 Timer::KeepAlive => {
1151 trace!("sending keep-alive");
1152 self.ping();
1153 }
1154 Timer::LossDetection => {
1155 self.on_loss_detection_timeout(now);
1156 }
1157 Timer::KeyDiscard => {
1158 self.zero_rtt_crypto = None;
1159 self.prev_crypto = None;
1160 }
1161 Timer::PathValidation => {
1162 debug!("path validation failed");
1163 if let Some((_, prev)) = self.prev_path.take() {
1164 self.path = prev;
1165 }
1166 self.path.challenge = None;
1167 self.path.challenge_pending = false;
1168 }
1169 Timer::Pacing => trace!("pacing timer expired"),
1170 Timer::PushNewCid => {
1171 let num_new_cid = self.local_cid_state.on_cid_timeout().into();
1173 if !self.state.is_closed() {
1174 trace!(
1175 "push a new cid to peer RETIRE_PRIOR_TO field {}",
1176 self.local_cid_state.retire_prior_to()
1177 );
1178 self.endpoint_events
1179 .push_back(EndpointEventInner::NeedIdentifiers(now, num_new_cid));
1180 }
1181 }
1182 Timer::MaxAckDelay => {
1183 trace!("max ack delay reached");
1184 self.spaces[SpaceId::Data]
1186 .pending_acks
1187 .on_max_ack_delay_timeout()
1188 }
1189 }
1190 }
1191 }
1192
1193 pub fn close(&mut self, now: Instant, error_code: VarInt, reason: Bytes) {
1205 self.close_inner(
1206 now,
1207 Close::Application(frame::ApplicationClose { error_code, reason }),
1208 )
1209 }
1210
1211 fn close_inner(&mut self, now: Instant, reason: Close) {
1212 let was_closed = self.state.is_closed();
1213 if !was_closed {
1214 self.close_common();
1215 self.set_close_timer(now);
1216 self.close = true;
1217 self.state = State::Closed(state::Closed { reason });
1218 }
1219 }
1220
1221 pub fn datagrams(&mut self) -> Datagrams<'_> {
1223 Datagrams { conn: self }
1224 }
1225
1226 pub fn stats(&self) -> ConnectionStats {
1228 let mut stats = self.stats;
1229 stats.path.rtt = self.path.rtt.get();
1230 stats.path.cwnd = self.path.congestion.window();
1231 stats.path.current_mtu = self.path.mtud.current_mtu();
1232
1233 stats
1234 }
1235
1236 pub fn ping(&mut self) {
1240 self.spaces[self.highest_space].ping_pending = true;
1241 }
1242
1243 pub fn force_key_update(&mut self) {
1247 self.update_keys(None, false);
1248 }
1249
1250 #[doc(hidden)]
1252 #[deprecated]
1253 pub fn initiate_key_update(&mut self) {
1254 self.force_key_update();
1255 }
1256
1257 pub fn crypto_session(&self) -> &dyn crypto::Session {
1259 &*self.crypto
1260 }
1261
1262 pub fn is_handshaking(&self) -> bool {
1267 self.state.is_handshake()
1268 }
1269
1270 pub fn is_closed(&self) -> bool {
1278 self.state.is_closed()
1279 }
1280
1281 pub fn is_drained(&self) -> bool {
1286 self.state.is_drained()
1287 }
1288
1289 pub fn accepted_0rtt(&self) -> bool {
1293 self.accepted_0rtt
1294 }
1295
1296 pub fn has_0rtt(&self) -> bool {
1298 self.zero_rtt_enabled
1299 }
1300
1301 pub fn has_pending_retransmits(&self) -> bool {
1303 !self.spaces[SpaceId::Data].pending.is_empty(&self.streams)
1304 }
1305
1306 pub fn side(&self) -> Side {
1308 self.side.side()
1309 }
1310
1311 pub fn remote_address(&self) -> SocketAddr {
1313 self.path.remote
1314 }
1315
1316 pub fn local_ip(&self) -> Option<IpAddr> {
1326 self.local_ip
1327 }
1328
1329 pub fn rtt(&self) -> Duration {
1331 self.path.rtt.get()
1332 }
1333
1334 pub fn congestion_state(&self) -> &dyn Controller {
1336 self.path.congestion.as_ref()
1337 }
1338
1339 pub fn path_changed(&mut self, now: Instant) {
1350 self.path.reset(now, &self.config);
1351 }
1352
1353 pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1358 self.streams.set_max_concurrent(dir, count);
1359 let pending = &mut self.spaces[SpaceId::Data].pending;
1362 self.streams.queue_max_stream_id(pending);
1363 }
1364
1365 pub fn max_concurrent_streams(&self, dir: Dir) -> u64 {
1371 self.streams.max_concurrent(dir)
1372 }
1373
1374 pub fn set_receive_window(&mut self, receive_window: VarInt) {
1376 if self.streams.set_receive_window(receive_window) {
1377 self.spaces[SpaceId::Data].pending.max_data = true;
1378 }
1379 }
1380
/// Processes an ACK frame received in packet space `space`.
///
/// # Errors
///
/// Returns `PROTOCOL_VIOLATION` if the ACK's largest packet number is not below the next
/// packet number to be sent in this space, and propagates any error from the packet number
/// filter's check of the acknowledged ranges.
fn on_ack_received(
    &mut self,
    now: Instant,
    space: SpaceId,
    ack: frame::Ack,
) -> Result<(), TransportError> {
    if ack.largest >= self.spaces[space].next_packet_number {
        return Err(TransportError::PROTOCOL_VIOLATION("unsent packet acked"));
    }
    // Whether this ACK raises the largest acknowledged packet number for the space.
    let new_largest = {
        let space = &mut self.spaces[space];
        if space
            .largest_acked_packet
            .map_or(true, |pn| ack.largest > pn)
        {
            space.largest_acked_packet = Some(ack.largest);
            // The send time is only refreshed if the packet is still tracked.
            if let Some(info) = space.sent_packets.get(&ack.largest) {
                space.largest_acked_packet_sent = info.time_sent;
            }
            true
        } else {
            false
        }
    };

    // Collect only packet numbers that are actually tracked as sent, so the work below is
    // bounded by outstanding packets rather than by the width of the ACK ranges.
    let mut newly_acked = ArrayRangeSet::new();
    for range in ack.iter() {
        self.packet_number_filter.check_ack(space, range.clone())?;
        for (&pn, _) in self.spaces[space].sent_packets.range(range) {
            newly_acked.insert_one(pn);
        }
    }

    if newly_acked.is_empty() {
        return Ok(());
    }

    let mut ack_eliciting_acked = false;
    for packet in newly_acked.elts() {
        if let Some(info) = self.spaces[space].take(packet) {
            if let Some(acked) = info.largest_acked {
                // The acknowledged packet carried ACKs up to `acked`; pending ACK state
                // below that number is dropped.
                self.spaces[space].pending_acks.subtract_below(acked);
            }
            ack_eliciting_acked |= info.ack_eliciting;

            // MTU discovery reports whether this acknowledgement changed the MTU.
            let mtu_updated = self.path.mtud.on_acked(space, packet, info.size);
            if mtu_updated {
                self.path
                    .congestion
                    .on_mtu_update(self.path.mtud.current_mtu());
            }

            self.ack_frequency.on_acked(packet);

            self.on_packet_acked(now, packet, info);
        }
    }

    self.path.congestion.on_end_acks(
        now,
        self.path.in_flight.bytes,
        self.app_limited,
        self.spaces[space].largest_acked_packet,
    );

    // An RTT sample is taken only when the largest acknowledged advanced and at least one
    // newly acknowledged packet was ack-eliciting.
    if new_largest && ack_eliciting_acked {
        // Outside the Data space the reported delay is ignored; inside it, the scaled
        // delay is capped at the peer's maximum ACK delay.
        // NOTE(review): the left shift can discard high bits for very large `ack.delay`
        // values — confirm the decoder bounds make this harmless.
        let ack_delay = if space != SpaceId::Data {
            Duration::from_micros(0)
        } else {
            cmp::min(
                self.ack_frequency.peer_max_ack_delay,
                Duration::from_micros(ack.delay << self.peer_params.ack_delay_exponent.0),
            )
        };
        let rtt = instant_saturating_sub(now, self.spaces[space].largest_acked_packet_sent);
        self.path.rtt.update(ack_delay, rtt);
        if self.path.first_packet_after_rtt_sample.is_none() {
            self.path.first_packet_after_rtt_sample =
                Some((space, self.spaces[space].next_packet_number));
        }
    }

    // Loss detection runs before `pto_count` is reset below.
    self.detect_lost_packets(now, space, true);

    if self.peer_completed_address_validation() {
        self.pto_count = 0;
    }

    if self.path.sending_ecn {
        if let Some(ecn) = ack.ecn {
            // ECN counts are only examined for ACKs that advanced the largest acknowledged.
            if new_largest {
                let sent = self.spaces[space].largest_acked_packet_sent;
                self.process_ecn(now, space, newly_acked.len() as u64, ecn, sent);
            }
        } else {
            // An ACK without ECN counts while we are sending ECN turns ECN off for the path.
            debug!("ECN not acknowledged by peer");
            self.path.sending_ecn = false;
        }
    }

    self.set_loss_detection_timer(now);
    Ok(())
}
1502
1503 fn process_ecn(
1505 &mut self,
1506 now: Instant,
1507 space: SpaceId,
1508 newly_acked: u64,
1509 ecn: frame::EcnCounts,
1510 largest_sent_time: Instant,
1511 ) {
1512 match self.spaces[space].detect_ecn(newly_acked, ecn) {
1513 Err(e) => {
1514 debug!("halting ECN due to verification failure: {}", e);
1515 self.path.sending_ecn = false;
1516 self.spaces[space].ecn_feedback = frame::EcnCounts::ZERO;
1519 }
1520 Ok(false) => {}
1521 Ok(true) => {
1522 self.stats.path.congestion_events += 1;
1523 self.path
1524 .congestion
1525 .on_congestion_event(now, largest_sent_time, false, 0);
1526 }
1527 }
1528 }
1529
1530 fn on_packet_acked(&mut self, now: Instant, pn: u64, info: SentPacket) {
1533 self.remove_in_flight(pn, &info);
1534 if info.ack_eliciting && self.path.challenge.is_none() {
1535 self.path.congestion.on_ack(
1538 now,
1539 info.time_sent,
1540 info.size.into(),
1541 self.app_limited,
1542 &self.path.rtt,
1543 );
1544 }
1545
1546 if let Some(retransmits) = info.retransmits.get() {
1548 for (id, _) in retransmits.reset_stream.iter() {
1549 self.streams.reset_acked(*id);
1550 }
1551 }
1552
1553 for frame in info.stream_frames {
1554 self.streams.received_ack_of(frame);
1555 }
1556 }
1557
1558 fn set_key_discard_timer(&mut self, now: Instant, space: SpaceId) {
1559 let start = if self.zero_rtt_crypto.is_some() {
1560 now
1561 } else {
1562 self.prev_crypto
1563 .as_ref()
1564 .expect("no previous keys")
1565 .end_packet
1566 .as_ref()
1567 .expect("update not acknowledged yet")
1568 .1
1569 };
1570 self.timers
1571 .set(Timer::KeyDiscard, start + self.pto(space) * 3);
1572 }
1573
1574 fn on_loss_detection_timeout(&mut self, now: Instant) {
1575 if let Some((_, pn_space)) = self.loss_time_and_space() {
1576 self.detect_lost_packets(now, pn_space, false);
1578 self.set_loss_detection_timer(now);
1579 return;
1580 }
1581
1582 let (_, space) = match self.pto_time_and_space(now) {
1583 Some(x) => x,
1584 None => {
1585 error!("PTO expired while unset");
1586 return;
1587 }
1588 };
1589 trace!(
1590 in_flight = self.path.in_flight.bytes,
1591 count = self.pto_count,
1592 ?space,
1593 "PTO fired"
1594 );
1595
1596 let count = match self.path.in_flight.ack_eliciting {
1597 0 => {
1600 debug_assert!(!self.peer_completed_address_validation());
1601 1
1602 }
1603 _ => 2,
1605 };
1606 self.spaces[space].loss_probes = self.spaces[space].loss_probes.saturating_add(count);
1607 self.pto_count = self.pto_count.saturating_add(1);
1608 self.set_loss_detection_timer(now);
1609 }
1610
/// Declares packets in `pn_space` lost and reacts to the loss.
///
/// Walks every sent packet numbered below the largest acknowledged packet of `pn_space`. A
/// packet is deemed lost when it was sent at or before `now - loss_delay`, or when the largest
/// acknowledged packet number is at least `packet_threshold` ahead of it. For packets that are
/// not yet lost, the earliest `time_sent + loss_delay` is stored in the space's `loss_time`.
///
/// Lost packets are removed from the space, their stream frames and other retransmittable
/// frames are queued again, loss statistics are updated, MTU discovery is informed, and the
/// congestion controller is notified if the number of bytes in flight changed. A lost in-flight
/// MTU probe is handled separately and is never reported to the congestion controller here.
///
/// `due_to_ack` gates persistent-congestion detection: it is only evaluated when `true`.
///
/// # Panics
///
/// Panics if `now - loss_delay` cannot be represented, or if `pn_space` has no
/// `largest_acked_packet` yet.
fn detect_lost_packets(&mut self, now: Instant, pn_space: SpaceId, due_to_ack: bool) {
    let mut lost_packets = Vec::<u64>::new();
    let mut lost_mtu_probe = None;
    let in_flight_mtu_probe = self.path.mtud.in_flight_mtu_probe();
    let rtt = self.path.rtt.conservative();
    // The loss delay is never allowed to drop below the timer granularity.
    let loss_delay = cmp::max(rtt.mul_f32(self.config.time_threshold), TIMER_GRANULARITY);

    // Packets sent at or before this instant are deemed lost.
    let lost_send_time = now.checked_sub(loss_delay).unwrap();
    let largest_acked_packet = self.spaces[pn_space].largest_acked_packet.unwrap();
    let packet_threshold = self.config.packet_threshold as u64;
    let mut size_of_lost_packets = 0u64;

    // Persistent congestion is declared when two lost ack-eliciting packets, with only lost
    // packets between them, were sent more than `congestion_period` apart. The period is always
    // derived from the Data-space PTO, regardless of `pn_space`.
    let congestion_period =
        self.pto(SpaceId::Data) * self.config.persistent_congestion_threshold;
    let mut persistent_congestion_start: Option<Instant> = None;
    let mut prev_packet = None;
    let mut in_persistent_congestion = false;

    let space = &mut self.spaces[pn_space];
    // Recomputed from scratch below.
    space.loss_time = None;

    for (&packet, info) in space.sent_packets.range(0..largest_acked_packet) {
        if prev_packet != Some(packet.wrapping_sub(1)) {
            // There is a gap in the tracked packet numbers, so the current run of consecutive
            // lost packets is broken.
            persistent_congestion_start = None;
        }

        if info.time_sent <= lost_send_time || largest_acked_packet >= packet + packet_threshold
        {
            if Some(packet) == in_flight_mtu_probe {
                // A lost MTU probe is kept out of `lost_packets` so that it does not count
                // toward the loss statistics or the congestion event below.
                lost_mtu_probe = in_flight_mtu_probe;
            } else {
                lost_packets.push(packet);
                size_of_lost_packets += info.size as u64;
                if info.ack_eliciting && due_to_ack {
                    match persistent_congestion_start {
                        // Two lost ack-eliciting packets more than `congestion_period` apart
                        // with no break in between.
                        Some(start) if info.time_sent - start > congestion_period => {
                            in_persistent_congestion = true;
                        }
                        // A run may only start at a packet ordered after the one recorded in
                        // `first_packet_after_rtt_sample`.
                        None if self
                            .path
                            .first_packet_after_rtt_sample
                            .is_some_and(|x| x < (pn_space, packet)) =>
                        {
                            persistent_congestion_start = Some(info.time_sent);
                        }
                        _ => {}
                    }
                }
            }
        } else {
            // Not lost yet: remember the earliest instant at which this could change.
            let next_loss_time = info.time_sent + loss_delay;
            space.loss_time = Some(
                space
                    .loss_time
                    .map_or(next_loss_time, |x| cmp::min(x, next_loss_time)),
            );
            // A surviving packet also breaks the run of consecutive losses.
            persistent_congestion_start = None;
        }

        prev_packet = Some(packet);
    }

    // React to the packets declared lost above.
    if let Some(largest_lost) = lost_packets.last().cloned() {
        let old_bytes_in_flight = self.path.in_flight.bytes;
        let largest_lost_sent = self.spaces[pn_space].sent_packets[&largest_lost].time_sent;
        self.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_packets += lost_packets.len() as u64;
        self.stats.path.lost_bytes += size_of_lost_packets;
        trace!(
            "packets lost: {:?}, bytes lost: {}",
            lost_packets, size_of_lost_packets
        );

        for &packet in &lost_packets {
            // The unwrap cannot fail: every entry of `lost_packets` was just read from
            // `sent_packets` and has not been removed since.
            let info = self.spaces[pn_space].take(packet).unwrap(); self.remove_in_flight(packet, &info);
            for frame in info.stream_frames {
                self.streams.retransmit(frame);
            }
            self.spaces[pn_space].pending |= info.retransmits;
            self.path.mtud.on_non_probe_lost(packet, info.size);
        }

        if self.path.mtud.black_hole_detected(now) {
            self.stats.path.black_holes_detected += 1;
            // Propagate the MTU now reported by MTU discovery and drop queued datagrams that
            // exceed the current maximum datagram size.
            self.path
                .congestion
                .on_mtu_update(self.path.mtud.current_mtu());
            if let Some(max_datagram_size) = self.datagrams().max_size() {
                self.datagrams.drop_oversized(max_datagram_size);
            }
        }

        // Only report a congestion event when removing the lost packets changed the number of
        // bytes in flight.
        let lost_ack_eliciting = old_bytes_in_flight != self.path.in_flight.bytes;

        if lost_ack_eliciting {
            self.stats.path.congestion_events += 1;
            self.path.congestion.on_congestion_event(
                now,
                largest_lost_sent,
                in_persistent_congestion,
                size_of_lost_packets,
            );
        }
    }

    // Handle a lost MTU probe. It was kept out of `lost_packets`, so it is still tracked; it is
    // always looked up in the Data space.
    if let Some(packet) = lost_mtu_probe {
        let info = self.spaces[SpaceId::Data].take(packet).unwrap(); self.remove_in_flight(packet, &info);
        self.path.mtud.on_probe_lost();
        self.stats.path.lost_plpmtud_probes += 1;
    }
}
1737
1738 fn loss_time_and_space(&self) -> Option<(Instant, SpaceId)> {
1739 SpaceId::iter()
1740 .filter_map(|id| Some((self.spaces[id].loss_time?, id)))
1741 .min_by_key(|&(time, _)| time)
1742 }
1743
1744 fn pto_time_and_space(&self, now: Instant) -> Option<(Instant, SpaceId)> {
1745 let backoff = 2u32.pow(self.pto_count.min(MAX_BACKOFF_EXPONENT));
1746 let mut duration = self.path.rtt.pto_base() * backoff;
1747
1748 if self.path.in_flight.ack_eliciting == 0 {
1749 debug_assert!(!self.peer_completed_address_validation());
1750 let space = match self.highest_space {
1751 SpaceId::Handshake => SpaceId::Handshake,
1752 _ => SpaceId::Initial,
1753 };
1754 return Some((now + duration, space));
1755 }
1756
1757 let mut result = None;
1758 for space in SpaceId::iter() {
1759 if self.spaces[space].in_flight == 0 {
1760 continue;
1761 }
1762 if space == SpaceId::Data {
1763 if self.is_handshaking() {
1765 return result;
1766 }
1767 duration += self.ack_frequency.max_ack_delay_for_pto() * backoff;
1769 }
1770 let last_ack_eliciting = match self.spaces[space].time_of_last_ack_eliciting_packet {
1771 Some(time) => time,
1772 None => continue,
1773 };
1774 let pto = last_ack_eliciting + duration;
1775 if result.map_or(true, |(earliest_pto, _)| pto < earliest_pto) {
1776 result = Some((pto, space));
1777 }
1778 }
1779 result
1780 }
1781
1782 fn peer_completed_address_validation(&self) -> bool {
1783 if self.side.is_server() || self.state.is_closed() {
1784 return true;
1785 }
1786 self.spaces[SpaceId::Handshake]
1789 .largest_acked_packet
1790 .is_some()
1791 || self.spaces[SpaceId::Data].largest_acked_packet.is_some()
1792 || (self.spaces[SpaceId::Data].crypto.is_some()
1793 && self.spaces[SpaceId::Handshake].crypto.is_none())
1794 }
1795
1796 fn set_loss_detection_timer(&mut self, now: Instant) {
1797 if self.state.is_closed() {
1798 return;
1802 }
1803
1804 if let Some((loss_time, _)) = self.loss_time_and_space() {
1805 self.timers.set(Timer::LossDetection, loss_time);
1807 return;
1808 }
1809
1810 if self.path.anti_amplification_blocked(1) {
1811 self.timers.stop(Timer::LossDetection);
1813 return;
1814 }
1815
1816 if self.path.in_flight.ack_eliciting == 0 && self.peer_completed_address_validation() {
1817 self.timers.stop(Timer::LossDetection);
1820 return;
1821 }
1822
1823 if let Some((timeout, _)) = self.pto_time_and_space(now) {
1826 self.timers.set(Timer::LossDetection, timeout);
1827 } else {
1828 self.timers.stop(Timer::LossDetection);
1829 }
1830 }
1831
1832 fn pto(&self, space: SpaceId) -> Duration {
1834 let max_ack_delay = match space {
1835 SpaceId::Initial | SpaceId::Handshake => Duration::ZERO,
1836 SpaceId::Data => self.ack_frequency.max_ack_delay_for_pto(),
1837 };
1838 self.path.rtt.pto_base() + max_ack_delay
1839 }
1840
1841 fn on_packet_authenticated(
1842 &mut self,
1843 now: Instant,
1844 space_id: SpaceId,
1845 ecn: Option<EcnCodepoint>,
1846 packet: Option<u64>,
1847 spin: bool,
1848 is_1rtt: bool,
1849 ) {
1850 self.total_authed_packets += 1;
1851 self.reset_keep_alive(now);
1852 self.reset_idle_timeout(now, space_id);
1853 self.permit_idle_reset = true;
1854 self.receiving_ecn |= ecn.is_some();
1855 if let Some(x) = ecn {
1856 let space = &mut self.spaces[space_id];
1857 space.ecn_counters += x;
1858
1859 if x.is_ce() {
1860 space.pending_acks.set_immediate_ack_required();
1861 }
1862 }
1863
1864 let packet = match packet {
1865 Some(x) => x,
1866 None => return,
1867 };
1868 if self.side.is_server() {
1869 if self.spaces[SpaceId::Initial].crypto.is_some() && space_id == SpaceId::Handshake {
1870 self.discard_space(now, SpaceId::Initial);
1872 }
1873 if self.zero_rtt_crypto.is_some() && is_1rtt {
1874 self.set_key_discard_timer(now, space_id)
1876 }
1877 }
1878 let space = &mut self.spaces[space_id];
1879 space.pending_acks.insert_one(packet, now);
1880 if packet >= space.rx_packet {
1881 space.rx_packet = packet;
1882 self.spin = self.side.is_client() ^ spin;
1884 }
1885 }
1886
1887 fn reset_idle_timeout(&mut self, now: Instant, space: SpaceId) {
1888 let timeout = match self.idle_timeout {
1889 None => return,
1890 Some(dur) => dur,
1891 };
1892 if self.state.is_closed() {
1893 self.timers.stop(Timer::Idle);
1894 return;
1895 }
1896 let dt = cmp::max(timeout, 3 * self.pto(space));
1897 self.timers.set(Timer::Idle, now + dt);
1898 }
1899
1900 fn reset_keep_alive(&mut self, now: Instant) {
1901 let interval = match self.config.keep_alive_interval {
1902 Some(x) if self.state.is_established() => x,
1903 _ => return,
1904 };
1905 self.timers.set(Timer::KeepAlive, now + interval);
1906 }
1907
1908 fn reset_cid_retirement(&mut self) {
1909 if let Some(t) = self.local_cid_state.next_timeout() {
1910 self.timers.set(Timer::PushNewCid, t);
1911 }
1912 }
1913
1914 pub(crate) fn handle_first_packet(
1919 &mut self,
1920 now: Instant,
1921 remote: SocketAddr,
1922 ecn: Option<EcnCodepoint>,
1923 packet_number: u64,
1924 packet: InitialPacket,
1925 remaining: Option<BytesMut>,
1926 ) -> Result<(), ConnectionError> {
1927 let span = trace_span!("first recv");
1928 let _guard = span.enter();
1929 debug_assert!(self.side.is_server());
1930 let len = packet.header_data.len() + packet.payload.len();
1931 self.path.total_recvd = len as u64;
1932
1933 match self.state {
1934 State::Handshake(ref mut state) => {
1935 state.expected_token = packet.header.token.clone();
1936 }
1937 _ => unreachable!("first packet must be delivered in Handshake state"),
1938 }
1939
1940 self.on_packet_authenticated(
1941 now,
1942 SpaceId::Initial,
1943 ecn,
1944 Some(packet_number),
1945 false,
1946 false,
1947 );
1948
1949 self.process_decrypted_packet(now, remote, Some(packet_number), packet.into())?;
1950 if let Some(data) = remaining {
1951 self.handle_coalesced(now, remote, ecn, data);
1952 }
1953 Ok(())
1954 }
1955
1956 fn init_0rtt(&mut self) {
1957 let (header, packet) = match self.crypto.early_crypto() {
1958 Some(x) => x,
1959 None => return,
1960 };
1961 if self.side.is_client() {
1962 match self.crypto.transport_parameters() {
1963 Ok(params) => {
1964 let params = params
1965 .expect("crypto layer didn't supply transport parameters with ticket");
1966 let params = TransportParameters {
1968 initial_src_cid: None,
1969 original_dst_cid: None,
1970 preferred_address: None,
1971 retry_src_cid: None,
1972 stateless_reset_token: None,
1973 min_ack_delay: None,
1974 ack_delay_exponent: TransportParameters::default().ack_delay_exponent,
1975 max_ack_delay: TransportParameters::default().max_ack_delay,
1976 ..params
1977 };
1978 self.set_peer_params(params);
1979 }
1980 Err(e) => {
1981 error!("session ticket has malformed transport parameters: {}", e);
1982 return;
1983 }
1984 }
1985 }
1986 trace!("0-RTT enabled");
1987 self.zero_rtt_enabled = true;
1988 self.zero_rtt_crypto = Some(ZeroRttCrypto { header, packet });
1989 }
1990
1991 fn read_crypto(
1992 &mut self,
1993 space: SpaceId,
1994 crypto: &frame::Crypto,
1995 payload_len: usize,
1996 ) -> Result<(), TransportError> {
1997 let expected = if !self.state.is_handshake() {
1998 SpaceId::Data
1999 } else if self.highest_space == SpaceId::Initial {
2000 SpaceId::Initial
2001 } else {
2002 SpaceId::Handshake
2005 };
2006 debug_assert!(space <= expected, "received out-of-order CRYPTO data");
2010
2011 let end = crypto.offset + crypto.data.len() as u64;
2012 if space < expected && end > self.spaces[space].crypto_stream.bytes_read() {
2013 warn!(
2014 "received new {:?} CRYPTO data when expecting {:?}",
2015 space, expected
2016 );
2017 return Err(TransportError::PROTOCOL_VIOLATION(
2018 "new data at unexpected encryption level",
2019 ));
2020 }
2021
2022 let space = &mut self.spaces[space];
2023 let max = end.saturating_sub(space.crypto_stream.bytes_read());
2024 if max > self.config.crypto_buffer_size as u64 {
2025 return Err(TransportError::CRYPTO_BUFFER_EXCEEDED(""));
2026 }
2027
2028 space
2029 .crypto_stream
2030 .insert(crypto.offset, crypto.data.clone(), payload_len);
2031 while let Some(chunk) = space.crypto_stream.read(usize::MAX, true) {
2032 trace!("consumed {} CRYPTO bytes", chunk.bytes.len());
2033 if self.crypto.read_handshake(&chunk.bytes)? {
2034 self.events.push_back(Event::HandshakeDataReady);
2035 }
2036 }
2037
2038 Ok(())
2039 }
2040
2041 fn write_crypto(&mut self) {
2042 loop {
2043 let space = self.highest_space;
2044 let mut outgoing = Vec::new();
2045 if let Some(crypto) = self.crypto.write_handshake(&mut outgoing) {
2046 match space {
2047 SpaceId::Initial => {
2048 self.upgrade_crypto(SpaceId::Handshake, crypto);
2049 }
2050 SpaceId::Handshake => {
2051 self.upgrade_crypto(SpaceId::Data, crypto);
2052 }
2053 _ => unreachable!("got updated secrets during 1-RTT"),
2054 }
2055 }
2056 if outgoing.is_empty() {
2057 if space == self.highest_space {
2058 break;
2059 } else {
2060 continue;
2062 }
2063 }
2064 let offset = self.spaces[space].crypto_offset;
2065 let outgoing = Bytes::from(outgoing);
2066 if let State::Handshake(ref mut state) = self.state {
2067 if space == SpaceId::Initial && offset == 0 && self.side.is_client() {
2068 state.client_hello = Some(outgoing.clone());
2069 }
2070 }
2071 self.spaces[space].crypto_offset += outgoing.len() as u64;
2072 trace!("wrote {} {:?} CRYPTO bytes", outgoing.len(), space);
2073 self.spaces[space].pending.crypto.push_back(frame::Crypto {
2074 offset,
2075 data: outgoing,
2076 });
2077 }
2078 }
2079
2080 fn upgrade_crypto(&mut self, space: SpaceId, crypto: Keys) {
2082 debug_assert!(
2083 self.spaces[space].crypto.is_none(),
2084 "already reached packet space {space:?}"
2085 );
2086 trace!("{:?} keys ready", space);
2087 if space == SpaceId::Data {
2088 self.next_crypto = Some(
2090 self.crypto
2091 .next_1rtt_keys()
2092 .expect("handshake should be complete"),
2093 );
2094 }
2095
2096 self.spaces[space].crypto = Some(crypto);
2097 debug_assert!(space as usize > self.highest_space as usize);
2098 self.highest_space = space;
2099 if space == SpaceId::Data && self.side.is_client() {
2100 self.zero_rtt_crypto = None;
2102 }
2103 }
2104
2105 fn discard_space(&mut self, now: Instant, space_id: SpaceId) {
2106 debug_assert!(space_id != SpaceId::Data);
2107 trace!("discarding {:?} keys", space_id);
2108 if space_id == SpaceId::Initial {
2109 if let ConnectionSide::Client { token, .. } = &mut self.side {
2111 *token = Bytes::new();
2112 }
2113 }
2114 let space = &mut self.spaces[space_id];
2115 space.crypto = None;
2116 space.time_of_last_ack_eliciting_packet = None;
2117 space.loss_time = None;
2118 space.in_flight = 0;
2119 let sent_packets = mem::take(&mut space.sent_packets);
2120 for (pn, packet) in sent_packets.into_iter() {
2121 self.remove_in_flight(pn, &packet);
2122 }
2123 self.set_loss_detection_timer(now)
2124 }
2125
2126 fn handle_coalesced(
2127 &mut self,
2128 now: Instant,
2129 remote: SocketAddr,
2130 ecn: Option<EcnCodepoint>,
2131 data: BytesMut,
2132 ) {
2133 self.path.total_recvd = self.path.total_recvd.saturating_add(data.len() as u64);
2134 let mut remaining = Some(data);
2135 while let Some(data) = remaining {
2136 match PartialDecode::new(
2137 data,
2138 &FixedLengthConnectionIdParser::new(self.local_cid_state.cid_len()),
2139 &[self.version],
2140 self.endpoint_config.grease_quic_bit,
2141 ) {
2142 Ok((partial_decode, rest)) => {
2143 remaining = rest;
2144 self.handle_decode(now, remote, ecn, partial_decode);
2145 }
2146 Err(e) => {
2147 trace!("malformed header: {}", e);
2148 return;
2149 }
2150 }
2151 }
2152 }
2153
2154 fn handle_decode(
2155 &mut self,
2156 now: Instant,
2157 remote: SocketAddr,
2158 ecn: Option<EcnCodepoint>,
2159 partial_decode: PartialDecode,
2160 ) {
2161 if let Some(decoded) = packet_crypto::unprotect_header(
2162 partial_decode,
2163 &self.spaces,
2164 self.zero_rtt_crypto.as_ref(),
2165 self.peer_params.stateless_reset_token,
2166 ) {
2167 self.handle_packet(now, remote, ecn, decoded.packet, decoded.stateless_reset);
2168 }
2169 }
2170
/// Decrypts and processes a single packet whose header protection has already been removed.
///
/// * `packet` - the unprotected packet, or `None` if no packet could be produced (treated as
///   an authentication failure unless `stateless_reset` is set).
/// * `stateless_reset` - when `true`, the input is treated as a stateless reset regardless of
///   the decryption outcome.
///
/// Packets arriving from an unexpected remote during the handshake, possible duplicates, short
/// header packets during the handshake, and server-side Initial packets with an unexpected
/// token are dropped. Errors raised while processing are translated into a connection state
/// transition; the close timer, the `Drained` endpoint event and the `close` flag are updated
/// to match.
///
/// # Panics
///
/// Panics on an authentication failure if the highest packet space has no keys.
fn handle_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    ecn: Option<EcnCodepoint>,
    packet: Option<Packet>,
    stateless_reset: bool,
) {
    self.stats.udp_rx.ios += 1;
    if let Some(ref packet) = packet {
        trace!(
            "got {:?} packet ({} bytes) from {} using id {}",
            packet.header.space(),
            packet.payload.len() + packet.header_data.len(),
            remote,
            packet.header.dst_cid(),
        );
    }

    if self.is_handshaking() && remote != self.path.remote {
        debug!("discarding packet with unexpected remote during handshake");
        return;
    }

    // Remember the state before processing so that transitions can be detected afterwards.
    let was_closed = self.state.is_closed();
    let was_drained = self.state.is_drained();

    // `Err(None)` stands for "could not authenticate"; `Err(Some(_))` carries a specific error.
    let decrypted = match packet {
        None => Err(None),
        Some(mut packet) => self
            .decrypt_packet(now, &mut packet)
            .map(move |number| (packet, number)),
    };
    let result = match decrypted {
        // A stateless reset takes precedence over whatever decryption produced.
        _ if stateless_reset => {
            debug!("got stateless reset");
            Err(ConnectionError::Reset)
        }
        Err(Some(e)) => {
            warn!("illegal packet: {}", e);
            Err(e.into())
        }
        Err(None) => {
            debug!("failed to authenticate packet");
            self.authentication_failures += 1;
            let integrity_limit = self.spaces[self.highest_space]
                .crypto
                .as_ref()
                .unwrap()
                .packet
                .local
                .integrity_limit();
            // Individual failures are ignored until the key's integrity limit is exceeded.
            if self.authentication_failures > integrity_limit {
                Err(TransportError::AEAD_LIMIT_REACHED("integrity limit violated").into())
            } else {
                return;
            }
        }
        Ok((packet, number)) => {
            let span = match number {
                Some(pn) => trace_span!("recv", space = ?packet.header.space(), pn),
                None => trace_span!("recv", space = ?packet.header.space()),
            };
            let _guard = span.enter();

            // Records the packet number in the space's dedup state; the result is treated as
            // "this number was seen before".
            let is_duplicate = |n| self.spaces[packet.header.space()].dedup.insert(n);
            if number.is_some_and(is_duplicate) {
                debug!("discarding possible duplicate packet");
                return;
            } else if self.state.is_handshake() && packet.header.is_short() {
                // Short header packets are not processed before the handshake completes.
                trace!("dropping short packet during handshake");
                return;
            } else {
                if let Header::Initial(InitialHeader { ref token, .. }) = packet.header {
                    if let State::Handshake(ref hs) = self.state {
                        if self.side.is_server() && token != &hs.expected_token {
                            // The packet is dropped rather than treated as a connection error.
                            warn!("discarding Initial with invalid retry token");
                            return;
                        }
                    }
                }

                // Timers and ACK state are only updated while the connection is not closed.
                if !self.state.is_closed() {
                    let spin = match packet.header {
                        Header::Short { spin, .. } => spin,
                        _ => false,
                    };
                    self.on_packet_authenticated(
                        now,
                        packet.header.space(),
                        ecn,
                        number,
                        spin,
                        packet.header.is_1rtt(),
                    );
                }
                self.process_decrypted_packet(now, remote, number, packet)
            }
        }
    };

    // State transitions for error cases
    if let Err(conn_err) = result {
        self.error = Some(conn_err.clone());
        self.state = match conn_err {
            ConnectionError::ApplicationClosed(reason) => State::closed(reason),
            ConnectionError::ConnectionClosed(reason) => State::closed(reason),
            // Resets and an exceeded AEAD limit go straight to the drained state.
            ConnectionError::Reset
            | ConnectionError::TransportError(TransportError {
                code: TransportErrorCode::AEAD_LIMIT_REACHED,
                ..
            }) => State::Drained,
            ConnectionError::TimedOut => {
                unreachable!("timeouts aren't generated by packet processing");
            }
            ConnectionError::TransportError(err) => {
                debug!("closing connection due to transport error: {}", err);
                State::closed(err)
            }
            ConnectionError::VersionMismatch => State::Draining,
            ConnectionError::LocallyClosed => {
                unreachable!("LocallyClosed isn't generated by packet processing");
            }
            ConnectionError::CidsExhausted => {
                unreachable!("CidsExhausted isn't generated by packet processing");
            }
        };
    }

    // Newly closed: run the common close handling and, unless already drained, start the close
    // timer.
    if !was_closed && self.state.is_closed() {
        self.close_common();
        if !self.state.is_drained() {
            self.set_close_timer(now);
        }
    }
    // Newly drained: notify the endpoint and stop a close timer that may have been running.
    if !was_drained && self.state.is_drained() {
        self.endpoint_events.push_back(EndpointEventInner::Drained);
        self.timers.stop(Timer::Close);
    }

    // In the `Closed` state, set the `close` flag only when the packet came from the current
    // path's remote address.
    if let State::Closed(_) = self.state {
        self.close = remote == self.path.remote;
    }
}
2322
/// Processes a decrypted packet according to the current connection state.
///
/// * Established: Data-space packets go to `process_payload`, other packets that have frames
///   go to `process_early_payload`, and anything else is discarded.
/// * Closed: frames are only scanned for a close frame, which moves the connection to
///   `Draining`.
/// * Draining / Drained: the packet is ignored.
/// * Handshake: the packet is dispatched on its header type (Retry, Handshake, Initial, 0-RTT,
///   Version Negotiation).
///
/// `number` is the packet number, if the packet type carries one.
///
/// # Errors
///
/// Returns a `ConnectionError` for protocol violations, transport parameter problems, frame
/// processing failures, or when a Version Negotiation packet does not list our version.
///
/// # Panics
///
/// Panics if `number` is `None` for a Data-space or 0-RTT packet, if a valid Retry arrives
/// while no client hello is stored, or if a short header packet reaches the handshake branch.
fn process_decrypted_packet(
    &mut self,
    now: Instant,
    remote: SocketAddr,
    number: Option<u64>,
    packet: Packet,
) -> Result<(), ConnectionError> {
    // Every state except `Handshake` is fully handled (and returns) inside this match.
    let state = match self.state {
        State::Established => {
            match packet.header.space() {
                SpaceId::Data => self.process_payload(now, remote, number.unwrap(), packet)?,
                _ if packet.header.has_frames() => self.process_early_payload(now, packet)?,
                _ => {
                    trace!("discarding unexpected pre-handshake packet");
                }
            }
            return Ok(());
        }
        State::Closed(_) => {
            for result in frame::Iter::new(packet.payload.freeze())? {
                let frame = match result {
                    Ok(frame) => frame,
                    Err(err) => {
                        // Undecodable frames are logged and skipped rather than treated as
                        // errors.
                        debug!("frame decoding error: {err:?}");
                        continue;
                    }
                };

                if let Frame::Padding = frame {
                    continue;
                };

                self.stats.frame_rx.record(&frame);

                // A close frame from the peer moves the connection to `Draining`.
                if let Frame::Close(_) = frame {
                    trace!("draining");
                    self.state = State::Draining;
                    break;
                }
            }
            return Ok(());
        }
        State::Draining | State::Drained => return Ok(()),
        State::Handshake(ref mut state) => state,
    };

    match packet.header {
        Header::Retry {
            src_cid: rem_cid, ..
        } => {
            if self.side.is_server() {
                return Err(TransportError::PROTOCOL_VIOLATION("client sent Retry").into());
            }

            // A Retry is ignored once more than one packet has been authenticated, when its
            // payload is 16 bytes or shorter (which leaves no room for a token), or when the
            // crypto layer rejects it.
            if self.total_authed_packets > 1
                || packet.payload.len() <= 16 || !self.crypto.is_valid_retry(
                    &self.rem_cids.active(),
                    &packet.header_data,
                    &packet.payload,
                )
            {
                trace!("discarding invalid Retry");
                return Ok(());
            }

            trace!("retrying with CID {}", rem_cid);
            let client_hello = state.client_hello.take().unwrap();
            self.retry_src_cid = Some(rem_cid);
            self.rem_cids.update_initial_cid(rem_cid);
            self.rem_handshake_cid = rem_cid;

            // Treat Initial packet 0, if still tracked, as acknowledged.
            let space = &mut self.spaces[SpaceId::Initial];
            if let Some(info) = space.take(0) {
                self.on_packet_acked(now, 0, info);
            };

            // Drop the remaining Initial state, then rebuild the Initial space with keys
            // derived from the new CID, keeping the packet number sequence and marking the
            // client hello as already written.
            self.discard_space(now, SpaceId::Initial); self.spaces[SpaceId::Initial] = PacketSpace {
                crypto: Some(self.crypto.initial_keys(&rem_cid, self.side.side())),
                next_packet_number: self.spaces[SpaceId::Initial].next_packet_number,
                crypto_offset: client_hello.len() as u64,
                ..PacketSpace::new(now)
            };
            // Queue the original client hello again, starting at offset 0.
            self.spaces[SpaceId::Initial]
                .pending
                .crypto
                .push_back(frame::Crypto {
                    offset: 0,
                    data: client_hello,
                });

            // Everything sent in the Data space so far is queued again.
            let zero_rtt = mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
            for (pn, info) in zero_rtt {
                self.remove_in_flight(pn, &info);
                self.spaces[SpaceId::Data].pending |= info.retransmits;
            }
            self.streams.retransmit_all_for_0rtt();

            // The token is the payload without its trailing 16 bytes.
            let token_len = packet.payload.len() - 16;
            let ConnectionSide::Client { ref mut token, .. } = self.side else {
                unreachable!("we already short-circuited if we're server");
            };
            *token = packet.payload.freeze().split_to(token_len);
            // Restart the handshake state from scratch.
            self.state = State::Handshake(state::Handshake {
                expected_token: Bytes::new(),
                rem_cid_set: false,
                client_hello: None,
            });
            Ok(())
        }
        Header::Long {
            ty: LongType::Handshake,
            src_cid: rem_cid,
            ..
        } => {
            if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }
            self.on_path_validated();

            self.process_early_payload(now, packet)?;
            if self.state.is_closed() {
                return Ok(());
            }

            if self.crypto.is_handshaking() {
                trace!("handshake ongoing");
                return Ok(());
            }

            // The crypto handshake has finished; complete the connection setup.
            if self.side.is_client() {
                // Only the client reads the peer's transport parameters here; the server does
                // so in the Initial branch below.
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;

                if self.has_0rtt() {
                    if !self.crypto.early_data_accepted().unwrap() {
                        debug_assert!(self.side.is_client());
                        debug!("0-RTT rejected");
                        self.accepted_0rtt = false;
                        self.streams.zero_rtt_rejected();

                        // Discard frames that were already queued.
                        self.spaces[SpaceId::Data].pending = Retransmits::default();

                        // Forget the packets sent in the Data space so far.
                        let sent_packets =
                            mem::take(&mut self.spaces[SpaceId::Data].sent_packets);
                        for (pn, packet) in sent_packets {
                            self.remove_in_flight(pn, &packet);
                        }
                    } else {
                        self.accepted_0rtt = true;
                        // The new parameters are checked against the ones currently stored.
                        params.validate_resumption_from(&self.peer_params)?;
                    }
                }
                if let Some(token) = params.stateless_reset_token {
                    self.endpoint_events
                        .push_back(EndpointEventInner::ResetToken(self.path.remote, token));
                }
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
            } else {
                // Server: queue HANDSHAKE_DONE and drop the Handshake space.
                self.spaces[SpaceId::Data].pending.handshake_done = true;
                self.discard_space(now, SpaceId::Handshake);
            }

            self.events.push_back(Event::Connected);
            self.state = State::Established;
            trace!("established");
            Ok(())
        }
        Header::Initial(InitialHeader {
            src_cid: rem_cid, ..
        }) => {
            if !state.rem_cid_set {
                // First Initial seen in this handshake state: adopt the peer's source CID.
                trace!("switching remote CID to {}", rem_cid);
                let mut state = state.clone();
                self.rem_cids.update_initial_cid(rem_cid);
                self.rem_handshake_cid = rem_cid;
                self.orig_rem_cid = rem_cid;
                state.rem_cid_set = true;
                self.state = State::Handshake(state);
            } else if rem_cid != self.rem_handshake_cid {
                debug!(
                    "discarding packet with mismatched remote CID: {} != {}",
                    self.rem_handshake_cid, rem_cid
                );
                return Ok(());
            }

            let starting_space = self.highest_space;
            self.process_early_payload(now, packet)?;

            // On the server, the first time processing an Initial packet moves the connection
            // beyond the Initial space, read the client's transport parameters and set up CIDs
            // and 0-RTT.
            if self.side.is_server()
                && starting_space == SpaceId::Initial
                && self.highest_space != SpaceId::Initial
            {
                let params =
                    self.crypto
                        .transport_parameters()?
                        .ok_or_else(|| TransportError {
                            code: TransportErrorCode::crypto(0x6d),
                            frame: None,
                            reason: "transport parameters missing".into(),
                        })?;
                self.handle_peer_params(params)?;
                self.issue_first_cids(now);
                self.init_0rtt();
            }
            Ok(())
        }
        Header::Long {
            ty: LongType::ZeroRtt,
            ..
        } => {
            self.process_payload(now, remote, number.unwrap(), packet)?;
            Ok(())
        }
        Header::VersionNegotiate { .. } => {
            // Ignored once more than one packet has been authenticated.
            if self.total_authed_packets > 1 {
                return Ok(());
            }
            // The payload is read as a list of 4-byte big-endian versions; a trailing partial
            // chunk never matches.
            let supported = packet
                .payload
                .chunks(4)
                .any(|x| match <[u8; 4]>::try_from(x) {
                    Ok(version) => self.version == u32::from_be_bytes(version),
                    Err(_) => false,
                });
            // A packet that lists our own version is ignored.
            if supported {
                return Ok(());
            }
            debug!("remote doesn't support our version");
            Err(ConnectionError::VersionMismatch)
        }
        Header::Short { .. } => unreachable!(
            "short packets received during handshake are discarded in handle_packet"
        ),
    }
}
2584
2585 fn process_early_payload(
2587 &mut self,
2588 now: Instant,
2589 packet: Packet,
2590 ) -> Result<(), TransportError> {
2591 debug_assert_ne!(packet.header.space(), SpaceId::Data);
2592 let payload_len = packet.payload.len();
2593 let mut ack_eliciting = false;
2594 for result in frame::Iter::new(packet.payload.freeze())? {
2595 let frame = result?;
2596 let span = match frame {
2597 Frame::Padding => continue,
2598 _ => Some(trace_span!("frame", ty = %frame.ty())),
2599 };
2600
2601 self.stats.frame_rx.record(&frame);
2602
2603 let _guard = span.as_ref().map(|x| x.enter());
2604 ack_eliciting |= frame.is_ack_eliciting();
2605
2606 match frame {
2608 Frame::Padding | Frame::Ping => {}
2609 Frame::Crypto(frame) => {
2610 self.read_crypto(packet.header.space(), &frame, payload_len)?;
2611 }
2612 Frame::Ack(ack) => {
2613 self.on_ack_received(now, packet.header.space(), ack)?;
2614 }
2615 Frame::Close(reason) => {
2616 self.error = Some(reason.into());
2617 self.state = State::Draining;
2618 return Ok(());
2619 }
2620 _ => {
2621 let mut err =
2622 TransportError::PROTOCOL_VIOLATION("illegal frame type in handshake");
2623 err.frame = Some(frame.ty());
2624 return Err(err);
2625 }
2626 }
2627 }
2628
2629 if ack_eliciting {
2630 self.spaces[packet.header.space()]
2632 .pending_acks
2633 .set_immediate_ack_required();
2634 }
2635
2636 self.write_crypto();
2637 Ok(())
2638 }
2639
2640 fn process_payload(
2641 &mut self,
2642 now: Instant,
2643 remote: SocketAddr,
2644 number: u64,
2645 packet: Packet,
2646 ) -> Result<(), TransportError> {
2647 let payload = packet.payload.freeze();
2648 let mut is_probing_packet = true;
2649 let mut close = None;
2650 let payload_len = payload.len();
2651 let mut ack_eliciting = false;
2652 for result in frame::Iter::new(payload)? {
2653 let frame = result?;
2654 let span = match frame {
2655 Frame::Padding => continue,
2656 _ => Some(trace_span!("frame", ty = %frame.ty())),
2657 };
2658
2659 self.stats.frame_rx.record(&frame);
2660 match &frame {
2663 Frame::Crypto(f) => {
2664 trace!(offset = f.offset, len = f.data.len(), "got crypto frame");
2665 }
2666 Frame::Stream(f) => {
2667 trace!(id = %f.id, offset = f.offset, len = f.data.len(), fin = f.fin, "got stream frame");
2668 }
2669 Frame::Datagram(f) => {
2670 trace!(len = f.data.len(), "got datagram frame");
2671 }
2672 f => {
2673 trace!("got frame {:?}", f);
2674 }
2675 }
2676
2677 let _guard = span.as_ref().map(|x| x.enter());
2678 if packet.header.is_0rtt() {
2679 match frame {
2680 Frame::Crypto(_) | Frame::Close(Close::Application(_)) => {
2681 return Err(TransportError::PROTOCOL_VIOLATION(
2682 "illegal frame type in 0-RTT",
2683 ));
2684 }
2685 _ => {}
2686 }
2687 }
2688 ack_eliciting |= frame.is_ack_eliciting();
2689
2690 match frame {
2692 Frame::Padding
2693 | Frame::PathChallenge(_)
2694 | Frame::PathResponse(_)
2695 | Frame::NewConnectionId(_) => {}
2696 _ => {
2697 is_probing_packet = false;
2698 }
2699 }
2700 match frame {
2701 Frame::Crypto(frame) => {
2702 self.read_crypto(SpaceId::Data, &frame, payload_len)?;
2703 }
2704 Frame::Stream(frame) => {
2705 if self.streams.received(frame, payload_len)?.should_transmit() {
2706 self.spaces[SpaceId::Data].pending.max_data = true;
2707 }
2708 }
2709 Frame::Ack(ack) => {
2710 self.on_ack_received(now, SpaceId::Data, ack)?;
2711 }
2712 Frame::Padding | Frame::Ping => {}
2713 Frame::Close(reason) => {
2714 close = Some(reason);
2715 }
2716 Frame::PathChallenge(token) => {
2717 self.path_responses.push(number, token, remote);
2718 if remote == self.path.remote {
2719 match self.peer_supports_ack_frequency() {
2722 true => self.immediate_ack(),
2723 false => self.ping(),
2724 }
2725 }
2726 }
2727 Frame::PathResponse(token) => {
2728 if self.path.challenge == Some(token) && remote == self.path.remote {
2729 trace!("new path validated");
2730 self.timers.stop(Timer::PathValidation);
2731 self.path.challenge = None;
2732 self.path.validated = true;
2733 if let Some((_, ref mut prev_path)) = self.prev_path {
2734 prev_path.challenge = None;
2735 prev_path.challenge_pending = false;
2736 }
2737 } else {
2738 debug!(token, "ignoring invalid PATH_RESPONSE");
2739 }
2740 }
2741 Frame::MaxData(bytes) => {
2742 self.streams.received_max_data(bytes);
2743 }
2744 Frame::MaxStreamData { id, offset } => {
2745 self.streams.received_max_stream_data(id, offset)?;
2746 }
2747 Frame::MaxStreams { dir, count } => {
2748 self.streams.received_max_streams(dir, count)?;
2749 }
2750 Frame::ResetStream(frame) => {
2751 if self.streams.received_reset(frame)?.should_transmit() {
2752 self.spaces[SpaceId::Data].pending.max_data = true;
2753 }
2754 }
2755 Frame::DataBlocked { offset } => {
2756 debug!(offset, "peer claims to be blocked at connection level");
2757 }
2758 Frame::StreamDataBlocked { id, offset } => {
2759 if id.initiator() == self.side.side() && id.dir() == Dir::Uni {
2760 debug!("got STREAM_DATA_BLOCKED on send-only {}", id);
2761 return Err(TransportError::STREAM_STATE_ERROR(
2762 "STREAM_DATA_BLOCKED on send-only stream",
2763 ));
2764 }
2765 debug!(
2766 stream = %id,
2767 offset, "peer claims to be blocked at stream level"
2768 );
2769 }
2770 Frame::StreamsBlocked { dir, limit } => {
2771 if limit > MAX_STREAM_COUNT {
2772 return Err(TransportError::FRAME_ENCODING_ERROR(
2773 "unrepresentable stream limit",
2774 ));
2775 }
2776 debug!(
2777 "peer claims to be blocked opening more than {} {} streams",
2778 limit, dir
2779 );
2780 }
2781 Frame::StopSending(frame::StopSending { id, error_code }) => {
2782 if id.initiator() != self.side.side() {
2783 if id.dir() == Dir::Uni {
2784 debug!("got STOP_SENDING on recv-only {}", id);
2785 return Err(TransportError::STREAM_STATE_ERROR(
2786 "STOP_SENDING on recv-only stream",
2787 ));
2788 }
2789 } else if self.streams.is_local_unopened(id) {
2790 return Err(TransportError::STREAM_STATE_ERROR(
2791 "STOP_SENDING on unopened stream",
2792 ));
2793 }
2794 self.streams.received_stop_sending(id, error_code);
2795 }
2796 Frame::RetireConnectionId { sequence } => {
2797 let allow_more_cids = self
2798 .local_cid_state
2799 .on_cid_retirement(sequence, self.peer_params.issue_cids_limit())?;
2800 self.endpoint_events
2801 .push_back(EndpointEventInner::RetireConnectionId(
2802 now,
2803 sequence,
2804 allow_more_cids,
2805 ));
2806 }
2807 Frame::NewConnectionId(frame) => {
2808 trace!(
2809 sequence = frame.sequence,
2810 id = %frame.id,
2811 retire_prior_to = frame.retire_prior_to,
2812 );
2813 if self.rem_cids.active().is_empty() {
2814 return Err(TransportError::PROTOCOL_VIOLATION(
2815 "NEW_CONNECTION_ID when CIDs aren't in use",
2816 ));
2817 }
2818 if frame.retire_prior_to > frame.sequence {
2819 return Err(TransportError::PROTOCOL_VIOLATION(
2820 "NEW_CONNECTION_ID retiring unissued CIDs",
2821 ));
2822 }
2823
2824 use crate::cid_queue::InsertError;
2825 match self.rem_cids.insert(frame) {
2826 Ok(None) => {}
2827 Ok(Some((retired, reset_token))) => {
2828 let pending_retired =
2829 &mut self.spaces[SpaceId::Data].pending.retire_cids;
2830 const MAX_PENDING_RETIRED_CIDS: u64 = CidQueue::LEN as u64 * 10;
2833 if (pending_retired.len() as u64)
2836 .saturating_add(retired.end.saturating_sub(retired.start))
2837 > MAX_PENDING_RETIRED_CIDS
2838 {
2839 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(
2840 "queued too many retired CIDs",
2841 ));
2842 }
2843 pending_retired.extend(retired);
2844 self.set_reset_token(reset_token);
2845 }
2846 Err(InsertError::ExceedsLimit) => {
2847 return Err(TransportError::CONNECTION_ID_LIMIT_ERROR(""));
2848 }
2849 Err(InsertError::Retired) => {
2850 trace!("discarding already-retired");
2851 self.spaces[SpaceId::Data]
2855 .pending
2856 .retire_cids
2857 .push(frame.sequence);
2858 continue;
2859 }
2860 };
2861
2862 if self.side.is_server() && self.rem_cids.active_seq() == 0 {
2863 self.update_rem_cid();
2866 }
2867 }
2868 Frame::NewToken(NewToken { token }) => {
2869 let ConnectionSide::Client {
2870 token_store,
2871 server_name,
2872 ..
2873 } = &self.side
2874 else {
2875 return Err(TransportError::PROTOCOL_VIOLATION("client sent NEW_TOKEN"));
2876 };
2877 if token.is_empty() {
2878 return Err(TransportError::FRAME_ENCODING_ERROR("empty token"));
2879 }
2880 trace!("got new token");
2881 token_store.insert(server_name, token);
2882 }
2883 Frame::Datagram(datagram) => {
2884 if self
2885 .datagrams
2886 .received(datagram, &self.config.datagram_receive_buffer_size)?
2887 {
2888 self.events.push_back(Event::DatagramReceived);
2889 }
2890 }
2891 Frame::AckFrequency(ack_frequency) => {
2892 let space = &mut self.spaces[SpaceId::Data];
2894
2895 if !self
2896 .ack_frequency
2897 .ack_frequency_received(&ack_frequency, &mut space.pending_acks)?
2898 {
2899 continue;
2901 }
2902
2903 if let Some(timeout) = space
2906 .pending_acks
2907 .max_ack_delay_timeout(self.ack_frequency.max_ack_delay)
2908 {
2909 self.timers.set(Timer::MaxAckDelay, timeout);
2910 }
2911 }
2912 Frame::ImmediateAck => {
2913 self.spaces[SpaceId::Data]
2915 .pending_acks
2916 .set_immediate_ack_required();
2917 }
2918 Frame::HandshakeDone => {
2919 if self.side.is_server() {
2920 return Err(TransportError::PROTOCOL_VIOLATION(
2921 "client sent HANDSHAKE_DONE",
2922 ));
2923 }
2924 if self.spaces[SpaceId::Handshake].crypto.is_some() {
2925 self.discard_space(now, SpaceId::Handshake);
2926 }
2927 }
2928 }
2929 }
2930
2931 let space = &mut self.spaces[SpaceId::Data];
2932 if space
2933 .pending_acks
2934 .packet_received(now, number, ack_eliciting, &space.dedup)
2935 {
2936 self.timers
2937 .set(Timer::MaxAckDelay, now + self.ack_frequency.max_ack_delay);
2938 }
2939
2940 let pending = &mut self.spaces[SpaceId::Data].pending;
2945 self.streams.queue_max_stream_id(pending);
2946
2947 if let Some(reason) = close {
2948 self.error = Some(reason.into());
2949 self.state = State::Draining;
2950 self.close = true;
2951 }
2952
2953 if remote != self.path.remote
2954 && !is_probing_packet
2955 && number == self.spaces[SpaceId::Data].rx_packet
2956 {
2957 let ConnectionSide::Server { ref server_config } = self.side else {
2958 panic!("packets from unknown remote should be dropped by clients");
2959 };
2960 debug_assert!(
2961 server_config.migration,
2962 "migration-initiating packets should have been dropped immediately"
2963 );
2964 self.migrate(now, remote);
2965 self.update_rem_cid();
2967 self.spin = false;
2968 }
2969
2970 Ok(())
2971 }
2972
2973 fn migrate(&mut self, now: Instant, remote: SocketAddr) {
2974 trace!(%remote, "migration initiated");
2975 let mut new_path = if remote.is_ipv4() && remote.ip() == self.path.remote.ip() {
2979 PathData::from_previous(remote, &self.path, now)
2980 } else {
2981 let peer_max_udp_payload_size =
2982 u16::try_from(self.peer_params.max_udp_payload_size.into_inner())
2983 .unwrap_or(u16::MAX);
2984 PathData::new(
2985 remote,
2986 self.allow_mtud,
2987 Some(peer_max_udp_payload_size),
2988 now,
2989 &self.config,
2990 )
2991 };
2992 new_path.challenge = Some(self.rng.random());
2993 new_path.challenge_pending = true;
2994 let prev_pto = self.pto(SpaceId::Data);
2995
2996 let mut prev = mem::replace(&mut self.path, new_path);
2997 if prev.challenge.is_none() {
2999 prev.challenge = Some(self.rng.random());
3000 prev.challenge_pending = true;
3001 self.prev_path = Some((self.rem_cids.active(), prev));
3004 }
3005
3006 self.timers.set(
3007 Timer::PathValidation,
3008 now + 3 * cmp::max(self.pto(SpaceId::Data), prev_pto),
3009 );
3010 }
3011
/// Reacts to a change of the local address.
///
/// Moves on to the next remote connection ID (if `update_rem_cid` finds one available) and
/// then calls `ping()`.
pub fn local_address_changed(&mut self) {
    self.update_rem_cid();
    self.ping();
}
3017
3018 fn update_rem_cid(&mut self) {
3020 let (reset_token, retired) = match self.rem_cids.next() {
3021 Some(x) => x,
3022 None => return,
3023 };
3024
3025 self.spaces[SpaceId::Data]
3027 .pending
3028 .retire_cids
3029 .extend(retired);
3030 self.set_reset_token(reset_token);
3031 }
3032
3033 fn set_reset_token(&mut self, reset_token: ResetToken) {
3034 self.endpoint_events
3035 .push_back(EndpointEventInner::ResetToken(
3036 self.path.remote,
3037 reset_token,
3038 ));
3039 self.peer_params.stateless_reset_token = Some(reset_token);
3040 }
3041
3042 fn issue_first_cids(&mut self, now: Instant) {
3044 if self.local_cid_state.cid_len() == 0 {
3045 return;
3046 }
3047
3048 let n = self.peer_params.issue_cids_limit() - 1;
3050 self.endpoint_events
3051 .push_back(EndpointEventInner::NeedIdentifiers(now, n));
3052 }
3053
/// Writes as many pending frames as fit into `buf` for a packet in `space_id`.
///
/// Frames are appended in a fixed order (see the section comments below) and each section
/// checks the remaining room against `max_size` before writing. Everything that was written
/// is summarized in the returned `SentFrames`; per-frame-type counters in
/// `self.stats.frame_tx` are updated along the way.
///
/// `pn` is only used to record which packet carried an ACK_FREQUENCY frame.
fn populate_packet(
    &mut self,
    now: Instant,
    space_id: SpaceId,
    buf: &mut Vec<u8>,
    max_size: usize,
    pn: u64,
) -> SentFrames {
    let mut sent = SentFrames::default();
    let space = &mut self.spaces[space_id];
    // A `Data` space without keys installed is treated as 0-RTT.
    let is_0rtt = space_id == SpaceId::Data && space.crypto.is_none();
    space.pending_acks.maybe_ack_non_eliciting();

    // HANDSHAKE_DONE (never in 0-RTT)
    if !is_0rtt && mem::replace(&mut space.pending.handshake_done, false) {
        buf.write(frame::FrameType::HANDSHAKE_DONE);
        sent.retransmits.get_or_create().handshake_done = true;
        // Saturating add rather than `+= 1`, unlike the other counters below.
        self.stats.frame_tx.handshake_done =
            self.stats.frame_tx.handshake_done.saturating_add(1);
    }

    // PING
    if mem::replace(&mut space.ping_pending, false) {
        trace!("PING");
        buf.write(frame::FrameType::PING);
        sent.non_retransmits = true;
        self.stats.frame_tx.ping += 1;
    }

    // IMMEDIATE_ACK
    if mem::replace(&mut space.immediate_ack_pending, false) {
        trace!("IMMEDIATE_ACK");
        buf.write(frame::FrameType::IMMEDIATE_ACK);
        sent.non_retransmits = true;
        self.stats.frame_tx.immediate_ack += 1;
    }

    // ACK
    if space.pending_acks.can_send() {
        Self::populate_acks(
            now,
            self.receiving_ecn,
            &mut sent,
            space,
            buf,
            &mut self.stats,
        );
    }

    // ACK_FREQUENCY
    if mem::replace(&mut space.pending.ack_frequency, false) {
        let sequence_number = self.ack_frequency.next_sequence_number();

        // NOTE(review): the unwrap assumes `ack_frequency_config` is always set whenever
        // `pending.ack_frequency` can be true — confirm where that flag is raised.
        let config = self.config.ack_frequency_config.as_ref().unwrap();

        let max_ack_delay = self.ack_frequency.candidate_max_ack_delay(
            self.path.rtt.get(),
            config,
            &self.peer_params,
        );

        trace!(?max_ack_delay, "ACK_FREQUENCY");

        frame::AckFrequency {
            sequence: sequence_number,
            ack_eliciting_threshold: config.ack_eliciting_threshold,
            // Microsecond value clamped to `VarInt::MAX` if it does not fit.
            request_max_ack_delay: max_ack_delay.as_micros().try_into().unwrap_or(VarInt::MAX),
            reordering_threshold: config.reordering_threshold,
        }
        .encode(buf);

        sent.retransmits.get_or_create().ack_frequency = true;

        self.ack_frequency.ack_frequency_sent(pn, max_ack_delay);
        self.stats.frame_tx.ack_frequency += 1;
    }

    // PATH_CHALLENGE (`Data` space only)
    // NOTE(review): the 9 is presumably 1 frame-type byte + an 8-byte token — confirm.
    if buf.len() + 9 < max_size && space_id == SpaceId::Data {
        // Written whenever the current path still has a challenge set, not only when
        // `challenge_pending` is true; the pending flag is cleared once it has been written.
        if let Some(token) = self.path.challenge {
            self.path.challenge_pending = false;
            sent.non_retransmits = true;
            sent.requires_padding = true;
            trace!("PATH_CHALLENGE {:08x}", token);
            buf.write(frame::FrameType::PATH_CHALLENGE);
            buf.write(token);
            self.stats.frame_tx.path_challenge += 1;
        }
    }

    // PATH_RESPONSE (`Data` space only), at most one per packet, for the current remote
    if buf.len() + 9 < max_size && space_id == SpaceId::Data {
        if let Some(token) = self.path_responses.pop_on_path(self.path.remote) {
            sent.non_retransmits = true;
            sent.requires_padding = true;
            trace!("PATH_RESPONSE {:08x}", token);
            buf.write(frame::FrameType::PATH_RESPONSE);
            buf.write(token);
            self.stats.frame_tx.path_response += 1;
        }
    }

    // CRYPTO (never in 0-RTT)
    while buf.len() + frame::Crypto::SIZE_BOUND < max_size && !is_0rtt {
        let mut frame = match space.pending.crypto.pop_front() {
            Some(x) => x,
            None => break,
        };

        // Room left for crypto payload: remaining space minus 1 byte, minus the exact
        // encoded size of the offset, minus 2 more bytes. The 2 bytes match a length field
        // for payloads capped below 2^14 (see the `.min(2usize.pow(14) - 1)` below).
        // NOTE(review): `from_u64_unchecked` relies on `frame.offset` being a valid VarInt
        // value; confirm that invariant is upheld where crypto frames are queued.
        let max_crypto_data_size = max_size
            - buf.len()
            - 1
            - VarInt::size(unsafe { VarInt::from_u64_unchecked(frame.offset) })
            - 2;
        let len = frame
            .data
            .len()
            .min(2usize.pow(14) - 1)
            .min(max_crypto_data_size);

        // Send the first `len` bytes now; anything left over goes back to the front of the
        // queue with its offset advanced.
        let data = frame.data.split_to(len);
        let truncated = frame::Crypto {
            offset: frame.offset,
            data,
        };
        trace!(
            "CRYPTO: off {} len {}",
            truncated.offset,
            truncated.data.len()
        );
        truncated.encode(buf);
        self.stats.frame_tx.crypto += 1;
        sent.retransmits.get_or_create().crypto.push_back(truncated);
        if !frame.data.is_empty() {
            frame.offset += len as u64;
            space.pending.crypto.push_front(frame);
        }
    }

    // Stream-related control frames are delegated to the streams state (`Data` space only).
    if space_id == SpaceId::Data {
        self.streams.write_control_frames(
            buf,
            &mut space.pending,
            &mut sent.retransmits,
            &mut self.stats.frame_tx,
            max_size,
        );
    }

    // NEW_CONNECTION_ID
    // NOTE(review): 44 is presumably an upper bound on the encoded frame size — confirm.
    while buf.len() + 44 < max_size {
        let issued = match space.pending.new_cids.pop() {
            Some(x) => x,
            None => break,
        };
        trace!(
            sequence = issued.sequence,
            id = %issued.id,
            "NEW_CONNECTION_ID"
        );
        frame::NewConnectionId {
            sequence: issued.sequence,
            retire_prior_to: self.local_cid_state.retire_prior_to(),
            id: issued.id,
            reset_token: issued.reset_token,
        }
        .encode(buf);
        sent.retransmits.get_or_create().new_cids.push(issued);
        self.stats.frame_tx.new_connection_id += 1;
    }

    // RETIRE_CONNECTION_ID
    while buf.len() + frame::RETIRE_CONNECTION_ID_SIZE_BOUND < max_size {
        let seq = match space.pending.retire_cids.pop() {
            Some(x) => x,
            None => break,
        };
        trace!(sequence = seq, "RETIRE_CONNECTION_ID");
        buf.write(frame::FrameType::RETIRE_CONNECTION_ID);
        buf.write_var(seq);
        sent.retransmits.get_or_create().retire_cids.push(seq);
        self.stats.frame_tx.retire_connection_id += 1;
    }

    // DATAGRAM (`Data` space only)
    let mut sent_datagrams = false;
    while buf.len() + Datagram::SIZE_BOUND < max_size && space_id == SpaceId::Data {
        match self.datagrams.write(buf, max_size) {
            true => {
                sent_datagrams = true;
                sent.non_retransmits = true;
                self.stats.frame_tx.datagram += 1;
            }
            false => break,
        }
    }
    // Having written at least one datagram clears a previously recorded blocked state.
    if self.datagrams.send_blocked && sent_datagrams {
        self.events.push_back(Event::DatagramsUnblocked);
        self.datagrams.send_blocked = false;
    }

    // NEW_TOKEN (only ever queued by servers, in the `Data` space)
    while let Some(remote_addr) = space.pending.new_tokens.pop() {
        debug_assert_eq!(space_id, SpaceId::Data);
        let ConnectionSide::Server { server_config } = &self.side else {
            panic!("NEW_TOKEN frames should not be enqueued by clients");
        };

        // Entries queued for an address other than the current remote are dropped.
        if remote_addr != self.path.remote {
            continue;
        }

        let token = Token::new(
            TokenPayload::Validation {
                ip: remote_addr.ip(),
                issued: server_config.time_source.now(),
            },
            &mut self.rng,
        );
        let new_token = NewToken {
            token: token.encode(&*server_config.token_key).into(),
        };

        // Does not fit: put the entry back for a later packet and stop.
        if buf.len() + new_token.size() >= max_size {
            space.pending.new_tokens.push(remote_addr);
            break;
        }

        new_token.encode(buf);
        sent.retransmits
            .get_or_create()
            .new_tokens
            .push(remote_addr);
        self.stats.frame_tx.new_token += 1;
    }

    // STREAM (`Data` space only); written last, into whatever room is left.
    if space_id == SpaceId::Data {
        sent.stream_frames =
            self.streams
                .write_stream_frames(buf, max_size, self.config.send_fairness);
        self.stats.frame_tx.stream += sent.stream_frames.len() as u64;
    }

    sent
}
3314
3315 fn populate_acks(
3320 now: Instant,
3321 receiving_ecn: bool,
3322 sent: &mut SentFrames,
3323 space: &mut PacketSpace,
3324 buf: &mut Vec<u8>,
3325 stats: &mut ConnectionStats,
3326 ) {
3327 debug_assert!(!space.pending_acks.ranges().is_empty());
3328
3329 debug_assert!(space.crypto.is_some(), "tried to send ACK in 0-RTT");
3331 let ecn = if receiving_ecn {
3332 Some(&space.ecn_counters)
3333 } else {
3334 None
3335 };
3336 sent.largest_acked = space.pending_acks.ranges().max();
3337
3338 let delay_micros = space.pending_acks.ack_delay(now).as_micros() as u64;
3339
3340 let ack_delay_exp = TransportParameters::default().ack_delay_exponent;
3342 let delay = delay_micros >> ack_delay_exp.into_inner();
3343
3344 trace!(
3345 "ACK {:?}, Delay = {}us",
3346 space.pending_acks.ranges(),
3347 delay_micros
3348 );
3349
3350 frame::Ack::encode(delay as _, space.pending_acks.ranges(), ecn, buf);
3351 stats.frame_tx.acks += 1;
3352 }
3353
3354 fn close_common(&mut self) {
3355 trace!("connection closed");
3356 for &timer in &Timer::VALUES {
3357 self.timers.stop(timer);
3358 }
3359 }
3360
3361 fn set_close_timer(&mut self, now: Instant) {
3362 self.timers
3363 .set(Timer::Close, now + 3 * self.pto(self.highest_space));
3364 }
3365
3366 fn handle_peer_params(&mut self, params: TransportParameters) -> Result<(), TransportError> {
3368 if Some(self.orig_rem_cid) != params.initial_src_cid
3369 || (self.side.is_client()
3370 && (Some(self.initial_dst_cid) != params.original_dst_cid
3371 || self.retry_src_cid != params.retry_src_cid))
3372 {
3373 return Err(TransportError::TRANSPORT_PARAMETER_ERROR(
3374 "CID authentication failure",
3375 ));
3376 }
3377
3378 self.set_peer_params(params);
3379
3380 Ok(())
3381 }
3382
3383 fn set_peer_params(&mut self, params: TransportParameters) {
3384 self.streams.set_params(¶ms);
3385 self.idle_timeout =
3386 negotiate_max_idle_timeout(self.config.max_idle_timeout, Some(params.max_idle_timeout));
3387 trace!("negotiated max idle timeout {:?}", self.idle_timeout);
3388 if let Some(ref info) = params.preferred_address {
3389 self.rem_cids.insert(frame::NewConnectionId {
3390 sequence: 1,
3391 id: info.connection_id,
3392 reset_token: info.stateless_reset_token,
3393 retire_prior_to: 0,
3394 }).expect("preferred address CID is the first received, and hence is guaranteed to be legal");
3395 }
3396 self.ack_frequency.peer_max_ack_delay = get_max_ack_delay(¶ms);
3397 self.peer_params = params;
3398 self.path.mtud.on_peer_max_udp_payload_size_received(
3399 u16::try_from(self.peer_params.max_udp_payload_size.into_inner()).unwrap_or(u16::MAX),
3400 );
3401 }
3402
3403 fn decrypt_packet(
3404 &mut self,
3405 now: Instant,
3406 packet: &mut Packet,
3407 ) -> Result<Option<u64>, Option<TransportError>> {
3408 let result = packet_crypto::decrypt_packet_body(
3409 packet,
3410 &self.spaces,
3411 self.zero_rtt_crypto.as_ref(),
3412 self.key_phase,
3413 self.prev_crypto.as_ref(),
3414 self.next_crypto.as_ref(),
3415 )?;
3416
3417 let result = match result {
3418 Some(r) => r,
3419 None => return Ok(None),
3420 };
3421
3422 if result.outgoing_key_update_acked {
3423 if let Some(prev) = self.prev_crypto.as_mut() {
3424 prev.end_packet = Some((result.number, now));
3425 self.set_key_discard_timer(now, packet.header.space());
3426 }
3427 }
3428
3429 if result.incoming_key_update {
3430 trace!("key update authenticated");
3431 self.update_keys(Some((result.number, now)), true);
3432 self.set_key_discard_timer(now, packet.header.space());
3433 }
3434
3435 Ok(Some(result.number))
3436 }
3437
3438 fn update_keys(&mut self, end_packet: Option<(u64, Instant)>, remote: bool) {
3439 trace!("executing key update");
3440 let new = self
3444 .crypto
3445 .next_1rtt_keys()
3446 .expect("only called for `Data` packets");
3447 self.key_phase_size = new
3448 .local
3449 .confidentiality_limit()
3450 .saturating_sub(KEY_UPDATE_MARGIN);
3451 let old = mem::replace(
3452 &mut self.spaces[SpaceId::Data]
3453 .crypto
3454 .as_mut()
3455 .unwrap() .packet,
3457 mem::replace(self.next_crypto.as_mut().unwrap(), new),
3458 );
3459 self.spaces[SpaceId::Data].sent_with_keys = 0;
3460 self.prev_crypto = Some(PrevCrypto {
3461 crypto: old,
3462 end_packet,
3463 update_unacked: remote,
3464 });
3465 self.key_phase = !self.key_phase;
3466 }
3467
/// Whether the peer's transport parameters include a `min_ack_delay` value.
fn peer_supports_ack_frequency(&self) -> bool {
    self.peer_params.min_ack_delay.is_some()
}
3471
/// Requests that an IMMEDIATE_ACK frame be sent.
///
/// Only sets `immediate_ack_pending` on the highest packet space; the frame itself is
/// written the next time a packet is populated for that space.
pub(crate) fn immediate_ack(&mut self) {
    self.spaces[self.highest_space].immediate_ack_pending = true;
}
3479
/// Test helper: decrypts the single packet carried by a datagram `event` and returns its
/// payload bytes.
///
/// Returns `None` for non-datagram events and whenever header unprotection or body
/// decryption does not yield a packet.
///
/// # Panics
///
/// Panics if the datagram contains coalesced packets.
#[cfg(test)]
pub(crate) fn decode_packet(&self, event: &ConnectionEvent) -> Option<Vec<u8>> {
    let ConnectionEventInner::Datagram(DatagramConnectionEvent {
        first_decode,
        remaining,
        ..
    }) = &event.0
    else {
        return None;
    };

    assert!(
        remaining.is_none(),
        "Packets should never be coalesced in tests"
    );

    let unprotected = packet_crypto::unprotect_header(
        first_decode.clone(),
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.peer_params.stateless_reset_token,
    )?;
    let mut packet = unprotected.packet?;

    let body_result = packet_crypto::decrypt_packet_body(
        &mut packet,
        &self.spaces,
        self.zero_rtt_crypto.as_ref(),
        self.key_phase,
        self.prev_crypto.as_ref(),
        self.next_crypto.as_ref(),
    );
    body_result.ok()?;

    Some(packet.payload.to_vec())
}
3516
/// Test helper: the byte count currently tracked as in flight on the active path.
#[cfg(test)]
pub(crate) fn bytes_in_flight(&self) -> u64 {
    self.path.in_flight.bytes
}
3523
/// Test helper: the congestion window of the active path minus the bytes already in
/// flight, saturating at zero.
#[cfg(test)]
pub(crate) fn congestion_window(&self) -> u64 {
    self.path
        .congestion
        .window()
        .saturating_sub(self.path.in_flight.bytes)
}
3532
/// Test helper: whether nothing but the idle timeout is left to wait for.
///
/// Ignoring the `KeepAlive`, `PushNewCid` and `KeyDiscard` timers, this is true when no
/// timer is armed at all or when the earliest armed timer is `Timer::Idle`.
#[cfg(test)]
pub(crate) fn is_idle(&self) -> bool {
    let earliest = Timer::VALUES
        .iter()
        .copied()
        .filter(|t| !matches!(*t, Timer::KeepAlive | Timer::PushNewCid | Timer::KeyDiscard))
        .filter_map(|t| self.timers.get(t).map(|deadline| (t, deadline)))
        .min_by_key(|&(_, deadline)| deadline);

    match earliest {
        None => true,
        Some((timer, _)) => timer == Timer::Idle,
    }
}
3543
/// Test helper: the connection's `lost_packets` counter.
#[cfg(test)]
pub(crate) fn lost_packets(&self) -> u64 {
    self.lost_packets
}
3549
/// Test helper: the active path's `sending_ecn` flag.
#[cfg(test)]
pub(crate) fn using_ecn(&self) -> bool {
    self.path.sending_ecn
}
3555
/// Test helper: the active path's `total_recvd` counter.
#[cfg(test)]
pub(crate) fn total_recvd(&self) -> u64 {
    self.path.total_recvd
}
3561
/// Test helper: forwards to `local_cid_state.active_seq()`.
// NOTE(review): presumably the (lowest, highest) active local CID sequence numbers — confirm
// against `CidState::active_seq`.
#[cfg(test)]
pub(crate) fn active_local_cid_seq(&self) -> (u64, u64) {
    self.local_cid_state.active_seq()
}
3566
/// Test helper: passes `v` to `local_cid_state.assign_retire_seq` and asks the endpoint for
/// as many new identifiers as that call returns.
#[cfg(test)]
pub(crate) fn rotate_local_cid(&mut self, v: u64, now: Instant) {
    let n = self.local_cid_state.assign_retire_seq(v);
    self.endpoint_events
        .push_back(EndpointEventInner::NeedIdentifiers(now, n));
}
3575
/// Test helper: sequence number of the remote CID currently in use.
#[cfg(test)]
pub(crate) fn active_rem_cid_seq(&self) -> u64 {
    self.rem_cids.active_seq()
}
3581
/// Test helper: the active path's current MTU.
#[cfg(test)]
pub(crate) fn path_mtu(&self) -> u16 {
    self.path.current_mtu()
}
3587
3588 fn can_send_1rtt(&self, max_size: usize) -> bool {
3592 self.streams.can_send_stream_data()
3593 || self.path.challenge_pending
3594 || self
3595 .prev_path
3596 .as_ref()
3597 .is_some_and(|(_, x)| x.challenge_pending)
3598 || !self.path_responses.is_empty()
3599 || self
3600 .datagrams
3601 .outgoing
3602 .front()
3603 .is_some_and(|x| x.size(true) <= max_size)
3604 }
3605
3606 fn remove_in_flight(&mut self, pn: u64, packet: &SentPacket) {
3608 for path in [&mut self.path]
3610 .into_iter()
3611 .chain(self.prev_path.as_mut().map(|(_, data)| data))
3612 {
3613 if path.remove_in_flight(pn, packet) {
3614 return;
3615 }
3616 }
3617 }
3618
/// Terminates the connection immediately with `reason`.
///
/// Stops all timers via `close_common`, records the error, moves straight to
/// `State::Drained` and notifies the endpoint with a `Drained` event.
fn kill(&mut self, reason: ConnectionError) {
    self.close_common();
    self.error = Some(reason);
    self.state = State::Drained;
    self.endpoint_events.push_back(EndpointEventInner::Drained);
}
3626
/// Returns the current MTU of the active path, as reported by `path.current_mtu()`.
pub fn current_mtu(&self) -> u16 {
    self.path.current_mtu()
}
3633
3634 fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
3641 let pn_len = match pn {
3642 Some(pn) => PacketNumber::new(
3643 pn,
3644 self.spaces[SpaceId::Data].largest_acked_packet.unwrap_or(0),
3645 )
3646 .len(),
3647 None => 4,
3649 };
3650
3651 1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
3653 }
3654
3655 fn tag_len_1rtt(&self) -> usize {
3656 let key = match self.spaces[SpaceId::Data].crypto.as_ref() {
3657 Some(crypto) => Some(&*crypto.packet.local),
3658 None => self.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
3659 };
3660 key.map_or(16, |x| x.tag_len())
3664 }
3665
3666 fn on_path_validated(&mut self) {
3668 self.path.validated = true;
3669 let ConnectionSide::Server { server_config } = &self.side else {
3670 return;
3671 };
3672 let new_tokens = &mut self.spaces[SpaceId::Data as usize].pending.new_tokens;
3673 new_tokens.clear();
3674 for _ in 0..server_config.validation_token.sent {
3675 new_tokens.push(self.path.remote);
3676 }
3677 }
3678}
3679
3680impl fmt::Debug for Connection {
3681 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3682 f.debug_struct("Connection")
3683 .field("handshake_cid", &self.handshake_cid)
3684 .finish()
3685 }
3686}
3687
/// Side-specific state of a connection: what a client or a server needs beyond the shared
/// connection state.
enum ConnectionSide {
    /// State held only by clients.
    Client {
        /// Token taken from `token_store` for `server_name` when the connection side is
        /// built (empty if the store had none).
        token: Bytes,
        /// Store that received NEW_TOKEN tokens are inserted into.
        token_store: Arc<dyn TokenStore>,
        /// Name used as the key into `token_store`.
        server_name: String,
    },
    /// State held only by servers.
    Server {
        /// Configuration of the server this connection belongs to.
        server_config: Arc<ServerConfig>,
    },
}
3700
3701impl ConnectionSide {
3702 fn remote_may_migrate(&self) -> bool {
3703 match self {
3704 Self::Server { server_config } => server_config.migration,
3705 Self::Client { .. } => false,
3706 }
3707 }
3708
3709 fn is_client(&self) -> bool {
3710 self.side().is_client()
3711 }
3712
3713 fn is_server(&self) -> bool {
3714 self.side().is_server()
3715 }
3716
3717 fn side(&self) -> Side {
3718 match *self {
3719 Self::Client { .. } => Side::Client,
3720 Self::Server { .. } => Side::Server,
3721 }
3722 }
3723}
3724
3725impl From<SideArgs> for ConnectionSide {
3726 fn from(side: SideArgs) -> Self {
3727 match side {
3728 SideArgs::Client {
3729 token_store,
3730 server_name,
3731 } => Self::Client {
3732 token: token_store.take(&server_name).unwrap_or_default(),
3733 token_store,
3734 server_name,
3735 },
3736 SideArgs::Server {
3737 server_config,
3738 pref_addr_cid: _,
3739 path_validated: _,
3740 } => Self::Server { server_config },
3741 }
3742 }
3743}
3744
/// Side-specific arguments from which a `ConnectionSide` is built.
pub(crate) enum SideArgs {
    /// Arguments for a client-side connection.
    Client {
        /// Store to take an initial token from and to insert later tokens into.
        token_store: Arc<dyn TokenStore>,
        /// Name of the server being connected to; used as the token store key.
        server_name: String,
    },
    /// Arguments for a server-side connection.
    Server {
        /// Configuration of the accepting server.
        server_config: Arc<ServerConfig>,
        /// Connection ID associated with a preferred address, if any.
        pref_addr_cid: Option<ConnectionId>,
        /// Whether the path is already considered validated at construction time.
        path_validated: bool,
    },
}
3757
3758impl SideArgs {
3759 pub(crate) fn pref_addr_cid(&self) -> Option<ConnectionId> {
3760 match *self {
3761 Self::Client { .. } => None,
3762 Self::Server { pref_addr_cid, .. } => pref_addr_cid,
3763 }
3764 }
3765
3766 pub(crate) fn path_validated(&self) -> bool {
3767 match *self {
3768 Self::Client { .. } => true,
3769 Self::Server { path_validated, .. } => path_validated,
3770 }
3771 }
3772
3773 pub(crate) fn side(&self) -> Side {
3774 match *self {
3775 Self::Client { .. } => Side::Client,
3776 Self::Server { .. } => Side::Server,
3777 }
3778 }
3779}
3780
/// Reasons a connection can end, as reported to the application.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectionError {
    /// The peer doesn't implement any supported version.
    #[error("peer doesn't implement any supported version")]
    VersionMismatch,
    /// A transport-level error; displayed exactly as the wrapped `TransportError`.
    #[error(transparent)]
    TransportError(#[from] TransportError),
    /// The peer closed the connection with a connection-level close frame.
    #[error("aborted by peer: {0}")]
    ConnectionClosed(frame::ConnectionClose),
    /// The peer closed the connection with an application-level close frame.
    #[error("closed by peer: {0}")]
    ApplicationClosed(frame::ApplicationClose),
    /// The connection was reset by the peer.
    #[error("reset by peer")]
    Reset,
    /// The connection timed out.
    #[error("timed out")]
    TimedOut,
    /// The connection was closed.
    // NOTE(review): presumably "closed by the local application", per the variant name.
    #[error("closed")]
    LocallyClosed,
    /// Connection IDs were exhausted.
    #[error("CIDs exhausted")]
    CidsExhausted,
}
3815
/// Maps a received close frame to the matching `ConnectionError` variant.
impl From<Close> for ConnectionError {
    fn from(x: Close) -> Self {
        match x {
            Close::Connection(reason) => Self::ConnectionClosed(reason),
            Close::Application(reason) => Self::ApplicationClosed(reason),
        }
    }
}
3824
3825impl From<ConnectionError> for io::Error {
3827 fn from(x: ConnectionError) -> Self {
3828 use ConnectionError::*;
3829 let kind = match x {
3830 TimedOut => io::ErrorKind::TimedOut,
3831 Reset => io::ErrorKind::ConnectionReset,
3832 ApplicationClosed(_) | ConnectionClosed(_) => io::ErrorKind::ConnectionAborted,
3833 TransportError(_) | VersionMismatch | LocallyClosed | CidsExhausted => {
3834 io::ErrorKind::Other
3835 }
3836 };
3837 Self::new(kind, x)
3838 }
3839}
3840
/// Lifecycle state of a connection.
// NOTE(review): `unreachable_pub` is allowed here, presumably because the type is `pub` for
// a reason other than being part of the public API (e.g. fuzzing) — confirm.
#[allow(unreachable_pub)]
#[derive(Clone)]
pub enum State {
    /// The handshake is still in progress.
    Handshake(state::Handshake),
    /// The handshake has completed.
    Established,
    /// The connection is closed for the recorded reason (see `state::Closed`).
    Closed(state::Closed),
    /// The peer sent a close frame; entered while processing it.
    Draining,
    /// Terminal state; `kill` moves the connection here directly.
    Drained,
}
3851
3852impl State {
3853 fn closed<R: Into<Close>>(reason: R) -> Self {
3854 Self::Closed(state::Closed {
3855 reason: reason.into(),
3856 })
3857 }
3858
3859 fn is_handshake(&self) -> bool {
3860 matches!(*self, Self::Handshake(_))
3861 }
3862
3863 fn is_established(&self) -> bool {
3864 matches!(*self, Self::Established)
3865 }
3866
3867 fn is_closed(&self) -> bool {
3868 matches!(*self, Self::Closed(_) | Self::Draining | Self::Drained)
3869 }
3870
3871 fn is_drained(&self) -> bool {
3872 matches!(*self, Self::Drained)
3873 }
3874}
3875
/// Payload types for the data-carrying variants of `State`.
mod state {
    use super::*;

    /// Data tracked while the handshake is in progress.
    #[allow(unreachable_pub)]
    #[derive(Clone)]
    pub struct Handshake {
        // NOTE(review): presumably records whether the remote CID has been set yet —
        // confirm against the handshake handling code.
        pub(super) rem_cid_set: bool,
        // NOTE(review): presumably the token expected from the peer during the
        // handshake — confirm where it is compared.
        pub(super) expected_token: Bytes,
        // NOTE(review): presumably the first handshake message, when retained — confirm.
        pub(super) client_hello: Option<Bytes>,
    }

    /// Data tracked once the connection has been closed.
    #[allow(unreachable_pub)]
    #[derive(Clone)]
    pub struct Closed {
        /// The close reason; `State::closed` converts its argument into this.
        pub(super) reason: Close,
    }
}
3902
/// Events of interest to the application.
#[derive(Debug)]
pub enum Event {
    // NOTE(review): presumably signals that handshake data has become available — confirm
    // where it is emitted.
    HandshakeDataReady,
    // NOTE(review): presumably signals that the connection was established — confirm where
    // it is emitted.
    Connected,
    /// The connection was lost.
    ConnectionLost {
        /// Why the connection was lost.
        reason: ConnectionError,
    },
    /// A stream-related event.
    Stream(StreamEvent),
    /// Emitted when the datagram state reports that a received DATAGRAM frame should be
    /// announced to the application.
    DatagramReceived,
    /// Emitted after at least one datagram was written while sending was recorded as blocked.
    DatagramsUnblocked,
}
3924
/// Time elapsed from `y` to `x`, or zero if `y` is not earlier than `x`.
fn instant_saturating_sub(x: Instant, y: Instant) -> Duration {
    x.saturating_duration_since(y)
}
3928
3929fn get_max_ack_delay(params: &TransportParameters) -> Duration {
3930 Duration::from_micros(params.max_ack_delay.0 * 1000)
3931}
3932
// NOTE(review): presumably the cap on the exponent used for exponential timer backoff —
// confirm at the use site.
const MAX_BACKOFF_EXPONENT: u32 = 16;

// Header-size bound below plus 32 bytes.
// NOTE(review): presumably the minimum remaining room required before another packet is
// started in the same datagram — confirm at the use site.
const MIN_PACKET_SPACE: usize = MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE + 32;

// Sum of fixed and maximum-size components.
// NOTE(review): the terms look like the long-header layout (first byte, 4-byte version,
// two length-prefixed CIDs of up to `MAX_CID_SIZE`, a length varint sized for `u16::MAX`,
// and a 4-byte packet number) — confirm against RFC 9000 §17.2.
const MAX_HANDSHAKE_OR_0RTT_HEADER_SIZE: usize =
    1 + 4 + 1 + MAX_CID_SIZE + 1 + MAX_CID_SIZE + VarInt::from_u32(u16::MAX as u32).size() + 4;

/// Margin subtracted from the local key's confidentiality limit when sizing a key phase.
const KEY_UPDATE_MARGIN: u64 = 10_000;
3957
/// Summary of the frames written into a single packet.
#[derive(Default)]
struct SentFrames {
    /// Frames that were written and are recorded so they can be sent again.
    retransmits: ThinRetransmits,
    /// Largest packet number covered by the ACK frame written, if one was written.
    largest_acked: Option<u64>,
    /// Metadata for the STREAM frames written.
    stream_frames: StreamMetaVec,
    /// Whether any frame was written that is not tracked in `retransmits` or
    /// `stream_frames` (PING, IMMEDIATE_ACK, PATH_CHALLENGE, PATH_RESPONSE, DATAGRAM).
    non_retransmits: bool,
    /// Set when a PATH_CHALLENGE or PATH_RESPONSE frame was written.
    requires_padding: bool,
}
3967
3968impl SentFrames {
3969 fn is_ack_only(&self, streams: &StreamsState) -> bool {
3971 self.largest_acked.is_some()
3972 && !self.non_retransmits
3973 && self.stream_frames.is_empty()
3974 && self.retransmits.is_empty(streams)
3975 }
3976}
3977
3978fn negotiate_max_idle_timeout(x: Option<VarInt>, y: Option<VarInt>) -> Option<Duration> {
3986 match (x, y) {
3987 (Some(VarInt(0)) | None, Some(VarInt(0)) | None) => None,
3988 (Some(VarInt(0)) | None, Some(y)) => Some(Duration::from_millis(y.0)),
3989 (Some(x), Some(VarInt(0)) | None) => Some(Duration::from_millis(x.0)),
3990 (Some(x), Some(y)) => Some(Duration::from_millis(cmp::min(x, y).0)),
3991 }
3992}
3993
#[cfg(test)]
mod tests {
    use super::*;

    /// `negotiate_max_idle_timeout` must give the same answer regardless of argument order.
    #[test]
    fn negotiate_max_idle_timeout_commutative() {
        let v = |n: u64| Some(VarInt(n));
        let ms = |n: u64| Some(Duration::from_millis(n));

        // (one side, other side, expected outcome)
        let cases: [(Option<VarInt>, Option<VarInt>, Option<Duration>); 6] = [
            (None, None, None),
            (None, v(0), None),
            (None, v(2), ms(2)),
            (v(0), v(0), None),
            (v(2), v(0), ms(2)),
            (v(1), v(4), ms(1)),
        ];

        for (a, b, expected) in cases {
            assert_eq!(negotiate_max_idle_timeout(a, b), expected);
            assert_eq!(negotiate_max_idle_timeout(b, a), expected);
        }
    }
}