ferron/optional_modules/
fcgi.rs

// FastCGI handler code inspired by SVR.JS's GreenRhombus mod, translated from JavaScript to Rust.
// Based on the "cgi" and "scgi" modules.
3use std::env;
4use std::error::Error;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ferron_common::{
10  ErrorLogger, HyperRequest, HyperResponse, RequestData, ResponseData, ServerConfig, ServerModule,
11  ServerModuleHandlers, SocketData,
12};
13use crate::ferron_common::{HyperUpgraded, WithRuntime};
14use async_trait::async_trait;
15use futures_util::future::Either;
16use futures_util::TryStreamExt;
17use hashlink::LinkedHashMap;
18use http_body_util::{BodyExt, StreamBody};
19use httparse::EMPTY_HEADER;
20use hyper::body::{Bytes, Frame};
21use hyper::{header, Response, StatusCode};
22use hyper_tungstenite::HyperWebsocket;
23use tokio::fs;
24use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
25use tokio::net::TcpStream;
26use tokio::runtime::Handle;
27use tokio::sync::RwLock;
28use tokio_util::codec::{FramedRead, FramedWrite};
29use tokio_util::io::{ReaderStream, SinkWriter, StreamReader};
30
31use crate::ferron_res::server_software::SERVER_SOFTWARE;
32use crate::ferron_util::cgi_response::CgiResponse;
33use crate::ferron_util::copy_move::Copier;
34use crate::ferron_util::fcgi_decoder::{FcgiDecodedData, FcgiDecoder};
35use crate::ferron_util::fcgi_encoder::FcgiEncoder;
36use crate::ferron_util::fcgi_name_value_pair::construct_fastcgi_name_value_pair;
37use crate::ferron_util::fcgi_record::construct_fastcgi_record;
38use crate::ferron_util::read_to_end_move::ReadToEndFuture;
39use crate::ferron_util::split_stream_by_map::SplitStreamByMapExt;
40use crate::ferron_util::ttl_cache::TtlCache;
41
42pub fn server_module_init(
43  _config: &ServerConfig,
44) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
45  let cache = Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(100))));
46  Ok(Box::new(FcgiModule::new(cache)))
47}
48
49#[allow(clippy::type_complexity)]
50struct FcgiModule {
51  path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
52}
53
54impl FcgiModule {
55  #[allow(clippy::type_complexity)]
56  fn new(path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>) -> Self {
57    Self { path_cache }
58  }
59}
60
61impl ServerModule for FcgiModule {
62  fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
63    Box::new(FcgiModuleHandlers {
64      path_cache: self.path_cache.clone(),
65      handle,
66    })
67  }
68}
69
70#[allow(clippy::type_complexity)]
71struct FcgiModuleHandlers {
72  handle: Handle,
73  path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
74}
75
76#[async_trait]
77impl ServerModuleHandlers for FcgiModuleHandlers {
78  async fn request_handler(
79    &mut self,
80    request: RequestData,
81    config: &ServerConfig,
82    socket_data: &SocketData,
83    error_logger: &ErrorLogger,
84  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
85    WithRuntime::new(self.handle.clone(), async move {
86      let mut fastcgi_script_exts = Vec::new();
87
88      let fastcgi_script_exts_yaml = &config["fcgiScriptExtensions"];
89      if let Some(fastcgi_script_exts_obtained) = fastcgi_script_exts_yaml.as_vec() {
90        for fastcgi_script_ext_yaml in fastcgi_script_exts_obtained.iter() {
91          if let Some(fastcgi_script_ext) = fastcgi_script_ext_yaml.as_str() {
92            fastcgi_script_exts.push(fastcgi_script_ext);
93          }
94        }
95      }
96
97      let mut fastcgi_to = "tcp://localhost:4000/";
98      let fastcgi_to_yaml = &config["fcgiTo"];
99      if let Some(fastcgi_to_obtained) = fastcgi_to_yaml.as_str() {
100        fastcgi_to = fastcgi_to_obtained;
101      }
102
103      let mut fastcgi_path = None;
104      if let Some(fastcgi_path_obtained) = config["fcgiPath"].as_str() {
105        fastcgi_path = Some(fastcgi_path_obtained.to_string());
106      }
107
108      let hyper_request = request.get_hyper_request();
109
110      let request_path = hyper_request.uri().path();
111      let mut request_path_bytes = request_path.bytes();
112      if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') {
113        return Ok(
114          ResponseData::builder(request)
115            .status(StatusCode::BAD_REQUEST)
116            .build(),
117        );
118      }
119
120      let mut execute_pathbuf = None;
121      let mut execute_path_info = None;
122      let mut wwwroot_detected = None;
123
124      if let Some(fastcgi_path) = fastcgi_path {
125        let mut canonical_fastcgi_path: &str = &fastcgi_path;
126        if canonical_fastcgi_path.bytes().last() == Some(b'/') {
127          canonical_fastcgi_path = &canonical_fastcgi_path[..(canonical_fastcgi_path.len() - 1)];
128        }
129
130        let request_path_with_slashes = match request_path == canonical_fastcgi_path {
131          true => format!("{}/", request_path),
132          false => request_path.to_string(),
133        };
134        if let Some(stripped_request_path) =
135          request_path_with_slashes.strip_prefix(canonical_fastcgi_path)
136        {
137          let wwwroot_yaml = &config["wwwroot"];
138          let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent");
139
140          let wwwroot_unknown = PathBuf::from(wwwroot);
141          let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
142            true => wwwroot_unknown,
143            false => match fs::canonicalize(&wwwroot_unknown).await {
144              Ok(pathbuf) => pathbuf,
145              Err(_) => wwwroot_unknown,
146            },
147          };
148          wwwroot_detected = Some(wwwroot_pathbuf.clone());
149          let wwwroot = wwwroot_pathbuf.as_path();
150
151          let mut relative_path = &request_path[1..];
152          while relative_path.as_bytes().first().copied() == Some(b'/') {
153            relative_path = &relative_path[1..];
154          }
155
156          let decoded_relative_path = match urlencoding::decode(relative_path) {
157            Ok(path) => path.to_string(),
158            Err(_) => {
159              return Ok(
160                ResponseData::builder(request)
161                  .status(StatusCode::BAD_REQUEST)
162                  .build(),
163              );
164            }
165          };
166
167          let joined_pathbuf = wwwroot.join(decoded_relative_path);
168          execute_pathbuf = Some(joined_pathbuf);
169          execute_path_info = stripped_request_path
170            .strip_prefix("/")
171            .map(|s| s.to_string());
172        }
173      }
174
175      if execute_pathbuf.is_none() {
176        if let Some(wwwroot) = config["wwwroot"].as_str() {
177          let cache_key = format!(
178            "{}{}{}",
179            match config["ip"].as_str() {
180              Some(ip) => format!("{}-", ip),
181              None => String::from(""),
182            },
183            match config["domain"].as_str() {
184              Some(domain) => format!("{}-", domain),
185              None => String::from(""),
186            },
187            request_path
188          );
189
190          let wwwroot_unknown = PathBuf::from(wwwroot);
191          let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
192            true => wwwroot_unknown,
193            false => match fs::canonicalize(&wwwroot_unknown).await {
194              Ok(pathbuf) => pathbuf,
195              Err(_) => wwwroot_unknown,
196            },
197          };
198          wwwroot_detected = Some(wwwroot_pathbuf.clone());
199          let wwwroot = wwwroot_pathbuf.as_path();
200
201          let read_rwlock = self.path_cache.read().await;
202          let (execute_pathbuf_got, execute_path_info_got) = match read_rwlock.get(&cache_key) {
203            Some(data) => {
204              drop(read_rwlock);
205              data
206            }
207            None => {
208              drop(read_rwlock);
209              let mut relative_path = &request_path[1..];
210              while relative_path.as_bytes().first().copied() == Some(b'/') {
211                relative_path = &relative_path[1..];
212              }
213
214              let decoded_relative_path = match urlencoding::decode(relative_path) {
215                Ok(path) => path.to_string(),
216                Err(_) => {
217                  return Ok(
218                    ResponseData::builder(request)
219                      .status(StatusCode::BAD_REQUEST)
220                      .build(),
221                  );
222                }
223              };
224
225              let joined_pathbuf = wwwroot.join(decoded_relative_path);
226              let mut execute_pathbuf: Option<PathBuf> = None;
227              let mut execute_path_info: Option<String> = None;
228
229              match fs::metadata(&joined_pathbuf).await {
230                Ok(metadata) => {
231                  if metadata.is_file() {
232                    let contained_extension = joined_pathbuf
233                      .extension()
234                      .map(|a| format!(".{}", a.to_string_lossy()));
235                    if let Some(contained_extension) = contained_extension {
236                      if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
237                        execute_pathbuf = Some(joined_pathbuf);
238                      }
239                    }
240                  } else if metadata.is_dir() {
241                    let indexes = vec!["index.php", "index.cgi"];
242                    for index in indexes {
243                      let temp_joined_pathbuf = joined_pathbuf.join(index);
244                      match fs::metadata(&temp_joined_pathbuf).await {
245                        Ok(temp_metadata) => {
246                          if temp_metadata.is_file() {
247                            let contained_extension = temp_joined_pathbuf
248                              .extension()
249                              .map(|a| format!(".{}", a.to_string_lossy()));
250                            if let Some(contained_extension) = contained_extension {
251                              if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
252                                execute_pathbuf = Some(temp_joined_pathbuf);
253                                break;
254                              }
255                            }
256                          }
257                        }
258                        Err(_) => continue,
259                      };
260                    }
261                  }
262                }
263                Err(err) => {
264                  if err.kind() == tokio::io::ErrorKind::NotADirectory {
265                    // TODO: find a file
266                    let mut temp_pathbuf = joined_pathbuf.clone();
267                    loop {
268                      if !temp_pathbuf.pop() {
269                        break;
270                      }
271                      match fs::metadata(&temp_pathbuf).await {
272                        Ok(metadata) => {
273                          if metadata.is_file() {
274                            let temp_path = temp_pathbuf.as_path();
275                            if !temp_path.starts_with(wwwroot) {
276                              // Traversed above the webroot, so ignore that.
277                              break;
278                            }
279                            let path_info = match joined_pathbuf.as_path().strip_prefix(temp_path) {
280                              Ok(path) => {
281                                let path = path.to_string_lossy().to_string();
282                                Some(match cfg!(windows) {
283                                  true => path.replace("\\", "/"),
284                                  false => path,
285                                })
286                              }
287                              Err(_) => None,
288                            };
289                            let mut request_path_normalized = match cfg!(windows) {
290                              true => request_path.to_lowercase(),
291                              false => request_path.to_string(),
292                            };
293                            while request_path_normalized.contains("//") {
294                              request_path_normalized = request_path_normalized.replace("//", "/");
295                            }
296                            if request_path_normalized == "/cgi-bin"
297                              || request_path_normalized.starts_with("/cgi-bin/")
298                            {
299                              execute_pathbuf = Some(temp_pathbuf);
300                              execute_path_info = path_info;
301                              break;
302                            } else {
303                              let contained_extension = temp_pathbuf
304                                .extension()
305                                .map(|a| format!(".{}", a.to_string_lossy()));
306                              if let Some(contained_extension) = contained_extension {
307                                if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
308                                  execute_pathbuf = Some(temp_pathbuf);
309                                  execute_path_info = path_info;
310                                  break;
311                                }
312                              }
313                            }
314                          } else {
315                            break;
316                          }
317                        }
318                        Err(err) => match err.kind() {
319                          tokio::io::ErrorKind::NotADirectory => (),
320                          _ => break,
321                        },
322                      };
323                    }
324                  }
325                }
326              };
327              let data = (execute_pathbuf, execute_path_info);
328
329              let mut write_rwlock = self.path_cache.write().await;
330              write_rwlock.cleanup();
331              write_rwlock.insert(cache_key, data.clone());
332              drop(write_rwlock);
333              data
334            }
335          };
336
337          execute_pathbuf = execute_pathbuf_got;
338          execute_path_info = execute_path_info_got;
339        }
340      }
341
342      if let Some(execute_pathbuf) = execute_pathbuf {
343        if let Some(wwwroot_detected) = wwwroot_detected {
344          return execute_fastcgi_with_environment_variables(
345            request,
346            socket_data,
347            error_logger,
348            wwwroot_detected.as_path(),
349            execute_pathbuf,
350            execute_path_info,
351            config["serverAdministratorEmail"].as_str(),
352            fastcgi_to,
353          )
354          .await;
355        }
356      }
357
358      Ok(ResponseData::builder(request).build())
359    })
360    .await
361  }
362
363  async fn proxy_request_handler(
364    &mut self,
365    request: RequestData,
366    _config: &ServerConfig,
367    _socket_data: &SocketData,
368    _error_logger: &ErrorLogger,
369  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
370    Ok(ResponseData::builder(request).build())
371  }
372
373  async fn response_modifying_handler(
374    &mut self,
375    response: HyperResponse,
376  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
377    Ok(response)
378  }
379
380  async fn proxy_response_modifying_handler(
381    &mut self,
382    response: HyperResponse,
383  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
384    Ok(response)
385  }
386
387  async fn connect_proxy_request_handler(
388    &mut self,
389    _upgraded_request: HyperUpgraded,
390    _connect_address: &str,
391    _config: &ServerConfig,
392    _socket_data: &SocketData,
393    _error_logger: &ErrorLogger,
394  ) -> Result<(), Box<dyn Error + Send + Sync>> {
395    Ok(())
396  }
397
398  fn does_connect_proxy_requests(&mut self) -> bool {
399    false
400  }
401
402  async fn websocket_request_handler(
403    &mut self,
404    _websocket: HyperWebsocket,
405    _uri: &hyper::Uri,
406    _headers: &hyper::HeaderMap,
407    _config: &ServerConfig,
408    _socket_data: &SocketData,
409    _error_logger: &ErrorLogger,
410  ) -> Result<(), Box<dyn Error + Send + Sync>> {
411    Ok(())
412  }
413
414  fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool {
415    false
416  }
417}
418
419#[allow(clippy::too_many_arguments)]
420async fn execute_fastcgi_with_environment_variables(
421  request: RequestData,
422  socket_data: &SocketData,
423  error_logger: &ErrorLogger,
424  wwwroot: &Path,
425  execute_pathbuf: PathBuf,
426  path_info: Option<String>,
427  server_administrator_email: Option<&str>,
428  fastcgi_to: &str,
429) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
430  let mut environment_variables: LinkedHashMap<String, String> = LinkedHashMap::new();
431
432  let hyper_request = request.get_hyper_request();
433  let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri());
434
435  if let Some(auth_user) = request.get_auth_user() {
436    if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) {
437      let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string();
438      let mut authorization_value_split = authorization_value.split(" ");
439      if let Some(authorization_type) = authorization_value_split.next() {
440        environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string());
441      }
442    }
443    environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string());
444  }
445
446  environment_variables.insert(
447    "QUERY_STRING".to_string(),
448    match hyper_request.uri().query() {
449      Some(query) => query.to_string(),
450      None => "".to_string(),
451    },
452  );
453
454  environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string());
455  environment_variables.insert(
456    "SERVER_PROTOCOL".to_string(),
457    match hyper_request.version() {
458      hyper::Version::HTTP_09 => "HTTP/0.9".to_string(),
459      hyper::Version::HTTP_10 => "HTTP/1.0".to_string(),
460      hyper::Version::HTTP_11 => "HTTP/1.1".to_string(),
461      hyper::Version::HTTP_2 => "HTTP/2.0".to_string(),
462      hyper::Version::HTTP_3 => "HTTP/3.0".to_string(),
463      _ => "HTTP/Unknown".to_string(),
464    },
465  );
466  environment_variables.insert(
467    "SERVER_PORT".to_string(),
468    socket_data.local_addr.port().to_string(),
469  );
470  environment_variables.insert(
471    "SERVER_ADDR".to_string(),
472    socket_data.local_addr.ip().to_canonical().to_string(),
473  );
474  if let Some(server_administrator_email) = server_administrator_email {
475    environment_variables.insert(
476      "SERVER_ADMIN".to_string(),
477      server_administrator_email.to_string(),
478    );
479  }
480  if let Some(host) = hyper_request.headers().get(header::HOST) {
481    environment_variables.insert(
482      "SERVER_NAME".to_string(),
483      String::from_utf8_lossy(host.as_bytes()).to_string(),
484    );
485  }
486
487  environment_variables.insert(
488    "DOCUMENT_ROOT".to_string(),
489    wwwroot.to_string_lossy().to_string(),
490  );
491  environment_variables.insert(
492    "PATH_INFO".to_string(),
493    match &path_info {
494      Some(path_info) => format!("/{}", path_info),
495      None => "".to_string(),
496    },
497  );
498  environment_variables.insert(
499    "PATH_TRANSLATED".to_string(),
500    match &path_info {
501      Some(path_info) => {
502        let mut path_translated = execute_pathbuf.clone();
503        path_translated.push(path_info);
504        path_translated.to_string_lossy().to_string()
505      }
506      None => "".to_string(),
507    },
508  );
509  environment_variables.insert(
510    "REQUEST_METHOD".to_string(),
511    hyper_request.method().to_string(),
512  );
513  environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string());
514  environment_variables.insert(
515    "REQUEST_URI".to_string(),
516    format!(
517      "{}{}",
518      original_request_uri.path(),
519      match original_request_uri.query() {
520        Some(query) => format!("?{}", query),
521        None => String::from(""),
522      }
523    ),
524  );
525
526  environment_variables.insert(
527    "REMOTE_PORT".to_string(),
528    socket_data.remote_addr.port().to_string(),
529  );
530  environment_variables.insert(
531    "REMOTE_ADDR".to_string(),
532    socket_data.remote_addr.ip().to_canonical().to_string(),
533  );
534
535  environment_variables.insert(
536    "SCRIPT_FILENAME".to_string(),
537    execute_pathbuf.to_string_lossy().to_string(),
538  );
539  if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) {
540    environment_variables.insert(
541      "SCRIPT_NAME".to_string(),
542      format!(
543        "/{}",
544        match cfg!(windows) {
545          true => script_path.to_string_lossy().to_string().replace("\\", "/"),
546          false => script_path.to_string_lossy().to_string(),
547        }
548      ),
549    );
550  }
551
552  if socket_data.encrypted {
553    environment_variables.insert("HTTPS".to_string(), "ON".to_string());
554  }
555
556  let mut content_length_set = false;
557  for (header_name, header_value) in hyper_request.headers().iter() {
558    let env_header_name = match *header_name {
559      header::CONTENT_LENGTH => {
560        content_length_set = true;
561        "CONTENT_LENGTH".to_string()
562      }
563      header::CONTENT_TYPE => "CONTENT_TYPE".to_string(),
564      _ => {
565        let mut result = String::new();
566
567        result.push_str("HTTP_");
568
569        for c in header_name.as_str().to_uppercase().chars() {
570          if c.is_alphanumeric() {
571            result.push(c);
572          } else {
573            result.push('_');
574          }
575        }
576
577        result
578      }
579    };
580    if environment_variables.contains_key(&env_header_name) {
581      let value = environment_variables.get_mut(&env_header_name);
582      if let Some(value) = value {
583        if env_header_name == "HTTP_COOKIE" {
584          value.push_str("; ");
585        } else {
586          // See https://stackoverflow.com/a/1801191
587          value.push_str(", ");
588        }
589        value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref());
590      } else {
591        environment_variables.insert(
592          env_header_name,
593          String::from_utf8_lossy(header_value.as_bytes()).to_string(),
594        );
595      }
596    } else {
597      environment_variables.insert(
598        env_header_name,
599        String::from_utf8_lossy(header_value.as_bytes()).to_string(),
600      );
601    }
602  }
603
604  if !content_length_set {
605    environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string());
606  }
607
608  let (hyper_request, _, _, _) = request.into_parts();
609
610  execute_fastcgi(
611    hyper_request,
612    error_logger,
613    fastcgi_to,
614    environment_variables,
615  )
616  .await
617}
618
619async fn execute_fastcgi(
620  hyper_request: HyperRequest,
621  error_logger: &ErrorLogger,
622  fastcgi_to: &str,
623  mut environment_variables: LinkedHashMap<String, String>,
624) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
625  let (_, body) = hyper_request.into_parts();
626
627  // Insert other environment variables
628  for (key, value) in env::vars_os() {
629    let key_string = key.to_string_lossy().to_string();
630    let value_string = value.to_string_lossy().to_string();
631    environment_variables
632      .entry(key_string)
633      .or_insert(value_string);
634  }
635
636  let fastcgi_to_fixed = if let Some(stripped) = fastcgi_to.strip_prefix("unix:///") {
637    // hyper::Uri fails to parse a string if there is an empty authority, so add an "ignore" authority to Unix socket URLs
638    &format!("unix://ignore/{}", stripped)
639  } else {
640    fastcgi_to
641  };
642
643  let fastcgi_to_url = fastcgi_to_fixed.parse::<hyper::Uri>()?;
644  let scheme_str = fastcgi_to_url.scheme_str();
645
646  let (socket_reader, mut socket_writer) = match scheme_str {
647    Some("tcp") => {
648      let host = match fastcgi_to_url.host() {
649        Some(host) => host,
650        None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the host"))?,
651      };
652
653      let port = match fastcgi_to_url.port_u16() {
654        Some(port) => port,
655        None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the port"))?,
656      };
657
658      let addr = format!("{}:{}", host, port);
659
660      match connect_tcp(&addr).await {
661        Ok(data) => data,
662        Err(err) => match err.kind() {
663          tokio::io::ErrorKind::ConnectionRefused
664          | tokio::io::ErrorKind::NotFound
665          | tokio::io::ErrorKind::HostUnreachable => {
666            error_logger
667              .log(&format!("Service unavailable: {}", err))
668              .await;
669            return Ok(
670              ResponseData::builder_without_request()
671                .status(StatusCode::SERVICE_UNAVAILABLE)
672                .build(),
673            );
674          }
675          _ => Err(err)?,
676        },
677      }
678    }
679    Some("unix") => {
680      let path = fastcgi_to_url.path();
681      match connect_unix(path).await {
682        Ok(data) => data,
683        Err(err) => match err.kind() {
684          tokio::io::ErrorKind::ConnectionRefused
685          | tokio::io::ErrorKind::NotFound
686          | tokio::io::ErrorKind::HostUnreachable => {
687            error_logger
688              .log(&format!("Service unavailable: {}", err))
689              .await;
690            return Ok(
691              ResponseData::builder_without_request()
692                .status(StatusCode::SERVICE_UNAVAILABLE)
693                .build(),
694            );
695          }
696          _ => Err(err)?,
697        },
698      }
699    }
700    _ => Err(anyhow::anyhow!(
701      "Only HTTP and HTTPS reverse proxy URLs are supported."
702    ))?,
703  };
704
705  // Construct and send BEGIN_REQUEST record
706  // Use the responder role and don't use keep-alive
707  let begin_request_packet = construct_fastcgi_record(1, 1, &[0, 1, 0, 0, 0, 0, 0, 0]);
708  socket_writer.write_all(&begin_request_packet).await?;
709
710  // Construct and send PARAMS records
711  let mut environment_variables_to_wrap = Vec::new();
712  for (key, value) in environment_variables.iter() {
713    let mut environment_variable =
714      construct_fastcgi_name_value_pair(key.as_bytes(), value.as_bytes());
715    environment_variables_to_wrap.append(&mut environment_variable);
716  }
717  if !environment_variables_to_wrap.is_empty() {
718    let mut offset = 0;
719    while offset < environment_variables_to_wrap.len() {
720      let chunk_size = std::cmp::min(65536, environment_variables_to_wrap.len() - offset);
721      let chunk = &environment_variables_to_wrap[offset..offset + chunk_size];
722
723      // Record type 4 means PARAMS
724      let params_packet = construct_fastcgi_record(4, 1, chunk);
725      socket_writer.write_all(&params_packet).await?;
726
727      offset += chunk_size;
728    }
729  }
730
731  let params_packet_terminating = construct_fastcgi_record(4, 1, &[]);
732  socket_writer.write_all(&params_packet_terminating).await?;
733
734  let cgi_stdin_reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
735
736  // Emulated standard input, standard output, and standard error
737  type EitherStream = Either<Result<Bytes, std::io::Error>, Result<Bytes, std::io::Error>>;
738  let stdin = SinkWriter::new(FramedWrite::new(socket_writer, FcgiEncoder::new()));
739  let stdout_and_stderr = FramedRead::new(socket_reader, FcgiDecoder::new());
740  let (stdout_stream, stderr_stream) = stdout_and_stderr.split_by_map(|item| match item {
741    Ok(FcgiDecodedData::Stdout(bytes)) => EitherStream::Left(Ok(bytes)),
742    Ok(FcgiDecodedData::Stderr(bytes)) => EitherStream::Right(Ok(bytes)),
743    Err(err) => EitherStream::Left(Err(err)),
744  });
745  let stdout = StreamReader::new(stdout_stream);
746  let stderr = StreamReader::new(stderr_stream);
747
748  let mut cgi_response = CgiResponse::new(stdout);
749
750  let stdin_copy_future = Copier::with_zero_packet_writing(cgi_stdin_reader, stdin).copy();
751  let mut stdin_copy_future_pinned = Box::pin(stdin_copy_future);
752
753  let stderr_read_future = ReadToEndFuture::new(stderr);
754  let mut stderr_read_future_pinned = Box::pin(stderr_read_future);
755
756  let mut headers = [EMPTY_HEADER; 128];
757
758  let mut early_stdin_copied = false;
759
760  // Needed to wrap this in another scope to prevent errors with multiple mutable borrows.
761  {
762    let mut head_obtained = false;
763    let stdout_parse_future = cgi_response.get_head();
764    tokio::pin!(stdout_parse_future);
765
766    // Cannot use a loop with tokio::select, since stdin_copy_future_pinned being constantly ready will make the web server stop responding to HTTP requests
767    tokio::select! {
768      biased;
769
770      result = &mut stdin_copy_future_pinned => {
771        early_stdin_copied = true;
772        result?;
773      },
774      obtained_head = &mut stdout_parse_future => {
775        let obtained_head = obtained_head?;
776        if !obtained_head.is_empty() {
777          httparse::parse_headers(obtained_head, &mut headers)?;
778        }
779        head_obtained = true;
780      },
781      result = &mut stderr_read_future_pinned => {
782        let stderr_vec = result?;
783          let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
784          if !stderr_string.is_empty() {
785            error_logger
786              .log(&format!("There were CGI errors: {}", stderr_string))
787              .await;
788          }
789        return Ok(
790          ResponseData::builder_without_request()
791            .status(StatusCode::INTERNAL_SERVER_ERROR)
792            .build(),
793        );
794      },
795    }
796
797    if !head_obtained {
798      // Kept it same as in the tokio::select macro
799      tokio::select! {
800        biased;
801
802        result = &mut stderr_read_future_pinned => {
803          let stderr_vec = result?;
804            let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
805            if !stderr_string.is_empty() {
806              error_logger
807                .log(&format!("There were FastCGI errors: {}", stderr_string))
808                .await;
809            }
810          return Ok(
811            ResponseData::builder_without_request()
812              .status(StatusCode::INTERNAL_SERVER_ERROR)
813              .build(),
814          );
815        },
816        obtained_head = &mut stdout_parse_future => {
817          let obtained_head = obtained_head?;
818          if !obtained_head.is_empty() {
819            httparse::parse_headers(obtained_head, &mut headers)?;
820          }
821        }
822      }
823    }
824  }
825
826  let mut response_builder = Response::builder();
827  let mut status_code = 200;
828  for header in headers {
829    if header == EMPTY_HEADER {
830      break;
831    }
832    let mut is_status_header = false;
833    match &header.name.to_lowercase() as &str {
834      "location" => {
835        if !(300..=399).contains(&status_code) {
836          status_code = 302;
837        }
838      }
839      "status" => {
840        is_status_header = true;
841        let header_value_cow = String::from_utf8_lossy(header.value);
842        let mut split_status = header_value_cow.split(" ");
843        let first_part = split_status.next();
844        if let Some(first_part) = first_part {
845          if first_part.starts_with("HTTP/") {
846            let second_part = split_status.next();
847            if let Some(second_part) = second_part {
848              if let Ok(parsed_status_code) = second_part.parse::<u16>() {
849                status_code = parsed_status_code;
850              }
851            }
852          } else if let Ok(parsed_status_code) = first_part.parse::<u16>() {
853            status_code = parsed_status_code;
854          }
855        }
856      }
857      _ => (),
858    }
859    if !is_status_header {
860      response_builder = response_builder.header(header.name, header.value);
861    }
862  }
863
864  response_builder = response_builder.status(status_code);
865
866  let reader_stream = ReaderStream::new(cgi_response);
867  let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data));
868  let boxed_body = stream_body.boxed();
869
870  let response = response_builder.body(boxed_body)?;
871
872  let error_logger = error_logger.clone();
873
874  Ok(
875    ResponseData::builder_without_request()
876      .response(response)
877      .parallel_fn(async move {
878        let mut stdin_copied = early_stdin_copied;
879
880        if !stdin_copied {
881          tokio::select! {
882            biased;
883
884            _ = &mut stdin_copy_future_pinned => {
885              stdin_copied = true;
886            },
887            result = &mut stderr_read_future_pinned => {
888              let stderr_vec = result.unwrap_or(vec![]);
889              let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
890              if !stderr_string.is_empty() {
891                error_logger
892                  .log(&format!("There were FastCGI errors: {}", stderr_string))
893                  .await;
894              }
895            },
896          }
897        }
898
899        if stdin_copied {
900          let stderr_vec = stderr_read_future_pinned.await.unwrap_or(vec![]);
901          let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
902          if !stderr_string.is_empty() {
903            error_logger
904              .log(&format!("There were FastCGI errors: {}", stderr_string))
905              .await;
906          }
907        } else {
908          stdin_copy_future_pinned.await.unwrap_or_default();
909        }
910      })
911      .build(),
912  )
913}
914
915async fn connect_tcp(
916  addr: &str,
917) -> Result<
918  (
919    Box<dyn AsyncRead + Send + Sync + Unpin>,
920    Box<dyn AsyncWrite + Send + Sync + Unpin>,
921  ),
922  tokio::io::Error,
923> {
924  let socket = TcpStream::connect(addr).await?;
925  socket.set_nodelay(true)?;
926
927  let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
928  Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
929}
930
931#[allow(dead_code)]
932#[cfg(unix)]
933async fn connect_unix(
934  path: &str,
935) -> Result<
936  (
937    Box<dyn AsyncRead + Send + Sync + Unpin>,
938    Box<dyn AsyncWrite + Send + Sync + Unpin>,
939  ),
940  tokio::io::Error,
941> {
942  use tokio::net::UnixStream;
943
944  let socket = UnixStream::connect(path).await?;
945
946  let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
947  Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
948}
949
/// Fallback for non-Unix targets, where Unix domain sockets are unavailable.
///
/// It has the same signature as the Unix implementation so call sites compile
/// on every platform, but it never opens a connection.
///
/// # Errors
///
/// Always returns an error of kind [`tokio::io::ErrorKind::Unsupported`].
#[allow(dead_code)]
#[cfg(not(unix))]
async fn connect_unix(
  _path: &str,
) -> Result<
  (
    Box<dyn AsyncRead + Send + Sync + Unpin>,
    Box<dyn AsyncWrite + Send + Sync + Unpin>,
  ),
  tokio::io::Error,
> {
  Err(tokio::io::Error::new(
    tokio::io::ErrorKind::Unsupported,
    "Unix sockets are not supported on non-Unix platforms.",
  ))
}