1use std::{
2 collections::{HashMap, hash_map},
3 convert::TryFrom,
4 fmt, mem,
5 net::{IpAddr, SocketAddr},
6 ops::{Index, IndexMut},
7 sync::Arc,
8};
9
10use bytes::{BufMut, Bytes, BytesMut};
11use rand::{Rng, RngCore, SeedableRng, rngs::StdRng};
12use rustc_hash::FxHashMap;
13use slab::Slab;
14use thiserror::Error;
15use tracing::{debug, error, trace, warn};
16
17use crate::{
18 Duration, INITIAL_MTU, Instant, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, ResetToken,
19 Side, Transmit, TransportConfig, TransportError,
20 cid_generator::ConnectionIdGenerator,
21 coding::BufMutExt,
22 config::{ClientConfig, EndpointConfig, ServerConfig},
23 connection::{Connection, ConnectionError, SideArgs},
24 crypto::{self, Keys, UnsupportedVersion},
25 frame,
26 packet::{
27 FixedLengthConnectionIdParser, Header, InitialHeader, InitialPacket, PacketDecodeError,
28 PacketNumber, PartialDecode, ProtectedInitialHeader,
29 },
30 shared::{
31 ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
32 EndpointEvent, EndpointEventInner, IssuedCid,
33 },
34 token::{IncomingToken, InvalidRetryTokenError, Token, TokenPayload},
35 transport_parameters::{PreferredAddress, TransportParameters},
36};
37
/// Endpoint-wide state: routes incoming datagrams to connections and tracks the bookkeeping
/// (connection IDs, reset tokens, buffered early datagrams) needed to do so.
pub struct Endpoint {
    /// Random source used for version-negotiation bytes, stateless-reset padding, retry tokens,
    /// and for seeding each new connection.
    rng: StdRng,
    /// Lookup tables mapping connection IDs, addresses and reset tokens to connections.
    index: ConnectionIndex,
    /// Per-connection metadata, keyed by `ConnectionHandle.0`.
    connections: Slab<ConnectionMeta>,
    /// Generator for the connection IDs this endpoint hands out.
    local_cid_generator: Box<dyn ConnectionIdGenerator>,
    /// Endpoint-level configuration (supported versions, reset key, ...).
    config: Arc<EndpointConfig>,
    /// Server configuration; when `None`, unknown initial packets are never turned into
    /// `Incoming` connection attempts.
    server_config: Option<Arc<ServerConfig>>,
    /// Forwarded unchanged to every `Connection::new` call.
    allow_mtud: bool,
    /// Time the most recent stateless reset was sent, used to rate-limit resets.
    last_stateless_reset: Option<Instant>,
    /// Datagrams buffered for `Incoming` connection attempts that have not been decided yet.
    incoming_buffers: Slab<IncomingBuffer>,
    /// Sum of `total_bytes` over every entry of `incoming_buffers`.
    all_incoming_buffers_total_bytes: u64,
}
57
58impl Endpoint {
59 pub fn new(
70 config: Arc<EndpointConfig>,
71 server_config: Option<Arc<ServerConfig>>,
72 allow_mtud: bool,
73 rng_seed: Option<[u8; 32]>,
74 ) -> Self {
75 let rng_seed = rng_seed.or(config.rng_seed);
76 Self {
77 rng: rng_seed.map_or(StdRng::from_os_rng(), StdRng::from_seed),
78 index: ConnectionIndex::default(),
79 connections: Slab::new(),
80 local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
81 config,
82 server_config,
83 allow_mtud,
84 last_stateless_reset: None,
85 incoming_buffers: Slab::new(),
86 all_incoming_buffers_total_bytes: 0,
87 }
88 }
89
    /// Replace the server configuration used for subsequently handled packets.
    ///
    /// Passing `None` removes the server configuration entirely.
    pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
        self.server_config = server_config;
    }
