Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 62 additions & 58 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ use bitcoin::bip32::{ChildNumber, Xpriv};
use bitcoin::key::Secp256k1;
use bitcoin::secp256k1::PublicKey;
use bitcoin::{BlockHash, Network};

use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver;

use lightning::chain::{chainmonitor, BestBlock, Watch};
use lightning::io::Cursor;
use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs};
use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress};
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
Expand Down Expand Up @@ -57,7 +54,9 @@ use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
write_node_metrics,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
Expand Down Expand Up @@ -1053,7 +1052,9 @@ fn build_with_store_internal(
}

// Initialize the status fields.
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
let node_metrics = match runtime
.block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await })
{
Ok(metrics) => Arc::new(RwLock::new(metrics)),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1067,19 +1068,20 @@ fn build_with_store_internal(
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
let fee_estimator = Arc::new(OnchainFeeEstimator::new());

let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(payments) => Arc::new(PaymentStore::new(
payments,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};
let payment_store =
match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) {
Ok(payments) => Arc::new(PaymentStore::new(
payments,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let (chain_source, chain_tip_opt) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
Expand Down Expand Up @@ -1294,24 +1296,23 @@ fn build_with_store_internal(
));

// Initialize the network graph, scorer, and router
let network_graph =
match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(graph) => Arc::new(graph),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
} else {
log_error!(logger, "Failed to read network graph from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
};
let network_graph = match runtime
.block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await })
{
Ok(graph) => Arc::new(graph),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
} else {
log_error!(logger, "Failed to read network graph from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
};

let local_scorer = match io::utils::read_scorer(
Arc::clone(&kv_store),
Arc::clone(&network_graph),
Arc::clone(&logger),
) {
let local_scorer = match runtime.block_on(async {
read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await
}) {
Ok(scorer) => scorer,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1327,7 +1328,9 @@ fn build_with_store_internal(
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));

// Restore external pathfinding scores from cache if possible.
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
match runtime.block_on(async {
read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await
}) {
Ok(external_scores) => {
scorer.lock().unwrap().merge(external_scores, cur_time);
log_trace!(logger, "External scores from cache merged successfully");
Expand Down Expand Up @@ -1380,13 +1383,12 @@ fn build_with_store_internal(

// Initialize the ChannelManager
let channel_manager = {
if let Ok(res) = KVStoreSync::read(
if let Ok(reader) = KVStoreSync::read(
&*kv_store,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
) {
let mut reader = Cursor::new(res);
let channel_monitor_references =
channel_monitors.iter().map(|(_, chanmon)| chanmon).collect();
let read_args = ChannelManagerReadArgs::new(
Expand All @@ -1403,7 +1405,7 @@ fn build_with_store_internal(
channel_monitor_references,
);
let (_hash, channel_manager) =
<(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| {
<(BlockHash, ChannelManager)>::read(&mut &*reader, read_args).map_err(|e| {
log_error!(logger, "Failed to read channel manager from store: {}", e);
BuildError::ReadFailed
})?;
Expand Down Expand Up @@ -1486,15 +1488,11 @@ fn build_with_store_internal(
{
let mut locked_node_metrics = node_metrics.write().unwrap();
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger))
.map_err(|e| {
log_error!(logger, "Failed writing to store: {}", e);
BuildError::WriteFailed
})?;
}
p2p_source
},
Expand Down Expand Up @@ -1616,14 +1614,17 @@ fn build_with_store_internal(
let connection_manager =
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));

let output_sweeper = match io::utils::read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&chain_source),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
) {
let output_sweeper = match runtime.block_on(async {
read_output_sweeper(
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&chain_source),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
Arc::clone(&logger),
)
.await
}) {
Ok(output_sweeper) => Arc::new(output_sweeper),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -1644,7 +1645,8 @@ fn build_with_store_internal(
},
};

let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger))
let event_queue = match runtime
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
{
Ok(event_queue) => Arc::new(event_queue),
Err(e) => {
Expand All @@ -1657,7 +1659,9 @@ fn build_with_store_internal(
},
};

let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) {
let peer_store = match runtime
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
{
Ok(peer_store) => Arc::new(peer_store),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
Expand Down
34 changes: 11 additions & 23 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,10 @@ impl BitcoindChainSource {
unix_time_secs_opt;
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)
.unwrap_or_else(|e| {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
});
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)
.unwrap_or_else(|e| {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
});
}
break;
},
Expand Down Expand Up @@ -420,11 +416,11 @@ impl BitcoindChainSource {
*self.latest_chain_tip.write().unwrap() = Some(tip);

periodically_archive_fully_resolved_monitors(
Arc::clone(&channel_manager),
chain_monitor,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
Arc::clone(&self.node_metrics),
&*channel_manager,
&*chain_monitor,
&*self.kv_store,
&*self.logger,
&*self.node_metrics,
)?;
},
Ok(_) => {},
Expand Down Expand Up @@ -469,11 +465,7 @@ impl BitcoindChainSource {
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;

write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;

Ok(())
}
Expand Down Expand Up @@ -586,11 +578,7 @@ impl BitcoindChainSource {
{
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}

Ok(())
Expand Down
26 changes: 9 additions & 17 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ impl ElectrumChainSource {
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
&*self.kv_store,
&*self.logger,
)?;
}
Ok(())
Expand Down Expand Up @@ -239,19 +239,15 @@ impl ElectrumChainSource {
{
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}

periodically_archive_fully_resolved_monitors(
Arc::clone(&channel_manager),
Arc::clone(&chain_monitor),
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
Arc::clone(&self.node_metrics),
&*channel_manager,
&*chain_monitor,
&*self.kv_store,
&*self.logger,
&*self.node_metrics,
)?;
}

Expand Down Expand Up @@ -284,11 +280,7 @@ impl ElectrumChainSource {
{
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}

Ok(())
Expand Down
26 changes: 9 additions & 17 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ impl EsploraChainSource {
locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger)
&*self.kv_store,
&*self.logger
)?;
}
Ok(())
Expand Down Expand Up @@ -259,19 +259,15 @@ impl EsploraChainSource {
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}

periodically_archive_fully_resolved_monitors(
Arc::clone(&channel_manager),
Arc::clone(&chain_monitor),
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
Arc::clone(&self.node_metrics),
&*channel_manager,
&*chain_monitor,
&*self.kv_store,
&*self.logger,
&*self.node_metrics,
)?;
Ok(())
},
Expand Down Expand Up @@ -353,11 +349,7 @@ impl EsploraChainSource {
{
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ impl Filter for ChainSource {
}

fn periodically_archive_fully_resolved_monitors(
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
kv_store: Arc<DynStore>, logger: Arc<Logger>, node_metrics: Arc<RwLock<NodeMetrics>>,
channel_manager: &ChannelManager, chain_monitor: &ChainMonitor, kv_store: &DynStore,
logger: &Logger, node_metrics: &RwLock<NodeMetrics>,
) -> Result<(), Error> {
let mut locked_node_metrics = node_metrics.write().unwrap();
let cur_height = channel_manager.current_best_block().height;
Expand Down
Loading