1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_acme::acme::ACME_TLS_ALPN_NAME;
33use rustls_acme::caches::DirCache;
34use rustls_acme::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
35use rustls_native_certs::load_native_certs;
36use tokio::fs;
37use tokio::io::{AsyncWriteExt, BufWriter};
38use tokio::net::{TcpListener, TcpStream};
39use tokio::runtime::Handle;
40use tokio::signal;
41use tokio::sync::Mutex;
42use tokio::time;
43use tokio_rustls::server::TlsStream;
44use tokio_rustls::LazyConfigAcceptor;
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
48#[allow(clippy::large_enum_variant)]
50enum MaybeTlsStream {
51 Tls(TlsStream<TcpStream>),
52 Plain(TcpStream),
53}
54
55#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58 connection_attempt: quinn::Incoming,
59 local_address: SocketAddr,
60 config: Arc<Yaml>,
61 logger: Sender<LogMessage>,
62 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64 let remote_address = connection_attempt.remote_address();
65
66 let logger_clone = logger.clone();
67
68 tokio::task::spawn(async move {
69 match connection_attempt.await {
70 Ok(connection) => {
71 let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72 match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73 Ok(h3_conn) => h3_conn,
74 Err(err) => {
75 logger_clone
76 .send(LogMessage::new(
77 format!("Error serving HTTP/3 connection: {}", err),
78 true,
79 ))
80 .await
81 .unwrap_or_default();
82 return;
83 }
84 };
85
86 loop {
87 match h3_conn.accept().await {
88 Ok(Some(resolver)) => {
89 let config = config.clone();
90 let remote_address = remote_address;
91
92 let logger_clone = logger_clone.clone();
93 let modules = modules.clone();
94 tokio::spawn(async move {
95 let handlers_vec = modules
96 .iter()
97 .map(|module| module.get_handlers(Handle::current()));
98
99 let (request, stream) = match resolver.resolve_request().await {
100 Ok(resolved) => resolved,
101 Err(err) => {
102 logger_clone
103 .send(LogMessage::new(
104 format!("Error serving HTTP/3 connection: {}", err),
105 true,
106 ))
107 .await
108 .unwrap_or_default();
109 return;
110 }
111 };
112 let (mut send, receive) = stream.split();
113 let request_body_stream = futures_util::stream::unfold(
114 (receive, false),
115 async move |(mut receive, mut is_body_finished)| loop {
116 if !is_body_finished {
117 match receive.recv_data().await {
118 Ok(Some(mut data)) => {
119 return Some((
120 Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121 (receive, false),
122 ))
123 }
124 Ok(None) => is_body_finished = true,
125 Err(err) => {
126 return Some((
127 Err(std::io::Error::other(err.to_string())),
128 (receive, false),
129 ))
130 }
131 }
132 } else {
133 match receive.recv_trailers().await {
134 Ok(Some(trailers)) => {
135 return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136 }
137 Ok(None) => {
138 return None;
139 }
140 Err(err) => {
141 return Some((
142 Err(std::io::Error::other(err.to_string())),
143 (receive, true),
144 ))
145 }
146 }
147 }
148 },
149 );
150 let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151 let (request_parts, _) = request.into_parts();
152 let request = Request::from_parts(request_parts, request_body);
153 let handlers_vec_clone = handlers_vec
154 .clone()
155 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156 let mut response = match request_handler(
157 request,
158 remote_address,
159 local_address,
160 true,
161 config,
162 logger_clone.clone(),
163 handlers_vec_clone,
164 None,
165 None,
166 )
167 .await
168 {
169 Ok(response) => response,
170 Err(err) => {
171 logger_clone
172 .send(LogMessage::new(
173 format!("Error serving HTTP/3 connection: {}", err),
174 true,
175 ))
176 .await
177 .unwrap_or_default();
178 return;
179 }
180 };
181 if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182 response
183 .headers_mut()
184 .entry(http::header::DATE)
185 .or_insert(http_date);
186 }
187 let (response_parts, mut response_body) = response.into_parts();
188 if let Err(err) = send
189 .send_response(Response::from_parts(response_parts, ()))
190 .await
191 {
192 logger_clone
193 .send(LogMessage::new(
194 format!("Error serving HTTP/3 connection: {}", err),
195 true,
196 ))
197 .await
198 .unwrap_or_default();
199 return;
200 }
201 let mut had_trailers = false;
202 while let Some(chunk) = response_body.frame().await {
203 match chunk {
204 Ok(frame) => {
205 if frame.is_data() {
206 match frame.into_data() {
207 Ok(data) => {
208 if let Err(err) = send.send_data(data).await {
209 logger_clone
210 .send(LogMessage::new(
211 format!("Error serving HTTP/3 connection: {}", err),
212 true,
213 ))
214 .await
215 .unwrap_or_default();
216 return;
217 }
218 }
219 Err(_) => {
220 logger_clone
221 .send(LogMessage::new(
222 "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223 true,
224 ))
225 .await
226 .unwrap_or_default();
227 return;
228 }
229 }
230 } else if frame.is_trailers() {
231 match frame.into_trailers() {
232 Ok(trailers) => {
233 had_trailers = true;
234 if let Err(err) = send.send_trailers(trailers).await {
235 logger_clone
236 .send(LogMessage::new(
237 format!("Error serving HTTP/3 connection: {}", err),
238 true,
239 ))
240 .await
241 .unwrap_or_default();
242 return;
243 }
244 }
245 Err(_) => {
246 logger_clone
247 .send(LogMessage::new(
248 "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249 true,
250 ))
251 .await
252 .unwrap_or_default();
253 return;
254 }
255 }
256 }
257 }
258 Err(err) => {
259 logger_clone
260 .send(LogMessage::new(
261 format!("Error serving HTTP/3 connection: {}", err),
262 true,
263 ))
264 .await
265 .unwrap_or_default();
266 return;
267 }
268 }
269 }
270 if !had_trailers {
271 if let Err(err) = send.finish().await {
272 logger_clone
273 .send(LogMessage::new(
274 format!("Error serving HTTP/3 connection: {}", err),
275 true,
276 ))
277 .await
278 .unwrap_or_default();
279 }
280 }
281 });
282 }
283 Ok(None) => break,
284 Err(err) => {
285 logger_clone
286 .send(LogMessage::new(
287 format!("Error serving HTTP/3 connection: {}", err),
288 true,
289 ))
290 .await
291 .unwrap_or_default();
292 return;
293 }
294 }
295 }
296 }
297 Err(err) => {
298 logger_clone
299 .send(LogMessage::new(
300 format!("Cannot accept a connection: {}", err),
301 true,
302 ))
303 .await
304 .unwrap_or_default();
305 }
306 }
307 });
308}
309
310#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313 stream: TcpStream,
314 remote_address: SocketAddr,
315 tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316 acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317 config: Arc<Yaml>,
318 logger: Sender<LogMessage>,
319 modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320 http3_enabled: Option<u16>,
321) {
322 if let Err(err) = stream.set_nodelay(true) {
324 logger
325 .send(LogMessage::new(
326 format!("Cannot disable Nagle algorithm: {}", err),
327 true,
328 ))
329 .await
330 .unwrap_or_default();
331 return;
332 };
333
334 let config = config.clone();
335 let local_address = match stream.local_addr() {
336 Ok(local_address) => local_address,
337 Err(err) => {
338 logger
339 .send(LogMessage::new(
340 format!("Cannot obtain local address of the connection: {}", err),
341 true,
342 ))
343 .await
344 .unwrap_or_default();
345 return;
346 }
347 };
348
349 let logger_clone = logger.clone();
350
351 tokio::task::spawn(async move {
352 let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353 let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354 Ok(start_handshake) => start_handshake,
355 Err(err) => {
356 logger
357 .send(LogMessage::new(
358 format!("Error during TLS handshake: {}", err),
359 true,
360 ))
361 .await
362 .unwrap_or_default();
363 return;
364 }
365 };
366
367 if let Some(acme_config) = acme_config_option {
368 if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369 match start_handshake.into_stream(acme_config).await {
370 Ok(_) => (),
371 Err(err) => {
372 logger
373 .send(LogMessage::new(
374 format!("Error during TLS handshake: {}", err),
375 true,
376 ))
377 .await
378 .unwrap_or_default();
379 return;
380 }
381 };
382 return;
383 }
384 }
385
386 let tls_stream = match start_handshake.into_stream(tls_config).await {
387 Ok(tls_stream) => tls_stream,
388 Err(err) => {
389 logger
390 .send(LogMessage::new(
391 format!("Error during TLS handshake: {}", err),
392 true,
393 ))
394 .await
395 .unwrap_or_default();
396 return;
397 }
398 };
399
400 MaybeTlsStream::Tls(tls_stream)
401 } else {
402 MaybeTlsStream::Plain(stream)
403 };
404
405 if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406 let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407 let is_http2;
408
409 if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410 if alpn_protocol == Some("h2".as_bytes()) {
411 is_http2 = true;
412 } else {
413 is_http2 = false;
415 }
416 } else {
417 is_http2 = false;
418 }
419
420 let io = TokioIo::new(tls_stream);
421 let handlers_vec = modules
422 .iter()
423 .map(|module| module.get_handlers(Handle::current()));
424
425 if is_http2 {
426 let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427 http2_builder.timer(TokioTimer::new());
428 if let Some(initial_window_size) =
429 config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430 {
431 http2_builder.initial_stream_window_size(initial_window_size as u32);
432 }
433 if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434 http2_builder.max_frame_size(max_frame_size as u32);
435 }
436 if let Some(max_concurrent_streams) =
437 config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438 {
439 http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440 }
441 if let Some(max_header_list_size) =
442 config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443 {
444 http2_builder.max_header_list_size(max_header_list_size as u32);
445 }
446 if let Some(enable_connect_protocol) =
447 config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448 {
449 if enable_connect_protocol {
450 http2_builder.enable_connect_protocol();
451 }
452 }
453
454 if let Err(err) = http2_builder
455 .serve_connection(
456 io,
457 service_fn(move |request: Request<Incoming>| {
458 let config = config.clone();
459 let logger = logger_clone.clone();
460 let handlers_vec_clone = handlers_vec
461 .clone()
462 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464 let (request_parts, request_body) = request.into_parts();
465 let request = Request::from_parts(
466 request_parts,
467 request_body
468 .map_err(|e| std::io::Error::other(e.to_string()))
469 .boxed(),
470 );
471 request_handler(
472 request,
473 remote_address,
474 local_address,
475 true,
476 config,
477 logger,
478 handlers_vec_clone,
479 acme_http01_resolver_option_clone,
480 http3_enabled,
481 )
482 }),
483 )
484 .await
485 {
486 logger
487 .send(LogMessage::new(
488 format!("Error serving HTTPS connection: {}", err),
489 true,
490 ))
491 .await
492 .unwrap_or_default();
493 }
494 } else {
495 let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497 http1_builder.timer(TokioTimer::new());
499
500 if let Err(err) = http1_builder
501 .serve_connection(
502 io,
503 service_fn(move |request: Request<Incoming>| {
504 let config = config.clone();
505 let logger = logger_clone.clone();
506 let handlers_vec_clone = handlers_vec
507 .clone()
508 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510 let (request_parts, request_body) = request.into_parts();
511 let request = Request::from_parts(
512 request_parts,
513 request_body
514 .map_err(|e| std::io::Error::other(e.to_string()))
515 .boxed(),
516 );
517 request_handler(
518 request,
519 remote_address,
520 local_address,
521 true,
522 config,
523 logger,
524 handlers_vec_clone,
525 acme_http01_resolver_option_clone,
526 http3_enabled,
527 )
528 }),
529 )
530 .with_upgrades()
531 .await
532 {
533 logger
534 .send(LogMessage::new(
535 format!("Error serving HTTPS connection: {}", err),
536 true,
537 ))
538 .await
539 .unwrap_or_default();
540 }
541 }
542 } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543 let io = TokioIo::new(stream);
544 let handlers_vec = modules
545 .iter()
546 .map(|module| module.get_handlers(Handle::current()));
547
548 let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550 http1_builder.timer(TokioTimer::new());
552
553 if let Err(err) = http1_builder
554 .serve_connection(
555 io,
556 service_fn(move |request: Request<Incoming>| {
557 let config = config.clone();
558 let logger = logger_clone.clone();
559 let handlers_vec_clone = handlers_vec
560 .clone()
561 .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562 let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563 let (request_parts, request_body) = request.into_parts();
564 let request = Request::from_parts(
565 request_parts,
566 request_body
567 .map_err(|e| std::io::Error::other(e.to_string()))
568 .boxed(),
569 );
570 request_handler(
571 request,
572 remote_address,
573 local_address,
574 false,
575 config,
576 logger,
577 handlers_vec_clone,
578 acme_http01_resolver_option_clone,
579 http3_enabled,
580 )
581 }),
582 )
583 .with_upgrades()
584 .await
585 {
586 logger
587 .send(LogMessage::new(
588 format!("Error serving HTTP connection: {}", err),
589 true,
590 ))
591 .await
592 .unwrap_or_default();
593 }
594 }
595 });
596}
597
598#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601 yaml_config: Arc<Yaml>,
602 logger: Sender<LogMessage>,
603 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604 module_error: Option<anyhow::Error>,
605 modules_optional_builtin: Vec<String>,
606 first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608 if let Some(module_error) = module_error {
609 logger
610 .send(LogMessage::new(module_error.to_string(), true))
611 .await
612 .unwrap_or_default();
613 Err(module_error)?
614 }
615
616 let prepared_config = match prepare_config_for_validation(&yaml_config) {
617 Ok(prepared_config) => prepared_config,
618 Err(err) => {
619 logger
620 .send(LogMessage::new(
621 format!("Server configuration validation failed: {}", err),
622 true,
623 ))
624 .await
625 .unwrap_or_default();
626 Err(anyhow::anyhow!(format!(
627 "Server configuration validation failed: {}",
628 err
629 )))?
630 }
631 };
632
633 for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634 match validate_config(
635 config_to_validate,
636 is_global,
637 is_location,
638 is_error_config,
639 &modules_optional_builtin,
640 ) {
641 Ok(unused_properties) => {
642 for unused_property in unused_properties {
643 logger
644 .send(LogMessage::new(
645 format!(
646 "Unused configuration property detected: \"{}\". You might load an appropriate module to use this configuration property",
647 unused_property
648 ),
649 true,
650 ))
651 .await
652 .unwrap_or_default();
653 }
654 }
655 Err(err) => {
656 logger
657 .send(LogMessage::new(
658 format!("Server configuration validation failed: {}", err),
659 true,
660 ))
661 .await
662 .unwrap_or_default();
663 Err(anyhow::anyhow!(format!(
664 "Server configuration validation failed: {}",
665 err
666 )))?
667 }
668 };
669 }
670
671 let mut crypto_provider = default_provider();
672
673 if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
674 let mut cipher_suites = Vec::new();
675 let cipher_suite_iter = cipher_suite.iter();
676 for cipher_suite_yaml in cipher_suite_iter {
677 if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
678 let cipher_suite_to_add = match cipher_suite {
679 "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
680 "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
681 "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
682 "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
683 "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
684 "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
685 TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
686 }
687 "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
688 "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
689 "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
690 TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
691 }
692 _ => {
693 logger
694 .send(LogMessage::new(
695 format!("The \"{}\" cipher suite is not supported", cipher_suite),
696 true,
697 ))
698 .await
699 .unwrap_or_default();
700 Err(anyhow::anyhow!(format!(
701 "The \"{}\" cipher suite is not supported",
702 cipher_suite
703 )))?
704 }
705 };
706 cipher_suites.push(cipher_suite_to_add);
707 }
708 }
709 crypto_provider.cipher_suites = cipher_suites;
710 }
711
712 if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
713 let mut kx_groups = Vec::new();
714 let ecdh_curves_iter = ecdh_curves.iter();
715 for ecdh_curve_yaml in ecdh_curves_iter {
716 if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
717 let kx_group_to_add = match ecdh_curve {
718 "secp256r1" => SECP256R1,
719 "secp384r1" => SECP384R1,
720 "x25519" => X25519,
721 _ => {
722 logger
723 .send(LogMessage::new(
724 format!("The \"{}\" ECDH curve is not supported", ecdh_curve),
725 true,
726 ))
727 .await
728 .unwrap_or_default();
729 Err(anyhow::anyhow!(format!(
730 "The \"{}\" ECDH curve is not supported",
731 ecdh_curve
732 )))?
733 }
734 };
735 kx_groups.push(kx_group_to_add);
736 }
737 }
738 crypto_provider.kx_groups = kx_groups;
739 }
740
741 let crypto_provider_cloned = crypto_provider.clone();
742 let mut sni_resolver = CustomSniResolver::new();
743 let mut certified_keys = Vec::new();
744
745 let mut automatic_tls_enabled = false;
746 let mut acme_letsencrypt_production = true;
747 let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
748 .as_bool()
749 .unwrap_or(false);
750 let acme_challenge_type = if acme_use_http_challenge {
751 UseChallenge::Http01
752 } else {
753 UseChallenge::TlsAlpn01
754 };
755
756 if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
758 automatic_tls_enabled = read_automatic_tls_enabled;
759 }
760
761 let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
762 let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
763 .as_str()
764 .map(|s| s.to_string())
765 .map(DirCache::new);
766
767 if let Some(read_acme_letsencrypt_production) =
768 yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
769 {
770 acme_letsencrypt_production = read_acme_letsencrypt_production;
771 }
772
773 if !automatic_tls_enabled {
774 if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
776 if let Some(key_path) = yaml_config["global"]["key"].as_str() {
777 let certs = match load_certs(cert_path) {
778 Ok(certs) => certs,
779 Err(err) => {
780 logger
781 .send(LogMessage::new(
782 format!("Cannot load the \"{}\" TLS certificate: {}", cert_path, err),
783 true,
784 ))
785 .await
786 .unwrap_or_default();
787 Err(anyhow::anyhow!(format!(
788 "Cannot load the \"{}\" TLS certificate: {}",
789 cert_path, err
790 )))?
791 }
792 };
793 let key = match load_private_key(key_path) {
794 Ok(key) => key,
795 Err(err) => {
796 logger
797 .send(LogMessage::new(
798 format!("Cannot load the \"{}\" private key: {}", cert_path, err),
799 true,
800 ))
801 .await
802 .unwrap_or_default();
803 Err(anyhow::anyhow!(format!(
804 "Cannot load the \"{}\" private key: {}",
805 cert_path, err
806 )))?
807 }
808 };
809 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
810 Ok(key) => key,
811 Err(err) => {
812 logger
813 .send(LogMessage::new(
814 format!("Cannot load the \"{}\" private key: {}", cert_path, err),
815 true,
816 ))
817 .await
818 .unwrap_or_default();
819 Err(anyhow::anyhow!(format!(
820 "Cannot load the \"{}\" private key: {}",
821 cert_path, err
822 )))?
823 }
824 };
825 let certified_key = CertifiedKey::new(certs, signing_key);
826 sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
827 }
828 }
829
830 if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
831 let sni_hostnames = sni.keys();
832 for sni_hostname_unknown in sni_hostnames {
833 if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
834 if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
835 if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
836 let certs = match load_certs(cert_path) {
837 Ok(certs) => certs,
838 Err(err) => {
839 logger
840 .send(LogMessage::new(
841 format!("Cannot load the \"{}\" TLS certificate: {}", cert_path, err),
842 true,
843 ))
844 .await
845 .unwrap_or_default();
846 Err(anyhow::anyhow!(format!(
847 "Cannot load the \"{}\" TLS certificate: {}",
848 cert_path, err
849 )))?
850 }
851 };
852 let key = match load_private_key(key_path) {
853 Ok(key) => key,
854 Err(err) => {
855 logger
856 .send(LogMessage::new(
857 format!("Cannot load the \"{}\" private key: {}", cert_path, err),
858 true,
859 ))
860 .await
861 .unwrap_or_default();
862 Err(anyhow::anyhow!(format!(
863 "Cannot load the \"{}\" private key: {}",
864 cert_path, err
865 )))?
866 }
867 };
868 let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
869 Ok(key) => key,
870 Err(err) => {
871 logger
872 .send(LogMessage::new(
873 format!("Cannot load the \"{}\" private key: {}", cert_path, err),
874 true,
875 ))
876 .await
877 .unwrap_or_default();
878 Err(anyhow::anyhow!(format!(
879 "Cannot load the \"{}\" private key: {}",
880 cert_path, err
881 )))?
882 }
883 };
884 let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
885 sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
886 certified_keys.push(certified_key_arc);
887 }
888 }
889 }
890 }
891 }
892 }
893
894 let tls_config_builder_wants_versions =
896 ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
897
898 let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
900 let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
901 let tls_config_builder_wants_verifier = match min_tls_version_option {
902 Some("TLSv1.3") => match max_tls_version_option {
903 Some("TLSv1.2") => {
904 logger
905 .send(LogMessage::new(
906 String::from("The maximum TLS version is older than the minimum TLS version"),
907 true,
908 ))
909 .await
910 .unwrap_or_default();
911 Err(anyhow::anyhow!(String::from(
912 "The maximum TLS version is older than the minimum TLS version"
913 )))?
914 }
915 Some("TLSv1.3") | None => {
916 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS13]) {
917 Ok(builder) => builder,
918 Err(err) => {
919 logger
920 .send(LogMessage::new(
921 format!("Couldn't create the TLS server configuration: {}", err),
922 true,
923 ))
924 .await
925 .unwrap_or_default();
926 Err(anyhow::anyhow!(format!(
927 "Couldn't create the TLS server configuration: {}",
928 err
929 )))?
930 }
931 }
932 }
933 _ => {
934 logger
935 .send(LogMessage::new(
936 String::from("Invalid maximum TLS version"),
937 true,
938 ))
939 .await
940 .unwrap_or_default();
941 Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
942 }
943 },
944 Some("TLSv1.2") | None => match max_tls_version_option {
945 Some("TLSv1.2") => {
946 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12]) {
947 Ok(builder) => builder,
948 Err(err) => {
949 logger
950 .send(LogMessage::new(
951 format!("Couldn't create the TLS server configuration: {}", err),
952 true,
953 ))
954 .await
955 .unwrap_or_default();
956 Err(anyhow::anyhow!(format!(
957 "Couldn't create the TLS server configuration: {}",
958 err
959 )))?
960 }
961 }
962 }
963 Some("TLSv1.3") | None => {
964 match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12, &TLS13]) {
965 Ok(builder) => builder,
966 Err(err) => {
967 logger
968 .send(LogMessage::new(
969 format!("Couldn't create the TLS server configuration: {}", err),
970 true,
971 ))
972 .await
973 .unwrap_or_default();
974 Err(anyhow::anyhow!(format!(
975 "Couldn't create the TLS server configuration: {}",
976 err
977 )))?
978 }
979 }
980 }
981 _ => {
982 logger
983 .send(LogMessage::new(
984 String::from("Invalid maximum TLS version"),
985 true,
986 ))
987 .await
988 .unwrap_or_default();
989 Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
990 }
991 },
992 _ => {
993 logger
994 .send(LogMessage::new(
995 String::from("Invalid minimum TLS version"),
996 true,
997 ))
998 .await
999 .unwrap_or_default();
1000 Err(anyhow::anyhow!(String::from("Invalid minimum TLS version")))?
1001 }
1002 };
1003
1004 let tls_config_builder_wants_server_cert =
1005 match yaml_config["global"]["useClientCertificate"].as_bool() {
1006 Some(true) => {
1007 let mut roots = RootCertStore::empty();
1008 let certs_result = load_native_certs();
1009 if !certs_result.errors.is_empty() {
1010 logger
1011 .send(LogMessage::new(
1012 format!(
1013 "Couldn't load the native certificate store: {}",
1014 certs_result.errors[0]
1015 ),
1016 true,
1017 ))
1018 .await
1019 .unwrap_or_default();
1020 Err(anyhow::anyhow!(format!(
1021 "Couldn't load the native certificate store: {}",
1022 certs_result.errors[0]
1023 )))?
1024 }
1025 let certs = certs_result.certs;
1026
1027 for cert in certs {
1028 match roots.add(cert) {
1029 Ok(_) => (),
1030 Err(err) => {
1031 logger
1032 .send(LogMessage::new(
1033 format!(
1034 "Couldn't add a certificate to the certificate store: {}",
1035 err
1036 ),
1037 true,
1038 ))
1039 .await
1040 .unwrap_or_default();
1041 Err(anyhow::anyhow!(format!(
1042 "Couldn't add a certificate to the certificate store: {}",
1043 err
1044 )))?
1045 }
1046 }
1047 }
1048 tls_config_builder_wants_verifier
1049 .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1050 }
1051 _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1052 };
1053
1054 let mut tls_config;
1055
1056 let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 80));
1057 let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 443));
1058 let mut tls_enabled = false;
1059 let mut non_tls_disabled = false;
1060
1061 if crypto_provider.install_default().is_err() && first_startup {
1063 logger
1064 .send(LogMessage::new(
1065 "Cannot install a process-wide cryptography provider".to_string(),
1066 true,
1067 ))
1068 .await
1069 .unwrap_or_default();
1070 Err(anyhow::anyhow!(
1071 "Cannot install a process-wide cryptography provider"
1072 ))?;
1073 }
1074
1075 if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1077 addr = SocketAddr::from((
1078 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1079 match read_port.try_into() {
1080 Ok(port) => port,
1081 Err(_) => {
1082 logger
1083 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1084 .await
1085 .unwrap_or_default();
1086 Err(anyhow::anyhow!("Invalid HTTP port"))?
1087 }
1088 },
1089 ));
1090 } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1091 addr = match read_port.parse() {
1092 Ok(addr) => addr,
1093 Err(_) => {
1094 logger
1095 .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1096 .await
1097 .unwrap_or_default();
1098 Err(anyhow::anyhow!("Invalid HTTP port"))?
1099 }
1100 };
1101 }
1102
1103 if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1104 tls_enabled = read_tls_enabled;
1105 if let Some(read_non_tls_disabled) =
1106 yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1107 {
1108 non_tls_disabled = read_non_tls_disabled;
1109 }
1110 }
1111
1112 if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1113 addr_tls = SocketAddr::from((
1114 IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1115 match read_port.try_into() {
1116 Ok(port) => port,
1117 Err(_) => {
1118 logger
1119 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1120 .await
1121 .unwrap_or_default();
1122 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1123 }
1124 },
1125 ));
1126 } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1127 addr_tls = match read_port.parse() {
1128 Ok(addr) => addr,
1129 Err(_) => {
1130 logger
1131 .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1132 .await
1133 .unwrap_or_default();
1134 Err(anyhow::anyhow!("Invalid HTTPS port"))?
1135 }
1136 };
1137 }
1138
1139 let mut acme_domains = Vec::new();
1141 if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1142 for host_yaml in hosts_config.iter() {
1143 if let Some(host) = host_yaml.as_hash() {
1144 if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1145 if let Some(domain) = domain_yaml.as_str() {
1146 if !domain.contains("*") {
1147 acme_domains.push(domain);
1148 }
1149 }
1150 }
1151 }
1152 }
1153 }
1154
1155 let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1157 if let Some(acme_contact_unwrapped) = acme_contact {
1158 acme_config = acme_config.contact_push(format!("mailto:{}", acme_contact_unwrapped));
1159 }
1160 let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1161 acme_config_with_cache =
1162 acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1163
1164 let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1165 let mut acme_state = acme_config_with_cache.state();
1166
1167 let acme_resolver = acme_state.resolver();
1168
1169 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1171 .as_bool()
1172 .unwrap_or(true)
1173 {
1174 tls_config_builder_wants_server_cert
1175 .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1176 } else {
1177 tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1178 };
1179
1180 let acme_logger = logger.clone();
1181 tokio::spawn(async move {
1182 while let Some(acme_result) = acme_state.next().await {
1183 if let Err(acme_error) = acme_result {
1184 acme_logger
1185 .send(LogMessage::new(
1186 format!("Error while obtaining a TLS certificate: {}", acme_error),
1187 true,
1188 ))
1189 .await
1190 .unwrap_or_default();
1191 }
1192 }
1193 });
1194
1195 if acme_use_http_challenge {
1196 (None, Some(acme_resolver))
1197 } else {
1198 let mut acme_config = tls_config.clone();
1199 acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1200
1201 (Some(acme_config), None)
1202 }
1203 } else {
1204 tls_config = if yaml_config["global"]["enableOCSPStapling"]
1206 .as_bool()
1207 .unwrap_or(true)
1208 {
1209 let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1210 for certified_key in certified_keys.iter() {
1211 ocsp_stapler_arc.preload(certified_key.clone());
1212 }
1213 tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1214 } else {
1215 tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1216 };
1217
1218 drop(acme_config_with_cache);
1220 (None, None)
1221 };
1222
1223 let quic_config = if tls_enabled
1224 && yaml_config["global"]["enableHTTP3"]
1225 .as_bool()
1226 .unwrap_or(false)
1227 {
1228 let mut quic_tls_config = tls_config.clone();
1229 quic_tls_config.max_early_data_size = u32::MAX;
1230 quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1231 let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1232 quic_tls_config,
1233 ) {
1234 Ok(quinn_config) => quinn_config,
1235 Err(err) => {
1236 logger
1237 .send(LogMessage::new(
1238 format!("There was a problem when starting HTTP/3 server: {}", err),
1239 true,
1240 ))
1241 .await
1242 .unwrap_or_default();
1243 Err(anyhow::anyhow!(format!(
1244 "There was a problem when starting HTTP/3 server: {}",
1245 err
1246 )))?
1247 }
1248 }));
1249 Some(quic_config)
1250 } else {
1251 None
1252 };
1253
1254 let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1256 if yaml_config["global"]["enableHTTP2"]
1257 .as_bool()
1258 .unwrap_or(true)
1259 {
1260 alpn_protocols.insert(0, b"h2".to_vec());
1261 }
1262 tls_config.alpn_protocols = alpn_protocols;
1263 let tls_config_arc = Arc::new(tls_config);
1264 let acme_config_arc = acme_config.map(Arc::new);
1265
1266 let mut listener = None;
1267 let mut listener_tls = None;
1268 let mut listener_quic = None;
1269
1270 if !non_tls_disabled {
1272 println!("HTTP server is listening at {}", addr);
1273 listener = Some(match TcpListener::bind(addr).await {
1274 Ok(listener) => listener,
1275 Err(err) => {
1276 logger
1277 .send(LogMessage::new(
1278 format!("Cannot listen to HTTP port: {}", err),
1279 true,
1280 ))
1281 .await
1282 .unwrap_or_default();
1283 Err(anyhow::anyhow!(format!(
1284 "Cannot listen to HTTP port: {}",
1285 err
1286 )))?
1287 }
1288 });
1289 }
1290
1291 if tls_enabled {
1292 println!("HTTPS server is listening at {}", addr_tls);
1293 listener_tls = Some(match TcpListener::bind(addr_tls).await {
1294 Ok(listener) => listener,
1295 Err(err) => {
1296 logger
1297 .send(LogMessage::new(
1298 format!("Cannot listen to HTTPS port: {}", err),
1299 true,
1300 ))
1301 .await
1302 .unwrap_or_default();
1303 Err(anyhow::anyhow!(format!(
1304 "Cannot listen to HTTPS port: {}",
1305 err
1306 )))?
1307 }
1308 });
1309
1310 if let Some(quic_config) = quic_config {
1311 println!("HTTP/3 server is listening at {}", addr_tls);
1312 listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1313 Ok(listener) => listener,
1314 Err(err) => {
1315 logger
1316 .send(LogMessage::new(
1317 format!("Cannot listen to HTTP/3 port: {}", err),
1318 true,
1319 ))
1320 .await
1321 .unwrap_or_default();
1322 Err(anyhow::anyhow!(format!(
1323 "Cannot listen to HTTP/3 port: {}",
1324 err
1325 )))?
1326 }
1327 });
1328 }
1329 }
1330
1331 let modules_arc = Arc::new(modules);
1333
1334 let http3_enabled = if listener_quic.is_some() {
1335 Some(addr_tls.port())
1336 } else {
1337 None
1338 };
1339
1340 loop {
1342 let listener_borrowed = &listener;
1343 let listener_accept = async move {
1344 if let Some(listener) = listener_borrowed {
1345 listener.accept().await
1346 } else {
1347 futures_util::future::pending().await
1348 }
1349 };
1350
1351 let listener_tls_borrowed = &listener_tls;
1352 let listener_tls_accept = async move {
1353 if let Some(listener_tls) = listener_tls_borrowed {
1354 listener_tls.accept().await
1355 } else {
1356 futures_util::future::pending().await
1357 }
1358 };
1359
1360 let listener_quic_borrowed = &listener_quic;
1361 let listener_quic_accept = async move {
1362 if let Some(listener_quic) = listener_quic_borrowed {
1363 listener_quic.accept().await
1364 } else {
1365 futures_util::future::pending().await
1366 }
1367 };
1368
1369 if listener_borrowed.is_none()
1370 && listener_tls_borrowed.is_none()
1371 && listener_quic_borrowed.is_none()
1372 {
1373 logger
1374 .send(LogMessage::new(
1375 String::from("No server is listening"),
1376 true,
1377 ))
1378 .await
1379 .unwrap_or_default();
1380 Err(anyhow::anyhow!("No server is listening"))?;
1381 }
1382
1383 tokio::select! {
1384 status = listener_accept => {
1385 match status {
1386 Ok((stream, remote_address)) => {
1387 accept_connection(
1388 stream,
1389 remote_address,
1390 None,
1391 acme_http01_resolver.clone(),
1392 yaml_config.clone(),
1393 logger.clone(),
1394 modules_arc.clone(),
1395 None
1396 )
1397 .await;
1398 }
1399 Err(err) => {
1400 logger
1401 .send(LogMessage::new(
1402 format!("Cannot accept a connection: {}", err),
1403 true,
1404 ))
1405 .await
1406 .unwrap_or_default();
1407 }
1408 }
1409 },
1410 status = listener_tls_accept => {
1411 match status {
1412 Ok((stream, remote_address)) => {
1413 accept_connection(
1414 stream,
1415 remote_address,
1416 Some((tls_config_arc.clone(), acme_config_arc.clone())),
1417 None,
1418 yaml_config.clone(),
1419 logger.clone(),
1420 modules_arc.clone(),
1421 http3_enabled
1422 )
1423 .await;
1424 }
1425 Err(err) => {
1426 logger
1427 .send(LogMessage::new(
1428 format!("Cannot accept a connection: {}", err),
1429 true,
1430 ))
1431 .await
1432 .unwrap_or_default();
1433 }
1434 }
1435 },
1436 status = listener_quic_accept => {
1437 match status {
1438 Some(connection_attempt) => {
1439 let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))), addr_tls.port());
1440 accept_quic_connection(
1441 connection_attempt,
1442 local_ip,
1443 yaml_config.clone(),
1444 logger.clone(),
1445 modules_arc.clone()
1446 )
1447 .await;
1448 }
1449 None => {
1450 logger
1451 .send(LogMessage::new(
1452 "HTTP/3 connections can't be accepted anymore".to_string(),
1453 true,
1454 ))
1455 .await
1456 .unwrap_or_default();
1457 }
1458 }
1459 }
1460 };
1461 }
1462}
1463
1464#[allow(clippy::type_complexity)]
1466pub fn start_server(
1467 yaml_config: Arc<Yaml>,
1468 modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1469 module_error: Option<anyhow::Error>,
1470 modules_optional_builtin: Vec<String>,
1471 first_startup: bool,
1472) -> Result<bool, Box<dyn Error + Send + Sync>> {
1473 if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1474 {
1475 let environment_variables_hash_iter = environment_variables_hash.iter();
1476 for (variable_name, variable_value) in environment_variables_hash_iter {
1477 if let Some(variable_name) = variable_name.as_str() {
1478 if let Some(variable_value) = variable_value.as_str() {
1479 if !variable_name.is_empty()
1480 && !variable_name.contains('\0')
1481 && !variable_name.contains('=')
1482 && !variable_value.contains('\0')
1483 {
1484 env::set_var(variable_name, variable_value);
1488 }
1489 }
1490 }
1491 }
1492 }
1493
1494 let available_parallelism = thread::available_parallelism()?.get();
1495
1496 let server_runtime = tokio::runtime::Builder::new_multi_thread()
1498 .worker_threads(available_parallelism)
1499 .max_blocking_threads(1536)
1500 .event_interval(25)
1501 .thread_name("server-pool")
1502 .enable_all()
1503 .build()?;
1504
1505 let log_runtime = tokio::runtime::Builder::new_multi_thread()
1507 .worker_threads(match available_parallelism / 2 {
1508 0 => 1,
1509 non_zero => non_zero,
1510 })
1511 .max_blocking_threads(768)
1512 .thread_name("log-pool")
1513 .enable_time()
1514 .build()?;
1515
1516 let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1517
1518 let log_filename = yaml_config["global"]["logFilePath"]
1519 .as_str()
1520 .map(String::from);
1521 let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1522 .as_str()
1523 .map(String::from);
1524
1525 log_runtime.spawn(async move {
1526 let log_file = match log_filename {
1527 Some(log_filename) => Some(
1528 fs::OpenOptions::new()
1529 .append(true)
1530 .create(true)
1531 .open(log_filename)
1532 .await,
1533 ),
1534 None => None,
1535 };
1536
1537 let error_log_file = match error_log_filename {
1538 Some(error_log_filename) => Some(
1539 fs::OpenOptions::new()
1540 .append(true)
1541 .create(true)
1542 .open(error_log_filename)
1543 .await,
1544 ),
1545 None => None,
1546 };
1547
1548 let log_file_wrapped = match log_file {
1549 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1550 Some(Err(e)) => {
1551 eprintln!("Failed to open log file: {}", e);
1552 None
1553 }
1554 None => None,
1555 };
1556
1557 let error_log_file_wrapped = match error_log_file {
1558 Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1559 Some(Err(e)) => {
1560 eprintln!("Failed to open error log file: {}", e);
1561 None
1562 }
1563 None => None,
1564 };
1565
1566 let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1568 let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1569 tokio::task::spawn(async move {
1570 let mut interval = time::interval(time::Duration::from_millis(100));
1571 loop {
1572 interval.tick().await;
1573 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1574 let mut locked_file = log_file_wrapped_cloned.lock().await;
1575 locked_file.flush().await.unwrap_or_default();
1576 }
1577 if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1578 {
1579 let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1580 locked_file.flush().await.unwrap_or_default();
1581 }
1582 }
1583 });
1584
1585 while let Ok(message) = receive_log.recv().await {
1587 let (mut message, is_error) = message.get_message();
1588 let log_file_wrapped_cloned = if !is_error {
1589 log_file_wrapped.clone()
1590 } else {
1591 error_log_file_wrapped.clone()
1592 };
1593
1594 if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1595 tokio::task::spawn(async move {
1596 let mut locked_file = log_file_wrapped_cloned.lock().await;
1597 if is_error {
1598 let now: DateTime<Local> = Local::now();
1599 let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1600 message = format!("[{}]: {}", formatted_time, message);
1601 }
1602 message.push('\n');
1603 if let Err(e) = locked_file.write(message.as_bytes()).await {
1604 eprintln!("Failed to write to log file: {}", e);
1605 }
1606 });
1607 }
1608 }
1609 });
1610
1611 for msg in env_config::log_env_var_overrides() {
1613 logger
1614 .send_blocking(LogMessage::new(msg, true))
1615 .unwrap_or_default();
1616 }
1617
1618 let result = server_runtime.block_on(async {
1620 let event_loop_future = server_event_loop(
1621 yaml_config,
1622 logger,
1623 modules,
1624 module_error,
1625 modules_optional_builtin,
1626 first_startup,
1627 );
1628
1629 let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1630 let cancel_token = CancellationToken::new();
1631
1632 #[cfg(unix)]
1633 {
1634 let cancel_token_clone = cancel_token.clone();
1635 let continue_tx_clone = continue_tx.clone();
1636 tokio::spawn(async move {
1637 if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1638 tokio::select! {
1639 _ = signal.recv() => {
1640 continue_tx_clone.send(true).await.unwrap_or_default();
1641 }
1642 _ = cancel_token_clone.cancelled() => {}
1643 }
1644 }
1645 });
1646 }
1647
1648 let cancel_token_clone = cancel_token.clone();
1649 tokio::spawn(async move {
1650 tokio::select! {
1651 result = signal::ctrl_c() => {
1652 if result.is_ok() {
1653 continue_tx.send(false).await.unwrap_or_default();
1654 }
1655 }
1656 _ = cancel_token_clone.cancelled() => {}
1657 }
1658 });
1659
1660 let result = tokio::select! {
1661 result = event_loop_future => {
1662 time::sleep(tokio::time::Duration::from_millis(100)).await;
1664
1665 result.map(|_| false)
1666 },
1667 continue_running = continue_rx.recv() => Ok(continue_running?)
1668 };
1669
1670 cancel_token.cancel();
1671
1672 result
1673 });
1674
1675 server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1677
1678 result
1679}