diff --git a/src/builder.rs b/src/builder.rs index 08ac123fa..e7e2de286 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -19,11 +19,8 @@ use bitcoin::bip32::{ChildNumber, Xpriv}; use bitcoin::key::Secp256k1; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; - use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; - use lightning::chain::{chainmonitor, BestBlock, Watch}; -use lightning::io::Cursor; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; @@ -57,7 +54,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, + read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, + read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer, + write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ @@ -1053,7 +1052,9 @@ fn build_with_store_internal( } // Initialize the status fields. - let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) { + let node_metrics = match runtime + .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) + { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1067,19 +1068,20 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = + match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1294,24 +1296,23 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = - match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(graph) => Arc::new(graph), - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) - } else { - log_error!(logger, "Failed to read network graph from store: {}", e); - return Err(BuildError::ReadFailed); - } - }, - }; + let network_graph = match runtime + .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) + { + Ok(graph) => Arc::new(graph), + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) + } else { + log_error!(logger, "Failed to read network graph from store: {}", e); + return Err(BuildError::ReadFailed); + } + }, + }; - let local_scorer = match io::utils::read_scorer( - Arc::clone(&kv_store), - Arc::clone(&network_graph), - Arc::clone(&logger), - ) { + let local_scorer = match runtime.block_on(async { + read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await + }) { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1327,7 +1328,9 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) { + match runtime.block_on(async { + read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await + }) { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1380,13 +1383,12 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(res) = KVStoreSync::read( + if let Ok(reader) = KVStoreSync::read( &*kv_store, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, ) { - let mut reader = Cursor::new(res); let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1403,7 +1405,7 @@ fn build_with_store_internal( channel_monitor_references, ); let (_hash, channel_manager) = - <(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| { + <(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| { log_error!(logger, "Failed to read channel manager from store: {}", e); BuildError::ReadFailed })?; @@ -1486,15 +1488,11 @@ fn build_with_store_internal( { let mut locked_node_metrics = node_metrics.write().unwrap(); locked_node_metrics.latest_rgs_snapshot_timestamp = None; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .map_err(|e| { - log_error!(logger, "Failed writing to store: {}", e); - BuildError::WriteFailed - })?; + write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger)) + .map_err(|e| { + log_error!(logger, "Failed writing to store: {}", e); + BuildError::WriteFailed + })?; } p2p_source }, @@ -1616,14 +1614,17 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match io::utils::read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) { + let output_sweeper = match runtime.block_on(async { + read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .await + }) { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1644,7 +1645,8 @@ fn build_with_store_internal( }, }; - let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)) + let event_queue = match runtime + .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) { Ok(event_queue) => Arc::new(event_queue), Err(e) => { @@ -1657,7 +1659,9 @@ fn build_with_store_internal( }, }; - let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) { + let peer_store = match runtime + .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 0c3b644ca..69255e080 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -205,14 +205,10 @@ impl BitcoindChainSource { unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - ) - .unwrap_or_else(|e| { - log_error!(self.logger, "Failed to persist node metrics: {}", e); - }); + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger) + .unwrap_or_else(|e| { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + }); } break; }, @@ -420,11 +416,11 @@ impl BitcoindChainSource { *self.latest_chain_tip.write().unwrap() = Some(tip); periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - chain_monitor, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; }, Ok(_) => {}, @@ -469,11 +465,7 @@ impl BitcoindChainSource { locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; Ok(()) } @@ -586,11 +578,7 @@ impl BitcoindChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 9e05dfaee..b520b2e11 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -149,8 +149,8 @@ impl ElectrumChainSource { unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), + &*self.kv_store, + &*self.logger, )?; } Ok(()) @@ -239,19 +239,15 @@ impl ElectrumChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; } @@ -284,11 +280,7 @@ impl ElectrumChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index f6f313955..2acca4654 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -128,8 +128,8 @@ impl EsploraChainSource { locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger) + &*self.kv_store, + &*self.logger )?; } Ok(()) @@ -259,19 +259,15 @@ impl EsploraChainSource { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } periodically_archive_fully_resolved_monitors( - Arc::clone(&channel_manager), - Arc::clone(&chain_monitor), - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - Arc::clone(&self.node_metrics), + &*channel_manager, + &*chain_monitor, + &*self.kv_store, + &*self.logger, + &*self.node_metrics, )?; Ok(()) }, @@ -353,11 +349,7 @@ impl EsploraChainSource { { let mut locked_node_metrics = self.node_metrics.write().unwrap(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; + write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } Ok(()) diff --git a/src/chain/mod.rs b/src/chain/mod.rs index a73ce7418..1010f32b7 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -488,8 +488,8 @@ impl Filter for ChainSource { } fn periodically_archive_fully_resolved_monitors( - channel_manager: Arc, chain_monitor: Arc, - kv_store: Arc, logger: Arc, node_metrics: Arc>, + channel_manager: &ChannelManager, chain_monitor: &ChainMonitor, kv_store: &DynStore, + logger: &Logger, node_metrics: &RwLock, ) -> Result<(), Error> { let mut locked_node_metrics = node_metrics.write().unwrap(); let cur_height = channel_manager.current_best_block().height; diff --git a/src/ffi/types.rs b/src/ffi/types.rs index a5ff8372f..bed040fcd 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -29,6 +29,7 @@ use lightning::offers::invoice::Bolt12Invoice as LdkBolt12Invoice; pub use lightning::offers::offer::OfferId; use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer}; use lightning::offers::refund::Refund as LdkRefund; +use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; use lightning::util::ser::Writeable; @@ -56,9 +57,6 @@ pub use crate::payment::store::{ ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, }; pub use crate::payment::UnifiedPaymentResult; - -use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; - use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { diff --git a/src/io/utils.rs b/src/io/utils.rs index 928d4031b..68ca7a61e 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -18,7 +18,6 @@ use bdk_chain::tx_graph::ChangeSet as BdkTxGraphChangeSet; use bdk_chain::ConfirmationBlockTime; use bdk_wallet::ChangeSet as BdkWalletChangeSet; use bitcoin::Network; -use lightning::io::Cursor; use lightning::ln::msgs::DecodeError; use lightning::routing::gossip::NetworkGraph; use lightning::routing::scoring::{ @@ -88,59 +87,62 @@ pub(crate) fn read_or_generate_seed_file( } /// Read a previously persisted [`NetworkGraph`] from the store. -pub(crate) fn read_network_graph( - kv_store: Arc, logger: L, +pub(crate) async fn read_network_graph( + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, - )?); - NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| { + ) + .await?; + NetworkGraph::read(&mut &*reader, logger.clone()).map_err(|e| { log_error!(logger, "Failed to deserialize NetworkGraph: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph") }) } /// Read a previously persisted [`ProbabilisticScorer`] from the store. -pub(crate) fn read_scorer>, L: Deref + Clone>( - kv_store: Arc, network_graph: G, logger: L, +pub(crate) async fn read_scorer>, L: Deref + Clone>( + kv_store: &DynStore, network_graph: G, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { let params = ProbabilisticScoringDecayParameters::default(); - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - )?); + ) + .await?; let args = (params, network_graph, logger.clone()); - ProbabilisticScorer::read(&mut reader, args).map_err(|e| { + ProbabilisticScorer::read(&mut &*reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") }) } /// Read previously persisted external pathfinding scores from the cache. -pub(crate) fn read_external_pathfinding_scores_from_cache( - kv_store: Arc, logger: L, +pub(crate) async fn read_external_pathfinding_scores_from_cache( + kv_store: &DynStore, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, - )?); - ChannelLiquidities::read(&mut reader).map_err(|e| { + ) + .await?; + ChannelLiquidities::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") }) @@ -148,7 +150,7 @@ where /// Persist external pathfinding scores to the cache. pub(crate) async fn write_external_pathfinding_scores_to_cache( - kv_store: Arc, data: &ChannelLiquidities, logger: L, + kv_store: &DynStore, data: &ChannelLiquidities, logger: L, ) -> Result<(), Error> where L::Target: LdkLogger, @@ -175,64 +177,107 @@ where } /// Read previously persisted events from the store. -pub(crate) fn read_event_queue( +pub(crate) async fn read_event_queue( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, - )?); - EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { + ) + .await?; + EventQueue::read(&mut &*reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize event queue: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue") }) } /// Read previously persisted peer info from the store. -pub(crate) fn read_peer_info( +pub(crate) async fn read_peer_info( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PEER_INFO_PERSISTENCE_KEY, - )?); - PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { + ) + .await?; + PeerStore::read(&mut &*reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize peer store: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore") }) } /// Read previously persisted payments information from the store. -pub(crate) fn read_payments( - kv_store: Arc, logger: L, +pub(crate) async fn read_payments( + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { let mut res = Vec::new(); - for stored_key in KVStoreSync::list( + let mut stored_keys = KVStore::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - )? { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - )?); - let payment = PaymentDetails::read(&mut reader).map_err(|e| { + ) + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let reader = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -241,21 +286,26 @@ where })?; res.push(payment); } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + Ok(res) } /// Read `OutputSweeper` state from the store. -pub(crate) fn read_output_sweeper( +pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, - )?); + ) + .await?; let args = ( broadcaster, fee_estimator, @@ -265,33 +315,34 @@ pub(crate) fn read_output_sweeper( kv_store, logger.clone(), ); - let (_, sweeper) = <(_, Sweeper)>::read(&mut reader, args).map_err(|e| { + let (_, sweeper) = <(_, Sweeper)>::read(&mut &*reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize OutputSweeper: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize OutputSweeper") })?; Ok(sweeper) } -pub(crate) fn read_node_metrics( - kv_store: Arc, logger: L, +pub(crate) async fn read_node_metrics( + kv_store: &DynStore, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( + let reader = KVStore::read( &*kv_store, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, NODE_METRICS_KEY, - )?); - NodeMetrics::read(&mut reader).map_err(|e| { + ) + .await?; + NodeMetrics::read(&mut &*reader).map_err(|e| { log_error!(logger, "Failed to deserialize NodeMetrics: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics") }) } pub(crate) fn write_node_metrics( - node_metrics: &NodeMetrics, kv_store: Arc, logger: L, + node_metrics: &NodeMetrics, kv_store: &DynStore, logger: L, ) -> Result<(), Error> where L::Target: LdkLogger, @@ -418,12 +469,12 @@ macro_rules! impl_read_write_change_set_type { $key:expr ) => { pub(crate) fn $read_name( - kv_store: Arc, logger: L, + kv_store: &DynStore, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let bytes = + let reader = match KVStoreSync::read(&*kv_store, $primary_namespace, $secondary_namespace, $key) { Ok(bytes) => bytes, @@ -444,9 +495,8 @@ macro_rules! impl_read_write_change_set_type { }, }; - let mut reader = Cursor::new(bytes); let res: Result, DecodeError> = - Readable::read(&mut reader); + Readable::read(&mut &*reader); match res { Ok(res) => Ok(Some(res.0)), Err(e) => { @@ -460,7 +510,7 @@ macro_rules! impl_read_write_change_set_type { } pub(crate) fn $write_name( - value: &$change_set_type, kv_store: Arc, logger: L, + value: &$change_set_type, kv_store: &DynStore, logger: L, ) -> Result<(), std::io::Error> where L::Target: LdkLogger, @@ -538,41 +588,35 @@ impl_read_write_change_set_type!( // Reads the full BdkWalletChangeSet or returns default fields pub(crate) fn read_bdk_wallet_change_set( - kv_store: Arc, logger: Arc, + kv_store: &DynStore, logger: &Logger, ) -> Result, std::io::Error> { let mut change_set = BdkWalletChangeSet::default(); // We require a descriptor and return `None` to signal creation of a new wallet otherwise. - if let Some(descriptor) = - read_bdk_wallet_descriptor(Arc::clone(&kv_store), Arc::clone(&logger))? - { + if let Some(descriptor) = read_bdk_wallet_descriptor(kv_store, logger)? { change_set.descriptor = Some(descriptor); } else { return Ok(None); } // We require a change_descriptor and return `None` to signal creation of a new wallet otherwise. - if let Some(change_descriptor) = - read_bdk_wallet_change_descriptor(Arc::clone(&kv_store), Arc::clone(&logger))? - { + if let Some(change_descriptor) = read_bdk_wallet_change_descriptor(kv_store, logger)? { change_set.change_descriptor = Some(change_descriptor); } else { return Ok(None); } // We require a network and return `None` to signal creation of a new wallet otherwise. - if let Some(network) = read_bdk_wallet_network(Arc::clone(&kv_store), Arc::clone(&logger))? { + if let Some(network) = read_bdk_wallet_network(kv_store, logger)? { change_set.network = Some(network); } else { return Ok(None); } - read_bdk_wallet_local_chain(Arc::clone(&kv_store), Arc::clone(&logger))? + read_bdk_wallet_local_chain(&*kv_store, logger)? .map(|local_chain| change_set.local_chain = local_chain); - read_bdk_wallet_tx_graph(Arc::clone(&kv_store), Arc::clone(&logger))? - .map(|tx_graph| change_set.tx_graph = tx_graph); - read_bdk_wallet_indexer(Arc::clone(&kv_store), Arc::clone(&logger))? - .map(|indexer| change_set.indexer = indexer); + read_bdk_wallet_tx_graph(&*kv_store, logger)?.map(|tx_graph| change_set.tx_graph = tx_graph); + read_bdk_wallet_indexer(&*kv_store, logger)?.map(|indexer| change_set.indexer = indexer); Ok(Some(change_set)) } diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index eb439ed10..b4fdc770a 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -745,11 +745,10 @@ async fn determine_and_write_schema_version( })? .0; - let schema_version: VssSchemaVersion = Readable::read(&mut io::Cursor::new(decrypted)) - .map_err(|e| { - let msg = format!("Failed to decode schema version: {}", e); - Error::new(ErrorKind::Other, msg) - })?; + let schema_version: VssSchemaVersion = Readable::read(&mut &*decrypted).map_err(|e| { + let msg = format!("Failed to decode schema version: {}", e); + Error::new(ErrorKind::Other, msg) + })?; Ok(schema_version) } else { // The schema version wasn't present, this either means we're running for the first time *or* it's V0 pre-migration (predating writing of the schema version). diff --git a/src/lib.rs b/src/lib.rs index b050fba57..e15b708ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -291,7 +291,7 @@ impl Node { { let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); - write_node_metrics(&*locked_node_metrics, Arc::clone(&gossip_sync_store), Arc::clone(&gossip_sync_logger)) + write_node_metrics(&*locked_node_metrics, &*gossip_sync_store, Arc::clone(&gossip_sync_logger)) .unwrap_or_else(|e| { log_error!(gossip_sync_logger, "Persistence failed: {}", e); }); @@ -507,7 +507,7 @@ impl Node { { let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; - write_node_metrics(&*locked_node_metrics, Arc::clone(&bcast_store), Arc::clone(&bcast_logger)) + write_node_metrics(&*locked_node_metrics, &*bcast_store, Arc::clone(&bcast_logger)) .unwrap_or_else(|e| { log_error!(bcast_logger, "Persistence failed: {}", e); }); diff --git a/src/payment/store.rs b/src/payment/store.rs index 184de2ea9..15e94190c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -605,7 +605,6 @@ impl StorableObjectUpdate for PaymentDetailsUpdate { #[cfg(test)] mod tests { - use bitcoin::io::Cursor; use lightning::util::ser::Readable; use super::*; @@ -657,16 +656,12 @@ mod tests { let old_bolt11_encoded = old_bolt11_payment.encode(); assert_eq!( old_bolt11_payment, - OldPaymentDetails::read(&mut Cursor::new(old_bolt11_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_bolt11_encoded.clone()).unwrap() ); - let bolt11_decoded = - PaymentDetails::read(&mut Cursor::new(old_bolt11_encoded)).unwrap(); + let bolt11_decoded = PaymentDetails::read(&mut &*old_bolt11_encoded).unwrap(); let bolt11_reencoded = bolt11_decoded.encode(); - assert_eq!( - bolt11_decoded, - PaymentDetails::read(&mut Cursor::new(bolt11_reencoded)).unwrap() - ); + assert_eq!(bolt11_decoded, PaymentDetails::read(&mut &*bolt11_reencoded).unwrap()); match bolt11_decoded.kind { PaymentKind::Bolt11 { hash: h, preimage: p, secret: s } => { @@ -700,15 +695,14 @@ mod tests { let old_bolt11_jit_encoded = old_bolt11_jit_payment.encode(); assert_eq!( old_bolt11_jit_payment, - OldPaymentDetails::read(&mut Cursor::new(old_bolt11_jit_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_bolt11_jit_encoded.clone()).unwrap() ); - let bolt11_jit_decoded = - PaymentDetails::read(&mut Cursor::new(old_bolt11_jit_encoded)).unwrap(); + let bolt11_jit_decoded = PaymentDetails::read(&mut &*old_bolt11_jit_encoded).unwrap(); let bolt11_jit_reencoded = bolt11_jit_decoded.encode(); assert_eq!( bolt11_jit_decoded, - PaymentDetails::read(&mut Cursor::new(bolt11_jit_reencoded)).unwrap() + PaymentDetails::read(&mut &*bolt11_jit_reencoded).unwrap() ); match bolt11_jit_decoded.kind { @@ -746,15 +740,14 @@ mod tests { let old_spontaneous_encoded = old_spontaneous_payment.encode(); assert_eq!( old_spontaneous_payment, - OldPaymentDetails::read(&mut Cursor::new(old_spontaneous_encoded.clone())).unwrap() + OldPaymentDetails::read(&mut &*old_spontaneous_encoded.clone()).unwrap() ); - let spontaneous_decoded = - PaymentDetails::read(&mut Cursor::new(old_spontaneous_encoded)).unwrap(); + let spontaneous_decoded = PaymentDetails::read(&mut &*old_spontaneous_encoded).unwrap(); let spontaneous_reencoded = spontaneous_decoded.encode(); assert_eq!( spontaneous_decoded, - PaymentDetails::read(&mut Cursor::new(spontaneous_reencoded)).unwrap() + PaymentDetails::read(&mut &*spontaneous_reencoded).unwrap() ); match spontaneous_decoded.kind { diff --git a/src/payment/unified.rs b/src/payment/unified.rs index b1546961b..8225205fd 100644 --- a/src/payment/unified.rs +++ b/src/payment/unified.rs @@ -21,16 +21,14 @@ use bip21::de::ParamKind; use bip21::{DeserializationError, DeserializeParams, Param, SerializeParams}; use bitcoin::address::NetworkChecked; use bitcoin::{Amount, Txid}; +use bitcoin_payment_instructions::amount::Amount as BPIAmount; +use bitcoin_payment_instructions::{PaymentInstructions, PaymentMethod}; use lightning::ln::channelmanager::PaymentId; use lightning::offers::offer::Offer; use lightning::onion_message::dns_resolution::HumanReadableName; use lightning::routing::router::RouteParametersConfig; use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description}; -use bitcoin_payment_instructions::{ - amount::Amount as BPIAmount, PaymentInstructions, PaymentMethod, -}; - use crate::error::Error; use crate::ffi::maybe_wrap; use crate::logger::{log_error, LdkLogger, Logger}; @@ -393,7 +391,8 @@ impl DeserializationError for Extras { mod tests { use std::str::FromStr; - use bitcoin::{address::NetworkUnchecked, Address, Network}; + use bitcoin::address::NetworkUnchecked; + use bitcoin::{Address, Network}; use super::{Amount, Bolt11Invoice, Extras, Offer}; diff --git a/src/scoring.rs b/src/scoring.rs index 6385f2f56..2e0d226ff 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -1,4 +1,3 @@ -use std::io::Cursor; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, SystemTime}; @@ -74,22 +73,17 @@ async fn sync_external_scores( return; }, }; - let body = match response.bytes().await { + let reader = match response.bytes().await { Ok(bytes) => bytes, Err(e) => { log_error!(logger, "Failed to read external scores update: {}", e); return; }, }; - let mut reader = Cursor::new(body); - match ChannelLiquidities::read(&mut reader) { + match ChannelLiquidities::read(&mut &*reader) { Ok(liquidities) => { - if let Err(e) = write_external_pathfinding_scores_to_cache( - Arc::clone(&kv_store), - &liquidities, - logger, - ) - .await + if let Err(e) = + write_external_pathfinding_scores_to_cache(&*kv_store, &liquidities, logger).await { log_error!(logger, "Failed to persist external scores to cache: {}", e); } @@ -100,10 +94,9 @@ async fn sync_external_scores( let mut locked_node_metrics = node_metrics.write().unwrap(); locked_node_metrics.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs()); - write_node_metrics(&*locked_node_metrics, Arc::clone(&kv_store), logger) - .unwrap_or_else(|e| { - log_error!(logger, "Persisting node metrics failed: {}", e); - }); + write_node_metrics(&*locked_node_metrics, &*kv_store, logger).unwrap_or_else(|e| { + log_error!(logger, "Persisting node metrics failed: {}", e); + }); log_trace!(logger, "External scores merged successfully"); }, Err(e) => { diff --git a/src/types.rs b/src/types.rs index 8835cb424..5e9cd74c9 100644 --- a/src/types.rs +++ b/src/types.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bitcoin::secp256k1::PublicKey; use bitcoin::{OutPoint, ScriptBuf}; +use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; @@ -29,8 +30,6 @@ use lightning_block_sync::gossip::GossipVerifier; use lightning_liquidity::utils::time::DefaultTimeProvider; use lightning_net_tokio::SocketDescriptor; -use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; - use crate::chain::bitcoind::UtxoSourceClient; use crate::chain::ChainSource; use crate::config::ChannelConfig; diff --git a/src/wallet/persist.rs b/src/wallet/persist.rs index 5c8668937..10be1fac0 100644 --- a/src/wallet/persist.rs +++ b/src/wallet/persist.rs @@ -38,10 +38,7 @@ impl WalletPersister for KVStoreWalletPersister { return Ok(latest_change_set.clone()); } - let change_set_opt = read_bdk_wallet_change_set( - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + let change_set_opt = read_bdk_wallet_change_set(&*persister.kv_store, &*persister.logger)?; let change_set = match change_set_opt { Some(persisted_change_set) => persisted_change_set, @@ -87,11 +84,7 @@ impl WalletPersister for KVStoreWalletPersister { )); } else { latest_change_set.descriptor = Some(descriptor.clone()); - write_bdk_wallet_descriptor( - &descriptor, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + write_bdk_wallet_descriptor(&descriptor, &*persister.kv_store, &*persister.logger)?; } } @@ -112,8 +105,8 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.change_descriptor = Some(change_descriptor.clone()); write_bdk_wallet_change_descriptor( &change_descriptor, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), + &*persister.kv_store, + &*persister.logger, )?; } } @@ -131,11 +124,7 @@ impl WalletPersister for KVStoreWalletPersister { )); } else { latest_change_set.network = Some(network); - write_bdk_wallet_network( - &network, - Arc::clone(&persister.kv_store), - Arc::clone(&persister.logger), - )?; + write_bdk_wallet_network(&network, &*persister.kv_store, &*persister.logger)?; } } @@ -157,7 +146,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.indexer.merge(change_set.indexer.clone()); write_bdk_wallet_indexer( &latest_change_set.indexer, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } @@ -166,7 +155,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.tx_graph.merge(change_set.tx_graph.clone()); write_bdk_wallet_tx_graph( &latest_change_set.tx_graph, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } @@ -175,7 +164,7 @@ impl WalletPersister for KVStoreWalletPersister { latest_change_set.local_chain.merge(change_set.local_chain.clone()); write_bdk_wallet_local_chain( &latest_change_set.local_chain, - Arc::clone(&persister.kv_store), + &*persister.kv_store, Arc::clone(&persister.logger), )?; } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 655b5fd94..4d2a17422 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,7 +23,8 @@ use common::{ expect_splice_pending_event, generate_blocks_and_wait, open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, - setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestSyncStore, + setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, + TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::liquidity::LSPS2ServiceConfig; @@ -2317,3 +2318,121 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { Some(6) ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn payment_persistence_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + + // Setup nodes manually so we can restart node_a with the same config + println!("== Node A =="); + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + + let num_payments = 200; + let payment_amount_msat = 1_000_000; // 1000 sats per payment + + { + let node_a = setup_node(&chain_source, config_a.clone()); + + println!("\n== Node B =="); + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + // Premine sufficient funds for a large channel and many payments + let premine_amount_sat = 10_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Open a large channel from node_a to node_b + let channel_amount_sat = 5_000_000; + open_channel(&node_a, &node_b, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Send 200 payments from node_a to node_b + println!("\nSending {} payments from A to B...", num_payments); + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new(String::from("test")).unwrap()); + + for i in 0..num_payments { + let invoice = node_b + .bolt11_payment() + .receive(payment_amount_msat, &invoice_description.clone().into(), 3600) + .unwrap(); + let payment_id = node_a.bolt11_payment().send(&invoice, None).unwrap(); + expect_event!(node_a, PaymentSuccessful); + expect_event!(node_b, PaymentReceived); + + if (i + 1) % 50 == 0 { + println!("Completed {} payments", i + 1); + } + + // Verify payment succeeded + assert_eq!(node_a.payment(&payment_id).unwrap().status, PaymentStatus::Succeeded); + } + println!("All {} payments completed successfully", num_payments); + + // Verify node_a has 200 outbound Bolt11 payments before shutdown + let outbound_payments_before = node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!(outbound_payments_before.len(), num_payments); + + // Shut down both nodes + println!("\nShutting down nodes..."); + node_a.stop().unwrap(); + node_b.stop().unwrap(); + } + + // Restart node_a with the same config + println!("\nRestarting node A..."); + let restarted_node_a = setup_node(&chain_source, config_a); + + // Assert all 200 payments are still in the store + let outbound_payments_after = restarted_node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!( + outbound_payments_after.len(), + num_payments, + "Expected {} payments after restart, found {}", + num_payments, + outbound_payments_after.len() + ); + + // Verify all payments have the correct status + for payment in &outbound_payments_after { + assert_eq!( + payment.status, + PaymentStatus::Succeeded, + "Payment {:?} has unexpected status {:?}", + payment.id, + payment.status + ); + assert_eq!(payment.amount_msat, Some(payment_amount_msat)); + } + + println!( + "Successfully verified {} payments persisted after restart", + outbound_payments_after.len() + ); + + restarted_node_a.stop().unwrap(); +}