1use std::env;
4use std::error::Error;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::ferron_common::{
10 ErrorLogger, HyperRequest, HyperResponse, RequestData, ResponseData, ServerConfig, ServerModule,
11 ServerModuleHandlers, SocketData,
12};
13use crate::ferron_common::{HyperUpgraded, WithRuntime};
14use async_trait::async_trait;
15use futures_util::future::Either;
16use futures_util::TryStreamExt;
17use hashlink::LinkedHashMap;
18use http_body_util::{BodyExt, StreamBody};
19use httparse::EMPTY_HEADER;
20use hyper::body::{Bytes, Frame};
21use hyper::{header, Response, StatusCode};
22use hyper_tungstenite::HyperWebsocket;
23use tokio::fs;
24use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
25use tokio::net::TcpStream;
26use tokio::runtime::Handle;
27use tokio::sync::RwLock;
28use tokio_util::codec::{FramedRead, FramedWrite};
29use tokio_util::io::{ReaderStream, SinkWriter, StreamReader};
30
31use crate::ferron_res::server_software::SERVER_SOFTWARE;
32use crate::ferron_util::cgi_response::CgiResponse;
33use crate::ferron_util::copy_move::Copier;
34use crate::ferron_util::fcgi_decoder::{FcgiDecodedData, FcgiDecoder};
35use crate::ferron_util::fcgi_encoder::FcgiEncoder;
36use crate::ferron_util::fcgi_name_value_pair::construct_fastcgi_name_value_pair;
37use crate::ferron_util::fcgi_record::construct_fastcgi_record;
38use crate::ferron_util::read_to_end_move::ReadToEndFuture;
39use crate::ferron_util::split_stream_by_map::SplitStreamByMapExt;
40use crate::ferron_util::ttl_cache::TtlCache;
41
42pub fn server_module_init(
43 _config: &ServerConfig,
44) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
45 let cache = Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(100))));
46 Ok(Box::new(FcgiModule::new(cache)))
47}
48
49#[allow(clippy::type_complexity)]
50struct FcgiModule {
51 path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
52}
53
54impl FcgiModule {
55 #[allow(clippy::type_complexity)]
56 fn new(path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>) -> Self {
57 Self { path_cache }
58 }
59}
60
61impl ServerModule for FcgiModule {
62 fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
63 Box::new(FcgiModuleHandlers {
64 path_cache: self.path_cache.clone(),
65 handle,
66 })
67 }
68}
69
70#[allow(clippy::type_complexity)]
71struct FcgiModuleHandlers {
72 handle: Handle,
73 path_cache: Arc<RwLock<TtlCache<String, (Option<PathBuf>, Option<String>)>>>,
74}
75
76#[async_trait]
77impl ServerModuleHandlers for FcgiModuleHandlers {
78 async fn request_handler(
79 &mut self,
80 request: RequestData,
81 config: &ServerConfig,
82 socket_data: &SocketData,
83 error_logger: &ErrorLogger,
84 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
85 WithRuntime::new(self.handle.clone(), async move {
86 let mut fastcgi_script_exts = Vec::new();
87
88 let fastcgi_script_exts_yaml = &config["fcgiScriptExtensions"];
89 if let Some(fastcgi_script_exts_obtained) = fastcgi_script_exts_yaml.as_vec() {
90 for fastcgi_script_ext_yaml in fastcgi_script_exts_obtained.iter() {
91 if let Some(fastcgi_script_ext) = fastcgi_script_ext_yaml.as_str() {
92 fastcgi_script_exts.push(fastcgi_script_ext);
93 }
94 }
95 }
96
97 let mut fastcgi_to = "tcp://localhost:4000/";
98 let fastcgi_to_yaml = &config["fcgiTo"];
99 if let Some(fastcgi_to_obtained) = fastcgi_to_yaml.as_str() {
100 fastcgi_to = fastcgi_to_obtained;
101 }
102
103 let mut fastcgi_path = None;
104 if let Some(fastcgi_path_obtained) = config["fcgiPath"].as_str() {
105 fastcgi_path = Some(fastcgi_path_obtained.to_string());
106 }
107
108 let hyper_request = request.get_hyper_request();
109
110 let request_path = hyper_request.uri().path();
111 let mut request_path_bytes = request_path.bytes();
112 if request_path_bytes.len() < 1 || request_path_bytes.nth(0) != Some(b'/') {
113 return Ok(
114 ResponseData::builder(request)
115 .status(StatusCode::BAD_REQUEST)
116 .build(),
117 );
118 }
119
120 let mut execute_pathbuf = None;
121 let mut execute_path_info = None;
122 let mut wwwroot_detected = None;
123
124 if let Some(fastcgi_path) = fastcgi_path {
125 let mut canonical_fastcgi_path: &str = &fastcgi_path;
126 if canonical_fastcgi_path.bytes().last() == Some(b'/') {
127 canonical_fastcgi_path = &canonical_fastcgi_path[..(canonical_fastcgi_path.len() - 1)];
128 }
129
130 let request_path_with_slashes = match request_path == canonical_fastcgi_path {
131 true => format!("{}/", request_path),
132 false => request_path.to_string(),
133 };
134 if let Some(stripped_request_path) =
135 request_path_with_slashes.strip_prefix(canonical_fastcgi_path)
136 {
137 let wwwroot_yaml = &config["wwwroot"];
138 let wwwroot = wwwroot_yaml.as_str().unwrap_or("/nonexistent");
139
140 let wwwroot_unknown = PathBuf::from(wwwroot);
141 let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
142 true => wwwroot_unknown,
143 false => match fs::canonicalize(&wwwroot_unknown).await {
144 Ok(pathbuf) => pathbuf,
145 Err(_) => wwwroot_unknown,
146 },
147 };
148 wwwroot_detected = Some(wwwroot_pathbuf.clone());
149 let wwwroot = wwwroot_pathbuf.as_path();
150
151 let mut relative_path = &request_path[1..];
152 while relative_path.as_bytes().first().copied() == Some(b'/') {
153 relative_path = &relative_path[1..];
154 }
155
156 let decoded_relative_path = match urlencoding::decode(relative_path) {
157 Ok(path) => path.to_string(),
158 Err(_) => {
159 return Ok(
160 ResponseData::builder(request)
161 .status(StatusCode::BAD_REQUEST)
162 .build(),
163 );
164 }
165 };
166
167 let joined_pathbuf = wwwroot.join(decoded_relative_path);
168 execute_pathbuf = Some(joined_pathbuf);
169 execute_path_info = stripped_request_path
170 .strip_prefix("/")
171 .map(|s| s.to_string());
172 }
173 }
174
175 if execute_pathbuf.is_none() {
176 if let Some(wwwroot) = config["wwwroot"].as_str() {
177 let cache_key = format!(
178 "{}{}{}",
179 match config["ip"].as_str() {
180 Some(ip) => format!("{}-", ip),
181 None => String::from(""),
182 },
183 match config["domain"].as_str() {
184 Some(domain) => format!("{}-", domain),
185 None => String::from(""),
186 },
187 request_path
188 );
189
190 let wwwroot_unknown = PathBuf::from(wwwroot);
191 let wwwroot_pathbuf = match wwwroot_unknown.as_path().is_absolute() {
192 true => wwwroot_unknown,
193 false => match fs::canonicalize(&wwwroot_unknown).await {
194 Ok(pathbuf) => pathbuf,
195 Err(_) => wwwroot_unknown,
196 },
197 };
198 wwwroot_detected = Some(wwwroot_pathbuf.clone());
199 let wwwroot = wwwroot_pathbuf.as_path();
200
201 let read_rwlock = self.path_cache.read().await;
202 let (execute_pathbuf_got, execute_path_info_got) = match read_rwlock.get(&cache_key) {
203 Some(data) => {
204 drop(read_rwlock);
205 data
206 }
207 None => {
208 drop(read_rwlock);
209 let mut relative_path = &request_path[1..];
210 while relative_path.as_bytes().first().copied() == Some(b'/') {
211 relative_path = &relative_path[1..];
212 }
213
214 let decoded_relative_path = match urlencoding::decode(relative_path) {
215 Ok(path) => path.to_string(),
216 Err(_) => {
217 return Ok(
218 ResponseData::builder(request)
219 .status(StatusCode::BAD_REQUEST)
220 .build(),
221 );
222 }
223 };
224
225 let joined_pathbuf = wwwroot.join(decoded_relative_path);
226 let mut execute_pathbuf: Option<PathBuf> = None;
227 let mut execute_path_info: Option<String> = None;
228
229 match fs::metadata(&joined_pathbuf).await {
230 Ok(metadata) => {
231 if metadata.is_file() {
232 let contained_extension = joined_pathbuf
233 .extension()
234 .map(|a| format!(".{}", a.to_string_lossy()));
235 if let Some(contained_extension) = contained_extension {
236 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
237 execute_pathbuf = Some(joined_pathbuf);
238 }
239 }
240 } else if metadata.is_dir() {
241 let indexes = vec!["index.php", "index.cgi"];
242 for index in indexes {
243 let temp_joined_pathbuf = joined_pathbuf.join(index);
244 match fs::metadata(&temp_joined_pathbuf).await {
245 Ok(temp_metadata) => {
246 if temp_metadata.is_file() {
247 let contained_extension = temp_joined_pathbuf
248 .extension()
249 .map(|a| format!(".{}", a.to_string_lossy()));
250 if let Some(contained_extension) = contained_extension {
251 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
252 execute_pathbuf = Some(temp_joined_pathbuf);
253 break;
254 }
255 }
256 }
257 }
258 Err(_) => continue,
259 };
260 }
261 }
262 }
263 Err(err) => {
264 if err.kind() == tokio::io::ErrorKind::NotADirectory {
265 let mut temp_pathbuf = joined_pathbuf.clone();
267 loop {
268 if !temp_pathbuf.pop() {
269 break;
270 }
271 match fs::metadata(&temp_pathbuf).await {
272 Ok(metadata) => {
273 if metadata.is_file() {
274 let temp_path = temp_pathbuf.as_path();
275 if !temp_path.starts_with(wwwroot) {
276 break;
278 }
279 let path_info = match joined_pathbuf.as_path().strip_prefix(temp_path) {
280 Ok(path) => {
281 let path = path.to_string_lossy().to_string();
282 Some(match cfg!(windows) {
283 true => path.replace("\\", "/"),
284 false => path,
285 })
286 }
287 Err(_) => None,
288 };
289 let mut request_path_normalized = match cfg!(windows) {
290 true => request_path.to_lowercase(),
291 false => request_path.to_string(),
292 };
293 while request_path_normalized.contains("//") {
294 request_path_normalized = request_path_normalized.replace("//", "/");
295 }
296 if request_path_normalized == "/cgi-bin"
297 || request_path_normalized.starts_with("/cgi-bin/")
298 {
299 execute_pathbuf = Some(temp_pathbuf);
300 execute_path_info = path_info;
301 break;
302 } else {
303 let contained_extension = temp_pathbuf
304 .extension()
305 .map(|a| format!(".{}", a.to_string_lossy()));
306 if let Some(contained_extension) = contained_extension {
307 if fastcgi_script_exts.contains(&(&contained_extension as &str)) {
308 execute_pathbuf = Some(temp_pathbuf);
309 execute_path_info = path_info;
310 break;
311 }
312 }
313 }
314 } else {
315 break;
316 }
317 }
318 Err(err) => match err.kind() {
319 tokio::io::ErrorKind::NotADirectory => (),
320 _ => break,
321 },
322 };
323 }
324 }
325 }
326 };
327 let data = (execute_pathbuf, execute_path_info);
328
329 let mut write_rwlock = self.path_cache.write().await;
330 write_rwlock.cleanup();
331 write_rwlock.insert(cache_key, data.clone());
332 drop(write_rwlock);
333 data
334 }
335 };
336
337 execute_pathbuf = execute_pathbuf_got;
338 execute_path_info = execute_path_info_got;
339 }
340 }
341
342 if let Some(execute_pathbuf) = execute_pathbuf {
343 if let Some(wwwroot_detected) = wwwroot_detected {
344 return execute_fastcgi_with_environment_variables(
345 request,
346 socket_data,
347 error_logger,
348 wwwroot_detected.as_path(),
349 execute_pathbuf,
350 execute_path_info,
351 config["serverAdministratorEmail"].as_str(),
352 fastcgi_to,
353 )
354 .await;
355 }
356 }
357
358 Ok(ResponseData::builder(request).build())
359 })
360 .await
361 }
362
363 async fn proxy_request_handler(
364 &mut self,
365 request: RequestData,
366 _config: &ServerConfig,
367 _socket_data: &SocketData,
368 _error_logger: &ErrorLogger,
369 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
370 Ok(ResponseData::builder(request).build())
371 }
372
373 async fn response_modifying_handler(
374 &mut self,
375 response: HyperResponse,
376 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
377 Ok(response)
378 }
379
380 async fn proxy_response_modifying_handler(
381 &mut self,
382 response: HyperResponse,
383 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
384 Ok(response)
385 }
386
387 async fn connect_proxy_request_handler(
388 &mut self,
389 _upgraded_request: HyperUpgraded,
390 _connect_address: &str,
391 _config: &ServerConfig,
392 _socket_data: &SocketData,
393 _error_logger: &ErrorLogger,
394 ) -> Result<(), Box<dyn Error + Send + Sync>> {
395 Ok(())
396 }
397
398 fn does_connect_proxy_requests(&mut self) -> bool {
399 false
400 }
401
402 async fn websocket_request_handler(
403 &mut self,
404 _websocket: HyperWebsocket,
405 _uri: &hyper::Uri,
406 _headers: &hyper::HeaderMap,
407 _config: &ServerConfig,
408 _socket_data: &SocketData,
409 _error_logger: &ErrorLogger,
410 ) -> Result<(), Box<dyn Error + Send + Sync>> {
411 Ok(())
412 }
413
414 fn does_websocket_requests(&mut self, _config: &ServerConfig, _socket_data: &SocketData) -> bool {
415 false
416 }
417}
418
419#[allow(clippy::too_many_arguments)]
420async fn execute_fastcgi_with_environment_variables(
421 request: RequestData,
422 socket_data: &SocketData,
423 error_logger: &ErrorLogger,
424 wwwroot: &Path,
425 execute_pathbuf: PathBuf,
426 path_info: Option<String>,
427 server_administrator_email: Option<&str>,
428 fastcgi_to: &str,
429) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
430 let mut environment_variables: LinkedHashMap<String, String> = LinkedHashMap::new();
431
432 let hyper_request = request.get_hyper_request();
433 let original_request_uri = request.get_original_url().unwrap_or(hyper_request.uri());
434
435 if let Some(auth_user) = request.get_auth_user() {
436 if let Some(authorization) = hyper_request.headers().get(header::AUTHORIZATION) {
437 let authorization_value = String::from_utf8_lossy(authorization.as_bytes()).to_string();
438 let mut authorization_value_split = authorization_value.split(" ");
439 if let Some(authorization_type) = authorization_value_split.next() {
440 environment_variables.insert("AUTH_TYPE".to_string(), authorization_type.to_string());
441 }
442 }
443 environment_variables.insert("REMOTE_USER".to_string(), auth_user.to_string());
444 }
445
446 environment_variables.insert(
447 "QUERY_STRING".to_string(),
448 match hyper_request.uri().query() {
449 Some(query) => query.to_string(),
450 None => "".to_string(),
451 },
452 );
453
454 environment_variables.insert("SERVER_SOFTWARE".to_string(), SERVER_SOFTWARE.to_string());
455 environment_variables.insert(
456 "SERVER_PROTOCOL".to_string(),
457 match hyper_request.version() {
458 hyper::Version::HTTP_09 => "HTTP/0.9".to_string(),
459 hyper::Version::HTTP_10 => "HTTP/1.0".to_string(),
460 hyper::Version::HTTP_11 => "HTTP/1.1".to_string(),
461 hyper::Version::HTTP_2 => "HTTP/2.0".to_string(),
462 hyper::Version::HTTP_3 => "HTTP/3.0".to_string(),
463 _ => "HTTP/Unknown".to_string(),
464 },
465 );
466 environment_variables.insert(
467 "SERVER_PORT".to_string(),
468 socket_data.local_addr.port().to_string(),
469 );
470 environment_variables.insert(
471 "SERVER_ADDR".to_string(),
472 socket_data.local_addr.ip().to_canonical().to_string(),
473 );
474 if let Some(server_administrator_email) = server_administrator_email {
475 environment_variables.insert(
476 "SERVER_ADMIN".to_string(),
477 server_administrator_email.to_string(),
478 );
479 }
480 if let Some(host) = hyper_request.headers().get(header::HOST) {
481 environment_variables.insert(
482 "SERVER_NAME".to_string(),
483 String::from_utf8_lossy(host.as_bytes()).to_string(),
484 );
485 }
486
487 environment_variables.insert(
488 "DOCUMENT_ROOT".to_string(),
489 wwwroot.to_string_lossy().to_string(),
490 );
491 environment_variables.insert(
492 "PATH_INFO".to_string(),
493 match &path_info {
494 Some(path_info) => format!("/{}", path_info),
495 None => "".to_string(),
496 },
497 );
498 environment_variables.insert(
499 "PATH_TRANSLATED".to_string(),
500 match &path_info {
501 Some(path_info) => {
502 let mut path_translated = execute_pathbuf.clone();
503 path_translated.push(path_info);
504 path_translated.to_string_lossy().to_string()
505 }
506 None => "".to_string(),
507 },
508 );
509 environment_variables.insert(
510 "REQUEST_METHOD".to_string(),
511 hyper_request.method().to_string(),
512 );
513 environment_variables.insert("GATEWAY_INTERFACE".to_string(), "CGI/1.1".to_string());
514 environment_variables.insert(
515 "REQUEST_URI".to_string(),
516 format!(
517 "{}{}",
518 original_request_uri.path(),
519 match original_request_uri.query() {
520 Some(query) => format!("?{}", query),
521 None => String::from(""),
522 }
523 ),
524 );
525
526 environment_variables.insert(
527 "REMOTE_PORT".to_string(),
528 socket_data.remote_addr.port().to_string(),
529 );
530 environment_variables.insert(
531 "REMOTE_ADDR".to_string(),
532 socket_data.remote_addr.ip().to_canonical().to_string(),
533 );
534
535 environment_variables.insert(
536 "SCRIPT_FILENAME".to_string(),
537 execute_pathbuf.to_string_lossy().to_string(),
538 );
539 if let Ok(script_path) = execute_pathbuf.as_path().strip_prefix(wwwroot) {
540 environment_variables.insert(
541 "SCRIPT_NAME".to_string(),
542 format!(
543 "/{}",
544 match cfg!(windows) {
545 true => script_path.to_string_lossy().to_string().replace("\\", "/"),
546 false => script_path.to_string_lossy().to_string(),
547 }
548 ),
549 );
550 }
551
552 if socket_data.encrypted {
553 environment_variables.insert("HTTPS".to_string(), "ON".to_string());
554 }
555
556 let mut content_length_set = false;
557 for (header_name, header_value) in hyper_request.headers().iter() {
558 let env_header_name = match *header_name {
559 header::CONTENT_LENGTH => {
560 content_length_set = true;
561 "CONTENT_LENGTH".to_string()
562 }
563 header::CONTENT_TYPE => "CONTENT_TYPE".to_string(),
564 _ => {
565 let mut result = String::new();
566
567 result.push_str("HTTP_");
568
569 for c in header_name.as_str().to_uppercase().chars() {
570 if c.is_alphanumeric() {
571 result.push(c);
572 } else {
573 result.push('_');
574 }
575 }
576
577 result
578 }
579 };
580 if environment_variables.contains_key(&env_header_name) {
581 let value = environment_variables.get_mut(&env_header_name);
582 if let Some(value) = value {
583 if env_header_name == "HTTP_COOKIE" {
584 value.push_str("; ");
585 } else {
586 value.push_str(", ");
588 }
589 value.push_str(String::from_utf8_lossy(header_value.as_bytes()).as_ref());
590 } else {
591 environment_variables.insert(
592 env_header_name,
593 String::from_utf8_lossy(header_value.as_bytes()).to_string(),
594 );
595 }
596 } else {
597 environment_variables.insert(
598 env_header_name,
599 String::from_utf8_lossy(header_value.as_bytes()).to_string(),
600 );
601 }
602 }
603
604 if !content_length_set {
605 environment_variables.insert("CONTENT_LENGTH".to_string(), "0".to_string());
606 }
607
608 let (hyper_request, _, _, _) = request.into_parts();
609
610 execute_fastcgi(
611 hyper_request,
612 error_logger,
613 fastcgi_to,
614 environment_variables,
615 )
616 .await
617}
618
619async fn execute_fastcgi(
620 hyper_request: HyperRequest,
621 error_logger: &ErrorLogger,
622 fastcgi_to: &str,
623 mut environment_variables: LinkedHashMap<String, String>,
624) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
625 let (_, body) = hyper_request.into_parts();
626
627 for (key, value) in env::vars_os() {
629 let key_string = key.to_string_lossy().to_string();
630 let value_string = value.to_string_lossy().to_string();
631 environment_variables
632 .entry(key_string)
633 .or_insert(value_string);
634 }
635
636 let fastcgi_to_fixed = if let Some(stripped) = fastcgi_to.strip_prefix("unix:///") {
637 &format!("unix://ignore/{}", stripped)
639 } else {
640 fastcgi_to
641 };
642
643 let fastcgi_to_url = fastcgi_to_fixed.parse::<hyper::Uri>()?;
644 let scheme_str = fastcgi_to_url.scheme_str();
645
646 let (socket_reader, mut socket_writer) = match scheme_str {
647 Some("tcp") => {
648 let host = match fastcgi_to_url.host() {
649 Some(host) => host,
650 None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the host"))?,
651 };
652
653 let port = match fastcgi_to_url.port_u16() {
654 Some(port) => port,
655 None => Err(anyhow::anyhow!("The FastCGI URL doesn't include the port"))?,
656 };
657
658 let addr = format!("{}:{}", host, port);
659
660 match connect_tcp(&addr).await {
661 Ok(data) => data,
662 Err(err) => match err.kind() {
663 tokio::io::ErrorKind::ConnectionRefused
664 | tokio::io::ErrorKind::NotFound
665 | tokio::io::ErrorKind::HostUnreachable => {
666 error_logger
667 .log(&format!("Service unavailable: {}", err))
668 .await;
669 return Ok(
670 ResponseData::builder_without_request()
671 .status(StatusCode::SERVICE_UNAVAILABLE)
672 .build(),
673 );
674 }
675 _ => Err(err)?,
676 },
677 }
678 }
679 Some("unix") => {
680 let path = fastcgi_to_url.path();
681 match connect_unix(path).await {
682 Ok(data) => data,
683 Err(err) => match err.kind() {
684 tokio::io::ErrorKind::ConnectionRefused
685 | tokio::io::ErrorKind::NotFound
686 | tokio::io::ErrorKind::HostUnreachable => {
687 error_logger
688 .log(&format!("Service unavailable: {}", err))
689 .await;
690 return Ok(
691 ResponseData::builder_without_request()
692 .status(StatusCode::SERVICE_UNAVAILABLE)
693 .build(),
694 );
695 }
696 _ => Err(err)?,
697 },
698 }
699 }
700 _ => Err(anyhow::anyhow!(
701 "Only HTTP and HTTPS reverse proxy URLs are supported."
702 ))?,
703 };
704
705 let begin_request_packet = construct_fastcgi_record(1, 1, &[0, 1, 0, 0, 0, 0, 0, 0]);
708 socket_writer.write_all(&begin_request_packet).await?;
709
710 let mut environment_variables_to_wrap = Vec::new();
712 for (key, value) in environment_variables.iter() {
713 let mut environment_variable =
714 construct_fastcgi_name_value_pair(key.as_bytes(), value.as_bytes());
715 environment_variables_to_wrap.append(&mut environment_variable);
716 }
717 if !environment_variables_to_wrap.is_empty() {
718 let mut offset = 0;
719 while offset < environment_variables_to_wrap.len() {
720 let chunk_size = std::cmp::min(65536, environment_variables_to_wrap.len() - offset);
721 let chunk = &environment_variables_to_wrap[offset..offset + chunk_size];
722
723 let params_packet = construct_fastcgi_record(4, 1, chunk);
725 socket_writer.write_all(¶ms_packet).await?;
726
727 offset += chunk_size;
728 }
729 }
730
731 let params_packet_terminating = construct_fastcgi_record(4, 1, &[]);
732 socket_writer.write_all(¶ms_packet_terminating).await?;
733
734 let cgi_stdin_reader = StreamReader::new(body.into_data_stream().map_err(std::io::Error::other));
735
736 type EitherStream = Either<Result<Bytes, std::io::Error>, Result<Bytes, std::io::Error>>;
738 let stdin = SinkWriter::new(FramedWrite::new(socket_writer, FcgiEncoder::new()));
739 let stdout_and_stderr = FramedRead::new(socket_reader, FcgiDecoder::new());
740 let (stdout_stream, stderr_stream) = stdout_and_stderr.split_by_map(|item| match item {
741 Ok(FcgiDecodedData::Stdout(bytes)) => EitherStream::Left(Ok(bytes)),
742 Ok(FcgiDecodedData::Stderr(bytes)) => EitherStream::Right(Ok(bytes)),
743 Err(err) => EitherStream::Left(Err(err)),
744 });
745 let stdout = StreamReader::new(stdout_stream);
746 let stderr = StreamReader::new(stderr_stream);
747
748 let mut cgi_response = CgiResponse::new(stdout);
749
750 let stdin_copy_future = Copier::with_zero_packet_writing(cgi_stdin_reader, stdin).copy();
751 let mut stdin_copy_future_pinned = Box::pin(stdin_copy_future);
752
753 let stderr_read_future = ReadToEndFuture::new(stderr);
754 let mut stderr_read_future_pinned = Box::pin(stderr_read_future);
755
756 let mut headers = [EMPTY_HEADER; 128];
757
758 let mut early_stdin_copied = false;
759
760 {
762 let mut head_obtained = false;
763 let stdout_parse_future = cgi_response.get_head();
764 tokio::pin!(stdout_parse_future);
765
766 tokio::select! {
768 biased;
769
770 result = &mut stdin_copy_future_pinned => {
771 early_stdin_copied = true;
772 result?;
773 },
774 obtained_head = &mut stdout_parse_future => {
775 let obtained_head = obtained_head?;
776 if !obtained_head.is_empty() {
777 httparse::parse_headers(obtained_head, &mut headers)?;
778 }
779 head_obtained = true;
780 },
781 result = &mut stderr_read_future_pinned => {
782 let stderr_vec = result?;
783 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
784 if !stderr_string.is_empty() {
785 error_logger
786 .log(&format!("There were CGI errors: {}", stderr_string))
787 .await;
788 }
789 return Ok(
790 ResponseData::builder_without_request()
791 .status(StatusCode::INTERNAL_SERVER_ERROR)
792 .build(),
793 );
794 },
795 }
796
797 if !head_obtained {
798 tokio::select! {
800 biased;
801
802 result = &mut stderr_read_future_pinned => {
803 let stderr_vec = result?;
804 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
805 if !stderr_string.is_empty() {
806 error_logger
807 .log(&format!("There were FastCGI errors: {}", stderr_string))
808 .await;
809 }
810 return Ok(
811 ResponseData::builder_without_request()
812 .status(StatusCode::INTERNAL_SERVER_ERROR)
813 .build(),
814 );
815 },
816 obtained_head = &mut stdout_parse_future => {
817 let obtained_head = obtained_head?;
818 if !obtained_head.is_empty() {
819 httparse::parse_headers(obtained_head, &mut headers)?;
820 }
821 }
822 }
823 }
824 }
825
826 let mut response_builder = Response::builder();
827 let mut status_code = 200;
828 for header in headers {
829 if header == EMPTY_HEADER {
830 break;
831 }
832 let mut is_status_header = false;
833 match &header.name.to_lowercase() as &str {
834 "location" => {
835 if !(300..=399).contains(&status_code) {
836 status_code = 302;
837 }
838 }
839 "status" => {
840 is_status_header = true;
841 let header_value_cow = String::from_utf8_lossy(header.value);
842 let mut split_status = header_value_cow.split(" ");
843 let first_part = split_status.next();
844 if let Some(first_part) = first_part {
845 if first_part.starts_with("HTTP/") {
846 let second_part = split_status.next();
847 if let Some(second_part) = second_part {
848 if let Ok(parsed_status_code) = second_part.parse::<u16>() {
849 status_code = parsed_status_code;
850 }
851 }
852 } else if let Ok(parsed_status_code) = first_part.parse::<u16>() {
853 status_code = parsed_status_code;
854 }
855 }
856 }
857 _ => (),
858 }
859 if !is_status_header {
860 response_builder = response_builder.header(header.name, header.value);
861 }
862 }
863
864 response_builder = response_builder.status(status_code);
865
866 let reader_stream = ReaderStream::new(cgi_response);
867 let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data));
868 let boxed_body = stream_body.boxed();
869
870 let response = response_builder.body(boxed_body)?;
871
872 let error_logger = error_logger.clone();
873
874 Ok(
875 ResponseData::builder_without_request()
876 .response(response)
877 .parallel_fn(async move {
878 let mut stdin_copied = early_stdin_copied;
879
880 if !stdin_copied {
881 tokio::select! {
882 biased;
883
884 _ = &mut stdin_copy_future_pinned => {
885 stdin_copied = true;
886 },
887 result = &mut stderr_read_future_pinned => {
888 let stderr_vec = result.unwrap_or(vec![]);
889 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
890 if !stderr_string.is_empty() {
891 error_logger
892 .log(&format!("There were FastCGI errors: {}", stderr_string))
893 .await;
894 }
895 },
896 }
897 }
898
899 if stdin_copied {
900 let stderr_vec = stderr_read_future_pinned.await.unwrap_or(vec![]);
901 let stderr_string = String::from_utf8_lossy(stderr_vec.as_slice()).to_string();
902 if !stderr_string.is_empty() {
903 error_logger
904 .log(&format!("There were FastCGI errors: {}", stderr_string))
905 .await;
906 }
907 } else {
908 stdin_copy_future_pinned.await.unwrap_or_default();
909 }
910 })
911 .build(),
912 )
913}
914
915async fn connect_tcp(
916 addr: &str,
917) -> Result<
918 (
919 Box<dyn AsyncRead + Send + Sync + Unpin>,
920 Box<dyn AsyncWrite + Send + Sync + Unpin>,
921 ),
922 tokio::io::Error,
923> {
924 let socket = TcpStream::connect(addr).await?;
925 socket.set_nodelay(true)?;
926
927 let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
928 Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
929}
930
931#[allow(dead_code)]
932#[cfg(unix)]
933async fn connect_unix(
934 path: &str,
935) -> Result<
936 (
937 Box<dyn AsyncRead + Send + Sync + Unpin>,
938 Box<dyn AsyncWrite + Send + Sync + Unpin>,
939 ),
940 tokio::io::Error,
941> {
942 use tokio::net::UnixStream;
943
944 let socket = UnixStream::connect(path).await?;
945
946 let (socket_reader_set, socket_writer_set) = tokio::io::split(socket);
947 Ok((Box::new(socket_reader_set), Box::new(socket_writer_set)))
948}
949
950#[allow(dead_code)]
951#[cfg(not(unix))]
952async fn connect_unix(
953 _path: &str,
954) -> Result<
955 (
956 Box<dyn AsyncRead + Send + Sync + Unpin>,
957 Box<dyn AsyncWrite + Send + Sync + Unpin>,
958 ),
959 tokio::io::Error,
960> {
961 Err(tokio::io::Error::new(
962 tokio::io::ErrorKind::Unsupported,
963 "Unix sockets are not supports on non-Unix platforms.",
964 ))
965}