quinn/runtime.rs
1use std::{
2 fmt::Debug,
3 future::Future,
4 io::{self, IoSliceMut},
5 net::SocketAddr,
6 pin::Pin,
7 sync::Arc,
8 task::{Context, Poll},
9};
10
11use udp::{RecvMeta, Transmit};
12
13use crate::Instant;
14
15/// Abstracts I/O and timer operations for runtime independence
16pub trait Runtime: Send + Sync + Debug + 'static {
17 /// Construct a timer that will expire at `i`
18 fn new_timer(&self, i: Instant) -> Pin<Box<dyn AsyncTimer>>;
19 /// Drive `future` to completion in the background
20 fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
21 /// Convert `t` into the socket type used by this runtime
22 #[cfg(not(wasm_browser))]
23 fn wrap_udp_socket(&self, t: std::net::UdpSocket) -> io::Result<Arc<dyn AsyncUdpSocket>>;
24 /// Look up the current time
25 ///
26 /// Allows simulating the flow of time for testing.
27 fn now(&self) -> Instant {
28 Instant::now()
29 }
30}
31
32/// Abstract implementation of an async timer for runtime independence
33pub trait AsyncTimer: Send + Debug + 'static {
34 /// Update the timer to expire at `i`
35 fn reset(self: Pin<&mut Self>, i: Instant);
36 /// Check whether the timer has expired, and register to be woken if not
37 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()>;
38}
39
40/// Abstract implementation of a UDP socket for runtime independence
41pub trait AsyncUdpSocket: Send + Sync + Debug + 'static {
42 /// Create a [`UdpPoller`] that can register a single task for write-readiness notifications
43 ///
44 /// A `poll_send` method on a single object can usually store only one [`Waker`] at a time,
45 /// i.e. allow at most one caller to wait for an event. This method allows any number of
46 /// interested tasks to construct their own [`UdpPoller`] object. They can all then wait for the
47 /// same event and be notified concurrently, because each [`UdpPoller`] can store a separate
48 /// [`Waker`].
49 ///
50 /// [`Waker`]: std::task::Waker
51 fn create_io_poller(self: Arc<Self>) -> Pin<Box<dyn UdpPoller>>;
52
53 /// Send UDP datagrams from `transmits`, or return `WouldBlock` and clear the underlying
54 /// socket's readiness, or return an I/O error
55 ///
56 /// If this returns [`io::ErrorKind::WouldBlock`], [`UdpPoller::poll_writable`] must be called
57 /// to register the calling task to be woken when a send should be attempted again.
58 fn try_send(&self, transmit: &Transmit) -> io::Result<()>;
59
60 /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future
61 fn poll_recv(
62 &self,
63 cx: &mut Context,
64 bufs: &mut [IoSliceMut<'_>],
65 meta: &mut [RecvMeta],
66 ) -> Poll<io::Result<usize>>;
67
68 /// Look up the local IP address and port used by this socket
69 fn local_addr(&self) -> io::Result<SocketAddr>;
70
71 /// Maximum number of datagrams that a [`Transmit`] may encode
72 fn max_transmit_segments(&self) -> usize {
73 1
74 }
75
76 /// Maximum number of datagrams that might be described by a single [`RecvMeta`]
77 fn max_receive_segments(&self) -> usize {
78 1
79 }
80
81 /// Whether datagrams might get fragmented into multiple parts
82 ///
83 /// Sockets should prevent this for best performance. See e.g. the `IPV6_DONTFRAG` socket
84 /// option.
85 fn may_fragment(&self) -> bool {
86 true
87 }
88}
89
90/// An object polled to detect when an associated [`AsyncUdpSocket`] is writable
91///
92/// Any number of `UdpPoller`s may exist for a single [`AsyncUdpSocket`]. Each `UdpPoller` is
93/// responsible for notifying at most one task when that socket becomes writable.
94pub trait UdpPoller: Send + Sync + Debug + 'static {
95 /// Check whether the associated socket is likely to be writable
96 ///
97 /// Must be called after [`AsyncUdpSocket::try_send`] returns [`io::ErrorKind::WouldBlock`] to
98 /// register the task associated with `cx` to be woken when a send should be attempted
99 /// again. Unlike in [`Future::poll`], a [`UdpPoller`] may be reused indefinitely no matter how
100 /// many times `poll_writable` returns [`Poll::Ready`].
101 fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>>;
102}
103
104pin_project_lite::pin_project! {
105 /// Helper adapting a function `MakeFut` that constructs a single-use future `Fut` into a
106 /// [`UdpPoller`] that may be reused indefinitely
107 struct UdpPollHelper<MakeFut, Fut> {
108 make_fut: MakeFut,
109 #[pin]
110 fut: Option<Fut>,
111 }
112}
113
114impl<MakeFut, Fut> UdpPollHelper<MakeFut, Fut> {
115 /// Construct a [`UdpPoller`] that calls `make_fut` to get the future to poll, storing it until
116 /// it yields [`Poll::Ready`], then creating a new one on the next
117 /// [`poll_writable`](UdpPoller::poll_writable)
118 #[cfg(any(
119 feature = "runtime-async-std",
120 feature = "runtime-smol",
121 feature = "runtime-tokio",
122 feature = "async-io"
123 ))]
124 fn new(make_fut: MakeFut) -> Self {
125 Self {
126 make_fut,
127 fut: None,
128 }
129 }
130}
131
132impl<MakeFut, Fut> UdpPoller for UdpPollHelper<MakeFut, Fut>
133where
134 MakeFut: Fn() -> Fut + Send + Sync + 'static,
135 Fut: Future<Output = io::Result<()>> + Send + Sync + 'static,
136{
137 fn poll_writable(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
138 let mut this = self.project();
139 if this.fut.is_none() {
140 this.fut.set(Some((this.make_fut)()));
141 }
142 // We're forced to `unwrap` here because `Fut` may be `!Unpin`, which means we can't safely
143 // obtain an `&mut Fut` after storing it in `self.fut` when `self` is already behind `Pin`,
144 // and if we didn't store it then we wouldn't be able to keep it alive between
145 // `poll_writable` calls.
146 let result = this.fut.as_mut().as_pin_mut().unwrap().poll(cx);
147 if result.is_ready() {
148 // Polling an arbitrary `Future` after it becomes ready is a logic error, so arrange for
149 // a new `Future` to be created on the next call.
150 this.fut.set(None);
151 }
152 result
153 }
154}
155
156impl<MakeFut, Fut> Debug for UdpPollHelper<MakeFut, Fut> {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("UdpPollHelper").finish_non_exhaustive()
159 }
160}
161
162/// Automatically select an appropriate runtime from those enabled at compile time
163///
164/// If `runtime-tokio` is enabled and this function is called from within a Tokio runtime context,
165/// then `TokioRuntime` is returned. Otherwise, if `runtime-async-std` is enabled, `AsyncStdRuntime`
166/// is returned. Otherwise, if `runtime-smol` is enabled, `SmolRuntime` is returned.
167/// Otherwise, `None` is returned.
168#[allow(clippy::needless_return)] // Be sure we return the right thing
169pub fn default_runtime() -> Option<Arc<dyn Runtime>> {
170 #[cfg(feature = "runtime-tokio")]
171 {
172 if ::tokio::runtime::Handle::try_current().is_ok() {
173 return Some(Arc::new(TokioRuntime));
174 }
175 }
176
177 #[cfg(feature = "runtime-async-std")]
178 {
179 return Some(Arc::new(AsyncStdRuntime));
180 }
181
182 #[cfg(all(feature = "runtime-smol", not(feature = "runtime-async-std")))]
183 {
184 return Some(Arc::new(SmolRuntime));
185 }
186
187 #[cfg(not(any(feature = "runtime-async-std", feature = "runtime-smol")))]
188 None
189}
190
191#[cfg(feature = "runtime-tokio")]
192mod tokio;
193// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
194#[cfg(feature = "runtime-tokio")]
195pub use self::tokio::TokioRuntime;
196
197#[cfg(feature = "async-io")]
198mod async_io;
199// Due to MSRV, we must specify `self::` where there's crate/module ambiguity
200#[cfg(any(feature = "runtime-smol", feature = "runtime-async-std"))]
201pub use self::async_io::*;