1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_acme::acme::ACME_TLS_ALPN_NAME;
33use rustls_acme::caches::DirCache;
34use rustls_acme::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
35use rustls_native_certs::load_native_certs;
36use tokio::fs;
37use tokio::io::{AsyncWriteExt, BufWriter};
38use tokio::net::{TcpListener, TcpStream};
39use tokio::runtime::Handle;
40use tokio::signal;
41use tokio::sync::Mutex;
42use tokio::time;
43use tokio_rustls::server::TlsStream;
44use tokio_rustls::LazyConfigAcceptor;
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
48#[allow(clippy::large_enum_variant)]
50enum MaybeTlsStream {
51 Tls(TlsStream<TcpStream>),
52 Plain(TcpStream),
53}
54
55#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58 connection_attempt: quinn::Incoming,
59 local_address: SocketAddr,
60 config: Arc<Yaml>,
61 logger: Sender<LogMessage>,
62 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64 let remote_address = connection_attempt.remote_address();
65
66 let logger_clone = logger.clone();
67
68 tokio::task::spawn(async move {
69 match connection_attempt.await {
70 Ok(connection) => {
71 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73 Ok(h3_conn) => h3_conn,
74 Err(err) => {
75 logger_clone
76 .send(LogMessage::new(
77 format!("Error serving HTTP/3 connection: {err}"),
78 true,
79 ))
80 .await
81 .unwrap_or_default();
82 return;
83 }
84 };
85
86 loop {
87 match h3_conn.accept().await {
88 Ok(Some(resolver)) => {
89 let config = config.clone();
90 let remote_address = remote_address;
91
92 let logger_clone = logger_clone.clone();
93 let modules = modules.clone();
94 tokio::spawn(async move {
95 let handlers_vec = modules
96 .iter()
97 .map(|module| module.get_handlers(Handle::current()));
98
99 let (request, stream) = match resolver.resolve_request().await {
100 Ok(resolved) => resolved,
101 Err(err) => {
102 logger_clone
103 .send(LogMessage::new(
104 format!("Error serving HTTP/3 connection: {err}"),
105 true,
106 ))
107 .await
108 .unwrap_or_default();
109 return;
110 }
111 };
112 let (mut send, receive) = stream.split();
113 let request_body_stream = futures_util::stream::unfold(
114 (receive, false),
115 async move |(mut receive, mut is_body_finished)| loop {
116 if !is_body_finished {
117 match receive.recv_data().await {
118 Ok(Some(mut data)) => {
119 return Some((
120 Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121 (receive, false),
122 ))
123 }
124 Ok(None) => is_body_finished = true,
125 Err(err) => {
126 return Some((
127 Err(std::io::Error::other(err.to_string())),
128 (receive, false),
129 ))
130 }
131 }
132 } else {
133 match receive.recv_trailers().await {
134 Ok(Some(trailers)) => {
135 return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136 }
137 Ok(None) => {
138 return None;
139 }
140 Err(err) => {
141 return Some((
142 Err(std::io::Error::other(err.to_string())),
143 (receive, true),
144 ))
145 }
146 }
147 }
148 },
149 );
150 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151 let (request_parts, _) = request.into_parts();
152 let request = Request::from_parts(request_parts, request_body);
153 let handlers_vec_clone = handlers_vec
154 .clone()
155 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156 let mut response = match request_handler(
157 request,
158 remote_address,
159 local_address,
160 true,
161 config,
162 logger_clone.clone(),
163 handlers_vec_clone,
164 None,
165 None,
166 )
167 .await
168 {
169 Ok(response) => response,
170 Err(err) => {
171 logger_clone
172 .send(LogMessage::new(
173 format!("Error serving HTTP/3 connection: {err}"),
174 true,
175 ))
176 .await
177 .unwrap_or_default();
178 return;
179 }
180 };
181 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182 response
183 .headers_mut()
184 .entry(http::header::DATE)
185 .or_insert(http_date);
186 }
187 let (response_parts, mut response_body) = response.into_parts();
188 if let Err(err) = send
189 .send_response(Response::from_parts(response_parts, ()))
190 .await
191 {
192 logger_clone
193 .send(LogMessage::new(
194 format!("Error serving HTTP/3 connection: {err}"),
195 true,
196 ))
197 .await
198 .unwrap_or_default();
199 return;
200 }
201 let mut had_trailers = false;
202 while let Some(chunk) = response_body.frame().await {
203 match chunk {
204 Ok(frame) => {
205 if frame.is_data() {
206 match frame.into_data() {
207 Ok(data) => {
208 if let Err(err) = send.send_data(data).await {
209 logger_clone
210 .send(LogMessage::new(
211 format!("Error serving HTTP/3 connection: {err}"),
212 true,
213 ))
214 .await
215 .unwrap_or_default();
216 return;
217 }
218 }
219 Err(_) => {
220 logger_clone
221 .send(LogMessage::new(
222 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223 true,
224 ))
225 .await
226 .unwrap_or_default();
227 return;
228 }
229 }
230 } else if frame.is_trailers() {
231 match frame.into_trailers() {
232 Ok(trailers) => {
233 had_trailers = true;
234 if let Err(err) = send.send_trailers(trailers).await {
235 logger_clone
236 .send(LogMessage::new(
237 format!("Error serving HTTP/3 connection: {err}"),
238 true,
239 ))
240 .await
241 .unwrap_or_default();
242 return;
243 }
244 }
245 Err(_) => {
246 logger_clone
247 .send(LogMessage::new(
248 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249 true,
250 ))
251 .await
252 .unwrap_or_default();
253 return;
254 }
255 }
256 }
257 }
258 Err(err) => {
259 logger_clone
260 .send(LogMessage::new(
261 format!("Error serving HTTP/3 connection: {err}"),
262 true,
263 ))
264 .await
265 .unwrap_or_default();
266 return;
267 }
268 }
269 }
270 if !had_trailers {
271 if let Err(err) = send.finish().await {
272 logger_clone
273 .send(LogMessage::new(
274 format!("Error serving HTTP/3 connection: {err}"),
275 true,
276 ))
277 .await
278 .unwrap_or_default();
279 }
280 }
281 });
282 }
283 Ok(None) => break,
284 Err(err) => {
285 logger_clone
286 .send(LogMessage::new(
287 format!("Error serving HTTP/3 connection: {err}"),
288 true,
289 ))
290 .await
291 .unwrap_or_default();
292 return;
293 }
294 }
295 }
296 }
297 Err(err) => {
298 logger_clone
299 .send(LogMessage::new(
300 format!("Cannot accept a connection: {err}"),
301 true,
302 ))
303 .await
304 .unwrap_or_default();
305 }
306 }
307 });
308}
309
310#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313 stream: TcpStream,
314 remote_address: SocketAddr,
315 tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316 acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317 config: Arc<Yaml>,
318 logger: Sender<LogMessage>,
319 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320 http3_enabled: Option<u16>,
321) {
322 if let Err(err) = stream.set_nodelay(true) {
324 logger
325 .send(LogMessage::new(
326 format!("Cannot disable Nagle algorithm: {err}"),
327 true,
328 ))
329 .await
330 .unwrap_or_default();
331 return;
332 };
333
334 let config = config.clone();
335 let local_address = match stream.local_addr() {
336 Ok(local_address) => local_address,
337 Err(err) => {
338 logger
339 .send(LogMessage::new(
340 format!("Cannot obtain local address of the connection: {err}"),
341 true,
342 ))
343 .await
344 .unwrap_or_default();
345 return;
346 }
347 };
348
349 let logger_clone = logger.clone();
350
351 tokio::task::spawn(async move {
352 let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354 Ok(start_handshake) => start_handshake,
355 Err(err) => {
356 logger
357 .send(LogMessage::new(
358 format!("Error during TLS handshake: {err}"),
359 true,
360 ))
361 .await
362 .unwrap_or_default();
363 return;
364 }
365 };
366
367 if let Some(acme_config) = acme_config_option {
368 if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369 match start_handshake.into_stream(acme_config).await {
370 Ok(_) => (),
371 Err(err) => {
372 logger
373 .send(LogMessage::new(
374 format!("Error during TLS handshake: {err}"),
375 true,
376 ))
377 .await
378 .unwrap_or_default();
379 return;
380 }
381 };
382 return;
383 }
384 }
385
386 let tls_stream = match start_handshake.into_stream(tls_config).await {
387 Ok(tls_stream) => tls_stream,
388 Err(err) => {
389 logger
390 .send(LogMessage::new(
391 format!("Error during TLS handshake: {err}"),
392 true,
393 ))
394 .await
395 .unwrap_or_default();
396 return;
397 }
398 };
399
400 MaybeTlsStream::Tls(tls_stream)
401 } else {
402 MaybeTlsStream::Plain(stream)
403 };
404
405 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407 let is_http2;
408
409 if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410 if alpn_protocol == Some("h2".as_bytes()) {
411 is_http2 = true;
412 } else {
413 is_http2 = false;
415 }
416 } else {
417 is_http2 = false;
418 }
419
420 let io = TokioIo::new(tls_stream);
421 let handlers_vec = modules
422 .iter()
423 .map(|module| module.get_handlers(Handle::current()));
424
425 if is_http2 {
426 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427 http2_builder.timer(TokioTimer::new());
428 if let Some(initial_window_size) =
429 config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430 {
431 http2_builder.initial_stream_window_size(initial_window_size as u32);
432 }
433 if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434 http2_builder.max_frame_size(max_frame_size as u32);
435 }
436 if let Some(max_concurrent_streams) =
437 config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438 {
439 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440 }
441 if let Some(max_header_list_size) =
442 config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443 {
444 http2_builder.max_header_list_size(max_header_list_size as u32);
445 }
446 if let Some(enable_connect_protocol) =
447 config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448 {
449 if enable_connect_protocol {
450 http2_builder.enable_connect_protocol();
451 }
452 }
453
454 if let Err(err) = http2_builder
455 .serve_connection(
456 io,
457 service_fn(move |request: Request<Incoming>| {
458 let config = config.clone();
459 let logger = logger_clone.clone();
460 let handlers_vec_clone = handlers_vec
461 .clone()
462 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464 let (request_parts, request_body) = request.into_parts();
465 let request = Request::from_parts(
466 request_parts,
467 request_body
468 .map_err(|e| std::io::Error::other(e.to_string()))
469 .boxed(),
470 );
471 request_handler(
472 request,
473 remote_address,
474 local_address,
475 true,
476 config,
477 logger,
478 handlers_vec_clone,
479 acme_http01_resolver_option_clone,
480 http3_enabled,
481 )
482 }),
483 )
484 .await
485 {
486 logger
487 .send(LogMessage::new(
488 format!("Error serving HTTPS connection: {err}"),
489 true,
490 ))
491 .await
492 .unwrap_or_default();
493 }
494 } else {
495 let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497 http1_builder.timer(TokioTimer::new());
499
500 if let Err(err) = http1_builder
501 .serve_connection(
502 io,
503 service_fn(move |request: Request<Incoming>| {
504 let config = config.clone();
505 let logger = logger_clone.clone();
506 let handlers_vec_clone = handlers_vec
507 .clone()
508 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510 let (request_parts, request_body) = request.into_parts();
511 let request = Request::from_parts(
512 request_parts,
513 request_body
514 .map_err(|e| std::io::Error::other(e.to_string()))
515 .boxed(),
516 );
517 request_handler(
518 request,
519 remote_address,
520 local_address,
521 true,
522 config,
523 logger,
524 handlers_vec_clone,
525 acme_http01_resolver_option_clone,
526 http3_enabled,
527 )
528 }),
529 )
530 .with_upgrades()
531 .await
532 {
533 logger
534 .send(LogMessage::new(
535 format!("Error serving HTTPS connection: {err}"),
536 true,
537 ))
538 .await
539 .unwrap_or_default();
540 }
541 }
542 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543 let io = TokioIo::new(stream);
544 let handlers_vec = modules
545 .iter()
546 .map(|module| module.get_handlers(Handle::current()));
547
548 let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550 http1_builder.timer(TokioTimer::new());
552
553 if let Err(err) = http1_builder
554 .serve_connection(
555 io,
556 service_fn(move |request: Request<Incoming>| {
557 let config = config.clone();
558 let logger = logger_clone.clone();
559 let handlers_vec_clone = handlers_vec
560 .clone()
561 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563 let (request_parts, request_body) = request.into_parts();
564 let request = Request::from_parts(
565 request_parts,
566 request_body
567 .map_err(|e| std::io::Error::other(e.to_string()))
568 .boxed(),
569 );
570 request_handler(
571 request,
572 remote_address,
573 local_address,
574 false,
575 config,
576 logger,
577 handlers_vec_clone,
578 acme_http01_resolver_option_clone,
579 http3_enabled,
580 )
581 }),
582 )
583 .with_upgrades()
584 .await
585 {
586 logger
587 .send(LogMessage::new(
588 format!("Error serving HTTP connection: {err}"),
589 true,
590 ))
591 .await
592 .unwrap_or_default();
593 }
594 }
595 });
596}
597
598#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601 yaml_config: Arc<Yaml>,
602 logger: Sender<LogMessage>,
603 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604 module_error: Option<anyhow::Error>,
605 modules_optional_builtin: Vec<String>,
606 first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608 if let Some(module_error) = module_error {
609 logger
610 .send(LogMessage::new(module_error.to_string(), true))
611 .await
612 .unwrap_or_default();
613 Err(module_error)?
614 }
615
616 let prepared_config = match prepare_config_for_validation(&yaml_config) {
617 Ok(prepared_config) => prepared_config,
618 Err(err) => {
619 logger
620 .send(LogMessage::new(
621 format!("Server configuration validation failed: {err}"),
622 true,
623 ))
624 .await
625 .unwrap_or_default();
626 Err(anyhow::anyhow!(format!(
627 "Server configuration validation failed: {}",
628 err
629 )))?
630 }
631 };
632
633 for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634 match validate_config(
635 config_to_validate,
636 is_global,
637 is_location,
638 is_error_config,
639 &modules_optional_builtin,
640 ) {
641 Ok(unused_properties) => {
642 for unused_property in unused_properties {
643 logger
644 .send(LogMessage::new(
645 format!(
646 "Unused configuration property detected: \"{unused_property}\". You might load an appropriate module to use this configuration property"
647 ),
648 true,
649 ))
650 .await
651 .unwrap_or_default();
652 }
653 }
654 Err(err) => {
655 logger
656 .send(LogMessage::new(
657 format!("Server configuration validation failed: {err}"),
658 true,
659 ))
660 .await
661 .unwrap_or_default();
662 Err(anyhow::anyhow!(format!(
663 "Server configuration validation failed: {}",
664 err
665 )))?
666 }
667 };
668 }
669
670 let mut crypto_provider = default_provider();
671
672 if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
673 let mut cipher_suites = Vec::new();
674 let cipher_suite_iter = cipher_suite.iter();
675 for cipher_suite_yaml in cipher_suite_iter {
676 if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
677 let cipher_suite_to_add = match cipher_suite {
678 "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
679 "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
680 "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
681 "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
682 "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
683 "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
684 TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
685 }
686 "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
687 "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
688 "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
689 TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
690 }
691 _ => {
692 logger
693 .send(LogMessage::new(
694 format!("The \"{cipher_suite}\" cipher suite is not supported"),
695 true,
696 ))
697 .await
698 .unwrap_or_default();
699 Err(anyhow::anyhow!(format!(
700 "The \"{}\" cipher suite is not supported",
701 cipher_suite
702 )))?
703 }
704 };
705 cipher_suites.push(cipher_suite_to_add);
706 }
707 }
708 crypto_provider.cipher_suites = cipher_suites;
709 }
710
711 if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
712 let mut kx_groups = Vec::new();
713 let ecdh_curves_iter = ecdh_curves.iter();
714 for ecdh_curve_yaml in ecdh_curves_iter {
715 if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
716 let kx_group_to_add = match ecdh_curve {
717 "secp256r1" => SECP256R1,
718 "secp384r1" => SECP384R1,
719 "x25519" => X25519,
720 _ => {
721 logger
722 .send(LogMessage::new(
723 format!("The \"{ecdh_curve}\" ECDH curve is not supported"),
724 true,
725 ))
726 .await
727 .unwrap_or_default();
728 Err(anyhow::anyhow!(format!(
729 "The \"{}\" ECDH curve is not supported",
730 ecdh_curve
731 )))?
732 }
733 };
734 kx_groups.push(kx_group_to_add);
735 }
736 }
737 crypto_provider.kx_groups = kx_groups;
738 }
739
740 let crypto_provider_cloned = crypto_provider.clone();
741 let mut sni_resolver = CustomSniResolver::new();
742 let mut certified_keys = Vec::new();
743
744 let mut automatic_tls_enabled = false;
745 let mut acme_letsencrypt_production = true;
746 let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
747 .as_bool()
748 .unwrap_or(false);
749 let acme_challenge_type = if acme_use_http_challenge {
750 UseChallenge::Http01
751 } else {
752 UseChallenge::TlsAlpn01
753 };
754
755 if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
757 automatic_tls_enabled = read_automatic_tls_enabled;
758 }
759
760 let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
761 let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
762 .as_str()
763 .map(|s| s.to_string())
764 .map(DirCache::new);
765
766 if let Some(read_acme_letsencrypt_production) =
767 yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
768 {
769 acme_letsencrypt_production = read_acme_letsencrypt_production;
770 }
771
772 if !automatic_tls_enabled {
773 if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
775 if let Some(key_path) = yaml_config["global"]["key"].as_str() {
776 let certs = match load_certs(cert_path) {
777 Ok(certs) => certs,
778 Err(err) => {
779 logger
780 .send(LogMessage::new(
781 format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
782 true,
783 ))
784 .await
785 .unwrap_or_default();
786 Err(anyhow::anyhow!(format!(
787 "Cannot load the \"{}\" TLS certificate: {}",
788 cert_path, err
789 )))?
790 }
791 };
792 let key = match load_private_key(key_path) {
793 Ok(key) => key,
794 Err(err) => {
795 logger
796 .send(LogMessage::new(
797 format!("Cannot load the \"{cert_path}\" private key: {err}"),
798 true,
799 ))
800 .await
801 .unwrap_or_default();
802 Err(anyhow::anyhow!(format!(
803 "Cannot load the \"{}\" private key: {}",
804 cert_path, err
805 )))?
806 }
807 };
808 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
809 Ok(key) => key,
810 Err(err) => {
811 logger
812 .send(LogMessage::new(
813 format!("Cannot load the \"{cert_path}\" private key: {err}"),
814 true,
815 ))
816 .await
817 .unwrap_or_default();
818 Err(anyhow::anyhow!(format!(
819 "Cannot load the \"{}\" private key: {}",
820 cert_path, err
821 )))?
822 }
823 };
824 let certified_key = CertifiedKey::new(certs, signing_key);
825 sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
826 }
827 }
828
829 if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
830 let sni_hostnames = sni.keys();
831 for sni_hostname_unknown in sni_hostnames {
832 if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
833 if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
834 if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
835 let certs = match load_certs(cert_path) {
836 Ok(certs) => certs,
837 Err(err) => {
838 logger
839 .send(LogMessage::new(
840 format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
841 true,
842 ))
843 .await
844 .unwrap_or_default();
845 Err(anyhow::anyhow!(format!(
846 "Cannot load the \"{}\" TLS certificate: {}",
847 cert_path, err
848 )))?
849 }
850 };
851 let key = match load_private_key(key_path) {
852 Ok(key) => key,
853 Err(err) => {
854 logger
855 .send(LogMessage::new(
856 format!("Cannot load the \"{cert_path}\" private key: {err}"),
857 true,
858 ))
859 .await
860 .unwrap_or_default();
861 Err(anyhow::anyhow!(format!(
862 "Cannot load the \"{}\" private key: {}",
863 cert_path, err
864 )))?
865 }
866 };
867 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
868 Ok(key) => key,
869 Err(err) => {
870 logger
871 .send(LogMessage::new(
872 format!("Cannot load the \"{cert_path}\" private key: {err}"),
873 true,
874 ))
875 .await
876 .unwrap_or_default();
877 Err(anyhow::anyhow!(format!(
878 "Cannot load the \"{}\" private key: {}",
879 cert_path, err
880 )))?
881 }
882 };
883 let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
884 sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
885 certified_keys.push(certified_key_arc);
886 }
887 }
888 }
889 }
890 }
891 }
892
893 let tls_config_builder_wants_versions =
895 ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
896
897 let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
899 let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
900 let tls_config_builder_wants_verifier = match min_tls_version_option {
901 Some("TLSv1.3") => match max_tls_version_option {
902 Some("TLSv1.2") => {
903 logger
904 .send(LogMessage::new(
905 String::from("The maximum TLS version is older than the minimum TLS version"),
906 true,
907 ))
908 .await
909 .unwrap_or_default();
910 Err(anyhow::anyhow!(String::from(
911 "The maximum TLS version is older than the minimum TLS version"
912 )))?
913 }
914 Some("TLSv1.3") | None => {
915 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS13]) {
916 Ok(builder) => builder,
917 Err(err) => {
918 logger
919 .send(LogMessage::new(
920 format!("Couldn't create the TLS server configuration: {err}"),
921 true,
922 ))
923 .await
924 .unwrap_or_default();
925 Err(anyhow::anyhow!(format!(
926 "Couldn't create the TLS server configuration: {}",
927 err
928 )))?
929 }
930 }
931 }
932 _ => {
933 logger
934 .send(LogMessage::new(
935 String::from("Invalid maximum TLS version"),
936 true,
937 ))
938 .await
939 .unwrap_or_default();
940 Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
941 }
942 },
943 Some("TLSv1.2") | None => match max_tls_version_option {
944 Some("TLSv1.2") => {
945 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12]) {
946 Ok(builder) => builder,
947 Err(err) => {
948 logger
949 .send(LogMessage::new(
950 format!("Couldn't create the TLS server configuration: {err}"),
951 true,
952 ))
953 .await
954 .unwrap_or_default();
955 Err(anyhow::anyhow!(format!(
956 "Couldn't create the TLS server configuration: {}",
957 err
958 )))?
959 }
960 }
961 }
962 Some("TLSv1.3") | None => {
963 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12, &TLS13]) {
964 Ok(builder) => builder,
965 Err(err) => {
966 logger
967 .send(LogMessage::new(
968 format!("Couldn't create the TLS server configuration: {err}"),
969 true,
970 ))
971 .await
972 .unwrap_or_default();
973 Err(anyhow::anyhow!(format!(
974 "Couldn't create the TLS server configuration: {}",
975 err
976 )))?
977 }
978 }
979 }
980 _ => {
981 logger
982 .send(LogMessage::new(
983 String::from("Invalid maximum TLS version"),
984 true,
985 ))
986 .await
987 .unwrap_or_default();
988 Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
989 }
990 },
991 _ => {
992 logger
993 .send(LogMessage::new(
994 String::from("Invalid minimum TLS version"),
995 true,
996 ))
997 .await
998 .unwrap_or_default();
999 Err(anyhow::anyhow!(String::from("Invalid minimum TLS version")))?
1000 }
1001 };
1002
1003 let tls_config_builder_wants_server_cert =
1004 match yaml_config["global"]["useClientCertificate"].as_bool() {
1005 Some(true) => {
1006 let mut roots = RootCertStore::empty();
1007 let certs_result = load_native_certs();
1008 if !certs_result.errors.is_empty() {
1009 logger
1010 .send(LogMessage::new(
1011 format!(
1012 "Couldn't load the native certificate store: {}",
1013 certs_result.errors[0]
1014 ),
1015 true,
1016 ))
1017 .await
1018 .unwrap_or_default();
1019 Err(anyhow::anyhow!(format!(
1020 "Couldn't load the native certificate store: {}",
1021 certs_result.errors[0]
1022 )))?
1023 }
1024 let certs = certs_result.certs;
1025
1026 for cert in certs {
1027 match roots.add(cert) {
1028 Ok(_) => (),
1029 Err(err) => {
1030 logger
1031 .send(LogMessage::new(
1032 format!("Couldn't add a certificate to the certificate store: {err}"),
1033 true,
1034 ))
1035 .await
1036 .unwrap_or_default();
1037 Err(anyhow::anyhow!(format!(
1038 "Couldn't add a certificate to the certificate store: {}",
1039 err
1040 )))?
1041 }
1042 }
1043 }
1044 tls_config_builder_wants_verifier
1045 .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1046 }
1047 _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1048 };
1049
1050 let mut tls_config;
1051
1052 let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 80));
1053 let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 443));
1054 let mut tls_enabled = false;
1055 let mut non_tls_disabled = false;
1056
1057 if crypto_provider.install_default().is_err() && first_startup {
1059 logger
1060 .send(LogMessage::new(
1061 "Cannot install a process-wide cryptography provider".to_string(),
1062 true,
1063 ))
1064 .await
1065 .unwrap_or_default();
1066 Err(anyhow::anyhow!(
1067 "Cannot install a process-wide cryptography provider"
1068 ))?;
1069 }
1070
1071 if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1073 addr = SocketAddr::from((
1074 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1075 match read_port.try_into() {
1076 Ok(port) => port,
1077 Err(_) => {
1078 logger
1079 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1080 .await
1081 .unwrap_or_default();
1082 Err(anyhow::anyhow!("Invalid HTTP port"))?
1083 }
1084 },
1085 ));
1086 } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1087 addr = match read_port.parse() {
1088 Ok(addr) => addr,
1089 Err(_) => {
1090 logger
1091 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1092 .await
1093 .unwrap_or_default();
1094 Err(anyhow::anyhow!("Invalid HTTP port"))?
1095 }
1096 };
1097 }
1098
1099 if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1100 tls_enabled = read_tls_enabled;
1101 if let Some(read_non_tls_disabled) =
1102 yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1103 {
1104 non_tls_disabled = read_non_tls_disabled;
1105 }
1106 }
1107
1108 if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1109 addr_tls = SocketAddr::from((
1110 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1111 match read_port.try_into() {
1112 Ok(port) => port,
1113 Err(_) => {
1114 logger
1115 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1116 .await
1117 .unwrap_or_default();
1118 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1119 }
1120 },
1121 ));
1122 } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1123 addr_tls = match read_port.parse() {
1124 Ok(addr) => addr,
1125 Err(_) => {
1126 logger
1127 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1128 .await
1129 .unwrap_or_default();
1130 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1131 }
1132 };
1133 }
1134
1135 let mut acme_domains = Vec::new();
1137 if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1138 for host_yaml in hosts_config.iter() {
1139 if let Some(host) = host_yaml.as_hash() {
1140 if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1141 if let Some(domain) = domain_yaml.as_str() {
1142 if !domain.contains("*") {
1143 acme_domains.push(domain);
1144 }
1145 }
1146 }
1147 }
1148 }
1149 }
1150
1151 let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1153 if let Some(acme_contact_unwrapped) = acme_contact {
1154 acme_config = acme_config.contact_push(format!("mailto:{acme_contact_unwrapped}"));
1155 }
1156 let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1157 acme_config_with_cache =
1158 acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1159
1160 let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1161 let mut acme_state = acme_config_with_cache.state();
1162
1163 let acme_resolver = acme_state.resolver();
1164
1165 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1167 .as_bool()
1168 .unwrap_or(true)
1169 {
1170 tls_config_builder_wants_server_cert
1171 .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1172 } else {
1173 tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1174 };
1175
1176 let acme_logger = logger.clone();
1177 tokio::spawn(async move {
1178 while let Some(acme_result) = acme_state.next().await {
1179 if let Err(acme_error) = acme_result {
1180 acme_logger
1181 .send(LogMessage::new(
1182 format!("Error while obtaining a TLS certificate: {acme_error}"),
1183 true,
1184 ))
1185 .await
1186 .unwrap_or_default();
1187 }
1188 }
1189 });
1190
1191 if acme_use_http_challenge {
1192 (None, Some(acme_resolver))
1193 } else {
1194 let mut acme_config = tls_config.clone();
1195 acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1196
1197 (Some(acme_config), None)
1198 }
1199 } else {
1200 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1202 .as_bool()
1203 .unwrap_or(true)
1204 {
1205 let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1206 for certified_key in certified_keys.iter() {
1207 ocsp_stapler_arc.preload(certified_key.clone());
1208 }
1209 tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1210 } else {
1211 tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1212 };
1213
1214 drop(acme_config_with_cache);
1216 (None, None)
1217 };
1218
1219 let quic_config = if tls_enabled
1220 && yaml_config["global"]["enableHTTP3"]
1221 .as_bool()
1222 .unwrap_or(false)
1223 {
1224 let mut quic_tls_config = tls_config.clone();
1225 quic_tls_config.max_early_data_size = u32::MAX;
1226 quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1227 let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1228 quic_tls_config,
1229 ) {
1230 Ok(quinn_config) => quinn_config,
1231 Err(err) => {
1232 logger
1233 .send(LogMessage::new(
1234 format!("There was a problem when starting HTTP/3 server: {err}"),
1235 true,
1236 ))
1237 .await
1238 .unwrap_or_default();
1239 Err(anyhow::anyhow!(format!(
1240 "There was a problem when starting HTTP/3 server: {}",
1241 err
1242 )))?
1243 }
1244 }));
1245 Some(quic_config)
1246 } else {
1247 None
1248 };
1249
1250 let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1252 if yaml_config["global"]["enableHTTP2"]
1253 .as_bool()
1254 .unwrap_or(true)
1255 {
1256 alpn_protocols.insert(0, b"h2".to_vec());
1257 }
1258 tls_config.alpn_protocols = alpn_protocols;
1259 let tls_config_arc = Arc::new(tls_config);
1260 let acme_config_arc = acme_config.map(Arc::new);
1261
1262 let mut listener = None;
1263 let mut listener_tls = None;
1264 let mut listener_quic = None;
1265
1266 if !non_tls_disabled {
1268 println!("HTTP server is listening at {addr}");
1269 listener = Some(match TcpListener::bind(addr).await {
1270 Ok(listener) => listener,
1271 Err(err) => {
1272 logger
1273 .send(LogMessage::new(
1274 format!("Cannot listen to HTTP port: {err}"),
1275 true,
1276 ))
1277 .await
1278 .unwrap_or_default();
1279 Err(anyhow::anyhow!(format!(
1280 "Cannot listen to HTTP port: {}",
1281 err
1282 )))?
1283 }
1284 });
1285 }
1286
1287 if tls_enabled {
1288 println!("HTTPS server is listening at {addr_tls}");
1289 listener_tls = Some(match TcpListener::bind(addr_tls).await {
1290 Ok(listener) => listener,
1291 Err(err) => {
1292 logger
1293 .send(LogMessage::new(
1294 format!("Cannot listen to HTTPS port: {err}"),
1295 true,
1296 ))
1297 .await
1298 .unwrap_or_default();
1299 Err(anyhow::anyhow!(format!(
1300 "Cannot listen to HTTPS port: {}",
1301 err
1302 )))?
1303 }
1304 });
1305
1306 if let Some(quic_config) = quic_config {
1307 println!("HTTP/3 server is listening at {addr_tls}");
1308 listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1309 Ok(listener) => listener,
1310 Err(err) => {
1311 logger
1312 .send(LogMessage::new(
1313 format!("Cannot listen to HTTP/3 port: {err}"),
1314 true,
1315 ))
1316 .await
1317 .unwrap_or_default();
1318 Err(anyhow::anyhow!(format!(
1319 "Cannot listen to HTTP/3 port: {}",
1320 err
1321 )))?
1322 }
1323 });
1324 }
1325 }
1326
1327 let modules_arc = Arc::new(modules);
1329
1330 let http3_enabled = if listener_quic.is_some() {
1331 Some(addr_tls.port())
1332 } else {
1333 None
1334 };
1335
1336 loop {
1338 let listener_borrowed = &listener;
1339 let listener_accept = async move {
1340 if let Some(listener) = listener_borrowed {
1341 listener.accept().await
1342 } else {
1343 futures_util::future::pending().await
1344 }
1345 };
1346
1347 let listener_tls_borrowed = &listener_tls;
1348 let listener_tls_accept = async move {
1349 if let Some(listener_tls) = listener_tls_borrowed {
1350 listener_tls.accept().await
1351 } else {
1352 futures_util::future::pending().await
1353 }
1354 };
1355
1356 let listener_quic_borrowed = &listener_quic;
1357 let listener_quic_accept = async move {
1358 if let Some(listener_quic) = listener_quic_borrowed {
1359 listener_quic.accept().await
1360 } else {
1361 futures_util::future::pending().await
1362 }
1363 };
1364
1365 if listener_borrowed.is_none()
1366 && listener_tls_borrowed.is_none()
1367 && listener_quic_borrowed.is_none()
1368 {
1369 logger
1370 .send(LogMessage::new(
1371 String::from("No server is listening"),
1372 true,
1373 ))
1374 .await
1375 .unwrap_or_default();
1376 Err(anyhow::anyhow!("No server is listening"))?;
1377 }
1378
1379 tokio::select! {
1380 status = listener_accept => {
1381 match status {
1382 Ok((stream, remote_address)) => {
1383 accept_connection(
1384 stream,
1385 remote_address,
1386 None,
1387 acme_http01_resolver.clone(),
1388 yaml_config.clone(),
1389 logger.clone(),
1390 modules_arc.clone(),
1391 None
1392 )
1393 .await;
1394 }
1395 Err(err) => {
1396 logger
1397 .send(LogMessage::new(
1398 format!("Cannot accept a connection: {err}"),
1399 true,
1400 ))
1401 .await
1402 .unwrap_or_default();
1403 }
1404 }
1405 },
1406 status = listener_tls_accept => {
1407 match status {
1408 Ok((stream, remote_address)) => {
1409 accept_connection(
1410 stream,
1411 remote_address,
1412 Some((tls_config_arc.clone(), acme_config_arc.clone())),
1413 None,
1414 yaml_config.clone(),
1415 logger.clone(),
1416 modules_arc.clone(),
1417 http3_enabled
1418 )
1419 .await;
1420 }
1421 Err(err) => {
1422 logger
1423 .send(LogMessage::new(
1424 format!("Cannot accept a connection: {err}"),
1425 true,
1426 ))
1427 .await
1428 .unwrap_or_default();
1429 }
1430 }
1431 },
1432 status = listener_quic_accept => {
1433 match status {
1434 Some(connection_attempt) => {
1435 let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))), addr_tls.port());
1436 accept_quic_connection(
1437 connection_attempt,
1438 local_ip,
1439 yaml_config.clone(),
1440 logger.clone(),
1441 modules_arc.clone()
1442 )
1443 .await;
1444 }
1445 None => {
1446 logger
1447 .send(LogMessage::new(
1448 "HTTP/3 connections can't be accepted anymore".to_string(),
1449 true,
1450 ))
1451 .await
1452 .unwrap_or_default();
1453 }
1454 }
1455 }
1456 };
1457 }
1458}
1459
1460#[allow(clippy::type_complexity)]
1462pub fn start_server(
1463 yaml_config: Arc<Yaml>,
1464 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1465 module_error: Option<anyhow::Error>,
1466 modules_optional_builtin: Vec<String>,
1467 first_startup: bool,
1468) -> Result<bool, Box<dyn Error + Send + Sync>> {
1469 if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1470 {
1471 let environment_variables_hash_iter = environment_variables_hash.iter();
1472 for (variable_name, variable_value) in environment_variables_hash_iter {
1473 if let Some(variable_name) = variable_name.as_str() {
1474 if let Some(variable_value) = variable_value.as_str() {
1475 if !variable_name.is_empty()
1476 && !variable_name.contains('\0')
1477 && !variable_name.contains('=')
1478 && !variable_value.contains('\0')
1479 {
1480 env::set_var(variable_name, variable_value);
1484 }
1485 }
1486 }
1487 }
1488 }
1489
1490 let available_parallelism = thread::available_parallelism()?.get();
1491
1492 let server_runtime = tokio::runtime::Builder::new_multi_thread()
1494 .worker_threads(available_parallelism)
1495 .max_blocking_threads(1536)
1496 .event_interval(25)
1497 .thread_name("server-pool")
1498 .enable_all()
1499 .build()?;
1500
1501 let log_runtime = tokio::runtime::Builder::new_multi_thread()
1503 .worker_threads(match available_parallelism / 2 {
1504 0 => 1,
1505 non_zero => non_zero,
1506 })
1507 .max_blocking_threads(768)
1508 .thread_name("log-pool")
1509 .enable_time()
1510 .build()?;
1511
1512 let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1513
1514 let log_filename = yaml_config["global"]["logFilePath"]
1515 .as_str()
1516 .map(String::from);
1517 let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1518 .as_str()
1519 .map(String::from);
1520
1521 log_runtime.spawn(async move {
1522 let log_file = match log_filename {
1523 Some(log_filename) => Some(
1524 fs::OpenOptions::new()
1525 .append(true)
1526 .create(true)
1527 .open(log_filename)
1528 .await,
1529 ),
1530 None => None,
1531 };
1532
1533 let error_log_file = match error_log_filename {
1534 Some(error_log_filename) => Some(
1535 fs::OpenOptions::new()
1536 .append(true)
1537 .create(true)
1538 .open(error_log_filename)
1539 .await,
1540 ),
1541 None => None,
1542 };
1543
1544 let log_file_wrapped = match log_file {
1545 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1546 Some(Err(e)) => {
1547 eprintln!("Failed to open log file: {e}");
1548 None
1549 }
1550 None => None,
1551 };
1552
1553 let error_log_file_wrapped = match error_log_file {
1554 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1555 Some(Err(e)) => {
1556 eprintln!("Failed to open error log file: {e}");
1557 None
1558 }
1559 None => None,
1560 };
1561
1562 let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1564 let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1565 tokio::task::spawn(async move {
1566 let mut interval = time::interval(time::Duration::from_millis(100));
1567 loop {
1568 interval.tick().await;
1569 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1570 let mut locked_file = log_file_wrapped_cloned.lock().await;
1571 locked_file.flush().await.unwrap_or_default();
1572 }
1573 if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1574 {
1575 let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1576 locked_file.flush().await.unwrap_or_default();
1577 }
1578 }
1579 });
1580
1581 while let Ok(message) = receive_log.recv().await {
1583 let (mut message, is_error) = message.get_message();
1584 let log_file_wrapped_cloned = if !is_error {
1585 log_file_wrapped.clone()
1586 } else {
1587 error_log_file_wrapped.clone()
1588 };
1589
1590 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1591 tokio::task::spawn(async move {
1592 let mut locked_file = log_file_wrapped_cloned.lock().await;
1593 if is_error {
1594 let now: DateTime<Local> = Local::now();
1595 let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1596 message = format!("[{formatted_time}]: {message}");
1597 }
1598 message.push('\n');
1599 if let Err(e) = locked_file.write(message.as_bytes()).await {
1600 eprintln!("Failed to write to log file: {e}");
1601 }
1602 });
1603 }
1604 }
1605 });
1606
1607 for msg in env_config::log_env_var_overrides() {
1609 logger
1610 .send_blocking(LogMessage::new(msg, true))
1611 .unwrap_or_default();
1612 }
1613
1614 let result = server_runtime.block_on(async {
1616 let event_loop_future = server_event_loop(
1617 yaml_config,
1618 logger,
1619 modules,
1620 module_error,
1621 modules_optional_builtin,
1622 first_startup,
1623 );
1624
1625 let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1626 let cancel_token = CancellationToken::new();
1627
1628 #[cfg(unix)]
1629 {
1630 let cancel_token_clone = cancel_token.clone();
1631 let continue_tx_clone = continue_tx.clone();
1632 tokio::spawn(async move {
1633 if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1634 tokio::select! {
1635 _ = signal.recv() => {
1636 continue_tx_clone.send(true).await.unwrap_or_default();
1637 }
1638 _ = cancel_token_clone.cancelled() => {}
1639 }
1640 }
1641 });
1642 }
1643
1644 let cancel_token_clone = cancel_token.clone();
1645 tokio::spawn(async move {
1646 tokio::select! {
1647 result = signal::ctrl_c() => {
1648 if result.is_ok() {
1649 continue_tx.send(false).await.unwrap_or_default();
1650 }
1651 }
1652 _ = cancel_token_clone.cancelled() => {}
1653 }
1654 });
1655
1656 let result = tokio::select! {
1657 result = event_loop_future => {
1658 time::sleep(tokio::time::Duration::from_millis(100)).await;
1660
1661 result.map(|_| false)
1662 },
1663 continue_running = continue_rx.recv() => Ok(continue_running?)
1664 };
1665
1666 cancel_token.cancel();
1667
1668 result
1669 });
1670
1671 server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1673
1674 result
1675}