94
95 pub fn handle_event(
99 &mut self,
100 ch: ConnectionHandle,
101 event: EndpointEvent,
102 ) -> Option<ConnectionEvent> {
103 use EndpointEventInner::*;
104 match event.0 {
105 NeedIdentifiers(now, n) => {
106 return Some(self.send_new_identifiers(now, ch, n));
107 }
108 ResetToken(remote, token) => {
109 if let Some(old) = self.connections[ch].reset_token.replace((remote, token)) {
110 self.index.connection_reset_tokens.remove(old.0, old.1);
111 }
112 if self.index.connection_reset_tokens.insert(remote, token, ch) {
113 warn!("duplicate reset token");
114 }
115 }
116 RetireConnectionId(now, seq, allow_more_cids) => {
117 if let Some(cid) = self.connections[ch].loc_cids.remove(&seq) {
118 trace!("peer retired CID {}: {}", seq, cid);
119 self.index.retire(cid);
120 if allow_more_cids {
121 return Some(self.send_new_identifiers(now, ch, 1));
122 }
123 }
124 }
125 Drained => {
126 if let Some(conn) = self.connections.try_remove(ch.0) {
127 self.index.remove(&conn);
128 } else {
129 error!(id = ch.0, "unknown connection drained");
133 }
134 }
135 }
136 None
137 }
138
139 pub fn handle(
141 &mut self,
142 now: Instant,
143 remote: SocketAddr,
144 local_ip: Option<IpAddr>,
145 ecn: Option<EcnCodepoint>,
146 data: BytesMut,
147 buf: &mut Vec<u8>,
148 ) -> Option<DatagramEvent> {
149 let datagram_len = data.len();
151 let event = match PartialDecode::new(
152 data,
153 &FixedLengthConnectionIdParser::new(self.local_cid_generator.cid_len()),
154 &self.config.supported_versions,
155 self.config.grease_quic_bit,
156 ) {
157 Ok((first_decode, remaining)) => DatagramConnectionEvent {
158 now,
159 remote,
160 ecn,
161 first_decode,
162 remaining,
163 },
164 Err(PacketDecodeError::UnsupportedVersion {
165 src_cid,
166 dst_cid,
167 version,
168 }) => {
169 if self.server_config.is_none() {
170 debug!("dropping packet with unsupported version");
171 return None;
172 }
173 trace!("sending version negotiation");
174 Header::VersionNegotiate {
176 random: self.rng.random::<u8>() | 0x40,
177 src_cid: dst_cid,
178 dst_cid: src_cid,
179 }
180 .encode(buf);
181 buf.write::<u32>(match version {
183 0x0a1a_2a3a => 0x0a1a_2a4a,
184 _ => 0x0a1a_2a3a,
185 });
186 for &version in &self.config.supported_versions {
187 buf.write(version);
188 }
189 return Some(DatagramEvent::Response(Transmit {
190 destination: remote,
191 ecn: None,
192 size: buf.len(),
193 segment_size: None,
194 src_ip: local_ip,
195 }));
196 }
197 Err(e) => {
198 trace!("malformed header: {}", e);
199 return None;
200 }
201 };
202
203 let addresses = FourTuple { remote, local_ip };
204 let dst_cid = event.first_decode.dst_cid();
205
206 if let Some(route_to) = self.index.get(&addresses, &event.first_decode) {
207 match route_to {
209 RouteDatagramTo::Incoming(incoming_idx) => {
210 let incoming_buffer = &mut self.incoming_buffers[incoming_idx];
211 let config = &self.server_config.as_ref().unwrap();
212
213 if incoming_buffer
214 .total_bytes
215 .checked_add(datagram_len as u64)
216 .is_some_and(|n| n <= config.incoming_buffer_size)
217 && self
218 .all_incoming_buffers_total_bytes
219 .checked_add(datagram_len as u64)
220 .is_some_and(|n| n <= config.incoming_buffer_size_total)
221 {
222 incoming_buffer.datagrams.push(event);
223 incoming_buffer.total_bytes += datagram_len as u64;
224 self.all_incoming_buffers_total_bytes += datagram_len as u64;
225 }
226
227 None
228 }
229 RouteDatagramTo::Connection(ch) => Some(DatagramEvent::ConnectionEvent(
230 ch,
231 ConnectionEvent(ConnectionEventInner::Datagram(event)),
232 )),
233 }
234 } else if event.first_decode.initial_header().is_some() {
235 self.handle_first_packet(datagram_len, event, addresses, buf)
238 } else if event.first_decode.has_long_header() {
239 debug!(
240 "ignoring non-initial packet for unknown connection {}",
241 dst_cid
242 );
243 None
244 } else if !event.first_decode.is_initial()
245 && self.local_cid_generator.validate(dst_cid).is_err()
246 {
247 debug!("dropping packet with invalid CID");
251 None
252 } else if dst_cid.is_empty() {
253 trace!("dropping unrecognized short packet without ID");
254 None
255 } else {
256 self.stateless_reset(now, datagram_len, addresses, *dst_cid, buf)
257 .map(DatagramEvent::Response)
258 }
259 }
260
261 fn stateless_reset(
262 &mut self,
263 now: Instant,
264 inciting_dgram_len: usize,
265 addresses: FourTuple,
266 dst_cid: ConnectionId,
267 buf: &mut Vec<u8>,
268 ) -> Option<Transmit> {
269 if self
270 .last_stateless_reset
271 .is_some_and(|last| last + self.config.min_reset_interval > now)
272 {
273 debug!("ignoring unexpected packet within minimum stateless reset interval");
274 return None;
275 }
276
277 const MIN_PADDING_LEN: usize = 5;
279
280 let max_padding_len = match inciting_dgram_len.checked_sub(RESET_TOKEN_SIZE) {
283 Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
284 _ => {
285 debug!(
286 "ignoring unexpected {} byte packet: not larger than minimum stateless reset size",
287 inciting_dgram_len
288 );
289 return None;
290 }
291 };
292
293 debug!(
294 "sending stateless reset for {} to {}",
295 dst_cid, addresses.remote
296 );
297 self.last_stateless_reset = Some(now);
298 const IDEAL_MIN_PADDING_LEN: usize = MIN_PADDING_LEN + MAX_CID_SIZE;
300 let padding_len = if max_padding_len <= IDEAL_MIN_PADDING_LEN {
301 max_padding_len
302 } else {
303 self.rng
304 .random_range(IDEAL_MIN_PADDING_LEN..max_padding_len)
305 };
306 buf.reserve(padding_len + RESET_TOKEN_SIZE);
307 buf.resize(padding_len, 0);
308 self.rng.fill_bytes(&mut buf[0..padding_len]);
309 buf[0] = 0b0100_0000 | (buf[0] >> 2);
310 buf.extend_from_slice(&ResetToken::new(&*self.config.reset_key, dst_cid));
311
312 debug_assert!(buf.len() < inciting_dgram_len);
313
314 Some(Transmit {
315 destination: addresses.remote,
316 ecn: None,
317 size: buf.len(),
318 segment_size: None,
319 src_ip: addresses.local_ip,
320 })
321 }
322
323 pub fn connect(
325 &mut self,
326 now: Instant,
327 config: ClientConfig,
328 remote: SocketAddr,
329 server_name: &str,
330 ) -> Result<(ConnectionHandle, Connection), ConnectError> {
331 if self.cids_exhausted() {
332 return Err(ConnectError::CidsExhausted);
333 }
334 if remote.port() == 0 || remote.ip().is_unspecified() {
335 return Err(ConnectError::InvalidRemoteAddress(remote));
336 }
337 if !self.config.supported_versions.contains(&config.version) {
338 return Err(ConnectError::UnsupportedVersion);
339 }
340
341 let remote_id = (config.initial_dst_cid_provider)();
342 trace!(initial_dcid = %remote_id);
343
344 let ch = ConnectionHandle(self.connections.vacant_key());
345 let loc_cid = self.new_cid(ch);
346 let params = TransportParameters::new(
347 &config.transport,
348 &self.config,
349 self.local_cid_generator.as_ref(),
350 loc_cid,
351 None,
352 &mut self.rng,
353 );
354 let tls = config
355 .crypto
356 .start_session(config.version, server_name, ¶ms)?;
357
358 let conn = self.add_connection(
359 ch,
360 config.version,
361 remote_id,
362 loc_cid,
363 remote_id,
364 FourTuple {
365 remote,
366 local_ip: None,
367 },
368 now,
369 tls,
370 config.transport,
371 SideArgs::Client {
372 token_store: config.token_store,
373 server_name: server_name.into(),
374 },
375 );
376 Ok((ch, conn))
377 }
378
379 fn send_new_identifiers(
380 &mut self,
381 now: Instant,
382 ch: ConnectionHandle,
383 num: u64,
384 ) -> ConnectionEvent {
385 let mut ids = vec![];
386 for _ in 0..num {
387 let id = self.new_cid(ch);
388 let meta = &mut self.connections[ch];
389 let sequence = meta.cids_issued;
390 meta.cids_issued += 1;
391 meta.loc_cids.insert(sequence, id);
392 ids.push(IssuedCid {
393 sequence,
394 id,
395 reset_token: ResetToken::new(&*self.config.reset_key, id),
396 });
397 }
398 ConnectionEvent(ConnectionEventInner::NewIdentifiers(ids, now))
399 }
400
401 fn new_cid(&mut self, ch: ConnectionHandle) -> ConnectionId {
403 loop {
404 let cid = self.local_cid_generator.generate_cid();
405 if cid.is_empty() {
406 debug_assert_eq!(self.local_cid_generator.cid_len(), 0);
408 return cid;
409 }
410 if let hash_map::Entry::Vacant(e) = self.index.connection_ids.entry(cid) {
411 e.insert(ch);
412 break cid;
413 }
414 }
415 }
416
417 fn handle_first_packet(
418 &mut self,
419 datagram_len: usize,
420 event: DatagramConnectionEvent,
421 addresses: FourTuple,
422 buf: &mut Vec<u8>,
423 ) -> Option<DatagramEvent> {
424 let dst_cid = event.first_decode.dst_cid();
425 let header = event.first_decode.initial_header().unwrap();
426
427 let Some(server_config) = &self.server_config else {
428 debug!("packet for unrecognized connection {}", dst_cid);
429 return self
430 .stateless_reset(event.now, datagram_len, addresses, *dst_cid, buf)
431 .map(DatagramEvent::Response);
432 };
433
434 if datagram_len < MIN_INITIAL_SIZE as usize {
435 debug!("ignoring short initial for connection {}", dst_cid);
436 return None;
437 }
438
439 let crypto = match server_config.crypto.initial_keys(header.version, dst_cid) {
440 Ok(keys) => keys,
441 Err(UnsupportedVersion) => {
442 debug!(
445 "ignoring initial packet version {:#x} unsupported by cryptographic layer",
446 header.version
447 );
448 return None;
449 }
450 };
451
452 if let Err(reason) = self.early_validate_first_packet(header) {
453 return Some(DatagramEvent::Response(self.initial_close(
454 header.version,
455 addresses,
456 &crypto,
457 &header.src_cid,
458 reason,
459 buf,
460 )));
461 }
462
463 let packet = match event.first_decode.finish(Some(&*crypto.header.remote)) {
464 Ok(packet) => packet,
465 Err(e) => {
466 trace!("unable to decode initial packet: {}", e);
467 return None;
468 }
469 };
470
471 if !packet.reserved_bits_valid() {
472 debug!("dropping connection attempt with invalid reserved bits");
473 return None;
474 }
475
476 let Header::Initial(header) = packet.header else {
477 panic!("non-initial packet in handle_first_packet()");
478 };
479
480 let server_config = self.server_config.as_ref().unwrap().clone();
481
482 let token = match IncomingToken::from_header(&header, &server_config, addresses.remote) {
483 Ok(token) => token,
484 Err(InvalidRetryTokenError) => {
485 debug!("rejecting invalid retry token");
486 return Some(DatagramEvent::Response(self.initial_close(
487 header.version,
488 addresses,
489 &crypto,
490 &header.src_cid,
491 TransportError::INVALID_TOKEN(""),
492 buf,
493 )));
494 }
495 };
496
497 let incoming_idx = self.incoming_buffers.insert(IncomingBuffer::default());
498 self.index
499 .insert_initial_incoming(header.dst_cid, incoming_idx);
500
501 Some(DatagramEvent::NewConnection(Incoming {
502 received_at: event.now,
503 addresses,
504 ecn: event.ecn,
505 packet: InitialPacket {
506 header,
507 header_data: packet.header_data,
508 payload: packet.payload,
509 },
510 rest: event.remaining,
511 crypto,
512 token,
513 incoming_idx,
514 improper_drop_warner: IncomingImproperDropWarner,
515 }))
516 }
517
518 pub fn accept(
520 &mut self,
521 mut incoming: Incoming,
522 now: Instant,
523 buf: &mut Vec<u8>,
524 server_config: Option<Arc<ServerConfig>>,
525 ) -> Result<(ConnectionHandle, Connection), AcceptError> {
526 let remote_address_validated = incoming.remote_address_validated();
527 incoming.improper_drop_warner.dismiss();
528 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
529 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
530
531 let packet_number = incoming.packet.header.number.expand(0);
532 let InitialHeader {
533 src_cid,
534 dst_cid,
535 version,
536 ..
537 } = incoming.packet.header;
538 let server_config =
539 server_config.unwrap_or_else(|| self.server_config.as_ref().unwrap().clone());
540
541 if server_config
542 .transport
543 .max_idle_timeout
544 .is_some_and(|timeout| {
545 incoming.received_at + Duration::from_millis(timeout.into()) <= now
546 })
547 {
548 debug!("abandoning accept of stale initial");
549 self.index.remove_initial(dst_cid);
550 return Err(AcceptError {
551 cause: ConnectionError::TimedOut,
552 response: None,
553 });
554 }
555
556 if self.cids_exhausted() {
557 debug!("refusing connection");
558 self.index.remove_initial(dst_cid);
559 return Err(AcceptError {
560 cause: ConnectionError::CidsExhausted,
561 response: Some(self.initial_close(
562 version,
563 incoming.addresses,
564 &incoming.crypto,
565 &src_cid,
566 TransportError::CONNECTION_REFUSED(""),
567 buf,
568 )),
569 });
570 }
571
572 if incoming
573 .crypto
574 .packet
575 .remote
576 .decrypt(
577 packet_number,
578 &incoming.packet.header_data,
579 &mut incoming.packet.payload,
580 )
581 .is_err()
582 {
583 debug!(packet_number, "failed to authenticate initial packet");
584 self.index.remove_initial(dst_cid);
585 return Err(AcceptError {
586 cause: TransportError::PROTOCOL_VIOLATION("authentication failed").into(),
587 response: None,
588 });
589 };
590
591 let ch = ConnectionHandle(self.connections.vacant_key());
592 let loc_cid = self.new_cid(ch);
593 let mut params = TransportParameters::new(
594 &server_config.transport,
595 &self.config,
596 self.local_cid_generator.as_ref(),
597 loc_cid,
598 Some(&server_config),
599 &mut self.rng,
600 );
601 params.stateless_reset_token = Some(ResetToken::new(&*self.config.reset_key, loc_cid));
602 params.original_dst_cid = Some(incoming.token.orig_dst_cid);
603 params.retry_src_cid = incoming.token.retry_src_cid;
604 let mut pref_addr_cid = None;
605 if server_config.preferred_address_v4.is_some()
606 || server_config.preferred_address_v6.is_some()
607 {
608 let cid = self.new_cid(ch);
609 pref_addr_cid = Some(cid);
610 params.preferred_address = Some(PreferredAddress {
611 address_v4: server_config.preferred_address_v4,
612 address_v6: server_config.preferred_address_v6,
613 connection_id: cid,
614 stateless_reset_token: ResetToken::new(&*self.config.reset_key, cid),
615 });
616 }
617
618 let tls = server_config.crypto.clone().start_session(version, ¶ms);
619 let transport_config = server_config.transport.clone();
620 let mut conn = self.add_connection(
621 ch,
622 version,
623 dst_cid,
624 loc_cid,
625 src_cid,
626 incoming.addresses,
627 incoming.received_at,
628 tls,
629 transport_config,
630 SideArgs::Server {
631 server_config,
632 pref_addr_cid,
633 path_validated: remote_address_validated,
634 },
635 );
636 self.index.insert_initial(dst_cid, ch);
637
638 match conn.handle_first_packet(
639 incoming.received_at,
640 incoming.addresses.remote,
641 incoming.ecn,
642 packet_number,
643 incoming.packet,
644 incoming.rest,
645 ) {
646 Ok(()) => {
647 trace!(id = ch.0, icid = %dst_cid, "new connection");
648
649 for event in incoming_buffer.datagrams {
650 conn.handle_event(ConnectionEvent(ConnectionEventInner::Datagram(event)))
651 }
652
653 Ok((ch, conn))
654 }
655 Err(e) => {
656 debug!("handshake failed: {}", e);
657 self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
658 let response = match e {
659 ConnectionError::TransportError(ref e) => Some(self.initial_close(
660 version,
661 incoming.addresses,
662 &incoming.crypto,
663 &src_cid,
664 e.clone(),
665 buf,
666 )),
667 _ => None,
668 };
669 Err(AcceptError { cause: e, response })
670 }
671 }
672 }
673
674 fn early_validate_first_packet(
676 &mut self,
677 header: &ProtectedInitialHeader,
678 ) -> Result<(), TransportError> {
679 let config = &self.server_config.as_ref().unwrap();
680 if self.cids_exhausted() || self.incoming_buffers.len() >= config.max_incoming {
681 return Err(TransportError::CONNECTION_REFUSED(""));
682 }
683
684 if header.dst_cid.len() < 8
689 && (header.token_pos.is_empty()
690 || header.dst_cid.len() != self.local_cid_generator.cid_len())
691 {
692 debug!(
693 "rejecting connection due to invalid DCID length {}",
694 header.dst_cid.len()
695 );
696 return Err(TransportError::PROTOCOL_VIOLATION(
697 "invalid destination CID length",
698 ));
699 }
700
701 Ok(())
702 }
703
704 pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Transmit {
706 self.clean_up_incoming(&incoming);
707 incoming.improper_drop_warner.dismiss();
708
709 self.initial_close(
710 incoming.packet.header.version,
711 incoming.addresses,
712 &incoming.crypto,
713 &incoming.packet.header.src_cid,
714 TransportError::CONNECTION_REFUSED(""),
715 buf,
716 )
717 }
718
719 pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec<u8>) -> Result<Transmit, RetryError> {
723 if !incoming.may_retry() {
724 return Err(RetryError(incoming));
725 }
726
727 self.clean_up_incoming(&incoming);
728 incoming.improper_drop_warner.dismiss();
729
730 let server_config = self.server_config.as_ref().unwrap();
731
732 let loc_cid = self.local_cid_generator.generate_cid();
739
740 let payload = TokenPayload::Retry {
741 address: incoming.addresses.remote,
742 orig_dst_cid: incoming.packet.header.dst_cid,
743 issued: server_config.time_source.now(),
744 };
745 let token = Token::new(payload, &mut self.rng).encode(&*server_config.token_key);
746
747 let header = Header::Retry {
748 src_cid: loc_cid,
749 dst_cid: incoming.packet.header.src_cid,
750 version: incoming.packet.header.version,
751 };
752
753 let encode = header.encode(buf);
754 buf.put_slice(&token);
755 buf.extend_from_slice(&server_config.crypto.retry_tag(
756 incoming.packet.header.version,
757 &incoming.packet.header.dst_cid,
758 buf,
759 ));
760 encode.finish(buf, &*incoming.crypto.header.local, None);
761
762 Ok(Transmit {
763 destination: incoming.addresses.remote,
764 ecn: None,
765 size: buf.len(),
766 segment_size: None,
767 src_ip: incoming.addresses.local_ip,
768 })
769 }
770
    /// Drop a pending connection attempt without sending any response.
    ///
    /// Frees the attempt's index entry and buffered datagrams and silences the drop warning.
    pub fn ignore(&mut self, incoming: Incoming) {
        self.clean_up_incoming(&incoming);
        incoming.improper_drop_warner.dismiss();
    }
