diff --git a/src/builder.rs b/src/builder.rs index 7641a767d..36df91e0d 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -9,8 +9,9 @@ use std::collections::HashMap; use std::convert::TryInto; use std::default::Default; use std::path::PathBuf; +use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex, Once, RwLock}; -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use std::{fmt, fs}; use bdk_wallet::template::Bip84; @@ -47,6 +48,8 @@ use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MAX_PROBE_LOCKED_MSAT, DEFAULT_PROBING_INTERVAL_SECS, + MIN_PROBE_AMOUNT_MSAT, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -73,6 +76,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; +use crate::probing; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -151,6 +155,38 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[cfg_attr(feature = "uniffi", allow(dead_code))] +enum ProbingStrategyKind { + HighDegree { top_n: usize }, + Random { max_hops: usize }, + Custom(Arc), +} + +struct ProbingStrategyConfig { + kind: ProbingStrategyKind, + interval: Duration, + max_locked_msat: u64, +} + +impl fmt::Debug for ProbingStrategyConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let kind_str = match &self.kind { + ProbingStrategyKind::HighDegree { top_n } => { + format!("HighDegree {{ top_n: {} }}", top_n) + }, + ProbingStrategyKind::Random { max_hops } => { + format!("Random {{ max_hops: {} }}", max_hops) + }, + 
ProbingStrategyKind::Custom(_) => "Custom()".to_string(), + }; + f.debug_struct("ProbingStrategyConfig") + .field("kind", &kind_str) + .field("interval", &self.interval) + .field("max_locked_msat", &self.max_locked_msat) + .finish() + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -247,6 +283,8 @@ pub struct NodeBuilder { runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, + probing_strategy: Option, + probing_diversity_penalty_msat: Option, } impl NodeBuilder { @@ -265,6 +303,9 @@ impl NodeBuilder { let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; + let async_payments_role = None; + let probing_strategy = None; + let probing_diversity_penalty_msat = None; Self { config, chain_data_source_config, @@ -272,9 +313,11 @@ impl NodeBuilder { liquidity_source_config, log_writer_config, runtime_handle, - async_payments_role: None, + async_payments_role, pathfinding_scores_sync_config, recovery_mode, + probing_strategy, + probing_diversity_penalty_msat, } } @@ -559,6 +602,87 @@ impl NodeBuilder { self } + /// Configures background probing toward the highest-degree nodes in the network graph. + /// + /// `top_n` controls how many of the most-connected nodes are cycled through. + #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_high_degree_probing_strategy(&mut self, top_n: usize) -> &mut Self { + let kind = ProbingStrategyKind::HighDegree { top_n }; + self.probing_strategy = Some(self.make_probing_config(kind)); + self + } + + /// Configures background probing via random graph walks of up to `max_hops` hops. 
+ #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_random_probing_strategy(&mut self, max_hops: usize) -> &mut Self { + let kind = ProbingStrategyKind::Random { max_hops }; + self.probing_strategy = Some(self.make_probing_config(kind)); + self + } + + /// Configures a custom probing strategy for background channel probing. + /// + /// When set, the node will periodically call [`probing::ProbingStrategy::next_probe`] and dispatch the + /// returned probe via the channel manager. + #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_custom_probing_strategy( + &mut self, strategy: Arc, + ) -> &mut Self { + let kind = ProbingStrategyKind::Custom(strategy); + self.probing_strategy = Some(self.make_probing_config(kind)); + self + } + + /// Overrides the interval between probe attempts. Only has effect if a probing strategy is set. + #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_probing_interval(&mut self, interval: Duration) -> &mut Self { + if let Some(cfg) = &mut self.probing_strategy { + cfg.interval = interval; + } + self + } + + /// Overrides the maximum millisatoshis that may be locked in in-flight probes at any time. + /// Only has effect if a probing strategy is set. + #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_max_probe_locked_msat(&mut self, max_msat: u64) -> &mut Self { + if let Some(cfg) = &mut self.probing_strategy { + cfg.max_locked_msat = max_msat; + } + self + } + + /// Sets the probing diversity penalty applied by the probabilistic scorer. + /// + /// When set, the scorer will penalize channels that have been recently probed, + /// encouraging path diversity during background probing. The penalty decays + /// quadratically over 24 hours. + /// + /// This is only useful for probing strategies that route through the scorer + /// (e.g., [`probing::HighDegreeStrategy`]). Strategies that build paths manually + /// (e.g., [`probing::RandomStrategy`]) bypass the scorer entirely. 
+ /// + /// If unset, LDK's default of `0` (no penalty) is used. + #[cfg_attr(feature = "uniffi", allow(dead_code))] + pub fn set_probing_diversity_penalty_msat(&mut self, penalty_msat: u64) -> &mut Self { + self.probing_diversity_penalty_msat = Some(penalty_msat); + self + } + + #[cfg_attr(feature = "uniffi", allow(dead_code))] + fn make_probing_config(&self, kind: ProbingStrategyKind) -> ProbingStrategyConfig { + let existing = self.probing_strategy.as_ref(); + ProbingStrategyConfig { + kind, + interval: existing + .map(|c| c.interval) + .unwrap_or(Duration::from_secs(DEFAULT_PROBING_INTERVAL_SECS)), + max_locked_msat: existing + .map(|c| c.max_locked_msat) + .unwrap_or(DEFAULT_MAX_PROBE_LOCKED_MSAT), + } + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -736,6 +860,8 @@ impl NodeBuilder { runtime, logger, Arc::new(DynStoreWrapper(kv_store)), + self.probing_strategy.as_ref(), + self.probing_diversity_penalty_msat, ) } } @@ -981,6 +1107,11 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_wallet_recovery_mode(); } + /// Configures a probing strategy for background channel probing. + pub fn set_custom_probing_strategy(&self, strategy: Arc) { + self.inner.write().unwrap().set_custom_probing_strategy(strategy); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. 
pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { @@ -1126,6 +1257,7 @@ fn build_with_store_internal( pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, async_payments_role: Option, recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc, logger: Arc, kv_store: Arc, + probing_config: Option<&ProbingStrategyConfig>, probing_diversity_penalty_msat: Option, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1517,7 +1649,10 @@ fn build_with_store_internal( }, } - let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default(); + if let Some(penalty) = probing_diversity_penalty_msat { + scoring_fee_params.probing_diversity_penalty_msat = penalty; + } let router = Arc::new(DefaultRouter::new( Arc::clone(&network_graph), Arc::clone(&logger), @@ -1853,6 +1988,36 @@ fn build_with_store_internal( _leak_checker.0.push(Arc::downgrade(&wallet) as Weak); } + let prober = probing_config.map(|probing_cfg| { + let strategy: Arc = match &probing_cfg.kind { + ProbingStrategyKind::HighDegree { top_n } => { + Arc::new(probing::HighDegreeStrategy::new( + network_graph.clone(), + *top_n, + MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )) + }, + ProbingStrategyKind::Random { max_hops } => Arc::new(probing::RandomStrategy::new( + network_graph.clone(), + channel_manager.clone(), + *max_hops, + MIN_PROBE_AMOUNT_MSAT, + DEFAULT_MAX_PROBE_AMOUNT_MSAT, + )), + ProbingStrategyKind::Custom(s) => s.clone(), + }; + Arc::new(probing::Prober { + channel_manager: channel_manager.clone(), + logger: logger.clone(), + strategy, + interval: probing_cfg.interval, + liquidity_limit_multiplier: Some(config.probing_liquidity_limit_multiplier), + max_locked_msat: probing_cfg.max_locked_msat, + locked_msat: Arc::new(AtomicU64::new(0)), + }) + }); + Ok(Node { runtime, stop_sender, @@ -1886,6 +2051,7 @@ fn build_with_store_internal( om_mailbox, async_payments_role, 
hrn_resolver, + prober, #[cfg(cycle_tests)] _leak_checker, }) diff --git a/src/config.rs b/src/config.rs index 96a6f49d9..6a317f188 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,10 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; +pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10; +pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats +pub(crate) const MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats +pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; // The default timeout after which we abort a wallet syncing operation. diff --git a/src/event.rs b/src/event.rs index c4949a5ac..0c39728a0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -9,6 +9,7 @@ use core::future::Future; use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use bitcoin::blockdata::locktime::absolute::LockTime; @@ -515,6 +516,7 @@ where static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, + probe_locked_msat: Option>, } impl EventHandler @@ -531,6 +533,7 @@ where keys_manager: Arc, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, config: Arc, + probe_locked_msat: Option>, ) -> Self { Self { event_queue, @@ -550,6 +553,7 @@ where static_invoice_store, onion_messenger, om_mailbox, + probe_locked_msat, } } @@ -1135,8 +1139,22 @@ where LdkEvent::PaymentPathSuccessful { .. } => {}, LdkEvent::PaymentPathFailed { .. } => {}, - LdkEvent::ProbeSuccessful { .. } => {}, - LdkEvent::ProbeFailed { .. } => {}, + LdkEvent::ProbeSuccessful { path, .. 
} => { + if let Some(counter) = &self.probe_locked_msat { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| { + Some(v.saturating_sub(amount)) + }); + } + }, + LdkEvent::ProbeFailed { path, .. } => { + if let Some(counter) = &self.probe_locked_msat { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + let _ = counter.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| { + Some(v.saturating_sub(amount)) + }); + } + }, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source.handle_htlc_handling_failed(failure_type).await; @@ -1375,7 +1393,6 @@ where ); } } - if let Some(liquidity_source) = self.liquidity_source.as_ref() { let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0); liquidity_source diff --git a/src/lib.rs b/src/lib.rs index 3e5180dcb..59388b8c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,6 +101,7 @@ pub mod logger; mod message_handler; pub mod payment; mod peer_store; +mod probing; mod runtime; mod scoring; mod tx_broadcaster; @@ -166,6 +167,7 @@ use payment::{ UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; +pub use probing::{HighDegreeStrategy, Probe, ProbingStrategy, RandomStrategy}; use runtime::Runtime; pub use tokio; use types::{ @@ -235,6 +237,7 @@ pub struct Node { om_mailbox: Option>, async_payments_role: Option, hrn_resolver: Arc, + prober: Option>, #[cfg(cycle_tests)] _leak_checker: LeakChecker, } @@ -571,6 +574,7 @@ impl Node { None }; + let probe_locked_msat = self.prober.as_ref().map(|p| Arc::clone(&p.locked_msat)); let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.event_queue), Arc::clone(&self.wallet), @@ -589,8 +593,16 @@ impl Node { Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), + probe_locked_msat, )); + if let Some(prober) = self.prober.clone() { + let stop_rx = self.stop_sender.subscribe(); + 
self.runtime.spawn_cancellable_background_task(async move { + probing::run_prober(prober, stop_rx).await; + }); + } + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1063,6 +1075,42 @@ impl Node { )) } + /// Returns the total millisatoshis currently locked in in-flight probes, or `None` if no + /// probing strategy is configured. + pub fn probe_locked_msat(&self) -> Option { + self.prober.as_ref().map(|p| p.locked_msat.load(std::sync::atomic::Ordering::Relaxed)) + } + + /// Returns the scorer's estimated `(min, max)` liquidity range for the given channel in the + /// direction toward `target`, or `None` if the scorer has no data for that channel. + /// + /// Works by serializing the `CombinedScorer` (which writes `local_only_scorer`) and + /// deserializing it as a plain `ProbabilisticScorer` to call `estimated_channel_liquidity_range`. + pub fn scorer_channel_liquidity(&self, scid: u64, target: PublicKey) -> Option<(u64, u64)> { + use lightning::routing::scoring::{ + ProbabilisticScorer, ProbabilisticScoringDecayParameters, + }; + use lightning::util::ser::{ReadableArgs, Writeable}; + + let target_node_id = lightning::routing::gossip::NodeId::from_pubkey(&target); + + let bytes = { + let scorer = self.scorer.lock().unwrap(); + let mut buf = Vec::new(); + scorer.write(&mut buf).ok()?; + buf + }; + + let decay_params = ProbabilisticScoringDecayParameters::default(); + let prob_scorer = ProbabilisticScorer::read( + &mut &bytes[..], + (decay_params, Arc::clone(&self.network_graph), Arc::clone(&self.logger)), + ) + .ok()?; + + prob_scorer.estimated_channel_liquidity_range(scid, &target_node_id) + } + /// Retrieve a list of known channels. 
pub fn list_channels(&self) -> Vec { self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect() diff --git a/src/probing.rs b/src/probing.rs new file mode 100644 index 000000000..dcfce2d8f --- /dev/null +++ b/src/probing.rs @@ -0,0 +1,391 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use bitcoin::secp256k1::PublicKey; +use lightning::routing::gossip::NodeId; +use lightning::routing::router::{Path, RouteHop, MAX_PATH_LENGTH_ESTIMATE}; +use lightning_invoice::DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning_types::features::NodeFeatures; + +use crate::logger::{log_debug, LdkLogger, Logger}; +use crate::types::{ChannelManager, Graph}; + +/// Returns a random `u64` uniformly distributed in `[min, max]` (inclusive). +fn random_range(min: u64, max: u64) -> u64 { + debug_assert!(min <= max); + if min == max { + return min; + } + let mut buf = [0u8; 8]; + getrandom::fill(&mut buf).expect("getrandom failed"); + let range = max - min + 1; + min + (u64::from_ne_bytes(buf) % range) +} + +/// A probe to be dispatched by the Prober. +pub enum Probe { + /// A manually constructed path; dispatched via `send_probe`. + PrebuiltRoute(Path), + /// A destination to reach; the router selects the actual path via + /// `send_spontaneous_preflight_probes`. + Destination { + /// The destination node. + final_node: PublicKey, + /// The probe amount in millisatoshis. + amount_msat: u64, + }, +} + +/// Strategy can be used for determining the next target and amount for probing. +pub trait ProbingStrategy: Send + Sync + 'static { + /// Returns the next probe to run, or `None` to skip this tick. 
+ fn next_probe(&self) -> Option; +} + +/// Probes toward the most-connected nodes in the graph. +/// +/// Sorts all graph nodes by channel count descending, then cycles through the +/// top-`top_node_count` entries using `Destination` so the router finds the actual path. +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +pub struct HighDegreeStrategy { + network_graph: Arc, + /// How many of the highest-degree nodes to cycle through. + pub top_node_count: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, + cursor: AtomicUsize, +} + +impl HighDegreeStrategy { + /// Creates a new high-degree probing strategy. + pub(crate) fn new( + network_graph: Arc, top_node_count: usize, min_amount_msat: u64, + max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + top_node_count, + min_amount_msat, + max_amount_msat, + cursor: AtomicUsize::new(0), + } + } +} + +impl ProbingStrategy for HighDegreeStrategy { + fn next_probe(&self) -> Option { + let graph = self.network_graph.read_only(); + + // Collect (pubkey, channel_count) for all nodes. 
+ let mut nodes_by_degree: Vec<(PublicKey, usize)> = graph + .nodes() + .unordered_iter() + .filter_map(|(id, info)| { + PublicKey::try_from(*id).ok().map(|pk| (pk, info.channels.len())) + }) + .collect(); + + if nodes_by_degree.is_empty() { + return None; + } + + nodes_by_degree.sort_unstable_by(|a, b| b.1.cmp(&a.1)); + + let top_node_count = self.top_node_count.min(nodes_by_degree.len()); + + let cursor = self.cursor.fetch_add(1, Ordering::Relaxed); + let (final_node, _degree) = nodes_by_degree[cursor % top_node_count]; + + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + Some(Probe::Destination { final_node, amount_msat }) + } +} + +/// Explores the graph by walking a random number of hops outward from one of our own +/// channels, constructing the [`Path`] explicitly. +/// +/// On each tick: +/// 1. Picks one of our confirmed, usable channels to start from. +/// 2. Performs a deterministic walk of a randomly chosen depth (up to +/// [`MAX_PATH_LENGTH_ESTIMATE`]) through the gossip graph, skipping disabled +/// channels and dead-ends. +/// 3. Returns `Probe::PrebuiltRoute(path)` so the prober calls `send_probe` directly. +/// +/// The probe amount is chosen uniformly at random from `[min_amount_msat, max_amount_msat]`. +/// +/// Because path selection ignores the scorer, this probes channels the router +/// would never try on its own, teaching the scorer about previously unknown paths. +pub struct RandomStrategy { + network_graph: Arc, + channel_manager: Arc, + /// Upper bound on the number of hops in a randomly constructed path. + pub max_hops: usize, + /// Lower bound for the randomly chosen probe amount. + pub min_amount_msat: u64, + /// Upper bound for the randomly chosen probe amount. + pub max_amount_msat: u64, +} + +impl RandomStrategy { + /// Creates a new random-walk probing strategy. 
+ pub(crate) fn new( + network_graph: Arc, channel_manager: Arc, max_hops: usize, + min_amount_msat: u64, max_amount_msat: u64, + ) -> Self { + assert!( + min_amount_msat <= max_amount_msat, + "min_amount_msat must not exceed max_amount_msat" + ); + Self { + network_graph, + channel_manager, + max_hops: max_hops.clamp(1, MAX_PATH_LENGTH_ESTIMATE as usize), + min_amount_msat, + max_amount_msat, + } + } + + /// Tries to build a path of `target_hops` hops. Returns `None` if the local node has no + /// usable channels, or the walk terminates before reaching `target_hops`. + fn try_build_path(&self, target_hops: usize, amount_msat: u64) -> Option { + let initial_channels = self + .channel_manager + .list_channels() + .into_iter() + .filter(|c| c.is_usable && c.short_channel_id.is_some()) + .collect::>(); + + if initial_channels.is_empty() { + return None; + } + + let graph = self.network_graph.read_only(); + let first_hop = + &initial_channels[random_range(0, initial_channels.len() as u64 - 1) as usize]; + let first_hop_scid = first_hop.short_channel_id.unwrap(); + let next_peer_pubkey = first_hop.counterparty.node_id; + let next_peer_node_id = NodeId::from_pubkey(&next_peer_pubkey); + + // Track the tightest HTLC limit across all hops to cap the probe amount. + // The first hop limit comes from our live channel state; subsequent hops use htlc_maximum_msat from the gossip channel update. + let mut route_least_htlc_upper_bound = first_hop.next_outbound_htlc_limit_msat; + + // Walk the graph: each entry is (node_id, arrived_via_scid, pubkey); first entry is set: + let mut route: Vec<(NodeId, u64, PublicKey)> = + vec![(next_peer_node_id, first_hop_scid, next_peer_pubkey)]; + + let mut prev_scid = first_hop_scid; + let mut current_node_id = next_peer_node_id; + + for _ in 1..target_hops { + let node_info = match graph.node(¤t_node_id) { + Some(n) => n, + None => break, + }; + + // Outward channels: skip the one we arrived on to avoid backtracking. 
+ let candidates: Vec = + node_info.channels.iter().copied().filter(|&scid| scid != prev_scid).collect(); + + if candidates.is_empty() { + break; + } + + let next_scid = candidates[random_range(0, candidates.len() as u64 - 1) as usize]; + let next_channel = match graph.channel(next_scid) { + Some(c) => c, + None => break, + }; + + // as_directed_from validates that current_node_id is a channel endpoint and that + // both direction updates are present; effective_capacity covers both htlc_maximum_msat + // and funding capacity. + let Some((directed, next_node_id)) = next_channel.as_directed_from(¤t_node_id) + else { + break; + }; + // Retrieve the direction-specific update via the public ChannelInfo fields. + // Safe to unwrap: as_directed_from already checked both directions are Some. + let update = if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref().unwrap() + } else { + next_channel.two_to_one.as_ref().unwrap() + }; + + if !update.enabled { + break; + } + + route_least_htlc_upper_bound = + route_least_htlc_upper_bound.min(update.htlc_maximum_msat); + + let next_pubkey = match PublicKey::try_from(*next_node_id) { + Ok(pk) => pk, + Err(_) => break, + }; + + route.push((*next_node_id, next_scid, next_pubkey)); + prev_scid = next_scid; + current_node_id = *next_node_id; + } + + let amount_msat = amount_msat.min(route_least_htlc_upper_bound); //cap probe amount + if amount_msat < self.min_amount_msat { + return None; + } + + // Assemble hops. + // For hop i: fee and CLTV are determined by the *next* channel (what route[i] + // will charge to forward onward). For the last hop they are amount_msat and zero expiry delta. 
+ let mut hops = Vec::with_capacity(route.len()); + for i in 0..route.len() { + let (node_id, via_scid, pubkey) = route[i]; + + let channel_info = graph.channel(via_scid)?; + + let node_features = graph + .node(&node_id) + .and_then(|n| n.announcement_info.as_ref().map(|a| a.features().clone())) + .unwrap_or_else(NodeFeatures::empty); + + let (fee_msat, cltv_expiry_delta) = if i + 1 < route.len() { + // non-final hop + let (_, next_scid, _) = route[i + 1]; + let next_channel = graph.channel(next_scid)?; + let (directed, _) = next_channel.as_directed_from(&node_id)?; + let update = if directed.source() == &next_channel.node_one { + next_channel.one_to_two.as_ref().unwrap() + } else { + next_channel.two_to_one.as_ref().unwrap() + }; + let fee = update.fees.base_msat as u64 + + (amount_msat * update.fees.proportional_millionths as u64 / 1_000_000); + (fee, update.cltv_expiry_delta as u32) + } else { + // Final hop: fee_msat carries the delivery amount; cltv delta is zero. + (amount_msat, 0) + }; + + hops.push(RouteHop { + pubkey, + node_features, + short_channel_id: via_scid, + channel_features: channel_info.features.clone(), + fee_msat, + cltv_expiry_delta, + maybe_announced_channel: true, + }); + } + + // The first-hop HTLC carries amount_msat + all intermediate fees. + // Verify the total fits within our live outbound limit before returning. + let total_outgoing: u64 = hops.iter().map(|h| h.fee_msat).sum(); + if total_outgoing > first_hop.next_outbound_htlc_limit_msat { + return None; + } + + Some(Path { hops, blinded_tail: None }) + } +} + +impl ProbingStrategy for RandomStrategy { + fn next_probe(&self) -> Option { + let target_hops = random_range(1, self.max_hops as u64) as usize; + let amount_msat = random_range(self.min_amount_msat, self.max_amount_msat); + + self.try_build_path(target_hops, amount_msat).map(Probe::PrebuiltRoute) + } +} + +/// Periodically dispatches probes according to a [`ProbingStrategy`]. 
+pub struct Prober { + /// The channel manager used to send probes. + pub channel_manager: Arc, + /// Logger. + pub logger: Arc, + /// The strategy that decides what to probe. + pub strategy: Arc, + /// How often to fire a probe attempt. + pub interval: Duration, + /// Passed to `send_spontaneous_preflight_probes`. `None` uses LDK default (3×). + pub liquidity_limit_multiplier: Option, + /// Maximum total millisatoshis that may be locked in in-flight probes at any time. + pub max_locked_msat: u64, + /// Current millisatoshis locked in in-flight probes. Shared with the event handler, + /// which decrements it on `ProbeSuccessful` / `ProbeFailed`. + pub(crate) locked_msat: Arc, +} + +/// Runs the probing loop for the given [`Prober`] until `stop_rx` fires. +pub(crate) async fn run_prober(prober: Arc, mut stop_rx: tokio::sync::watch::Receiver<()>) { + let mut ticker = tokio::time::interval(prober.interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! 
{ + _ = stop_rx.changed() => { + log_debug!(prober.logger, "Stopping background probing."); + return; + } + _ = ticker.tick() => { + match prober.strategy.next_probe() { + None => {} + Some(Probe::PrebuiltRoute(path)) => { + let amount: u64 = path.hops.iter().map(|h| h.fee_msat).sum(); + if prober.locked_msat.load(Ordering::Acquire) + amount > prober.max_locked_msat { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + } else { + match prober.channel_manager.send_probe(path) { + Ok(_) => { + prober.locked_msat.fetch_add(amount, Ordering::Release); + } + Err(e) => { + log_debug!(prober.logger, "Prebuilt path probe failed: {:?}", e); + } + } + } + } + Some(Probe::Destination { final_node, amount_msat }) => { + if prober.locked_msat.load(Ordering::Acquire) + amount_msat + > prober.max_locked_msat + { + log_debug!(prober.logger, "Skipping probe: locked-msat budget exceeded."); + } else { + match prober.channel_manager.send_spontaneous_preflight_probes( + final_node, + amount_msat, + DEFAULT_MIN_FINAL_CLTV_EXPIRY_DELTA as u32, + prober.liquidity_limit_multiplier, + ) { + Ok(probes) => { + if !probes.is_empty() { + prober.locked_msat.fetch_add(amount_msat, Ordering::Release); + } else { + log_debug!(prober.logger, "No probe paths found for destination {}; skipping budget increment.", final_node); + } + } + Err(e) => { + log_debug!(prober.logger, "Route-follow probe to {} failed: {:?}", final_node, e); + } + } + } + } + } + } + } + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7854a77f2..98dd94c02 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,7 +32,7 @@ use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; use ldk_node::{ Builder, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, - UserChannelId, + ProbingStrategy, UserChannelId, }; use lightning::io; use lightning::ln::msgs::SocketAddress; @@ -340,6 +340,21 @@ impl 
Default for TestStoreType { } } +#[derive(Clone)] +pub(crate) enum TestProbingStrategy { + Random { max_hops: usize }, + HighDegree { top_n: usize }, + Custom(Arc), +} + +#[derive(Clone)] +pub(crate) struct TestProbingConfig { + pub strategy: TestProbingStrategy, + pub interval: Duration, + pub max_locked_msat: u64, + pub diversity_penalty_msat: Option, +} + #[derive(Clone)] pub(crate) struct TestConfig { pub node_config: Config, @@ -348,6 +363,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub probing: Option, } impl Default for TestConfig { @@ -367,6 +383,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + probing: None, } } } @@ -483,6 +500,25 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + if let Some(probing) = config.probing { + match probing.strategy { + TestProbingStrategy::Random { max_hops } => { + builder.set_random_probing_strategy(max_hops); + }, + TestProbingStrategy::HighDegree { top_n } => { + builder.set_high_degree_probing_strategy(top_n); + }, + TestProbingStrategy::Custom(strategy) => { + builder.set_custom_probing_strategy(strategy); + }, + } + builder.set_probing_interval(probing.interval); + builder.set_max_probe_locked_msat(probing.max_locked_msat); + if let Some(penalty) = probing.diversity_penalty_msat { + builder.set_probing_diversity_penalty_msat(penalty); + } + } + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -713,6 +749,41 @@ pub async fn open_channel( open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await } +/// Like [`open_channel`] but skips the `wait_for_tx` electrum check so that +/// multiple channels can be opened back-to-back before any blocks are mined. 
+/// The caller is responsible for mining blocks and confirming the funding txs. +pub async fn open_channel_no_electrum_wait( + node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, +) -> OutPoint { + if should_announce { + node_a + .open_announced_channel( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + funding_amount_sat, + None, + None, + ) + .unwrap(); + } else { + node_a + .open_channel( + node_b.node_id(), + node_b.listening_addresses().unwrap().first().unwrap().clone(), + funding_amount_sat, + None, + None, + ) + .unwrap(); + } + assert!(node_a.list_peers().iter().find(|c| { c.node_id == node_b.node_id() }).is_some()); + + let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); + let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); + assert_eq!(funding_txo_a, funding_txo_b); + funding_txo_a +} + pub async fn open_channel_push_amt( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, should_announce: bool, electrsd: &ElectrsD, diff --git a/tests/probing_tests.rs b/tests/probing_tests.rs new file mode 100644 index 000000000..6eb690ade --- /dev/null +++ b/tests/probing_tests.rs @@ -0,0 +1,571 @@ +// Integration tests for the probing service. +// +// Budget tests – linear A ──[1M sats]──▶ B ──[1M sats]──▶ C topology: +// +// probe_budget_increments_and_decrements +// Verifies locked_msat rises when a probe is dispatched and returns +// to zero once the probe resolves. +// +// exhausted_probe_budget_blocks_new_probes +// Stops B mid-flight so the HTLC cannot resolve; confirms the budget +// stays exhausted and no further probes are sent. After B restarts +// the probe fails, the budget clears, and new probes resume. 
+// +// Strategy tests: +// +// probing_strategies_perfomance +// Brings up a random mesh of nodes, fires random-walk probes via +// RandomStrategy and high-degree probes via HighDegreeStrategy, then +// runs payment rounds and prints probing performance tables. + +mod common; + +use lightning::routing::gossip::NodeAlias; +use lightning_invoice::{Bolt11InvoiceDescription, Description}; + +use common::{ + expect_channel_ready_event, expect_event, generate_blocks_and_wait, open_channel, + open_channel_no_electrum_wait, premine_and_distribute_funds, random_config, + setup_bitcoind_and_electrsd, setup_node, TestChainSource, TestProbingConfig, + TestProbingStrategy, +}; + +use ldk_node::bitcoin::secp256k1::PublicKey; +use ldk_node::bitcoin::Amount; +use ldk_node::{Event, Node, Probe, ProbingStrategy}; + +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::time::Duration; + +const PROBE_AMOUNT_MSAT: u64 = 1_000_000; +const MAX_LOCKED_MSAT: u64 = 100_000_000; +const PROBING_INTERVAL_MILLISECONDS: u64 = 500; +const PROBING_DIVERSITY_PENALTY: u64 = 50_000; + +/// FixedDestStrategy — always targets one node; used by budget tests.
+struct FixedDestStrategy { + destination: PublicKey, + amount_msat: u64, +} + +impl FixedDestStrategy { + fn new(destination: PublicKey, amount_msat: u64) -> Arc<dyn ProbingStrategy> { + Arc::new(Self { destination, amount_msat }) + } +} + +impl ProbingStrategy for FixedDestStrategy { + fn next_probe(&self) -> Option<Probe> { + Some(Probe::Destination { final_node: self.destination, amount_msat: self.amount_msat }) + } +} + +// helpers +async fn wait_until(timeout: Duration, predicate: impl Fn() -> bool) -> bool { + let deadline = tokio::time::Instant::now() + timeout; + loop { + if predicate() { + return true; + } + if tokio::time::Instant::now() >= deadline { + return false; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +fn config_with_label(label: &str) -> common::TestConfig { + let mut config = random_config(false); + let mut alias_bytes = [0u8; 32]; + let b = label.as_bytes(); + alias_bytes[..b.len()].copy_from_slice(b); + config.node_config.node_alias = Some(NodeAlias(alias_bytes)); + config +} + +fn probing_config( + strategy: TestProbingStrategy, max_locked_msat: u64, diversity_penalty_msat: Option<u64>, +) -> Option<TestProbingConfig> { + Some(TestProbingConfig { + strategy, + interval: Duration::from_millis(PROBING_INTERVAL_MILLISECONDS), + max_locked_msat, + diversity_penalty_msat, + }) +} + +fn build_node_fixed_dest_probing( + chain_source: &TestChainSource<'_>, destination_node_id: PublicKey, +) -> Node { + let mut config = random_config(false); + let strategy = FixedDestStrategy::new(destination_node_id, PROBE_AMOUNT_MSAT); + config.probing = probing_config(TestProbingStrategy::Custom(strategy), PROBE_AMOUNT_MSAT, None); + setup_node(chain_source, config) +} + +fn build_node_random_probing(chain_source: &TestChainSource<'_>, max_hops: usize) -> Node { + let mut config = config_with_label("Random"); + config.probing = + probing_config(TestProbingStrategy::Random { max_hops }, MAX_LOCKED_MSAT, None); + setup_node(chain_source, config) +} + +fn 
build_node_highdegree_probing(chain_source: &TestChainSource<'_>, top_n: usize) -> Node { + let mut config = config_with_label("HiDeg"); + config.probing = + probing_config(TestProbingStrategy::HighDegree { top_n }, MAX_LOCKED_MSAT, None); + setup_node(chain_source, config) +} + +fn build_node_z_highdegree_probing( + chain_source: &TestChainSource<'_>, top_n: usize, diversity_penalty_msat: u64, +) -> Node { + let mut config = config_with_label("HiDeg+P"); + config.probing = probing_config( + TestProbingStrategy::HighDegree { top_n }, + MAX_LOCKED_MSAT, + Some(diversity_penalty_msat), + ); + setup_node(chain_source, config) +} + +// helpers, formatting +fn node_label(node: &Node) -> String { + node.node_alias() + .map(|alias| { + let end = alias.0.iter().position(|&b| b == 0).unwrap_or(32); + String::from_utf8_lossy(&alias.0[..end]).to_string() + }) + .unwrap_or_else(|| format!("{:.8}", node.node_id())) +} + +fn print_topology(all_nodes: &[&Node]) { + let labels: HashMap<PublicKey, String> = + all_nodes.iter().map(|n| (n.node_id(), node_label(n))).collect(); + let label_of = |pk: PublicKey| labels.get(&pk).cloned().unwrap_or_else(|| format!("{:.8}", pk)); + + let mut adjacency: BTreeMap<String, Vec<String>> = BTreeMap::new(); + for node in all_nodes { + let local = label_of(node.node_id()); + let mut peers: Vec<String> = node + .list_channels() + .into_iter() + .filter(|ch| ch.short_channel_id.is_some()) + .map(|ch| label_of(ch.counterparty_node_id)) + .collect(); + peers.sort(); + peers.dedup(); + adjacency.entry(local).or_default().extend(peers); + } + + println!("\n=== Topology ==="); + for (node, peers) in &adjacency { + println!(" {node} ── {}", peers.join(", ")); + } +} + +const LABEL_MAX: usize = 8; +const DIR_W: usize = LABEL_MAX * 2 + 1; +const SCORER_W: usize = 28; + +fn thousands(n: u64) -> String { + let s = n.to_string(); + let mut out = String::with_capacity(s.len() + s.len() / 3); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + out.push(' '); + } + out.push(c); + } + 
out.chars().rev().collect() +} + +fn short_label(label: &str) -> String { + label.chars().take(LABEL_MAX).collect() +} + +fn fmt_est(est: Option<(u64, u64)>) -> String { + match est { + Some((lo, hi)) => format!("[{}, {}]", thousands(lo), thousands(hi)), + None => "unknown".into(), + } +} + +fn print_probing_perfomance(observers: &[&Node], all_nodes: &[&Node]) { + let labels: HashMap = + all_nodes.iter().chain(observers.iter()).map(|n| (n.node_id(), node_label(n))).collect(); + let label_of = |pk: PublicKey| { + short_label(&labels.get(&pk).cloned().unwrap_or_else(|| format!("{:.8}", pk))) + }; + + let mut by_scid: BTreeMap> = BTreeMap::new(); + for node in all_nodes { + let local_pk = node.node_id(); + for ch in node.list_channels() { + if let Some(scid) = ch.short_channel_id { + by_scid.entry(scid).or_default().push(( + local_pk, + ch.counterparty_node_id, + ch.outbound_capacity_msat, + )); + } + } + } + + print!("\n{:<15} {: 0).await; + assert!(went_up, "locked_msat never increased — no probe was dispatched"); + println!("First probe dispatched; locked_msat = {}", node_a.probe_locked_msat().unwrap()); + + let cleared = + wait_until(Duration::from_secs(20), || node_a.probe_locked_msat().unwrap_or(1) == 0).await; + assert!(cleared, "locked_msat never returned to zero after probe resolved"); + println!("Probe resolved; locked_msat = 0"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Verifies that no new probes are dispatched once the in-flight budget is exhausted. +/// +/// Exhaustion is triggered by stopping the intermediate node (B) while a probe HTLC +/// is in-flight, preventing resolution and keeping the budget locked. After B restarts +/// the HTLC fails, the budget clears, and probing resumes. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exhausted_probe_budget_blocks_new_probes() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Electrum(&electrsd); + + let node_b = setup_node(&chain_source, random_config(false)); + let node_c = setup_node(&chain_source, random_config(false)); + + // Use a slow probing interval so we can read capacity before the first probe fires. + let mut config_a = random_config(false); + let strategy = FixedDestStrategy::new(node_c.node_id(), PROBE_AMOUNT_MSAT); + config_a.probing = Some(TestProbingConfig { + strategy: TestProbingStrategy::Custom(strategy), + interval: Duration::from_secs(3), + max_locked_msat: PROBE_AMOUNT_MSAT, + diversity_penalty_msat: None, + }); + let node_a = setup_node(&chain_source, config_a); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a, addr_b], + Amount::from_sat(2_000_000), + ) + .await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + open_channel(&node_a, &node_b, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + node_b.sync_wallets().unwrap(); + open_channel(&node_b, &node_c, 1_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + node_c.sync_wallets().unwrap(); + + expect_channel_ready_event!(node_a, node_b.node_id()); + expect_event!(node_b, ChannelReady); + expect_event!(node_b, ChannelReady); + expect_event!(node_c, ChannelReady); + + // Record capacity before the first probe fires (interval is 3s, so we have time). 
+ let capacity_at_open = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + + // Give gossip time to propagate to A, then wait for the first probe. + let locked = + wait_until(Duration::from_secs(15), || node_a.probe_locked_msat().unwrap_or(0) > 0).await; + assert!(locked, "no probe dispatched within 15 s"); + + // Capacity should have decreased due to the in-flight probe HTLC. + let capacity_with_probe = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .expect("A→B channel not found"); + assert!( + capacity_with_probe < capacity_at_open, + "HTLC not visible in channel state: capacity unchanged ({capacity_at_open} msat)" + ); + + // Stop B while the probe HTLC is in-flight. + node_b.stop().unwrap(); + + // Let several Prober ticks fire (interval is 3s); the budget is exhausted so + // they must be skipped. Wait, then check both conditions at once. + tokio::time::sleep(Duration::from_secs(5)).await; + assert!( + node_a.probe_locked_msat().unwrap_or(0) > 0, + "probe resolved unexpectedly while B was offline" + ); + let capacity_after_wait = node_a + .list_channels() + .iter() + .find(|ch| ch.counterparty_node_id == node_b.node_id()) + .map(|ch| ch.outbound_capacity_msat) + .unwrap_or(u64::MAX); + assert!( + capacity_after_wait >= capacity_with_probe, + "a new probe HTLC was sent despite budget being exhausted" + ); + + // Bring B back and explicitly reconnect to A and C so the stuck HTLC resolves + // without waiting for the background reconnection backoff. 
+ node_b.start().unwrap(); + let node_a_addr = node_a.listening_addresses().unwrap().first().unwrap().clone(); + let node_c_addr = node_c.listening_addresses().unwrap().first().unwrap().clone(); + node_b.connect(node_a.node_id(), node_a_addr, false).unwrap(); + node_b.connect(node_c.node_id(), node_c_addr, false).unwrap(); + + let cleared = + wait_until(Duration::from_secs(15), || node_a.probe_locked_msat().unwrap_or(1) == 0).await; + assert!(cleared, "locked_msat never cleared after B came back online"); + + // Once the budget is freed, a new probe should be dispatched within a few ticks. + let new_probe = + wait_until(Duration::from_secs(10), || node_a.probe_locked_msat().unwrap_or(0) > 0).await; + assert!(new_probe, "no new probe dispatched after budget was freed"); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); + node_c.stop().unwrap(); +} + +/// Builds a random mesh of nodes, runs `RandomStrategy` and `HighDegreeStrategy` +/// probers alongside payment rounds, then prints scorer liquidity estimates to +/// compare probing coverage. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn probing_strategies_perfomance() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Electrum(&electrsd); + + let num_nodes = 5; + let channel_capacity_sat = 1_000_000u64; + // Each observer opens 1 channel; regular nodes open at most (num_nodes-1) each. + // num_nodes UTXOs per node is a safe upper bound for funding. 
+ let utxos_per_node = num_nodes; + let utxo_per_channel = Amount::from_sat(channel_capacity_sat + 50_000); + + let mut nodes: Vec<Node> = Vec::new(); + for i in 0..num_nodes { + let label = char::from(b'B' + i as u8).to_string(); + let mut config = random_config(false); + let mut alias_bytes = [0u8; 32]; + alias_bytes[..label.as_bytes().len()].copy_from_slice(label.as_bytes()); + config.node_config.node_alias = Some(NodeAlias(alias_bytes)); + nodes.push(setup_node(&chain_source, config)); + } + let node_a = build_node_random_probing(&chain_source, 4); + let node_x = setup_node(&chain_source, config_with_label("nostrat")); + let node_y = build_node_highdegree_probing(&chain_source, 4); + let node_z = build_node_z_highdegree_probing(&chain_source, 4, PROBING_DIVERSITY_PENALTY); + + let seed = std::env::var("TEST_SEED") + .ok() + .and_then(|s| s.parse::<u64>().ok()) + .unwrap_or_else(|| rand::rng().random()); + println!("RNG seed: {seed} (re-run with TEST_SEED={seed} to reproduce)"); + let mut rng = StdRng::seed_from_u64(seed); + let channels_per_node = rng.random_range(1..=num_nodes - 1); + let channels_per_nodes: Vec<usize> = + (0..num_nodes).map(|_| rng.random_range(1..=channels_per_node)).collect(); + + let observer_nodes: [&Node; 4] = [&node_a, &node_y, &node_z, &node_x]; + + let mut addresses = Vec::new(); + for node in observer_nodes { + for _ in 0..utxos_per_node { + addresses.push(node.onchain_payment().new_address().unwrap()); + } + } + for node in &nodes { + for _ in 0..utxos_per_node { + addresses.push(node.onchain_payment().new_address().unwrap()); + } + } + + premine_and_distribute_funds(&bitcoind.client, &electrsd.client, addresses, utxo_per_channel) + .await; + + println!("distributed initial sats"); + for node in nodes.iter().chain(observer_nodes) { + node.sync_wallets().unwrap(); + } + + fn drain_events(node: &Node) { + while let Some(_) = node.next_event() { + node.event_handled().unwrap(); + } + } + + println!("opening channels"); + for node in observer_nodes { + 
let idx = rng.random_range(0..num_nodes); + open_channel_no_electrum_wait(node, &nodes[idx], channel_capacity_sat, true).await; + } + for (i, &count) in channels_per_nodes.iter().enumerate() { + let targets: Vec<usize> = (0..num_nodes).filter(|&j| j != i).take(count).collect(); + for j in targets { + open_channel_no_electrum_wait(&nodes[i], &nodes[j], channel_capacity_sat, true).await; + } + } + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + + for node in nodes.iter().chain(observer_nodes) { + node.sync_wallets().unwrap(); + } + for node in nodes.iter().chain(observer_nodes) { + drain_events(node); + } + + tokio::time::sleep(Duration::from_secs(3)).await; + + let mut node_map = HashMap::new(); + for (i, node) in nodes.iter().enumerate() { + node_map.insert(node.node_id(), i); + } + + let all_nodes: Vec<&Node> = nodes.iter().chain(observer_nodes).collect(); + + print_topology(&all_nodes); + + println!("\nbefore payments"); + print_probing_perfomance(&observer_nodes, &all_nodes); + + let desc = Bolt11InvoiceDescription::Direct(Description::new("test".to_string()).unwrap()); + for round in 0..10 { + let mut sent = 0u32; + for sender_idx in 0..num_nodes { + let channels: Vec<_> = nodes[sender_idx] + .list_channels() + .into_iter() + .filter(|ch| ch.is_channel_ready && ch.outbound_capacity_msat > 1_000) + .collect(); + if channels.is_empty() { + continue; + } + let ch = &channels[rng.random_range(0..channels.len())]; + let amount_msat = rng.random_range(1_000..=ch.outbound_capacity_msat.min(100_000_000)); + if let Some(&receiver_idx) = node_map.get(&ch.counterparty_node_id) { + let invoice = nodes[receiver_idx] + .bolt11_payment() + .receive(amount_msat, &desc.clone().into(), 3600) + .unwrap(); + if nodes[sender_idx].bolt11_payment().send(&invoice, None).is_ok() { + sent += 1; + } + } + } + println!("round {round}: sent {sent} payments"); + tokio::time::sleep(Duration::from_millis(500)).await; + for node in nodes.iter().chain(observer_nodes) { + 
drain_events(node); + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + println!("\n=== after payments ==="); + print_probing_perfomance(&observer_nodes, &all_nodes); + + for node in nodes.iter().chain(observer_nodes) { + node.stop().unwrap(); + } +}