hyper_util/client/legacy/connect/proxy/socks/v4/
mod.rs1mod errors;
2pub use errors::*;
3
4mod messages;
5use messages::*;
6
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use std::net::{IpAddr, SocketAddr, SocketAddrV4, ToSocketAddrs};
12
13use http::Uri;
14use hyper::rt::{Read, Write};
15use tower_service::Service;
16
17use bytes::BytesMut;
18
19use pin_project_lite::pin_project;
20
21#[derive(Debug, Clone)]
27pub struct SocksV4<C> {
28 inner: C,
29 config: SocksConfig,
30}
31
32#[derive(Debug, Clone)]
33struct SocksConfig {
34 proxy: Uri,
35 local_dns: bool,
36}
37
38pin_project! {
39 #[must_use = "futures do nothing unless polled"]
45 #[allow(missing_debug_implementations)]
46 pub struct Handshaking<F, T, E> {
47 #[pin]
48 fut: BoxHandshaking<T, E>,
49 _marker: std::marker::PhantomData<F>
50 }
51}
52
53type BoxHandshaking<T, E> = Pin<Box<dyn Future<Output = Result<T, super::SocksError<E>>> + Send>>;
54
55impl<C> SocksV4<C> {
56 pub fn new(proxy_dst: Uri, connector: C) -> Self {
65 Self {
66 inner: connector,
67 config: SocksConfig::new(proxy_dst),
68 }
69 }
70
71 pub fn local_dns(mut self, local_dns: bool) -> Self {
76 self.config.local_dns = local_dns;
77 self
78 }
79}
80
81impl SocksConfig {
82 pub fn new(proxy: Uri) -> Self {
83 Self {
84 proxy,
85 local_dns: false,
86 }
87 }
88
89 async fn execute<T, E>(
90 self,
91 mut conn: T,
92 host: String,
93 port: u16,
94 ) -> Result<T, super::SocksError<E>>
95 where
96 T: Read + Write + Unpin,
97 {
98 let address = match host.parse::<IpAddr>() {
99 Ok(IpAddr::V6(_)) => return Err(SocksV4Error::IpV6.into()),
100 Ok(IpAddr::V4(ip)) => Address::Socket(SocketAddrV4::new(ip.into(), port)),
101 Err(_) => {
102 if self.local_dns {
103 (host, port)
104 .to_socket_addrs()?
105 .find_map(|s| {
106 if let SocketAddr::V4(v4) = s {
107 Some(Address::Socket(v4))
108 } else {
109 None
110 }
111 })
112 .ok_or(super::SocksError::DnsFailure)?
113 } else {
114 Address::Domain(host, port)
115 }
116 }
117 };
118
119 let mut send_buf = BytesMut::with_capacity(1024);
120 let mut recv_buf = BytesMut::with_capacity(1024);
121
122 let req = Request(&address);
124 let n = req.write_to_buf(&mut send_buf)?;
125 crate::rt::write_all(&mut conn, &send_buf[..n]).await?;
126
127 let res: Response = super::read_message(&mut conn, &mut recv_buf).await?;
129 if res.0 == Status::Success {
130 Ok(conn)
131 } else {
132 Err(SocksV4Error::Command(res.0).into())
133 }
134 }
135}
136
137impl<C> Service<Uri> for SocksV4<C>
138where
139 C: Service<Uri>,
140 C::Future: Send + 'static,
141 C::Response: Read + Write + Unpin + Send + 'static,
142 C::Error: Send + 'static,
143{
144 type Response = C::Response;
145 type Error = super::SocksError<C::Error>;
146 type Future = Handshaking<C::Future, C::Response, C::Error>;
147
148 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
149 self.inner.poll_ready(cx).map_err(super::SocksError::Inner)
150 }
151
152 fn call(&mut self, dst: Uri) -> Self::Future {
153 let config = self.config.clone();
154 let connecting = self.inner.call(config.proxy.clone());
155
156 let fut = async move {
157 let port = dst.port().map(|p| p.as_u16()).unwrap_or(443);
158 let host = dst
159 .host()
160 .ok_or(super::SocksError::MissingHost)?
161 .to_string();
162
163 let conn = connecting.await.map_err(super::SocksError::Inner)?;
164 config.execute(conn, host, port).await
165 };
166
167 Handshaking {
168 fut: Box::pin(fut),
169 _marker: Default::default(),
170 }
171 }
172}
173
174impl<F, T, E> Future for Handshaking<F, T, E>
175where
176 F: Future<Output = Result<T, E>>,
177{
178 type Output = Result<T, super::SocksError<E>>;
179
180 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181 self.project().fut.poll(cx)
182 }
183}