Skip to content

Commit 8cdc95b

Browse files
wip
1 parent eb69b8c commit 8cdc95b

File tree

7 files changed

+166
-10
lines changed

7 files changed

+166
-10
lines changed

Cargo.lock

Lines changed: 44 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ opentelemetry = ["dep:opentelemetry"]
8484
# Capture backtraces in errors. This can be slow, memory intensive, and very verbose.
8585
error-backtrace = []
8686

87+
# Enable SOCKS5 proxy support.
88+
socks5-proxy = ["dep:fast-socks5"]
89+
90+
8791
[dependencies]
8892
base64 = "0.22"
8993
bitflags = "2"
@@ -93,6 +97,7 @@ chrono = { version = "0.4.32", default-features = false, optional = true, featur
9397
] }
9498
derive_more = { version = "2", features = ["display", "from"] }
9599
derive-where = "1.2.7"
100+
fast-socks5 = { version = "0.10", optional = true }
96101
flate2 = { version = "1.0", optional = true }
97102
futures-io = "0.3.21"
98103
futures-core = "0.3.14"

src/client/options.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ const URI_OPTIONS: &[&str] = &[
7575
"maxpoolsize",
7676
"minpoolsize",
7777
"maxconnecting",
78+
"proxyHost",
79+
"proxyPort",
80+
"proxyUsername",
81+
"proxyPassword",
7882
"readconcernlevel",
7983
"readpreference",
8084
"readpreferencetags",
@@ -616,6 +620,18 @@ pub struct ClientOptions {
616620
#[cfg(feature = "opentelemetry")]
617621
pub tracing: Option<crate::otel::OpentelemetryOptions>,
618622

623+
#[cfg(feature = "socks5-proxy")]
624+
pub proxy_host: Option<String>,
625+
626+
#[cfg(feature = "socks5-proxy")]
627+
pub proxy_port: Option<u16>,
628+
629+
#[cfg(feature = "socks5-proxy")]
630+
pub proxy_username: Option<String>,
631+
632+
#[cfg(feature = "socks5-proxy")]
633+
pub proxy_password: Option<String>,
634+
619635
/// Information from the SRV URI that generated these client options, if applicable.
620636
#[builder(setter(skip))]
621637
#[serde(skip)]

src/client/options/parse.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,15 @@ impl ClientOptions {
164164
srv_service_name: conn_str.srv_service_name,
165165
#[cfg(feature = "opentelemetry")]
166166
tracing: None,
167+
// isabeltodo propagate from connstr
168+
#[cfg(feature = "socks5-proxy")]
169+
proxy_host: None,
170+
#[cfg(feature = "socks5-proxy")]
171+
proxy_port: None,
172+
#[cfg(feature = "socks5-proxy")]
173+
proxy_username: None,
174+
#[cfg(feature = "socks5-proxy")]
175+
proxy_password: None,
167176
}
168177
}
169178
}

src/cmap/establish.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ pub(crate) struct ConnectionEstablisher {
4040

4141
#[cfg(test)]
4242
test_patch_reply: Option<fn(&mut Result<HelloReply>)>,
43+
44+
#[cfg(feature = "socks5-proxy")]
45+
proxy: Option<Proxy>,
4346
}
4447

