Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6dcf95a
Update BestBlock to store ANTI_REORG_DELAY * 2 recent block hashes
TheBlueMatt Oct 12, 2025
4528201
f add claude's bestblock test
TheBlueMatt Jan 27, 2026
1494c92
f use hard-coded values to break constant changes leading to ser break
TheBlueMatt Jan 27, 2026
6fe5d47
f rebase
TheBlueMatt Feb 10, 2026
e220e58
Return `BestBlock` when deserializing chain-synced structs
TheBlueMatt Oct 12, 2025
5bec3c1
Replace `Cache::block_disconnected` with `blocks_disconnected`
Oct 12, 2025
ce00f7e
Pass a `BestBlock` to `init::synchronize_listeners`
Oct 12, 2025
40f2f2f
Make the `Cache` trait priv, just use `UnboundedCache` publicly
TheBlueMatt Jan 27, 2026
46319de
Make `UnboundedCache` bounded
TheBlueMatt Jan 27, 2026
ad61a7f
f correct cache retention on reorg
TheBlueMatt Jan 27, 2026
257876f
f correct + constify header cache limit
TheBlueMatt Jan 27, 2026
857ad96
Consolidate all the pub aync utils to `native_async`
TheBlueMatt Dec 8, 2025
c0334f2
f rebase
TheBlueMatt Mar 4, 2026
c2bfb61
Add `async_poll.rs` to `lightning-block-sync`
TheBlueMatt Oct 18, 2025
4276541
Fetch blocks from source in parallel during initial sync
TheBlueMatt Dec 7, 2025
fe461fd
f msrv
TheBlueMatt Feb 10, 2026
8c60ff4
f msrv
TheBlueMatt Feb 10, 2026
ce90a04
Silence "elided lifetime has a name" warnings in no-std locking
TheBlueMatt Dec 8, 2025
0e87bcf
Use the header cache across listeners during initial disconnect
TheBlueMatt Dec 8, 2025
7962e2a
Include recent blocks in the `synchronize_listeners`-returned cache
Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ use lightning::routing::utxo::UtxoLookup;
#[cfg(not(c_bindings))]
use lightning::sign::EntropySource;
use lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender};
#[cfg(not(c_bindings))]
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
#[cfg(not(c_bindings))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit msg typo

use lightning::util::native_async::MaybeSend;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
Expand Down
1 change: 1 addition & 0 deletions lightning-block-sync/src/async_poll.rs
274 changes: 137 additions & 137 deletions lightning-block-sync/src/init.rs

Large diffs are not rendered by default.

194 changes: 138 additions & 56 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

extern crate alloc;
extern crate core;

#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
pub mod http;

Expand All @@ -42,6 +44,9 @@ mod test_utils;
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod utils;

#[allow(unused)]
mod async_poll;

use crate::poll::{ChainTip, Poll, ValidatedBlockHeader};

use bitcoin::block::{Block, Header};
Expand Down Expand Up @@ -170,18 +175,13 @@ pub enum BlockData {
/// sources for the best chain tip. During this process it detects any chain forks, determines which
/// constitutes the best chain, and updates the listener accordingly with any blocks that were
/// connected or disconnected since the last poll.
///
/// Block headers for the best chain are maintained in the parameterized cache, allowing for a
/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage.
/// Hence, there is a trade-off between a lower memory footprint and potentially increased network
/// I/O as headers are re-fetched during fork detection.
pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref>
pub struct SpvClient<P: Poll, L: Deref>
where
L::Target: chain::Listen,
{
chain_tip: ValidatedBlockHeader,
chain_poller: P,
chain_notifier: ChainNotifier<'a, C, L>,
chain_notifier: ChainNotifier<HeaderCache, L>,
}

/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
Expand All @@ -194,37 +194,86 @@ where
/// Implementations may define how long to retain headers such that it's unlikely they will ever be
/// needed to disconnect a block. In cases where block sources provide access to headers on stale
/// forks reliably, caches may be entirely unnecessary.
pub trait Cache {
pub(crate) trait Cache {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the trait still needed or can it be removed if fixed types are used internally?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually yea, see new commit, we do :/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the Cache trait is still present, only not public?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right the new commit on the end now uses it to implement a wrapper.

/// Retrieves the block header keyed by the given block hash.
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>;

/// Inserts the given block header during a find_difference operation, implying it might not be
/// the best header.
fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);

/// Called when a block has been connected to the best chain to ensure it is available to be
/// disconnected later if needed.
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader);

/// Called when a block has been disconnected from the best chain. Once disconnected, a block's
/// header is no longer needed and thus can be removed.
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader>;
/// Called when blocks have been disconnected from the best chain. Only the fork point
/// (best comon ancestor) is provided.
///
/// Once disconnected, a block's header is no longer needed and thus can be removed.
fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader);
}

