From 13e55ba8057c0673d34a43e00390fc5e329e479d Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 5 Jan 2026 13:22:53 +0100 Subject: [PATCH 1/5] Use async `KVStore` for `read_X` util methods Rather than using `KVStoreSync` we now use the async `KVStore` implementation for most `read_X` util methods used during node building. This is a first step towards making node building/startup entirely async eventually. --- src/builder.rs | 76 +++++++++++++++----------- src/io/utils.rs | 142 ++++++++++++++++++++++++++++-------------------- 2 files changed, 129 insertions(+), 89 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 08ac123fa..63e7df005 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -57,7 +57,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, + read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, + read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer, + write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ @@ -1053,7 +1055,9 @@ fn build_with_store_internal( } // Initialize the status fields. - let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) { + let node_metrics = match runtime + .block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1067,7 +1071,9 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { + let payment_store = match runtime + .block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(payments) => Arc::new(PaymentStore::new( payments, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), @@ -1294,24 +1300,23 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = - match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(graph) => Arc::new(graph), - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) - } else { - log_error!(logger, "Failed to read network graph from store: {}", e); - return Err(BuildError::ReadFailed); - } - }, - }; + let network_graph = match runtime + .block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { + Ok(graph) => Arc::new(graph), + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) + } else { + log_error!(logger, "Failed to read network graph from store: {}", e); + return Err(BuildError::ReadFailed); + } + }, + }; - let local_scorer = match io::utils::read_scorer( - Arc::clone(&kv_store), - Arc::clone(&network_graph), - Arc::clone(&logger), - ) { + let local_scorer = match runtime.block_on(async { + read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await + }) { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1327,7 +1332,10 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) { + match runtime.block_on(async { + read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) + .await + }) { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1616,14 +1624,17 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match io::utils::read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) { + let output_sweeper = match runtime.block_on(async { + read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .await + }) { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1644,7 +1655,8 @@ fn build_with_store_internal( }, }; - let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)) + let event_queue = match runtime + .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) { Ok(event_queue) => Arc::new(event_queue), Err(e) => { @@ -1657,7 +1669,9 @@ fn build_with_store_internal( }, }; - let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) { + let peer_store = match runtime + .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { diff --git a/src/io/utils.rs b/src/io/utils.rs index 928d4031b..9b754f32a 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -88,18 +88,21 @@ pub(crate) fn read_or_generate_seed_file( } /// Read a previously persisted [`NetworkGraph`] from the store. -pub(crate) fn read_network_graph( +pub(crate) async fn read_network_graph( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await?, + ); NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| { log_error!(logger, "Failed to deserialize NetworkGraph: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph") @@ -107,19 +110,22 @@ where } /// Read a previously persisted [`ProbabilisticScorer`] from the store. -pub(crate) fn read_scorer>, L: Deref + Clone>( +pub(crate) async fn read_scorer>, L: Deref + Clone>( kv_store: Arc, network_graph: G, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { let params = ProbabilisticScoringDecayParameters::default(); - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + ) + .await?, + ); let args = (params, network_graph, logger.clone()); ProbabilisticScorer::read(&mut reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); @@ -128,18 +134,21 @@ where } /// Read previously persisted external pathfinding scores from the cache. -pub(crate) fn read_external_pathfinding_scores_from_cache( +pub(crate) async fn read_external_pathfinding_scores_from_cache( kv_store: Arc, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + ) + .await?, + ); ChannelLiquidities::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") @@ -175,18 +184,21 @@ where } /// Read previously persisted events from the store. -pub(crate) fn read_event_queue( +pub(crate) async fn read_event_queue( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ) + .await?, + ); EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize event queue: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue") @@ -194,18 +206,21 @@ where } /// Read previously persisted peer info from the store. -pub(crate) fn read_peer_info( +pub(crate) async fn read_peer_info( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) + .await?, + ); PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize peer store: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore") @@ -213,7 +228,7 @@ where } /// Read previously persisted payments information from the store. -pub(crate) fn read_payments( +pub(crate) async fn read_payments( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where @@ -221,17 +236,22 @@ where { let mut res = Vec::new(); - for stored_key in KVStoreSync::list( + for stored_key in KVStore::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - )? { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - )?); + ) + .await? + { + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + ) + .await?, + ); let payment = PaymentDetails::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( @@ -245,17 +265,20 @@ where } /// Read `OutputSweeper` state from the store. -pub(crate) fn read_output_sweeper( +pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ) + .await?, + ); let args = ( broadcaster, fee_estimator, @@ -272,18 +295,21 @@ pub(crate) fn read_output_sweeper( Ok(sweeper) } -pub(crate) fn read_node_metrics( +pub(crate) async fn read_node_metrics( kv_store: Arc, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - NODE_METRICS_PRIMARY_NAMESPACE, - NODE_METRICS_SECONDARY_NAMESPACE, - NODE_METRICS_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, + ) + .await?, + ); NodeMetrics::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize NodeMetrics: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics") From aba72a2462ed2b0cd16ddcb5dee809e34073dd46 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 5 Jan 2026 14:16:29 +0100 Subject: [PATCH 2/5] Parallelize `read_payments` Previously, we would read entries of our payment store sequentially. This is more or less fine when we read from a local store, but when we read from a remote (e.g., VSS) store, all the latency could result in considerable slowdown during startup. Here, we opt to read store entries in batches. --- src/builder.rs | 2 -- src/ffi/types.rs | 4 +-- src/io/utils.rs | 59 +++++++++++++++++++++++++++++++++++------- src/payment/unified.rs | 9 +++---- src/types.rs | 3 +-- 5 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 63e7df005..2aa09a61a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -19,9 +19,7 @@ use bitcoin::bip32::{ChildNumber, Xpriv}; use bitcoin::key::Secp256k1; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; - use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; - use lightning::chain::{chainmonitor, BestBlock, Watch}; use lightning::io::Cursor; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; diff --git a/src/ffi/types.rs b/src/ffi/types.rs index a5ff8372f..bed040fcd 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -29,6 +29,7 @@ use lightning::offers::invoice::Bolt12Invoice as LdkBolt12Invoice; pub use lightning::offers::offer::OfferId; use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer}; use lightning::offers::refund::Refund as LdkRefund; +use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; use lightning::util::ser::Writeable; @@ -56,9 +57,6 @@ pub use crate::payment::store::{ ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, }; pub use crate::payment::UnifiedPaymentResult; - -use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; - use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { diff --git a/src/io/utils.rs b/src/io/utils.rs index 9b754f32a..d326827d7 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -236,22 +236,59 @@ where { let mut res = Vec::new(); - for stored_key in KVStore::list( + let mut stored_keys = KVStore::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, ) - .await? - { - let mut reader = Cursor::new( - KVStore::read( + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - ) - .await?, - ); + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let read_res = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let mut reader = Cursor::new(read_res); let payment = PaymentDetails::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( @@ -261,6 +298,10 @@ where })?; res.push(payment); } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + Ok(res) } diff --git a/src/payment/unified.rs b/src/payment/unified.rs index b1546961b..8225205fd 100644 --- a/src/payment/unified.rs +++ b/src/payment/unified.rs @@ -21,16 +21,14 @@ use bip21::de::ParamKind; use bip21::{DeserializationError, DeserializeParams, Param, SerializeParams}; use bitcoin::address::NetworkChecked; use bitcoin::{Amount, Txid}; +use bitcoin_payment_instructions::amount::Amount as BPIAmount; +use bitcoin_payment_instructions::{PaymentInstructions, PaymentMethod}; use lightning::ln::channelmanager::PaymentId; use lightning::offers::offer::Offer; use lightning::onion_message::dns_resolution::HumanReadableName; use lightning::routing::router::RouteParametersConfig; use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description}; -use bitcoin_payment_instructions::{ - amount::Amount as BPIAmount, PaymentInstructions, PaymentMethod, -}; - use crate::error::Error; use crate::ffi::maybe_wrap; use crate::logger::{log_error, LdkLogger, Logger}; @@ -393,7 +391,8 @@ impl DeserializationError for Extras { mod tests { use std::str::FromStr; - use bitcoin::{address::NetworkUnchecked, Address, Network}; + use bitcoin::address::NetworkUnchecked; + use bitcoin::{Address, Network}; use super::{Amount, Bolt11Invoice, Extras, Offer}; diff --git a/src/types.rs b/src/types.rs index 8835cb424..5e9cd74c9 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bitcoin::secp256k1::PublicKey; use bitcoin::{OutPoint, ScriptBuf}; +use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; @@ -29,8 +30,6 @@ use lightning_block_sync::gossip::GossipVerifier; use lightning_liquidity::utils::time::DefaultTimeProvider; use lightning_net_tokio::SocketDescriptor; -use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; - use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; use crate::config::ChannelConfig; From 21f0020049444a7a23e966e48966a58468b3785f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 7 Jan 2026 09:59:07 +0100 Subject: [PATCH 3/5] Drop unnecessary uses of `io::Cursor` --- src/builder.rs | 6 +- src/io/utils.rs | 137 +++++++++++++++++++------------------------ src/io/vss_store.rs | 9 ++- src/payment/store.rs | 25 +++----- src/scoring.rs | 6 +- 5 files changed, 77 insertions(+), 106 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 2aa09a61a..ce1bd7155 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -21,7 +21,6 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::{chainmonitor, BestBlock, Watch}; -use lightning::io::Cursor; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; @@ -1386,13 +1385,12 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(res) = KVStoreSync::read( + if let Ok(reader) = KVStoreSync::read( &*kv_store, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, ) { - let mut reader = Cursor::new(res); let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1409,7 +1407,7 @@ fn build_with_store_internal( channel_monitor_references, ); let (_hash, channel_manager) = - <(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| { + <(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| { log_error!(logger, "Failed to read channel manager from store: {}", e); BuildError::ReadFailed })?; diff --git a/src/io/utils.rs b/src/io/utils.rs index d326827d7..15677d096 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -18,7 +18,6 @@ use bdk_chain::tx_graph::ChangeSet as BdkTxGraphChangeSet; use bdk_chain::ConfirmationBlockTime; use bdk_wallet::ChangeSet as BdkWalletChangeSet; use bitcoin::Network; -use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; use lightning::routing::scoring::{ @@ -94,16 +93,14 @@ pub(crate) async fn read_network_graph( where L::Target: LdkLogger, { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - ) - .await?, - ); - NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| { + let reader = KVStore::read( + &*kv_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await?; + NetworkGraph::read(&mut &*reader, logger.clone()).map_err(|e| { log_error!(logger, "Failed to deserialize NetworkGraph: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph") }) @@ -117,17 +114,15 @@ where L::Target: LdkLogger, { let params = ProbabilisticScoringDecayParameters::default(); - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - ) - .await?, - ); + let reader = KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + ) + .await?; let args = (params, network_graph, logger.clone()); - ProbabilisticScorer::read(&mut reader, args).map_err(|e| { + ProbabilisticScorer::read(&mut &*reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") }) @@ -140,16 +135,14 @@ pub(crate) async fn read_external_pathfinding_scores_from_cache( where L::Target: LdkLogger, { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, - ) - .await?, - ); - ChannelLiquidities::read(&mut reader).map_err(|e| { + let reader = KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + ) + .await?; + ChannelLiquidities::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") }) @@ -190,16 +183,14 @@ pub(crate) async fn read_event_queue( where L::Target: LdkLogger, { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - ) - .await?, - ); - EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { + let reader = KVStore::read( + &*kv_store, + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ) + .await?; + EventQueue::read(&mut &*reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize event queue: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue") }) @@ -212,16 +203,14 @@ pub(crate) async fn read_peer_info( where L::Target: LdkLogger, { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - ) - .await?, - ); - PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { + let reader = KVStore::read( + &*kv_store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) + .await?; + PeerStore::read(&mut &*reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize peer store: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore") }) @@ -263,7 +252,7 @@ where while let Some(read_res) = set.join_next().await { // Exit early if we get an IO error. - let read_res = read_res + let reader = read_res .map_err(|e| { log_error!(logger, "Failed to read PaymentDetails: {}", e); set.abort_all(); @@ -288,8 +277,7 @@ where } // Handle result. - let mut reader = Cursor::new(read_res); - let payment = PaymentDetails::read(&mut reader).map_err(|e| { + let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -311,15 +299,13 @@ pub(crate) async fn read_output_sweeper( chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - ) - .await?, - ); + let reader = KVStore::read( + &*kv_store, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ) + .await?; let args = ( broadcaster, fee_estimator, @@ -329,7 +315,7 @@ pub(crate) async fn read_output_sweeper( kv_store, logger.clone(), ); - let (_, sweeper) = <(_, Sweeper)>::read(&mut reader, args).map_err(|e| { + let (_, sweeper) = <(_, Sweeper)>::read(&mut &*reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize OutputSweeper: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper") })?; @@ -342,16 +328,14 @@ pub(crate) async fn read_node_metrics( where L::Target: LdkLogger, { - let mut reader = Cursor::new( - KVStore::read( - &*kv_store, - NODE_METRICS_PRIMARY_NAMESPACE, - NODE_METRICS_SECONDARY_NAMESPACE, - NODE_METRICS_KEY, - ) - .await?, - ); - NodeMetrics::read(&mut reader).map_err(|e| { + let reader = KVStore::read( + &*kv_store, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, + ) + .await?; + NodeMetrics::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize NodeMetrics: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics") }) @@ -490,7 +474,7 @@ macro_rules! impl_read_write_change_set_type { where L::Target: LdkLogger, { - let bytes = + let reader = match KVStoreSync::read(&*kv_store, $primary_namespace, $secondary_namespace, $key) { Ok(bytes) => bytes, @@ -511,9 +495,8 @@ macro_rules! impl_read_write_change_set_type { }, }; - let mut reader = Cursor::new(bytes); let res: Result, DecodeError> = - Readable::read(&mut reader); + Readable::read(&mut &*reader); match res { Ok(res) => Ok(Some(res.0)), Err(e) => { diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index eb439ed10..b4fdc770a 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -745,11 +745,10 @@ async fn determine_and_write_schema_version( })? .0; - let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted)) - .map_err(|e| { - let msg = format!("Failed to decode schema version: {}", e); - Error::new(ErrorKind::Other, msg) - })?; + let schema_version: VssSchemaVersion = Readable::read(&mut &*decrypted).map_err(|e| { + let msg = format!("Failed to decode schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; Ok(schema_version) } else { // The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version). diff --git a/src/payment/store.rs b/src/payment/store.rs index 184de2ea9..15e94190c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -605,7 +605,6 @@ impl StorableObjectUpdate for PaymentDetailsUpdate { #[cfg(test)] mod tests { - use bitcoin::io::Cursor; use lightning::util::ser::Readable; use super::*; @@ -657,16 +656,12 @@ mod tests { let old_bolt11_encoded = old_bolt11_payment.encode(); assert_eq!( old_bolt11_payment, - OldPaymentDetails::read(&mut Cursor::new(old_bolt11_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_bolt11_encoded.clone()).unwrap() ); - let bolt11_decoded = - PaymentDetails::read(&mut Cursor::new(old_bolt11_encoded)).unwrap(); + let bolt11_decoded = PaymentDetails::read(&mut &*old_bolt11_encoded).unwrap(); let bolt11_reencoded = bolt11_decoded.encode(); - assert_eq!( - bolt11_decoded, - PaymentDetails::read(&mut Cursor::new(bolt11_reencoded)).unwrap() - ); + assert_eq!(bolt11_decoded, PaymentDetails::read(&mut &*bolt11_reencoded).unwrap()); match bolt11_decoded.kind { PaymentKind::Bolt11 { hash: h, preimage: p, secret: s } => { @@ -700,15 +695,14 @@ mod tests { let old_bolt11_jit_encoded = old_bolt11_jit_payment.encode(); assert_eq!( old_bolt11_jit_payment, - OldPaymentDetails::read(&mut Cursor::new(old_bolt11_jit_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_bolt11_jit_encoded.clone()).unwrap() ); - let bolt11_jit_decoded = - PaymentDetails::read(&mut Cursor::new(old_bolt11_jit_encoded)).unwrap(); + let bolt11_jit_decoded = PaymentDetails::read(&mut &*old_bolt11_jit_encoded).unwrap(); let bolt11_jit_reencoded = bolt11_jit_decoded.encode(); assert_eq!( bolt11_jit_decoded, - PaymentDetails::read(&mut Cursor::new(bolt11_jit_reencoded)).unwrap() + PaymentDetails::read(&mut &*bolt11_jit_reencoded).unwrap() ); match bolt11_jit_decoded.kind { @@ -746,15 +740,14 @@ mod tests { let old_spontaneous_encoded = old_spontaneous_payment.encode(); assert_eq!( old_spontaneous_payment, - OldPaymentDetails::read(&mut Cursor::new(old_spontaneous_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_spontaneous_encoded.clone()).unwrap() ); - let spontaneous_decoded = - PaymentDetails::read(&mut Cursor::new(old_spontaneous_encoded)).unwrap(); + let spontaneous_decoded = PaymentDetails::read(&mut &*old_spontaneous_encoded).unwrap(); let spontaneous_reencoded = spontaneous_decoded.encode(); assert_eq!( spontaneous_decoded, - PaymentDetails::read(&mut Cursor::new(spontaneous_reencoded)).unwrap() + PaymentDetails::read(&mut &*spontaneous_reencoded).unwrap() ); match spontaneous_decoded.kind { diff --git a/src/scoring.rs b/src/scoring.rs index 6385f2f56..daa5725fa 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -1,4 +1,3 @@ -use std::io::Cursor; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, SystemTime}; @@ -74,15 +73,14 @@ async fn sync_external_scores( return; }, }; - let body = match response.bytes().await { + let reader = match response.bytes().await { Ok(bytes) => bytes, Err(e) => { log_error!(logger, "Failed to read external scores update: {}", e); return; }, }; - let mut reader = Cursor::new(body); - match ChannelLiquidities::read(&mut reader) { + match ChannelLiquidities::read(&mut &*reader) { Ok(liquidities) => { if let Err(e) = write_external_pathfinding_scores_to_cache( Arc::clone(&kv_store), From f1885be7a0ddc0403f1ac3b54da05e146e7e3e71 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 7 Jan 2026 10:19:21 +0100 Subject: [PATCH 4/5] Drop a bunch of unnecessary `Arc::clone`s Previously, we consistently handed around `Arc` references for most objects to avoid unnecessary refactoring work. This approach however introduced a bunch of unnecessary allocations through `Arc::clone`. Here we opt to rather use plain references in a bunch of places, reducing the usage of `Arc`s. --- src/builder.rs | 52 +++++++++++++++++++------------------------ src/chain/bitcoind.rs | 34 +++++++++------------------- src/chain/electrum.rs | 26 ++++++++-------------- src/chain/esplora.rs | 26 ++++++++-------------- src/chain/mod.rs | 4 ++-- src/io/utils.rs | 38 +++++++++++++------------------ src/lib.rs | 4 ++-- src/scoring.rs | 15 +++++-------- src/wallet/persist.rs | 27 +++++++--------------- 9 files changed, 85 insertions(+), 141 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index ce1bd7155..e7e2de286 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1053,7 +1053,7 @@ fn build_with_store_internal( // Initialize the status fields. let node_metrics = match runtime - .block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { @@ -1068,21 +1068,20 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = match runtime - .block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = + match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1298,7 +1297,7 @@ fn build_with_store_internal( // Initialize the network graph, scorer, and router let network_graph = match runtime - .block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) { Ok(graph) => Arc::new(graph), Err(e) => { @@ -1312,7 +1311,7 @@ fn build_with_store_internal( }; let local_scorer = match runtime.block_on(async { - read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await + read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await }) { Ok(scorer) => scorer, Err(e) => { @@ -1330,8 +1329,7 @@ fn build_with_store_internal( // Restore external pathfinding scores from cache if possible. match runtime.block_on(async { - read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) - .await + read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await }) { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); @@ -1490,15 +1488,11 @@ fn build_with_store_internal( { let mut locked_node_metrics = node_metrics.write().unwrap(); locked_node_metrics.latest_rgs_snapshot_timestamp = None; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .map_err(|e| { - log_error!(logger, "Failed writing to store: {}", e); - BuildError::WriteFailed - })?; + write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger)) + .map_err(|e| { + log_error!(logger, "Failed writing to store: {}", e); + BuildError::WriteFailed + })?; } p2p_source }, diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 0c3b644ca..69255e080 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -205,14 +205,10 @@ impl BitcoindChainSource { unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - ) - .unwrap_or_else(|e| { - log_error!(self.logger, "Failed to persist node metrics: {}", e); - }); + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger) + .unwrap_or_else(|e| { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + }); } break; }, @@ -420,11 +416,11 @@ impl BitcoindChainSource { *self.latest_chain_tip.write().unwrap() = Some(tip); periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - chain_monitor, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; }, Ok(_) => {}, @@ -469,11 +465,7 @@ impl BitcoindChainSource { locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; Ok(()) } @@ -586,11 +578,7 @@ impl BitcoindChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 9e05dfaee..b520b2e11 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -149,8 +149,8 @@ impl ElectrumChainSource { unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), + &*self.kv_store, + &*self.logger, )?; } Ok(()) @@ -239,19 +239,15 @@ impl ElectrumChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; } @@ -284,11 +280,7 @@ impl ElectrumChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index f6f313955..2acca4654 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -128,8 +128,8 @@ impl EsploraChainSource { locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger) + &*self.kv_store, + &*self.logger )?; } Ok(()) @@ -259,19 +259,15 @@ impl EsploraChainSource { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; Ok(()) }, @@ -353,11 +349,7 @@ impl EsploraChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/mod.rs b/src/chain/mod.rs index a73ce7418..1010f32b7 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -488,8 +488,8 @@ impl Filter for ChainSource { } fn periodically_archive_fully_resolved_monitors( - channel_manager: Arc, chain_monitor: Arc, - kv_store: Arc, logger: Arc, node_metrics: Arc>, + channel_manager: &ChannelManager, chain_monitor: &ChainMonitor, kv_store: &DynStore, + logger: &Logger, node_metrics: &RwLock, ) -> Result<(), Error> { let mut locked_node_metrics = node_metrics.write().unwrap(); let cur_height = channel_manager.current_best_block().height; diff --git a/src/io/utils.rs b/src/io/utils.rs index 15677d096..68ca7a61e 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -88,7 +88,7 @@ pub(crate) fn read_or_generate_seed_file( /// Read a previously persisted [`NetworkGraph`] from the store. pub(crate) async fn read_network_graph( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, @@ -108,7 +108,7 @@ where /// Read a previously persisted [`ProbabilisticScorer`] from the store. pub(crate) async fn read_scorer>, L: Deref + Clone>( - kv_store: Arc, network_graph: G, logger: L, + kv_store: &DynStore, network_graph: G, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, @@ -130,7 +130,7 @@ where /// Read previously persisted external pathfinding scores from the cache. pub(crate) async fn read_external_pathfinding_scores_from_cache( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result where L::Target: LdkLogger, @@ -150,7 +150,7 @@ where /// Persist external pathfinding scores to the cache. pub(crate) async fn write_external_pathfinding_scores_to_cache( - kv_store: Arc, data: &ChannelLiquidities, logger: L, + kv_store: &DynStore, data: &ChannelLiquidities, logger: L, ) -> Result<(), Error> where L::Target: LdkLogger, @@ -218,7 +218,7 @@ where /// Read previously persisted payments information from the store. pub(crate) async fn read_payments( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, @@ -323,7 +323,7 @@ pub(crate) async fn read_output_sweeper( } pub(crate) async fn read_node_metrics( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result where L::Target: LdkLogger, @@ -342,7 +342,7 @@ where } pub(crate) fn write_node_metrics( - node_metrics: &NodeMetrics, kv_store: Arc, logger: L, + node_metrics: &NodeMetrics, kv_store: &DynStore, logger: L, ) -> Result<(), Error> where L::Target: LdkLogger, @@ -469,7 +469,7 @@ macro_rules! impl_read_write_change_set_type { $key:expr ) => { pub(crate) fn $read_name( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, @@ -510,7 +510,7 @@ macro_rules! impl_read_write_change_set_type { } pub(crate) fn $write_name( - value: &$change_set_type, kv_store: Arc, logger: L, + value: &$change_set_type, kv_store: &DynStore, logger: L, ) -> Result<(), std::io::Error> where L::Target: LdkLogger, @@ -588,41 +588,35 @@ impl_read_write_change_set_type!( // Reads the full BdkWalletChangeSet or returns default fields pub(crate) fn read_bdk_wallet_change_set( - kv_store: Arc, logger: Arc, + kv_store: &DynStore, logger: &Logger, ) -> Result, std::io::Error> { let mut change_set = BdkWalletChangeSet::default(); // We require a descriptor and return `None` to signal creation of a new wallet otherwise. - if let Some(descriptor) = - read_bdk_wallet_descriptor(Arc::clone(&kv_store), Arc::clone(&logger))? - { + if let Some(descriptor) = read_bdk_wallet_descriptor(kv_store, logger)? { change_set.descriptor = Some(descriptor); } else { return Ok(None); } // We require a change_descriptor and return `None` to signal creation of a new wallet otherwise. - if let Some(change_descriptor) = - read_bdk_wallet_change_descriptor(Arc::clone(&kv_store), Arc::clone(&logger))? - { + if let Some(change_descriptor) = read_bdk_wallet_change_descriptor(kv_store, logger)? { change_set.change_descriptor = Some(change_descriptor); } else { return Ok(None); } // We require a network and return `None` to signal creation of a new wallet otherwise. - if let Some(network) = read_bdk_wallet_network(Arc::clone(&kv_store), Arc::clone(&logger))? { + if let Some(network) = read_bdk_wallet_network(kv_store, logger)? { change_set.network = Some(network); } else { return Ok(None); } - read_bdk_wallet_local_chain(Arc::clone(&kv_store), Arc::clone(&logger))? + read_bdk_wallet_local_chain(&*kv_store, logger)? .map(|local_chain| change_set.local_chain = local_chain); - read_bdk_wallet_tx_graph(Arc::clone(&kv_store), Arc::clone(&logger))? - .map(|tx_graph| change_set.tx_graph = tx_graph); - read_bdk_wallet_indexer(Arc::clone(&kv_store), Arc::clone(&logger))? - .map(|indexer| change_set.indexer = indexer); + read_bdk_wallet_tx_graph(&*kv_store, logger)?.map(|tx_graph| change_set.tx_graph = tx_graph); + read_bdk_wallet_indexer(&*kv_store, logger)?.map(|indexer| change_set.indexer = indexer); Ok(Some(change_set)) } diff --git a/src/lib.rs b/src/lib.rs index b050fba57..e15b708ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -291,7 +291,7 @@ impl Node { { let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); - write_node_metrics(&*locked_node_metrics, Arc::clone(&gossip_sync_store), Arc::clone(&gossip_sync_logger)) + write_node_metrics(&*locked_node_metrics, &*gossip_sync_store, Arc::clone(&gossip_sync_logger)) .unwrap_or_else(|e| { log_error!(gossip_sync_logger, "Persistence failed: {}", e); }); @@ -507,7 +507,7 @@ impl Node { { let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; - write_node_metrics(&*locked_node_metrics, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + write_node_metrics(&*locked_node_metrics, &*bcast_store, Arc::clone(&bcast_logger)) .unwrap_or_else(|e| { log_error!(bcast_logger, "Persistence failed: {}", e); }); diff --git a/src/scoring.rs b/src/scoring.rs index daa5725fa..2e0d226ff 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -82,12 +82,8 @@ async fn sync_external_scores( }; match ChannelLiquidities::read(&mut &*reader) { Ok(liquidities) => { - if let Err(e) = write_external_pathfinding_scores_to_cache( - Arc::clone(&kv_store), - &liquidities, - logger, - ) - .await + if let Err(e) = + write_external_pathfinding_scores_to_cache(&*kv_store, &liquidities, logger).await { log_error!(logger, "Failed to persist external scores to cache: {}", e); } @@ -98,10 +94,9 @@ async fn sync_external_scores( let mut locked_node_metrics = node_metrics.write().unwrap(); locked_node_metrics.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs()); - write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger) - .unwrap_or_else(|e| { - log_error!(logger, "Persisting node metrics failed: {}", e); - }); + write_node_metrics(&*locked_node_metrics, &*kv_store, logger).unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); log_trace!(logger, "External scores merged successfully"); }, Err(e) => { diff --git a/src/wallet/persist.rs b/src/wallet/persist.rs index 5c8668937..10be1fac0 100644 --- a/src/wallet/persist.rs +++ b/src/wallet/persist.rs @@ -38,10 +38,7 @@ impl WalletPersister for KVStoreWalletPersister { return Ok(latest_change_set.clone()); } - let change_set_opt = read_bdk_wallet_change_set( - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + let change_set_opt = read_bdk_wallet_change_set(&*persister.kv_store, &*persister.logger)?; let change_set = match change_set_opt { Some(persisted_change_set) => persisted_change_set, @@ -87,11 +84,7 @@ impl WalletPersister for KVStoreWalletPersister { )); } else { latest_change_set.descriptor = Some(descriptor.clone()); - write_bdk_wallet_descriptor( - &descriptor, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + write_bdk_wallet_descriptor(&descriptor, &*persister.kv_store, &*persister.logger)?; } } @@ -112,8 +105,8 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.change_descriptor = Some(change_descriptor.clone()); write_bdk_wallet_change_descriptor( &change_descriptor, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), + &*persister.kv_store, + &*persister.logger, )?; } } @@ -131,11 +124,7 @@ impl WalletPersister for KVStoreWalletPersister { )); } else { latest_change_set.network = Some(network); - write_bdk_wallet_network( - &network, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + write_bdk_wallet_network(&network, &*persister.kv_store, &*persister.logger)?; } } @@ -157,7 +146,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.indexer.merge(change_set.indexer.clone()); write_bdk_wallet_indexer( &latest_change_set.indexer, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } @@ -166,7 +155,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.tx_graph.merge(change_set.tx_graph.clone()); write_bdk_wallet_tx_graph( &latest_change_set.tx_graph, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } @@ -175,7 +164,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.local_chain.merge(change_set.local_chain.clone()); write_bdk_wallet_local_chain( &latest_change_set.local_chain, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } From c724a893ece9a21b8332aab416ef3288b2488c2a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 7 Jan 2026 10:56:33 +0100 Subject: [PATCH 5/5] Add test for payment persistence after node restart Add integration test that verifies 200 payments are correctly persisted and retrievable via `list_payments` after restarting a node. Co-Authored-By: Claude AI --- tests/integration_tests_rust.rs | 121 +++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 655b5fd94..4d2a17422 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,7 +23,8 @@ use common::{ expect_splice_pending_event, generate_blocks_and_wait, open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, - setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestSyncStore, + setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, + TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::liquidity::LSPS2ServiceConfig; @@ -2317,3 +2318,121 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { Some(6) ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn payment_persistence_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + + // Setup nodes manually so we can restart node_a with the same config + println!("== Node A =="); + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + + let num_payments = 200; + let payment_amount_msat = 1_000_000; // 1000 sats per payment + + { + let node_a = setup_node(&chain_source, config_a.clone()); + + println!("\n== Node B =="); + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + // Premine sufficient funds for a large channel and many payments + let premine_amount_sat = 10_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Open a large channel from node_a to node_b + let channel_amount_sat = 5_000_000; + open_channel(&node_a, &node_b, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Send 200 payments from node_a to node_b + println!("\nSending {} payments from A to B...", num_payments); + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new(String::from("test")).unwrap()); + + for i in 0..num_payments { + let invoice = node_b + .bolt11_payment() + .receive(payment_amount_msat, &invoice_description.clone().into(), 3600) + .unwrap(); + let payment_id = node_a.bolt11_payment().send(&invoice, None).unwrap(); + expect_event!(node_a, PaymentSuccessful); + expect_event!(node_b, PaymentReceived); + + if (i + 1) % 50 == 0 { + println!("Completed {} payments", i + 1); + } + + // Verify payment succeeded + assert_eq!(node_a.payment(&payment_id).unwrap().status, PaymentStatus::Succeeded); + } + println!("All {} payments completed successfully", num_payments); + + // Verify node_a has 200 outbound Bolt11 payments before shutdown + let outbound_payments_before = node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!(outbound_payments_before.len(), num_payments); + + // Shut down both nodes + println!("\nShutting down nodes..."); + node_a.stop().unwrap(); + node_b.stop().unwrap(); + } + + // Restart node_a with the same config + println!("\nRestarting node A..."); + let restarted_node_a = setup_node(&chain_source, config_a); + + // Assert all 200 payments are still in the store + let outbound_payments_after = restarted_node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!( + outbound_payments_after.len(), + num_payments, + "Expected {} payments after restart, found {}", + num_payments, + outbound_payments_after.len() + ); + + // Verify all payments have the correct status + for payment in &outbound_payments_after { + assert_eq!( + payment.status, + PaymentStatus::Succeeded, + "Payment {:?} has unexpected status {:?}", + payment.id, + payment.status + ); + assert_eq!(payment.amount_msat, Some(payment_amount_msat)); + } + + println!( + "Successfully verified {} payments persisted after restart", + outbound_payments_after.len() + ); + + restarted_node_a.stop().unwrap(); +}