From 395bb4b2b3abc2aa55a98d47f42785d5f82ad0a5 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 5 Jan 2026 13:22:53 +0100 Subject: [PATCH 1/3] Use async `KVStore` for `read_X` util methods Rather than using `KVStoreSync` we now use the async `KVStore` implementation for most `read_X` util methods used during node building. This is a first step towards making node building/startup entirely async eventually. --- src/builder.rs | 76 +++++++++++++++----------- src/io/utils.rs | 142 ++++++++++++++++++++++++++++-------------------- 2 files changed, 129 insertions(+), 89 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index bc548c18f..a5b54f064 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,7 +55,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics, + read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, + read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer, + write_node_metrics, }; use crate::io::vss_store::VssStore; use crate::io::{ @@ -1233,7 +1235,9 @@ fn build_with_store_internal( } // Initialize the status fields. - let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) { + let node_metrics = match runtime + .block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1247,7 +1251,9 @@ fn build_with_store_internal( let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { + let payment_store = match runtime + .block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(payments) => Arc::new(PaymentStore::new( payments, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), @@ -1474,24 +1480,23 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = - match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(graph) => Arc::new(graph), - Err(e) => { - if e.kind() == std::io::ErrorKind::NotFound { - Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) - } else { - log_error!(logger, "Failed to read network graph from store: {}", e); - return Err(BuildError::ReadFailed); - } - }, - }; + let network_graph = match runtime + .block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { + Ok(graph) => Arc::new(graph), + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) + } else { + log_error!(logger, "Failed to read network graph from store: {}", e); + return Err(BuildError::ReadFailed); + } + }, + }; - let local_scorer = match io::utils::read_scorer( - Arc::clone(&kv_store), - Arc::clone(&network_graph), - Arc::clone(&logger), - ) { + let local_scorer = match runtime.block_on(async { + read_scorer(Arc::clone(&kv_store), Arc::clone(&network_graph), Arc::clone(&logger)).await + }) { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1507,7 +1512,10 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) { + match runtime.block_on(async { + read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) + .await + }) { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1709,7 +1717,8 @@ fn build_with_store_internal( }, }; - let event_queue = match io::utils::read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)) + let event_queue = match runtime + .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) { Ok(event_queue) => Arc::new(event_queue), Err(e) => { @@ -1831,14 +1840,17 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match io::utils::read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) { + let output_sweeper = match runtime.block_on(async { + read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .await + }) { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1859,7 +1871,9 @@ fn build_with_store_internal( }, }; - let peer_store = match io::utils::read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)) { + let peer_store = match runtime + .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) + { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { diff --git a/src/io/utils.rs b/src/io/utils.rs index 1b4b02a82..c336e4aa9 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -133,18 +133,21 @@ where } /// Read a previously persisted [`NetworkGraph`] from the store. -pub(crate) fn read_network_graph( +pub(crate) async fn read_network_graph( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await?, + ); NetworkGraph::read(&mut reader, logger.clone()).map_err(|e| { log_error!(logger, "Failed to deserialize NetworkGraph: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph") @@ -152,19 +155,22 @@ where } /// Read a previously persisted [`ProbabilisticScorer`] from the store. -pub(crate) fn read_scorer>, L: Deref + Clone>( +pub(crate) async fn read_scorer>, L: Deref + Clone>( kv_store: Arc, network_graph: G, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { let params = ProbabilisticScoringDecayParameters::default(); - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, + ) + .await?, + ); let args = (params, network_graph, logger.clone()); ProbabilisticScorer::read(&mut reader, args).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); @@ -173,18 +179,21 @@ where } /// Read previously persisted external pathfinding scores from the cache. -pub(crate) fn read_external_pathfinding_scores_from_cache( +pub(crate) async fn read_external_pathfinding_scores_from_cache( kv_store: Arc, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, - SCORER_PERSISTENCE_SECONDARY_NAMESPACE, - EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + EXTERNAL_PATHFINDING_SCORES_CACHE_KEY, + ) + .await?, + ); ChannelLiquidities::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize scorer: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer") @@ -220,18 +229,21 @@ where } /// Read previously persisted events from the store. -pub(crate) fn read_event_queue( +pub(crate) async fn read_event_queue( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, - EVENT_QUEUE_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, + EVENT_QUEUE_PERSISTENCE_KEY, + ) + .await?, + ); EventQueue::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize event queue: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue") @@ -239,18 +251,21 @@ where } /// Read previously persisted peer info from the store. -pub(crate) fn read_peer_info( +pub(crate) async fn read_peer_info( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - PEER_INFO_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + PEER_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PEER_INFO_PERSISTENCE_KEY, + ) + .await?, + ); PeerStore::read(&mut reader, (kv_store, logger.clone())).map_err(|e| { log_error!(logger, "Failed to deserialize peer store: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize PeerStore") @@ -258,7 +273,7 @@ where } /// Read previously persisted payments information from the store. -pub(crate) fn read_payments( +pub(crate) async fn read_payments( kv_store: Arc, logger: L, ) -> Result, std::io::Error> where @@ -266,17 +281,22 @@ where { let mut res = Vec::new(); - for stored_key in KVStoreSync::list( + for stored_key in KVStore::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - )? { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - )?); + ) + .await? + { + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + ) + .await?, + ); let payment = PaymentDetails::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( @@ -290,17 +310,20 @@ where } /// Read `OutputSweeper` state from the store. -pub(crate) fn read_output_sweeper( +pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, chain_data_source: Arc, keys_manager: Arc, kv_store: Arc, logger: Arc, ) -> Result { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, - OUTPUT_SWEEPER_PERSISTENCE_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, + OUTPUT_SWEEPER_PERSISTENCE_KEY, + ) + .await?, + ); let args = ( broadcaster, fee_estimator, @@ -317,18 +340,21 @@ pub(crate) fn read_output_sweeper( Ok(sweeper) } -pub(crate) fn read_node_metrics( +pub(crate) async fn read_node_metrics( kv_store: Arc, logger: L, ) -> Result where L::Target: LdkLogger, { - let mut reader = Cursor::new(KVStoreSync::read( - &*kv_store, - NODE_METRICS_PRIMARY_NAMESPACE, - NODE_METRICS_SECONDARY_NAMESPACE, - NODE_METRICS_KEY, - )?); + let mut reader = Cursor::new( + KVStore::read( + &*kv_store, + NODE_METRICS_PRIMARY_NAMESPACE, + NODE_METRICS_SECONDARY_NAMESPACE, + NODE_METRICS_KEY, + ) + .await?, + ); NodeMetrics::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize NodeMetrics: {}", e); std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NodeMetrics") From 4a0acc2afebb7e843e9e7d1e56a504ff41cd1484 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 5 Jan 2026 14:16:29 +0100 Subject: [PATCH 2/3] Parallelize `read_payments` Previously, we would read entries of our payment store sequentially. This is more or less fine when we read from a local store, but when we read from a remote (e.g., VSS) store, all the latency could result in considerable slowdown during startup. Here, we opt to read store entries in batches. --- Cargo.toml | 2 ++ src/builder.rs | 2 +- src/ffi/types.rs | 3 +- src/io/utils.rs | 59 +++++++++++++++++++++++++++++++++------ src/payment/unified_qr.rs | 3 ++ src/types.rs | 3 +- 6 files changed, 60 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4f5f117e8..8ca337c05 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,8 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} +#bitcoin-payment-instructions = { version = "0.6" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", branch = "2025-12-ldk-node-base" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } diff --git a/src/builder.rs b/src/builder.rs index a5b54f064..160fbfe19 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -19,7 +19,7 @@ use bip39::Mnemonic; use bitcoin::bip32::{ChildNumber, Xpriv}; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; -use bdk_chain::{BlockId, TxUpdate}; +use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::{chainmonitor, BestBlock, Watch}; use lightning::io::Cursor; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 3c88a665f..30ade1c34 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -29,6 +29,7 @@ use lightning::offers::invoice::Bolt12Invoice as LdkBolt12Invoice; pub use lightning::offers::offer::OfferId; use lightning::offers::offer::{Amount as LdkAmount, Offer as LdkOffer}; use lightning::offers::refund::Refund as LdkRefund; +use lightning::onion_message::dns_resolution::HumanReadableName as LdkHumanReadableName; pub use lightning::routing::gossip::{NodeAlias, NodeId, RoutingFees}; pub use lightning::routing::router::RouteParametersConfig; use lightning::util::ser::Writeable; @@ -54,7 +55,7 @@ pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, }; -pub use crate::payment::QrPaymentResult; +pub use crate::payment::UnifiedPaymentResult; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { diff --git a/src/io/utils.rs b/src/io/utils.rs index c336e4aa9..536713c0f 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -281,22 +281,59 @@ where { let mut res = Vec::new(); - for stored_key in KVStore::list( + let mut stored_keys = KVStore::list( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, ) - .await? - { - let mut reader = Cursor::new( - KVStore::read( + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( &*kv_store, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &stored_key, - ) - .await?, - ); + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let read_res = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PaymentDetails: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let mut reader = Cursor::new(read_res); let payment = PaymentDetails::read(&mut reader).map_err(|e| { log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); std::io::Error::new( @@ -306,6 +343,10 @@ where })?; res.push(payment); } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + Ok(res) } diff --git a/src/payment/unified_qr.rs b/src/payment/unified_qr.rs index 6ebf25563..48995d2e8 100644 --- a/src/payment/unified_qr.rs +++ b/src/payment/unified_qr.rs @@ -18,6 +18,8 @@ use bip21::de::ParamKind; use bip21::{DeserializationError, DeserializeParams, Param, SerializeParams}; use bitcoin::address::{NetworkChecked, NetworkUnchecked}; use bitcoin::{Amount, Txid}; +use bitcoin_payment_instructions::amount::Amount as BPIAmount; +use bitcoin_payment_instructions::{PaymentInstructions, PaymentMethod}; use lightning::ln::channelmanager::PaymentId; use lightning::offers::offer::Offer; use lightning::routing::router::RouteParametersConfig; @@ -310,6 +312,7 @@ impl DeserializationError for Extras { mod tests { use std::str::FromStr; + use bitcoin::address::NetworkUnchecked; use bitcoin::{Address, Network}; use super::*; diff --git a/src/types.rs b/src/types.rs index dfd4cd0bb..1081e2a18 100644 --- a/src/types.rs +++ b/src/types.rs @@ -9,7 +9,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; use bitcoin::secp256k1::PublicKey; -use bitcoin::OutPoint; +use bitcoin::{OutPoint, ScriptBuf}; +use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; From 80a170f8c8b33cc6c10610cbd50904ffb176edb8 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 7 Jan 2026 10:56:33 +0100 Subject: [PATCH 3/3] Add test for payment persistence after node restart Add integration test that verifies 200 payments are correctly persisted and retrievable via `list_payments` after restarting a node. Co-Authored-By: Claude AI --- tests/integration_tests_rust.rs | 121 +++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index daf15752c..954dce838 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -38,13 +38,14 @@ use bitcoin::address::NetworkUnchecked; use bitcoin::hashes::sha256::Hash as Sha256Hash; use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf}; - use log::LevelFilter; use std::collections::HashSet; use std::str::FromStr; use std::sync::Arc; +use crate::common::TestStoreType; + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -2602,3 +2603,121 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { Some(6) ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn payment_persistence_after_restart() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + + // Setup nodes manually so we can restart node_a with the same config + println!("== Node A =="); + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + + let num_payments = 200; + let payment_amount_msat = 1_000_000; // 1000 sats per payment + + { + let node_a = setup_node(&chain_source, config_a.clone(), None); + + println!("\n== Node B =="); + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b, None); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + // Premine sufficient funds for a large channel and many payments + let premine_amount_sat = 10_000_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(premine_amount_sat), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Open a large channel from node_a to node_b + let channel_amount_sat = 5_000_000; + open_channel(&node_a, &node_b, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_channel_ready_event!(node_b, node_a.node_id()); + + // Send 200 payments from node_a to node_b + println!("\nSending {} payments from A to B...", num_payments); + let invoice_description = + Bolt11InvoiceDescription::Direct(Description::new(String::from("test")).unwrap()); + + for i in 0..num_payments { + let invoice = node_b + .bolt11_payment() + .receive(payment_amount_msat, &invoice_description.clone().into(), 3600) + .unwrap(); + let payment_id = node_a.bolt11_payment().send(&invoice, None).unwrap(); + expect_event!(node_a, PaymentSuccessful); + expect_event!(node_b, PaymentReceived); + + if (i + 1) % 50 == 0 { + println!("Completed {} payments", i + 1); + } + + // Verify payment succeeded + assert_eq!(node_a.payment(&payment_id).unwrap().status, PaymentStatus::Succeeded); + } + println!("All {} payments completed successfully", num_payments); + + // Verify node_a has 200 outbound Bolt11 payments before shutdown + let outbound_payments_before = node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!(outbound_payments_before.len(), num_payments); + + // Shut down both nodes + println!("\nShutting down nodes..."); + node_a.stop().unwrap(); + node_b.stop().unwrap(); + } + + // Restart node_a with the same config + println!("\nRestarting node A..."); + let restarted_node_a = setup_node(&chain_source, config_a, None); + + // Assert all 200 payments are still in the store + let outbound_payments_after = restarted_node_a.list_payments_with_filter(|p| { + p.direction == PaymentDirection::Outbound && matches!(p.kind, PaymentKind::Bolt11 { .. }) + }); + assert_eq!( + outbound_payments_after.len(), + num_payments, + "Expected {} payments after restart, found {}", + num_payments, + outbound_payments_after.len() + ); + + // Verify all payments have the correct status + for payment in &outbound_payments_after { + assert_eq!( + payment.status, + PaymentStatus::Succeeded, + "Payment {:?} has unexpected status {:?}", + payment.id, + payment.status + ); + assert_eq!(payment.amount_msat, Some(payment_amount_msat)); + } + + println!( + "Successfully verified {} payments persisted after restart", + outbound_payments_after.len() + ); + + restarted_node_a.stop().unwrap(); +}