hyper_util/client/legacy/connect/proxy/socks/v5/
mod.rs1mod errors;
2pub use errors::*;
3
4mod messages;
5use messages::*;
6
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
12
13use http::Uri;
14use hyper::rt::{Read, Write};
15use tower_service::Service;
16
17use bytes::BytesMut;
18
19use pin_project_lite::pin_project;
20
21#[derive(Debug, Clone)]
27pub struct SocksV5<C> {
28 inner: C,
29 config: SocksConfig,
30}
31
32#[derive(Debug, Clone)]
33pub struct SocksConfig {
34 proxy: Uri,
35 proxy_auth: Option<(String, String)>,
36
37 local_dns: bool,
38 optimistic: bool,
39}
40
41#[derive(Debug)]
42enum State {
43 SendingNegReq,
44 ReadingNegRes,
45 SendingAuthReq,
46 ReadingAuthRes,
47 SendingProxyReq,
48 ReadingProxyRes,
49}
50
51pin_project! {
52 #[must_use = "futures do nothing unless polled"]
58 #[allow(missing_debug_implementations)]
59 pub struct Handshaking<F, T, E> {
60 #[pin]
61 fut: BoxHandshaking<T, E>,
62 _marker: std::marker::PhantomData<F>
63 }
64}
65
66type BoxHandshaking<T, E> = Pin<Box<dyn Future<Output = Result<T, super::SocksError<E>>> + Send>>;
67
68impl<C> SocksV5<C> {
69 pub fn new(proxy_dst: Uri, connector: C) -> Self {
78 Self {
79 inner: connector,
80 config: SocksConfig::new(proxy_dst),
81 }
82 }
83
84 pub fn with_auth(mut self, user: String, pass: String) -> Self {
90 self.config.proxy_auth = Some((user, pass));
91 self
92 }
93
94 pub fn local_dns(mut self, local_dns: bool) -> Self {
99 self.config.local_dns = local_dns;
100 self
101 }
102
103 pub fn send_optimistically(mut self, optimistic: bool) -> Self {
113 self.config.optimistic = optimistic;
114 self
115 }
116}
117
118impl SocksConfig {
119 fn new(proxy: Uri) -> Self {
120 Self {
121 proxy,
122 proxy_auth: None,
123
124 local_dns: false,
125 optimistic: false,
126 }
127 }
128
129 async fn execute<T, E>(
130 self,
131 mut conn: T,
132 host: String,
133 port: u16,
134 ) -> Result<T, super::SocksError<E>>
135 where
136 T: Read + Write + Unpin,
137 {
138 let address = match host.parse::<IpAddr>() {
139 Ok(ip) => Address::Socket(SocketAddr::new(ip, port)),
140 Err(_) if host.len() <= 255 => {
141 if self.local_dns {
142 let socket = (host, port)
143 .to_socket_addrs()?
144 .next()
145 .ok_or(super::SocksError::DnsFailure)?;
146
147 Address::Socket(socket)
148 } else {
149 Address::Domain(host, port)
150 }
151 }
152 Err(_) => return Err(SocksV5Error::HostTooLong.into()),
153 };
154
155 let method = if self.proxy_auth.is_some() {
156 AuthMethod::UserPass
157 } else {
158 AuthMethod::NoAuth
159 };
160
161 let mut recv_buf = BytesMut::with_capacity(513); let mut send_buf = BytesMut::with_capacity(262); let mut state = State::SendingNegReq;
164
165 loop {
166 match state {
167 State::SendingNegReq => {
168 let req = NegotiationReq(&method);
169
170 let start = send_buf.len();
171 req.write_to_buf(&mut send_buf)?;
172 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
173
174 if self.optimistic {
175 if method == AuthMethod::UserPass {
176 state = State::SendingAuthReq;
177 } else {
178 state = State::SendingProxyReq;
179 }
180 } else {
181 state = State::ReadingNegRes;
182 }
183 }
184
185 State::ReadingNegRes => {
186 let res: NegotiationRes = super::read_message(&mut conn, &mut recv_buf).await?;
187
188 if res.0 == AuthMethod::NoneAcceptable {
189 return Err(SocksV5Error::Auth(AuthError::Unsupported).into());
190 }
191
192 if res.0 != method {
193 return Err(SocksV5Error::Auth(AuthError::MethodMismatch).into());
194 }
195
196 if self.optimistic {
197 if res.0 == AuthMethod::UserPass {
198 state = State::ReadingAuthRes;
199 } else {
200 state = State::ReadingProxyRes;
201 }
202 } else {
203 if res.0 == AuthMethod::UserPass {
204 state = State::SendingAuthReq;
205 } else {
206 state = State::SendingProxyReq;
207 }
208 }
209 }
210
211 State::SendingAuthReq => {
212 let (user, pass) = self.proxy_auth.as_ref().unwrap();
213 let req = AuthenticationReq(&user, &pass);
214
215 let start = send_buf.len();
216 req.write_to_buf(&mut send_buf)?;
217 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
218
219 if self.optimistic {
220 state = State::SendingProxyReq;
221 } else {
222 state = State::ReadingAuthRes;
223 }
224 }
225
226 State::ReadingAuthRes => {
227 let res: AuthenticationRes =
228 super::read_message(&mut conn, &mut recv_buf).await?;
229
230 if !res.0 {
231 return Err(SocksV5Error::Auth(AuthError::Failed).into());
232 }
233
234 if self.optimistic {
235 state = State::ReadingProxyRes;
236 } else {
237 state = State::SendingProxyReq;
238 }
239 }
240
241 State::SendingProxyReq => {
242 let req = ProxyReq(&address);
243
244 let start = send_buf.len();
245 req.write_to_buf(&mut send_buf)?;
246 crate::rt::write_all(&mut conn, &send_buf[start..]).await?;
247
248 if self.optimistic {
249 state = State::ReadingNegRes;
250 } else {
251 state = State::ReadingProxyRes;
252 }
253 }
254
255 State::ReadingProxyRes => {
256 let res: ProxyRes = super::read_message(&mut conn, &mut recv_buf).await?;
257
258 if res.0 == Status::Success {
259 return Ok(conn);
260 } else {
261 return Err(SocksV5Error::Command(res.0).into());
262 }
263 }
264 }
265 }
266 }
267}
268
269impl<C> Service<Uri> for SocksV5<C>
270where
271 C: Service<Uri>,
272 C::Future: Send + 'static,
273 C::Response: Read + Write + Unpin + Send + 'static,
274 C::Error: Send + 'static,
275{
276 type Response = C::Response;
277 type Error = super::SocksError<C::Error>;
278 type Future = Handshaking<C::Future, C::Response, C::Error>;
279
280 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
281 self.inner.poll_ready(cx).map_err(super::SocksError::Inner)
282 }
283
284 fn call(&mut self, dst: Uri) -> Self::Future {
285 let config = self.config.clone();
286 let connecting = self.inner.call(config.proxy.clone());
287
288 let fut = async move {
289 let port = dst.port().map(|p| p.as_u16()).unwrap_or(443);
290 let host = dst
291 .host()
292 .ok_or(super::SocksError::MissingHost)?
293 .to_string();
294
295 let conn = connecting.await.map_err(super::SocksError::Inner)?;
296 config.execute(conn, host, port).await
297 };
298
299 Handshaking {
300 fut: Box::pin(fut),
301 _marker: Default::default(),
302 }
303 }
304}
305
306impl<F, T, E> Future for Handshaking<F, T, E>
307where
308 F: Future<Output = Result<T, E>>,
309{
310 type Output = Result<T, super::SocksError<E>>;
311
312 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
313 self.project().fut.poll(cx)
314 }
315}