Skip to content

Commit c973613

Browse files
committed
Add LSPS5ServiceHandler persistence
We add simple `persist` call to `LSPS5ServiceHandler` that sequentially persist all the peer states under a key that encodes their node id.
1 parent 7444ac3 commit c973613

File tree

3 files changed

+76
-8
lines changed

3 files changed

+76
-8
lines changed

lightning-liquidity/src/lsps5/service.rs

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use crate::lsps5::msgs::{
1717
SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
1818
};
1919
use crate::message_queue::MessageQueue;
20+
use crate::persist::{
21+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
22+
};
2023
use crate::prelude::*;
2124
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
2225
use crate::utils::time::TimeProvider;
@@ -28,6 +31,8 @@ use lightning::ln::channelmanager::AChannelManager;
2831
use lightning::ln::msgs::{ErrorAction, LightningError};
2932
use lightning::sign::NodeSigner;
3033
use lightning::util::logger::Level;
34+
use lightning::util::persist::KVStore;
35+
use lightning::util::ser::Writeable;
3136

3237
use core::ops::Deref;
3338
use core::time::Duration;
@@ -118,10 +123,11 @@ impl Default for LSPS5ServiceConfig {
118123
/// [`LSPS5ServiceEvent::SendWebhookNotification`]: super::event::LSPS5ServiceEvent::SendWebhookNotification
119124
/// [`app_name`]: super::msgs::LSPS5AppName
120125
/// [`lsps5.webhook_registered`]: super::msgs::WebhookNotificationMethod::LSPS5WebhookRegistered
121-
pub struct LSPS5ServiceHandler<CM: Deref, NS: Deref, TP: Deref>
126+
pub struct LSPS5ServiceHandler<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref>
122127
where
123128
CM::Target: AChannelManager,
124129
NS::Target: NodeSigner,
130+
K::Target: KVStore,
125131
TP::Target: TimeProvider,
126132
{
127133
config: LSPS5ServiceConfig,
@@ -131,19 +137,21 @@ where
131137
time_provider: TP,
132138
channel_manager: CM,
133139
node_signer: NS,
140+
kv_store: K,
134141
last_pruning: Mutex<Option<LSPSDateTime>>,
135142
}
136143

137-
impl<CM: Deref, NS: Deref, TP: Deref> LSPS5ServiceHandler<CM, NS, TP>
144+
impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
138145
where
139146
CM::Target: AChannelManager,
140147
NS::Target: NodeSigner,
148+
K::Target: KVStore,
141149
TP::Target: TimeProvider,
142150
{
143151
/// Constructs a `LSPS5ServiceHandler` using the given time provider.
144152
pub(crate) fn new_with_time_provider(
145153
event_queue: Arc<EventQueue>, pending_messages: Arc<MessageQueue>, channel_manager: CM,
146-
node_signer: NS, config: LSPS5ServiceConfig, time_provider: TP,
154+
kv_store: K, node_signer: NS, config: LSPS5ServiceConfig, time_provider: TP,
147155
) -> Self {
148156
assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
149157
Self {
@@ -154,6 +162,7 @@ where
154162
time_provider,
155163
channel_manager,
156164
node_signer,
165+
kv_store,
157166
last_pruning: Mutex::new(None),
158167
}
159168
}
@@ -186,6 +195,51 @@ where
186195
}
187196
}
188197

198+
async fn persist_peer_state(
199+
&self, counterparty_node_id: PublicKey,
200+
) -> Result<(), lightning::io::Error> {
201+
let fut = {
202+
let outer_state_lock = self.per_peer_state.read().unwrap();
203+
let encoded = match outer_state_lock.get(&counterparty_node_id) {
204+
None => {
205+
let err = lightning::io::Error::new(
206+
lightning::io::ErrorKind::Other,
207+
"Failed to get peer entry",
208+
);
209+
return Err(err);
210+
},
211+
Some(entry) => entry.encode(),
212+
};
213+
214+
let key = counterparty_node_id.to_string();
215+
216+
self.kv_store.write(
217+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
218+
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
219+
&key,
220+
encoded,
221+
)
222+
};
223+
224+
fut.await
225+
}
226+
227+
pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
228+
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
229+
// introduce some batching to upper-bound the number of requests inflight at any given
230+
// time.
231+
let need_persist: Vec<PublicKey> = {
232+
let outer_state_lock = self.per_peer_state.read().unwrap();
233+
outer_state_lock.iter().filter_map(|(k, v)| Some(*k)).collect()
234+
};
235+
236+
for counterparty_node_id in need_persist.into_iter() {
237+
self.persist_peer_state(counterparty_node_id).await?;
238+
}
239+
240+
Ok(())
241+
}
242+
189243
fn check_prune_stale_webhooks<'a>(
190244
&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
191245
) {
@@ -549,10 +603,12 @@ where
549603
}
550604
}
551605