/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`].
pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7;

/// Bounded cache of block headers keyed by block hash.
///
/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height.
pub struct HeaderCache(std::collections::HashMap<BlockHash, ValidatedBlockHeader>);

impl HeaderCache {
/// Creates a new empty header cache.
pub fn new() -> Self {
Self(std::collections::HashMap::new())
}
}

/// Unbounded cache of block headers keyed by block hash.
pub type UnboundedCache = std::collections::HashMap<BlockHash, ValidatedBlockHeader>;
impl Cache for HeaderCache {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.0.get(block_hash)
}

fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.0.insert(block_hash, block_header);

// Remove headers older than our newest header minus a week.
let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0);
let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT);
self.0.retain(|_, header| header.height >= cutoff_height);
}

fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.0.insert(block_hash, block_header);

// Remove headers older than a week.
let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT);
self.0.retain(|_, header| header.height >= cutoff_height);
}

fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) {
self.0.retain(|_, block_info| block_info.height <= fork_point.height);
}
}

impl Cache for UnboundedCache {
impl Cache for &mut HeaderCache {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.get(block_hash)
self.0.get(block_hash)
}

fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
(*self).insert_during_diff(block_hash, block_header);
}

fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
self.insert(block_hash, block_header);
(*self).block_connected(block_hash, block_header);
}

fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
self.remove(block_hash)
fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) {
self.0.retain(|_, block_info| block_info.height <= fork_point.height);
}
}

impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L>
impl<P: Poll, L: Deref> SpvClient<P, L>
where
L::Target: chain::Listen,
{
Expand All @@ -239,7 +288,7 @@ where
///
/// [`poll_best_tip`]: SpvClient::poll_best_tip
pub fn new(
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C,
chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache,
chain_listener: L,
) -> Self {
let chain_notifier = ChainNotifier { header_cache, chain_listener };
Expand Down Expand Up @@ -293,15 +342,15 @@ where
/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
///
/// [listeners]: lightning::chain::Listen
pub struct ChainNotifier<'a, C: Cache, L: Deref>
pub(crate) struct ChainNotifier<C: Cache, L: Deref>
where
L::Target: chain::Listen,
{
/// Cache for looking up headers before fetching from a block source.
header_cache: &'a mut C,
pub(crate) header_cache: C,

/// Listener that will be notified of connected or disconnected blocks.
chain_listener: L,
pub(crate) chain_listener: L,
}

/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
Expand All @@ -315,14 +364,11 @@ struct ChainDifference {
/// If there are any disconnected blocks, this is where the chain forked.
common_ancestor: ValidatedBlockHeader,

/// Blocks that were disconnected from the chain since the last poll.
disconnected_blocks: Vec<ValidatedBlockHeader>,

/// Blocks that were connected to the chain since the last poll.
connected_blocks: Vec<ValidatedBlockHeader>,
}

impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L>
impl<C: Cache, L: Deref> ChainNotifier<C, L>
where
L::Target: chain::Listen,
{
Expand All @@ -338,23 +384,66 @@ where
chain_poller: &mut P,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
let difference = self
.find_difference(new_header, old_header, chain_poller)
.find_difference_from_header(new_header, old_header, chain_poller)
.await
.map_err(|e| (e, None))?;
self.disconnect_blocks(difference.disconnected_blocks);
if difference.common_ancestor != *old_header {
self.disconnect_blocks(difference.common_ancestor);
}
self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller)
.await
}

/// Returns the changes needed to produce the chain with `current_header` as its tip from the
/// chain with `prev_best_block` as its tip.
///
/// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks`
/// field as fallback if needed, then finds the common ancestor.
async fn find_difference_from_best_block<P: Poll>(
&mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock,
chain_poller: &mut P,
) -> BlockSourceResult<ChainDifference> {
// Try to resolve the header for the previous best block. First try the block_hash,
// then fall back to previous_blocks if that fails.
let cur_tip = core::iter::once((0, &prev_best_block.block_hash));
let prev_tips =
prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| {
if let Some(block_hash) = hash_opt {
Some((idx as u32 + 1, block_hash))
} else {
None
}
});
let mut found_header = None;
for (height_diff, block_hash) in cur_tip.chain(prev_tips) {
if let Some(header) = self.header_cache.look_up(block_hash) {
found_header = Some(*header);
break;
}
let height = prev_best_block.height.checked_sub(height_diff).ok_or(
BlockSourceError::persistent("BestBlock had more previous_blocks than its height"),
)?;
if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await {
found_header = Some(header);
self.header_cache.insert_during_diff(*block_hash, header);
break;
}
}
let found_header = found_header.ok_or_else(|| {
BlockSourceError::persistent("could not resolve any block from BestBlock")
})?;