779
780 fn clean_up_incoming(&mut self, incoming: &Incoming) {
782 self.index.remove_initial(incoming.packet.header.dst_cid);
783 let incoming_buffer = self.incoming_buffers.remove(incoming.incoming_idx);
784 self.all_incoming_buffers_total_bytes -= incoming_buffer.total_bytes;
785 }
786
787 fn add_connection(
788 &mut self,
789 ch: ConnectionHandle,
790 version: u32,
791 init_cid: ConnectionId,
792 loc_cid: ConnectionId,
793 rem_cid: ConnectionId,
794 addresses: FourTuple,
795 now: Instant,
796 tls: Box<dyn crypto::Session>,
797 transport_config: Arc<TransportConfig>,
798 side_args: SideArgs,
799 ) -> Connection {
800 let mut rng_seed = [0; 32];
801 self.rng.fill_bytes(&mut rng_seed);
802 let side = side_args.side();
803 let pref_addr_cid = side_args.pref_addr_cid();
804 let conn = Connection::new(
805 self.config.clone(),
806 transport_config,
807 init_cid,
808 loc_cid,
809 rem_cid,
810 addresses.remote,
811 addresses.local_ip,
812 tls,
813 self.local_cid_generator.as_ref(),
814 now,
815 version,
816 self.allow_mtud,
817 rng_seed,
818 side_args,
819 );
820
821 let mut cids_issued = 0;
822 let mut loc_cids = FxHashMap::default();
823
824 loc_cids.insert(cids_issued, loc_cid);
825 cids_issued += 1;
826
827 if let Some(cid) = pref_addr_cid {
828 debug_assert_eq!(cids_issued, 1, "preferred address cid seq must be 1");
829 loc_cids.insert(cids_issued, cid);
830 cids_issued += 1;
831 }
832
833 let id = self.connections.insert(ConnectionMeta {
834 init_cid,
835 cids_issued,
836 loc_cids,
837 addresses,
838 side,
839 reset_token: None,
840 });
841 debug_assert_eq!(id, ch.0, "connection handle allocation out of sync");
842
843 self.index.insert_conn(addresses, loc_cid, ch, side);
844
845 conn
846 }
847
848 fn initial_close(
849 &mut self,
850 version: u32,
851 addresses: FourTuple,
852 crypto: &Keys,
853 remote_id: &ConnectionId,
854 reason: TransportError,
855 buf: &mut Vec<u8>,
856 ) -> Transmit {
857 let local_id = self.local_cid_generator.generate_cid();
861 let number = PacketNumber::U8(0);
862 let header = Header::Initial(InitialHeader {
863 dst_cid: *remote_id,
864 src_cid: local_id,
865 number,
866 token: Bytes::new(),
867 version,
868 });
869
870 let partial_encode = header.encode(buf);
871 let max_len =
872 INITIAL_MTU as usize - partial_encode.header_len - crypto.packet.local.tag_len();
873 frame::Close::from(reason).encode(buf, max_len);
874 buf.resize(buf.len() + crypto.packet.local.tag_len(), 0);
875 partial_encode.finish(buf, &*crypto.header.local, Some((0, &*crypto.packet.local)));
876 Transmit {
877 destination: addresses.remote,
878 ecn: None,
879 size: buf.len(),
880 segment_size: None,
881 src_ip: addresses.local_ip,
882 }
883 }
884
    /// Borrow the endpoint configuration this endpoint was created with.
    pub fn config(&self) -> &EndpointConfig {
        &self.config
    }
