Skip to content

Commit 2dbb5bd

Browse files
committed
RUST-575 Consider server load during server selection
1 parent a27d05d commit 2dbb5bd

39 files changed

+1334
-260
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ lazy_static = "1.4.0"
4141
md-5 = "0.8.0"
4242
os_info = { version = "2.0.6", default-features = false }
4343
percent-encoding = "2.0.0"
44-
rand = "0.7.2"
44+
rand = { version = "0.7.2", features = ["small_rng"] }
4545
serde_with = "1.3.1"
4646
sha-1 = "0.8.1"
4747
sha2 = "0.8.0"

src/client/executor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent},
1313
operation::{Operation, Retryability},
1414
options::SelectionCriteria,
15-
sdam::{Server, SessionSupportStatus},
15+
sdam::{SelectedServer, SessionSupportStatus},
1616
};
1717

1818
lazy_static! {
@@ -356,7 +356,7 @@ impl Client {
356356
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
357357
server_info.server_type().is_data_bearing()
358358
}));
359-
let _: Arc<Server> = self.select_server(Some(&criteria)).await?;
359+
let _: SelectedServer = self.select_server(Some(&criteria)).await?;
360360
Ok(self.inner.topology.session_support_status().await)
361361
}
362362
_ => Ok(initial_status),

