Skip to content

Commit 7444ac3

Browse files
committed
Add LSPS2ServiceHandler persistence
We add simple `persist` call to `LSPS2ServiceHandler` that sequentially persist all the peer states under a key that encodes their node id.
1 parent 29e4aeb commit 7444ac3

File tree

4 files changed

+119
-11
lines changed

4 files changed

+119
-11
lines changed

lightning-liquidity/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub mod lsps2;
6565
pub mod lsps5;
6666
mod manager;
6767
pub mod message_queue;
68+
pub mod persist;
6869
#[allow(dead_code)]
6970
#[allow(unused_imports)]
7071
mod sync;

lightning-liquidity/src/lsps2/service.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
1212
use alloc::string::{String, ToString};
1313
use alloc::vec::Vec;
14+
use lightning::util::persist::KVStore;
1415

1516
use core::cmp::Ordering as CmpOrdering;
1617
use core::ops::Deref;
@@ -28,6 +29,9 @@ use crate::lsps2::utils::{
2829
compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
2930
};
3031
use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
32+
use crate::persist::{
33+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
34+
};
3135
use crate::prelude::hash_map::Entry;
3236
use crate::prelude::{new_hash_map, HashMap};
3337
use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
@@ -38,6 +42,7 @@ use lightning::ln::msgs::{ErrorAction, LightningError};
3842
use lightning::ln::types::ChannelId;
3943
use lightning::util::errors::APIError;
4044
use lightning::util::logger::Level;
45+
use lightning::util::ser::Writeable;
4146
use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
4247

4348
use lightning_types::payment::PaymentHash;
@@ -564,11 +569,13 @@ macro_rules! get_or_insert_peer_state_entry {
564569
}
565570

566571
/// The main object allowing to send and receive bLIP-52 / LSPS2 messages.
567-
pub struct LSPS2ServiceHandler<CM: Deref>
572+
pub struct LSPS2ServiceHandler<CM: Deref, K: Deref + Clone>
568573
where
569574
CM::Target: AChannelManager,
575+
K::Target: KVStore,
570576
{
571577
channel_manager: CM,
578+
kv_store: K,
572579
pending_messages: Arc<MessageQueue>,
573580
pending_events: Arc<EventQueue>,
574581
per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
@@ -578,14 +585,15 @@ where
578585
config: LSPS2ServiceConfig,
579586
}
580587

581-
impl<CM: Deref> LSPS2ServiceHandler<CM>
588+
impl<CM: Deref, K: Deref + Clone> LSPS2ServiceHandler<CM, K>
582589
where
583590
CM::Target: AChannelManager,
591+
K::Target: KVStore,
584592
{
585593
/// Constructs a `LSPS2ServiceHandler`.
586594
pub(crate) fn new(
587595
pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue>, channel_manager: CM,
588-
config: LSPS2ServiceConfig,
596+
kv_store: K, config: LSPS2ServiceConfig,
589597
) -> Self {
590598
Self {
591599
pending_messages,
@@ -595,6 +603,7 @@ where
595603
peer_by_channel_id: RwLock::new(new_hash_map()),
596604
total_pending_requests: AtomicUsize::new(0),
597605
channel_manager,
606+
kv_store,
598607
config,
599608
}
600609
}
@@ -1442,6 +1451,50 @@ where
14421451
);
14431452
}
14441453

1454+
async fn persist_peer_state(
1455+
&self, counterparty_node_id: PublicKey,
1456+
) -> Result<(), lightning::io::Error> {
1457+
let fut = {
1458+
let outer_state_lock = self.per_peer_state.read().unwrap();
1459+
let encoded = match outer_state_lock.get(&counterparty_node_id) {
1460+
None => {
1461+
let err = lightning::io::Error::new(
1462+
lightning::io::ErrorKind::Other,
1463+
"Failed to get peer entry",
1464+
);
1465+
return Err(err);
1466+
},
1467+
Some(entry) => entry.lock().unwrap().encode(),
1468+
};
1469+
let key = counterparty_node_id.to_string();
1470+
1471+
self.kv_store.write(
1472+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1473+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1474+
&key,
1475+
encoded,
1476+
)
1477+
};
1478+
1479+
fut.await
1480+
}
1481+
1482+
pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
1483+
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
1484+
// introduce some batching to upper-bound the number of requests inflight at any given
1485+
// time.
1486+
let need_persist: Vec<PublicKey> = {
1487+
let outer_state_lock = self.per_peer_state.read().unwrap();
1488+
outer_state_lock.iter().filter_map(|(k, v)| Some(*k)).collect()
1489+
};
1490+
1491+
for counterparty_node_id in need_persist.into_iter() {
1492+
self.persist_peer_state(counterparty_node_id).await?;
1493+
}
1494+
1495+
Ok(())
1496+
}
1497+
14451498
pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
14461499
let mut outer_state_lock = self.per_peer_state.write().unwrap();
14471500
let is_prunable =
@@ -1468,9 +1521,10 @@ where
14681521
}
14691522
}
14701523

1471-
impl<CM: Deref> LSPSProtocolMessageHandler for LSPS2ServiceHandler<CM>
1524+
impl<CM: Deref, K: Deref + Clone> LSPSProtocolMessageHandler for LSPS2ServiceHandler<CM, K>
14721525
where
14731526
CM::Target: AChannelManager,
1527+
K::Target: KVStore,
14741528
{
14751529
type ProtocolMessage = LSPS2Message;
14761530
const PROTOCOL_NUMBER: Option<u16> = Some(2);

lightning-liquidity/src/manager.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// You may not use this file except in accordance with one or both of these
88
// licenses.
99

10+
use alloc::boxed::Box;
1011
use alloc::string::ToString;
1112
use alloc::vec::Vec;
1213

@@ -34,6 +35,7 @@ use crate::lsps2::msgs::LSPS2Message;
3435
use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
3536
use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet};
3637
use crate::sync::{Arc, Mutex, RwLock};
38+
use crate::utils::async_poll::dummy_waker;
3739
#[cfg(feature = "time")]
3840
use crate::utils::time::DefaultTimeProvider;
3941
use crate::utils::time::TimeProvider;
@@ -53,7 +55,9 @@ use lightning_types::features::{InitFeatures, NodeFeatures};
5355