889
    /// Number of connections currently tracked by this endpoint.
    pub fn open_connections(&self) -> usize {
        self.connections.len()
    }
894
    /// Total bytes of datagrams currently buffered for undecided `Incoming` attempts.
    pub fn incoming_buffer_bytes(&self) -> u64 {
        self.all_incoming_buffers_total_bytes
    }
900
    /// Test-only: number of tracked connections, with debug assertions that the index tables
    /// are consistent with it.
    #[cfg(test)]
    pub(crate) fn known_connections(&self) -> usize {
        let x = self.connections.len();
        // Exactly one initial-DCID route is expected per connection.
        debug_assert_eq!(x, self.index.connection_ids_initial.len());
        // The remaining tables hold at most one entry per connection.
        debug_assert!(x >= self.index.connection_reset_tokens.0.len());
        debug_assert!(x >= self.index.incoming_connection_remotes.len());
        debug_assert!(x >= self.index.outgoing_connection_remotes.len());
        x
    }
912
    /// Test-only: number of local connection IDs currently registered in the index.
    #[cfg(test)]
    pub(crate) fn known_cids(&self) -> usize {
        self.index.connection_ids.len()
    }
917
918 fn cids_exhausted(&self) -> bool {
923 self.local_cid_generator.cid_len() <= 4
924 && self.local_cid_generator.cid_len() != 0
925 && (2usize.pow(self.local_cid_generator.cid_len() as u32 * 8)
926 - self.index.connection_ids.len())
927 < 2usize.pow(self.local_cid_generator.cid_len() as u32 * 8 - 2)
928 }
929}
930
931impl fmt::Debug for Endpoint {
932 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
933 fmt.debug_struct("Endpoint")
934 .field("rng", &self.rng)
935 .field("index", &self.index)
936 .field("connections", &self.connections)
937 .field("config", &self.config)
938 .field("server_config", &self.server_config)
939 .field("incoming_buffers.len", &self.incoming_buffers.len())
941 .field(
942 "all_incoming_buffers_total_bytes",
943 &self.all_incoming_buffers_total_bytes,
944 )
945 .finish()
946 }
947}
948
/// Datagrams received for a connection attempt that has not been accepted, refused, retried
/// or ignored yet.
#[derive(Default)]
struct IncomingBuffer {
    /// Buffered datagrams, in arrival order.
    datagrams: Vec<DatagramConnectionEvent>,
    /// Sum of the sizes of the buffered datagrams.
    total_bytes: u64,
}
955
/// Destination chosen by `ConnectionIndex::get` for a datagram.
#[derive(Copy, Clone, Debug)]
enum RouteDatagramTo {
    /// Buffer for a pending attempt; the payload is a key into `Endpoint::incoming_buffers`.
    Incoming(usize),
    /// Deliver to an established connection.
    Connection(ConnectionHandle),
}
962
/// Lookup tables used to route incoming datagrams.
#[derive(Default, Debug)]
struct ConnectionIndex {
    /// Routes keyed by the destination CID of a peer's first initial packet; consulted only
    /// for initial and 0-RTT packets.
    connection_ids_initial: HashMap<ConnectionId, RouteDatagramTo>,
    /// Connections keyed by the (non-empty) local CIDs issued to them.
    connection_ids: FxHashMap<ConnectionId, ConnectionHandle>,
    /// Server-side connections keyed by address pair; used when local CIDs are zero-length.
    incoming_connection_remotes: HashMap<FourTuple, ConnectionHandle>,
    /// Client-side connections keyed by remote address; used when local CIDs are zero-length.
    outgoing_connection_remotes: HashMap<SocketAddr, ConnectionHandle>,
    /// Reset tokens registered per remote address, matched against the tail of a datagram.
    connection_reset_tokens: ResetTokenTable,
}
995
996impl ConnectionIndex {
997 fn insert_initial_incoming(&mut self, dst_cid: ConnectionId, incoming_key: usize) {
999 if dst_cid.is_empty() {
1000 return;
1001 }
1002 self.connection_ids_initial
1003 .insert(dst_cid, RouteDatagramTo::Incoming(incoming_key));
1004 }
1005
1006 fn remove_initial(&mut self, dst_cid: ConnectionId) {
1008 if dst_cid.is_empty() {
1009 return;
1010 }
1011 let removed = self.connection_ids_initial.remove(&dst_cid);
1012 debug_assert!(removed.is_some());
1013 }
1014
1015 fn insert_initial(&mut self, dst_cid: ConnectionId, connection: ConnectionHandle) {
1017 if dst_cid.is_empty() {
1018 return;
1019 }
1020 self.connection_ids_initial
1021 .insert(dst_cid, RouteDatagramTo::Connection(connection));
1022 }
1023
1024 fn insert_conn(
1027 &mut self,
1028 addresses: FourTuple,
1029 dst_cid: ConnectionId,
1030 connection: ConnectionHandle,
1031 side: Side,
1032 ) {
1033 match dst_cid.len() {
1034 0 => match side {
1035 Side::Server => {
1036 self.incoming_connection_remotes
1037 .insert(addresses, connection);
1038 }
1039 Side::Client => {
1040 self.outgoing_connection_remotes
1041 .insert(addresses.remote, connection);
1042 }
1043 },
1044 _ => {
1045 self.connection_ids.insert(dst_cid, connection);
1046 }
1047 }
1048 }
1049
1050 fn retire(&mut self, dst_cid: ConnectionId) {
1052 self.connection_ids.remove(&dst_cid);
1053 }
1054
1055 fn remove(&mut self, conn: &ConnectionMeta) {
1057 if conn.side.is_server() {
1058 self.remove_initial(conn.init_cid);
1059 }
1060 for cid in conn.loc_cids.values() {
1061 self.connection_ids.remove(cid);
1062 }
1063 self.incoming_connection_remotes.remove(&conn.addresses);
1064 self.outgoing_connection_remotes
1065 .remove(&conn.addresses.remote);
1066 if let Some((remote, token)) = conn.reset_token {
1067 self.connection_reset_tokens.remove(remote, token);
1068 }
1069 }
1070
1071 fn get(&self, addresses: &FourTuple, datagram: &PartialDecode) -> Option<RouteDatagramTo> {
1073 if !datagram.dst_cid().is_empty() {
1074 if let Some(&ch) = self.connection_ids.get(datagram.dst_cid()) {
1075 return Some(RouteDatagramTo::Connection(ch));
1076 }
1077 }
1078 if datagram.is_initial() || datagram.is_0rtt() {
1079 if let Some(&ch) = self.connection_ids_initial.get(datagram.dst_cid()) {
1080 return Some(ch);
1081 }
1082 }
1083 if datagram.dst_cid().is_empty() {
1084 if let Some(&ch) = self.incoming_connection_remotes.get(addresses) {
1085 return Some(RouteDatagramTo::Connection(ch));
1086 }
1087 if let Some(&ch) = self.outgoing_connection_remotes.get(&addresses.remote) {
1088 return Some(RouteDatagramTo::Connection(ch));
1089 }
1090 }
1091 let data = datagram.data();
1092 if data.len() < RESET_TOKEN_SIZE {
1093 return None;
1094 }
1095 self.connection_reset_tokens
1096 .get(addresses.remote, &data[data.len() - RESET_TOKEN_SIZE..])
1097 .cloned()
1098 .map(RouteDatagramTo::Connection)
1099 }
1100}
1101
/// Endpoint-side bookkeeping for one connection, used to clean up the index on removal.
#[derive(Debug)]
pub(crate) struct ConnectionMeta {
    /// CID under which the connection's initial-DCID route was registered.
    init_cid: ConnectionId,
    /// Number of local CIDs issued so far; also the next sequence number to hand out.
    cids_issued: u64,
    /// Local CIDs currently active, keyed by sequence number.
    loc_cids: FxHashMap<u64, ConnectionId>,
    /// Addresses recorded when the connection was added; used to clear the address tables.
    addresses: FourTuple,
    /// Which side of the connection this endpoint is.
    side: Side,
    /// Reset token currently registered for this connection, with the remote it is bound to.
    reset_token: Option<(SocketAddr, ResetToken)>,
}
1118
/// Identifier for a connection within an endpoint.
///
/// Wraps the connection's key in the endpoint's connection slab.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct ConnectionHandle(pub usize);

