1use crate::acceptor::{AcmeAccept, AcmeAcceptor};
2use crate::{crypto_provider, AcmeState};
3use core::fmt;
4use futures::stream::{FusedStream, FuturesUnordered};
5use futures::{AsyncRead, AsyncWrite, Stream};
6use futures_rustls::rustls::crypto::CryptoProvider;
7use futures_rustls::rustls::ServerConfig;
8use futures_rustls::server::TlsStream;
9use futures_rustls::Accept;
10use std::fmt::Debug;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14
15pub struct Incoming<
16 TCP: AsyncRead + AsyncWrite + Unpin,
17 ETCP,
18 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
19 EC: Debug + 'static,
20 EA: Debug + 'static,
21> {
22 state: AcmeState<EC, EA>,
23 acceptor: AcmeAcceptor,
24 rustls_config: Arc<ServerConfig>,
25 tcp_incoming: Option<ITCP>,
26 acme_accepting: FuturesUnordered<AcmeAccept<TCP>>,
27 tls_accepting: FuturesUnordered<Accept<TCP>>,
28}
29
30impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> fmt::Debug
31 for Incoming<TCP, ETCP, ITCP, EC, EA>
32{
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 f.debug_struct("Incoming")
35 .field("state", &self.state)
36 .field("acceptor", &self.acceptor)
37 .field("in_progress", &(self.acme_accepting.len() + self.tls_accepting.len()))
38 .field("terminated", &self.is_terminated())
39 .finish_non_exhaustive()
40 }
41}
42
43impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> Unpin
44 for Incoming<TCP, ETCP, ITCP, EC, EA>
45{
46}
47
48impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static>
49 Incoming<TCP, ETCP, ITCP, EC, EA>
50{
51 #[cfg(any(feature = "ring", feature = "aws-lc-rs"))]
52 pub fn new(tcp_incoming: ITCP, state: AcmeState<EC, EA>, acceptor: AcmeAcceptor, alpn_protocols: Vec<Vec<u8>>) -> Self {
53 Self::new_with_provider(tcp_incoming, state, acceptor, alpn_protocols, crypto_provider().into())
54 }
55
56 pub fn new_with_provider(
58 tcp_incoming: ITCP,
59 state: AcmeState<EC, EA>,
60 acceptor: AcmeAcceptor,
61 alpn_protocols: Vec<Vec<u8>>,
62 provider: Arc<CryptoProvider>,
63 ) -> Self {
64 let mut config = ServerConfig::builder_with_provider(provider)
65 .with_safe_default_protocol_versions()
66 .unwrap()
67 .with_no_client_auth()
68 .with_cert_resolver(state.resolver());
69 config.alpn_protocols = alpn_protocols;
70 Self {
71 state,
72 acceptor,
73 rustls_config: Arc::new(config),
74 tcp_incoming: Some(tcp_incoming),
75 acme_accepting: FuturesUnordered::new(),
76 tls_accepting: FuturesUnordered::new(),
77 }
78 }
79}
80
81impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> Stream
82 for Incoming<TCP, ETCP, ITCP, EC, EA>
83{
84 type Item = Result<TlsStream<TCP>, ETCP>;
85
86 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 loop {
88 match Pin::new(&mut self.state).poll_next(cx) {
89 Poll::Ready(Some(event)) => {
90 match event {
91 Ok(ok) => log::info!("event: {:?}", ok),
92 Err(err) => log::error!("event: {:?}", err),
93 }
94 continue;
95 }
96 Poll::Ready(None) => unreachable!(),
97 Poll::Pending => {}
98 }
99 match Pin::new(&mut self.acme_accepting).poll_next(cx) {
100 Poll::Ready(Some(Ok(Some(tls)))) => self.tls_accepting.push(tls.into_stream(self.rustls_config.clone())),
101 Poll::Ready(Some(Ok(None))) => {
102 log::info!("received TLS-ALPN-01 validation request");
103 continue;
104 }
105 Poll::Ready(Some(Err(err))) => {
106 log::error!("tls accept failed, {:?}", err);
107 continue;
108 }
109 Poll::Ready(None) | Poll::Pending => {}
110 }
111 match Pin::new(&mut self.tls_accepting).poll_next(cx) {
112 Poll::Ready(Some(Ok(tls))) => return Poll::Ready(Some(Ok(tls))),
113 Poll::Ready(Some(Err(err))) => {
114 log::error!("tls accept failed, {:?}", err);
115 continue;
116 }
117 Poll::Ready(None) | Poll::Pending => {}
118 }
119 let tcp_incoming = match &mut self.tcp_incoming {
120 Some(tcp_incoming) => tcp_incoming,
121 None => match self.is_terminated() {
122 true => return Poll::Ready(None),
123 false => return Poll::Pending,
124 },
125 };
126 match Pin::new(tcp_incoming).poll_next(cx) {
127 Poll::Ready(Some(Ok(tcp))) => self.acme_accepting.push(self.acceptor.accept(tcp)),
128 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
129 Poll::Ready(None) => drop(self.tcp_incoming.as_mut().take()),
130 Poll::Pending => return Poll::Pending,
131 }
132 }
133 }
134}
135
136impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> FusedStream
137 for Incoming<TCP, ETCP, ITCP, EC, EA>
138{
139 fn is_terminated(&self) -> bool {
140 self.tcp_incoming.is_none() && self.acme_accepting.is_terminated() && self.tls_accepting.is_terminated()
141 }
142}