// quinn/connection.rs

use std::{
    any::Any,
    fmt,
    future::Future,
    io,
    net::{IpAddr, SocketAddr},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Waker, ready},
};

use bytes::Bytes;
use pin_project_lite::pin_project;
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{Notify, futures::Notified, mpsc, oneshot};
use tracing::{Instrument, Span, debug_span};

use crate::{
    ConnectionEvent, Duration, Instant, VarInt,
    mutex::Mutex,
    recv_stream::RecvStream,
    runtime::{AsyncTimer, AsyncUdpSocket, Runtime, UdpPoller},
    send_stream::SendStream,
    udp_transmit,
};
use proto::{
    ConnectionError, ConnectionHandle, ConnectionStats, Dir, EndpointEvent, StreamEvent, StreamId,
    congestion::Controller,
};

/// In-progress connection attempt future
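///
/// # Example
///
/// A minimal sketch of driving a connection attempt to completion; the address and server
/// name are placeholders:
///
/// ```no_run
/// async fn connect(
///     endpoint: &quinn::Endpoint,
///     addr: std::net::SocketAddr,
/// ) -> Result<quinn::Connection, Box<dyn std::error::Error>> {
///     // `Endpoint::connect` returns a `Connecting` future; awaiting it completes the handshake.
///     let connecting = endpoint.connect(addr, "example.com")?;
///     let connection = connecting.await?;
///     Ok(connection)
/// }
/// ```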
#[derive(Debug)]
pub struct Connecting {
    conn: Option<ConnectionRef>,
    connected: oneshot::Receiver<bool>,
    handshake_data_ready: Option<oneshot::Receiver<()>>,
}

impl Connecting {
    pub(crate) fn new(
        handle: ConnectionHandle,
        conn: proto::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        let (on_handshake_data_send, on_handshake_data_recv) = oneshot::channel();
        let (on_connected_send, on_connected_recv) = oneshot::channel();
        let conn = ConnectionRef::new(
            handle,
            conn,
            endpoint_events,
            conn_events,
            on_handshake_data_send,
            on_connected_send,
            socket,
            runtime.clone(),
        );

        let driver = ConnectionDriver(conn.clone());
        runtime.spawn(Box::pin(
            async {
                if let Err(e) = driver.await {
                    tracing::error!("I/O error: {e}");
                }
            }
            .instrument(Span::current()),
        ));

        Self {
            conn: Some(conn),
            connected: on_connected_recv,
            handshake_data_ready: Some(on_handshake_data_recv),
        }
    }

    /// Convert into a 0-RTT or 0.5-RTT connection at the cost of weakened security
    ///
    /// Returns `Ok` immediately if the local endpoint is able to attempt sending 0/0.5-RTT data.
    /// If so, the returned [`Connection`] can be used to send application data without waiting for
    /// the rest of the handshake to complete, at the cost of weakened cryptographic security
    /// guarantees. The returned [`ZeroRttAccepted`] future resolves when the handshake does
    /// complete, at which point subsequently opened streams and written data will have full
    /// cryptographic protection.
    ///
    /// ## Outgoing
    ///
    /// For outgoing connections, the initial attempt to convert to a [`Connection`] which sends
    /// 0-RTT data will proceed if the [`crypto::ClientConfig`][crate::crypto::ClientConfig]
    /// attempts to resume a previous TLS session. However, **the remote endpoint may not actually
    /// _accept_ the 0-RTT data**--yet still accept the connection attempt in general. This
    /// possibility is conveyed through the [`ZeroRttAccepted`] future--when the handshake
    /// completes, it resolves to true if the 0-RTT data was accepted and false if it was rejected.
    /// If it was rejected, the existence of streams opened and other application data sent prior
    /// to the handshake completing will not be conveyed to the remote application, and local
    /// operations on them will return `ZeroRttRejected` errors.
    ///
    /// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
    /// relevant resumption state to be stored in the server, which servers may limit or lose for
    /// various reasons including not persisting resumption state across server restarts.
    ///
    /// If manually providing a [`crypto::ClientConfig`][crate::crypto::ClientConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Incoming
    ///
    /// For incoming connections, conversion to 0.5-RTT will always fully succeed. `into_0rtt` will
    /// always return `Ok` and the [`ZeroRttAccepted`] will always resolve to true.
    ///
    /// If manually providing a [`crypto::ServerConfig`][crate::crypto::ServerConfig], check your
    /// implementation's docs for 0-RTT pitfalls.
    ///
    /// ## Security
    ///
    /// On outgoing connections, this enables transmission of 0-RTT data, which is vulnerable to
    /// replay attacks, and should therefore never invoke non-idempotent operations.
    ///
    /// On incoming connections, this enables transmission of 0.5-RTT data, which may be sent
    /// before TLS client authentication has occurred, and should therefore not be used to send
    /// data for which client authentication is being used.
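    ///
    /// # Example
    ///
    /// A sketch of attempting 0-RTT on an outgoing connection and falling back to the full
    /// handshake when no resumption state is available:
    ///
    /// ```no_run
    /// async fn connect_0rtt(
    ///     connecting: quinn::Connecting,
    /// ) -> Result<quinn::Connection, quinn::ConnectionError> {
    ///     match connecting.into_0rtt() {
    ///         Ok((connection, zero_rtt_accepted)) => {
    ///             // 0-RTT data may be sent immediately, but it is replayable; only use it
    ///             // for idempotent operations. Optionally wait to learn whether the peer
    ///             // actually accepted the early data.
    ///             let _accepted = zero_rtt_accepted.await;
    ///             Ok(connection)
    ///         }
    ///         // No 0-RTT possible; wait for the ordinary handshake instead.
    ///         Err(connecting) => connecting.await,
    ///     }
    /// }
    /// ```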
    pub fn into_0rtt(mut self) -> Result<(Connection, ZeroRttAccepted), Self> {
        // This lock borrows `self` and would normally be dropped at the end of this scope, so we'll
        // have to release it explicitly before returning `self` by value.
        let conn = (self.conn.as_mut().unwrap()).state.lock("into_0rtt");

        let is_ok = conn.inner.has_0rtt() || conn.inner.side().is_server();
        drop(conn);

        if is_ok {
            let conn = self.conn.take().unwrap();
            Ok((Connection(conn), ZeroRttAccepted(self.connected)))
        } else {
            Err(self)
        }
    }

    /// Parameters negotiated during the handshake
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a
    /// [`crypto::rustls::HandshakeData`](crate::crypto::rustls::HandshakeData).
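    ///
    /// # Example
    ///
    /// A sketch of downcasting the handshake data with the default `rustls` session; the
    /// `protocol` field read here is the negotiated ALPN protocol, if any:
    ///
    /// ```no_run
    /// async fn inspect(mut connecting: quinn::Connecting) {
    ///     if let Ok(data) = connecting.handshake_data().await {
    ///         if let Ok(data) = data.downcast::<quinn::crypto::rustls::HandshakeData>() {
    ///             println!("ALPN protocol: {:?}", data.protocol);
    ///         }
    ///     }
    /// }
    /// ```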
    pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
        // Taking &mut self allows us to use a single oneshot channel rather than dealing with
        // potentially many tasks waiting on the same event. It's a bit of a hack, but keeps things
        // simple.
        if let Some(x) = self.handshake_data_ready.take() {
            let _ = x.await;
        }
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("handshake");
        inner
            .inner
            .crypto_session()
            .handshake_data()
            .ok_or_else(|| {
                inner
                    .error
                    .clone()
                    .expect("spurious handshake data ready notification")
            })
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn local_ip(&self) -> Option<IpAddr> {
        let conn = self.conn.as_ref().unwrap();
        let inner = conn.state.lock("local_ip");

        inner.inner.local_ip()
    }

    /// The peer's UDP address
    ///
    /// Will panic if called after `poll` has returned `Ready`.
    pub fn remote_address(&self) -> SocketAddr {
        let conn_ref: &ConnectionRef = self.conn.as_ref().expect("used after yielding Ready");
        conn_ref.state.lock("remote_address").inner.remote_address()
    }
}

impl Future for Connecting {
    type Output = Result<Connection, ConnectionError>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.connected).poll(cx).map(|_| {
            let conn = self.conn.take().unwrap();
            let inner = conn.state.lock("connecting");
            if inner.connected {
                drop(inner);
                Ok(Connection(conn))
            } else {
                Err(inner
                    .error
                    .clone()
                    .expect("connected signaled without connection success or error"))
            }
        })
    }
}

/// Future that completes when a connection is fully established
///
/// For clients, the resulting value indicates if 0-RTT was accepted. For servers, the resulting
/// value is meaningless.
pub struct ZeroRttAccepted(oneshot::Receiver<bool>);

impl Future for ZeroRttAccepted {
    type Output = bool;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx).map(|x| x.unwrap_or(false))
    }
}

/// A future that drives protocol logic for a connection
///
/// This future handles the protocol logic for a single connection, routing events from the
/// `Connection` API object to the `Endpoint` task and the related stream interfaces.
/// It also keeps track of outstanding timeouts for the `Connection`.
///
/// If the connection encounters an error condition, this future will yield an error. It will
/// terminate (yielding `Ok(())`) if the connection was closed without error. Unlike other
/// connection-related futures, this waits for the draining period to complete to ensure that
/// packets still in flight from the peer are handled gracefully.
#[must_use = "connection drivers must be spawned for their connections to function"]
#[derive(Debug)]
struct ConnectionDriver(ConnectionRef);

impl Future for ConnectionDriver {
    type Output = Result<(), io::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let conn = &mut *self.0.state.lock("poll");

        let span = debug_span!("drive", id = conn.handle.0);
        let _guard = span.enter();

        if let Err(e) = conn.process_conn_events(&self.0.shared, cx) {
            conn.terminate(e, &self.0.shared);
            return Poll::Ready(Ok(()));
        }
        let mut keep_going = conn.drive_transmit(cx)?;
        // If a timer expires, there might be more to transmit. When we transmit something, we
        // might need to reset a timer. Hence, we must loop until neither happens.
        keep_going |= conn.drive_timer(cx);
        conn.forward_endpoint_events();
        conn.forward_app_events(&self.0.shared);

        if !conn.inner.is_drained() {
            if keep_going {
                // If the connection hasn't processed all tasks, schedule it again
                cx.waker().wake_by_ref();
            } else {
                conn.driver = Some(cx.waker().clone());
            }
            return Poll::Pending;
        }
        if conn.error.is_none() {
            unreachable!("drained connections always have an error");
        }
        Poll::Ready(Ok(()))
    }
}

/// A QUIC connection.
///
/// If all references to a connection (including every clone of the `Connection` handle, streams of
/// incoming streams, and the various stream types) have been dropped, then the connection will be
/// automatically closed with an `error_code` of 0 and an empty `reason`. You can also close the
/// connection explicitly by calling [`Connection::close()`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer.  Upon
/// receiving CONNECTION_CLOSE the peer *may* drop any stream data not yet delivered to the
/// application. [`Connection::close()`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
///
/// [`Connection::close()`]: Connection::close
#[derive(Debug, Clone)]
pub struct Connection(ConnectionRef);

impl Connection {
    /// Initiate a new outgoing unidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used.
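    ///
    /// # Example
    ///
    /// A minimal sketch of opening a unidirectional stream and writing to it (error handling
    /// simplified to a boxed error):
    ///
    /// ```no_run
    /// async fn send_once(connection: &quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    ///     let mut send = connection.open_uni().await?;
    ///     // The peer only learns about the stream once data is written to it.
    ///     send.write_all(b"hello").await?;
    ///     Ok(())
    /// }
    /// ```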
    pub fn open_uni(&self) -> OpenUni<'_> {
        OpenUni {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Uni as usize].notified(),
        }
    }

    /// Initiate a new outgoing bidirectional stream.
    ///
    /// Streams are cheap and instantaneous to open unless blocked by flow control. As a
    /// consequence, the peer won't be notified that a stream has been opened until the stream is
    /// actually used. Calling [`open_bi()`] then waiting on the [`RecvStream`] without writing
    /// anything to [`SendStream`] will never succeed.
    ///
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
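    ///
    /// # Example
    ///
    /// A sketch of a simple request/response exchange; writing before reading is what lets
    /// the peer's `accept_bi()` complete (the 1 KiB response cap is illustrative):
    ///
    /// ```no_run
    /// async fn request(connection: &quinn::Connection) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    ///     let (mut send, mut recv) = connection.open_bi().await?;
    ///     // Write first: the stream does not exist for the peer until data is sent on it.
    ///     send.write_all(b"request").await?;
    ///     let response = recv.read_to_end(1024).await?;
    ///     Ok(response)
    /// }
    /// ```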
    pub fn open_bi(&self) -> OpenBi<'_> {
        OpenBi {
            conn: &self.0,
            notify: self.0.shared.stream_budget_available[Dir::Bi as usize].notified(),
        }
    }

    /// Accept the next incoming uni-directional stream
    pub fn accept_uni(&self) -> AcceptUni<'_> {
        AcceptUni {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
        }
    }

    /// Accept the next incoming bidirectional stream
    ///
    /// **Important Note**: The `Connection` that calls [`open_bi()`] must write to its [`SendStream`]
    /// before the other `Connection` is able to `accept_bi()`. Calling [`open_bi()`] then
    /// waiting on the [`RecvStream`] without writing anything to [`SendStream`] will never succeed.
    ///
    /// [`accept_bi()`]: crate::Connection::accept_bi
    /// [`open_bi()`]: crate::Connection::open_bi
    /// [`SendStream`]: crate::SendStream
    /// [`RecvStream`]: crate::RecvStream
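    ///
    /// # Example
    ///
    /// A sketch of a server-side accept loop that echoes each bidirectional stream back to
    /// the peer (the 64 KiB read limit is illustrative):
    ///
    /// ```no_run
    /// async fn echo(connection: &quinn::Connection) -> Result<(), Box<dyn std::error::Error>> {
    ///     loop {
    ///         // Resolves once the peer has opened a stream *and* written data to it.
    ///         let (mut send, mut recv) = connection.accept_bi().await?;
    ///         let data = recv.read_to_end(64 * 1024).await?;
    ///         send.write_all(&data).await?;
    ///     }
    /// }
    /// ```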
    pub fn accept_bi(&self) -> AcceptBi<'_> {
        AcceptBi {
            conn: &self.0,
            notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
        }
    }

    /// Receive an application datagram
    pub fn read_datagram(&self) -> ReadDatagram<'_> {
        ReadDatagram {
            conn: &self.0,
            notify: self.0.shared.datagram_received.notified(),
        }
    }

    /// Wait for the connection to be closed for any reason
    ///
    /// Despite the return type's name, closed connections are often not an error condition at the
    /// application layer. Cases that might be routine include [`ConnectionError::LocallyClosed`]
    /// and [`ConnectionError::ApplicationClosed`].
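    ///
    /// # Example
    ///
    /// A sketch of a task that simply waits for closure and reports the reason:
    ///
    /// ```no_run
    /// async fn watch(connection: quinn::Connection) {
    ///     let reason = connection.closed().await;
    ///     println!("connection closed: {reason}");
    /// }
    /// ```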
    pub async fn closed(&self) -> ConnectionError {
        {
            let conn = self.0.state.lock("closed");
            if let Some(error) = conn.error.as_ref() {
                return error.clone();
            }
            // Construct the future while the lock is held to ensure we can't miss a wakeup if
            // the `Notify` is signaled immediately after we release the lock. `await` it after
            // the lock guard is out of scope.
            self.0.shared.closed.notified()
        }
        .await;
        self.0
            .state
            .lock("closed")
            .error
            .as_ref()
            .expect("closed without an error")
            .clone()
    }

    /// If the connection is closed, the reason why.
    ///
    /// Returns `None` if the connection is still open.
    pub fn close_reason(&self) -> Option<ConnectionError> {
        self.0.state.lock("close_reason").error.clone()
    }

    /// Close the connection immediately.
    ///
    /// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
    /// more data is sent to the peer and the peer may drop buffered data upon receiving
    /// the CONNECTION_CLOSE frame.
    ///
    /// `error_code` and `reason` are not interpreted, and are provided directly to the peer.
    ///
    /// `reason` will be truncated to fit in a single packet with overhead; to improve odds that it
    /// is preserved in full, it should be kept under 1KiB.
    ///
    /// # Gracefully closing a connection
    ///
    /// Only the peer last receiving application data can be certain that all data is
    /// delivered. The only reliable action it can then take is to close the connection,
    /// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
    /// frame is very likely if both endpoints stay online long enough, and
    /// [`Endpoint::wait_idle()`] can be used to provide sufficient time. Otherwise, the
    /// remote peer will time out the connection, provided that the idle timeout is not
    /// disabled.
    ///
    /// The sending side can not guarantee all stream data is delivered to the remote
    /// application. It only knows the data is delivered to the QUIC stack of the remote
    /// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
    /// [`close()`] the remote endpoint may drop any data it received but is as yet
    /// undelivered to the application, including data that was acknowledged as received to
    /// the local endpoint.
    ///
    /// [`ConnectionError::LocallyClosed`]: crate::ConnectionError::LocallyClosed
    /// [`Endpoint::wait_idle()`]: crate::Endpoint::wait_idle
    /// [`close()`]: Connection::close
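    ///
    /// # Example
    ///
    /// A sketch of the last-receiving peer closing gracefully; the `0u32` code and reason are
    /// illustrative:
    ///
    /// ```no_run
    /// async fn shutdown(endpoint: &quinn::Endpoint, connection: &quinn::Connection) {
    ///     // Signal the peer that we are done; pending operations fail with `LocallyClosed`.
    ///     connection.close(0u32.into(), b"done");
    ///     // Give the CONNECTION_CLOSE frame a chance to be delivered before shutting down.
    ///     endpoint.wait_idle().await;
    /// }
    /// ```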
    pub fn close(&self, error_code: VarInt, reason: &[u8]) {
        let conn = &mut *self.0.state.lock("close");
        conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
    /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
    /// dictated by the peer.
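    ///
    /// # Example
    ///
    /// A sketch of sending a small datagram after checking the current size limit (the
    /// payload is illustrative):
    ///
    /// ```no_run
    /// fn ping(connection: &quinn::Connection) -> Result<(), quinn::SendDatagramError> {
    ///     let payload = bytes::Bytes::from_static(b"ping");
    ///     // `max_datagram_size` is `None` when datagrams are unsupported or disabled.
    ///     if connection.max_datagram_size().is_some_and(|max| payload.len() <= max) {
    ///         connection.send_datagram(payload)?;
    ///     }
    ///     Ok(())
    /// }
    /// ```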
    pub fn send_datagram(&self, data: Bytes) -> Result<(), SendDatagramError> {
        let conn = &mut *self.0.state.lock("send_datagram");
        if let Some(ref x) = conn.error {
            return Err(SendDatagramError::ConnectionLost(x.clone()));
        }
        use proto::SendDatagramError::*;
        match conn.inner.datagrams().send(data, true) {
            Ok(()) => {
                conn.wake();
                Ok(())
            }
            Err(e) => Err(match e {
                Blocked(..) => unreachable!(),
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            }),
        }
    }

    /// Transmit `data` as an unreliable, unordered application datagram
    ///
    /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
    /// conditions, which effectively prioritizes old datagrams over new datagrams.
    ///
    /// See [`send_datagram()`] for details.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn send_datagram_wait(&self, data: Bytes) -> SendDatagram<'_> {
        SendDatagram {
            conn: &self.0,
            data: Some(data),
            notify: self.0.shared.datagrams_unblocked.notified(),
        }
    }

    /// Compute the maximum size of datagrams that may be passed to [`send_datagram()`].
    ///
    /// Returns `None` if datagrams are unsupported by the peer or disabled locally.
    ///
    /// This may change over the lifetime of a connection according to variation in the path MTU
    /// estimate. The peer can also enforce an arbitrarily small fixed limit, but if the peer's
    /// limit is large this is guaranteed to be a little over a kilobyte at minimum.
    ///
    /// Not necessarily the maximum size of received datagrams.
    ///
    /// [`send_datagram()`]: Connection::send_datagram
    pub fn max_datagram_size(&self) -> Option<usize> {
        self.0
            .state
            .lock("max_datagram_size")
            .inner
            .datagrams()
            .max_size()
    }

    /// Bytes available in the outgoing datagram buffer
    ///
    /// When greater than zero, calling [`send_datagram()`](Self::send_datagram) with a datagram of
    /// at most this size is guaranteed not to cause older datagrams to be dropped.
    pub fn datagram_send_buffer_space(&self) -> usize {
        self.0
            .state
            .lock("datagram_send_buffer_space")
            .inner
            .datagrams()
            .send_buffer_space()
    }

    /// The peer's UDP address
    ///
    /// If `ServerConfig::migration` is `true`, clients may change addresses at will, e.g. when
    /// switching to a cellular internet connection.
    pub fn remote_address(&self) -> SocketAddr {
        self.0.state.lock("remote_address").inner.remote_address()
    }

    /// The local IP address which was used when the peer established
    /// the connection
    ///
    /// This can be different from the address the endpoint is bound to, in case
    /// the endpoint is bound to a wildcard address like `0.0.0.0` or `::`.
    ///
    /// This will return `None` for clients, or when the platform does not expose this
    /// information. See [`quinn_udp::RecvMeta::dst_ip`](udp::RecvMeta::dst_ip) for a list of
    /// supported platforms when using [`quinn_udp`](udp) for I/O, which is the default.
    pub fn local_ip(&self) -> Option<IpAddr> {
        self.0.state.lock("local_ip").inner.local_ip()
    }

    /// Current best estimate of this connection's latency (round-trip-time)
    pub fn rtt(&self) -> Duration {
        self.0.state.lock("rtt").inner.rtt()
    }

    /// Returns connection statistics
    pub fn stats(&self) -> ConnectionStats {
        self.0.state.lock("stats").inner.stats()
    }

    /// Current state of the congestion control algorithm, for debugging purposes
    pub fn congestion_state(&self) -> Box<dyn Controller> {
        self.0
            .state
            .lock("congestion_state")
            .inner
            .congestion_state()
            .clone_box()
    }

    /// Parameters negotiated during the handshake
    ///
    /// Guaranteed to return `Some` on fully established connections or after
    /// [`Connecting::handshake_data()`] succeeds. See that method's documentation for details on
    /// the returned value.
    ///
    /// [`Connecting::handshake_data()`]: crate::Connecting::handshake_data
    pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("handshake_data")
            .inner
            .crypto_session()
            .handshake_data()
    }

    /// Cryptographic identity of the peer
    ///
    /// The dynamic type returned is determined by the configured
    /// [`Session`](proto::crypto::Session). For the default `rustls` session, the return value can
    /// be [`downcast`](Box::downcast) to a <code>Vec<[rustls::pki_types::CertificateDer]></code>
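    ///
    /// # Example
    ///
    /// A sketch of counting the certificates presented by the peer under the default `rustls`
    /// session (the downcast target is the type named above):
    ///
    /// ```no_run
    /// fn peer_cert_count(connection: &quinn::Connection) -> usize {
    ///     connection
    ///         .peer_identity()
    ///         .and_then(|identity| {
    ///             identity
    ///                 .downcast::<Vec<rustls::pki_types::CertificateDer<'static>>>()
    ///                 .ok()
    ///         })
    ///         .map(|certs| certs.len())
    ///         .unwrap_or(0)
    /// }
    /// ```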
    pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
        self.0
            .state
            .lock("peer_identity")
            .inner
            .crypto_session()
            .peer_identity()
    }

    /// A stable identifier for this connection
    ///
    /// Peer addresses and connection IDs can change, but this value will remain
    /// fixed for the lifetime of the connection.
    pub fn stable_id(&self) -> usize {
        self.0.stable_id()
    }

    /// Update traffic keys spontaneously
    ///
    /// This primarily exists for testing purposes.
    pub fn force_key_update(&self) {
        self.0
            .state
            .lock("force_key_update")
            .inner
            .force_key_update()
    }

    /// Derive keying material from this connection's TLS session secrets.
    ///
    /// When both peers call this method with the same `label` and `context`
    /// arguments and `output` buffers of equal length, they will get the
    /// same sequence of bytes in `output`. These bytes are cryptographically
    /// strong and pseudorandom, and are suitable for use as keying material.
    ///
    /// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
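    ///
    /// # Example
    ///
    /// A sketch deriving a 32-byte key; the label and context are illustrative and only need
    /// to match on both peers:
    ///
    /// ```no_run
    /// fn derive_key(
    ///     connection: &quinn::Connection,
    /// ) -> Result<[u8; 32], quinn::crypto::ExportKeyingMaterialError> {
    ///     let mut key = [0u8; 32];
    ///     connection.export_keying_material(&mut key, b"my-app key", b"")?;
    ///     Ok(key)
    /// }
    /// ```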
    pub fn export_keying_material(
        &self,
        output: &mut [u8],
        label: &[u8],
        context: &[u8],
    ) -> Result<(), proto::crypto::ExportKeyingMaterialError> {
        self.0
            .state
            .lock("export_keying_material")
            .inner
            .crypto_session()
            .export_keying_material(output, label, context)
    }

    /// Modify the number of remotely initiated unidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_uni_streams");
        conn.inner.set_max_concurrent_streams(Dir::Uni, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }

    /// See [`proto::TransportConfig::receive_window()`]
    pub fn set_receive_window(&self, receive_window: VarInt) {
        let mut conn = self.0.state.lock("set_receive_window");
        conn.inner.set_receive_window(receive_window);
        conn.wake();
    }

    /// Modify the number of remotely initiated bidirectional streams that may be concurrently open
    ///
    /// No streams may be opened by the peer unless fewer than `count` are already open. Large
    /// `count`s increase both minimum and worst-case memory consumption.
    pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
        let mut conn = self.0.state.lock("set_max_concurrent_bi_streams");
        conn.inner.set_max_concurrent_streams(Dir::Bi, count);
        // May need to send MAX_STREAMS to make progress
        conn.wake();
    }
}

pin_project! {
    /// Future produced by [`Connection::open_uni`]
    pub struct OpenUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenUni<'_> {
    type Output = Result<SendStream, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::open_bi`]
    pub struct OpenBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for OpenBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_open<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_open");
    if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    } else if let Some(id) = state.inner.streams().open(dir) {
        let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => {
                notify.set(conn.shared.stream_budget_available[dir as usize].notified())
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_uni`]
    pub struct AcceptUni<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptUni<'_> {
    type Output = Result<RecvStream, ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
        Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
    }
}

pin_project! {
    /// Future produced by [`Connection::accept_bi`]
    pub struct AcceptBi<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for AcceptBi<'_> {
    type Output = Result<(SendStream, RecvStream), ConnectionError>;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
        Poll::Ready(Ok((
            SendStream::new(conn.clone(), id, is_0rtt),
            RecvStream::new(conn, id, is_0rtt),
        )))
    }
}

fn poll_accept<'a>(
    ctx: &mut Context<'_>,
    conn: &'a ConnectionRef,
    mut notify: Pin<&mut Notified<'a>>,
    dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
    let mut state = conn.state.lock("poll_accept");
    // Check for incoming streams before checking `state.error` so that already-received streams,
    // which are necessarily finite, can be drained from a closed connection.
    if let Some(id) = state.inner.streams().accept(dir) {
        let is_0rtt = state.inner.is_handshaking();
        state.wake(); // To send additional stream ID credit
        drop(state); // Release the lock so clone can take it
        return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
    } else if let Some(ref e) = state.error {
        return Poll::Ready(Err(e.clone()));
    }
    loop {
        match notify.as_mut().poll(ctx) {
            // `state` lock ensures we didn't race with readiness
            Poll::Pending => return Poll::Pending,
            // Spurious wakeup, get a new future
            Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::read_datagram`]
    pub struct ReadDatagram<'a> {
        conn: &'a ConnectionRef,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for ReadDatagram<'_> {
    type Output = Result<Bytes, ConnectionError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("ReadDatagram::poll");
        // Check for buffered datagrams before checking `state.error` so that already-received
        // datagrams, which are necessarily finite, can be drained from a closed connection.
        if let Some(x) = state.inner.datagrams().recv() {
            return Poll::Ready(Ok(x));
        } else if let Some(ref e) = state.error {
            return Poll::Ready(Err(e.clone()));
        }
        loop {
            match this.notify.as_mut().poll(ctx) {
                // `state` lock ensures we didn't race with readiness
                Poll::Pending => return Poll::Pending,
                // Spurious wakeup, get a new future
                Poll::Ready(()) => this
                    .notify
                    .set(this.conn.shared.datagram_received.notified()),
            }
        }
    }
}

pin_project! {
    /// Future produced by [`Connection::send_datagram_wait`]
    pub struct SendDatagram<'a> {
        conn: &'a ConnectionRef,
        data: Option<Bytes>,
        #[pin]
        notify: Notified<'a>,
    }
}

impl Future for SendDatagram<'_> {
    type Output = Result<(), SendDatagramError>;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        let mut state = this.conn.state.lock("SendDatagram::poll");
        if let Some(ref e) = state.error {
            return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
        }
        use proto::SendDatagramError::*;
        match state
            .inner
            .datagrams()
            .send(this.data.take().unwrap(), false)
        {
            Ok(()) => {
                state.wake();
                Poll::Ready(Ok(()))
            }
            Err(e) => Poll::Ready(Err(match e {
                Blocked(data) => {
                    this.data.replace(data);
                    loop {
                        match this.notify.as_mut().poll(ctx) {
                            Poll::Pending => return Poll::Pending,
                            // Spurious wakeup, get a new future
                            Poll::Ready(()) => this
                                .notify
                                .set(this.conn.shared.datagrams_unblocked.notified()),
                        }
                    }
                }
                UnsupportedByPeer => SendDatagramError::UnsupportedByPeer,
                Disabled => SendDatagramError::Disabled,
                TooLarge => SendDatagramError::TooLarge,
            })),
        }
    }
}
#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

impl ConnectionRef {
    #[allow(clippy::too_many_arguments)]
    fn new(
        handle: ConnectionHandle,
        conn: proto::Connection,
        endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
        conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
        on_handshake_data: oneshot::Sender<()>,
        on_connected: oneshot::Sender<bool>,
        socket: Arc<dyn AsyncUdpSocket>,
        runtime: Arc<dyn Runtime>,
    ) -> Self {
        Self(Arc::new(ConnectionInner {
            state: Mutex::new(State {
                inner: conn,
                driver: None,
                handle,
                on_handshake_data: Some(on_handshake_data),
                on_connected: Some(on_connected),
                connected: false,
                timer: None,
                timer_deadline: None,
                conn_events,
                endpoint_events,
                blocked_writers: FxHashMap::default(),
                blocked_readers: FxHashMap::default(),
                stopped: FxHashMap::default(),
                error: None,
                ref_count: 0,
                io_poller: socket.clone().create_io_poller(),
                socket,
                runtime,
                send_buffer: Vec::new(),
                buffered_transmit: None,
            }),
            shared: Shared::default(),
        }))
    }

    fn stable_id(&self) -> usize {
        &*self.0 as *const _ as usize
    }
}

impl Clone for ConnectionRef {
    fn clone(&self) -> Self {
        self.state.lock("clone").ref_count += 1;
        Self(self.0.clone())
    }
}

impl Drop for ConnectionRef {
    fn drop(&mut self) {
        let conn = &mut *self.state.lock("drop");
        if let Some(x) = conn.ref_count.checked_sub(1) {
            conn.ref_count = x;
            if x == 0 && !conn.inner.is_closed() {
                // If the driver is alive, it's just it and us, so we'd better shut it down. If it's
                // not, we can't do any harm. If there were any streams being opened, then either
                // the connection will be closed for an unrelated reason or a fresh reference will
                // be constructed for the newly opened stream.
                conn.implicit_close(&self.shared);
            }
        }
    }
}

impl std::ops::Deref for ConnectionRef {
    type Target = ConnectionInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Debug)]
pub(crate) struct ConnectionInner {
    pub(crate) state: Mutex<State>,
    pub(crate) shared: Shared,
}

#[derive(Debug, Default)]
pub(crate) struct Shared {
    /// Notified when new streams may be locally initiated due to an increase in stream ID flow
    /// control budget
    stream_budget_available: [Notify; 2],
    /// Notified when the peer has initiated a new stream
    stream_incoming: [Notify; 2],
    datagram_received: Notify,
    datagrams_unblocked: Notify,
    closed: Notify,
}

pub(crate) struct State {
    pub(crate) inner: proto::Connection,
    driver: Option<Waker>,
    handle: ConnectionHandle,
    on_handshake_data: Option<oneshot::Sender<()>>,
    on_connected: Option<oneshot::Sender<bool>>,
    connected: bool,
    timer: Option<Pin<Box<dyn AsyncTimer>>>,
    timer_deadline: Option<Instant>,
    conn_events: mpsc::UnboundedReceiver<ConnectionEvent>,
    endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
    pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
    pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
    pub(crate) stopped: FxHashMap<StreamId, Waker>,
    /// Always set to Some before the connection becomes drained
    pub(crate) error: Option<ConnectionError>,
    /// Number of live handles that can be used to initiate or handle I/O; excludes the driver
    ref_count: usize,
    socket: Arc<dyn AsyncUdpSocket>,
    io_poller: Pin<Box<dyn UdpPoller>>,
    runtime: Arc<dyn Runtime>,
    send_buffer: Vec<u8>,
    /// We buffer a transmit when the underlying I/O would block
    buffered_transmit: Option<proto::Transmit>,
}

impl State {
    fn drive_transmit(&mut self, cx: &mut Context) -> io::Result<bool> {
        let now = self.runtime.now();
        let mut transmits = 0;

        let max_datagrams = self
            .socket
            .max_transmit_segments()
            .min(MAX_TRANSMIT_SEGMENTS);

        loop {
            // Retry the last transmit, or get a new one.
            let t = match self.buffered_transmit.take() {
                Some(t) => t,
                None => {
                    self.send_buffer.clear();
                    self.send_buffer.reserve(self.inner.current_mtu() as usize);
                    match self
                        .inner
                        .poll_transmit(now, max_datagrams, &mut self.send_buffer)
                    {
                        Some(t) => {
                            transmits += match t.segment_size {
                                None => 1,
                                Some(s) => (t.size + s - 1) / s, // round up
                            };
                            t
                        }
                        None => break,
                    }
                }
            };

            if self.io_poller.as_mut().poll_writable(cx)?.is_pending() {
                // Retry after a future wakeup
                self.buffered_transmit = Some(t);
                return Ok(false);
            }

            let len = t.size;
            let retry = match self
                .socket
                .try_send(&udp_transmit(&t, &self.send_buffer[..len]))
            {
                Ok(()) => false,
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
                Err(e) => return Err(e),
            };
            if retry {
                // We thought the socket was writable, but it wasn't. Retry so that either another
                // `poll_writable` call determines that the socket is indeed not writable and
                // registers us for a wakeup, or the send succeeds if this really was just a
                // transient failure.
                self.buffered_transmit = Some(t);
                continue;
            }

            if transmits >= MAX_TRANSMIT_DATAGRAMS {
                // TODO: What isn't ideal here yet is that if we don't poll all
                // datagrams that could be sent we don't go into the `app_limited`
                // state and CWND continues to grow until we get here the next time.
                // See https://github.com/quinn-rs/quinn/issues/1126
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn forward_endpoint_events(&mut self) {
        while let Some(event) = self.inner.poll_endpoint_events() {
            // If the endpoint driver is gone, noop.
            let _ = self.endpoint_events.send((self.handle, event));
        }
    }

    /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
    fn process_conn_events(
        &mut self,
        shared: &Shared,
        cx: &mut Context,
    ) -> Result<(), ConnectionError> {
        loop {
            match self.conn_events.poll_recv(cx) {
                Poll::Ready(Some(ConnectionEvent::Rebind(socket))) => {
                    self.socket = socket;
                    self.io_poller = self.socket.clone().create_io_poller();
                    self.inner.local_address_changed();
                }
                Poll::Ready(Some(ConnectionEvent::Proto(event))) => {
                    self.inner.handle_event(event);
                }
                Poll::Ready(Some(ConnectionEvent::Close { reason, error_code })) => {
                    self.close(error_code, reason, shared);
                }
                Poll::Ready(None) => {
                    return Err(ConnectionError::TransportError(proto::TransportError {
                        code: proto::TransportErrorCode::INTERNAL_ERROR,
                        frame: None,
                        reason: "endpoint driver future was dropped".to_string(),
                    }));
                }
                Poll::Pending => {
                    return Ok(());
                }
            }
        }
    }

    fn forward_app_events(&mut self, shared: &Shared) {
        while let Some(event) = self.inner.poll() {
            use proto::Event::*;
            match event {
                HandshakeDataReady => {
                    if let Some(x) = self.on_handshake_data.take() {
                        let _ = x.send(());
                    }
                }
                Connected => {
                    self.connected = true;
                    if let Some(x) = self.on_connected.take() {
                        // We don't care if the on-connected future was dropped
                        let _ = x.send(self.inner.accepted_0rtt());
                    }
                    if self.inner.side().is_client() && !self.inner.accepted_0rtt() {
                        // Wake up rejected 0-RTT streams so they can fail immediately with
                        // `ZeroRttRejected` errors.
                        wake_all(&mut self.blocked_writers);
                        wake_all(&mut self.blocked_readers);
                        wake_all(&mut self.stopped);
                    }
                }
                ConnectionLost { reason } => {
                    self.terminate(reason, shared);
                }
                Stream(StreamEvent::Writable { id }) => wake_stream(id, &mut self.blocked_writers),
                Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
                    shared.stream_incoming[Dir::Uni as usize].notify_waiters();
                }
                Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
                    shared.stream_incoming[Dir::Bi as usize].notify_waiters();
                }
                DatagramReceived => {
                    shared.datagram_received.notify_waiters();
                }
                DatagramsUnblocked => {
                    shared.datagrams_unblocked.notify_waiters();
                }
                Stream(StreamEvent::Readable { id }) => wake_stream(id, &mut self.blocked_readers),
                Stream(StreamEvent::Available { dir }) => {
                    // Might mean any number of streams are ready, so we wake up everyone
                    shared.stream_budget_available[dir as usize].notify_waiters();
                }
                Stream(StreamEvent::Finished { id }) => wake_stream(id, &mut self.stopped),
                Stream(StreamEvent::Stopped { id, .. }) => {
                    wake_stream(id, &mut self.stopped);
                    wake_stream(id, &mut self.blocked_writers);
                }
            }
        }
    }

    fn drive_timer(&mut self, cx: &mut Context) -> bool {
        // Check whether we need to (re)set the timer. If so, we must poll again to ensure the
        // timer is registered with the runtime (and check whether it's already
        // expired).
        match self.inner.poll_timeout() {
            Some(deadline) => {
                if let Some(delay) = &mut self.timer {
                    // There is no need to reset the tokio timer if the deadline
                    // did not change
                    if self
                        .timer_deadline
                        .map(|current_deadline| current_deadline != deadline)
                        .unwrap_or(true)
                    {
                        delay.as_mut().reset(deadline);
                    }
                } else {
                    self.timer = Some(self.runtime.new_timer(deadline));
                }
                // Store the actual expiration time of the timer
                self.timer_deadline = Some(deadline);
            }
            None => {
                self.timer_deadline = None;
                return false;
            }
        }

        if self.timer_deadline.is_none() {
            return false;
        }

        let delay = self
            .timer
            .as_mut()
            .expect("timer must exist in this state")
            .as_mut();
        if delay.poll(cx).is_pending() {
            // Since there wasn't a timeout event, there is nothing new
            // for the connection to do
            return false;
        }

        // A timer expired, so the caller needs to check for
        // new transmits, which might cause new timers to be set.
        self.inner.handle_timeout(self.runtime.now());
        self.timer_deadline = None;
        true
    }

    /// Wake up a blocked `Driver` task to process I/O
    pub(crate) fn wake(&mut self) {
        if let Some(x) = self.driver.take() {
            x.wake();
        }
    }

    /// Used to wake up all blocked futures when the connection becomes closed for any reason
    fn terminate(&mut self, reason: ConnectionError, shared: &Shared) {
        self.error = Some(reason.clone());
        if let Some(x) = self.on_handshake_data.take() {
            let _ = x.send(());
        }
        wake_all(&mut self.blocked_writers);
        wake_all(&mut self.blocked_readers);
        shared.stream_budget_available[Dir::Uni as usize].notify_waiters();
        shared.stream_budget_available[Dir::Bi as usize].notify_waiters();
        shared.stream_incoming[Dir::Uni as usize].notify_waiters();
        shared.stream_incoming[Dir::Bi as usize].notify_waiters();
        shared.datagram_received.notify_waiters();
        shared.datagrams_unblocked.notify_waiters();
        if let Some(x) = self.on_connected.take() {
            let _ = x.send(false);
        }
        wake_all(&mut self.stopped);
        shared.closed.notify_waiters();
    }

    fn close(&mut self, error_code: VarInt, reason: Bytes, shared: &Shared) {
        self.inner.close(self.runtime.now(), error_code, reason);
        self.terminate(ConnectionError::LocallyClosed, shared);
        self.wake();
    }

    /// Close for a reason other than the application's explicit request
    pub(crate) fn implicit_close(&mut self, shared: &Shared) {
        self.close(0u32.into(), Bytes::new(), shared);
    }

    pub(crate) fn check_0rtt(&self) -> Result<(), ()> {
        if self.inner.is_handshaking()
            || self.inner.accepted_0rtt()
            || self.inner.side().is_server()
        {
            Ok(())
        } else {
            Err(())
        }
    }
}

impl Drop for State {
    fn drop(&mut self) {
        if !self.inner.is_drained() {
            // Ensure the endpoint can tidy up
            let _ = self
                .endpoint_events
                .send((self.handle, proto::EndpointEvent::drained()));
        }
    }
}

impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("State").field("inner", &self.inner).finish()
    }
}

fn wake_stream(stream_id: StreamId, wakers: &mut FxHashMap<StreamId, Waker>) {
    if let Some(waker) = wakers.remove(&stream_id) {
        waker.wake();
    }
}

fn wake_all(wakers: &mut FxHashMap<StreamId, Waker>) {
    wakers.drain().for_each(|(_, waker)| waker.wake())
}

/// Errors that can arise when sending a datagram
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum SendDatagramError {
    /// The peer does not support receiving datagram frames
    #[error("datagrams not supported by peer")]
    UnsupportedByPeer,
    /// Datagram support is disabled locally
    #[error("datagram support disabled")]
    Disabled,
    /// The datagram is larger than the connection can currently accommodate
    ///
    /// Indicates that the path MTU minus overhead or the limit advertised by the peer has been
    /// exceeded.
    #[error("datagram too large")]
    TooLarge,
    /// The connection was lost
    #[error("connection lost")]
    ConnectionLost(#[from] ConnectionError),
}

/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
/// and allows other tasks (like receiving ACKs) to run in between.
const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// The maximum amount of datagrams that are sent in a single transmit
///
/// This can be lower than the maximum platform capabilities, to avoid excessive
/// memory allocations when calling `poll_transmit()`. Benchmarks have shown
/// that numbers around 10 are a good compromise.
const MAX_TRANSMIT_SEGMENTS: usize = 10;