5456
use bitcoin::secp256k1::PublicKey;
5557

58+
use core::future::Future as StdFuture;
5659
use core::ops::Deref;
60+
use core::task;
5761

5862
const LSPS_FEATURE_BIT: usize = 729;
5963

@@ -297,15 +301,14 @@ pub struct LiquidityManager<
297301
#[cfg(lsps1_service)]
298302
lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, C>>,
299303
lsps1_client_handler: Option<LSPS1ClientHandler<ES>>,
300-
lsps2_service_handler: Option<LSPS2ServiceHandler<CM>>,
304+
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, K>>,
301305
lsps2_client_handler: Option<LSPS2ClientHandler<ES>>,
302306
lsps5_service_handler: Option<LSPS5ServiceHandler<CM, NS, TP>>,
303307
lsps5_client_handler: Option<LSPS5ClientHandler<ES>>,
304308
service_config: Option<LiquidityServiceConfig>,
305309
_client_config: Option<LiquidityClientConfig>,
306310
best_block: RwLock<Option<BestBlock>>,
307311
_chain_source: Option<C>,
308-
kv_store: K,
309312
}
310313

311314
#[cfg(feature = "time")]
@@ -392,14 +395,15 @@ where
392395
let lsps2_service_handler = service_config.as_ref().and_then(|config| {
393396
config.lsps2_service_config.as_ref().map(|config| {
394397
if let Some(number) =
395-
<LSPS2ServiceHandler<CM> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
398+
<LSPS2ServiceHandler<CM, K> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
396399
{
397400
supported_protocols.push(number);
398401
}
399402
LSPS2ServiceHandler::new(
400403
Arc::clone(&pending_messages),
401404
Arc::clone(&pending_events),
402405
channel_manager.clone(),
406+
kv_store.clone(),
403407
config.clone(),
404408
)
405409
})
@@ -495,7 +499,6 @@ where
495499
_client_config: client_config,
496500
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
497501
_chain_source: chain_source,
498-
kv_store,
499502
}
500503
}
501504

@@ -534,8 +537,8 @@ where
534537

535538
/// Returns a reference to the LSPS2 server-side handler.
536539
///
537-
/// The returned handler allows to initiate the LSPS2 service-side flow.
538-
pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM>> {
540+
/// The returned hendler allows to initiate the LSPS2 service-side flow.
541+
pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM, K>> {
539542
self.lsps2_service_handler.as_ref()
540543
}
541544

@@ -610,6 +613,19 @@ where
610613
self.pending_events.get_and_clear_pending_events()
611614
}
612615

616+
/// Persists the state of the service handlers towards the given [`KVStore`] implementation.
617+
///
618+
/// This will be regularly called by LDK's background processor if necessary and only needs to
619+
/// be called manually if it's not utilized.
620+
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
621+
// TODO: We should eventually persist in parallel.
622+
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
623+
lsps2_service_handler.persist().await?;
624+
}
625+
626+
Ok(())
627+
}
628+
613629
fn handle_lsps_message(
614630
&self, msg: LSPSMessage, sender_node_id: &PublicKey,
615631
) -> Result<(), lightning::ln::msgs::LightningError> {
@@ -1110,7 +1126,9 @@ where
11101126
/// Returns a reference to the LSPS2 server-side handler.
11111127
///
11121128
/// Wraps [`LiquidityManager::lsps2_service_handler`].
1113-
pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM>> {
1129+
pub fn lsps2_service_handler(
1130+
&self,
1131+
) -> Option<&LSPS2ServiceHandler<CM, Arc<KVStoreSyncWrapper<KS>>>> {
11141132
self.inner.lsps2_service_handler()
11151133
}
11161134

@@ -1163,6 +1181,21 @@ where
11631181
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
11641182
self.inner.get_and_clear_pending_events()
11651183
}
1184+
1185+
/// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation.
1186+
///
1187+
/// Wraps [`LiquidityManager::persist`].
1188+
pub fn persist(&self) -> Result<(), lightning::io::Error> {
1189+
let mut waker = dummy_waker();
1190+
let mut ctx = task::Context::from_waker(&mut waker);
1191+
match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {
1192+
task::Poll::Ready(result) => result,
1193+
task::Poll::Pending => {
1194+
// In a sync context, we can't wait for the future to complete.
1195+
unreachable!("LiquidityManager::persist should not be pending in a sync context");
1196+
},
1197+
}
1198+
}
11661199
}
11671200

11681201
impl<

lightning-liquidity/src/persist.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
//! Types and utils for persistence.
11+
12+
/// The primary namespace under which the [`LiquidityManager`] will be persisted.
13+
///
14+
/// [`LiquidityManager`]: crate::LiquidityManager
15+
pub const LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "liquidity_manager";
16+
17+
/// The secondary namespace under which the [`LSPS2ServiceHandler`] data will be persisted.
18+
///
19+
/// [`LSPS2ServiceHandler`]: crate::lsps2::service::LSPS2ServiceHandler
20+
pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";

0 commit comments

Comments
 (0)