@@ -17,6 +17,9 @@ use crate::lsps5::msgs::{
1717 SetWebhookRequest , SetWebhookResponse , WebhookNotification , WebhookNotificationMethod ,
1818} ;
1919use crate :: message_queue:: MessageQueue ;
20+ use crate :: persist:: {
21+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE , LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
22+ } ;
2023use crate :: prelude:: * ;
2124use crate :: sync:: { Arc , Mutex , RwLock , RwLockWriteGuard } ;
2225use crate :: utils:: time:: TimeProvider ;
@@ -28,6 +31,8 @@ use lightning::ln::channelmanager::AChannelManager;
2831use lightning:: ln:: msgs:: { ErrorAction , LightningError } ;
2932use lightning:: sign:: NodeSigner ;
3033use lightning:: util:: logger:: Level ;
34+ use lightning:: util:: persist:: KVStore ;
35+ use lightning:: util:: ser:: Writeable ;
3136
3237use core:: ops:: Deref ;
3338use core:: time:: Duration ;
@@ -118,10 +123,11 @@ impl Default for LSPS5ServiceConfig {
118123/// [`LSPS5ServiceEvent::SendWebhookNotification`]: super::event::LSPS5ServiceEvent::SendWebhookNotification
119124/// [`app_name`]: super::msgs::LSPS5AppName
120125/// [`lsps5.webhook_registered`]: super::msgs::WebhookNotificationMethod::LSPS5WebhookRegistered
121- pub struct LSPS5ServiceHandler < CM : Deref , NS : Deref , TP : Deref >
126+ pub struct LSPS5ServiceHandler < CM : Deref , NS : Deref , K : Deref + Clone , TP : Deref >
122127where
123128 CM :: Target : AChannelManager ,
124129 NS :: Target : NodeSigner ,
130+ K :: Target : KVStore ,
125131 TP :: Target : TimeProvider ,
126132{
127133 config : LSPS5ServiceConfig ,
@@ -131,19 +137,21 @@ where
131137 time_provider : TP ,
132138 channel_manager : CM ,
133139 node_signer : NS ,
140+ kv_store : K ,
134141 last_pruning : Mutex < Option < LSPSDateTime > > ,
135142}
136143
137- impl < CM : Deref , NS : Deref , TP : Deref > LSPS5ServiceHandler < CM , NS , TP >
144+ impl < CM : Deref , NS : Deref , K : Deref + Clone , TP : Deref > LSPS5ServiceHandler < CM , NS , K , TP >
138145where
139146 CM :: Target : AChannelManager ,
140147 NS :: Target : NodeSigner ,
148+ K :: Target : KVStore ,
141149 TP :: Target : TimeProvider ,
142150{
143151 /// Constructs a `LSPS5ServiceHandler` using the given time provider.
144152 pub ( crate ) fn new_with_time_provider (
145153 event_queue : Arc < EventQueue > , pending_messages : Arc < MessageQueue > , channel_manager : CM ,
146- node_signer : NS , config : LSPS5ServiceConfig , time_provider : TP ,
154+ kv_store : K , node_signer : NS , config : LSPS5ServiceConfig , time_provider : TP ,
147155 ) -> Self {
148156 assert ! ( config. max_webhooks_per_client > 0 , "`max_webhooks_per_client` must be > 0" ) ;
149157 Self {
@@ -154,6 +162,7 @@ where
154162 time_provider,
155163 channel_manager,
156164 node_signer,
165+ kv_store,
157166 last_pruning : Mutex :: new ( None ) ,
158167 }
159168 }
@@ -186,6 +195,51 @@ where
186195 }
187196 }
188197
198+ async fn persist_peer_state (
199+ & self , counterparty_node_id : PublicKey ,
200+ ) -> Result < ( ) , lightning:: io:: Error > {
201+ let fut = {
202+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
203+ let encoded = match outer_state_lock. get ( & counterparty_node_id) {
204+ None => {
205+ let err = lightning:: io:: Error :: new (
206+ lightning:: io:: ErrorKind :: Other ,
207+ "Failed to get peer entry" ,
208+ ) ;
209+ return Err ( err) ;
210+ } ,
211+ Some ( entry) => entry. encode ( ) ,
212+ } ;
213+
214+ let key = counterparty_node_id. to_string ( ) ;
215+
216+ self . kv_store . write (
217+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
218+ LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
219+ & key,
220+ encoded,
221+ )
222+ } ;
223+
224+ fut. await
225+ }
226+
227+ pub ( crate ) async fn persist ( & self ) -> Result < ( ) , lightning:: io:: Error > {
228+ // TODO: We should eventually persist in parallel, however, when we do, we probably want to
229+ // introduce some batching to upper-bound the number of requests inflight at any given
230+ // time.
231+ let need_persist: Vec < PublicKey > = {
232+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
233+ outer_state_lock. iter ( ) . filter_map ( |( k, v) | Some ( * k) ) . collect ( )
234+ } ;
235+
236+ for counterparty_node_id in need_persist. into_iter ( ) {
237+ self . persist_peer_state ( counterparty_node_id) . await ?;
238+ }
239+
240+ Ok ( ( ) )
241+ }
242+
189243 fn check_prune_stale_webhooks < ' a > (
190244 & self , outer_state_lock : & mut RwLockWriteGuard < ' a , HashMap < PublicKey , PeerState > > ,
191245 ) {
@@ -549,10 +603,12 @@ where
549603 }
550604}
551605
552- impl < CM : Deref , NS : Deref , TP : Deref > LSPSProtocolMessageHandler for LSPS5ServiceHandler < CM , NS , TP >
606+ impl < CM : Deref , NS : Deref , K : Deref + Clone , TP : Deref > LSPSProtocolMessageHandler
607+ for LSPS5ServiceHandler < CM , NS , K , TP >
553608where
554609 CM :: Target : AChannelManager ,
555610 NS :: Target : NodeSigner ,
611+ K :: Target : KVStore ,
556612 TP :: Target : TimeProvider ,
557613{
558614 type ProtocolMessage = LSPS5Message ;
0 commit comments