@@ -6,13 +6,18 @@ pub mod options;
66pub mod session;
77
88use std:: {
9- sync:: Arc ,
9+ sync:: {
10+ atomic:: { AtomicBool , Ordering } ,
11+ Mutex as SyncMutex ,
12+ } ,
1013 time:: { Duration , Instant } ,
1114} ;
1215
1316#[ cfg( feature = "in-use-encryption-unstable" ) ]
1417pub use self :: csfle:: client_builder:: * ;
1518use derivative:: Derivative ;
19+ use futures_core:: { future:: BoxFuture , Future } ;
20+ use futures_util:: { future:: join_all, FutureExt } ;
1621
1722#[ cfg( test) ]
1823use crate :: options:: ServerAddress ;
@@ -36,6 +41,7 @@ use crate::{
3641 db:: Database ,
3742 error:: { Error , ErrorKind , Result } ,
3843 event:: command:: { handle_command_event, CommandEvent } ,
44+ id_set:: IdSet ,
3945 operation:: { AggregateTarget , ListDatabases } ,
4046 options:: {
4147 ClientOptions ,
@@ -47,6 +53,7 @@ use crate::{
4753 } ,
4854 results:: DatabaseSpecification ,
4955 sdam:: { server_selection, SelectedServer , Topology } ,
56+ tracking_arc:: TrackingArc ,
5057 ClientSession ,
5158} ;
5259
@@ -103,9 +110,25 @@ const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
103110/// driver does not set ``tcp_keepalive_intvl``. See the
104111/// [MongoDB Diagnostics FAQ keepalive section](https://www.mongodb.com/docs/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments)
105112/// for instructions on setting these values at the system level.
106- #[ derive( Clone , Debug ) ]
113+ ///
114+ /// ## Clean shutdown
115+ /// Because Rust has no async equivalent of `Drop`, values that require server-side cleanup when
116+ /// dropped spawn a new async task to perform that cleanup. This can cause two potential issues:
117+ ///
118+ /// * Drop tasks pending or in progress when the async runtime shuts down may not complete, causing
119+ /// server-side resources to not be freed.
120+ /// * Drop tasks may run at an arbitrary time even after no `Client` values exist, making it hard to
121+ /// reason about associated resources (e.g. event handlers).
122+ ///
123+ /// To address these issues, we highly recommend you use [`Client::shutdown`] in the termination
124+ /// path of your application. This will ensure that outstanding resources have been cleaned up and
125+ /// terminate internal worker tasks before returning. Please note that `shutdown` will wait for
126+ /// _all_ outstanding resource handles to be dropped, so they must either have been dropped before
127+ /// calling `shutdown` or in a concurrent task; see the documentation of `shutdown` for more
128+ /// details.
129+ #[ derive( Debug , Clone ) ]
107130pub struct Client {
108- inner : Arc < ClientInner > ,
131+ inner : TrackingArc < ClientInner > ,
109132}
110133
111134#[ allow( dead_code, unreachable_code, clippy:: diverging_sub_expression) ]
@@ -124,10 +147,17 @@ struct ClientInner {
124147 topology : Topology ,
125148 options : ClientOptions ,
126149 session_pool : ServerSessionPool ,
150+ shutdown : Shutdown ,
127151 #[ cfg( feature = "in-use-encryption-unstable" ) ]
128152 csfle : tokio:: sync:: RwLock < Option < csfle:: ClientState > > ,
129153}
130154
155+ #[ derive( Debug ) ]
156+ struct Shutdown {
157+ pending_drops : SyncMutex < IdSet < crate :: runtime:: AsyncJoinHandle < ( ) > > > ,
158+ executed : AtomicBool ,
159+ }
160+
131161impl Client {
132162 /// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
133163 /// MongoDB connection string.
@@ -144,12 +174,16 @@ impl Client {
144174 pub fn with_options ( options : ClientOptions ) -> Result < Self > {
145175 options. validate ( ) ?;
146176
147- let inner = Arc :: new ( ClientInner {
177+ let inner = TrackingArc :: new ( ClientInner {
148178 topology : Topology :: new ( options. clone ( ) ) ?,
149179 session_pool : ServerSessionPool :: new ( ) ,
180+ options,
181+ shutdown : Shutdown {
182+ pending_drops : SyncMutex :: new ( IdSet :: new ( ) ) ,
183+ executed : AtomicBool :: new ( false ) ,
184+ } ,
150185 #[ cfg( feature = "in-use-encryption-unstable" ) ]
151186 csfle : Default :: default ( ) ,
152- options,
153187 } ) ;
154188 Ok ( Self { inner } )
155189 }
@@ -461,6 +495,123 @@ impl Client {
461495 . await
462496 }
463497
498+ pub ( crate ) fn register_async_drop ( & self ) -> AsyncDropToken {
499+ let ( cleanup_tx, cleanup_rx) = tokio:: sync:: oneshot:: channel :: < BoxFuture < ' static , ( ) > > ( ) ;
500+ let ( id_tx, id_rx) = tokio:: sync:: oneshot:: channel :: < crate :: id_set:: Id > ( ) ;
501+ let weak = self . weak ( ) ;
502+ let handle = crate :: runtime:: spawn ( async move {
503+ // Unwrap safety: the id is sent immediately after task creation, with no
504+ // await points in between.
505+ let id = id_rx. await . unwrap ( ) ;
506+ // If the cleanup channel is closed, that task was dropped.
507+ let cleanup = if let Ok ( f) = cleanup_rx. await {
508+ f
509+ } else {
510+ return ;
511+ } ;
512+ cleanup. await ;
513+ if let Some ( client) = weak. upgrade ( ) {
514+ client
515+ . inner
516+ . shutdown
517+ . pending_drops
518+ . lock ( )
519+ . unwrap ( )
520+ . remove ( & id) ;
521+ }
522+ } ) ;
523+ let id = self
524+ . inner
525+ . shutdown
526+ . pending_drops
527+ . lock ( )
528+ . unwrap ( )
529+ . insert ( handle) ;
530+ let _ = id_tx. send ( id) ;
531+ AsyncDropToken {
532+ tx : Some ( cleanup_tx) ,
533+ }
534+ }
535+
536+ /// Shut down this `Client`, terminating background thread workers and closing connections.
537+ /// This will wait for any live handles to server-side resources (see below) to be
538+ /// dropped and any associated server-side operations to finish.
539+ ///
540+ /// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait
541+ /// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by
542+ /// only using those types in shorter-lived scopes than the `Client`. If this is not possible,
543+ /// see [`shutdown_immediate`](Client::shutdown_immediate). For example:
544+ ///
545+ /// ```rust
546+ /// # use mongodb::{Client, GridFsBucket, error::Result};
547+ /// async fn upload_data(bucket: &GridFsBucket) {
548+ /// let stream = bucket.open_upload_stream("test", None);
549+ /// // .. write to the stream ..
550+ /// }
551+ ///
552+ /// # async fn run() -> Result<()> {
553+ /// let client = Client::with_uri_str("mongodb://example.com").await?;
554+ /// let bucket = client.database("test").gridfs_bucket(None);
555+ /// upload_data(&bucket).await;
556+ /// client.shutdown().await;
557+ /// // Background cleanup work from `upload_data` is guaranteed to have run.
558+ /// # Ok(())
559+ /// # }
560+ /// ```
561+ ///
562+ /// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed:
563+ ///
564+ /// ```rust
565+ /// # use mongodb::{Client, error::Result};
566+ /// # async fn run() -> Result<()> {
567+ /// let client = Client::with_uri_str("mongodb://example.com").await?;
568+ /// let bucket = client.database("test").gridfs_bucket(None);
569+ /// let stream = bucket.open_upload_stream("test", None);
570+ /// // .. write to the stream ..
571+ /// drop(stream);
572+ /// client.shutdown().await;
573+ /// // Background cleanup work for `stream` is guaranteed to have run.
574+ /// # Ok(())
575+ /// # }
576+ /// ```
577+ ///
578+ /// Calling any methods on clones of this `Client` or derived handles after this will return
579+ /// errors.
580+ ///
581+ /// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or
582+ /// `GridFsUploadStream`.
583+ pub async fn shutdown ( self ) {
584+ // Subtle bug: if this is inlined into the `join_all(..)` call, Rust will extend the
585+ // lifetime of the temporary unnamed `MutexLock` until the end of the *statement*,
586+ // causing the lock to be held for the duration of the join, which deadlocks.
587+ let pending = self . inner . shutdown . pending_drops . lock ( ) . unwrap ( ) . extract ( ) ;
588+ join_all ( pending) . await ;
589+ self . shutdown_immediate ( ) . await ;
590+ }
591+
592+ /// Shut down this `Client`, terminating background thread workers and closing connections.
593+ /// This does *not* wait for other pending resources to be cleaned up, which may cause both
594+ /// client-side errors and server-side resource leaks. Calling any methods on clones of this
595+ /// `Client` or derived handles after this will return errors.
596+ ///
597+ /// ```rust
598+ /// # use mongodb::{Client, error::Result};
599+ /// # async fn run() -> Result<()> {
600+ /// let client = Client::with_uri_str("mongodb://example.com").await?;
601+ /// let bucket = client.database("test").gridfs_bucket(None);
602+ /// let stream = bucket.open_upload_stream("test", None);
603+ /// // .. write to the stream ..
604+ /// client.shutdown_immediate().await;
605+ /// // Background cleanup work for `stream` may or may not have run.
606+ /// # Ok(())
607+ /// # }
608+ /// ```
609+ pub async fn shutdown_immediate ( self ) {
610+ self . inner . topology . shutdown ( ) . await ;
611+ // This has to happen last to allow pending cleanup to execute commands.
612+ self . inner . shutdown . executed . store ( true , Ordering :: SeqCst ) ;
613+ }
614+
464615 /// Check in a server session to the server session pool. The session will be discarded if it is
465616 /// expired or dirty.
466617 pub ( crate ) async fn check_in_server_session ( & self , session : ServerSession ) {
@@ -630,10 +781,9 @@ impl Client {
630781 }
631782 }
632783
633- #[ cfg( feature = "in-use-encryption-unstable" ) ]
634784 pub ( crate ) fn weak ( & self ) -> WeakClient {
635785 WeakClient {
636- inner : Arc :: downgrade ( & self . inner ) ,
786+ inner : TrackingArc :: downgrade ( & self . inner ) ,
637787 }
638788 }
639789
@@ -653,16 +803,35 @@ impl Client {
653803 }
654804}
655805
656- #[ cfg( feature = "in-use-encryption-unstable" ) ]
657806#[ derive( Clone , Debug ) ]
658807pub ( crate ) struct WeakClient {
659- inner : std :: sync :: Weak < ClientInner > ,
808+ inner : crate :: tracking_arc :: Weak < ClientInner > ,
660809}
661810
662- #[ cfg( feature = "in-use-encryption-unstable" ) ]
663811impl WeakClient {
664- #[ allow( dead_code) ]
665812 pub ( crate ) fn upgrade ( & self ) -> Option < Client > {
666813 self . inner . upgrade ( ) . map ( |inner| Client { inner } )
667814 }
668815}
816+
817+ #[ derive( Derivative ) ]
818+ #[ derivative( Debug ) ]
819+ pub ( crate ) struct AsyncDropToken {
820+ #[ derivative( Debug = "ignore" ) ]
821+ tx : Option < tokio:: sync:: oneshot:: Sender < BoxFuture < ' static , ( ) > > > ,
822+ }
823+
824+ impl AsyncDropToken {
825+ pub ( crate ) fn spawn ( & mut self , fut : impl Future < Output = ( ) > + Send + ' static ) {
826+ if let Some ( tx) = self . tx . take ( ) {
827+ let _ = tx. send ( fut. boxed ( ) ) ;
828+ } else {
829+ #[ cfg( debug_assertions) ]
830+ panic ! ( "exhausted AsyncDropToken" ) ;
831+ }
832+ }
833+
834+ pub ( crate ) fn take ( & mut self ) -> Self {
835+ Self { tx : self . tx . take ( ) }
836+ }
837+ }
0 commit comments