impl From<ConnectionHandle> for usize {
    /// Unwrap the handle into the raw slab key it carries.
    fn from(ConnectionHandle(key): ConnectionHandle) -> Self {
        key
    }
}
1128
1129impl Index<ConnectionHandle> for Slab<ConnectionMeta> {
1130 type Output = ConnectionMeta;
1131 fn index(&self, ch: ConnectionHandle) -> &ConnectionMeta {
1132 &self[ch.0]
1133 }
1134}
1135
1136impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
1137 fn index_mut(&mut self, ch: ConnectionHandle) -> &mut ConnectionMeta {
1138 &mut self[ch.0]
1139 }
1140}
1141
/// Outcome of `Endpoint::handle` that the caller must act on.
pub enum DatagramEvent {
    /// The datagram belongs to an existing connection; forward the event to it.
    ConnectionEvent(ConnectionHandle, ConnectionEvent),
    /// The datagram starts a new connection attempt awaiting a decision.
    NewConnection(Incoming),
    /// A response was written to the output buffer and should be transmitted.
    Response(Transmit),
}
1151
/// A connection attempt awaiting a decision: pass it to `Endpoint::accept`, `refuse`, `retry`
/// or `ignore`.
pub struct Incoming {
    /// Time the initial packet was received.
    received_at: Instant,
    /// Remote address and local IP the initial packet arrived on.
    addresses: FourTuple,
    /// ECN codepoint of the datagram carrying the initial packet.
    ecn: Option<EcnCodepoint>,
    /// The header-decoded initial packet; its payload is decrypted on accept.
    packet: InitialPacket,
    /// Bytes following the initial packet in the same datagram, if any.
    rest: Option<BytesMut>,
    /// Initial keys derived for this attempt.
    crypto: Keys,
    /// Token information extracted from the initial header.
    token: IncomingToken,
    /// Key of this attempt's entry in `Endpoint::incoming_buffers`.
    incoming_idx: usize,
    /// Logs a warning if the `Incoming` is dropped without being handed back to the endpoint.
    improper_drop_warner: IncomingImproperDropWarner,
}
1164
impl Incoming {
    /// Local IP address the initial packet was received on, if known.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.addresses.local_ip
    }

    /// Address the initial packet was sent from.
    pub fn remote_address(&self) -> SocketAddr {
        self.addresses.remote
    }

    /// Whether the token carried by the initial packet marked the remote address as validated.
    pub fn remote_address_validated(&self) -> bool {
        self.token.validated
    }

    /// Whether `Endpoint::retry` will accept this attempt, i.e. the token carries no retry
    /// source CID.
    pub fn may_retry(&self) -> bool {
        self.token.retry_src_cid.is_none()
    }

    /// The original destination connection ID recorded in the token information.
    pub fn orig_dst_cid(&self) -> &ConnectionId {
        &self.token.orig_dst_cid
    }
}
1202
1203impl fmt::Debug for Incoming {
1204 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1205 f.debug_struct("Incoming")
1206 .field("addresses", &self.addresses)
1207 .field("ecn", &self.ecn)
1208 .field("token", &self.token)
1211 .field("incoming_idx", &self.incoming_idx)
1212 .finish_non_exhaustive()
1214 }
1215}
1216
/// Guard embedded in `Incoming` that logs a warning when dropped, unless `dismiss` was called
/// first.
struct IncomingImproperDropWarner;

