ferron/
server.rs

1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_acme::acme::ACME_TLS_ALPN_NAME;
33use rustls_acme::caches::DirCache;
34use rustls_acme::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
35use rustls_native_certs::load_native_certs;
36use tokio::fs;
37use tokio::io::{AsyncWriteExt, BufWriter};
38use tokio::net::{TcpListener, TcpStream};
39use tokio::runtime::Handle;
40use tokio::signal;
41use tokio::sync::Mutex;
42use tokio::time;
43use tokio_rustls::server::TlsStream;
44use tokio_rustls::LazyConfigAcceptor;
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
48// Enum for maybe TLS stream
49#[allow(clippy::large_enum_variant)]
50enum MaybeTlsStream {
51  Tls(TlsStream<TcpStream>),
52  Plain(TcpStream),
53}
54
55// Function to accept and handle incoming QUIC connections
56#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58  connection_attempt: quinn::Incoming,
59  local_address: SocketAddr,
60  config: Arc<Yaml>,
61  logger: Sender<LogMessage>,
62  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64  let remote_address = connection_attempt.remote_address();
65
66  let logger_clone = logger.clone();
67
68  tokio::task::spawn(async move {
69    match connection_attempt.await {
70      Ok(connection) => {
71        let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72          match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73            Ok(h3_conn) => h3_conn,
74            Err(err) => {
75              logger_clone
76                .send(LogMessage::new(
77                  format!("Error serving HTTP/3 connection: {err}"),
78                  true,
79                ))
80                .await
81                .unwrap_or_default();
82              return;
83            }
84          };
85
86        loop {
87          match h3_conn.accept().await {
88            Ok(Some(resolver)) => {
89              let config = config.clone();
90              let remote_address = remote_address;
91
92              let logger_clone = logger_clone.clone();
93              let modules = modules.clone();
94              tokio::spawn(async move {
95                let handlers_vec = modules
96                  .iter()
97                  .map(|module| module.get_handlers(Handle::current()));
98
99                let (request, stream) = match resolver.resolve_request().await {
100                  Ok(resolved) => resolved,
101                  Err(err) => {
102                    logger_clone
103                      .send(LogMessage::new(
104                        format!("Error serving HTTP/3 connection: {err}"),
105                        true,
106                      ))
107                      .await
108                      .unwrap_or_default();
109                    return;
110                  }
111                };
112                let (mut send, receive) = stream.split();
113                let request_body_stream = futures_util::stream::unfold(
114                  (receive, false),
115                  async move |(mut receive, mut is_body_finished)| loop {
116                    if !is_body_finished {
117                      match receive.recv_data().await {
118                        Ok(Some(mut data)) => {
119                          return Some((
120                            Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121                            (receive, false),
122                          ))
123                        }
124                        Ok(None) => is_body_finished = true,
125                        Err(err) => {
126                          return Some((
127                            Err(std::io::Error::other(err.to_string())),
128                            (receive, false),
129                          ))
130                        }
131                      }
132                    } else {
133                      match receive.recv_trailers().await {
134                        Ok(Some(trailers)) => {
135                          return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136                        }
137                        Ok(None) => {
138                          return None;
139                        }
140                        Err(err) => {
141                          return Some((
142                            Err(std::io::Error::other(err.to_string())),
143                            (receive, true),
144                          ))
145                        }
146                      }
147                    }
148                  },
149                );
150                let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151                let (request_parts, _) = request.into_parts();
152                let request = Request::from_parts(request_parts, request_body);
153                let handlers_vec_clone = handlers_vec
154                  .clone()
155                  .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156                let mut response = match request_handler(
157                  request,
158                  remote_address,
159                  local_address,
160                  true,
161                  config,
162                  logger_clone.clone(),
163                  handlers_vec_clone,
164                  None,
165                  None,
166                )
167                .await
168                {
169                  Ok(response) => response,
170                  Err(err) => {
171                    logger_clone
172                      .send(LogMessage::new(
173                        format!("Error serving HTTP/3 connection: {err}"),
174                        true,
175                      ))
176                      .await
177                      .unwrap_or_default();
178                    return;
179                  }
180                };
181                if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182                  response
183                    .headers_mut()
184                    .entry(http::header::DATE)
185                    .or_insert(http_date);
186                }
187                let (response_parts, mut response_body) = response.into_parts();
188                if let Err(err) = send
189                  .send_response(Response::from_parts(response_parts, ()))
190                  .await
191                {
192                  logger_clone
193                    .send(LogMessage::new(
194                      format!("Error serving HTTP/3 connection: {err}"),
195                      true,
196                    ))
197                    .await
198                    .unwrap_or_default();
199                  return;
200                }
201                let mut had_trailers = false;
202                while let Some(chunk) = response_body.frame().await {
203                  match chunk {
204                    Ok(frame) => {
205                      if frame.is_data() {
206                        match frame.into_data() {
207                          Ok(data) => {
208                            if let Err(err) = send.send_data(data).await {
209                              logger_clone
210                                .send(LogMessage::new(
211                                  format!("Error serving HTTP/3 connection: {err}"),
212                                  true,
213                                ))
214                                .await
215                                .unwrap_or_default();
216                              return;
217                            }
218                          }
219                          Err(_) => {
220                            logger_clone
221                            .send(LogMessage::new(
222                              "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223                              true,
224                            ))
225                            .await
226                            .unwrap_or_default();
227                            return;
228                          }
229                        }
230                      } else if frame.is_trailers() {
231                        match frame.into_trailers() {
232                          Ok(trailers) => {
233                            had_trailers = true;
234                            if let Err(err) = send.send_trailers(trailers).await {
235                              logger_clone
236                                .send(LogMessage::new(
237                                  format!("Error serving HTTP/3 connection: {err}"),
238                                  true,
239                                ))
240                                .await
241                                .unwrap_or_default();
242                              return;
243                            }
244                          }
245                          Err(_) => {
246                            logger_clone
247                            .send(LogMessage::new(
248                              "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249                              true,
250                            ))
251                            .await
252                            .unwrap_or_default();
253                            return;
254                          }
255                        }
256                      }
257                    }
258                    Err(err) => {
259                      logger_clone
260                        .send(LogMessage::new(
261                          format!("Error serving HTTP/3 connection: {err}"),
262                          true,
263                        ))
264                        .await
265                        .unwrap_or_default();
266                      return;
267                    }
268                  }
269                }
270                if !had_trailers {
271                  if let Err(err) = send.finish().await {
272                    logger_clone
273                      .send(LogMessage::new(
274                        format!("Error serving HTTP/3 connection: {err}"),
275                        true,
276                      ))
277                      .await
278                      .unwrap_or_default();
279                  }
280                }
281              });
282            }
283            Ok(None) => break,
284            Err(err) => {
285              logger_clone
286                .send(LogMessage::new(
287                  format!("Error serving HTTP/3 connection: {err}"),
288                  true,
289                ))
290                .await
291                .unwrap_or_default();
292              return;
293            }
294          }
295        }
296      }
297      Err(err) => {
298        logger_clone
299          .send(LogMessage::new(
300            format!("Cannot accept a connection: {err}"),
301            true,
302          ))
303          .await
304          .unwrap_or_default();
305      }
306    }
307  });
308}
309
310// Function to accept and handle incoming connections
311#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313  stream: TcpStream,
314  remote_address: SocketAddr,
315  tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316  acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317  config: Arc<Yaml>,
318  logger: Sender<LogMessage>,
319  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320  http3_enabled: Option<u16>,
321) {
322  // Disable Nagle algorithm to improve performance
323  if let Err(err) = stream.set_nodelay(true) {
324    logger
325      .send(LogMessage::new(
326        format!("Cannot disable Nagle algorithm: {err}"),
327        true,
328      ))
329      .await
330      .unwrap_or_default();
331    return;
332  };
333
334  let config = config.clone();
335  let local_address = match stream.local_addr() {
336    Ok(local_address) => local_address,
337    Err(err) => {
338      logger
339        .send(LogMessage::new(
340          format!("Cannot obtain local address of the connection: {err}"),
341          true,
342        ))
343        .await
344        .unwrap_or_default();
345      return;
346    }
347  };
348
349  let logger_clone = logger.clone();
350
351  tokio::task::spawn(async move {
352    let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353      let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354        Ok(start_handshake) => start_handshake,
355        Err(err) => {
356          logger
357            .send(LogMessage::new(
358              format!("Error during TLS handshake: {err}"),
359              true,
360            ))
361            .await
362            .unwrap_or_default();
363          return;
364        }
365      };
366
367      if let Some(acme_config) = acme_config_option {
368        if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369          match start_handshake.into_stream(acme_config).await {
370            Ok(_) => (),
371            Err(err) => {
372              logger
373                .send(LogMessage::new(
374                  format!("Error during TLS handshake: {err}"),
375                  true,
376                ))
377                .await
378                .unwrap_or_default();
379              return;
380            }
381          };
382          return;
383        }
384      }
385
386      let tls_stream = match start_handshake.into_stream(tls_config).await {
387        Ok(tls_stream) => tls_stream,
388        Err(err) => {
389          logger
390            .send(LogMessage::new(
391              format!("Error during TLS handshake: {err}"),
392              true,
393            ))
394            .await
395            .unwrap_or_default();
396          return;
397        }
398      };
399
400      MaybeTlsStream::Tls(tls_stream)
401    } else {
402      MaybeTlsStream::Plain(stream)
403    };
404
405    if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406      let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407      let is_http2;
408
409      if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410        if alpn_protocol == Some("h2".as_bytes()) {
411          is_http2 = true;
412        } else {
413          // Don't allow HTTP/2 if "h2" ALPN offering was't present
414          is_http2 = false;
415        }
416      } else {
417        is_http2 = false;
418      }
419
420      let io = TokioIo::new(tls_stream);
421      let handlers_vec = modules
422        .iter()
423        .map(|module| module.get_handlers(Handle::current()));
424
425      if is_http2 {
426        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427        http2_builder.timer(TokioTimer::new());
428        if let Some(initial_window_size) =
429          config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430        {
431          http2_builder.initial_stream_window_size(initial_window_size as u32);
432        }
433        if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434          http2_builder.max_frame_size(max_frame_size as u32);
435        }
436        if let Some(max_concurrent_streams) =
437          config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438        {
439          http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440        }
441        if let Some(max_header_list_size) =
442          config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443        {
444          http2_builder.max_header_list_size(max_header_list_size as u32);
445        }
446        if let Some(enable_connect_protocol) =
447          config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448        {
449          if enable_connect_protocol {
450            http2_builder.enable_connect_protocol();
451          }
452        }
453
454        if let Err(err) = http2_builder
455          .serve_connection(
456            io,
457            service_fn(move |request: Request<Incoming>| {
458              let config = config.clone();
459              let logger = logger_clone.clone();
460              let handlers_vec_clone = handlers_vec
461                .clone()
462                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464              let (request_parts, request_body) = request.into_parts();
465              let request = Request::from_parts(
466                request_parts,
467                request_body
468                  .map_err(|e| std::io::Error::other(e.to_string()))
469                  .boxed(),
470              );
471              request_handler(
472                request,
473                remote_address,
474                local_address,
475                true,
476                config,
477                logger,
478                handlers_vec_clone,
479                acme_http01_resolver_option_clone,
480                http3_enabled,
481              )
482            }),
483          )
484          .await
485        {
486          logger
487            .send(LogMessage::new(
488              format!("Error serving HTTPS connection: {err}"),
489              true,
490            ))
491            .await
492            .unwrap_or_default();
493        }
494      } else {
495        let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
498        http1_builder.timer(TokioTimer::new());
499
500        if let Err(err) = http1_builder
501          .serve_connection(
502            io,
503            service_fn(move |request: Request<Incoming>| {
504              let config = config.clone();
505              let logger = logger_clone.clone();
506              let handlers_vec_clone = handlers_vec
507                .clone()
508                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510              let (request_parts, request_body) = request.into_parts();
511              let request = Request::from_parts(
512                request_parts,
513                request_body
514                  .map_err(|e| std::io::Error::other(e.to_string()))
515                  .boxed(),
516              );
517              request_handler(
518                request,
519                remote_address,
520                local_address,
521                true,
522                config,
523                logger,
524                handlers_vec_clone,
525                acme_http01_resolver_option_clone,
526                http3_enabled,
527              )
528            }),
529          )
530          .with_upgrades()
531          .await
532        {
533          logger
534            .send(LogMessage::new(
535              format!("Error serving HTTPS connection: {err}"),
536              true,
537            ))
538            .await
539            .unwrap_or_default();
540        }
541      }
542    } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543      let io = TokioIo::new(stream);
544      let handlers_vec = modules
545        .iter()
546        .map(|module| module.get_handlers(Handle::current()));
547
548      let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
551      http1_builder.timer(TokioTimer::new());
552
553      if let Err(err) = http1_builder
554        .serve_connection(
555          io,
556          service_fn(move |request: Request<Incoming>| {
557            let config = config.clone();
558            let logger = logger_clone.clone();
559            let handlers_vec_clone = handlers_vec
560              .clone()
561              .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562            let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563            let (request_parts, request_body) = request.into_parts();
564            let request = Request::from_parts(
565              request_parts,
566              request_body
567                .map_err(|e| std::io::Error::other(e.to_string()))
568                .boxed(),
569            );
570            request_handler(
571              request,
572              remote_address,
573              local_address,
574              false,
575              config,
576              logger,
577              handlers_vec_clone,
578              acme_http01_resolver_option_clone,
579              http3_enabled,
580            )
581          }),
582        )
583        .with_upgrades()
584        .await
585      {
586        logger
587          .send(LogMessage::new(
588            format!("Error serving HTTP connection: {err}"),
589            true,
590          ))
591          .await
592          .unwrap_or_default();
593      }
594    }
595  });
596}
597
598// Main server event loop
599#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601  yaml_config: Arc<Yaml>,
602  logger: Sender<LogMessage>,
603  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604  module_error: Option<anyhow::Error>,
605  modules_optional_builtin: Vec<String>,
606  first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608  if let Some(module_error) = module_error {
609    logger
610      .send(LogMessage::new(module_error.to_string(), true))
611      .await
612      .unwrap_or_default();
613    Err(module_error)?
614  }
615
616  let prepared_config = match prepare_config_for_validation(&yaml_config) {
617    Ok(prepared_config) => prepared_config,
618    Err(err) => {
619      logger
620        .send(LogMessage::new(
621          format!("Server configuration validation failed: {err}"),
622          true,
623        ))
624        .await
625        .unwrap_or_default();
626      Err(anyhow::anyhow!(format!(
627        "Server configuration validation failed: {}",
628        err
629      )))?
630    }
631  };
632
633  for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634    match validate_config(
635      config_to_validate,
636      is_global,
637      is_location,
638      is_error_config,
639      &modules_optional_builtin,
640    ) {
641      Ok(unused_properties) => {
642        for unused_property in unused_properties {
643          logger
644            .send(LogMessage::new(
645              format!(
646                "Unused configuration property detected: \"{unused_property}\". You might load an appropriate module to use this configuration property"
647              ),
648              true,
649            ))
650            .await
651            .unwrap_or_default();
652        }
653      }
654      Err(err) => {
655        logger
656          .send(LogMessage::new(
657            format!("Server configuration validation failed: {err}"),
658            true,
659          ))
660          .await
661          .unwrap_or_default();
662        Err(anyhow::anyhow!(format!(
663          "Server configuration validation failed: {}",
664          err
665        )))?
666      }
667    };
668  }
669
670  let mut crypto_provider = default_provider();
671
672  if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
673    let mut cipher_suites = Vec::new();
674    let cipher_suite_iter = cipher_suite.iter();
675    for cipher_suite_yaml in cipher_suite_iter {
676      if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
677        let cipher_suite_to_add = match cipher_suite {
678          "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
679          "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
680          "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
681          "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
682          "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
683          "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
684            TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
685          }
686          "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
687          "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
688          "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
689            TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
690          }
691          _ => {
692            logger
693              .send(LogMessage::new(
694                format!("The \"{cipher_suite}\" cipher suite is not supported"),
695                true,
696              ))
697              .await
698              .unwrap_or_default();
699            Err(anyhow::anyhow!(format!(
700              "The \"{}\" cipher suite is not supported",
701              cipher_suite
702            )))?
703          }
704        };
705        cipher_suites.push(cipher_suite_to_add);
706      }
707    }
708    crypto_provider.cipher_suites = cipher_suites;
709  }
710
711  if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
712    let mut kx_groups = Vec::new();
713    let ecdh_curves_iter = ecdh_curves.iter();
714    for ecdh_curve_yaml in ecdh_curves_iter {
715      if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
716        let kx_group_to_add = match ecdh_curve {
717          "secp256r1" => SECP256R1,
718          "secp384r1" => SECP384R1,
719          "x25519" => X25519,
720          _ => {
721            logger
722              .send(LogMessage::new(
723                format!("The \"{ecdh_curve}\" ECDH curve is not supported"),
724                true,
725              ))
726              .await
727              .unwrap_or_default();
728            Err(anyhow::anyhow!(format!(
729              "The \"{}\" ECDH curve is not supported",
730              ecdh_curve
731            )))?
732          }
733        };
734        kx_groups.push(kx_group_to_add);
735      }
736    }
737    crypto_provider.kx_groups = kx_groups;
738  }
739
740  let crypto_provider_cloned = crypto_provider.clone();
741  let mut sni_resolver = CustomSniResolver::new();
742  let mut certified_keys = Vec::new();
743
744  let mut automatic_tls_enabled = false;
745  let mut acme_letsencrypt_production = true;
746  let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
747    .as_bool()
748    .unwrap_or(false);
749  let acme_challenge_type = if acme_use_http_challenge {
750    UseChallenge::Http01
751  } else {
752    UseChallenge::TlsAlpn01
753  };
754
755  // Read automatic TLS configuration
756  if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
757    automatic_tls_enabled = read_automatic_tls_enabled;
758  }
759
760  let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
761  let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
762    .as_str()
763    .map(|s| s.to_string())
764    .map(DirCache::new);
765
766  if let Some(read_acme_letsencrypt_production) =
767    yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
768  {
769    acme_letsencrypt_production = read_acme_letsencrypt_production;
770  }
771
772  if !automatic_tls_enabled {
773    // Load public certificate and private key
774    if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
775      if let Some(key_path) = yaml_config["global"]["key"].as_str() {
776        let certs = match load_certs(cert_path) {
777          Ok(certs) => certs,
778          Err(err) => {
779            logger
780              .send(LogMessage::new(
781                format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
782                true,
783              ))
784              .await
785              .unwrap_or_default();
786            Err(anyhow::anyhow!(format!(
787              "Cannot load the \"{}\" TLS certificate: {}",
788              cert_path, err
789            )))?
790          }
791        };
792        let key = match load_private_key(key_path) {
793          Ok(key) => key,
794          Err(err) => {
795            logger
796              .send(LogMessage::new(
797                format!("Cannot load the \"{cert_path}\" private key: {err}"),
798                true,
799              ))
800              .await
801              .unwrap_or_default();
802            Err(anyhow::anyhow!(format!(
803              "Cannot load the \"{}\" private key: {}",
804              cert_path, err
805            )))?
806          }
807        };
808        let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
809          Ok(key) => key,
810          Err(err) => {
811            logger
812              .send(LogMessage::new(
813                format!("Cannot load the \"{cert_path}\" private key: {err}"),
814                true,
815              ))
816              .await
817              .unwrap_or_default();
818            Err(anyhow::anyhow!(format!(
819              "Cannot load the \"{}\" private key: {}",
820              cert_path, err
821            )))?
822          }
823        };
824        let certified_key = CertifiedKey::new(certs, signing_key);
825        sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
826      }
827    }
828
829    if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
830      let sni_hostnames = sni.keys();
831      for sni_hostname_unknown in sni_hostnames {
832        if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
833          if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
834            if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
835              let certs = match load_certs(cert_path) {
836                Ok(certs) => certs,
837                Err(err) => {
838                  logger
839                    .send(LogMessage::new(
840                      format!("Cannot load the \"{cert_path}\" TLS certificate: {err}"),
841                      true,
842                    ))
843                    .await
844                    .unwrap_or_default();
845                  Err(anyhow::anyhow!(format!(
846                    "Cannot load the \"{}\" TLS certificate: {}",
847                    cert_path, err
848                  )))?
849                }
850              };
851              let key = match load_private_key(key_path) {
852                Ok(key) => key,
853                Err(err) => {
854                  logger
855                    .send(LogMessage::new(
856                      format!("Cannot load the \"{cert_path}\" private key: {err}"),
857                      true,
858                    ))
859                    .await
860                    .unwrap_or_default();
861                  Err(anyhow::anyhow!(format!(
862                    "Cannot load the \"{}\" private key: {}",
863                    cert_path, err
864                  )))?
865                }
866              };
867              let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
868                Ok(key) => key,
869                Err(err) => {
870                  logger
871                    .send(LogMessage::new(
872                      format!("Cannot load the \"{cert_path}\" private key: {err}"),
873                      true,
874                    ))
875                    .await
876                    .unwrap_or_default();
877                  Err(anyhow::anyhow!(format!(
878                    "Cannot load the \"{}\" private key: {}",
879                    cert_path, err
880                  )))?
881                }
882              };
883              let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
884              sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
885              certified_keys.push(certified_key_arc);
886            }
887          }
888        }
889      }
890    }
891  }
892
893  // Build TLS configuration
894  let tls_config_builder_wants_versions =
895    ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
896
897  // Very simple minimum and maximum TLS version logic for now...
898  let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
899  let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
900  let tls_config_builder_wants_verifier = match min_tls_version_option {
901    Some("TLSv1.3") => match max_tls_version_option {
902      Some("TLSv1.2") => {
903        logger
904          .send(LogMessage::new(
905            String::from("The maximum TLS version is older than the minimum TLS version"),
906            true,
907          ))
908          .await
909          .unwrap_or_default();
910        Err(anyhow::anyhow!(String::from(
911          "The maximum TLS version is older than the minimum TLS version"
912        )))?
913      }
914      Some("TLSv1.3") | None => {
915        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS13]) {
916          Ok(builder) => builder,
917          Err(err) => {
918            logger
919              .send(LogMessage::new(
920                format!("Couldn't create the TLS server configuration: {err}"),
921                true,
922              ))
923              .await
924              .unwrap_or_default();
925            Err(anyhow::anyhow!(format!(
926              "Couldn't create the TLS server configuration: {}",
927              err
928            )))?
929          }
930        }
931      }
932      _ => {
933        logger
934          .send(LogMessage::new(
935            String::from("Invalid maximum TLS version"),
936            true,
937          ))
938          .await
939          .unwrap_or_default();
940        Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
941      }
942    },
943    Some("TLSv1.2") | None => match max_tls_version_option {
944      Some("TLSv1.2") => {
945        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12]) {
946          Ok(builder) => builder,
947          Err(err) => {
948            logger
949              .send(LogMessage::new(
950                format!("Couldn't create the TLS server configuration: {err}"),
951                true,
952              ))
953              .await
954              .unwrap_or_default();
955            Err(anyhow::anyhow!(format!(
956              "Couldn't create the TLS server configuration: {}",
957              err
958            )))?
959          }
960        }
961      }
962      Some("TLSv1.3") | None => {
963        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12, &TLS13]) {
964          Ok(builder) => builder,
965          Err(err) => {
966            logger
967              .send(LogMessage::new(
968                format!("Couldn't create the TLS server configuration: {err}"),
969                true,
970              ))
971              .await
972              .unwrap_or_default();
973            Err(anyhow::anyhow!(format!(
974              "Couldn't create the TLS server configuration: {}",
975              err
976            )))?
977          }
978        }
979      }
980      _ => {
981        logger
982          .send(LogMessage::new(
983            String::from("Invalid maximum TLS version"),
984            true,
985          ))
986          .await
987          .unwrap_or_default();
988        Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
989      }
990    },
991    _ => {
992      logger
993        .send(LogMessage::new(
994          String::from("Invalid minimum TLS version"),
995          true,
996        ))
997        .await
998        .unwrap_or_default();
999      Err(anyhow::anyhow!(String::from("Invalid minimum TLS version")))?
1000    }
1001  };
1002
1003  let tls_config_builder_wants_server_cert =
1004    match yaml_config["global"]["useClientCertificate"].as_bool() {
1005      Some(true) => {
1006        let mut roots = RootCertStore::empty();
1007        let certs_result = load_native_certs();
1008        if !certs_result.errors.is_empty() {
1009          logger
1010            .send(LogMessage::new(
1011              format!(
1012                "Couldn't load the native certificate store: {}",
1013                certs_result.errors[0]
1014              ),
1015              true,
1016            ))
1017            .await
1018            .unwrap_or_default();
1019          Err(anyhow::anyhow!(format!(
1020            "Couldn't load the native certificate store: {}",
1021            certs_result.errors[0]
1022          )))?
1023        }
1024        let certs = certs_result.certs;
1025
1026        for cert in certs {
1027          match roots.add(cert) {
1028            Ok(_) => (),
1029            Err(err) => {
1030              logger
1031                .send(LogMessage::new(
1032                  format!("Couldn't add a certificate to the certificate store: {err}"),
1033                  true,
1034                ))
1035                .await
1036                .unwrap_or_default();
1037              Err(anyhow::anyhow!(format!(
1038                "Couldn't add a certificate to the certificate store: {}",
1039                err
1040              )))?
1041            }
1042          }
1043        }
1044        tls_config_builder_wants_verifier
1045          .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1046      }
1047      _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1048    };
1049
1050  let mut tls_config;
1051
1052  let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 80));
1053  let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 443));
1054  let mut tls_enabled = false;
1055  let mut non_tls_disabled = false;
1056
1057  // Install a process-wide cryptography provider. If it fails, then warn about it.
1058  if crypto_provider.install_default().is_err() && first_startup {
1059    logger
1060      .send(LogMessage::new(
1061        "Cannot install a process-wide cryptography provider".to_string(),
1062        true,
1063      ))
1064      .await
1065      .unwrap_or_default();
1066    Err(anyhow::anyhow!(
1067      "Cannot install a process-wide cryptography provider"
1068    ))?;
1069  }
1070
1071  // Read port configurations from YAML
1072  if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1073    addr = SocketAddr::from((
1074      IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1075      match read_port.try_into() {
1076        Ok(port) => port,
1077        Err(_) => {
1078          logger
1079            .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1080            .await
1081            .unwrap_or_default();
1082          Err(anyhow::anyhow!("Invalid HTTP port"))?
1083        }
1084      },
1085    ));
1086  } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1087    addr = match read_port.parse() {
1088      Ok(addr) => addr,
1089      Err(_) => {
1090        logger
1091          .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1092          .await
1093          .unwrap_or_default();
1094        Err(anyhow::anyhow!("Invalid HTTP port"))?
1095      }
1096    };
1097  }
1098
1099  if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1100    tls_enabled = read_tls_enabled;
1101    if let Some(read_non_tls_disabled) =
1102      yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1103    {
1104      non_tls_disabled = read_non_tls_disabled;
1105    }
1106  }
1107
1108  if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1109    addr_tls = SocketAddr::from((
1110      IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1111      match read_port.try_into() {
1112        Ok(port) => port,
1113        Err(_) => {
1114          logger
1115            .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1116            .await
1117            .unwrap_or_default();
1118          Err(anyhow::anyhow!("Invalid HTTPS port"))?
1119        }
1120      },
1121    ));
1122  } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1123    addr_tls = match read_port.parse() {
1124      Ok(addr) => addr,
1125      Err(_) => {
1126        logger
1127          .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1128          .await
1129          .unwrap_or_default();
1130        Err(anyhow::anyhow!("Invalid HTTPS port"))?
1131      }
1132    };
1133  }
1134
1135  // Get domains for ACME configuration
1136  let mut acme_domains = Vec::new();
1137  if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1138    for host_yaml in hosts_config.iter() {
1139      if let Some(host) = host_yaml.as_hash() {
1140        if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1141          if let Some(domain) = domain_yaml.as_str() {
1142            if !domain.contains("*") {
1143              acme_domains.push(domain);
1144            }
1145          }
1146        }
1147      }
1148    }
1149  }
1150
1151  // Create ACME configuration
1152  let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1153  if let Some(acme_contact_unwrapped) = acme_contact {
1154    acme_config = acme_config.contact_push(format!("mailto:{acme_contact_unwrapped}"));
1155  }
1156  let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1157  acme_config_with_cache =
1158    acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1159
1160  let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1161    let mut acme_state = acme_config_with_cache.state();
1162
1163    let acme_resolver = acme_state.resolver();
1164
1165    // Create TLS configuration
1166    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1167      .as_bool()
1168      .unwrap_or(true)
1169    {
1170      tls_config_builder_wants_server_cert
1171        .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1172    } else {
1173      tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1174    };
1175
1176    let acme_logger = logger.clone();
1177    tokio::spawn(async move {
1178      while let Some(acme_result) = acme_state.next().await {
1179        if let Err(acme_error) = acme_result {
1180          acme_logger
1181            .send(LogMessage::new(
1182              format!("Error while obtaining a TLS certificate: {acme_error}"),
1183              true,
1184            ))
1185            .await
1186            .unwrap_or_default();
1187        }
1188      }
1189    });
1190
1191    if acme_use_http_challenge {
1192      (None, Some(acme_resolver))
1193    } else {
1194      let mut acme_config = tls_config.clone();
1195      acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1196
1197      (Some(acme_config), None)
1198    }
1199  } else {
1200    // Create TLS configuration
1201    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1202      .as_bool()
1203      .unwrap_or(true)
1204    {
1205      let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1206      for certified_key in certified_keys.iter() {
1207        ocsp_stapler_arc.preload(certified_key.clone());
1208      }
1209      tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1210    } else {
1211      tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1212    };
1213
1214    // Drop the ACME configuration
1215    drop(acme_config_with_cache);
1216    (None, None)
1217  };
1218
1219  let quic_config = if tls_enabled
1220    && yaml_config["global"]["enableHTTP3"]
1221      .as_bool()
1222      .unwrap_or(false)
1223  {
1224    let mut quic_tls_config = tls_config.clone();
1225    quic_tls_config.max_early_data_size = u32::MAX;
1226    quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1227    let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1228      quic_tls_config,
1229    ) {
1230      Ok(quinn_config) => quinn_config,
1231      Err(err) => {
1232        logger
1233          .send(LogMessage::new(
1234            format!("There was a problem when starting HTTP/3 server: {err}"),
1235            true,
1236          ))
1237          .await
1238          .unwrap_or_default();
1239        Err(anyhow::anyhow!(format!(
1240          "There was a problem when starting HTTP/3 server: {}",
1241          err
1242        )))?
1243      }
1244    }));
1245    Some(quic_config)
1246  } else {
1247    None
1248  };
1249
1250  // Configure ALPN protocols
1251  let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1252  if yaml_config["global"]["enableHTTP2"]
1253    .as_bool()
1254    .unwrap_or(true)
1255  {
1256    alpn_protocols.insert(0, b"h2".to_vec());
1257  }
1258  tls_config.alpn_protocols = alpn_protocols;
1259  let tls_config_arc = Arc::new(tls_config);
1260  let acme_config_arc = acme_config.map(Arc::new);
1261
1262  let mut listener = None;
1263  let mut listener_tls = None;
1264  let mut listener_quic = None;
1265
1266  // Bind to the specified ports
1267  if !non_tls_disabled {
1268    println!("HTTP server is listening at {addr}");
1269    listener = Some(match TcpListener::bind(addr).await {
1270      Ok(listener) => listener,
1271      Err(err) => {
1272        logger
1273          .send(LogMessage::new(
1274            format!("Cannot listen to HTTP port: {err}"),
1275            true,
1276          ))
1277          .await
1278          .unwrap_or_default();
1279        Err(anyhow::anyhow!(format!(
1280          "Cannot listen to HTTP port: {}",
1281          err
1282        )))?
1283      }
1284    });
1285  }
1286
1287  if tls_enabled {
1288    println!("HTTPS server is listening at {addr_tls}");
1289    listener_tls = Some(match TcpListener::bind(addr_tls).await {
1290      Ok(listener) => listener,
1291      Err(err) => {
1292        logger
1293          .send(LogMessage::new(
1294            format!("Cannot listen to HTTPS port: {err}"),
1295            true,
1296          ))
1297          .await
1298          .unwrap_or_default();
1299        Err(anyhow::anyhow!(format!(
1300          "Cannot listen to HTTPS port: {}",
1301          err
1302        )))?
1303      }
1304    });
1305
1306    if let Some(quic_config) = quic_config {
1307      println!("HTTP/3 server is listening at {addr_tls}");
1308      listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1309        Ok(listener) => listener,
1310        Err(err) => {
1311          logger
1312            .send(LogMessage::new(
1313              format!("Cannot listen to HTTP/3 port: {err}"),
1314              true,
1315            ))
1316            .await
1317            .unwrap_or_default();
1318          Err(anyhow::anyhow!(format!(
1319            "Cannot listen to HTTP/3 port: {}",
1320            err
1321          )))?
1322        }
1323      });
1324    }
1325  }
1326
1327  // Wrap the modules vector in an Arc
1328  let modules_arc = Arc::new(modules);
1329
1330  let http3_enabled = if listener_quic.is_some() {
1331    Some(addr_tls.port())
1332  } else {
1333    None
1334  };
1335
1336  // Main loop to accept incoming connections
1337  loop {
1338    let listener_borrowed = &listener;
1339    let listener_accept = async move {
1340      if let Some(listener) = listener_borrowed {
1341        listener.accept().await
1342      } else {
1343        futures_util::future::pending().await
1344      }
1345    };
1346
1347    let listener_tls_borrowed = &listener_tls;
1348    let listener_tls_accept = async move {
1349      if let Some(listener_tls) = listener_tls_borrowed {
1350        listener_tls.accept().await
1351      } else {
1352        futures_util::future::pending().await
1353      }
1354    };
1355
1356    let listener_quic_borrowed = &listener_quic;
1357    let listener_quic_accept = async move {
1358      if let Some(listener_quic) = listener_quic_borrowed {
1359        listener_quic.accept().await
1360      } else {
1361        futures_util::future::pending().await
1362      }
1363    };
1364
1365    if listener_borrowed.is_none()
1366      && listener_tls_borrowed.is_none()
1367      && listener_quic_borrowed.is_none()
1368    {
1369      logger
1370        .send(LogMessage::new(
1371          String::from("No server is listening"),
1372          true,
1373        ))
1374        .await
1375        .unwrap_or_default();
1376      Err(anyhow::anyhow!("No server is listening"))?;
1377    }
1378
1379    tokio::select! {
1380      status = listener_accept => {
1381        match status {
1382          Ok((stream, remote_address)) => {
1383            accept_connection(
1384              stream,
1385              remote_address,
1386              None,
1387              acme_http01_resolver.clone(),
1388              yaml_config.clone(),
1389              logger.clone(),
1390              modules_arc.clone(),
1391              None
1392            )
1393            .await;
1394          }
1395          Err(err) => {
1396            logger
1397              .send(LogMessage::new(
1398                format!("Cannot accept a connection: {err}"),
1399                true,
1400              ))
1401              .await
1402              .unwrap_or_default();
1403          }
1404        }
1405      },
1406      status = listener_tls_accept => {
1407        match status {
1408          Ok((stream, remote_address)) => {
1409            accept_connection(
1410              stream,
1411              remote_address,
1412              Some((tls_config_arc.clone(), acme_config_arc.clone())),
1413              None,
1414              yaml_config.clone(),
1415              logger.clone(),
1416              modules_arc.clone(),
1417              http3_enabled
1418            )
1419            .await;
1420          }
1421          Err(err) => {
1422            logger
1423              .send(LogMessage::new(
1424                format!("Cannot accept a connection: {err}"),
1425                true,
1426              ))
1427              .await
1428              .unwrap_or_default();
1429          }
1430        }
1431      },
1432      status = listener_quic_accept => {
1433        match status {
1434          Some(connection_attempt) => {
1435            let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))), addr_tls.port());
1436            accept_quic_connection(
1437              connection_attempt,
1438              local_ip,
1439              yaml_config.clone(),
1440              logger.clone(),
1441              modules_arc.clone()
1442            )
1443            .await;
1444          }
1445          None => {
1446            logger
1447              .send(LogMessage::new(
1448                "HTTP/3 connections can't be accepted anymore".to_string(),
1449                true,
1450              ))
1451              .await
1452              .unwrap_or_default();
1453          }
1454        }
1455      }
1456    };
1457  }
1458}
1459
1460// Start the server
1461#[allow(clippy::type_complexity)]
1462pub fn start_server(
1463  yaml_config: Arc<Yaml>,
1464  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1465  module_error: Option<anyhow::Error>,
1466  modules_optional_builtin: Vec<String>,
1467  first_startup: bool,
1468) -> Result<bool, Box<dyn Error + Send + Sync>> {
1469  if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1470  {
1471    let environment_variables_hash_iter = environment_variables_hash.iter();
1472    for (variable_name, variable_value) in environment_variables_hash_iter {
1473      if let Some(variable_name) = variable_name.as_str() {
1474        if let Some(variable_value) = variable_value.as_str() {
1475          if !variable_name.is_empty()
1476            && !variable_name.contains('\0')
1477            && !variable_name.contains('=')
1478            && !variable_value.contains('\0')
1479          {
1480            // Safety: the environment variables are set before threads are spawned
1481            // The `std::env::set_var` function is safe to use in single-threaded environments
1482            // In Rust 2024 edition, the `std::env::set_var` function would be `unsafe`.
1483            env::set_var(variable_name, variable_value);
1484          }
1485        }
1486      }
1487    }
1488  }
1489
1490  let available_parallelism = thread::available_parallelism()?.get();
1491
1492  // Create Tokio runtime for the server
1493  let server_runtime = tokio::runtime::Builder::new_multi_thread()
1494    .worker_threads(available_parallelism)
1495    .max_blocking_threads(1536)
1496    .event_interval(25)
1497    .thread_name("server-pool")
1498    .enable_all()
1499    .build()?;
1500
1501  // Create Tokio runtime for logging
1502  let log_runtime = tokio::runtime::Builder::new_multi_thread()
1503    .worker_threads(match available_parallelism / 2 {
1504      0 => 1,
1505      non_zero => non_zero,
1506    })
1507    .max_blocking_threads(768)
1508    .thread_name("log-pool")
1509    .enable_time()
1510    .build()?;
1511
1512  let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1513
1514  let log_filename = yaml_config["global"]["logFilePath"]
1515    .as_str()
1516    .map(String::from);
1517  let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1518    .as_str()
1519    .map(String::from);
1520
1521  log_runtime.spawn(async move {
1522    let log_file = match log_filename {
1523      Some(log_filename) => Some(
1524        fs::OpenOptions::new()
1525          .append(true)
1526          .create(true)
1527          .open(log_filename)
1528          .await,
1529      ),
1530      None => None,
1531    };
1532
1533    let error_log_file = match error_log_filename {
1534      Some(error_log_filename) => Some(
1535        fs::OpenOptions::new()
1536          .append(true)
1537          .create(true)
1538          .open(error_log_filename)
1539          .await,
1540      ),
1541      None => None,
1542    };
1543
1544    let log_file_wrapped = match log_file {
1545      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1546      Some(Err(e)) => {
1547        eprintln!("Failed to open log file: {e}");
1548        None
1549      }
1550      None => None,
1551    };
1552
1553    let error_log_file_wrapped = match error_log_file {
1554      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1555      Some(Err(e)) => {
1556        eprintln!("Failed to open error log file: {e}");
1557        None
1558      }
1559      None => None,
1560    };
1561
1562    // The logs are written when the log message is received by the log event loop, and flushed every 100 ms, improving the server performance.
1563    let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1564    let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1565    tokio::task::spawn(async move {
1566      let mut interval = time::interval(time::Duration::from_millis(100));
1567      loop {
1568        interval.tick().await;
1569        if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1570          let mut locked_file = log_file_wrapped_cloned.lock().await;
1571          locked_file.flush().await.unwrap_or_default();
1572        }
1573        if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1574        {
1575          let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1576          locked_file.flush().await.unwrap_or_default();
1577        }
1578      }
1579    });
1580
1581    // Logging loop
1582    while let Ok(message) = receive_log.recv().await {
1583      let (mut message, is_error) = message.get_message();
1584      let log_file_wrapped_cloned = if !is_error {
1585        log_file_wrapped.clone()
1586      } else {
1587        error_log_file_wrapped.clone()
1588      };
1589
1590      if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1591        tokio::task::spawn(async move {
1592          let mut locked_file = log_file_wrapped_cloned.lock().await;
1593          if is_error {
1594            let now: DateTime<Local> = Local::now();
1595            let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1596            message = format!("[{formatted_time}]: {message}");
1597          }
1598          message.push('\n');
1599          if let Err(e) = locked_file.write(message.as_bytes()).await {
1600            eprintln!("Failed to write to log file: {e}");
1601          }
1602        });
1603      }
1604    }
1605  });
1606
1607  // Log env overrides once at startup
1608  for msg in env_config::log_env_var_overrides() {
1609    logger
1610      .send_blocking(LogMessage::new(msg, true))
1611      .unwrap_or_default();
1612  }
1613
1614  // Run the server event loop
1615  let result = server_runtime.block_on(async {
1616    let event_loop_future = server_event_loop(
1617      yaml_config,
1618      logger,
1619      modules,
1620      module_error,
1621      modules_optional_builtin,
1622      first_startup,
1623    );
1624
1625    let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1626    let cancel_token = CancellationToken::new();
1627
1628    #[cfg(unix)]
1629    {
1630      let cancel_token_clone = cancel_token.clone();
1631      let continue_tx_clone = continue_tx.clone();
1632      tokio::spawn(async move {
1633        if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1634          tokio::select! {
1635            _ = signal.recv() => {
1636              continue_tx_clone.send(true).await.unwrap_or_default();
1637            }
1638            _ = cancel_token_clone.cancelled() => {}
1639          }
1640        }
1641      });
1642    }
1643
1644    let cancel_token_clone = cancel_token.clone();
1645    tokio::spawn(async move {
1646      tokio::select! {
1647        result = signal::ctrl_c() => {
1648          if result.is_ok() {
1649            continue_tx.send(false).await.unwrap_or_default();
1650          }
1651        }
1652        _ = cancel_token_clone.cancelled() => {}
1653      }
1654    });
1655
1656    let result = tokio::select! {
1657      result = event_loop_future => {
1658        // Sleep the Tokio runtime to ensure error logs are saved
1659        time::sleep(tokio::time::Duration::from_millis(100)).await;
1660
1661        result.map(|_| false)
1662      },
1663      continue_running = continue_rx.recv() => Ok(continue_running?)
1664    };
1665
1666    cancel_token.cancel();
1667
1668    result
1669  });
1670
1671  // Wait 10 seconds or until all tasks are complete
1672  server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1673
1674  result
1675}