1- use std:: { borrow:: Cow , collections:: HashMap , time:: Duration } ;
1+ use std:: { borrow:: Cow , collections:: HashMap , sync :: Arc , time:: Duration } ;
22
33use bson:: Document ;
44use serde:: Deserialize ;
@@ -7,11 +7,25 @@ use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
77use crate :: {
88 bson:: { doc, Bson } ,
99 error:: { CommandError , Error , ErrorKind } ,
10+ hello:: LEGACY_HELLO_COMMAND_NAME ,
1011 options:: { AuthMechanism , ClientOptions , Credential , ListDatabasesOptions , ServerAddress } ,
1112 runtime,
1213 selection_criteria:: { ReadPreference , ReadPreferenceOptions , SelectionCriteria } ,
13- test:: { log_uncaptured, util:: TestClient , CLIENT_OPTIONS , LOCK } ,
14+ test:: {
15+ log_uncaptured,
16+ util:: TestClient ,
17+ CmapEvent ,
18+ Event ,
19+ EventHandler ,
20+ FailCommandOptions ,
21+ FailPoint ,
22+ FailPointMode ,
23+ SdamEvent ,
24+ CLIENT_OPTIONS ,
25+ LOCK ,
26+ } ,
1427 Client ,
28+ ServerType ,
1529} ;
1630
1731#[ derive( Debug , Deserialize ) ]
@@ -691,3 +705,134 @@ async fn plain_auth() {
691705 }
692706 ) ;
693707}
708+
709+ /// Test verifies that retrying a commitTransaction operation after a checkOut
710+ /// failure works.
711+ #[ cfg_attr( feature = "tokio-runtime" , tokio:: test( flavor = "multi_thread" ) ) ]
712+ #[ cfg_attr( feature = "async-std-runtime" , async_std:: test) ]
713+ async fn retry_commit_txn_check_out ( ) {
714+ let _guard: RwLockWriteGuard < _ > = LOCK . run_exclusively ( ) . await ;
715+
716+ let setup_client = TestClient :: new ( ) . await ;
717+ if !setup_client. is_replica_set ( ) {
718+ log_uncaptured ( "skipping retry_commit_txn_check_out due to non-replicaset topology" ) ;
719+ return ;
720+ }
721+
722+ if !setup_client. supports_transactions ( ) {
723+ log_uncaptured ( "skipping retry_commit_txn_check_out due to lack of transaction support" ) ;
724+ return ;
725+ }
726+
727+ if !setup_client. supports_fail_command_appname_initial_handshake ( ) {
728+ log_uncaptured (
729+ "skipping retry_commit_txn_check_out due to insufficient failCommand support" ,
730+ ) ;
731+ return ;
732+ }
733+
734+ // ensure namespace exists
735+ setup_client
736+ . database ( "retry_commit_txn_check_out" )
737+ . collection ( "retry_commit_txn_check_out" )
738+ . insert_one ( doc ! { } , None )
739+ . await
740+ . unwrap ( ) ;
741+
742+ let mut options = CLIENT_OPTIONS . get ( ) . await . clone ( ) ;
743+ let handler = Arc :: new ( EventHandler :: new ( ) ) ;
744+ options. cmap_event_handler = Some ( handler. clone ( ) ) ;
745+ options. sdam_event_handler = Some ( handler. clone ( ) ) ;
746+ options. heartbeat_freq = Some ( Duration :: from_secs ( 120 ) ) ;
747+ options. app_name = Some ( "retry_commit_txn_check_out" . to_string ( ) ) ;
748+ let client = Client :: with_options ( options) . unwrap ( ) ;
749+
750+ let mut session = client. start_session ( None ) . await . unwrap ( ) ;
751+ session. start_transaction ( None ) . await . unwrap ( ) ;
752+ // transition transaction to "in progress" so that the commit
753+ // actually executes an operation.
754+ client
755+ . database ( "retry_commit_txn_check_out" )
756+ . collection ( "retry_commit_txn_check_out" )
757+ . insert_one_with_session ( doc ! { } , None , & mut session)
758+ . await
759+ . unwrap ( ) ;
760+
761+ // enable a fail point that clears the connection pools so that
762+ // commitTransaction will create a new connection during check out.
763+ let fp = FailPoint :: fail_command (
764+ & [ "ping" ] ,
765+ FailPointMode :: Times ( 1 ) ,
766+ FailCommandOptions :: builder ( ) . error_code ( 11600 ) . build ( ) ,
767+ ) ;
768+ let _guard = setup_client. enable_failpoint ( fp, None ) . await . unwrap ( ) ;
769+
770+ let mut subscriber = handler. subscribe ( ) ;
771+ client
772+ . database ( "foo" )
773+ . run_command ( doc ! { "ping" : 1 } , None )
774+ . await
775+ . unwrap_err ( ) ;
776+
777+ // failing with a state change error will request an immediate check
778+ // wait for the mark unknown and subsequent succeeded heartbeat
779+ let mut primary = None ;
780+ subscriber
781+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
782+ if let Event :: Sdam ( SdamEvent :: ServerDescriptionChanged ( event) ) = e {
783+ if event. is_marked_unknown_event ( ) {
784+ primary = Some ( event. address . clone ( ) ) ;
785+ return true ;
786+ }
787+ }
788+ false
789+ } )
790+ . await
791+ . expect ( "should see marked unknown event" ) ;
792+
793+ subscriber
794+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
795+ if let Event :: Sdam ( SdamEvent :: ServerDescriptionChanged ( event) ) = e {
796+ if & event. address == primary. as_ref ( ) . unwrap ( )
797+ && event. previous_description . server_type ( ) == ServerType :: Unknown
798+ {
799+ return true ;
800+ }
801+ }
802+ false
803+ } )
804+ . await
805+ . expect ( "should see mark available event" ) ;
806+
807+ // enable a failpoint on the handshake to cause check_out
808+ // to fail with a retryable error
809+ let fp = FailPoint :: fail_command (
810+ & [ LEGACY_HELLO_COMMAND_NAME , "hello" ] ,
811+ FailPointMode :: Times ( 1 ) ,
812+ FailCommandOptions :: builder ( )
813+ . error_code ( 11600 )
814+ . app_name ( "retry_commit_txn_check_out" . to_string ( ) )
815+ . build ( ) ,
816+ ) ;
817+ let _guard2 = setup_client. enable_failpoint ( fp, None ) . await . unwrap ( ) ;
818+
819+ // finally, attempt the commit.
820+ // this should succeed due to retry
821+ session. commit_transaction ( ) . await . unwrap ( ) ;
822+
823+ // ensure the first check out attempt fails
824+ subscriber
825+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
826+ matches ! ( e, Event :: Cmap ( CmapEvent :: ConnectionCheckOutFailed ( _) ) )
827+ } )
828+ . await
829+ . expect ( "should see check out failed event" ) ;
830+
831+ // ensure the second one succeeds
832+ subscriber
833+ . wait_for_event ( Duration :: from_secs ( 1 ) , |e| {
834+ matches ! ( e, Event :: Cmap ( CmapEvent :: ConnectionCheckedOut ( _) ) )
835+ } )
836+ . await
837+ . expect ( "should see checked out event" ) ;
838+ }
0 commit comments