impl IncomingImproperDropWarner {
    /// Consume the guard without running its `Drop` impl, suppressing the warning.
    fn dismiss(self) {
        mem::forget(self);
    }
}

impl Drop for IncomingImproperDropWarner {
    /// Warn that the owning `Incoming` was dropped without being handed back to the endpoint.
    fn drop(&mut self) {
        warn!(
            "quinn_proto::Incoming dropped without passing to Endpoint::accept/refuse/retry/ignore \
            (may cause memory leak and eventual inability to accept new connections)"
        );
    }
}
1233
/// Errors in the parameters being used to create a new connection.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ConnectError {
    /// The endpoint is stopping. Not produced by the code in this file.
    #[error("endpoint stopping")]
    EndpointStopping,
    /// Too much of the local connection ID space is already in use.
    #[error("CIDs exhausted")]
    CidsExhausted,
    /// The server name was rejected; presumably produced when starting the crypto session
    /// (confirm against the `From` impl used by `?` in `Endpoint::connect`).
    #[error("invalid server name: {0}")]
    InvalidServerName(String),
    /// The remote address has port 0 or an unspecified IP.
    #[error("invalid remote address: {0}")]
    InvalidRemoteAddress(SocketAddr),
    /// No default client configuration is available. Not produced by the code in this file.
    #[error("no default client config")]
    NoDefaultClientConfig,
    /// The requested QUIC version is not among the endpoint's supported versions.
    #[error("unsupported QUIC version")]
    UnsupportedVersion,
}
1266
/// Error returned by `Endpoint::accept`.
#[derive(Debug)]
pub struct AcceptError {
    /// Why the connection could not be accepted.
    pub cause: ConnectionError,
    /// Optional response, already written to the output buffer, to transmit to the peer.
    pub response: Option<Transmit>,
}
1275
/// Error returned by `Endpoint::retry` when `Incoming::may_retry` is false; carries the
/// untouched `Incoming` so the caller can still decide its fate.
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Incoming);

