66// accordance with one or both of these licenses.
77
88//! Objects related to [`SqliteStore`] live here.
9+ use std:: boxed:: Box ;
10+ use std:: collections:: HashMap ;
911use std:: fs;
12+ use std:: future:: Future ;
1013use std:: path:: PathBuf ;
14+ use std:: pin:: Pin ;
15+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
1116use std:: sync:: { Arc , Mutex } ;
1217
1318use lightning:: io;
14- use lightning:: util:: persist:: KVStoreSync ;
19+ use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
1520use lightning_types:: string:: PrintableString ;
1621use rusqlite:: { named_params, Connection } ;
1722
@@ -38,6 +43,10 @@ const SCHEMA_USER_VERSION: u16 = 2;
3843/// [SQLite]: https://sqlite.org
3944pub struct SqliteStore {
4045 inner : Arc < SqliteStoreInner > ,
46+
47+ // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
48+ // operations aren't sensitive to the order of execution.
49+ next_write_version : AtomicU64 ,
4150}
4251
4352impl SqliteStore {
@@ -51,7 +60,27 @@ impl SqliteStore {
5160 data_dir : PathBuf , db_file_name : Option < String > , kv_table_name : Option < String > ,
5261 ) -> io:: Result < Self > {
5362 let inner = Arc :: new ( SqliteStoreInner :: new ( data_dir, db_file_name, kv_table_name) ?) ;
54- Ok ( Self { inner } )
63+ let next_write_version = AtomicU64 :: new ( 1 ) ;
64+ Ok ( Self { inner, next_write_version } )
65+ }
66+
67+ fn build_locking_key (
68+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
69+ ) -> String {
70+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, key)
71+ }
72+
73+ fn get_new_version_and_lock_ref ( & self , locking_key : String ) -> ( Arc < Mutex < u64 > > , u64 ) {
74+ let version = self . next_write_version . fetch_add ( 1 , Ordering :: Relaxed ) ;
75+ if version == u64:: MAX {
76+ panic ! ( "SqliteStore version counter overflowed" ) ;
77+ }
78+
79+ // Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
80+ // cleaning up unused locks.
81+ let inner_lock_ref = self . inner . get_inner_lock_ref ( locking_key) ;
82+
83+ ( inner_lock_ref, version)
5584 }
5685
5786 /// Returns the data directory.
@@ -60,6 +89,99 @@ impl SqliteStore {
6089 }
6190}
6291
92+ impl KVStore for SqliteStore {
93+ fn read (
94+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
95+ ) -> Pin < Box < dyn Future < Output = Result < Vec < u8 > , io:: Error > > + Send > > {
96+ let primary_namespace = primary_namespace. to_string ( ) ;
97+ let secondary_namespace = secondary_namespace. to_string ( ) ;
98+ let key = key. to_string ( ) ;
99+ let inner = Arc :: clone ( & self . inner ) ;
100+ let fut = tokio:: task:: spawn_blocking ( move || {
101+ inner. read_internal ( & primary_namespace, & secondary_namespace, & key)
102+ } ) ;
103+ Box :: pin ( async move {
104+ fut. await . unwrap_or_else ( |e| {
105+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
106+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
107+ } )
108+ } )
109+ }
110+
111+ fn write (
112+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
113+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
114+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
115+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
116+ let primary_namespace = primary_namespace. to_string ( ) ;
117+ let secondary_namespace = secondary_namespace. to_string ( ) ;
118+ let key = key. to_string ( ) ;
119+ let inner = Arc :: clone ( & self . inner ) ;
120+ let fut = tokio:: task:: spawn_blocking ( move || {
121+ inner. write_internal (
122+ inner_lock_ref,
123+ locking_key,
124+ version,
125+ & primary_namespace,
126+ & secondary_namespace,
127+ & key,
128+ buf,
129+ )
130+ } ) ;
131+ Box :: pin ( async move {
132+ fut. await . unwrap_or_else ( |e| {
133+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
134+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
135+ } )
136+ } )
137+ }
138+
139+ fn remove (
140+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
141+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
142+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
143+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
144+ let primary_namespace = primary_namespace. to_string ( ) ;
145+ let secondary_namespace = secondary_namespace. to_string ( ) ;
146+ let key = key. to_string ( ) ;
147+ let inner = Arc :: clone ( & self . inner ) ;
148+ let fut = tokio:: task:: spawn_blocking ( move || {
149+ inner. remove_internal (
150+ inner_lock_ref,
151+ locking_key,
152+ version,
153+ & primary_namespace,
154+ & secondary_namespace,
155+ & key,
156+ lazy,
157+ )
158+ } ) ;
159+ Box :: pin ( async move {
160+ fut. await . unwrap_or_else ( |e| {
161+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
162+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
163+ } )
164+ } )
165+ }
166+
167+ fn list (
168+ & self , primary_namespace : & str , secondary_namespace : & str ,
169+ ) -> Pin < Box < dyn Future < Output = Result < Vec < String > , io:: Error > > + Send > > {
170+ let primary_namespace = primary_namespace. to_string ( ) ;
171+ let secondary_namespace = secondary_namespace. to_string ( ) ;
172+ let inner = Arc :: clone ( & self . inner ) ;
173+ let fut = tokio:: task:: spawn_blocking ( move || {
174+ inner. list_internal ( & primary_namespace, & secondary_namespace)
175+ } ) ;
176+ Box :: pin ( async move {
177+ fut. await . unwrap_or_else ( |e| {
178+ let msg = format ! ( "Failed to IO operation due join error: {}" , e) ;
179+ Err ( io:: Error :: new ( io:: ErrorKind :: Other , msg) )
180+ } )
181+ } )
182+ }
183+ }
184+
63185impl KVStoreSync for SqliteStore {
64186 fn read (
65187 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
@@ -70,13 +192,33 @@ impl KVStoreSync for SqliteStore {
70192 fn write (
71193 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
72194 ) -> io:: Result < ( ) > {
73- self . inner . write_internal ( primary_namespace, secondary_namespace, key, buf)
195+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
196+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
197+ self . inner . write_internal (
198+ inner_lock_ref,
199+ locking_key,
200+ version,
201+ primary_namespace,
202+ secondary_namespace,
203+ key,
204+ buf,
205+ )
74206 }
75207
76208 fn remove (
77209 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
78210 ) -> io:: Result < ( ) > {
79- self . inner . remove_internal ( primary_namespace, secondary_namespace, key, lazy)
211+ let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
212+ let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
213+ self . inner . remove_internal (
214+ inner_lock_ref,
215+ locking_key,
216+ version,
217+ primary_namespace,
218+ secondary_namespace,
219+ key,
220+ lazy,
221+ )
80222 }
81223
82224 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
@@ -88,6 +230,7 @@ struct SqliteStoreInner {
88230 connection : Arc < Mutex < Connection > > ,
89231 data_dir : PathBuf ,
90232 kv_table_name : String ,
233+ write_version_locks : Mutex < HashMap < String , Arc < Mutex < u64 > > > > ,
91234}
92235
93236impl SqliteStoreInner {
@@ -161,7 +304,13 @@ impl SqliteStoreInner {
161304 } ) ?;
162305
163306 let connection = Arc :: new ( Mutex :: new ( connection) ) ;
164- Ok ( Self { connection, data_dir, kv_table_name } )
307+ let write_version_locks = Mutex :: new ( HashMap :: new ( ) ) ;
308+ Ok ( Self { connection, data_dir, kv_table_name, write_version_locks } )
309+ }
310+
311+ fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < Mutex < u64 > > {
312+ let mut outer_lock = self . write_version_locks . lock ( ) . unwrap ( ) ;
313+ Arc :: clone ( & outer_lock. entry ( locking_key) . or_default ( ) )
165314 }
166315
167316 fn read_internal (
@@ -213,71 +362,77 @@ impl SqliteStoreInner {
213362 }
214363
215364 fn write_internal (
216- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
365+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 ,
366+ primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
217367 ) -> io:: Result < ( ) > {
218368 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
219369
220- let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
370+ self . execute_locked_write ( inner_lock_ref, locking_key, version, || {
371+ let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
221372
222- let sql = format ! (
223- "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);" ,
224- self . kv_table_name
225- ) ;
373+ let sql = format ! (
374+ "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);" ,
375+ self . kv_table_name
376+ ) ;
226377
227- let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
228- let msg = format ! ( "Failed to prepare statement: {}" , e) ;
229- io:: Error :: new ( io:: ErrorKind :: Other , msg)
230- } ) ?;
378+ let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
379+ let msg = format ! ( "Failed to prepare statement: {}" , e) ;
380+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
381+ } ) ?;
231382
232- stmt. execute ( named_params ! {
233- ":primary_namespace" : primary_namespace,
234- ":secondary_namespace" : secondary_namespace,
235- ":key" : key,
236- ":value" : buf,
237- } )
238- . map ( |_| ( ) )
239- . map_err ( |e| {
240- let msg = format ! (
241- "Failed to write to key {}/{}/{}: {}" ,
242- PrintableString ( primary_namespace) ,
243- PrintableString ( secondary_namespace) ,
244- PrintableString ( key) ,
245- e
246- ) ;
247- io:: Error :: new ( io:: ErrorKind :: Other , msg)
383+ stmt. execute ( named_params ! {
384+ ":primary_namespace" : primary_namespace,
385+ ":secondary_namespace" : secondary_namespace,
386+ ":key" : key,
387+ ":value" : buf,
388+ } )
389+ . map ( |_| ( ) )
390+ . map_err ( |e| {
391+ let msg = format ! (
392+ "Failed to write to key {}/{}/{}: {}" ,
393+ PrintableString ( primary_namespace) ,
394+ PrintableString ( secondary_namespace) ,
395+ PrintableString ( key) ,
396+ e
397+ ) ;
398+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
399+ } )
248400 } )
249401 }
250402
251403 fn remove_internal (
252- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
404+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 ,
405+ primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
253406 ) -> io:: Result < ( ) > {
254407 check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "remove" ) ?;
255408
256- let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
409+ self . execute_locked_write ( inner_lock_ref, locking_key, version, || {
410+ let locked_conn = self . connection . lock ( ) . unwrap ( ) ;
257411
258- let sql = format ! ( "DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;" , self . kv_table_name) ;
412+ let sql = format ! ( "DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;" , self . kv_table_name) ;
259413
260- let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
261- let msg = format ! ( "Failed to prepare statement: {}" , e) ;
262- io:: Error :: new ( io:: ErrorKind :: Other , msg)
263- } ) ?;
414+ let mut stmt = locked_conn. prepare_cached ( & sql) . map_err ( |e| {
415+ let msg = format ! ( "Failed to prepare statement: {}" , e) ;
416+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
417+ } ) ?;
264418
265- stmt. execute ( named_params ! {
266- ":primary_namespace" : primary_namespace,
267- ":secondary_namespace" : secondary_namespace,
268- ":key" : key,
419+ stmt. execute ( named_params ! {
420+ ":primary_namespace" : primary_namespace,
421+ ":secondary_namespace" : secondary_namespace,
422+ ":key" : key,
423+ } )
424+ . map_err ( |e| {
425+ let msg = format ! (
426+ "Failed to delete key {}/{}/{}: {}" ,
427+ PrintableString ( primary_namespace) ,
428+ PrintableString ( secondary_namespace) ,
429+ PrintableString ( key) ,
430+ e
431+ ) ;
432+ io:: Error :: new ( io:: ErrorKind :: Other , msg)
433+ } ) ?;
434+ Ok ( ( ) )
269435 } )
270- . map_err ( |e| {
271- let msg = format ! (
272- "Failed to delete key {}/{}/{}: {}" ,
273- PrintableString ( primary_namespace) ,
274- PrintableString ( secondary_namespace) ,
275- PrintableString ( key) ,
276- e
277- ) ;
278- io:: Error :: new ( io:: ErrorKind :: Other , msg)
279- } ) ?;
280- Ok ( ( ) )
281436 }
282437
283438 fn list_internal (
@@ -320,6 +475,46 @@ impl SqliteStoreInner {
320475
321476 Ok ( keys)
322477 }
478+
479+ fn execute_locked_write < F : FnOnce ( ) -> Result < ( ) , lightning:: io:: Error > > (
480+ & self , inner_lock_ref : Arc < Mutex < u64 > > , locking_key : String , version : u64 , callback : F ,
481+ ) -> Result < ( ) , lightning:: io:: Error > {
482+ let res = {
483+ let mut last_written_version = inner_lock_ref. lock ( ) . unwrap ( ) ;
484+
485+ // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
486+ // consistency.
487+ let is_stale_version = version <= * last_written_version;
488+
489+ // If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
490+ if is_stale_version {
491+ Ok ( ( ) )
492+ } else {
493+ callback ( ) . map ( |_| {
494+ * last_written_version = version;
495+ } )
496+ }
497+ } ;
498+
499+ self . clean_locks ( & inner_lock_ref, locking_key) ;
500+
501+ res
502+ }
503+
504+ fn clean_locks ( & self , inner_lock_ref : & Arc < Mutex < u64 > > , locking_key : String ) {
505+ // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
506+ // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
507+ // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
508+ // counted.
509+ let mut outer_lock = self . write_version_locks . lock ( ) . unwrap ( ) ;
510+
511+ let strong_count = Arc :: strong_count ( & inner_lock_ref) ;
512+ debug_assert ! ( strong_count >= 2 , "Unexpected SqliteStore strong count" ) ;
513+
514+ if strong_count == 2 {
515+ outer_lock. remove ( & locking_key) ;
516+ }
517+ }
323518}
324519
325520#[ cfg( test) ]
0 commit comments