@@ -20,7 +20,6 @@ use lightning::io::{self, Error, ErrorKind};
2020use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
2121use prost:: Message ;
2222use rand:: RngCore ;
23- use tokio:: sync:: RwLock ;
2423use vss_client:: client:: VssClient ;
2524use vss_client:: error:: VssError ;
2625use vss_client:: headers:: VssHeaderProvider ;
@@ -75,7 +74,9 @@ impl VssStore {
7574 }
7675 }
7776
78- fn get_new_version_and_lock_ref ( & self , locking_key : String ) -> ( Arc < RwLock < u64 > > , u64 ) {
77+ fn get_new_version_and_lock_ref (
78+ & self , locking_key : String ,
79+ ) -> ( Arc < tokio:: sync:: Mutex < u64 > > , u64 ) {
7980 let version = self . next_version . fetch_add ( 1 , Ordering :: Relaxed ) ;
8081 if version == u64:: MAX {
8182 panic ! ( "VssStore version counter overflowed" ) ;
@@ -212,7 +213,7 @@ struct VssStoreInner {
212213 key_obfuscator : KeyObfuscator ,
213214 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
214215 // The lock also encapsulates the latest written version per key.
215- locks : Mutex < HashMap < String , Arc < RwLock < u64 > > > > ,
216+ locks : Mutex < HashMap < String , Arc < tokio :: sync :: Mutex < u64 > > > > ,
216217}
217218
218219impl VssStoreInner {
@@ -242,7 +243,7 @@ impl VssStoreInner {
242243 Self { client, store_id, storable_builder, key_obfuscator, locks }
243244 }
244245
245- fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < RwLock < u64 > > {
246+ fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio :: sync :: Mutex < u64 > > {
246247 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
247248 Arc :: clone ( & outer_lock. entry ( locking_key) . or_default ( ) )
248249 }
@@ -332,7 +333,7 @@ impl VssStoreInner {
332333 }
333334
334335 async fn write_internal (
335- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 ,
336+ & self , inner_lock_ref : Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String , version : u64 ,
336337 primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
337338 ) -> io:: Result < ( ) > {
338339 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
@@ -367,7 +368,7 @@ impl VssStoreInner {
367368 }
368369
369370 async fn remove_internal (
370- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 ,
371+ & self , inner_lock_ref : Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String , version : u64 ,
371372 primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
372373 ) -> io:: Result < ( ) > {
373374 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "remove" ) ?;
@@ -414,10 +415,11 @@ impl VssStoreInner {
414415 F : Future < Output = Result < ( ) , lightning:: io:: Error > > ,
415416 FN : FnOnce ( ) -> F ,
416417 > (
417- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 , callback : FN ,
418+ & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
419+ callback : FN ,
418420 ) -> Result < ( ) , lightning:: io:: Error > {
419421 let res = {
420- let mut last_written_version = inner_lock_ref. write ( ) . await ;
422+ let mut last_written_version = inner_lock_ref. lock ( ) . await ;
421423
422424 // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
423425 // consistency.
@@ -438,7 +440,7 @@ impl VssStoreInner {
438440 res
439441 }
440442
441- fn clean_locks ( & self , inner_lock_ref : & Arc < RwLock < u64 > > , locking_key : String ) {
443+ fn clean_locks ( & self , inner_lock_ref : & Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String ) {
442444 // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
443445 // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
444446 // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
0 commit comments