rustls_acme/
incoming.rs

1use crate::acceptor::{AcmeAccept, AcmeAcceptor};
2use crate::{crypto_provider, AcmeState};
3use core::fmt;
4use futures::stream::{FusedStream, FuturesUnordered};
5use futures::{AsyncRead, AsyncWrite, Stream};
6use futures_rustls::rustls::crypto::CryptoProvider;
7use futures_rustls::rustls::ServerConfig;
8use futures_rustls::server::TlsStream;
9use futures_rustls::Accept;
10use std::fmt::Debug;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14
15pub struct Incoming<
16    TCP: AsyncRead + AsyncWrite + Unpin,
17    ETCP,
18    ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
19    EC: Debug + 'static,
20    EA: Debug + 'static,
21> {
22    state: AcmeState<EC, EA>,
23    acceptor: AcmeAcceptor,
24    rustls_config: Arc<ServerConfig>,
25    tcp_incoming: Option<ITCP>,
26    acme_accepting: FuturesUnordered<AcmeAccept<TCP>>,
27    tls_accepting: FuturesUnordered<Accept<TCP>>,
28}
29
30impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> fmt::Debug
31    for Incoming<TCP, ETCP, ITCP, EC, EA>
32{
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_struct("Incoming")
35            .field("state", &self.state)
36            .field("acceptor", &self.acceptor)
37            .field("in_progress", &(self.acme_accepting.len() + self.tls_accepting.len()))
38            .field("terminated", &self.is_terminated())
39            .finish_non_exhaustive()
40    }
41}
42
43impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> Unpin
44    for Incoming<TCP, ETCP, ITCP, EC, EA>
45{
46}
47
48impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static>
49    Incoming<TCP, ETCP, ITCP, EC, EA>
50{
51    #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
52    pub fn new(tcp_incoming: ITCP, state: AcmeState<EC, EA>, acceptor: AcmeAcceptor, alpn_protocols: Vec<Vec<u8>>) -> Self {
53        Self::new_with_provider(tcp_incoming, state, acceptor, alpn_protocols, crypto_provider().into())
54    }
55
56    /// Same as [Incoming::new], with a specific [CryptoProvider].
57    pub fn new_with_provider(
58        tcp_incoming: ITCP,
59        state: AcmeState<EC, EA>,
60        acceptor: AcmeAcceptor,
61        alpn_protocols: Vec<Vec<u8>>,
62        provider: Arc<CryptoProvider>,
63    ) -> Self {
64        let mut config = ServerConfig::builder_with_provider(provider)
65            .with_safe_default_protocol_versions()
66            .unwrap()
67            .with_no_client_auth()
68            .with_cert_resolver(state.resolver());
69        config.alpn_protocols = alpn_protocols;
70        Self {
71            state,
72            acceptor,
73            rustls_config: Arc::new(config),
74            tcp_incoming: Some(tcp_incoming),
75            acme_accepting: FuturesUnordered::new(),
76            tls_accepting: FuturesUnordered::new(),
77        }
78    }
79}
80
81impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> Stream
82    for Incoming<TCP, ETCP, ITCP, EC, EA>
83{
84    type Item = Result<TlsStream<TCP>, ETCP>;
85
86    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        loop {
88            match Pin::new(&mut self.state).poll_next(cx) {
89                Poll::Ready(Some(event)) => {
90                    match event {
91                        Ok(ok) => log::info!("event: {:?}", ok),
92                        Err(err) => log::error!("event: {:?}", err),
93                    }
94                    continue;
95                }
96                Poll::Ready(None) => unreachable!(),
97                Poll::Pending => {}
98            }
99            match Pin::new(&mut self.acme_accepting).poll_next(cx) {
100                Poll::Ready(Some(Ok(Some(tls)))) => self.tls_accepting.push(tls.into_stream(self.rustls_config.clone())),
101                Poll::Ready(Some(Ok(None))) => {
102                    log::info!("received TLS-ALPN-01 validation request");
103                    continue;
104                }
105                Poll::Ready(Some(Err(err))) => {
106                    log::error!("tls accept failed, {:?}", err);
107                    continue;
108                }
109                Poll::Ready(None) | Poll::Pending => {}
110            }
111            match Pin::new(&mut self.tls_accepting).poll_next(cx) {
112                Poll::Ready(Some(Ok(tls))) => return Poll::Ready(Some(Ok(tls))),
113                Poll::Ready(Some(Err(err))) => {
114                    log::error!("tls accept failed, {:?}", err);
115                    continue;
116                }
117                Poll::Ready(None) | Poll::Pending => {}
118            }
119            let tcp_incoming = match &mut self.tcp_incoming {
120                Some(tcp_incoming) => tcp_incoming,
121                None => match self.is_terminated() {
122                    true => return Poll::Ready(None),
123                    false => return Poll::Pending,
124                },
125            };
126            match Pin::new(tcp_incoming).poll_next(cx) {
127                Poll::Ready(Some(Ok(tcp))) => self.acme_accepting.push(self.acceptor.accept(tcp)),
128                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
129                Poll::Ready(None) => drop(self.tcp_incoming.as_mut().take()),
130                Poll::Pending => return Poll::Pending,
131            }
132        }
133    }
134}
135
136impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> FusedStream
137    for Incoming<TCP, ETCP, ITCP, EC, EA>
138{
139    fn is_terminated(&self) -> bool {
140        self.tcp_incoming.is_none() && self.acme_accepting.is_terminated() && self.tls_accepting.is_terminated()
141    }
142}