src/client/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
ReadPreference,
2525
SelectionCriteria,
2626
},
27-
sdam::{Server, SessionSupportStatus, Topology},
27+
sdam::{SelectedServer, SessionSupportStatus, Topology},
2828
};
2929
pub(crate) use session::{ClientSession, ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
3030
use session::{ServerSession, ServerSessionPool};
@@ -246,7 +246,7 @@ impl Client {
246246

247247
/// Select a server using the provided criteria. If none is provided, a primary read preference
248248
/// will be used instead.
249-
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<Arc<Server>> {
249+
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<SelectedServer> {
250250
let criteria =
251251
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
252252

src/client/options/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use rustls::{
2222
ServerCertVerifier,
2323
TLSError,
2424
};
25-
use serde::Deserialize;
25+
use serde::{de::Error, Deserialize, Deserializer};
2626
use strsim::jaro_winkler;
2727
pub use trust_dns_resolver::config::ResolverConfig;
2828
use typed_builder::TypedBuilder;
@@ -91,7 +91,7 @@ lazy_static! {
9191
}
9292

9393
/// A hostname:port address pair.
94-
#[derive(Clone, Debug, Deserialize, Eq)]
94+
#[derive(Clone, Debug, Eq)]
9595
pub struct StreamAddress {
9696
/// The hostname of the address.
9797
pub hostname: String,
@@ -102,6 +102,16 @@ pub struct StreamAddress {
102102
pub port: Option<u16>,
103103
}
104104

105+
impl<'de> Deserialize<'de> for StreamAddress {
106+
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
107+
where
108+
D: Deserializer<'de>,
109+
{
110+
let s: String = Deserialize::deserialize(deserializer)?;
111+
Self::parse(s.as_str()).map_err(|e| D::Error::custom(format!("{}", e)))
112+
}
113+
}
114+
105115
impl Default for StreamAddress {
106116
fn default() -> Self {
107117
Self {

src/cmap/connection_requester.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,22 @@ use crate::{
99
};
1010
use std::time::Duration;
1111

12+
/// Returns a new requester/receiver pair.
13+
pub(super) fn channel(
14+
address: StreamAddress,
15+
handle: PoolWorkerHandle,
16+
) -> (ConnectionRequester, ConnectionRequestReceiver) {
17+
let (sender, receiver) = mpsc::unbounded_channel();
18+
(
19+
ConnectionRequester {
20+
address,
21+
sender,
22+
handle,
23+
},
24+
ConnectionRequestReceiver::new(receiver),
25+
)
26+
}
27+
1228
/// Handle for requesting Connections from the pool.
1329
/// This requester will keep the pool alive. Once all requesters have been dropped,
1430
/// the pool will stop servicing requests, drop its available connections, and close.
@@ -20,22 +36,6 @@ pub(super) struct ConnectionRequester {
2036
}
2137

2238
impl ConnectionRequester {
23-
/// Returns a new requester/receiver pair.
24-
pub(super) fn new(
25-
address: StreamAddress,
26-
handle: PoolWorkerHandle,
27-
) -> (Self, ConnectionRequestReceiver) {
28-
let (sender, receiver) = mpsc::unbounded_channel();
29-
(
30-
Self {
31-
address,
32-
sender,
33-
handle,
34-
},
35-
ConnectionRequestReceiver::new(receiver),
36-
)
37-
}
38-
3939
/// Request a connection from the pool that owns the receiver end of this requester.
4040
/// Returns an error if it takes longer than wait_queue_timeout before either a connection is
4141
/// received or an establishment begins.

src/cmap/manager.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@ use tokio::sync::mpsc;
33
use super::Connection;
44
use crate::error::Error;
55

6+
pub(super) fn channel() -> (PoolManager, ManagementRequestReceiver) {
7+
let (sender, receiver) = mpsc::unbounded_channel();
8+
(
9+
PoolManager { sender },
10+
ManagementRequestReceiver { receiver },
11+
)
12+
}
13+
614
/// Struct used to make management requests to the pool (e.g. checking in a connection).
715
/// A PoolManager will NOT keep a pool from going out of scope and closing.
816
#[derive(Clone, Debug)]
917
pub(super) struct PoolManager {
10-
pub(super) sender: mpsc::UnboundedSender<PoolManagementRequest>,
18+
sender: mpsc::UnboundedSender<PoolManagementRequest>,
1119
}
1220

1321
impl PoolManager {
14-
pub(super) fn new() -> (PoolManager, ManagementRequestReceiver) {
15-
let (sender, receiver) = mpsc::unbounded_channel();
16-
(Self { sender }, ManagementRequestReceiver { receiver })
17-
}
18-
1922
/// Lazily clear the pool.
2023
pub(super) fn clear(&self) {
2124
let _ = self.sender.send(PoolManagementRequest::Clear);

src/cmap/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use connection_requester::ConnectionRequester;
3131
use manager::PoolManager;
3232
use worker::ConnectionPoolWorker;
3333

34+
#[cfg(test)]
35+
use self::worker::PoolWorkerHandle;
36+
3437
const DEFAULT_MAX_POOL_SIZE: u32 = 100;
3538

3639
/// A pool of connections implementing the CMAP spec. All state is kept internally in an `Arc`, and
@@ -41,6 +44,7 @@ pub(crate) struct ConnectionPool {
4144
address: StreamAddress,
4245
manager: PoolManager,
4346
connection_requester: ConnectionRequester,
47+
4448
wait_queue_timeout: Option<Duration>,
4549

4650
#[derivative(Debug = "ignore")]
@@ -75,6 +79,21 @@ impl ConnectionPool {
7579
}
7680
}
7781

82+
#[cfg(test)]
83+
pub(crate) fn new_mocked(address: StreamAddress) -> Self {
84+
let (manager, _) = manager::channel();
85+
let handle = PoolWorkerHandle::new_mocked();
86+
let (connection_requester, _) = connection_requester::channel(Default::default(), handle);
87+
88+
Self {
89+
address,
90+
manager,
91+
connection_requester,
92+
wait_queue_timeout: None,
93+
event_handler: None,
94+
}
95+
}
96+
7897
fn emit_event<F>(&self, emit: F)
7998
where
8099
F: FnOnce(&Arc<dyn CmapEventHandler>),

src/cmap/test/integration.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ async fn acquire_connection_and_send_command() {
7272
async fn concurrent_connections() {
7373
let _guard = LOCK.run_exclusively().await;
7474

75-
let client = TestClient::new().await;
75+
let mut options = CLIENT_OPTIONS.clone();
76+
options.direct_connection = Some(true);
77+
options.hosts.drain(1..);
78+
79+
let client = TestClient::with_options(Some(options)).await;
7680
let version = VersionReq::parse(">= 4.2.9").unwrap();
7781
// blockConnection failpoint option only supported in 4.2.9+.
7882
if !version.matches(&client.server_version) {
@@ -165,7 +169,7 @@ async fn connection_error_during_establishment() {
165169

166170
let options = FailCommandOptions::builder().error_code(1234).build();
167171
let failpoint = FailPoint::fail_command(&["isMaster"], FailPointMode::Times(10), Some(options));
168-
let _fp_guard = client.enable_failpoint(failpoint).await.unwrap();
172+
let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap();
169173

170174
let handler = Arc::new(EventHandler::new());
171175
let mut subscriber = handler.subscribe();
@@ -213,7 +217,7 @@ async fn connection_error_during_operation() {
213217

214218
let options = FailCommandOptions::builder().close_connection(true).build();
215219
let failpoint = FailPoint::fail_command(&["ping"], FailPointMode::Times(10), Some(options));
216-
let _fp_guard = client.enable_failpoint(failpoint).await.unwrap();
220+
let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap();
217221

218222
let mut subscriber = handler.subscribe();
219223

src/cmap/worker.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ use derivative::Derivative;
22

33
use super::{
44
conn::PendingConnection,
5+
connection_requester,
56
connection_requester::{
67
ConnectionRequest,
78
ConnectionRequestReceiver,
89
ConnectionRequester,
910
RequestedConnection,
1011
},
1112
establish::ConnectionEstablisher,
13+
manager,
1214
manager::{ManagementRequestReceiver, PoolManagementRequest, PoolManager},
1315
options::{ConnectionOptions, ConnectionPoolOptions},
1416
Connection,
@@ -132,10 +134,10 @@ impl ConnectionPoolWorker {
132134
.as_ref()
133135
.map(|pool_options| ConnectionOptions::from(pool_options.clone()));
134136

135-
let (handle_listener, handle) = HandleListener::new();
137+
let (handle, handle_listener) = handle_channel();
136138
let (connection_requester, request_receiver) =
137-
ConnectionRequester::new(address.clone(), handle);
138-
let (manager, management_receiver) = PoolManager::new();
139+
connection_requester::channel(address.clone(), handle);
140+
let (manager, management_receiver) = manager::channel();
139141

140142
let worker = ConnectionPoolWorker {
141143
address,
@@ -498,25 +500,35 @@ impl From<PoolManagementRequest> for PoolTask {
498500
}
499501
}
500502

503+
/// Constructs a new channel for for monitoring whether this pool still has references
504+
/// to it.
505+
fn handle_channel() -> (PoolWorkerHandle, HandleListener) {
506+
let (sender, receiver) = mpsc::channel(1);
507+
(PoolWorkerHandle { sender }, HandleListener { receiver })
508+
}
509+
501510
/// Handle to the worker. Once all handles have been dropped, the worker
502511
/// will stop waiting for new requests and drop the pool itself.
503512
#[derive(Debug, Clone)]
504513
pub(super) struct PoolWorkerHandle {
505514
sender: mpsc::Sender<()>,
506515
}
507516

517+
impl PoolWorkerHandle {
518+
#[cfg(test)]
519+
pub(super) fn new_mocked() -> Self {
520+
let (s, _) = handle_channel();
521+
s
522+
}
523+
}
524+
508525
/// Listener used to determine when all handles have been dropped.
509526
#[derive(Debug)]
510527
struct HandleListener {
511528
receiver: mpsc::Receiver<()>,
512529
}
513530

514531
impl HandleListener {
515-
fn new() -> (Self, PoolWorkerHandle) {
516-
let (sender, receiver) = mpsc::channel(1);
517-
(Self { receiver }, PoolWorkerHandle { sender })
518-
}
519-
520532
/// Listen until all handles are dropped.
521533
/// This will not return until all handles are dropped, so make sure to only poll this via
522534
/// select or with a timeout.

src/sdam/description/topology/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod server_selection;
1+
pub(crate) mod server_selection;
22
#[cfg(test)]
33
mod test;
44

0 commit comments

Comments
 (0)