@@ -5,16 +5,18 @@ use std::{
55} ;
66
77use derive_where:: derive_where;
8- use tokio:: sync:: { mpsc, Mutex } ;
8+ use tokio:: sync:: { broadcast , mpsc, Mutex } ;
99
1010use super :: {
1111 CmapEventEmitter ,
1212 Connection ,
1313 ConnectionGeneration ,
1414 ConnectionInfo ,
15+ Message ,
1516 PendingConnection ,
1617 PinnedConnectionHandle ,
1718 PoolManager ,
19+ RawCommandResponse ,
1820} ;
1921use crate :: {
2022 bson:: oid:: ObjectId ,
@@ -50,7 +52,7 @@ pub(crate) struct PooledConnection {
5052}
5153
5254/// The state of a pooled connection.
53- #[ derive( Clone , Debug ) ]
55+ #[ derive( Debug ) ]
5456enum PooledConnectionState {
5557 /// The state associated with a connection checked into the connection pool.
5658 CheckedIn { available_time : Instant } ,
@@ -59,6 +61,10 @@ enum PooledConnectionState {
5961 CheckedOut {
6062 /// The manager used to check this connection back into the pool.
6163 pool_manager : PoolManager ,
64+
65+ /// The receiver to receive a cancellation notice. Only present on non-load-balanced
66+ /// connections.
67+ cancellation_receiver : Option < broadcast:: Receiver < ( ) > > ,
6268 } ,
6369
6470 /// The state associated with a pinned connection.
@@ -140,6 +146,24 @@ impl PooledConnection {
140146 . and_then ( |sd| sd. service_id )
141147 }
142148
149+ /// Sends a message on this connection.
150+ pub ( crate ) async fn send_message (
151+ & mut self ,
152+ message : impl TryInto < Message , Error = impl Into < Error > > ,
153+ ) -> Result < RawCommandResponse > {
154+ match self . state {
155+ PooledConnectionState :: CheckedOut {
156+ cancellation_receiver : Some ( ref mut cancellation_receiver) ,
157+ ..
158+ } => {
159+ self . connection
160+ . send_message_with_cancellation ( message, cancellation_receiver)
161+ . await
162+ }
163+ _ => self . connection . send_message ( message) . await ,
164+ }
165+ }
166+
143167 /// Updates the state of the connection to indicate that it is checked into the pool.
144168 pub ( crate ) fn mark_checked_in ( & mut self ) {
145169 if !matches ! ( self . state, PooledConnectionState :: CheckedIn { .. } ) {
@@ -155,8 +179,15 @@ impl PooledConnection {
155179 }
156180
157181 /// Updates the state of the connection to indicate that it is checked out of the pool.
158- pub ( crate ) fn mark_checked_out ( & mut self , pool_manager : PoolManager ) {
159- self . state = PooledConnectionState :: CheckedOut { pool_manager } ;
182+ pub ( crate ) fn mark_checked_out (
183+ & mut self ,
184+ pool_manager : PoolManager ,
185+ cancellation_receiver : Option < broadcast:: Receiver < ( ) > > ,
186+ ) {
187+ self . state = PooledConnectionState :: CheckedOut {
188+ pool_manager,
189+ cancellation_receiver,
190+ } ;
160191 }
161192
162193 /// Whether this connection is idle.
@@ -175,15 +206,14 @@ impl PooledConnection {
175206 Instant :: now ( ) . duration_since ( available_time) >= max_idle_time
176207 }
177208
178- /// Nullifies the internal state of this connection and returns it in a new [PooledConnection].
179- /// If a state is provided, then the new connection will contain that state; otherwise, this
180- /// connection's state will be cloned.
181- fn take ( & mut self , state : impl Into < Option < PooledConnectionState > > ) -> Self {
209+ /// Nullifies the internal state of this connection and returns it in a new [PooledConnection]
210+ /// with the given state.
211+ fn take ( & mut self , new_state : PooledConnectionState ) -> Self {
182212 Self {
183213 connection : self . connection . take ( ) ,
184214 generation : self . generation ,
185215 event_emitter : self . event_emitter . clone ( ) ,
186- state : state . into ( ) . unwrap_or_else ( || self . state . clone ( ) ) ,
216+ state : new_state ,
187217 }
188218 }
189219
@@ -196,7 +226,9 @@ impl PooledConnection {
196226 self . id
197227 ) ) )
198228 }
199- PooledConnectionState :: CheckedOut { ref pool_manager } => {
229+ PooledConnectionState :: CheckedOut {
230+ ref pool_manager, ..
231+ } => {
200232 let ( tx, rx) = mpsc:: channel ( 1 ) ;
201233 self . state = PooledConnectionState :: Pinned {
202234 // Mark the connection as in-use while the operation currently using the
@@ -286,10 +318,11 @@ impl Drop for PooledConnection {
286318 // Nothing needs to be done when a checked-in connection is dropped.
287319 PooledConnectionState :: CheckedIn { .. } => Ok ( ( ) ) ,
288320 // A checked-out connection should be sent back to the connection pool.
289- PooledConnectionState :: CheckedOut { pool_manager } => {
321+ PooledConnectionState :: CheckedOut { pool_manager, .. } => {
290322 let pool_manager = pool_manager. clone ( ) ;
291- let mut dropped_connection = self . take ( None ) ;
292- dropped_connection. mark_checked_in ( ) ;
323+ let dropped_connection = self . take ( PooledConnectionState :: CheckedIn {
324+ available_time : Instant :: now ( ) ,
325+ } ) ;
293326 pool_manager. check_in ( dropped_connection)
294327 }
295328 // A pinned connection should be returned to its pinner or to the connection pool.
@@ -339,7 +372,11 @@ impl Drop for PooledConnection {
339372 }
340373 // The pinner of this connection has been dropped while the connection was
341374 // sitting in its channel, so the connection should be returned to the pool.
342- PinnedState :: Returned { .. } => pool_manager. check_in ( self . take ( None ) ) ,
375+ PinnedState :: Returned { .. } => {
376+ pool_manager. check_in ( self . take ( PooledConnectionState :: CheckedIn {
377+ available_time : Instant :: now ( ) ,
378+ } ) )
379+ }
343380 }
344381 }
345382 } ;
0 commit comments