From 1efc540b3ae491604e0c3ec15ab0b2ebf04075b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:54:50 -0300 Subject: [PATCH] refactor: store aggregated payloads in memory --- CLAUDE.md | 19 +- crates/blockchain/src/store.rs | 3 +- .../blockchain/tests/forkchoice_spectests.rs | 23 +- crates/storage/src/api/tables.rs | 10 +- crates/storage/src/backend/rocksdb.rs | 2 - crates/storage/src/store.rs | 256 ++++++------------ 6 files changed, 118 insertions(+), 195 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 2b42ce96..33ddcf17 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -328,7 +328,7 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads - `LiveChain` table provides fast `(slot||root) → parent_root` index for fork choice - Storage uses trait-based API: `StorageBackend` → `StorageReadView` (reads) + `StorageWriteBatch` (atomic writes) -### Storage Tables (10) +### Storage Tables (8) | Table | Key → Value | Purpose | |-------|-------------|---------| @@ -336,13 +336,24 @@ cargo test -p ethlambda-blockchain --test forkchoice_spectests -- --test-threads | `BlockBodies` | H256 → BlockBody | Block bodies (empty for genesis) | | `BlockSignatures` | H256 → BlockSignaturesWithAttestation | Signatures (absent for genesis) | | `States` | H256 → State | Beacon states by root | -| `LatestKnownAttestations` | u64 → AttestationData | Fork-choice-active attestations | -| `LatestNewAttestations` | u64 → AttestationData | Pending (pre-promotion) attestations | | `GossipSignatures` | SignatureKey → ValidatorSignature | Individual validator signatures | -| `AggregatedPayloads` | SignatureKey → Vec\ | Aggregated proofs | +| `AttestationDataByRoot` | H256 → AttestationData | Content-addressed attestation data | | `Metadata` | string → various | Store state (head, config, checkpoints) | | `LiveChain` | (slot\|\|root) → parent\_root | Fast fork choice traversal index | 
+### In-Memory Store Fields + +Two ephemeral maps live on `Store` as `Arc<Mutex<HashMap>>` (not persisted to RocksDB): + +| Field | Type | Purpose | +|-------|------|---------| +| `known_aggregated_payloads` | SignatureKey → Vec\<StoredAggregatedPayload\> | Fork-choice-active aggregated proofs | +| `new_aggregated_payloads` | SignatureKey → Vec\<StoredAggregatedPayload\> | Pending aggregated proofs (promoted each interval) | + +Lock ordering invariant: always acquire `known_aggregated_payloads` before `new_aggregated_payloads`. + +**Note**: Existing RocksDB databases with the old `latest_new_aggregated_payloads` / `latest_known_aggregated_payloads` column families must be wiped before running the new binary — `open_cf_descriptors` fails if the DB contains CFs not in the descriptor list. + ### State Root Computation - Always computed via `tree_hash_root()` after full state transition - Must match proposer's pre-computed `block.state_root` diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7aaa5c3d..b5469586 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -100,6 +100,7 @@ fn update_safe_target(store: &mut Store) { // node's self-attestation) would be invisible without this merge.
let all_keys: HashSet<SignatureKey> = store .iter_known_aggregated_payload_keys() + .into_iter() .chain(store.iter_new_aggregated_payload_keys()) .collect(); let attestations = store.extract_latest_attestations(all_keys.into_iter()); @@ -765,7 +766,7 @@ pub fn produce_block_with_signatures( } // Single pass over known aggregated payloads: extract both attestation data and proofs - let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect(); + let known_payloads = store.iter_known_aggregated_payloads(); let known_attestations = store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key)); diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e7222c34..cc83b1f5 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -284,12 +284,16 @@ fn validate_attestation_check( let location = check.location.as_str(); let attestations: HashMap<u64, AttestationData> = match location { - "new" => { - st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key)) - } - "known" => { - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)) - } + "new" => st.extract_latest_attestations( + st.iter_new_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ), + "known" => st.extract_latest_attestations( + st.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), ) } }; @@ -369,8 +373,11 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap<u64, AttestationData> = - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)); + let known_attestations: HashMap<u64, AttestationData> = st.extract_latest_attestations( + st.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ); // Resolve all fork labels to roots and compute their
weights // Map: label -> (root, slot, weight) diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 7b184d59..7f7d7a3c 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -16,12 +16,6 @@ pub enum Table { GossipSignatures, /// Attestation data indexed by tree hash root: H256 -> AttestationData AttestationDataByRoot, - /// Pending aggregated payloads (not yet active in fork choice): - /// SignatureKey -> Vec - LatestNewAggregatedPayloads, - /// Active aggregated payloads (counted in fork choice): - /// SignatureKey -> Vec - LatestKnownAggregatedPayloads, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -33,15 +27,13 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 10] = [ +pub const ALL_TABLES: [Table; 8] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, Table::GossipSignatures, Table::AttestationDataByRoot, - Table::LatestNewAggregatedPayloads, - Table::LatestKnownAggregatedPayloads, Table::Metadata, Table::LiveChain, ]; diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 45565f18..b1338052 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -18,8 +18,6 @@ fn cf_name(table: Table) -> &'static str { Table::States => "states", Table::GossipSignatures => "gossip_signatures", Table::AttestationDataByRoot => "attestation_data_by_root", - Table::LatestNewAggregatedPayloads => "latest_new_aggregated_payloads", - Table::LatestKnownAggregatedPayloads => "latest_known_aggregated_payloads", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 2f22db21..b4aa49ae 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet}; -use 
std::sync::{Arc, LazyLock}; +use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. /// @@ -138,9 +138,15 @@ fn decode_live_chain_key(bytes: &[u8]) -> (u64, H256) { /// /// - [`from_anchor_state`](Self::from_anchor_state): Initialize from a checkpoint state (no block body) /// - [`get_forkchoice_store`](Self::get_forkchoice_store): Initialize from state + block (stores body) +type AggregatedPayloadMap = HashMap<SignatureKey, Vec<StoredAggregatedPayload>>; + +/// Lock ordering invariant: always acquire `known_aggregated_payloads` before +/// `new_aggregated_payloads` when both locks are needed, to prevent deadlocks. #[derive(Clone)] pub struct Store { backend: Arc<dyn StorageBackend>, + known_aggregated_payloads: Arc<Mutex<AggregatedPayloadMap>>, + new_aggregated_payloads: Arc<Mutex<AggregatedPayloadMap>>, } impl Store { @@ -276,7 +282,11 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - Self { backend } + Self { + backend, + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + } } // ============ Metadata Helpers ============ @@ -388,11 +398,9 @@ impl Store { // Prune signatures, payloads, and attestation data for finalized slots let pruned_sigs = self.prune_gossip_signatures(finalized.slot); let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); - self.prune_aggregated_payload_table(Table::LatestNewAggregatedPayloads, finalized.slot); - self.prune_aggregated_payload_table( - Table::LatestKnownAggregatedPayloads, - finalized.slot, - ); + // Lock ordering: known before new (field declaration order) + Self::prune_aggregated_payload_map(&self.known_aggregated_payloads, finalized.slot); + Self::prune_aggregated_payload_map(&self.new_aggregated_payloads, finalized.slot); // Prune old states before blocks: state pruning uses headers for slot lookup let protected_roots = [finalized.root, self.latest_justified().root]; let pruned_states = self.prune_old_states(&protected_roots); @@ -505,40 +513,13 @@ impl Store { }) } - ///
Prune an aggregated payload table (new or known) for slots <= finalized_slot. - fn prune_aggregated_payload_table(&mut self, table: Table, finalized_slot: u64) { - let view = self.backend.begin_read().expect("read view"); - let mut updates = vec![]; - let mut deletes = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(table, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Ok(mut payloads) = Vec::<StoredAggregatedPayload>::from_ssz_bytes(&value_bytes) { - let original_len = payloads.len(); - payloads.retain(|p| p.slot > finalized_slot); - - if payloads.is_empty() { - deletes.push(key_bytes.to_vec()); - } else if payloads.len() < original_len { - updates.push((key_bytes.to_vec(), payloads.as_ssz_bytes())); - } - } - } - drop(view); - - if !updates.is_empty() || !deletes.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - if !updates.is_empty() { - batch.put_batch(table, updates).expect("put"); - } - if !deletes.is_empty() { - batch.delete_batch(table, deletes).expect("delete"); - } - batch.commit().expect("commit"); - } + /// Prune an in-memory aggregated payload map for slots <= finalized_slot. + fn prune_aggregated_payload_map(map: &Mutex<AggregatedPayloadMap>, finalized_slot: u64) { + let mut guard = map.lock().expect("lock"); + guard.retain(|_, payloads| { + payloads.retain(|p| p.slot > finalized_slot); + !payloads.is_empty() + }); } /// Prune old states beyond the retention window. @@ -822,7 +803,11 @@ impl Store { /// Convenience: extract latest attestation per validator from known /// (fork-choice-active) aggregated payloads only.
pub fn extract_latest_known_attestations(&self) -> HashMap<u64, AttestationData> { - self.extract_latest_attestations(self.iter_known_aggregated_payloads().map(|(key, _)| key)) + self.extract_latest_attestations( + self.iter_known_aggregated_payloads() + .into_iter() + .map(|(key, _)| key), + ) } // ============ Known Aggregated Payloads ============ @@ -830,34 +815,36 @@ impl Store { // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known aggregated payloads. + /// Returns all known aggregated payloads. pub fn iter_known_aggregated_payloads( &self, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ { - self.iter_aggregated_payloads(Table::LatestKnownAggregatedPayloads) + ) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> { + Self::iter_aggregated_payloads_from(&self.known_aggregated_payloads) } - /// Iterates over keys only from the known aggregated payloads table, - /// skipping value deserialization. - pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ { - self.iter_aggregated_payload_keys(Table::LatestKnownAggregatedPayloads) + /// Returns keys from the known aggregated payloads map. + pub fn iter_known_aggregated_payload_keys(&self) -> Vec<SignatureKey> { + Self::iter_aggregated_payload_keys_from(&self.known_aggregated_payloads) } - /// Insert an aggregated payload into the known (fork-choice-active) table. + /// Insert an aggregated payload into the known (fork-choice-active) map. pub fn insert_known_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestKnownAggregatedPayloads, key, payload); + Self::insert_aggregated_payloads_batch_to( + &self.known_aggregated_payloads, + vec![(key, payload)], + ); } - /// Batch-insert multiple aggregated payloads into the known table in a single commit. + /// Batch-insert multiple aggregated payloads into the known map.
pub fn insert_known_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestKnownAggregatedPayloads, entries); + Self::insert_aggregated_payloads_batch_to(&self.known_aggregated_payloads, entries); } // ============ New Aggregated Payloads ============ @@ -865,34 +852,36 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads. + /// Returns all new (pending) aggregated payloads. pub fn iter_new_aggregated_payloads( &self, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> + '_ { - self.iter_aggregated_payloads(Table::LatestNewAggregatedPayloads) + ) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> { + Self::iter_aggregated_payloads_from(&self.new_aggregated_payloads) } - /// Iterates over keys only from the new aggregated payloads table, - /// skipping value deserialization. - pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator<Item = SignatureKey> + '_ { - self.iter_aggregated_payload_keys(Table::LatestNewAggregatedPayloads) + /// Returns keys from the new aggregated payloads map. + pub fn iter_new_aggregated_payload_keys(&self) -> Vec<SignatureKey> { + Self::iter_aggregated_payload_keys_from(&self.new_aggregated_payloads) } - /// Insert an aggregated payload into the new (pending) table. + /// Insert an aggregated payload into the new (pending) map. pub fn insert_new_aggregated_payload( &mut self, key: SignatureKey, payload: StoredAggregatedPayload, ) { - self.insert_aggregated_payload(Table::LatestNewAggregatedPayloads, key, payload); + Self::insert_aggregated_payloads_batch_to( + &self.new_aggregated_payloads, + vec![(key, payload)], + ); } - /// Batch-insert multiple aggregated payloads into the new table in a single commit. + /// Batch-insert multiple aggregated payloads into the new map.
pub fn insert_new_aggregated_payloads_batch( &mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { - self.insert_aggregated_payloads_batch(Table::LatestNewAggregatedPayloads, entries); + Self::insert_aggregated_payloads_batch_to(&self.new_aggregated_payloads, entries); } // ============ Pruning Helpers ============ @@ -932,130 +921,43 @@ impl Store { // ============ Aggregated Payload Helpers ============ - fn iter_aggregated_payloads( - &self, - table: Table, - ) -> impl Iterator<Item = (SignatureKey, Vec<StoredAggregatedPayload>)> { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| { - let key = decode_signature_key(&k); - let payloads = - Vec::<StoredAggregatedPayload>::from_ssz_bytes(&v).expect("valid payloads"); - (key, payloads) - }) - .collect(); - entries.into_iter() - } - - fn iter_aggregated_payload_keys(&self, table: Table) -> impl Iterator<Item = SignatureKey> { - let view = self.backend.begin_read().expect("read view"); - let keys: Vec<_> = view - .prefix_iterator(table, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, _)| decode_signature_key(&k)) - .collect(); - keys.into_iter() + fn iter_aggregated_payloads_from( + map: &Mutex<AggregatedPayloadMap>, + ) -> Vec<(SignatureKey, Vec<StoredAggregatedPayload>)> { + map.lock() + .expect("lock") + .iter() + .map(|(k, v)| (*k, v.clone())) + .collect() } - fn insert_aggregated_payload( - &mut self, - table: Table, - key: SignatureKey, - payload: StoredAggregatedPayload, - ) { - self.insert_aggregated_payloads_batch(table, vec![(key, payload)]); + fn iter_aggregated_payload_keys_from(map: &Mutex<AggregatedPayloadMap>) -> Vec<SignatureKey> { + map.lock().expect("lock").keys().copied().collect() } - /// Batch-insert multiple aggregated payloads in a single read-write-commit cycle. - /// Groups entries by key to correctly handle multiple payloads for the same key.
- fn insert_aggregated_payloads_batch( - &mut self, - table: Table, + fn insert_aggregated_payloads_batch_to( + map: &Mutex<AggregatedPayloadMap>, entries: Vec<(SignatureKey, StoredAggregatedPayload)>, ) { if entries.is_empty() { return; } - - // Group entries by key to handle multiple payloads for the same key - let mut grouped: HashMap<Vec<u8>, Vec<StoredAggregatedPayload>> = HashMap::new(); + let mut guard = map.lock().expect("lock"); for (key, payload) in entries { - let encoded_key = encode_signature_key(&key); - grouped.entry(encoded_key).or_default().push(payload); + guard.entry(key).or_default().push(payload); } - - let view = self.backend.begin_read().expect("read view"); - let mut batch_entries = Vec::new(); - - for (encoded_key, new_payloads) in grouped { - let mut payloads: Vec<StoredAggregatedPayload> = view - .get(table, &encoded_key) - .expect("get") - .map(|bytes| Vec::<StoredAggregatedPayload>::from_ssz_bytes(&bytes).expect("valid")) - .unwrap_or_default(); - payloads.extend(new_payloads); - batch_entries.push((encoded_key, payloads.as_ssz_bytes())); - } - drop(view); - - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .put_batch(table, batch_entries) - .expect("put aggregated payloads"); - batch.commit().expect("commit"); } /// Promotes all new aggregated payloads to known, making them active in fork choice. /// - /// Merges entries from `LatestNewAggregatedPayloads` into `LatestKnownAggregatedPayloads`, - /// appending to existing payload lists rather than overwriting them. + /// Drains `new_aggregated_payloads` and merges into `known_aggregated_payloads`.
pub fn promote_new_aggregated_payloads(&mut self) { - let view = self.backend.begin_read().expect("read view"); - let new_entries: Vec<(Vec, Vec)> = view - .prefix_iterator(Table::LatestNewAggregatedPayloads, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .map(|(k, v)| (k.to_vec(), v.to_vec())) - .collect(); - - if new_entries.is_empty() { - drop(view); - return; + // Lock ordering: known before new (field declaration order) + let mut known_guard = self.known_aggregated_payloads.lock().expect("lock"); + let mut new_guard = self.new_aggregated_payloads.lock().expect("lock"); + for (key, new_payloads) in new_guard.drain() { + known_guard.entry(key).or_default().extend(new_payloads); } - - // Merge new payloads with existing known payloads - let merged: Vec<(Vec, Vec)> = new_entries - .iter() - .map(|(key, new_bytes)| { - let new_payloads = - Vec::::from_ssz_bytes(new_bytes).expect("valid"); - let mut known_payloads: Vec = view - .get(Table::LatestKnownAggregatedPayloads, key) - .expect("get") - .map(|bytes| { - Vec::::from_ssz_bytes(&bytes).expect("valid") - }) - .unwrap_or_default(); - known_payloads.extend(new_payloads); - (key.clone(), known_payloads.as_ssz_bytes()) - }) - .collect(); - drop(view); - - let keys_to_delete: Vec<_> = new_entries.into_iter().map(|(k, _)| k).collect(); - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::LatestNewAggregatedPayloads, keys_to_delete) - .expect("delete new aggregated payloads"); - batch - .put_batch(Table::LatestKnownAggregatedPayloads, merged) - .expect("put known aggregated payloads"); - batch.commit().expect("commit"); } /// Delete specific gossip signatures by key. 
@@ -1256,6 +1158,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; // Insert exactly BLOCKS_TO_KEEP blocks @@ -1280,6 +1184,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; let total = BLOCKS_TO_KEEP + 10; @@ -1318,6 +1224,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; let total = BLOCKS_TO_KEEP + 10; @@ -1361,6 +1269,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; // Insert STATES_TO_KEEP headers + states @@ -1382,6 +1292,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; let total = STATES_TO_KEEP + 5; @@ -1413,6 +1325,8 @@ mod tests { let backend = Arc::new(InMemoryBackend::new()); let mut store = Store { backend: backend.clone(), + known_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), + new_aggregated_payloads: Arc::new(Mutex::new(HashMap::new())), }; let total = STATES_TO_KEEP + 5;