@@ -20,7 +20,6 @@ use lightning::io::{self, Error, ErrorKind};
2020use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
2121use prost:: Message ;
2222use rand:: RngCore ;
23- use tokio:: sync:: RwLock ;
2423use vss_client:: client:: VssClient ;
2524use vss_client:: error:: VssError ;
2625use vss_client:: headers:: VssHeaderProvider ;
@@ -75,7 +74,7 @@ impl VssStore {
7574 }
7675 }
7776
78- fn get_new_version_and_lock_ref ( & self , locking_key : String ) -> ( Arc < RwLock < u64 > > , u64 ) {
77+ fn get_new_version_and_lock_ref ( & self , locking_key : String ) -> ( Arc < tokio :: sync :: Mutex < u64 > > , u64 ) {
7978 let version = self . next_version . fetch_add ( 1 , Ordering :: Relaxed ) ;
8079 if version == u64:: MAX {
8180 panic ! ( "VssStore version counter overflowed" ) ;
@@ -212,7 +211,7 @@ struct VssStoreInner {
212211 key_obfuscator : KeyObfuscator ,
213212 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
214213 // The lock also encapsulates the latest written version per key.
215- locks : Mutex < HashMap < String , Arc < RwLock < u64 > > > > ,
214+ locks : Mutex < HashMap < String , Arc < tokio :: sync :: Mutex < u64 > > > > ,
216215}
217216
218217impl VssStoreInner {
@@ -242,7 +241,7 @@ impl VssStoreInner {
242241 Self { client, store_id, storable_builder, key_obfuscator, locks }
243242 }
244243
245- fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < RwLock < u64 > > {
244+ fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio :: sync :: Mutex < u64 > > {
246245 let mut outer_lock = self . locks . lock ( ) . unwrap ( ) ;
247246 Arc :: clone ( & outer_lock. entry ( locking_key) . or_default ( ) )
248247 }
@@ -332,7 +331,7 @@ impl VssStoreInner {
332331 }
333332
334333 async fn write_internal (
335- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 ,
334+ & self , inner_lock_ref : Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String , version : u64 ,
336335 primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
337336 ) -> io:: Result < ( ) > {
338337 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
@@ -367,7 +366,7 @@ impl VssStoreInner {
367366 }
368367
369368 async fn remove_internal (
370- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 ,
369+ & self , inner_lock_ref : Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String , version : u64 ,
371370 primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
372371 ) -> io:: Result < ( ) > {
373372 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "remove" ) ?;
@@ -414,10 +413,10 @@ impl VssStoreInner {
414413 F : Future < Output = Result < ( ) , lightning:: io:: Error > > ,
415414 FN : FnOnce ( ) -> F ,
416415 > (
417- & self , inner_lock_ref : Arc < RwLock < u64 > > , locking_key : String , version : u64 , callback : FN ,
416+ & self , inner_lock_ref : Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String , version : u64 , callback : FN ,
418417 ) -> Result < ( ) , lightning:: io:: Error > {
419418 let res = {
420- let mut last_written_version = inner_lock_ref. write ( ) . await ;
419+ let mut last_written_version = inner_lock_ref. lock ( ) . await ;
421420
422421 // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
423422 // consistency.
@@ -438,7 +437,7 @@ impl VssStoreInner {
438437 res
439438 }
440439
441- fn clean_locks ( & self , inner_lock_ref : & Arc < RwLock < u64 > > , locking_key : String ) {
440+ fn clean_locks ( & self , inner_lock_ref : & Arc < tokio :: sync :: Mutex < u64 > > , locking_key : String ) {
442441 // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
443442 // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
444443 // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
0 commit comments