impl RetryError {
    /// Recover the `Incoming` that could not be retried.
    pub fn into_incoming(self) -> Incoming {
        self.0
    }
}
1287
/// Reset tokens grouped by the remote address they were registered for, each mapping to the
/// connection that owns it.
#[derive(Default, Debug)]
struct ResetTokenTable(HashMap<SocketAddr, HashMap<ResetToken, ConnectionHandle>>);
1294
1295impl ResetTokenTable {
1296 fn insert(&mut self, remote: SocketAddr, token: ResetToken, ch: ConnectionHandle) -> bool {
1297 self.0
1298 .entry(remote)
1299 .or_default()
1300 .insert(token, ch)
1301 .is_some()
1302 }
1303
1304 fn remove(&mut self, remote: SocketAddr, token: ResetToken) {
1305 use std::collections::hash_map::Entry;
1306 match self.0.entry(remote) {
1307 Entry::Vacant(_) => {}
1308 Entry::Occupied(mut e) => {
1309 e.get_mut().remove(&token);
1310 if e.get().is_empty() {
1311 e.remove_entry();
1312 }
1313 }
1314 }
1315 }
1316
1317 fn get(&self, remote: SocketAddr, token: &[u8]) -> Option<&ConnectionHandle> {
1318 let token = ResetToken::from(<[u8; RESET_TOKEN_SIZE]>::try_from(token).ok()?);
1319 self.0.get(&remote)?.get(&token)
1320 }
1321}
1322
/// Address pair identifying where a datagram came from and which local IP received it.
///
/// Used as the key for server-side connections that have zero-length local CIDs.
#[derive(Hash, Eq, PartialEq, Debug, Copy, Clone)]
struct FourTuple {
    /// Peer address.
    remote: SocketAddr,
    /// Local IP the datagram was received on, when known.
    local_ip: Option<IpAddr>,
}