552-
impl<CM: Deref, NS: Deref, TP: Deref> LSPSProtocolMessageHandler for LSPS5ServiceHandler<CM, NS, TP>
606+
impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPSProtocolMessageHandler
607+
for LSPS5ServiceHandler<CM, NS, K, TP>
553608
where
554609
CM::Target: AChannelManager,
555610
NS::Target: NodeSigner,
611+
K::Target: KVStore,
556612
TP::Target: TimeProvider,
557613
{
558614
type ProtocolMessage = LSPS5Message;

lightning-liquidity/src/manager.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ pub struct LiquidityManager<
303303
lsps1_client_handler: Option<LSPS1ClientHandler<ES>>,
304304
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, K>>,
305305
lsps2_client_handler: Option<LSPS2ClientHandler<ES>>,
306-
lsps5_service_handler: Option<LSPS5ServiceHandler<CM, NS, TP>>,
306+
lsps5_service_handler: Option<LSPS5ServiceHandler<CM, NS, K, TP>>,
307307
lsps5_client_handler: Option<LSPS5ClientHandler<ES>>,
308308
service_config: Option<LiquidityServiceConfig>,
309309
_client_config: Option<LiquidityClientConfig>,
@@ -423,7 +423,7 @@ where
423423
let lsps5_service_handler = service_config.as_ref().and_then(|config| {
424424
config.lsps5_service_config.as_ref().map(|config| {
425425
if let Some(number) =
426-
<LSPS5ServiceHandler<CM, NS, TP> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
426+
<LSPS5ServiceHandler<CM, NS, K, TP> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
427427
{
428428
supported_protocols.push(number);
429429
}
@@ -432,6 +432,7 @@ where
432432
Arc::clone(&pending_events),
433433
Arc::clone(&pending_messages),
434434
channel_manager.clone(),
435+
kv_store.clone(),
435436
node_signer,
436437
config.clone(),
437438
time_provider,
@@ -552,7 +553,7 @@ where
552553
/// Returns a reference to the LSPS5 server-side handler.
553554
///
554555
/// The returned handler allows to initiate the LSPS5 service-side flow.
555-
pub fn lsps5_service_handler(&self) -> Option<&LSPS5ServiceHandler<CM, NS, TP>> {
556+
pub fn lsps5_service_handler(&self) -> Option<&LSPS5ServiceHandler<CM, NS, K, TP>> {
556557
self.lsps5_service_handler.as_ref()
557558
}
558559

@@ -623,6 +624,10 @@ where
623624
lsps2_service_handler.persist().await?;
624625
}
625626

627+
if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
628+
lsps5_service_handler.persist().await?;
629+
}
630+
626631
Ok(())
627632
}
628633

@@ -1142,7 +1147,9 @@ where
11421147
/// Returns a reference to the LSPS5 server-side handler.
11431148
///
11441149
/// Wraps [`LiquidityManager::lsps5_service_handler`].
1145-
pub fn lsps5_service_handler(&self) -> Option<&LSPS5ServiceHandler<CM, NS, TP>> {
1150+
pub fn lsps5_service_handler(
1151+
&self,
1152+
) -> Option<&LSPS5ServiceHandler<CM, NS, Arc<KVStoreSyncWrapper<KS>>, TP>> {
11461153
self.inner.lsps5_service_handler()
11471154
}
11481155

lightning-liquidity/src/persist.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,8 @@ pub const LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "liquidity_man
1818
///
1919
/// [`LSPS2ServiceHandler`]: crate::lsps2::service::LSPS2ServiceHandler
2020
pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
21+
22+
/// The secondary namespace under which the [`LSPS5ServiceHandler`] data will be persisted.
23+
///
24+
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
25+
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";

0 commit comments

Comments
 (0)