1use std::collections::HashMap;
2use std::error::Error;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::ferron_common::{
8 ErrorLogger, HyperUpgraded, RequestData, ResponseData, ServerConfig, ServerModule,
9 ServerModuleHandlers, SocketData,
10};
11use crate::ferron_common::{HyperResponse, WithRuntime};
12use async_trait::async_trait;
13use futures_util::{SinkExt, StreamExt};
14use http::uri::{PathAndQuery, Scheme};
15use http_body_util::combinators::BoxBody;
16use http_body_util::BodyExt;
17use hyper::body::Bytes;
18use hyper::client::conn::http1::SendRequest;
19use hyper::{header, Request, StatusCode, Uri};
20use hyper_tungstenite::HyperWebsocket;
21use hyper_util::rt::TokioIo;
22use rustls::pki_types::ServerName;
23use rustls::RootCertStore;
24use rustls_native_certs::load_native_certs;
25use tokio::io::{AsyncRead, AsyncWrite};
26use tokio::net::TcpStream;
27use tokio::runtime::Handle;
28use tokio::sync::RwLock;
29use tokio_rustls::TlsConnector;
30use tokio_tungstenite::Connector;
31
32use crate::ferron_util::no_server_verifier::NoServerVerifier;
33use crate::ferron_util::ttl_cache::TtlCache;
34
35const DEFAULT_CONCURRENT_CONNECTIONS_PER_HOST: u32 = 32;
36
37pub fn server_module_init(
38 config: &ServerConfig,
39) -> Result<Box<dyn ServerModule + Send + Sync>, Box<dyn Error + Send + Sync>> {
40 let mut roots: RootCertStore = RootCertStore::empty();
41 let certs_result = load_native_certs();
42 if !certs_result.errors.is_empty() {
43 Err(anyhow::anyhow!(format!(
44 "Couldn't load the native certificate store: {}",
45 certs_result.errors[0]
46 )))?
47 }
48 let certs = certs_result.certs;
49
50 for cert in certs {
51 match roots.add(cert) {
52 Ok(_) => (),
53 Err(err) => Err(anyhow::anyhow!(format!(
54 "Couldn't add a certificate to the certificate store: {}",
55 err
56 )))?,
57 }
58 }
59
60 let mut connections_vec = Vec::new();
61 for _ in 0..DEFAULT_CONCURRENT_CONNECTIONS_PER_HOST {
62 connections_vec.push(RwLock::new(HashMap::new()));
63 }
64 Ok(Box::new(ReverseProxyModule::new(
65 Arc::new(roots),
66 Arc::new(connections_vec),
67 Arc::new(RwLock::new(TtlCache::new(Duration::from_millis(
68 config["global"]["loadBalancerHealthCheckWindow"]
69 .as_i64()
70 .unwrap_or(5000) as u64,
71 )))),
72 )))
73}
74
75#[allow(clippy::type_complexity)]
76struct ReverseProxyModule {
77 roots: Arc<RootCertStore>,
78 connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
79 failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
80}
81
82impl ReverseProxyModule {
83 #[allow(clippy::type_complexity)]
84 fn new(
85 roots: Arc<RootCertStore>,
86 connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
87 failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
88 ) -> Self {
89 Self {
90 roots,
91 connections,
92 failed_backends,
93 }
94 }
95}
96
97impl ServerModule for ReverseProxyModule {
98 fn get_handlers(&self, handle: Handle) -> Box<dyn ServerModuleHandlers + Send> {
99 Box::new(ReverseProxyModuleHandlers {
100 roots: self.roots.clone(),
101 connections: self.connections.clone(),
102 failed_backends: self.failed_backends.clone(),
103 handle,
104 })
105 }
106}
107
108#[allow(clippy::type_complexity)]
109struct ReverseProxyModuleHandlers {
110 handle: Handle,
111 roots: Arc<RootCertStore>,
112 connections: Arc<Vec<RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>>>,
113 failed_backends: Arc<RwLock<TtlCache<String, u64>>>,
114}
115
116#[async_trait]
117impl ServerModuleHandlers for ReverseProxyModuleHandlers {
118 async fn request_handler(
119 &mut self,
120 request: RequestData,
121 config: &ServerConfig,
122 socket_data: &SocketData,
123 error_logger: &ErrorLogger,
124 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
125 WithRuntime::new(self.handle.clone(), async move {
126 let enable_health_check = config["enableLoadBalancerHealthCheck"]
127 .as_bool()
128 .unwrap_or(false);
129 let health_check_max_fails = config["loadBalancerHealthCheckMaximumFails"]
130 .as_i64()
131 .unwrap_or(3) as u64;
132 let disable_certificate_verification = config["disableProxyCertificateVerification"]
133 .as_bool()
134 .unwrap_or(false);
135 if let Some(proxy_to) = determine_proxy_to(
136 config,
137 socket_data.encrypted,
138 &self.failed_backends,
139 enable_health_check,
140 health_check_max_fails,
141 )
142 .await
143 {
144 let (hyper_request, _auth_user, _original_url) = request.into_parts();
145 let (mut hyper_request_parts, request_body) = hyper_request.into_parts();
146
147 let proxy_request_url = proxy_to.parse::<hyper::Uri>()?;
148 let scheme_str = proxy_request_url.scheme_str();
149 let mut encrypted = false;
150
151 match scheme_str {
152 Some("http") => {
153 encrypted = false;
154 }
155 Some("https") => {
156 encrypted = true;
157 }
158 _ => Err(anyhow::anyhow!(
159 "Only HTTP and HTTPS reverse proxy URLs are supported."
160 ))?,
161 };
162
163 let host = match proxy_request_url.host() {
164 Some(host) => host,
165 None => Err(anyhow::anyhow!(
166 "The reverse proxy URL doesn't include the host"
167 ))?,
168 };
169
170 let port = proxy_request_url.port_u16().unwrap_or(match scheme_str {
171 Some("http") => 80,
172 Some("https") => 443,
173 _ => 80,
174 });
175
176 let addr = format!("{}:{}", host, port);
177 let authority = proxy_request_url.authority().cloned();
178
179 let hyper_request_path = hyper_request_parts.uri.path();
180
181 let path = match hyper_request_path.as_bytes().first() {
182 Some(b'/') => {
183 let mut proxy_request_path = proxy_request_url.path();
184 while proxy_request_path.as_bytes().last().copied() == Some(b'/') {
185 proxy_request_path = &proxy_request_path[..(proxy_request_path.len() - 1)];
186 }
187 format!("{}{}", proxy_request_path, hyper_request_path)
188 }
189 _ => hyper_request_path.to_string(),
190 };
191
192 hyper_request_parts.uri = Uri::from_str(&format!(
193 "{}{}",
194 path,
195 match hyper_request_parts.uri.query() {
196 Some(query) => format!("?{}", query),
197 None => "".to_string(),
198 }
199 ))?;
200
201 let original_host = hyper_request_parts.headers.get(header::HOST).cloned();
202
203 match authority {
205 Some(authority) => {
206 hyper_request_parts
207 .headers
208 .insert(header::HOST, authority.to_string().parse()?);
209 }
210 None => {
211 hyper_request_parts.headers.remove(header::HOST);
212 }
213 }
214
215 hyper_request_parts
217 .headers
218 .insert(header::CONNECTION, "keep-alive".parse()?);
219
220 hyper_request_parts.headers.insert(
222 "x-forwarded-for",
223 socket_data
224 .remote_addr
225 .ip()
226 .to_canonical()
227 .to_string()
228 .parse()?,
229 );
230
231 if socket_data.encrypted {
232 hyper_request_parts
233 .headers
234 .insert("x-forwarded-proto", "https".parse()?);
235 } else {
236 hyper_request_parts
237 .headers
238 .insert("x-forwarded-proto", "http".parse()?);
239 }
240
241 if let Some(original_host) = original_host {
242 hyper_request_parts
243 .headers
244 .insert("x-forwarded-host", original_host);
245 }
246
247 let proxy_request = Request::from_parts(hyper_request_parts, request_body);
248
249 let connections = &self.connections[rand::random_range(..self.connections.len())];
250
251 let rwlock_read = connections.read().await;
252 let sender_read_option = rwlock_read.get(&addr);
253
254 if let Some(sender_read) = sender_read_option {
255 if !sender_read.is_closed() {
256 drop(rwlock_read);
257 let mut rwlock_write = connections.write().await;
258 let sender_option = rwlock_write.get_mut(&addr);
259
260 if let Some(sender) = sender_option {
261 if !sender.is_closed() {
262 let result = http_proxy_kept_alive(sender, proxy_request, error_logger).await;
263 drop(rwlock_write);
264 return result;
265 } else {
266 drop(rwlock_write);
267 }
268 } else {
269 drop(rwlock_write);
270 }
271 } else {
272 drop(rwlock_read);
273 }
274 } else {
275 drop(rwlock_read);
276 }
277
278 let stream = match TcpStream::connect(&addr).await {
279 Ok(stream) => stream,
280 Err(err) => {
281 if enable_health_check {
282 let mut failed_backends_write = self.failed_backends.write().await;
283 let proxy_to = proxy_to.clone();
284 let failed_attempts = failed_backends_write.get(&proxy_to);
285 failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
286 }
287 match err.kind() {
288 tokio::io::ErrorKind::ConnectionRefused
289 | tokio::io::ErrorKind::NotFound
290 | tokio::io::ErrorKind::HostUnreachable => {
291 error_logger
292 .log(&format!("Service unavailable: {}", err))
293 .await;
294 return Ok(
295 ResponseData::builder_without_request()
296 .status(StatusCode::SERVICE_UNAVAILABLE)
297 .build(),
298 );
299 }
300 tokio::io::ErrorKind::TimedOut => {
301 error_logger.log(&format!("Gateway timeout: {}", err)).await;
302 return Ok(
303 ResponseData::builder_without_request()
304 .status(StatusCode::GATEWAY_TIMEOUT)
305 .build(),
306 );
307 }
308 _ => {
309 error_logger.log(&format!("Bad gateway: {}", err)).await;
310 return Ok(
311 ResponseData::builder_without_request()
312 .status(StatusCode::BAD_GATEWAY)
313 .build(),
314 );
315 }
316 };
317 }
318 };
319
320 match stream.set_nodelay(true) {
321 Ok(_) => (),
322 Err(err) => {
323 if enable_health_check {
324 let mut failed_backends_write = self.failed_backends.write().await;
325 let proxy_to = proxy_to.clone();
326 let failed_attempts = failed_backends_write.get(&proxy_to);
327 failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
328 }
329 error_logger.log(&format!("Bad gateway: {}", err)).await;
330 return Ok(
331 ResponseData::builder_without_request()
332 .status(StatusCode::BAD_GATEWAY)
333 .build(),
334 );
335 }
336 };
337
338 let failed_backends_option_borrowed = if enable_health_check {
339 Some(&*self.failed_backends)
340 } else {
341 None
342 };
343
344 if !encrypted {
345 http_proxy(
346 connections,
347 addr,
348 stream,
349 proxy_request,
350 error_logger,
351 proxy_to,
352 failed_backends_option_borrowed,
353 )
354 .await
355 } else {
356 let tls_client_config = (if disable_certificate_verification {
357 rustls::ClientConfig::builder()
358 .dangerous()
359 .with_custom_certificate_verifier(Arc::new(NoServerVerifier::new()))
360 } else {
361 rustls::ClientConfig::builder().with_root_certificates(self.roots.clone())
362 })
363 .with_no_client_auth();
364 let connector = TlsConnector::from(Arc::new(tls_client_config));
365 let domain = ServerName::try_from(host)?.to_owned();
366
367 let tls_stream = match connector.connect(domain, stream).await {
368 Ok(stream) => stream,
369 Err(err) => {
370 if enable_health_check {
371 let mut failed_backends_write = self.failed_backends.write().await;
372 let proxy_to = proxy_to.clone();
373 let failed_attempts = failed_backends_write.get(&proxy_to);
374 failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
375 }
376 error_logger.log(&format!("Bad gateway: {}", err)).await;
377 return Ok(
378 ResponseData::builder_without_request()
379 .status(StatusCode::BAD_GATEWAY)
380 .build(),
381 );
382 }
383 };
384
385 http_proxy(
386 connections,
387 addr,
388 tls_stream,
389 proxy_request,
390 error_logger,
391 proxy_to,
392 failed_backends_option_borrowed,
393 )
394 .await
395 }
396 } else {
397 Ok(ResponseData::builder(request).build())
398 }
399 })
400 .await
401 }
402
403 async fn proxy_request_handler(
404 &mut self,
405 request: RequestData,
406 _config: &ServerConfig,
407 _socket_data: &SocketData,
408 _error_logger: &ErrorLogger,
409 ) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
410 Ok(ResponseData::builder(request).build())
411 }
412
413 async fn response_modifying_handler(
414 &mut self,
415 response: HyperResponse,
416 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
417 Ok(response)
418 }
419
420 async fn proxy_response_modifying_handler(
421 &mut self,
422 response: HyperResponse,
423 ) -> Result<HyperResponse, Box<dyn Error + Send + Sync>> {
424 Ok(response)
425 }
426
427 async fn connect_proxy_request_handler(
428 &mut self,
429 _upgraded_request: HyperUpgraded,
430 _connect_address: &str,
431 _config: &ServerConfig,
432 _socket_data: &SocketData,
433 _error_logger: &ErrorLogger,
434 ) -> Result<(), Box<dyn Error + Send + Sync>> {
435 Ok(())
436 }
437
438 fn does_connect_proxy_requests(&mut self) -> bool {
439 false
440 }
441
442 async fn websocket_request_handler(
443 &mut self,
444 websocket: HyperWebsocket,
445 uri: &hyper::Uri,
446 config: &ServerConfig,
447 socket_data: &SocketData,
448 error_logger: &ErrorLogger,
449 ) -> Result<(), Box<dyn Error + Send + Sync>> {
450 WithRuntime::new(self.handle.clone(), async move {
451 let enable_health_check = config["enableLoadBalancerHealthCheck"]
452 .as_bool()
453 .unwrap_or(false);
454 let health_check_max_fails = config["loadBalancerHealthCheckMaximumFails"]
455 .as_i64()
456 .unwrap_or(3) as u64;
457
458 let disable_certificate_verification = config["disableProxyCertificateVerification"]
459 .as_bool()
460 .unwrap_or(false);
461 if let Some(proxy_to) = determine_proxy_to(
462 config,
463 socket_data.encrypted,
464 &self.failed_backends,
465 enable_health_check,
466 health_check_max_fails,
467 )
468 .await
469 {
470 let proxy_request_url = proxy_to.parse::<hyper::Uri>()?;
471 let scheme_str = proxy_request_url.scheme_str();
472 let mut encrypted = false;
473
474 match scheme_str {
475 Some("http") => {
476 encrypted = false;
477 }
478 Some("https") => {
479 encrypted = true;
480 }
481 _ => Err(anyhow::anyhow!(
482 "Only HTTP and HTTPS reverse proxy URLs are supported."
483 ))?,
484 };
485
486 let request_path = uri.path();
487
488 let path = match request_path.as_bytes().first() {
489 Some(b'/') => {
490 let mut proxy_request_path = proxy_request_url.path();
491 while proxy_request_path.as_bytes().last().copied() == Some(b'/') {
492 proxy_request_path = &proxy_request_path[..(proxy_request_path.len() - 1)];
493 }
494 format!("{}{}", proxy_request_path, request_path)
495 }
496 _ => request_path.to_string(),
497 };
498
499 let mut proxy_request_url_parts = proxy_request_url.into_parts();
500 proxy_request_url_parts.scheme = if encrypted {
501 Some(Scheme::from_str("wss")?)
502 } else {
503 Some(Scheme::from_str("ws")?)
504 };
505 match proxy_request_url_parts.path_and_query {
506 Some(path_and_query) => {
507 let path_and_query_string = match path_and_query.query() {
508 Some(query) => {
509 format!("{}?{}", path, query)
510 }
511 None => path,
512 };
513 proxy_request_url_parts.path_and_query =
514 Some(PathAndQuery::from_str(&path_and_query_string)?);
515 }
516 None => {
517 proxy_request_url_parts.path_and_query = Some(PathAndQuery::from_str(&path)?);
518 }
519 };
520
521 let proxy_request_url = hyper::Uri::from_parts(proxy_request_url_parts)?;
522
523 let connector = if !encrypted {
524 Connector::Plain
525 } else {
526 Connector::Rustls(Arc::new(
527 (if disable_certificate_verification {
528 rustls::ClientConfig::builder()
529 .dangerous()
530 .with_custom_certificate_verifier(Arc::new(NoServerVerifier::new()))
531 } else {
532 rustls::ClientConfig::builder().with_root_certificates(self.roots.clone())
533 })
534 .with_no_client_auth(),
535 ))
536 };
537
538 let client_bi_stream = websocket.await?;
539
540 let (proxy_bi_stream, _) = match tokio_tungstenite::connect_async_tls_with_config(
541 proxy_request_url,
542 None,
543 true,
544 Some(connector),
545 )
546 .await
547 {
548 Ok(data) => data,
549 Err(err) => {
550 error_logger
551 .log(&format!("Cannot connect to WebSocket server: {}", err))
552 .await;
553 return Ok(());
554 }
555 };
556
557 let (mut client_sink, mut client_stream) = client_bi_stream.split();
558 let (mut proxy_sink, mut proxy_stream) = proxy_bi_stream.split();
559
560 let client_to_proxy = async {
561 while let Some(Ok(value)) = client_stream.next().await {
562 if proxy_sink.send(value).await.is_err() {
563 break;
564 }
565 }
566 };
567
568 let proxy_to_client = async {
569 while let Some(Ok(value)) = proxy_stream.next().await {
570 if client_sink.send(value).await.is_err() {
571 break;
572 }
573 }
574 };
575
576 tokio::pin!(client_to_proxy);
577 tokio::pin!(proxy_to_client);
578
579 let client_to_proxy_first;
580 tokio::select! {
581 _ = &mut client_to_proxy => {
582 client_to_proxy_first = true;
583 }
584 _ = &mut proxy_to_client => {
585 client_to_proxy_first = false;
586 }
587 }
588
589 if client_to_proxy_first {
590 proxy_to_client.await;
591 } else {
592 client_to_proxy.await;
593 }
594 }
595
596 Ok(())
597 })
598 .await
599 }
600
601 fn does_websocket_requests(&mut self, config: &ServerConfig, socket_data: &SocketData) -> bool {
602 if socket_data.encrypted {
603 let secure_proxy_to = &config["secureProxyTo"];
604 if secure_proxy_to.as_vec().is_some() || secure_proxy_to.as_str().is_some() {
605 return true;
606 }
607 }
608
609 let proxy_to = &config["proxyTo"];
610 proxy_to.as_vec().is_some() || proxy_to.as_str().is_some()
611 }
612}
613
614async fn determine_proxy_to(
615 config: &ServerConfig,
616 encrypted: bool,
617 failed_backends: &RwLock<TtlCache<String, u64>>,
618 enable_health_check: bool,
619 health_check_max_fails: u64,
620) -> Option<String> {
621 let mut proxy_to = None;
622 if encrypted {
626 let secure_proxy_to_yaml = &config["secureProxyTo"];
627 if let Some(secure_proxy_to_vector) = secure_proxy_to_yaml.as_vec() {
628 if enable_health_check {
629 let mut secure_proxy_to_vector = secure_proxy_to_vector.clone();
630 loop {
631 if !secure_proxy_to_vector.is_empty() {
632 let index = rand::random_range(..secure_proxy_to_vector.len());
633 if let Some(secure_proxy_to) = secure_proxy_to_vector[index].as_str() {
634 proxy_to = Some(secure_proxy_to.to_string());
635 let failed_backends_read = failed_backends.read().await;
636 let failed_backend_fails =
637 match failed_backends_read.get(&secure_proxy_to.to_string()) {
638 Some(fails) => fails,
639 None => break,
640 };
641 if failed_backend_fails > health_check_max_fails {
642 secure_proxy_to_vector.remove(index);
643 } else {
644 break;
645 }
646 }
647 } else {
648 break;
649 }
650 }
651 } else if !secure_proxy_to_vector.is_empty() {
652 if let Some(secure_proxy_to) =
653 secure_proxy_to_vector[rand::random_range(..secure_proxy_to_vector.len())].as_str()
654 {
655 proxy_to = Some(secure_proxy_to.to_string());
656 }
657 }
658 } else if let Some(secure_proxy_to) = secure_proxy_to_yaml.as_str() {
659 proxy_to = Some(secure_proxy_to.to_string());
660 }
661 }
662
663 if proxy_to.is_none() {
664 let proxy_to_yaml = &config["proxyTo"];
665 if let Some(proxy_to_vector) = proxy_to_yaml.as_vec() {
666 if enable_health_check {
667 let mut proxy_to_vector = proxy_to_vector.clone();
668 loop {
669 if !proxy_to_vector.is_empty() {
670 let index = rand::random_range(..proxy_to_vector.len());
671 if let Some(proxy_to_str) = proxy_to_vector[index].as_str() {
672 proxy_to = Some(proxy_to_str.to_string());
673 let failed_backends_read = failed_backends.read().await;
674 let failed_backend_fails = match failed_backends_read.get(&proxy_to_str.to_string()) {
675 Some(fails) => fails,
676 None => break,
677 };
678 if failed_backend_fails > health_check_max_fails {
679 proxy_to_vector.remove(index);
680 } else {
681 break;
682 }
683 }
684 } else {
685 break;
686 }
687 }
688 } else if !proxy_to_vector.is_empty() {
689 if let Some(proxy_to_str) =
690 proxy_to_vector[rand::random_range(..proxy_to_vector.len())].as_str()
691 {
692 proxy_to = Some(proxy_to_str.to_string());
693 }
694 }
695 } else if let Some(proxy_to_str) = proxy_to_yaml.as_str() {
696 proxy_to = Some(proxy_to_str.to_string());
697 }
698 }
699
700 proxy_to
701}
702
703async fn http_proxy(
704 connections: &RwLock<HashMap<String, SendRequest<BoxBody<Bytes, std::io::Error>>>>,
705 connect_addr: String,
706 stream: impl AsyncRead + AsyncWrite + Send + Unpin + 'static,
707 proxy_request: Request<BoxBody<Bytes, std::io::Error>>,
708 error_logger: &ErrorLogger,
709 proxy_to: String,
710 failed_backends: Option<&tokio::sync::RwLock<TtlCache<std::string::String, u64>>>,
711) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
712 let io = TokioIo::new(stream);
713
714 let (mut sender, conn) = match hyper::client::conn::http1::handshake(io).await {
715 Ok(data) => data,
716 Err(err) => {
717 if let Some(failed_backends) = failed_backends {
718 let mut failed_backends_write = failed_backends.write().await;
719 let failed_attempts = failed_backends_write.get(&proxy_to);
720 failed_backends_write.insert(proxy_to, failed_attempts.map_or(1, |x| x + 1));
721 }
722 error_logger.log(&format!("Bad gateway: {}", err)).await;
723 return Ok(
724 ResponseData::builder_without_request()
725 .status(StatusCode::BAD_GATEWAY)
726 .build(),
727 );
728 }
729 };
730
731 let send_request = sender.send_request(proxy_request);
732
733 let mut pinned_conn = Box::pin(conn);
734 tokio::pin!(send_request);
735
736 let response;
737
738 loop {
739 tokio::select! {
740 biased;
741
742 proxy_response = &mut send_request => {
743 let proxy_response = match proxy_response {
744 Ok(response) => response,
745 Err(err) => {
746 error_logger.log(&format!("Bad gateway: {}", err)).await;
747 return Ok(ResponseData::builder_without_request().status(StatusCode::BAD_GATEWAY).build());
748 }
749 };
750
751 response = ResponseData::builder_without_request()
752 .response(proxy_response.map(|b| {
753 b.map_err(|e| std::io::Error::other(e.to_string()))
754 .boxed()
755 }))
756 .parallel_fn(async move {
757 pinned_conn.await.unwrap_or_default();
758 })
759 .build();
760
761 break;
762 },
763 state = &mut pinned_conn => {
764 if state.is_err() {
765 error_logger.log("Bad gateway: incomplete response").await;
766 return Ok(ResponseData::builder_without_request().status(StatusCode::BAD_GATEWAY).build());
767 }
768 },
769 };
770 }
771
772 if !sender.is_closed() {
773 let mut rwlock_write = connections.write().await;
774 rwlock_write.insert(connect_addr, sender);
775 drop(rwlock_write);
776 }
777
778 Ok(response)
779}
780
781async fn http_proxy_kept_alive(
782 sender: &mut SendRequest<BoxBody<Bytes, std::io::Error>>,
783 proxy_request: Request<BoxBody<Bytes, std::io::Error>>,
784 error_logger: &ErrorLogger,
785) -> Result<ResponseData, Box<dyn Error + Send + Sync>> {
786 let proxy_response = match sender.send_request(proxy_request).await {
787 Ok(response) => response,
788 Err(err) => {
789 error_logger.log(&format!("Bad gateway: {}", err)).await;
790 return Ok(
791 ResponseData::builder_without_request()
792 .status(StatusCode::BAD_GATEWAY)
793 .build(),
794 );
795 }
796 };
797
798 let response = ResponseData::builder_without_request()
799 .response(proxy_response.map(|b| b.map_err(|e| std::io::Error::other(e.to_string())).boxed()))
800 .build();
801
802 Ok(response)
803}