From 562e9d04495417a166486d848af851ae8a83e825 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 15 May 2024 09:18:45 +0300 Subject: [PATCH] Implement unconditional reconstruction for supernodes (#5781) * Implement unconditional reconstruction for supernodes * Move code into KzgVerifiedCustodyDataColumn * Remove expect * Add test * Thanks justin --- Cargo.lock | 4 +- Cargo.toml | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 66 +++++++++-- .../beacon_chain/src/block_verification.rs | 7 +- .../src/data_availability_checker.rs | 22 +++- .../overflow_lru_cache.rs | 93 +++++++++++++-- .../src/data_column_verification.rs | 25 ++++ beacon_node/beacon_chain/src/kzg_utils.rs | 59 ++++++++++ beacon_node/beacon_chain/src/metrics.rs | 8 ++ beacon_node/http_api/src/publish_blocks.rs | 7 +- .../gossip_methods.rs | 73 ++++++++---- .../network_beacon_processor/sync_methods.rs | 38 +++--- consensus/types/src/data_column_sidecar.rs | 108 +++++++++++++++++- consensus/types/src/data_column_subnet_id.rs | 9 +- crypto/kzg/src/lib.rs | 19 +++ 15 files changed, 460 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce71a00af37..d1d7d3bf773 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1131,8 +1131,8 @@ dependencies = [ [[package]] name = "c-kzg" -version = "1.0.0" -source = "git+https://github.com/ethereum/c-kzg-4844?branch=das#e08f22ef65a5ba4ea808e0d3a9e845fbd6faea2f" +version = "1.0.2" +source = "git+https://github.com/ethereum/c-kzg-4844?rev=114fa0382990e9b74b1f90f3b0dc5f97c2f8a7ad#114fa0382990e9b74b1f90f3b0dc5f97c2f8a7ad" dependencies = [ "blst", "cc", diff --git a/Cargo.toml b/Cargo.toml index a43db3dfc53..a6f311f58ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,7 @@ bytes = "1" # TODO(das): switch to c-kzg crate before merging back to unstable (and disable default-features) if possible # Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable # feature ourselves when desired. -c-kzg = { git = "https://github.com/ethereum/c-kzg-4844", branch = "das" } +c-kzg = { git = "https://github.com/ethereum/c-kzg-4844", rev = "114fa0382990e9b74b1f90f3b0dc5f97c2f8a7ad" } clap = "2" compare_fields_derive = { path = "common/compare_fields_derive" } criterion = "0.3" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0a6a4666d13..69a0b0c6229 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -22,6 +22,7 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, + DataColumnsToPublish, }; use crate::data_column_verification::{ CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn, @@ -3028,7 +3029,13 @@ impl BeaconChain { pub async fn process_gossip_data_column( self: &Arc, data_column: GossipVerifiedDataColumn, - ) -> Result> { + ) -> Result< + ( + AvailabilityProcessingStatus, + DataColumnsToPublish, + ), + BlockError, + > { let block_root = data_column.block_root(); // If this block has already been imported to forkchoice it must have been available, so @@ -3044,7 +3051,7 @@ impl BeaconChain { let r = self .check_gossip_data_column_availability_and_import(data_column) .await; - self.remove_notified(&block_root, r) + self.remove_notified_custody_columns(&block_root, r) } /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was @@ -3087,7 +3094,13 @@ impl BeaconChain { self: &Arc, block_root: Hash256, custody_columns: Vec>, - ) -> Result> { + ) -> Result< + ( + AvailabilityProcessingStatus, + DataColumnsToPublish, + ), + BlockError, + > { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its columns again. if self @@ -3109,7 +3122,7 @@ impl BeaconChain { let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; - self.remove_notified(&block_root, r) + self.remove_notified_custody_columns(&block_root, r) } /// Remove any block components from the *processing cache* if we no longer require them. If the @@ -3127,6 +3140,23 @@ impl BeaconChain { r } + /// Remove any block components from the *processing cache* if we no longer require them. If the + /// block was imported full or erred, we no longer require them. + fn remove_notified_custody_columns

( + &self, + block_root: &Hash256, + r: Result<(AvailabilityProcessingStatus, P), BlockError>, + ) -> Result<(AvailabilityProcessingStatus, P), BlockError> { + let has_missing_components = matches!( + r, + Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _)) + ); + if !has_missing_components { + self.reqresp_pre_import_cache.write().remove(block_root); + } + r + } + /// Wraps `process_block` in logic to cache the block's commitments in the processing cache /// and evict if the block was imported or errored. pub async fn process_block_with_early_caching>( @@ -3362,16 +3392,24 @@ impl BeaconChain { async fn check_gossip_data_column_availability_and_import( self: &Arc, data_column: GossipVerifiedDataColumn, - ) -> Result> { + ) -> Result< + ( + AvailabilityProcessingStatus, + DataColumnsToPublish, + ), + BlockError, + > { let slot = data_column.slot(); if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(data_column.signed_block_header()); } - let availability = self + let (availability, data_columns_to_publish) = self .data_availability_checker .put_gossip_data_column(data_column)?; - self.process_availability(slot, availability).await + self.process_availability(slot, availability) + .await + .map(|result| (result, data_columns_to_publish)) } /// Checks if the provided blobs can make any cached blocks available, and imports immediately @@ -3419,7 +3457,13 @@ impl BeaconChain { slot: Slot, block_root: Hash256, custody_columns: Vec>, - ) -> Result> { + ) -> Result< + ( + AvailabilityProcessingStatus, + DataColumnsToPublish, + ), + BlockError, + > { // Need to scope this to ensure the lock is dropped before calling `process_availability` // Even an explicit drop is not enough to convince the borrow checker. { @@ -3443,11 +3487,13 @@ impl BeaconChain { } } } - let availability = self + let (availability, data_columns_to_publish) = self .data_availability_checker .put_rpc_custody_columns(block_root, custody_columns)?; - self.process_availability(slot, availability).await + self.process_availability(slot, availability) + .await + .map(|result| (result, data_columns_to_publish)) } /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0a49e107042..39760e860f4 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -799,12 +799,7 @@ fn build_gossip_verified_data_columns( let mut gossip_verified_data_columns = vec![]; for sidecar in sidecars { let subnet = - DataColumnSubnetId::try_from_column_index::(sidecar.index as usize) - .map_err(|_| { - BlockContentsError::::DataColumnSidecarError( - DataColumnSidecarError::DataColumnIndexOutOfBounds, - ) - })?; + DataColumnSubnetId::from_column_index::(sidecar.index as usize); let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?; gossip_verified_data_columns.push(column); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 70f72b80122..0ccee82641c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -31,6 +31,8 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList}; use types::non_zero_usize::new_non_zero_usize; +pub use self::overflow_lru_cache::DataColumnsToPublish; + /// The LRU Cache stores `PendingComponents` which can store up to /// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So /// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this @@ -187,11 +189,13 @@ impl DataAvailabilityChecker { /// Put a list of custody columns received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. + #[allow(clippy::type_complexity)] pub fn put_rpc_custody_columns( &self, block_root: Hash256, custody_columns: Vec>, - ) -> Result, AvailabilityCheckError> { + ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> + { let Some(kzg) = self.kzg.as_ref() else { return Err(AvailabilityCheckError::KzgNotInitialized); }; @@ -203,8 +207,11 @@ impl DataAvailabilityChecker { .map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg)) .collect::, _>>()?; - self.availability_cache - .put_kzg_verified_data_columns(block_root, verified_custody_columns) + self.availability_cache.put_kzg_verified_data_columns( + kzg, + block_root, + verified_custody_columns, + ) } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -225,10 +232,15 @@ impl DataAvailabilityChecker { /// Otherwise cache the data column sidecar. /// /// This should only accept gossip verified data columns, so we should not have to worry about dupes. + #[allow(clippy::type_complexity)] pub fn put_gossip_data_column( &self, gossip_data_column: GossipVerifiedDataColumn, - ) -> Result, AvailabilityCheckError> { + ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> + { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; let block_root = gossip_data_column.block_root(); // TODO(das): ensure that our custody requirements include this column @@ -236,7 +248,7 @@ impl DataAvailabilityChecker { KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner()); self.availability_cache - .put_kzg_verified_data_columns(block_root, vec![custody_column]) + .put_kzg_verified_data_columns(kzg, block_root, vec![custody_column]) } /// Check if we have all the blobs for a block. Returns `Availability` which has information diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 62fe3674122..8c7f1a77d51 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -35,11 +35,13 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; +use crate::metrics; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; +use kzg::Kzg; use lru::LruCache; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; -use slog::{trace, Logger}; +use slog::{debug, trace, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; @@ -49,6 +51,8 @@ use types::blob_sidecar::BlobIdentifier; use types::data_column_sidecar::DataColumnIdentifier; use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256}; +pub type DataColumnsToPublish = Option>>>; + /// This represents the components of a partially available block /// /// The blobs are all gossip and kzg verified. @@ -230,7 +234,7 @@ impl PendingComponents { /// matches the number of expected blobs / custody columns. pub fn is_available( &self, - block_import_requirement: BlockImportRequirement, + block_import_requirement: &BlockImportRequirement, log: &Logger, ) -> bool { match block_import_requirement { @@ -259,7 +263,7 @@ impl PendingComponents { if let Some(num_expected_blobs) = self.num_expected_blobs() { // No data columns when there are 0 blobs - num_expected_blobs == 0 || num_expected_columns == num_received_data_columns + num_expected_blobs == 0 || *num_expected_columns == num_received_data_columns } else { false } @@ -807,13 +811,16 @@ impl OverflowLRUCache { } } + #[allow(clippy::type_complexity)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, >( &self, + kzg: &Kzg, block_root: Hash256, kzg_verified_data_columns: I, - ) -> Result, AvailabilityCheckError> { + ) -> Result<(Availability, DataColumnsToPublish), AvailabilityCheckError> + { let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. @@ -825,22 +832,86 @@ impl OverflowLRUCache { pending_components.merge_data_columns(kzg_verified_data_columns)?; let block_import_requirement = self.block_import_requirement(&pending_components)?; - if pending_components.is_available(block_import_requirement, &self.log) { + + // Potentially trigger reconstruction if: + // - Our custody requirement is all columns + // - We >= 50% of columns + let data_columns_to_publish = + if self.should_reconstruct(&block_import_requirement, &pending_components) { + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + let existing_column_indices = pending_components + .verified_data_columns + .iter() + .map(|d| d.index()) + .collect::>(); + + // Will only return an error if: + // - < 50% of columns + // - There are duplicates + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + kzg, + &pending_components.verified_data_columns, + )?; + + let data_columns_to_publish = all_data_columns + .iter() + .filter(|d| !existing_column_indices.contains(&d.index())) + .map(|d| d.clone_arc()) + .collect::>(); + + pending_components.verified_data_columns = all_data_columns.into(); + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_publish.len() as u64, + ); + debug!(self.log, "Reconstructed columns"; "count" => data_columns_to_publish.len()); + + Some(data_columns_to_publish) + } else { + None + }; + + if pending_components.is_available(&block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(&self.spec, |diet_block| { - self.state_cache.recover_pending_executed_block(diet_block) - }) + pending_components + .make_available(&self.spec, |diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) + .map(|availability| (availability, data_columns_to_publish)) } else { write_lock.put_pending_components( block_root, pending_components, &self.overflow_store, )?; - Ok(Availability::MissingComponents(block_root)) + Ok(( + Availability::MissingComponents(block_root), + data_columns_to_publish, + )) } } + /// Potentially trigger reconstruction if: + /// - Our custody requirement is all columns + /// - We >= 50% of columns + fn should_reconstruct( + &self, + block_import_requirement: &BlockImportRequirement, + pending_components: &PendingComponents, + ) -> bool { + let BlockImportRequirement::CustodyColumns(num_expected_columns) = block_import_requirement + else { + return false; + }; + + *num_expected_columns == T::EthSpec::number_of_columns() + && pending_components.verified_data_columns.len() >= T::EthSpec::number_of_columns() / 2 + } + pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, @@ -865,7 +936,7 @@ impl OverflowLRUCache { pending_components.merge_blobs(fixed_blobs); let block_import_requirement = self.block_import_requirement(&pending_components)?; - if pending_components.is_available(block_import_requirement, &self.log) { + if pending_components.is_available(&block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(&self.spec, |diet_block| { @@ -905,7 +976,7 @@ impl OverflowLRUCache { // Check if we have all components and entire set is consistent. let block_import_requirement = self.block_import_requirement(&pending_components)?; - if pending_components.is_available(block_import_requirement, &self.log) { + if pending_components.is_available(&block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(&self.spec, |diet_block| { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 7951c3dde9e..b872896a4b0 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -235,6 +235,31 @@ impl KzgVerifiedCustodyDataColumn { }) } + pub fn reconstruct_columns( + kzg: &Kzg, + partial_set_of_columns: &[Self], + ) -> Result, KzgError> { + // Will only return an error if: + // - < 50% of columns + // - There are duplicates + let all_data_columns = DataColumnSidecar::reconstruct( + kzg, + &partial_set_of_columns + .iter() + .map(|d| d.clone_arc()) + .collect::>(), + )?; + + Ok(all_data_columns + .into_iter() + .map(|d| { + KzgVerifiedCustodyDataColumn::from_asserted_custody(KzgVerifiedDataColumn { + data: d, + }) + }) + .collect::>()) + } + pub fn into_inner(self) -> Arc> { self.data } diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 69888235365..3f5aa46203e 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -128,3 +128,62 @@ pub fn verify_kzg_proof( ) -> Result { kzg.verify_kzg_proof(kzg_commitment, &z.0.into(), &y.0.into(), kzg_proof) } + +#[cfg(test)] +mod test { + use bls::Signature; + use eth2_network_config::TRUSTED_SETUP_BYTES; + use kzg::{Kzg, KzgCommitment, TrustedSetup}; + use types::{ + beacon_block_body::KzgCommitments, BeaconBlock, BeaconBlockDeneb, Blob, BlobsList, + ChainSpec, DataColumnSidecar, EmptyBlock, EthSpec, MainnetEthSpec, SignedBeaconBlock, + }; + + #[test] + fn build_and_reconstruct() { + type E = MainnetEthSpec; + let num_of_blobs = 6; + let spec = E::default_spec(); + let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, &spec); + + let trusted_setup: TrustedSetup = serde_json::from_reader(TRUSTED_SETUP_BYTES) + .map_err(|e| format!("Unable to read trusted setup file: {}", e)) + .expect("should have trusted setup"); + let kzg = Kzg::new_from_trusted_setup(trusted_setup).expect("should create kzg"); + + let column_sidecars = + DataColumnSidecar::build_sidecars(&blob_sidecars, &signed_block, &kzg).unwrap(); + + // Now reconstruct + let reconstructed_columns = DataColumnSidecar::reconstruct( + &kzg, + &column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2], + ) + .unwrap(); + + for i in 0..E::number_of_columns() { + assert_eq!(reconstructed_columns.get(i), column_sidecars.get(i), "{i}"); + } + } + + fn create_test_block_and_blobs( + num_of_blobs: usize, + spec: &ChainSpec, + ) -> (SignedBeaconBlock, BlobsList) { + let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)); + let mut body = block.body_mut(); + let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap(); + *blob_kzg_commitments = + KzgCommitments::::new(vec![KzgCommitment::empty_for_testing(); num_of_blobs]) + .unwrap(); + + let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); + + let blobs = (0..num_of_blobs) + .map(|_| Blob::::default()) + .collect::>() + .into(); + + (signed_block, blobs) + } +} diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index c4ff0d3bc05..b6eb6e0e4d7 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1180,6 +1180,14 @@ lazy_static! { "data_availability_overflow_store_cache_size", "Number of entries in the data availability overflow store cache." ); + pub static ref DATA_AVAILABILITY_RECONSTRUCTION_TIME: Result = try_create_histogram( + "data_availability_reconstruction_time_seconds", + "Time taken to reconstruct columns" + ); + pub static ref DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS: Result = try_create_int_counter( + "data_availability_reconstructed_columns_total", + "Total count of reconstructed columns" + ); /* * light_client server metrics diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 896448176a0..8219f97b2a5 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -118,12 +118,9 @@ pub async fn publish_block( + let subnet = DataColumnSubnetId::from_column_index::( data_col.index as usize, - ) - .map_err(|e| { - BeaconChainError::UnableToBuildColumnSidecar(format!("{e:?}")) - })?; + ); pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new(( subnet, data_col, )))); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 1a55216efe6..d439f142f31 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,6 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::store::Error; @@ -19,7 +18,13 @@ use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, GossipVerifiedBlock, NotifyExecutionLayer, }; -use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; +use beacon_chain::{ + blob_verification::{GossipBlobError, GossipVerifiedBlob}, + data_availability_checker::DataColumnsToPublish, +}; +use lighthouse_network::{ + Client, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, ReportSource, +}; use operation_pool::ReceivedPreCapella; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -165,6 +170,24 @@ impl NetworkBeaconProcessor { }) } + pub(crate) fn handle_data_columns_to_publish( + &self, + data_columns_to_publish: DataColumnsToPublish, + ) { + if let Some(data_columns_to_publish) = data_columns_to_publish { + self.send_network_message(NetworkMessage::Publish { + messages: data_columns_to_publish + .iter() + .map(|d| { + let subnet = + DataColumnSubnetId::from_column_index::(d.index as usize); + PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) + }) + .collect(), + }); + } + } + /// Send a message on `message_tx` that the `message_id` sent by `peer_id` should be propagated on /// the gossip network. /// @@ -890,24 +913,34 @@ impl NetworkBeaconProcessor { .process_gossip_data_column(verified_data_column) .await { - Ok(AvailabilityProcessingStatus::Imported(block_root)) => { - // Note: Reusing block imported metric here - metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); - info!( - self.log, - "Gossipsub data column processed, imported fully available block"; - "block_root" => %block_root - ); - self.chain.recompute_head_at_current_slot().await; - } - Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { - trace!( - self.log, - "Processed data column, waiting for other components"; - "slot" => %slot, - "data_column_index" => %data_column_index, - "block_root" => %block_root, - ); + Ok((availability, data_columns_to_publish)) => { + self.handle_data_columns_to_publish(data_columns_to_publish); + + match availability { + AvailabilityProcessingStatus::Imported(block_root) => { + // Note: Reusing block imported metric here + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL, + ); + info!( + self.log, + "Gossipsub data column processed, imported fully available block"; + "block_root" => %block_root + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { + trace!( + self.log, + "Processed data column, waiting for other components"; + "slot" => %slot, + "data_column_index" => %data_column_index, + "block_root" => %block_root, + ); + + // Potentially trigger reconstruction + } + } } Err(err) => { debug!( diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 2a9b42be38d..a487157caaf 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -316,21 +316,27 @@ impl NetworkBeaconProcessor { .await; match &result { - Ok(AvailabilityProcessingStatus::Imported(hash)) => { - debug!( - self.log, - "Block components retrieved"; - "result" => "imported block and custody columns", - "block_hash" => %hash, - ); - self.chain.recompute_head_at_current_slot().await; - } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - debug!( - self.log, - "Missing components over rpc"; - "block_hash" => %block_root, - ); + Ok((availability, data_columns_to_publish)) => { + self.handle_data_columns_to_publish(data_columns_to_publish.clone()); + + match availability { + AvailabilityProcessingStatus::Imported(hash) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + } + } } Err(BlockError::BlockIsAlreadyKnown(_)) => { debug!( @@ -351,7 +357,7 @@ impl NetworkBeaconProcessor { self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: result.into(), + result: result.map(|(r, _)| r).into(), }); } diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 7924a32f7b5..88094b946b2 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -9,7 +9,7 @@ use bls::Signature; use derivative::Derivative; #[cfg_attr(test, double)] use kzg::Kzg; -use kzg::{Blob as KzgBlob, Error as KzgError}; +use kzg::{Blob as KzgBlob, Cell as KzgCell, Error as KzgError}; use kzg::{KzgCommitment, KzgProof}; use merkle_proof::MerkleTreeError; #[cfg(test)] @@ -169,6 +169,106 @@ impl DataColumnSidecar { Ok(sidecars) } + pub fn reconstruct(kzg: &Kzg, data_columns: &[Arc]) -> Result>, KzgError> { + let mut columns = + vec![Vec::with_capacity(E::max_blobs_per_block()); E::number_of_columns()]; + let mut column_kzg_proofs = + vec![Vec::with_capacity(E::max_blobs_per_block()); E::number_of_columns()]; + + let first_data_column = data_columns + .first() + .ok_or(KzgError::InconsistentArrayLength( + "data_columns should have at least one element".to_string(), + ))?; + let num_of_blobs = first_data_column.kzg_commitments.len(); + + for row_index in 0..num_of_blobs { + let mut cells: Vec = vec![]; + let mut cell_ids: Vec = vec![]; + for data_column in data_columns { + let cell = + data_column + .column + .get(row_index) + .ok_or(KzgError::InconsistentArrayLength(format!( + "Missing data column at index {row_index}" + )))?; + + cells.push(ssz_cell_to_crypto_cell::(cell)?); + cell_ids.push(data_column.index); + } + // recover_all_cells does not expect sorted + let all_cells = kzg.recover_all_cells(&cell_ids, &cells)?; + let blob = kzg.cells_to_blob(&all_cells)?; + + // Note: This function computes all cells and proofs. According to Justin this is okay, + // computing a partial set may be more expensive and requires code paths that don't exist. + // Computing the blobs cells is technically unnecessary but very cheap. It's done here again + // for simplicity. + let (blob_cells, blob_cell_proofs) = kzg.compute_cells_and_proofs(&blob)?; + + // we iterate over each column, and we construct the column from "top to bottom", + // pushing on the cell and the corresponding proof at each column index. we do this for + // each blob (i.e. the outer loop). + for col in 0..E::number_of_columns() { + let cell = blob_cells + .get(col) + .ok_or(KzgError::InconsistentArrayLength(format!( + "Missing blob cell at index {col}" + )))?; + let cell: Vec = cell + .into_inner() + .into_iter() + .flat_map(|data| (*data).into_iter()) + .collect(); + let cell = Cell::::from(cell); + + let proof = blob_cell_proofs + .get(col) + .ok_or(KzgError::InconsistentArrayLength(format!( + "Missing blob cell KZG proof at index {col}" + )))?; + + let column = columns + .get_mut(col) + .ok_or(KzgError::InconsistentArrayLength(format!( + "Missing data column at index {col}" + )))?; + let column_proofs = + column_kzg_proofs + .get_mut(col) + .ok_or(KzgError::InconsistentArrayLength(format!( + "Missing data column proofs at index {col}" + )))?; + + column.push(cell); + column_proofs.push(*proof); + } + } + + // Clone sidecar elements from existing data column, no need to re-compute + let kzg_commitments = &first_data_column.kzg_commitments; + let signed_block_header = &first_data_column.signed_block_header; + let kzg_commitments_inclusion_proof = &first_data_column.kzg_commitments_inclusion_proof; + + let sidecars: Vec>> = columns + .into_iter() + .zip(column_kzg_proofs) + .enumerate() + .map(|(index, (col, proofs))| { + Arc::new(DataColumnSidecar { + index: index as u64, + column: DataColumn::::from(col), + kzg_commitments: kzg_commitments.clone(), + kzg_proofs: KzgProofs::::from(proofs), + signed_block_header: signed_block_header.clone(), + kzg_commitments_inclusion_proof: kzg_commitments_inclusion_proof.clone(), + }) + }) + .collect(); + Ok(sidecars) + } + pub fn min_size() -> usize { // min size is one cell Self { @@ -272,6 +372,12 @@ pub type DataColumnSidecarList = pub type FixedDataColumnSidecarList = FixedVector>>, ::DataColumnCount>; +/// Converts a cell ssz List object to an array to be used with the kzg +/// crypto library. +fn ssz_cell_to_crypto_cell(cell: &Cell) -> Result { + KzgCell::from_bytes(cell.as_ref()).map_err(Into::into) +} + #[cfg(test)] mod test { use super::*; diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 79b427ee63f..787a0db332c 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -41,9 +41,12 @@ impl DataColumnSubnetId { id.into() } - pub fn try_from_column_index(column_index: usize) -> Result { - let id = column_index.safe_rem(E::data_column_subnet_count())? as u64; - Ok(id.into()) + pub fn from_column_index(column_index: usize) -> Self { + (column_index + .safe_rem(E::data_column_subnet_count()) + .expect("data_column_subnet_count should never be zero if this function is called") + as u64) + .into() } #[allow(clippy::arithmetic_side_effects)] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index b7d4c13a7c4..1c4bca05166 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -22,6 +22,8 @@ pub enum Error { Kzg(c_kzg::Error), /// The kzg verification failed KzgVerificationFailed, + /// Misc indexing error + InconsistentArrayLength(String), } impl From for Error { @@ -44,6 +46,7 @@ impl Kzg { trusted_setup: KzgSettings::load_trusted_setup( &trusted_setup.g1_points(), &trusted_setup.g2_points(), + 0, )?, }) } @@ -190,6 +193,22 @@ impl Kzg { Ok(()) } } + + pub fn cells_to_blob(&self, cells: &[Cell; c_kzg::CELLS_PER_EXT_BLOB]) -> Result { + Ok(Blob::cells_to_blob(cells)?) + } + + pub fn recover_all_cells( + &self, + cell_ids: &[u64], + cells: &[Cell], + ) -> Result, Error> { + Ok(c_kzg::Cell::recover_all_cells( + cell_ids, + cells, + &self.trusted_setup, + )?) + } } pub mod mock {