ferron/
server.rs

1use std::error::Error;
2use std::net::{IpAddr, Ipv6Addr, SocketAddr};
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::{env, thread};
6
7use crate::ferron_common::{LogMessage, ServerModule, ServerModuleHandlers};
8use crate::ferron_request_handler::request_handler;
9use crate::ferron_util::env_config;
10use crate::ferron_util::load_tls::{load_certs, load_private_key};
11use crate::ferron_util::sni::CustomSniResolver;
12use crate::ferron_util::validate_config::{prepare_config_for_validation, validate_config};
13use async_channel::Sender;
14use chrono::prelude::*;
15use futures_util::StreamExt;
16use h3_quinn::quinn;
17use h3_quinn::quinn::crypto::rustls::QuicServerConfig;
18use http::Response;
19use http_body_util::{BodyExt, StreamBody};
20use hyper::body::{Buf, Bytes, Frame, Incoming};
21use hyper::service::service_fn;
22use hyper::Request;
23use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
24use ocsp_stapler::Stapler;
25use rustls::crypto::ring::cipher_suite::*;
26use rustls::crypto::ring::default_provider;
27use rustls::crypto::ring::kx_group::*;
28use rustls::server::{Acceptor, WebPkiClientVerifier};
29use rustls::sign::CertifiedKey;
30use rustls::version::{TLS12, TLS13};
31use rustls::{RootCertStore, ServerConfig};
32use rustls_acme::acme::ACME_TLS_ALPN_NAME;
33use rustls_acme::caches::DirCache;
34use rustls_acme::{is_tls_alpn_challenge, AcmeConfig, ResolvesServerCertAcme, UseChallenge};
35use rustls_native_certs::load_native_certs;
36use tokio::fs;
37use tokio::io::{AsyncWriteExt, BufWriter};
38use tokio::net::{TcpListener, TcpStream};
39use tokio::runtime::Handle;
40use tokio::signal;
41use tokio::sync::Mutex;
42use tokio::time;
43use tokio_rustls::server::TlsStream;
44use tokio_rustls::LazyConfigAcceptor;
45use tokio_util::sync::CancellationToken;
46use yaml_rust2::Yaml;
47
/// An accepted TCP connection that is either wrapped in a TLS session or left as plain TCP.
// NOTE(review): the lint below is silenced without a stated reason; presumably the `Tls`
// variant is considerably larger than `Plain` and boxing it was judged not worth the extra
// allocation per connection - confirm.
#[allow(clippy::large_enum_variant)]
enum MaybeTlsStream {
  /// TLS stream layered over the accepted TCP stream.
  Tls(TlsStream<TcpStream>),
  /// The accepted TCP stream itself, without TLS.
  Plain(TcpStream),
}
54
55// Function to accept and handle incoming QUIC connections
56#[allow(clippy::too_many_arguments)]
57async fn accept_quic_connection(
58  connection_attempt: quinn::Incoming,
59  local_address: SocketAddr,
60  config: Arc<Yaml>,
61  logger: Sender<LogMessage>,
62  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
63) {
64  let remote_address = connection_attempt.remote_address();
65
66  let logger_clone = logger.clone();
67
68  tokio::task::spawn(async move {
69    match connection_attempt.await {
70      Ok(connection) => {
71        let mut h3_conn: h3::server::Connection<h3_quinn::Connection, Bytes> =
72          match h3::server::Connection::new(h3_quinn::Connection::new(connection)).await {
73            Ok(h3_conn) => h3_conn,
74            Err(err) => {
75              logger_clone
76                .send(LogMessage::new(
77                  format!("Error serving HTTP/3 connection: {}", err),
78                  true,
79                ))
80                .await
81                .unwrap_or_default();
82              return;
83            }
84          };
85
86        loop {
87          match h3_conn.accept().await {
88            Ok(Some(resolver)) => {
89              let config = config.clone();
90              let remote_address = remote_address;
91
92              let logger_clone = logger_clone.clone();
93              let modules = modules.clone();
94              tokio::spawn(async move {
95                let handlers_vec = modules
96                  .iter()
97                  .map(|module| module.get_handlers(Handle::current()));
98
99                let (request, stream) = match resolver.resolve_request().await {
100                  Ok(resolved) => resolved,
101                  Err(err) => {
102                    logger_clone
103                      .send(LogMessage::new(
104                        format!("Error serving HTTP/3 connection: {}", err),
105                        true,
106                      ))
107                      .await
108                      .unwrap_or_default();
109                    return;
110                  }
111                };
112                let (mut send, receive) = stream.split();
113                let request_body_stream = futures_util::stream::unfold(
114                  (receive, false),
115                  async move |(mut receive, mut is_body_finished)| loop {
116                    if !is_body_finished {
117                      match receive.recv_data().await {
118                        Ok(Some(mut data)) => {
119                          return Some((
120                            Ok(Frame::data(data.copy_to_bytes(data.remaining()))),
121                            (receive, false),
122                          ))
123                        }
124                        Ok(None) => is_body_finished = true,
125                        Err(err) => {
126                          return Some((
127                            Err(std::io::Error::other(err.to_string())),
128                            (receive, false),
129                          ))
130                        }
131                      }
132                    } else {
133                      match receive.recv_trailers().await {
134                        Ok(Some(trailers)) => {
135                          return Some((Ok(Frame::trailers(trailers)), (receive, true)))
136                        }
137                        Ok(None) => {
138                          return None;
139                        }
140                        Err(err) => {
141                          return Some((
142                            Err(std::io::Error::other(err.to_string())),
143                            (receive, true),
144                          ))
145                        }
146                      }
147                    }
148                  },
149                );
150                let request_body = BodyExt::boxed(StreamBody::new(request_body_stream));
151                let (request_parts, _) = request.into_parts();
152                let request = Request::from_parts(request_parts, request_body);
153                let handlers_vec_clone = handlers_vec
154                  .clone()
155                  .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
156                let mut response = match request_handler(
157                  request,
158                  remote_address,
159                  local_address,
160                  true,
161                  config,
162                  logger_clone.clone(),
163                  handlers_vec_clone,
164                  None,
165                  None,
166                )
167                .await
168                {
169                  Ok(response) => response,
170                  Err(err) => {
171                    logger_clone
172                      .send(LogMessage::new(
173                        format!("Error serving HTTP/3 connection: {}", err),
174                        true,
175                      ))
176                      .await
177                      .unwrap_or_default();
178                    return;
179                  }
180                };
181                if let Ok(http_date) = httpdate::fmt_http_date(SystemTime::now()).try_into() {
182                  response
183                    .headers_mut()
184                    .entry(http::header::DATE)
185                    .or_insert(http_date);
186                }
187                let (response_parts, mut response_body) = response.into_parts();
188                if let Err(err) = send
189                  .send_response(Response::from_parts(response_parts, ()))
190                  .await
191                {
192                  logger_clone
193                    .send(LogMessage::new(
194                      format!("Error serving HTTP/3 connection: {}", err),
195                      true,
196                    ))
197                    .await
198                    .unwrap_or_default();
199                  return;
200                }
201                let mut had_trailers = false;
202                while let Some(chunk) = response_body.frame().await {
203                  match chunk {
204                    Ok(frame) => {
205                      if frame.is_data() {
206                        match frame.into_data() {
207                          Ok(data) => {
208                            if let Err(err) = send.send_data(data).await {
209                              logger_clone
210                                .send(LogMessage::new(
211                                  format!("Error serving HTTP/3 connection: {}", err),
212                                  true,
213                                ))
214                                .await
215                                .unwrap_or_default();
216                              return;
217                            }
218                          }
219                          Err(_) => {
220                            logger_clone
221                            .send(LogMessage::new(
222                              "Error serving HTTP/3 connection: the frame isn't really a data frame".to_string(),
223                              true,
224                            ))
225                            .await
226                            .unwrap_or_default();
227                            return;
228                          }
229                        }
230                      } else if frame.is_trailers() {
231                        match frame.into_trailers() {
232                          Ok(trailers) => {
233                            had_trailers = true;
234                            if let Err(err) = send.send_trailers(trailers).await {
235                              logger_clone
236                                .send(LogMessage::new(
237                                  format!("Error serving HTTP/3 connection: {}", err),
238                                  true,
239                                ))
240                                .await
241                                .unwrap_or_default();
242                              return;
243                            }
244                          }
245                          Err(_) => {
246                            logger_clone
247                            .send(LogMessage::new(
248                              "Error serving HTTP/3 connection: the frame isn't really a trailers frame".to_string(),
249                              true,
250                            ))
251                            .await
252                            .unwrap_or_default();
253                            return;
254                          }
255                        }
256                      }
257                    }
258                    Err(err) => {
259                      logger_clone
260                        .send(LogMessage::new(
261                          format!("Error serving HTTP/3 connection: {}", err),
262                          true,
263                        ))
264                        .await
265                        .unwrap_or_default();
266                      return;
267                    }
268                  }
269                }
270                if !had_trailers {
271                  if let Err(err) = send.finish().await {
272                    logger_clone
273                      .send(LogMessage::new(
274                        format!("Error serving HTTP/3 connection: {}", err),
275                        true,
276                      ))
277                      .await
278                      .unwrap_or_default();
279                  }
280                }
281              });
282            }
283            Ok(None) => break,
284            Err(err) => {
285              logger_clone
286                .send(LogMessage::new(
287                  format!("Error serving HTTP/3 connection: {}", err),
288                  true,
289                ))
290                .await
291                .unwrap_or_default();
292              return;
293            }
294          }
295        }
296      }
297      Err(err) => {
298        logger_clone
299          .send(LogMessage::new(
300            format!("Cannot accept a connection: {}", err),
301            true,
302          ))
303          .await
304          .unwrap_or_default();
305      }
306    }
307  });
308}
309
310// Function to accept and handle incoming connections
311#[allow(clippy::too_many_arguments)]
312async fn accept_connection(
313  stream: TcpStream,
314  remote_address: SocketAddr,
315  tls_config_option: Option<(Arc<ServerConfig>, Option<Arc<ServerConfig>>)>,
316  acme_http01_resolver_option: Option<Arc<ResolvesServerCertAcme>>,
317  config: Arc<Yaml>,
318  logger: Sender<LogMessage>,
319  modules: Arc<Vec<Box<dyn ServerModule + std::marker::Send + Sync>>>,
320  http3_enabled: Option<u16>,
321) {
322  // Disable Nagle algorithm to improve performance
323  if let Err(err) = stream.set_nodelay(true) {
324    logger
325      .send(LogMessage::new(
326        format!("Cannot disable Nagle algorithm: {}", err),
327        true,
328      ))
329      .await
330      .unwrap_or_default();
331    return;
332  };
333
334  let config = config.clone();
335  let local_address = match stream.local_addr() {
336    Ok(local_address) => local_address,
337    Err(err) => {
338      logger
339        .send(LogMessage::new(
340          format!("Cannot obtain local address of the connection: {}", err),
341          true,
342        ))
343        .await
344        .unwrap_or_default();
345      return;
346    }
347  };
348
349  let logger_clone = logger.clone();
350
351  tokio::task::spawn(async move {
352    let maybe_tls_stream = if let Some((tls_config, acme_config_option)) = tls_config_option {
353      let start_handshake = match LazyConfigAcceptor::new(Acceptor::default(), stream).await {
354        Ok(start_handshake) => start_handshake,
355        Err(err) => {
356          logger
357            .send(LogMessage::new(
358              format!("Error during TLS handshake: {}", err),
359              true,
360            ))
361            .await
362            .unwrap_or_default();
363          return;
364        }
365      };
366
367      if let Some(acme_config) = acme_config_option {
368        if is_tls_alpn_challenge(&start_handshake.client_hello()) {
369          match start_handshake.into_stream(acme_config).await {
370            Ok(_) => (),
371            Err(err) => {
372              logger
373                .send(LogMessage::new(
374                  format!("Error during TLS handshake: {}", err),
375                  true,
376                ))
377                .await
378                .unwrap_or_default();
379              return;
380            }
381          };
382          return;
383        }
384      }
385
386      let tls_stream = match start_handshake.into_stream(tls_config).await {
387        Ok(tls_stream) => tls_stream,
388        Err(err) => {
389          logger
390            .send(LogMessage::new(
391              format!("Error during TLS handshake: {}", err),
392              true,
393            ))
394            .await
395            .unwrap_or_default();
396          return;
397        }
398      };
399
400      MaybeTlsStream::Tls(tls_stream)
401    } else {
402      MaybeTlsStream::Plain(stream)
403    };
404
405    if let MaybeTlsStream::Tls(tls_stream) = maybe_tls_stream {
406      let alpn_protocol = tls_stream.get_ref().1.alpn_protocol();
407      let is_http2;
408
409      if config["global"]["enableHTTP2"].as_bool().unwrap_or(true) {
410        if alpn_protocol == Some("h2".as_bytes()) {
411          is_http2 = true;
412        } else {
413          // Don't allow HTTP/2 if "h2" ALPN offering was't present
414          is_http2 = false;
415        }
416      } else {
417        is_http2 = false;
418      }
419
420      let io = TokioIo::new(tls_stream);
421      let handlers_vec = modules
422        .iter()
423        .map(|module| module.get_handlers(Handle::current()));
424
425      if is_http2 {
426        let mut http2_builder = hyper::server::conn::http2::Builder::new(TokioExecutor::new());
427        http2_builder.timer(TokioTimer::new());
428        if let Some(initial_window_size) =
429          config["global"]["http2Settings"]["initialWindowSize"].as_i64()
430        {
431          http2_builder.initial_stream_window_size(initial_window_size as u32);
432        }
433        if let Some(max_frame_size) = config["global"]["http2Settings"]["maxFrameSize"].as_i64() {
434          http2_builder.max_frame_size(max_frame_size as u32);
435        }
436        if let Some(max_concurrent_streams) =
437          config["global"]["http2Settings"]["maxConcurrentStreams"].as_i64()
438        {
439          http2_builder.max_concurrent_streams(max_concurrent_streams as u32);
440        }
441        if let Some(max_header_list_size) =
442          config["global"]["http2Settings"]["maxHeaderListSize"].as_i64()
443        {
444          http2_builder.max_header_list_size(max_header_list_size as u32);
445        }
446        if let Some(enable_connect_protocol) =
447          config["global"]["http2Settings"]["enableConnectProtocol"].as_bool()
448        {
449          if enable_connect_protocol {
450            http2_builder.enable_connect_protocol();
451          }
452        }
453
454        if let Err(err) = http2_builder
455          .serve_connection(
456            io,
457            service_fn(move |request: Request<Incoming>| {
458              let config = config.clone();
459              let logger = logger_clone.clone();
460              let handlers_vec_clone = handlers_vec
461                .clone()
462                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
463              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
464              let (request_parts, request_body) = request.into_parts();
465              let request = Request::from_parts(
466                request_parts,
467                request_body
468                  .map_err(|e| std::io::Error::other(e.to_string()))
469                  .boxed(),
470              );
471              request_handler(
472                request,
473                remote_address,
474                local_address,
475                true,
476                config,
477                logger,
478                handlers_vec_clone,
479                acme_http01_resolver_option_clone,
480                http3_enabled,
481              )
482            }),
483          )
484          .await
485        {
486          logger
487            .send(LogMessage::new(
488              format!("Error serving HTTPS connection: {}", err),
489              true,
490            ))
491            .await
492            .unwrap_or_default();
493        }
494      } else {
495        let mut http1_builder = hyper::server::conn::http1::Builder::new();
496
497        // The timer is neccessary for the header timeout to work to mitigate Slowloris.
498        http1_builder.timer(TokioTimer::new());
499
500        if let Err(err) = http1_builder
501          .serve_connection(
502            io,
503            service_fn(move |request: Request<Incoming>| {
504              let config = config.clone();
505              let logger = logger_clone.clone();
506              let handlers_vec_clone = handlers_vec
507                .clone()
508                .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
509              let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
510              let (request_parts, request_body) = request.into_parts();
511              let request = Request::from_parts(
512                request_parts,
513                request_body
514                  .map_err(|e| std::io::Error::other(e.to_string()))
515                  .boxed(),
516              );
517              request_handler(
518                request,
519                remote_address,
520                local_address,
521                true,
522                config,
523                logger,
524                handlers_vec_clone,
525                acme_http01_resolver_option_clone,
526                http3_enabled,
527              )
528            }),
529          )
530          .with_upgrades()
531          .await
532        {
533          logger
534            .send(LogMessage::new(
535              format!("Error serving HTTPS connection: {}", err),
536              true,
537            ))
538            .await
539            .unwrap_or_default();
540        }
541      }
542    } else if let MaybeTlsStream::Plain(stream) = maybe_tls_stream {
543      let io = TokioIo::new(stream);
544      let handlers_vec = modules
545        .iter()
546        .map(|module| module.get_handlers(Handle::current()));
547
548      let mut http1_builder = hyper::server::conn::http1::Builder::new();
549
550      // The timer is neccessary for the header timeout to work to mitigate Slowloris.
551      http1_builder.timer(TokioTimer::new());
552
553      if let Err(err) = http1_builder
554        .serve_connection(
555          io,
556          service_fn(move |request: Request<Incoming>| {
557            let config = config.clone();
558            let logger = logger_clone.clone();
559            let handlers_vec_clone = handlers_vec
560              .clone()
561              .collect::<Vec<Box<dyn ServerModuleHandlers + Send>>>();
562            let acme_http01_resolver_option_clone = acme_http01_resolver_option.clone();
563            let (request_parts, request_body) = request.into_parts();
564            let request = Request::from_parts(
565              request_parts,
566              request_body
567                .map_err(|e| std::io::Error::other(e.to_string()))
568                .boxed(),
569            );
570            request_handler(
571              request,
572              remote_address,
573              local_address,
574              false,
575              config,
576              logger,
577              handlers_vec_clone,
578              acme_http01_resolver_option_clone,
579              http3_enabled,
580            )
581          }),
582        )
583        .with_upgrades()
584        .await
585      {
586        logger
587          .send(LogMessage::new(
588            format!("Error serving HTTP connection: {}", err),
589            true,
590          ))
591          .await
592          .unwrap_or_default();
593      }
594    }
595  });
596}
597
598// Main server event loop
599#[allow(clippy::type_complexity)]
600async fn server_event_loop(
601  yaml_config: Arc<Yaml>,
602  logger: Sender<LogMessage>,
603  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
604  module_error: Option<anyhow::Error>,
605  modules_optional_builtin: Vec<String>,
606  first_startup: bool,
607) -> Result<(), Box<dyn Error + Send + Sync>> {
608  if let Some(module_error) = module_error {
609    logger
610      .send(LogMessage::new(module_error.to_string(), true))
611      .await
612      .unwrap_or_default();
613    Err(module_error)?
614  }
615
616  let prepared_config = match prepare_config_for_validation(&yaml_config) {
617    Ok(prepared_config) => prepared_config,
618    Err(err) => {
619      logger
620        .send(LogMessage::new(
621          format!("Server configuration validation failed: {}", err),
622          true,
623        ))
624        .await
625        .unwrap_or_default();
626      Err(anyhow::anyhow!(format!(
627        "Server configuration validation failed: {}",
628        err
629      )))?
630    }
631  };
632
633  for (config_to_validate, is_global, is_location, is_error_config) in prepared_config {
634    match validate_config(
635      config_to_validate,
636      is_global,
637      is_location,
638      is_error_config,
639      &modules_optional_builtin,
640    ) {
641      Ok(unused_properties) => {
642        for unused_property in unused_properties {
643          logger
644            .send(LogMessage::new(
645              format!(
646                "Unused configuration property detected: \"{}\". You might load an appropriate module to use this configuration property",
647                unused_property
648              ),
649              true,
650            ))
651            .await
652            .unwrap_or_default();
653        }
654      }
655      Err(err) => {
656        logger
657          .send(LogMessage::new(
658            format!("Server configuration validation failed: {}", err),
659            true,
660          ))
661          .await
662          .unwrap_or_default();
663        Err(anyhow::anyhow!(format!(
664          "Server configuration validation failed: {}",
665          err
666        )))?
667      }
668    };
669  }
670
671  let mut crypto_provider = default_provider();
672
673  if let Some(cipher_suite) = yaml_config["global"]["cipherSuite"].as_vec() {
674    let mut cipher_suites = Vec::new();
675    let cipher_suite_iter = cipher_suite.iter();
676    for cipher_suite_yaml in cipher_suite_iter {
677      if let Some(cipher_suite) = cipher_suite_yaml.as_str() {
678        let cipher_suite_to_add = match cipher_suite {
679          "TLS_AES_128_GCM_SHA256" => TLS13_AES_128_GCM_SHA256,
680          "TLS_AES_256_GCM_SHA384" => TLS13_AES_256_GCM_SHA384,
681          "TLS_CHACHA20_POLY1305_SHA256" => TLS13_CHACHA20_POLY1305_SHA256,
682          "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
683          "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
684          "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => {
685            TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
686          }
687          "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256" => TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
688          "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384" => TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
689          "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => {
690            TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
691          }
692          _ => {
693            logger
694              .send(LogMessage::new(
695                format!("The \"{}\" cipher suite is not supported", cipher_suite),
696                true,
697              ))
698              .await
699              .unwrap_or_default();
700            Err(anyhow::anyhow!(format!(
701              "The \"{}\" cipher suite is not supported",
702              cipher_suite
703            )))?
704          }
705        };
706        cipher_suites.push(cipher_suite_to_add);
707      }
708    }
709    crypto_provider.cipher_suites = cipher_suites;
710  }
711
712  if let Some(ecdh_curves) = yaml_config["global"]["ecdhCurve"].as_vec() {
713    let mut kx_groups = Vec::new();
714    let ecdh_curves_iter = ecdh_curves.iter();
715    for ecdh_curve_yaml in ecdh_curves_iter {
716      if let Some(ecdh_curve) = ecdh_curve_yaml.as_str() {
717        let kx_group_to_add = match ecdh_curve {
718          "secp256r1" => SECP256R1,
719          "secp384r1" => SECP384R1,
720          "x25519" => X25519,
721          _ => {
722            logger
723              .send(LogMessage::new(
724                format!("The \"{}\" ECDH curve is not supported", ecdh_curve),
725                true,
726              ))
727              .await
728              .unwrap_or_default();
729            Err(anyhow::anyhow!(format!(
730              "The \"{}\" ECDH curve is not supported",
731              ecdh_curve
732            )))?
733          }
734        };
735        kx_groups.push(kx_group_to_add);
736      }
737    }
738    crypto_provider.kx_groups = kx_groups;
739  }
740
741  let crypto_provider_cloned = crypto_provider.clone();
742  let mut sni_resolver = CustomSniResolver::new();
743  let mut certified_keys = Vec::new();
744
745  let mut automatic_tls_enabled = false;
746  let mut acme_letsencrypt_production = true;
747  let acme_use_http_challenge = yaml_config["global"]["useAutomaticTLSHTTPChallenge"]
748    .as_bool()
749    .unwrap_or(false);
750  let acme_challenge_type = if acme_use_http_challenge {
751    UseChallenge::Http01
752  } else {
753    UseChallenge::TlsAlpn01
754  };
755
756  // Read automatic TLS configuration
757  if let Some(read_automatic_tls_enabled) = yaml_config["global"]["enableAutomaticTLS"].as_bool() {
758    automatic_tls_enabled = read_automatic_tls_enabled;
759  }
760
761  let acme_contact = yaml_config["global"]["automaticTLSContactEmail"].as_str();
762  let acme_cache = yaml_config["global"]["automaticTLSContactCacheDirectory"]
763    .as_str()
764    .map(|s| s.to_string())
765    .map(DirCache::new);
766
767  if let Some(read_acme_letsencrypt_production) =
768    yaml_config["global"]["automaticTLSLetsEncryptProduction"].as_bool()
769  {
770    acme_letsencrypt_production = read_acme_letsencrypt_production;
771  }
772
773  if !automatic_tls_enabled {
774    // Load public certificate and private key
775    if let Some(cert_path) = yaml_config["global"]["cert"].as_str() {
776      if let Some(key_path) = yaml_config["global"]["key"].as_str() {
777        let certs = match load_certs(cert_path) {
778          Ok(certs) => certs,
779          Err(err) => {
780            logger
781              .send(LogMessage::new(
782                format!("Cannot load the \"{}\" TLS certificate: {}", cert_path, err),
783                true,
784              ))
785              .await
786              .unwrap_or_default();
787            Err(anyhow::anyhow!(format!(
788              "Cannot load the \"{}\" TLS certificate: {}",
789              cert_path, err
790            )))?
791          }
792        };
793        let key = match load_private_key(key_path) {
794          Ok(key) => key,
795          Err(err) => {
796            logger
797              .send(LogMessage::new(
798                format!("Cannot load the \"{}\" private key: {}", cert_path, err),
799                true,
800              ))
801              .await
802              .unwrap_or_default();
803            Err(anyhow::anyhow!(format!(
804              "Cannot load the \"{}\" private key: {}",
805              cert_path, err
806            )))?
807          }
808        };
809        let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
810          Ok(key) => key,
811          Err(err) => {
812            logger
813              .send(LogMessage::new(
814                format!("Cannot load the \"{}\" private key: {}", cert_path, err),
815                true,
816              ))
817              .await
818              .unwrap_or_default();
819            Err(anyhow::anyhow!(format!(
820              "Cannot load the \"{}\" private key: {}",
821              cert_path, err
822            )))?
823          }
824        };
825        let certified_key = CertifiedKey::new(certs, signing_key);
826        sni_resolver.load_fallback_cert_key(Arc::new(certified_key));
827      }
828    }
829
830    if let Some(sni) = yaml_config["global"]["sni"].as_hash() {
831      let sni_hostnames = sni.keys();
832      for sni_hostname_unknown in sni_hostnames {
833        if let Some(sni_hostname) = sni_hostname_unknown.as_str() {
834          if let Some(cert_path) = sni[sni_hostname_unknown]["cert"].as_str() {
835            if let Some(key_path) = sni[sni_hostname_unknown]["key"].as_str() {
836              let certs = match load_certs(cert_path) {
837                Ok(certs) => certs,
838                Err(err) => {
839                  logger
840                    .send(LogMessage::new(
841                      format!("Cannot load the \"{}\" TLS certificate: {}", cert_path, err),
842                      true,
843                    ))
844                    .await
845                    .unwrap_or_default();
846                  Err(anyhow::anyhow!(format!(
847                    "Cannot load the \"{}\" TLS certificate: {}",
848                    cert_path, err
849                  )))?
850                }
851              };
852              let key = match load_private_key(key_path) {
853                Ok(key) => key,
854                Err(err) => {
855                  logger
856                    .send(LogMessage::new(
857                      format!("Cannot load the \"{}\" private key: {}", cert_path, err),
858                      true,
859                    ))
860                    .await
861                    .unwrap_or_default();
862                  Err(anyhow::anyhow!(format!(
863                    "Cannot load the \"{}\" private key: {}",
864                    cert_path, err
865                  )))?
866                }
867              };
868              let signing_key = match crypto_provider_cloned.key_provider.load_private_key(key) {
869                Ok(key) => key,
870                Err(err) => {
871                  logger
872                    .send(LogMessage::new(
873                      format!("Cannot load the \"{}\" private key: {}", cert_path, err),
874                      true,
875                    ))
876                    .await
877                    .unwrap_or_default();
878                  Err(anyhow::anyhow!(format!(
879                    "Cannot load the \"{}\" private key: {}",
880                    cert_path, err
881                  )))?
882                }
883              };
884              let certified_key_arc = Arc::new(CertifiedKey::new(certs, signing_key));
885              sni_resolver.load_host_cert_key(sni_hostname, certified_key_arc.clone());
886              certified_keys.push(certified_key_arc);
887            }
888          }
889        }
890      }
891    }
892  }
893
894  // Build TLS configuration
895  let tls_config_builder_wants_versions =
896    ServerConfig::builder_with_provider(Arc::new(crypto_provider_cloned));
897
898  // Very simple minimum and maximum TLS version logic for now...
899  let min_tls_version_option = yaml_config["global"]["tlsMinVersion"].as_str();
900  let max_tls_version_option = yaml_config["global"]["tlsMaxVersion"].as_str();
901  let tls_config_builder_wants_verifier = match min_tls_version_option {
902    Some("TLSv1.3") => match max_tls_version_option {
903      Some("TLSv1.2") => {
904        logger
905          .send(LogMessage::new(
906            String::from("The maximum TLS version is older than the minimum TLS version"),
907            true,
908          ))
909          .await
910          .unwrap_or_default();
911        Err(anyhow::anyhow!(String::from(
912          "The maximum TLS version is older than the minimum TLS version"
913        )))?
914      }
915      Some("TLSv1.3") | None => {
916        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS13]) {
917          Ok(builder) => builder,
918          Err(err) => {
919            logger
920              .send(LogMessage::new(
921                format!("Couldn't create the TLS server configuration: {}", err),
922                true,
923              ))
924              .await
925              .unwrap_or_default();
926            Err(anyhow::anyhow!(format!(
927              "Couldn't create the TLS server configuration: {}",
928              err
929            )))?
930          }
931        }
932      }
933      _ => {
934        logger
935          .send(LogMessage::new(
936            String::from("Invalid maximum TLS version"),
937            true,
938          ))
939          .await
940          .unwrap_or_default();
941        Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
942      }
943    },
944    Some("TLSv1.2") | None => match max_tls_version_option {
945      Some("TLSv1.2") => {
946        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12]) {
947          Ok(builder) => builder,
948          Err(err) => {
949            logger
950              .send(LogMessage::new(
951                format!("Couldn't create the TLS server configuration: {}", err),
952                true,
953              ))
954              .await
955              .unwrap_or_default();
956            Err(anyhow::anyhow!(format!(
957              "Couldn't create the TLS server configuration: {}",
958              err
959            )))?
960          }
961        }
962      }
963      Some("TLSv1.3") | None => {
964        match tls_config_builder_wants_versions.with_protocol_versions(&[&TLS12, &TLS13]) {
965          Ok(builder) => builder,
966          Err(err) => {
967            logger
968              .send(LogMessage::new(
969                format!("Couldn't create the TLS server configuration: {}", err),
970                true,
971              ))
972              .await
973              .unwrap_or_default();
974            Err(anyhow::anyhow!(format!(
975              "Couldn't create the TLS server configuration: {}",
976              err
977            )))?
978          }
979        }
980      }
981      _ => {
982        logger
983          .send(LogMessage::new(
984            String::from("Invalid maximum TLS version"),
985            true,
986          ))
987          .await
988          .unwrap_or_default();
989        Err(anyhow::anyhow!(String::from("Invalid maximum TLS version")))?
990      }
991    },
992    _ => {
993      logger
994        .send(LogMessage::new(
995          String::from("Invalid minimum TLS version"),
996          true,
997        ))
998        .await
999        .unwrap_or_default();
1000      Err(anyhow::anyhow!(String::from("Invalid minimum TLS version")))?
1001    }
1002  };
1003
1004  let tls_config_builder_wants_server_cert =
1005    match yaml_config["global"]["useClientCertificate"].as_bool() {
1006      Some(true) => {
1007        let mut roots = RootCertStore::empty();
1008        let certs_result = load_native_certs();
1009        if !certs_result.errors.is_empty() {
1010          logger
1011            .send(LogMessage::new(
1012              format!(
1013                "Couldn't load the native certificate store: {}",
1014                certs_result.errors[0]
1015              ),
1016              true,
1017            ))
1018            .await
1019            .unwrap_or_default();
1020          Err(anyhow::anyhow!(format!(
1021            "Couldn't load the native certificate store: {}",
1022            certs_result.errors[0]
1023          )))?
1024        }
1025        let certs = certs_result.certs;
1026
1027        for cert in certs {
1028          match roots.add(cert) {
1029            Ok(_) => (),
1030            Err(err) => {
1031              logger
1032                .send(LogMessage::new(
1033                  format!(
1034                    "Couldn't add a certificate to the certificate store: {}",
1035                    err
1036                  ),
1037                  true,
1038                ))
1039                .await
1040                .unwrap_or_default();
1041              Err(anyhow::anyhow!(format!(
1042                "Couldn't add a certificate to the certificate store: {}",
1043                err
1044              )))?
1045            }
1046          }
1047        }
1048        tls_config_builder_wants_verifier
1049          .with_client_cert_verifier(WebPkiClientVerifier::builder(Arc::new(roots)).build()?)
1050      }
1051      _ => tls_config_builder_wants_verifier.with_no_client_auth(),
1052    };
1053
1054  let mut tls_config;
1055
1056  let mut addr = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 80));
1057  let mut addr_tls = SocketAddr::from((IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)), 443));
1058  let mut tls_enabled = false;
1059  let mut non_tls_disabled = false;
1060
1061  // Install a process-wide cryptography provider. If it fails, then warn about it.
1062  if crypto_provider.install_default().is_err() && first_startup {
1063    logger
1064      .send(LogMessage::new(
1065        "Cannot install a process-wide cryptography provider".to_string(),
1066        true,
1067      ))
1068      .await
1069      .unwrap_or_default();
1070    Err(anyhow::anyhow!(
1071      "Cannot install a process-wide cryptography provider"
1072    ))?;
1073  }
1074
1075  // Read port configurations from YAML
1076  if let Some(read_port) = yaml_config["global"]["port"].as_i64() {
1077    addr = SocketAddr::from((
1078      IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1079      match read_port.try_into() {
1080        Ok(port) => port,
1081        Err(_) => {
1082          logger
1083            .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1084            .await
1085            .unwrap_or_default();
1086          Err(anyhow::anyhow!("Invalid HTTP port"))?
1087        }
1088      },
1089    ));
1090  } else if let Some(read_port) = yaml_config["global"]["port"].as_str() {
1091    addr = match read_port.parse() {
1092      Ok(addr) => addr,
1093      Err(_) => {
1094        logger
1095          .send(LogMessage::new(String::from("Invalid HTTP port"), true))
1096          .await
1097          .unwrap_or_default();
1098        Err(anyhow::anyhow!("Invalid HTTP port"))?
1099      }
1100    };
1101  }
1102
1103  if let Some(read_tls_enabled) = yaml_config["global"]["secure"].as_bool() {
1104    tls_enabled = read_tls_enabled;
1105    if let Some(read_non_tls_disabled) =
1106      yaml_config["global"]["disableNonEncryptedServer"].as_bool()
1107    {
1108      non_tls_disabled = read_non_tls_disabled;
1109    }
1110  }
1111
1112  if let Some(read_port) = yaml_config["global"]["sport"].as_i64() {
1113    addr_tls = SocketAddr::from((
1114      IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0)),
1115      match read_port.try_into() {
1116        Ok(port) => port,
1117        Err(_) => {
1118          logger
1119            .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1120            .await
1121            .unwrap_or_default();
1122          Err(anyhow::anyhow!("Invalid HTTPS port"))?
1123        }
1124      },
1125    ));
1126  } else if let Some(read_port) = yaml_config["global"]["sport"].as_str() {
1127    addr_tls = match read_port.parse() {
1128      Ok(addr) => addr,
1129      Err(_) => {
1130        logger
1131          .send(LogMessage::new(String::from("Invalid HTTPS port"), true))
1132          .await
1133          .unwrap_or_default();
1134        Err(anyhow::anyhow!("Invalid HTTPS port"))?
1135      }
1136    };
1137  }
1138
1139  // Get domains for ACME configuration
1140  let mut acme_domains = Vec::new();
1141  if let Some(hosts_config) = yaml_config["hosts"].as_vec() {
1142    for host_yaml in hosts_config.iter() {
1143      if let Some(host) = host_yaml.as_hash() {
1144        if let Some(domain_yaml) = host.get(&Yaml::from_str("domain")) {
1145          if let Some(domain) = domain_yaml.as_str() {
1146            if !domain.contains("*") {
1147              acme_domains.push(domain);
1148            }
1149          }
1150        }
1151      }
1152    }
1153  }
1154
1155  // Create ACME configuration
1156  let mut acme_config = AcmeConfig::new(acme_domains).challenge_type(acme_challenge_type);
1157  if let Some(acme_contact_unwrapped) = acme_contact {
1158    acme_config = acme_config.contact_push(format!("mailto:{}", acme_contact_unwrapped));
1159  }
1160  let mut acme_config_with_cache = acme_config.cache_option(acme_cache);
1161  acme_config_with_cache =
1162    acme_config_with_cache.directory_lets_encrypt(acme_letsencrypt_production);
1163
1164  let (acme_config, acme_http01_resolver) = if tls_enabled && automatic_tls_enabled {
1165    let mut acme_state = acme_config_with_cache.state();
1166
1167    let acme_resolver = acme_state.resolver();
1168
1169    // Create TLS configuration
1170    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1171      .as_bool()
1172      .unwrap_or(true)
1173    {
1174      tls_config_builder_wants_server_cert
1175        .with_cert_resolver(Arc::new(Stapler::new(acme_resolver.clone())))
1176    } else {
1177      tls_config_builder_wants_server_cert.with_cert_resolver(acme_resolver.clone())
1178    };
1179
1180    let acme_logger = logger.clone();
1181    tokio::spawn(async move {
1182      while let Some(acme_result) = acme_state.next().await {
1183        if let Err(acme_error) = acme_result {
1184          acme_logger
1185            .send(LogMessage::new(
1186              format!("Error while obtaining a TLS certificate: {}", acme_error),
1187              true,
1188            ))
1189            .await
1190            .unwrap_or_default();
1191        }
1192      }
1193    });
1194
1195    if acme_use_http_challenge {
1196      (None, Some(acme_resolver))
1197    } else {
1198      let mut acme_config = tls_config.clone();
1199      acme_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
1200
1201      (Some(acme_config), None)
1202    }
1203  } else {
1204    // Create TLS configuration
1205    tls_config = if yaml_config["global"]["enableOCSPStapling"]
1206      .as_bool()
1207      .unwrap_or(true)
1208    {
1209      let ocsp_stapler_arc = Arc::new(Stapler::new(Arc::new(sni_resolver)));
1210      for certified_key in certified_keys.iter() {
1211        ocsp_stapler_arc.preload(certified_key.clone());
1212      }
1213      tls_config_builder_wants_server_cert.with_cert_resolver(ocsp_stapler_arc.clone())
1214    } else {
1215      tls_config_builder_wants_server_cert.with_cert_resolver(Arc::new(sni_resolver))
1216    };
1217
1218    // Drop the ACME configuration
1219    drop(acme_config_with_cache);
1220    (None, None)
1221  };
1222
1223  let quic_config = if tls_enabled
1224    && yaml_config["global"]["enableHTTP3"]
1225      .as_bool()
1226      .unwrap_or(false)
1227  {
1228    let mut quic_tls_config = tls_config.clone();
1229    quic_tls_config.max_early_data_size = u32::MAX;
1230    quic_tls_config.alpn_protocols = vec![b"h3".to_vec(), b"h3-29".to_vec()];
1231    let quic_config = quinn::ServerConfig::with_crypto(Arc::new(match QuicServerConfig::try_from(
1232      quic_tls_config,
1233    ) {
1234      Ok(quinn_config) => quinn_config,
1235      Err(err) => {
1236        logger
1237          .send(LogMessage::new(
1238            format!("There was a problem when starting HTTP/3 server: {}", err),
1239            true,
1240          ))
1241          .await
1242          .unwrap_or_default();
1243        Err(anyhow::anyhow!(format!(
1244          "There was a problem when starting HTTP/3 server: {}",
1245          err
1246        )))?
1247      }
1248    }));
1249    Some(quic_config)
1250  } else {
1251    None
1252  };
1253
1254  // Configure ALPN protocols
1255  let mut alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
1256  if yaml_config["global"]["enableHTTP2"]
1257    .as_bool()
1258    .unwrap_or(true)
1259  {
1260    alpn_protocols.insert(0, b"h2".to_vec());
1261  }
1262  tls_config.alpn_protocols = alpn_protocols;
1263  let tls_config_arc = Arc::new(tls_config);
1264  let acme_config_arc = acme_config.map(Arc::new);
1265
1266  let mut listener = None;
1267  let mut listener_tls = None;
1268  let mut listener_quic = None;
1269
1270  // Bind to the specified ports
1271  if !non_tls_disabled {
1272    println!("HTTP server is listening at {}", addr);
1273    listener = Some(match TcpListener::bind(addr).await {
1274      Ok(listener) => listener,
1275      Err(err) => {
1276        logger
1277          .send(LogMessage::new(
1278            format!("Cannot listen to HTTP port: {}", err),
1279            true,
1280          ))
1281          .await
1282          .unwrap_or_default();
1283        Err(anyhow::anyhow!(format!(
1284          "Cannot listen to HTTP port: {}",
1285          err
1286        )))?
1287      }
1288    });
1289  }
1290
1291  if tls_enabled {
1292    println!("HTTPS server is listening at {}", addr_tls);
1293    listener_tls = Some(match TcpListener::bind(addr_tls).await {
1294      Ok(listener) => listener,
1295      Err(err) => {
1296        logger
1297          .send(LogMessage::new(
1298            format!("Cannot listen to HTTPS port: {}", err),
1299            true,
1300          ))
1301          .await
1302          .unwrap_or_default();
1303        Err(anyhow::anyhow!(format!(
1304          "Cannot listen to HTTPS port: {}",
1305          err
1306        )))?
1307      }
1308    });
1309
1310    if let Some(quic_config) = quic_config {
1311      println!("HTTP/3 server is listening at {}", addr_tls);
1312      listener_quic = Some(match quinn::Endpoint::server(quic_config, addr_tls) {
1313        Ok(listener) => listener,
1314        Err(err) => {
1315          logger
1316            .send(LogMessage::new(
1317              format!("Cannot listen to HTTP/3 port: {}", err),
1318              true,
1319            ))
1320            .await
1321            .unwrap_or_default();
1322          Err(anyhow::anyhow!(format!(
1323            "Cannot listen to HTTP/3 port: {}",
1324            err
1325          )))?
1326        }
1327      });
1328    }
1329  }
1330
1331  // Wrap the modules vector in an Arc
1332  let modules_arc = Arc::new(modules);
1333
1334  let http3_enabled = if listener_quic.is_some() {
1335    Some(addr_tls.port())
1336  } else {
1337    None
1338  };
1339
1340  // Main loop to accept incoming connections
1341  loop {
1342    let listener_borrowed = &listener;
1343    let listener_accept = async move {
1344      if let Some(listener) = listener_borrowed {
1345        listener.accept().await
1346      } else {
1347        futures_util::future::pending().await
1348      }
1349    };
1350
1351    let listener_tls_borrowed = &listener_tls;
1352    let listener_tls_accept = async move {
1353      if let Some(listener_tls) = listener_tls_borrowed {
1354        listener_tls.accept().await
1355      } else {
1356        futures_util::future::pending().await
1357      }
1358    };
1359
1360    let listener_quic_borrowed = &listener_quic;
1361    let listener_quic_accept = async move {
1362      if let Some(listener_quic) = listener_quic_borrowed {
1363        listener_quic.accept().await
1364      } else {
1365        futures_util::future::pending().await
1366      }
1367    };
1368
1369    if listener_borrowed.is_none()
1370      && listener_tls_borrowed.is_none()
1371      && listener_quic_borrowed.is_none()
1372    {
1373      logger
1374        .send(LogMessage::new(
1375          String::from("No server is listening"),
1376          true,
1377        ))
1378        .await
1379        .unwrap_or_default();
1380      Err(anyhow::anyhow!("No server is listening"))?;
1381    }
1382
1383    tokio::select! {
1384      status = listener_accept => {
1385        match status {
1386          Ok((stream, remote_address)) => {
1387            accept_connection(
1388              stream,
1389              remote_address,
1390              None,
1391              acme_http01_resolver.clone(),
1392              yaml_config.clone(),
1393              logger.clone(),
1394              modules_arc.clone(),
1395              None
1396            )
1397            .await;
1398          }
1399          Err(err) => {
1400            logger
1401              .send(LogMessage::new(
1402                format!("Cannot accept a connection: {}", err),
1403                true,
1404              ))
1405              .await
1406              .unwrap_or_default();
1407          }
1408        }
1409      },
1410      status = listener_tls_accept => {
1411        match status {
1412          Ok((stream, remote_address)) => {
1413            accept_connection(
1414              stream,
1415              remote_address,
1416              Some((tls_config_arc.clone(), acme_config_arc.clone())),
1417              None,
1418              yaml_config.clone(),
1419              logger.clone(),
1420              modules_arc.clone(),
1421              http3_enabled
1422            )
1423            .await;
1424          }
1425          Err(err) => {
1426            logger
1427              .send(LogMessage::new(
1428                format!("Cannot accept a connection: {}", err),
1429                true,
1430              ))
1431              .await
1432              .unwrap_or_default();
1433          }
1434        }
1435      },
1436      status = listener_quic_accept => {
1437        match status {
1438          Some(connection_attempt) => {
1439            let local_ip = SocketAddr::new(connection_attempt.local_ip().unwrap_or(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0))), addr_tls.port());
1440            accept_quic_connection(
1441              connection_attempt,
1442              local_ip,
1443              yaml_config.clone(),
1444              logger.clone(),
1445              modules_arc.clone()
1446            )
1447            .await;
1448          }
1449          None => {
1450            logger
1451              .send(LogMessage::new(
1452                "HTTP/3 connections can't be accepted anymore".to_string(),
1453                true,
1454              ))
1455              .await
1456              .unwrap_or_default();
1457          }
1458        }
1459      }
1460    };
1461  }
1462}
1463
1464// Start the server
1465#[allow(clippy::type_complexity)]
1466pub fn start_server(
1467  yaml_config: Arc<Yaml>,
1468  modules: Vec<Box<dyn ServerModule + Send + Sync>>,
1469  module_error: Option<anyhow::Error>,
1470  modules_optional_builtin: Vec<String>,
1471  first_startup: bool,
1472) -> Result<bool, Box<dyn Error + Send + Sync>> {
1473  if let Some(environment_variables_hash) = yaml_config["global"]["environmentVariables"].as_hash()
1474  {
1475    let environment_variables_hash_iter = environment_variables_hash.iter();
1476    for (variable_name, variable_value) in environment_variables_hash_iter {
1477      if let Some(variable_name) = variable_name.as_str() {
1478        if let Some(variable_value) = variable_value.as_str() {
1479          if !variable_name.is_empty()
1480            && !variable_name.contains('\0')
1481            && !variable_name.contains('=')
1482            && !variable_value.contains('\0')
1483          {
1484            // Safety: the environment variables are set before threads are spawned
1485            // The `std::env::set_var` function is safe to use in single-threaded environments
1486            // In Rust 2024 edition, the `std::env::set_var` function would be `unsafe`.
1487            env::set_var(variable_name, variable_value);
1488          }
1489        }
1490      }
1491    }
1492  }
1493
1494  let available_parallelism = thread::available_parallelism()?.get();
1495
1496  // Create Tokio runtime for the server
1497  let server_runtime = tokio::runtime::Builder::new_multi_thread()
1498    .worker_threads(available_parallelism)
1499    .max_blocking_threads(1536)
1500    .event_interval(25)
1501    .thread_name("server-pool")
1502    .enable_all()
1503    .build()?;
1504
1505  // Create Tokio runtime for logging
1506  let log_runtime = tokio::runtime::Builder::new_multi_thread()
1507    .worker_threads(match available_parallelism / 2 {
1508      0 => 1,
1509      non_zero => non_zero,
1510    })
1511    .max_blocking_threads(768)
1512    .thread_name("log-pool")
1513    .enable_time()
1514    .build()?;
1515
1516  let (logger, receive_log) = async_channel::bounded::<LogMessage>(10000);
1517
1518  let log_filename = yaml_config["global"]["logFilePath"]
1519    .as_str()
1520    .map(String::from);
1521  let error_log_filename = yaml_config["global"]["errorLogFilePath"]
1522    .as_str()
1523    .map(String::from);
1524
1525  log_runtime.spawn(async move {
1526    let log_file = match log_filename {
1527      Some(log_filename) => Some(
1528        fs::OpenOptions::new()
1529          .append(true)
1530          .create(true)
1531          .open(log_filename)
1532          .await,
1533      ),
1534      None => None,
1535    };
1536
1537    let error_log_file = match error_log_filename {
1538      Some(error_log_filename) => Some(
1539        fs::OpenOptions::new()
1540          .append(true)
1541          .create(true)
1542          .open(error_log_filename)
1543          .await,
1544      ),
1545      None => None,
1546    };
1547
1548    let log_file_wrapped = match log_file {
1549      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1550      Some(Err(e)) => {
1551        eprintln!("Failed to open log file: {}", e);
1552        None
1553      }
1554      None => None,
1555    };
1556
1557    let error_log_file_wrapped = match error_log_file {
1558      Some(Ok(file)) => Some(Arc::new(Mutex::new(BufWriter::with_capacity(131072, file)))),
1559      Some(Err(e)) => {
1560        eprintln!("Failed to open error log file: {}", e);
1561        None
1562      }
1563      None => None,
1564    };
1565
1566    // The logs are written when the log message is received by the log event loop, and flushed every 100 ms, improving the server performance.
1567    let log_file_wrapped_cloned_for_sleep = log_file_wrapped.clone();
1568    let error_log_file_wrapped_cloned_for_sleep = error_log_file_wrapped.clone();
1569    tokio::task::spawn(async move {
1570      let mut interval = time::interval(time::Duration::from_millis(100));
1571      loop {
1572        interval.tick().await;
1573        if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned_for_sleep.clone() {
1574          let mut locked_file = log_file_wrapped_cloned.lock().await;
1575          locked_file.flush().await.unwrap_or_default();
1576        }
1577        if let Some(error_log_file_wrapped_cloned) = error_log_file_wrapped_cloned_for_sleep.clone()
1578        {
1579          let mut locked_file = error_log_file_wrapped_cloned.lock().await;
1580          locked_file.flush().await.unwrap_or_default();
1581        }
1582      }
1583    });
1584
1585    // Logging loop
1586    while let Ok(message) = receive_log.recv().await {
1587      let (mut message, is_error) = message.get_message();
1588      let log_file_wrapped_cloned = if !is_error {
1589        log_file_wrapped.clone()
1590      } else {
1591        error_log_file_wrapped.clone()
1592      };
1593
1594      if let Some(log_file_wrapped_cloned) = log_file_wrapped_cloned {
1595        tokio::task::spawn(async move {
1596          let mut locked_file = log_file_wrapped_cloned.lock().await;
1597          if is_error {
1598            let now: DateTime<Local> = Local::now();
1599            let formatted_time = now.format("%Y-%m-%d %H:%M:%S").to_string();
1600            message = format!("[{}]: {}", formatted_time, message);
1601          }
1602          message.push('\n');
1603          if let Err(e) = locked_file.write(message.as_bytes()).await {
1604            eprintln!("Failed to write to log file: {}", e);
1605          }
1606        });
1607      }
1608    }
1609  });
1610
1611  // Log env overrides once at startup
1612  for msg in env_config::log_env_var_overrides() {
1613    logger
1614      .send_blocking(LogMessage::new(msg, true))
1615      .unwrap_or_default();
1616  }
1617
1618  // Run the server event loop
1619  let result = server_runtime.block_on(async {
1620    let event_loop_future = server_event_loop(
1621      yaml_config,
1622      logger,
1623      modules,
1624      module_error,
1625      modules_optional_builtin,
1626      first_startup,
1627    );
1628
1629    let (continue_tx, continue_rx) = async_channel::unbounded::<bool>();
1630    let cancel_token = CancellationToken::new();
1631
1632    #[cfg(unix)]
1633    {
1634      let cancel_token_clone = cancel_token.clone();
1635      let continue_tx_clone = continue_tx.clone();
1636      tokio::spawn(async move {
1637        if let Ok(mut signal) = signal::unix::signal(signal::unix::SignalKind::hangup()) {
1638          tokio::select! {
1639            _ = signal.recv() => {
1640              continue_tx_clone.send(true).await.unwrap_or_default();
1641            }
1642            _ = cancel_token_clone.cancelled() => {}
1643          }
1644        }
1645      });
1646    }
1647
1648    let cancel_token_clone = cancel_token.clone();
1649    tokio::spawn(async move {
1650      tokio::select! {
1651        result = signal::ctrl_c() => {
1652          if result.is_ok() {
1653            continue_tx.send(false).await.unwrap_or_default();
1654          }
1655        }
1656        _ = cancel_token_clone.cancelled() => {}
1657      }
1658    });
1659
1660    let result = tokio::select! {
1661      result = event_loop_future => {
1662        // Sleep the Tokio runtime to ensure error logs are saved
1663        time::sleep(tokio::time::Duration::from_millis(100)).await;
1664
1665        result.map(|_| false)
1666      },
1667      continue_running = continue_rx.recv() => Ok(continue_running?)
1668    };
1669
1670    cancel_token.cancel();
1671
1672    result
1673  });
1674
1675  // Wait 10 seconds or until all tasks are complete
1676  server_runtime.shutdown_timeout(time::Duration::from_secs(10));
1677
1678  result
1679}