self.find_difference_from_header(current_header, &found_header, chain_poller).await
}

/// Returns the changes needed to produce the chain with `current_header` as its tip from the
/// chain with `prev_header` as its tip.
///
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
async fn find_difference<P: Poll>(
async fn find_difference_from_header<P: Poll>(
&self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader,
chain_poller: &mut P,
) -> BlockSourceResult<ChainDifference> {
let mut disconnected_blocks = Vec::new();
let mut connected_blocks = Vec::new();
let mut current = current_header;
let mut previous = *prev_header;
Expand All @@ -369,7 +458,6 @@ where
let current_height = current.height;
let previous_height = previous.height;
if current_height <= previous_height {
disconnected_blocks.push(previous);
previous = self.look_up_previous_header(chain_poller, &previous).await?;
}
if current_height >= previous_height {
Expand All @@ -379,7 +467,7 @@ where
}

let common_ancestor = current;
Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks })
Ok(ChainDifference { common_ancestor, connected_blocks })
}

/// Returns the previous header for the given header, either by looking it up in the cache or
Expand All @@ -394,16 +482,10 @@ where
}

/// Notifies the chain listeners of disconnected blocks.
fn disconnect_blocks(&mut self, disconnected_blocks: Vec<ValidatedBlockHeader>) {
for header in disconnected_blocks.iter() {
if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
assert_eq!(cached_header, *header);
}
}
if let Some(block) = disconnected_blocks.last() {
let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1);
self.chain_listener.blocks_disconnected(fork_point);
}
fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) {
self.header_cache.blocks_disconnected(&fork_point);
let best_block = BestBlock::new(fork_point.block_hash, fork_point.height);
self.chain_listener.blocks_disconnected(best_block);
}

/// Notifies the chain listeners of connected blocks.
Expand Down Expand Up @@ -447,9 +529,9 @@ mod spv_client_tests {
let best_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -466,9 +548,9 @@ mod spv_client_tests {
let common_tip = chain.tip();

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(common_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -486,9 +568,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -506,9 +588,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -526,9 +608,9 @@ mod spv_client_tests {
let old_tip = chain.at_height(1);

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(old_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -547,9 +629,9 @@ mod spv_client_tests {
let worse_tip = chain.tip();

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let cache = HeaderCache::new();
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
let mut client = SpvClient::new(best_tip, poller, cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand Down
Loading
Loading