4548
pub(crate) struct EstablisherOptions {
@@ -48,6 +51,64 @@ pub(crate) struct EstablisherOptions {
4851
connect_timeout: Option<Duration>,
4952
#[cfg(test)]
5053
pub(crate) test_patch_reply: Option<fn(&mut Result<HelloReply>)>,
54+
#[cfg(feature = "socks5-proxy")]
55+
proxy: Option<Proxy>,
56+
}
57+
58+
#[derive(Clone)]
59+
#[cfg(feature = "socks5-proxy")]
60+
struct Proxy {
61+
host: String,
62+
port: u16,
63+
authentication: Option<(String, String)>,
64+
}
65+
66+
#[cfg(feature = "socks5-proxy")]
67+
impl Proxy {
68+
fn from_client_options(options: &crate::options::ClientOptions) -> Option<Self> {
69+
let host = options.proxy_host.as_ref()?.clone();
70+
let port = options.proxy_port.unwrap_or(1080);
71+
let authentication = match (&options.proxy_username, &options.proxy_password) {
72+
(Some(username), Some(password)) => Some((username.clone(), password.clone())),
73+
// ClientOptions::validate will return an error if the username and password are not
74+
// provided together or both omitted.
75+
_ => None,
76+
};
77+
Some(Self {
78+
host,
79+
port,
80+
authentication,
81+
})
82+
}
83+
84+
async fn connect(
85+
&self,
86+
address: ServerAddress,
87+
connect_timeout: Duration,
88+
) -> Result<AsyncStream> {
89+
use fast_socks5::client::{Config, Socks5Stream};
90+
91+
let mut config = Config::default();
92+
config.set_connect_timeout(connect_timeout.as_secs());
93+
94+
let stream = if let Some((username, password)) = self.authentication.as_ref() {
95+
Socks5Stream::connect_with_password(
96+
address.to_string(),
97+
self.host.clone(),
98+
self.port,
99+
username.clone(),
100+
password.clone(),
101+
config,
102+
)
103+
.await
104+
} else {
105+
Socks5Stream::connect(address.to_string(), self.host.clone(), self.port, config).await
106+
}
107+
.map_err(|error| ErrorKind::ProxyConnect {
108+
message: error.to_string(),
109+
})?;
110+
Ok(AsyncStream::Socks5(stream))
111+
}
51112
}
52113

53114
impl From<&TopologySpec> for EstablisherOptions {
@@ -58,6 +119,8 @@ impl From<&TopologySpec> for EstablisherOptions {
58119
connect_timeout: spec.options.connect_timeout,
59120
#[cfg(test)]
60121
test_patch_reply: None,
122+
#[cfg(feature = "socks5-proxy")]
123+
proxy: Proxy::from_client_options(&spec.options),
61124
}
62125
}
63126
}
@@ -113,6 +176,8 @@ impl ConnectionEstablisher {
113176
let address = pending_connection.address.clone();
114177
let cancellation_receiver = pending_connection.cancellation_receiver.take();
115178

179+
if let Some(ref proxy) = self.proxy {}
180+
116181
let stream = self
117182
.make_stream(address)
118183
.await

src/error.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,8 @@ impl Error {
577577
| ErrorKind::Authentication { .. }
578578
| ErrorKind::Custom(_)
579579
| ErrorKind::Shutdown
580-
| ErrorKind::GridFs(_) => {}
580+
| ErrorKind::GridFs(_)
581+
| ErrorKind::ProxyConnect { .. } => {}
581582
#[cfg(feature = "in-use-encryption")]
582583
ErrorKind::Encryption(_) => {}
583584
#[cfg(feature = "bson-3")]
@@ -766,6 +767,12 @@ pub enum ErrorKind {
766767
/// A method was called on a client that was shut down.
767768
#[error("Client has been shut down")]
768769
Shutdown,
770+
771+
/// An error occurred when connecting to a proxy host.
772+
#[error("An error occurred when connecting to a proxy host: {message}")]
773+
#[non_exhaustive]
774+
#[cfg(feature = "socks5-proxy")]
775+
ProxyConnect { message: String },
769776
}
770777

771778
impl ErrorKind {
@@ -812,6 +819,7 @@ impl ErrorKind {
812819
ErrorKind::Encryption(..) => "Encryption",
813820
ErrorKind::Custom(..) => "Custom",
814821
ErrorKind::Shutdown => "Shutdown",
822+
ErrorKind::ProxyConnect { .. } => "ProxyConnect",
815823
}
816824
}
817825
}

src/runtime/stream.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ pub(crate) enum AsyncStream {
3838
/// A Unix domain socket connection.
3939
#[cfg(unix)]
4040
Unix(tokio::net::UnixStream),
41+
42+
/// A connection to a SOCKS5 proxy.
43+
#[cfg(feature = "socks5-proxy")]
44+
Socks5(fast_socks5::client::Socks5Stream<TcpStream>),
4145
}
4246

4347
impl AsyncStream {
@@ -175,6 +179,10 @@ impl tokio::io::AsyncRead for AsyncStream {
175179
Self::Tls(ref mut inner) => tokio::io::AsyncRead::poll_read(Pin::new(inner), cx, buf),
176180
#[cfg(unix)]
177181
Self::Unix(ref mut inner) => tokio::io::AsyncRead::poll_read(Pin::new(inner), cx, buf),
182+
#[cfg(feature = "socks5-proxy")]
183+
Self::Socks5(ref mut inner) => {
184+
tokio::io::AsyncRead::poll_read(Pin::new(inner), cx, buf)
185+
}
178186
}
179187
}
180188
}
@@ -191,6 +199,8 @@ impl AsyncWrite for AsyncStream {
191199
Self::Tls(ref mut inner) => Pin::new(inner).poll_write(cx, buf),
192200
#[cfg(unix)]
193201
Self::Unix(ref mut inner) => AsyncWrite::poll_write(Pin::new(inner), cx, buf),
202+
#[cfg(feature = "socks5-proxy")]
203+
Self::Socks5(ref mut inner) => AsyncWrite::poll_write(Pin::new(inner), cx, buf),
194204
}
195205
}
196206

@@ -201,6 +211,8 @@ impl AsyncWrite for AsyncStream {
201211
Self::Tls(ref mut inner) => Pin::new(inner).poll_flush(cx),
202212
#[cfg(unix)]
203213
Self::Unix(ref mut inner) => AsyncWrite::poll_flush(Pin::new(inner), cx),
214+
#[cfg(feature = "socks5-proxy")]
215+
Self::Socks5(ref mut inner) => AsyncWrite::poll_flush(Pin::new(inner), cx),
204216
}
205217
}
206218

@@ -211,6 +223,8 @@ impl AsyncWrite for AsyncStream {
211223
Self::Tls(ref mut inner) => Pin::new(inner).poll_shutdown(cx),
212224
#[cfg(unix)]
213225
Self::Unix(ref mut inner) => Pin::new(inner).poll_shutdown(cx),
226+
#[cfg(feature = "socks5-proxy")]
227+
Self::Socks5(ref mut inner) => Pin::new(inner).poll_shutdown(cx),
214228
}
215229
}
216230

@@ -225,6 +239,8 @@ impl AsyncWrite for AsyncStream {
225239
Self::Tls(ref mut inner) => Pin::new(inner).poll_write_vectored(cx, bufs),
226240
#[cfg(unix)]
227241
Self::Unix(ref mut inner) => Pin::new(inner).poll_write_vectored(cx, bufs),
242+
#[cfg(feature = "socks5-proxy")]
243+
Self::Socks5(ref mut inner) => Pin::new(inner).poll_write_vectored(cx, bufs),
228244
}
229245
}
230246

@@ -235,6 +251,8 @@ impl AsyncWrite for AsyncStream {
235251
Self::Tls(ref inner) => inner.is_write_vectored(),
236252
#[cfg(unix)]
237253
Self::Unix(ref inner) => inner.is_write_vectored(),
254+
#[cfg(feature = "socks5-proxy")]
255+
Self::Socks5(ref inner) => inner.is_write_vectored(),
238256
}
239257
}
240258
}

0 commit comments

Comments
 (0)