Skip to content
Open
1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ impl TestChainMonitor {
Arc::clone(&persister),
Arc::clone(&keys),
keys.get_peer_storage_key(),
false,
)),
logger,
keys,
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger + MaybeSend + MaybeSync>
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));

let network = Network::Bitcoin;
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/lsps_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
Expand Down
81 changes: 80 additions & 1 deletion lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,15 @@ where

let mut futures = Joiner::new();

// We capture pending_operation_count inside the persistence branch to
// avoid a race: ChannelManager handlers queue deferred monitor ops
// before the persistence flag is set. Capturing outside would let us
// observe pending ops while the flag is still unset, causing us to
// flush monitor writes without persisting the ChannelManager.
let mut pending_monitor_writes = 0;

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");

let fut = async {
Expand Down Expand Up @@ -1317,6 +1325,10 @@ where
res?;
}

// Flush monitor operations that were pending before we persisted. New updates
// that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
Expand Down Expand Up @@ -1373,6 +1385,7 @@ where
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1381,6 +1394,10 @@ where
channel_manager.get_cm().encode(),
)
.await?;

// Flush monitor operations that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store
.write(
Expand Down Expand Up @@ -1684,7 +1701,17 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// We capture pending_operation_count inside the persistence
// branch to avoid a race: ChannelManager handlers queue
// deferred monitor ops before the persistence flag is set.
// Capturing outside would let us observe pending ops while the
// flag is still unset, causing us to flush monitor writes
// without persisting the ChannelManager.
let mut pending_monitor_writes = 0;

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1695,6 +1722,10 @@ impl BackgroundProcessor {
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush monitor operations that were pending before we persisted. New
// updates that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
Expand Down Expand Up @@ -1809,12 +1840,17 @@ impl BackgroundProcessor {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush monitor operations that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand Down Expand Up @@ -1896,9 +1932,10 @@ mod tests {
use bitcoin::transaction::{Transaction, TxOut};
use bitcoin::{Amount, ScriptBuf, Txid};
use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::chainmonitor;
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
use lightning::chain::{BestBlock, Confirm, Filter};
use lightning::events::{Event, PathFailure, ReplayEvent};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
Expand Down Expand Up @@ -2444,6 +2481,7 @@ mod tests {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
true,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
Expand Down Expand Up @@ -2567,6 +2605,8 @@ mod tests {
(persist_dir, nodes)
}

/// Opens a channel between two nodes without a running `BackgroundProcessor`,
/// so deferred monitor operations are flushed manually at each step.
macro_rules! open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
begin_open_channel!($node_a, $node_b, $channel_value);
Expand All @@ -2582,19 +2622,31 @@ mod tests {
tx.clone(),
)
.unwrap();
// funding_transaction_generated does not call watch_channel, so no
// deferred op is queued and FundingCreated is available immediately.
let msg_a = get_event_msg!(
$node_a,
MessageSendEvent::SendFundingCreated,
$node_b.node.get_our_node_id()
);
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
// Flush node_b's new monitor (watch_channel) so it releases the
// FundingSigned message.
$node_b
.chain_monitor
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
get_event!($node_b, Event::ChannelPending);
let msg_b = get_event_msg!(
$node_b,
MessageSendEvent::SendFundingSigned,
$node_a.node.get_our_node_id()
);
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
// Flush node_a's new monitor (watch_channel) queued by
// handle_funding_signed.
$node_a
.chain_monitor
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
get_event!($node_a, Event::ChannelPending);
tx
}};
Expand Down Expand Up @@ -2720,6 +2772,20 @@ mod tests {
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
}

/// Blocks the calling thread until `chain_monitor` reports no pending deferred
/// monitor operations, re-checking the pending count every 10 milliseconds.
///
/// # Panics
///
/// Panics if operations are still pending once `EVENT_DEADLINE` has elapsed
/// since this function was entered.
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
	let poll_interval = Duration::from_millis(10);
	let began = std::time::Instant::now();
	loop {
		// Check the queue first so an already-flushed monitor returns immediately,
		// without consulting the clock or sleeping.
		if !(chain_monitor.pending_operation_count() > 0) {
			return;
		}
		if began.elapsed() >= EVENT_DEADLINE {
			panic!("Pending monitor operations were not flushed within deadline");
		}
		std::thread::sleep(poll_interval);
	}
}

#[test]
fn test_background_processor() {
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
Expand Down Expand Up @@ -3060,11 +3126,21 @@ mod tests {
.node
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
.unwrap();
// funding_transaction_generated does not call watch_channel, so no deferred op is
// queued and the FundingCreated message is available immediately.
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
// Node 1 has no bg processor, so its new monitor (watch_channel) must be flushed
// manually to release the pending events and the FundingSigned message.
nodes[1]
.chain_monitor
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
// handle_funding_signed.
wait_for_flushed(&nodes[0].chain_monitor);
channel_pending_recv
.recv_timeout(EVENT_DEADLINE)
.expect("ChannelPending not handled within deadline");
Expand Down Expand Up @@ -3125,6 +3201,9 @@ mod tests {
error_message.to_string(),
)
.unwrap();
// Wait for the bg processor to flush the monitor update triggered by force close
// so the commitment tx is broadcast.
wait_for_flushed(&nodes[0].chain_monitor);
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

Expand Down
Loading
Loading