ferron/optional_modules/
rproxy.rs

1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::ferron_common::{
8  ErrorLogger, HyperUpgraded, RequestData, ResponseData, ServerConfig, ServerModule,
9  ServerModuleHandlers, SocketData,
10};
11use crate::ferron_common::{HyperResponse, WithRuntime};
12use async_trait::async_trait;
13use futures_util::{SinkExt, StreamExt};
14use http::uri::{PathAndQuery, Scheme};
15use http_body_util::combinators::BoxBody;
16use http_body_util::BodyExt;
17use hyper::body::Bytes;
18use hyper::client::conn::http1::SendRequest;
19use hyper::{header, Request, StatusCode, Uri};
20use hyper_tungstenite::HyperWebsocket;
21use hyper_util::rt::TokioIo;
22use rustls::pki_types::ServerName;
23use rustls::RootCertStore;
24use rustls_native_certs::load_native_certs;
25use tokio::io::{AsyncRead, AsyncWrite};
26use tokio::net::TcpStream;
27use tokio::runtime::Handle;
28use tokio::sync::RwLock;
29use tokio_rustls::TlsConnector;
30use tokio_tungstenite::Connector;
31
32use crate::ferron_util::no_server_verifier::NoServerVerifier;
33use crate::ferron_util::ttl_cache::TtlCache;
34
35const DEFAULT_CONCURRENT_CONNECTIONS_PER_HOST: u32 = 32;
36
37pub fn server_module_init(
38  config: &ServerConfig,
39) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
40  let mut roots: RootCertStore = RootCertStore::empty();
41  let certs_result = load_native_certs();
42  if !certs_result.errors.is_empty() {
43    Err(anyhow::anyhow!(format!(
44      "Couldn't load the native certificate store: {}",
45      certs_result.errors[0]
46    )))?
47  }
48  let certs = certs_result.certs;
49
50  for cert in certs {
51    match roots.add(cert) {
52      Ok(_) => (),
53      Err(err) => Err(anyhow::anyhow!(format!(
54        "Couldn't add a certificate to the certificate store: {}",
55        err
56      )))?,
57    }
58  }
59
60  let mut connections_vec = Vec::new();
61  for _ in 0..DEFAULT_CONCURRENT_CONNECTIONS_PER_HOST {
62    connections_vec.push(RwLock::new(HashMap::new()));
63  }
64  Ok(Box::new(ReverseProxyModule::new(
65    Arc::new(roots),
66    Arc::new(connections_vec),
67    Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(
68      config["global"]["loadBalancerHealthCheckWindow"]
69        .as_i64()
70        .unwrap_or(5000) as u64,
71    )))),
72  )))
73}
74
75#[allow(clippy::type_complexity)]
76struct ReverseProxyModule {
77  roots: Arc<RootCertStore>,
78  connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
79  failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
80}
81
82impl ReverseProxyModule {
83  #[allow(clippy::type_complexity)]
84  fn new(
85    roots: Arc<RootCertStore>,
86    connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
87    failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
88  ) -> Self {
89    Self {
90      roots,
91      connections,
92      failed_backends,
93    }
94  }
95}
96
97impl ServerModule for ReverseProxyModule {
98  fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
99    Box::new(ReverseProxyModuleHandlers {
100      roots: self.roots.clone(),
101      connections: self.connections.clone(),
102      failed_backends: self.failed_backends.clone(),
103      handle,
104    })
105  }
106}
107
108#[allow(clippy::type_complexity)]
109struct ReverseProxyModuleHandlers {
110  handle: Handle,
111  roots: Arc<RootCertStore>,
112  connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
113  failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
114}
115
116#[async_trait]
117impl ServerModuleHandlers for ReverseProxyModuleHandlers {
118  async fn request_handler(
119    &mut self,
120    request: RequestData,
121    config: &ServerConfig,
122    socket_data: &SocketData,
123    error_logger: &ErrorLogger,
124  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
125    WithRuntime::new(self.handle.clone(), async move {
126      let enable_health_check = config["enableLoadBalancerHealthCheck"]
127        .as_bool()
128        .unwrap_or(false);
129      let health_check_max_fails = config["loadBalancerHealthCheckMaximumFails"]
130        .as_i64()
131        .unwrap_or(3) as u64;
132      let disable_certificate_verification = config["disableProxyCertificateVerification"]
133        .as_bool()
134        .unwrap_or(false);
135      if let Some(proxy_to) = determine_proxy_to(
136        config,
137        socket_data.encrypted,
138        &self.failed_backends,
139        enable_health_check,
140        health_check_max_fails,
141      )
142      .await
143      {
144        let (hyper_request, _auth_user, _original_url) = request.into_parts();
145        let (mut hyper_request_parts, request_body) = hyper_request.into_parts();
146
147        let proxy_request_url = proxy_to.parse::<hyper::Uri>()?;
148        let scheme_str = proxy_request_url.scheme_str();
149        let mut encrypted = false;
150
151        match scheme_str {
152          Some("http") => {
153            encrypted = false;
154          }
155          Some("https") => {
156            encrypted = true;
157          }
158          _ => Err(anyhow::anyhow!(
159            "Only HTTP and HTTPS reverse proxy URLs are supported."
160          ))?,
161        };
162
163        let host = match proxy_request_url.host() {
164          Some(host) => host,
165          None => Err(anyhow::anyhow!(
166            "The reverse proxy URL doesn't include the host"
167          ))?,
168        };
169
170        let port = proxy_request_url.port_u16().unwrap_or(match scheme_str {
171          Some("http") => 80,
172          Some("https") => 443,
173          _ => 80,
174        });
175
176        let addr = format!("{}:{}", host, port);
177        let authority = proxy_request_url.authority().cloned();
178
179        let hyper_request_path = hyper_request_parts.uri.path();
180
181        let path = match hyper_request_path.as_bytes().first() {
182          Some(b'/') => {
183            let mut proxy_request_path = proxy_request_url.path();
184            while proxy_request_path.as_bytes().last().copied() == Some(b'/') {
185              proxy_request_path = &proxy_request_path[..(proxy_request_path.len() - 1)];
186            }
187            format!("{}{}", proxy_request_path, hyper_request_path)
188          }
189          _ => hyper_request_path.to_string(),
190        };
191
192        hyper_request_parts.uri = Uri::from_str(&format!(
193          "{}{}",
194          path,
195          match hyper_request_parts.uri.query() {
196            Some(query) => format!("?{}", query),
197            None => "".to_string(),
198          }
199        ))?;
200
201        let original_host = hyper_request_parts.headers.get(header::HOST).cloned();
202
203        // Host header for host identification
204        match authority {
205          Some(authority) => {
206            hyper_request_parts
207              .headers
208              .insert(header::HOST, authority.to_string().parse()?);
209          }
210          None => {
211            hyper_request_parts.headers.remove(header::HOST);
212          }
213        }
214
215        // Connection header to enable HTTP/1.1 keep-alive
216        hyper_request_parts
217          .headers
218          .insert(header::CONNECTION, "keep-alive".parse()?);
219
220        // X-Forwarded-* headers to send the client's data to a server that's behind the reverse proxy
221        hyper_request_parts.headers.insert(
222          "x-forwarded-for",
223          socket_data
224            .remote_addr
225            .ip()
226            .to_canonical()
227            .to_string()
228            .parse()?,
229        );
230
231        if socket_data.encrypted {
232          hyper_request_parts
233            .headers
234            .insert("x-forwarded-proto", "https".parse()?);
235        } else {
236          hyper_request_parts
237            .headers
238            .insert("x-forwarded-proto", "http".parse()?);
239        }
240
241        if let Some(original_host) = original_host {
242          hyper_request_parts
243            .headers
244            .insert("x-forwarded-host", original_host);
245        }
246
247        let proxy_request = Request::from_parts(hyper_request_parts, request_body);
248
249        let connections = &self.connections[rand::random_range(..self.connections.len())];
250
251        let rwlock_read = connections.read().await;
252        let sender_read_option = rwlock_read.get(&addr);
253
254        if let Some(sender_read) = sender_read_option {
255          if !sender_read.is_closed() {
256            drop(rwlock_read);
257            let mut rwlock_write = connections.write().await;
258            let sender_option = rwlock_write.get_mut(&addr);
259
260            if let Some(sender) = sender_option {
261              if !sender.is_closed() {
262                let result = http_proxy_kept_alive(sender, proxy_request, error_logger).await;
263                drop(rwlock_write);
264                return result;
265              } else {
266                drop(rwlock_write);
267              }
268            } else {
269              drop(rwlock_write);
270            }
271          } else {
272            drop(rwlock_read);
273          }
274        } else {
275          drop(rwlock_read);
276        }
277
278        let stream = match TcpStream::connect(&addr).await {
279          Ok(stream) => stream,
280          Err(err) => {
281            if enable_health_check {
282              let mut failed_backends_write = self.failed_backends.write().await;
283              let proxy_to = proxy_to.clone();
284              let failed_attempts = failed_backends_write.get(&proxy_to);
285              failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
286            }
287            match err.kind() {
288              tokio::io::ErrorKind::ConnectionRefused
289              | tokio::io::ErrorKind::NotFound
290              | tokio::io::ErrorKind::HostUnreachable => {
291                error_logger
292                  .log(&format!("Service unavailable: {}", err))
293                  .await;
294                return Ok(
295                  ResponseData::builder_without_request()
296                    .status(StatusCode::SERVICE_UNAVAILABLE)
297                    .build(),
298                );
299              }
300              tokio::io::ErrorKind::TimedOut => {
301                error_logger.log(&format!("Gateway timeout: {}", err)).await;
302                return Ok(
303                  ResponseData::builder_without_request()
304                    .status(StatusCode::GATEWAY_TIMEOUT)
305                    .build(),
306                );
307              }
308              _ => {
309                error_logger.log(&format!("Bad gateway: {}", err)).await;
310                return Ok(
311                  ResponseData::builder_without_request()
312                    .status(StatusCode::BAD_GATEWAY)
313                    .build(),
314                );
315              }
316            };
317          }
318        };
319
320        match stream.set_nodelay(true) {
321          Ok(_) => (),
322          Err(err) => {
323            if enable_health_check {
324              let mut failed_backends_write = self.failed_backends.write().await;
325              let proxy_to = proxy_to.clone();
326              let failed_attempts = failed_backends_write.get(&proxy_to);
327              failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
328            }
329            error_logger.log(&format!("Bad gateway: {}", err)).await;
330            return Ok(
331              ResponseData::builder_without_request()
332                .status(StatusCode::BAD_GATEWAY)
333                .build(),
334            );
335          }
336        };
337
338        let failed_backends_option_borrowed = if enable_health_check {
339          Some(&*self.failed_backends)
340        } else {
341          None
342        };
343
344        if !encrypted {
345          http_proxy(
346            connections,
347            addr,
348            stream,
349            proxy_request,
350            error_logger,
351            proxy_to,
352            failed_backends_option_borrowed,
353          )
354          .await
355        } else {
356          let tls_client_config = (if disable_certificate_verification {
357            rustls::ClientConfig::builder()
358              .dangerous()
359              .with_custom_certificate_verifier(Arc::new(NoServerVerifier::new()))
360          } else {
361            rustls::ClientConfig::builder().with_root_certificates(self.roots.clone())
362          })
363          .with_no_client_auth();
364          let connector = TlsConnector::from(Arc::new(tls_client_config));
365          let domain = ServerName::try_from(host)?.to_owned();
366
367          let tls_stream = match connector.connect(domain, stream).await {
368            Ok(stream) => stream,
369            Err(err) => {
370              if enable_health_check {
371                let mut failed_backends_write = self.failed_backends.write().await;
372                let proxy_to = proxy_to.clone();
373                let failed_attempts = failed_backends_write.get(&proxy_to);
374                failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
375              }
376              error_logger.log(&format!("Bad gateway: {}", err)).await;
377              return Ok(
378                ResponseData::builder_without_request()
379                  .status(StatusCode::BAD_GATEWAY)
380                  .build(),
381              );
382            }
383          };
384
385          http_proxy(
386            connections,
387            addr,
388            tls_stream,
389            proxy_request,
390            error_logger,
391            proxy_to,
392            failed_backends_option_borrowed,
393          )
394          .await
395        }
396      } else {
397        Ok(ResponseData::builder(request).build())
398      }
399    })
400    .await
401  }
402
403  async fn proxy_request_handler(
404    &mut self,
405    request: RequestData,
406    _config: &ServerConfig,
407    _socket_data: &SocketData,
408    _error_logger: &ErrorLogger,
409  ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
410    Ok(ResponseData::builder(request).build())
411  }
412
413  async fn response_modifying_handler(
414    &mut self,
415    response: HyperResponse,
416  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
417    Ok(response)
418  }
419
420  async fn proxy_response_modifying_handler(
421    &mut self,
422    response: HyperResponse,
423  ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
424    Ok(response)
425  }
426
427  async fn connect_proxy_request_handler(
428    &mut self,
429    _upgraded_request: HyperUpgraded,
430    _connect_address: &str,
431    _config: &ServerConfig,
432    _socket_data: &SocketData,
433    _error_logger: &ErrorLogger,
434  ) -> Result<(), Box<dyn Error + Send + Sync>> {
435    Ok(())
436  }
437
438  fn does_connect_proxy_requests(&mut self) -> bool {
439    false
440  }
441
442  async fn websocket_request_handler(
443    &mut self,
444    websocket: HyperWebsocket,
445    uri: &hyper::Uri,
446    config: &ServerConfig,
447    socket_data: &SocketData,
448    error_logger: &ErrorLogger,
449  ) -> Result<(), Box<dyn Error + Send + Sync>> {
450    WithRuntime::new(self.handle.clone(), async move {
451      let enable_health_check = config["enableLoadBalancerHealthCheck"]
452        .as_bool()
453        .unwrap_or(false);
454      let health_check_max_fails = config["loadBalancerHealthCheckMaximumFails"]
455        .as_i64()
456        .unwrap_or(3) as u64;
457
458      let disable_certificate_verification = config["disableProxyCertificateVerification"]
459        .as_bool()
460        .unwrap_or(false);
461      if let Some(proxy_to) = determine_proxy_to(
462        config,
463        socket_data.encrypted,
464        &self.failed_backends,
465        enable_health_check,
466        health_check_max_fails,
467      )
468      .await
469      {
470        let proxy_request_url = proxy_to.parse::<hyper::Uri>()?;
471        let scheme_str = proxy_request_url.scheme_str();
472        let mut encrypted = false;
473
474        match scheme_str {
475          Some("http") => {
476            encrypted = false;
477          }
478          Some("https") => {
479            encrypted = true;
480          }
481          _ => Err(anyhow::anyhow!(
482            "Only HTTP and HTTPS reverse proxy URLs are supported."
483          ))?,
484        };
485
486        let request_path = uri.path();
487
488        let path = match request_path.as_bytes().first() {
489          Some(b'/') => {
490            let mut proxy_request_path = proxy_request_url.path();
491            while proxy_request_path.as_bytes().last().copied() == Some(b'/') {
492              proxy_request_path = &proxy_request_path[..(proxy_request_path.len() - 1)];
493            }
494            format!("{}{}", proxy_request_path, request_path)
495          }
496          _ => request_path.to_string(),
497        };
498
499        let mut proxy_request_url_parts = proxy_request_url.into_parts();
500        proxy_request_url_parts.scheme = if encrypted {
501          Some(Scheme::from_str("wss")?)
502        } else {
503          Some(Scheme::from_str("ws")?)
504        };
505        match proxy_request_url_parts.path_and_query {
506          Some(path_and_query) => {
507            let path_and_query_string = match path_and_query.query() {
508              Some(query) => {
509                format!("{}?{}", path, query)
510              }
511              None => path,
512            };
513            proxy_request_url_parts.path_and_query =
514              Some(PathAndQuery::from_str(&path_and_query_string)?);
515          }
516          None => {
517            proxy_request_url_parts.path_and_query = Some(PathAndQuery::from_str(&path)?);
518          }
519        };
520
521        let proxy_request_url = hyper::Uri::from_parts(proxy_request_url_parts)?;
522
523        let connector = if !encrypted {
524          Connector::Plain
525        } else {
526          Connector::Rustls(Arc::new(
527            (if disable_certificate_verification {
528              rustls::ClientConfig::builder()
529                .dangerous()
530                .with_custom_certificate_verifier(Arc::new(NoServerVerifier::new()))
531            } else {
532              rustls::ClientConfig::builder().with_root_certificates(self.roots.clone())
533            })
534            .with_no_client_auth(),
535          ))
536        };
537
538        let client_bi_stream = websocket.await?;
539
540        let (proxy_bi_stream, _) = match tokio_tungstenite::connect_async_tls_with_config(
541          proxy_request_url,
542          None,
543          true,
544          Some(connector),
545        )
546        .await
547        {
548          Ok(data) => data,
549          Err(err) => {
550            error_logger
551              .log(&format!("Cannot connect to WebSocket server: {}", err))
552              .await;
553            return Ok(());
554          }
555        };
556
557        let (mut client_sink, mut client_stream) = client_bi_stream.split();
558        let (mut proxy_sink, mut proxy_stream) = proxy_bi_stream.split();
559
560        let client_to_proxy = async {
561          while let Some(Ok(value)) = client_stream.next().await {
562            if proxy_sink.send(value).await.is_err() {
563              break;
564            }
565          }
566        };
567
568        let proxy_to_client = async {
569          while let Some(Ok(value)) = proxy_stream.next().await {
570            if client_sink.send(value).await.is_err() {
571              break;
572            }
573          }
574        };
575
576        tokio::pin!(client_to_proxy);
577        tokio::pin!(proxy_to_client);
578
579        let client_to_proxy_first;
580        tokio::select! {
581          _ = &mut client_to_proxy => {
582            client_to_proxy_first = true;
583          }
584          _ = &mut proxy_to_client => {
585            client_to_proxy_first = false;
586          }
587        }
588
589        if client_to_proxy_first {
590          proxy_to_client.await;
591        } else {
592          client_to_proxy.await;
593        }
594      }
595
596      Ok(())
597    })
598    .await
599  }
600
601  fn does_websocket_requests(&mut self, config: &ServerConfig, socket_data: &SocketData) -> bool {
602    if socket_data.encrypted {
603      let secure_proxy_to = &config["secureProxyTo"];
604      if secure_proxy_to.as_vec().is_some() || secure_proxy_to.as_str().is_some() {
605        return true;
606      }
607    }
608
609    let proxy_to = &config["proxyTo"];
610    proxy_to.as_vec().is_some() || proxy_to.as_str().is_some()
611  }
612}
613
614async fn determine_proxy_to(
615  config: &ServerConfig,
616  encrypted: bool,
617  failed_backends: &RwLock<TtlCache<String, u64>>,
618  enable_health_check: bool,
619  health_check_max_fails: u64,
620) -> Option<String> {
621  let mut proxy_to = None;
622  // When the array is supplied with non-string values, the reverse proxy may have undesirable behavior
623  // The "proxyTo" and "secureProxyTo" are validated though.
624
625  if encrypted {
626    let secure_proxy_to_yaml = &config["secureProxyTo"];
627    if let Some(secure_proxy_to_vector) = secure_proxy_to_yaml.as_vec() {
628      if enable_health_check {
629        let mut secure_proxy_to_vector = secure_proxy_to_vector.clone();
630        loop {
631          if !secure_proxy_to_vector.is_empty() {
632            let index = rand::random_range(..secure_proxy_to_vector.len());
633            if let Some(secure_proxy_to) = secure_proxy_to_vector[index].as_str() {
634              proxy_to = Some(secure_proxy_to.to_string());
635              let failed_backends_read = failed_backends.read().await;
636              let failed_backend_fails =
637                match failed_backends_read.get(&secure_proxy_to.to_string()) {
638                  Some(fails) => fails,
639                  None => break,
640                };
641              if failed_backend_fails > health_check_max_fails {
642                secure_proxy_to_vector.remove(index);
643              } else {
644                break;
645              }
646            }
647          } else {
648            break;
649          }
650        }
651      } else if !secure_proxy_to_vector.is_empty() {
652        if let Some(secure_proxy_to) =
653          secure_proxy_to_vector[rand::random_range(..secure_proxy_to_vector.len())].as_str()
654        {
655          proxy_to = Some(secure_proxy_to.to_string());
656        }
657      }
658    } else if let Some(secure_proxy_to) = secure_proxy_to_yaml.as_str() {
659      proxy_to = Some(secure_proxy_to.to_string());
660    }
661  }
662
663  if proxy_to.is_none() {
664    let proxy_to_yaml = &config["proxyTo"];
665    if let Some(proxy_to_vector) = proxy_to_yaml.as_vec() {
666      if enable_health_check {
667        let mut proxy_to_vector = proxy_to_vector.clone();
668        loop {
669          if !proxy_to_vector.is_empty() {
670            let index = rand::random_range(..proxy_to_vector.len());
671            if let Some(proxy_to_str) = proxy_to_vector[index].as_str() {
672              proxy_to = Some(proxy_to_str.to_string());
673              let failed_backends_read = failed_backends.read().await;
674              let failed_backend_fails = match failed_backends_read.get(&proxy_to_str.to_string()) {
675                Some(fails) => fails,
676                None => break,
677              };
678              if failed_backend_fails > health_check_max_fails {
679                proxy_to_vector.remove(index);
680              } else {
681                break;
682              }
683            }
684          } else {
685            break;
686          }
687        }
688      } else if !proxy_to_vector.is_empty() {
689        if let Some(proxy_to_str) =
690          proxy_to_vector[rand::random_range(..proxy_to_vector.len())].as_str()
691        {
692          proxy_to = Some(proxy_to_str.to_string());
693        }
694      }
695    } else if let Some(proxy_to_str) = proxy_to_yaml.as_str() {
696      proxy_to = Some(proxy_to_str.to_string());
697    }
698  }
699
700  proxy_to
701}
702
703async fn http_proxy(
704  connections: &RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>,
705  connect_addr: String,
706  stream: impl AsyncRead + AsyncWrite + Send + Unpin + 'static,
707  proxy_request: Request<BoxBody<Bytes, std::io::Error>>,
708  error_logger: &ErrorLogger,
709  proxy_to: String,
710  failed_backends: Option<&tokio::sync::RwLock<TtlCache<std::string::String, u64>>>,
711) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
712  let io = TokioIo::new(stream);
713
714  let (mut sender, conn) = match hyper::client::conn::http1::handshake(io).await {
715    Ok(data) => data,
716    Err(err) => {
717      if let Some(failed_backends) = failed_backends {
718        let mut failed_backends_write = failed_backends.write().await;
719        let failed_attempts = failed_backends_write.get(&proxy_to);
720        failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
721      }
722      error_logger.log(&format!("Bad gateway: {}", err)).await;
723      return Ok(
724        ResponseData::builder_without_request()
725          .status(StatusCode::BAD_GATEWAY)
726          .build(),
727      );
728    }
729  };
730
731  let send_request = sender.send_request(proxy_request);
732
733  let mut pinned_conn = Box::pin(conn);
734  tokio::pin!(send_request);
735
736  let response;
737
738  loop {
739    tokio::select! {
740      biased;
741
742       proxy_response = &mut send_request => {
743        let proxy_response = match proxy_response {
744          Ok(response) => response,
745          Err(err) => {
746            error_logger.log(&format!("Bad gateway: {}", err)).await;
747            return Ok(ResponseData::builder_without_request().status(StatusCode::BAD_GATEWAY).build());
748          }
749        };
750
751        response = ResponseData::builder_without_request()
752                  .response(proxy_response.map(|b| {
753                    b.map_err(|e| std::io::Error::other(e.to_string()))
754                      .boxed()
755                  }))
756                  .parallel_fn(async move {
757                    pinned_conn.await.unwrap_or_default();
758                  })
759                  .build();
760
761        break;
762      },
763      state = &mut pinned_conn => {
764        if state.is_err() {
765          error_logger.log("Bad gateway: incomplete response").await;
766          return Ok(ResponseData::builder_without_request().status(StatusCode::BAD_GATEWAY).build());
767        }
768      },
769    };
770  }
771
772  if !sender.is_closed() {
773    let mut rwlock_write = connections.write().await;
774    rwlock_write.insert(connect_addr, sender);
775    drop(rwlock_write);
776  }
777
778  Ok(response)
779}
780
781async fn http_proxy_kept_alive(
782  sender: &mut SendRequest<BoxBody<Bytes, std::io::Error>>,
783  proxy_request: Request<BoxBody<Bytes, std::io::Error>>,
784  error_logger: &ErrorLogger,
785) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
786  let proxy_response = match sender.send_request(proxy_request).await {
787    Ok(response) => response,
788    Err(err) => {
789      error_logger.log(&format!("Bad gateway: {}", err)).await;
790      return Ok(
791        ResponseData::builder_without_request()
792          .status(StatusCode::BAD_GATEWAY)
793          .build(),
794      );
795    }
796  };
797
798  let response = ResponseData::builder_without_request()
799    .response(proxy_response.map(|b| b.map_err(|e| std::io::Error::other(e.to_string())).boxed()))
800    .build();
801
802  Ok(response)
803}