diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b8b48d27799..704c99a4c26 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,9 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; -use crate::blob_verification::{ - GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, -}; +use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ @@ -25,6 +23,7 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; +use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -2092,10 +2091,10 @@ impl BeaconChain { self: &Arc, data_column_sidecar: Arc>, subnet_id: u64, - ) -> Result, GossipBlobError> { + ) -> Result, GossipDataColumnError> { metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); - GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| { + GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES); v }) @@ -2916,18 +2915,28 @@ impl BeaconChain { self.remove_notified(&block_root, r) } - pub fn process_gossip_data_column( + /// Cache 
the data column in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_gossip_data_column( self: &Arc, - gossip_verified_data_column: GossipVerifiedDataColumnSidecar, - ) { - let data_column = gossip_verified_data_column.as_data_column(); - // TODO(das) send to DA checker - info!( - self.log, - "Processed gossip data column"; - "index" => data_column.index, - "slot" => data_column.slot().as_u64() - ); + data_column: GossipVerifiedDataColumn, + ) -> Result> { + let block_root = data_column.block_root(); + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its samples again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown); + } + + let r = self + .check_gossip_data_column_availability_and_import(data_column) + .await; + self.remove_notified(&block_root, r) } /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was @@ -3212,6 +3221,23 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided data column can make any cached blocks available, and imports immediately + /// if so, otherwise caches the blob in the data availability checker. + async fn check_gossip_data_column_availability_and_import( + self: &Arc, + data_column: GossipVerifiedDataColumn, + ) -> Result> { + let slot = data_column.slot(); + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(data_column.signed_block_header()); + } + let availability = self + .data_availability_checker + .put_gossip_data_column(data_column)?; + + self.process_availability(slot, availability).await + } + /// Checks if the provided blobs can make any cached blocks available, and imports immediately /// if so, otherwise caches the blob in the data availability checker. 
async fn check_rpc_blob_availability_and_import( diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index ffc64f9d10e..f2d150d72bf 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -17,8 +17,7 @@ use ssz_types::VariableList; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256, - SignedBeaconBlockHeader, Slot, + BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip blob. @@ -185,33 +184,6 @@ pub type GossipVerifiedBlobList = VariableList< <::EthSpec as EthSpec>::MaxBlobsPerBlock, >; -#[derive(Debug)] -pub struct GossipVerifiedDataColumnSidecar { - data_column_sidecar: Arc>, -} - -impl GossipVerifiedDataColumnSidecar { - pub fn new( - column_sidecar: Arc>, - subnet_id: u64, - chain: &BeaconChain, - ) -> Result> { - let header = column_sidecar.signed_block_header.clone(); - // We only process slashing info if the gossip verification failed - // since we do not process the blob any further in that case. - validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { - process_block_slash_info::<_, GossipBlobError>( - chain, - BlockSlashInfo::from_early_error_blob(header, e), - ) - }) - } - - pub fn as_data_column(&self) -> &Arc> { - &self.data_column_sidecar - } -} - /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. 
#[derive(Debug)] @@ -675,17 +647,6 @@ pub fn validate_blob_sidecar_for_gossip( }) } -pub fn validate_data_column_sidecar_for_gossip( - data_column_sidecar: Arc>, - _subnet: u64, - _chain: &BeaconChain, -) -> Result, GossipBlobError> { - // TODO(das): validate kzg commitments, cell proofs etc - Ok(GossipVerifiedDataColumnSidecar { - data_column_sidecar: data_column_sidecar.clone(), - }) -} - /// Returns the canonical root of the given `blob`. /// /// Use this function to ensure that we report the blob hashing time Prometheus metric. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index e8df5b811ed..34bd83a706d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -53,6 +53,7 @@ use crate::block_verification_types::{ AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; +use crate::data_column_verification::GossipDataColumnError; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -528,6 +529,20 @@ impl BlockSlashInfo> { } } +impl BlockSlashInfo> { + pub fn from_early_error_data_column( + header: SignedBeaconBlockHeader, + e: GossipDataColumnError, + ) -> Self { + match e { + GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), + // `InvalidSignature` could indicate any signature in the block, so we want + // to recheck the proposer signature alone. + _ => BlockSlashInfo::SignatureNotChecked(header, e), + } + } +} + /// Process invalid blocks to see if they are suitable for the slasher. /// /// If no slasher is configured, this is a no-op. 
@@ -1985,6 +2000,23 @@ impl BlockBlobError for GossipBlobError { } } +impl BlockBlobError for GossipDataColumnError { + fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self { + GossipDataColumnError::DataColumnIsNotLaterThanParent { + data_column_slot, + parent_slot, + } + } + + fn unknown_validator_error(validator_index: u64) -> Self { + GossipDataColumnError::UnknownValidator(validator_index) + } + + fn proposer_signature_invalid() -> Self { + GossipDataColumnError::ProposalSignatureInvalid + } +} + /// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for /// `slot` can be obtained from `state`. /// diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 48d505e9e7b..2d11243ca9f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -22,7 +22,9 @@ use std::sync::Arc; use task_executor::TaskExecutor; use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; -use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + BlobSidecarList, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; mod availability_view; mod child_components; @@ -31,7 +33,9 @@ mod overflow_lru_cache; mod processing_cache; mod state_lru_cache; +use crate::data_column_verification::GossipVerifiedDataColumn; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; +use types::data_column_sidecar::DataColumnIdentifier; use types::non_zero_usize::new_non_zero_usize; /// The LRU Cache stores `PendingComponents` which can store up to @@ -192,6 +196,14 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } + /// Get a data column from the 
availability cache. + pub fn get_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + self.availability_cache.peek_data_column(data_column_id) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( @@ -223,6 +235,21 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) } + /// Check if we've cached other data columns for this block. If it completes a set and we also + /// have a block cached, return the `Availability` variant triggering block import. + /// Otherwise cache the data column sidecar. + /// + /// This should only accept gossip verified data columns, so we should not have to worry about dupes. + pub fn put_gossip_data_column( + &self, + gossip_data_column: GossipVerifiedDataColumn, + ) -> Result, AvailabilityCheckError> { + self.availability_cache.put_kzg_verified_data_columns( + gossip_data_column.block_root(), + vec![gossip_data_column.into_inner()], + ) + } + /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. 
pub fn put_pending_executed_block( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs index 776f81ee545..b0879ae60b4 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs @@ -4,11 +4,12 @@ use crate::blob_verification::KzgVerifiedBlob; use crate::block_verification_types::AsBlock; use crate::data_availability_checker::overflow_lru_cache::PendingComponents; use crate::data_availability_checker::ProcessingComponents; +use crate::data_column_verification::KzgVerifiedDataColumn; use kzg::KzgCommitment; use ssz_types::FixedVector; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; /// Defines an interface for managing data availability with two key invariants: /// @@ -26,12 +27,20 @@ pub trait AvailabilityView { /// The type representing a blob in the implementation. Must implement `Clone`. type BlobType: Clone + GetCommitment; + /// The type representing a data column in the implementation. + type DataColumnType: Clone; + /// Returns an immutable reference to the cached block. fn get_cached_block(&self) -> &Option; /// Returns an immutable reference to the fixed vector of cached blobs. fn get_cached_blobs(&self) -> &FixedVector, E::MaxBlobsPerBlock>; + /// Returns an immutable reference to the fixed vector of cached data columns. + fn get_cached_data_columns( + &self, + ) -> &FixedVector, E::DataColumnCount>; + /// Returns a mutable reference to the cached block. fn get_cached_block_mut(&mut self) -> &mut Option; @@ -40,6 +49,11 @@ pub trait AvailabilityView { &mut self, ) -> &mut FixedVector, E::MaxBlobsPerBlock>; + /// Returns a mutable reference to the fixed vector of cached data columns. 
+ fn get_cached_data_columns_mut( + &mut self, + ) -> &mut FixedVector, E::DataColumnCount>; + /// Checks if a block exists in the cache. /// /// Returns: @@ -61,6 +75,18 @@ pub trait AvailabilityView { .unwrap_or(false) } + /// Checks if a data column exists at the given index in the cache. + /// + /// Returns: + /// - `true` if a data column exists at the given index. + /// - `false` otherwise. + fn data_column_exists(&self, data_colum_index: usize) -> bool { + self.get_cached_data_columns() + .get(data_colum_index) + .map(|d| d.is_some()) + .unwrap_or(false) + } + /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a /// block. /// @@ -90,6 +116,42 @@ pub trait AvailabilityView { } } + /// Inserts a data column at a specific index in the cache. + /// + /// Existing data column at the index will be replaced. + fn insert_data_column_at_index( + &mut self, + data_column_index: usize, + data_column: Self::DataColumnType, + ) { + if let Some(b) = self + .get_cached_data_columns_mut() + .get_mut(data_column_index) + { + *b = Some(data_column); + } + } + + /// Merges a given set of data columns into the cache. + /// + /// Data columns are only inserted if: + /// 1. The data column entry at the index is empty and no block exists. + /// 2. The block exists and its commitment matches the blob's commitment. + fn merge_data_columns( + &mut self, + data_columns: FixedVector, E::DataColumnCount>, + ) { + for (index, data_column) in data_columns.iter().cloned().enumerate() { + let Some(data_column) = data_column else { + continue; + }; + // TODO(das): Add equivalent checks for data columns if necessary + if !self.data_column_exists(index) { + self.insert_data_column_at_index(index, data_column) + } + } + } + /// Merges a given set of blobs into the cache. /// /// Blobs are only inserted if: @@ -148,14 +210,16 @@ pub trait AvailabilityView { /// - `$struct_name`: The name of the struct for which to implement `AvailabilityView`. 
/// - `$block_type`: The type to use for `BlockType` in the `AvailabilityView` trait. /// - `$blob_type`: The type to use for `BlobType` in the `AvailabilityView` trait. +/// - `$data_column_type`: The type to use for `DataColumnType` in the `AvailabilityView` trait. /// - `$block_field`: The field name in the struct that holds the cached block. -/// - `$blob_field`: The field name in the struct that holds the cached blobs. +/// - `$data_column_field`: The field name in the struct that holds the cached data columns. #[macro_export] macro_rules! impl_availability_view { - ($struct_name:ident, $block_type:ty, $blob_type:ty, $block_field:ident, $blob_field:ident) => { + ($struct_name:ident, $block_type:ty, $blob_type:ty, $data_column_type:ty, $block_field:ident, $blob_field:ident, $data_column_field:ident) => { impl AvailabilityView for $struct_name { type BlockType = $block_type; type BlobType = $blob_type; + type DataColumnType = $data_column_type; fn get_cached_block(&self) -> &Option { &self.$block_field @@ -167,6 +231,12 @@ macro_rules! impl_availability_view { &self.$blob_field } + fn get_cached_data_columns( + &self, + ) -> &FixedVector, E::DataColumnCount> { + &self.$data_column_field + } + fn get_cached_block_mut(&mut self) -> &mut Option { &mut self.$block_field } @@ -176,6 +246,12 @@ macro_rules! 
impl_availability_view { ) -> &mut FixedVector, E::MaxBlobsPerBlock> { &mut self.$blob_field } + + fn get_cached_data_columns_mut( + &mut self, + ) -> &mut FixedVector, E::DataColumnCount> { + &mut self.$data_column_field + } } }; } @@ -184,24 +260,30 @@ impl_availability_view!( ProcessingComponents, KzgCommitments, KzgCommitment, + (), block_commitments, - blob_commitments + blob_commitments, + data_column_opts ); impl_availability_view!( PendingComponents, DietAvailabilityPendingExecutedBlock, KzgVerifiedBlob, + KzgVerifiedDataColumn, executed_block, - verified_blobs + verified_blobs, + verified_data_columns ); impl_availability_view!( ChildComponents, Arc>, Arc>, + Arc>, downloaded_block, - downloaded_blobs + downloaded_blobs, + downloaded_data_columns ); pub trait GetCommitments { @@ -253,6 +335,7 @@ impl GetCommitments for Arc> { .unwrap_or_default() } } + impl GetCommitment for Arc> { fn get_commitment(&self) -> &KzgCommitment { &self.kzg_commitment diff --git a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs index 028bf9d67c8..d04a2db0744 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs @@ -3,6 +3,7 @@ use crate::data_availability_checker::AvailabilityView; use bls::Hash256; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; +use types::data_column_sidecar::FixedDataColumnSidecarList; use types::{EthSpec, SignedBeaconBlock}; /// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct @@ -13,6 +14,7 @@ pub struct ChildComponents { pub block_root: Hash256, pub downloaded_block: Option>>, pub downloaded_blobs: FixedBlobSidecarList, + pub downloaded_data_columns: FixedDataColumnSidecarList, } impl From> for ChildComponents { @@ -31,6 +33,7 @@ impl ChildComponents { block_root, downloaded_block: 
None, downloaded_blobs: <_>::default(), + downloaded_data_columns: <_>::default(), } } pub fn new( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 0804fe3b9ab..9e52b34185f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -14,6 +14,7 @@ pub enum Error { SszTypes(ssz_types::Error), MissingBlobs, BlobIndexInvalid(u64), + DataColumnIndexInvalid(u64), StoreError(store::Error), DecodeError(ssz::DecodeError), ParentStateMissing(Hash256), @@ -42,6 +43,7 @@ impl Error { | Error::RebuildingStateCaches(_) => ErrorCategory::Internal, Error::Kzg(_) | Error::BlobIndexInvalid(_) + | Error::DataColumnIndexInvalid(_) | Error::KzgCommitmentMismatch { .. } | Error::KzgVerificationFailed => ErrorCategory::Malicious, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 34c9bc76f6e..c226923e0ac 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -35,6 +35,7 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::availability_view::AvailabilityView; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; +use crate::data_column_verification::KzgVerifiedDataColumn; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; @@ -45,7 +46,8 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; +use types::data_column_sidecar::DataColumnIdentifier; +use types::{BlobSidecar, ChainSpec, DataColumnSidecar, 
Epoch, EthSpec, Hash256}; /// This represents the components of a partially available block /// @@ -55,6 +57,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, + pub verified_data_columns: FixedVector>, T::DataColumnCount>, pub executed_block: Option>, } @@ -63,6 +66,7 @@ impl PendingComponents { Self { block_root, verified_blobs: FixedVector::default(), + verified_data_columns: FixedVector::default(), executed_block: None, } } @@ -82,6 +86,7 @@ impl PendingComponents { let Self { block_root, verified_blobs, + verified_data_columns: _verified_data_columns, executed_block, } = self; @@ -133,6 +138,16 @@ impl PendingComponents { }); } } + for maybe_data_column in self.verified_data_columns.iter() { + if maybe_data_column.is_some() { + return maybe_data_column.as_ref().map(|kzg_verified_data_column| { + kzg_verified_data_column + .as_data_column() + .slot() + .epoch(T::slots_per_epoch()) + }); + } + } None }) } @@ -144,6 +159,7 @@ impl PendingComponents { enum OverflowKey { Block(Hash256), Blob(Hash256, u8), + DataColumn(Hash256, u8), } impl OverflowKey { @@ -160,10 +176,27 @@ impl OverflowKey { Ok(Self::Blob(blob_id.block_root, blob_id.index as u8)) } + pub fn from_data_column_id( + data_column_id: DataColumnIdentifier, + ) -> Result { + if data_column_id.index >= E::number_of_columns() as u64 + || data_column_id.index > u8::MAX as u64 + { + return Err(AvailabilityCheckError::DataColumnIndexInvalid( + data_column_id.index, + )); + } + Ok(Self::DataColumn( + data_column_id.block_root, + data_column_id.index as u8, + )) + } + pub fn root(&self) -> &Hash256 { match self { Self::Block(root) => root, Self::Blob(root, _) => root, + Self::DataColumn(root, _) => root, } } } @@ -203,6 +236,22 @@ impl OverflowStore { .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? 
} + for data_column in Vec::from(pending_components.verified_data_columns) + .into_iter() + .flatten() + { + let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { + block_root, + index: data_column.data_column_index(), + })?; + + self.0.hot_db.put_bytes( + col.as_str(), + &key.as_ssz_bytes(), + &data_column.as_ssz_bytes(), + )? + } + Ok(()) } @@ -236,6 +285,16 @@ impl OverflowStore { .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); } + OverflowKey::DataColumn(_, index) => { + *maybe_pending_components + .get_or_insert_with(|| PendingComponents::empty(block_root)) + .verified_data_columns + .get_mut(index as usize) + .ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? = + Some(KzgVerifiedDataColumn::from_ssz_bytes( + value_bytes.as_slice(), + )?); + } } } @@ -267,6 +326,23 @@ impl OverflowStore { .map_err(|e| e.into()) } + /// Load a single data column from the database + pub fn load_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + let key = OverflowKey::from_data_column_id::(*data_column_id)?; + + self.0 + .hot_db + .get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())? 
+ .map(|data_column_bytes| { + Arc::>::from_ssz_bytes(data_column_bytes.as_slice()) + }) + .transpose() + .map_err(|e| e.into()) + } + /// Delete a set of keys from the database pub fn delete_keys(&self, keys: &Vec) -> Result<(), AvailabilityCheckError> { for key in keys { @@ -322,6 +398,25 @@ impl Critical { } } + /// This only checks for the data columns in memory + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { + Ok(pending_components + .verified_data_columns + .get(data_column_id.index as usize) + .ok_or(AvailabilityCheckError::DataColumnIndexInvalid( + data_column_id.index, + ))? + .as_ref() + .map(|data_column| data_column.clone_data_column())) + } else { + Ok(None) + } + } + /// Puts the pending components in the LRU cache. If the cache /// is at capacity, the LRU entry is written to the store first pub fn put_pending_components( @@ -415,6 +510,55 @@ impl OverflowLRUCache { } } + /// Fetch a data column from the cache without affecting the LRU ordering + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + let read_lock = self.critical.read(); + if let Some(data_column) = read_lock.peek_data_column(data_column_id)? 
{ + Ok(Some(data_column)) + } else if read_lock.store_keys.contains(&data_column_id.block_root) { + drop(read_lock); + self.overflow_store.load_data_column(data_column_id) + } else { + Ok(None) + } + } + + pub fn put_kzg_verified_data_columns< + I: IntoIterator>, + >( + &self, + block_root: Hash256, + kzg_verified_data_columns: I, + ) -> Result, AvailabilityCheckError> { + let mut fixed_data_columns = FixedVector::default(); + + for data_column in kzg_verified_data_columns { + if let Some(data_column_opt) = + fixed_data_columns.get_mut(data_column.data_column_index() as usize) + { + *data_column_opt = Some(data_column); + } + } + + let mut write_lock = self.critical.write(); + + // Grab existing entry or create a new entry. + let mut pending_components = write_lock + .pop_pending_components(block_root, &self.overflow_store)? + .unwrap_or_else(|| PendingComponents::empty(block_root)); + + // Merge in the data columns. + pending_components.merge_data_columns(fixed_data_columns); + + write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?; + + // TODO(das): Currently this does not change availability status and nor import yet. + Ok(Availability::MissingComponents(block_root)) + } + pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, @@ -650,6 +794,14 @@ impl OverflowLRUCache { .slot() .epoch(T::EthSpec::slots_per_epoch()) } + OverflowKey::DataColumn(_, _) => { + KzgVerifiedDataColumn::::from_ssz_bytes( + value_bytes.as_slice(), + )? 
+ .as_data_column() + .slot() + .epoch(T::EthSpec::slots_per_epoch()) + } }; current_block_data = Some(BlockData { keys: vec![overflow_key], @@ -688,6 +840,10 @@ impl ssz::Encode for OverflowKey { block_hash.ssz_append(buf); buf.push(*index + 1) } + OverflowKey::DataColumn(block_hash, index) => { + block_hash.ssz_append(buf); + buf.push(*index + 1) + } } } @@ -699,6 +855,7 @@ impl ssz::Encode for OverflowKey { match self { Self::Block(root) => root.ssz_bytes_len() + 1, Self::Blob(root, _) => root.ssz_bytes_len() + 1, + Self::DataColumn(root, _) => root.ssz_bytes_len() + 1, } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs index 969034c6570..bebc2a5d8b5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs @@ -1,4 +1,5 @@ use crate::data_availability_checker::AvailabilityView; +use ssz_types::FixedVector; use std::collections::hash_map::Entry; use std::collections::HashMap; use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments}; @@ -49,6 +50,9 @@ pub struct ProcessingComponents { /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See /// `AvailabilityView`'s trait definition for more details. pub blob_commitments: KzgCommitmentOpts, + // TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them + // again here and a `()` may be sufficient. 
+ pub data_column_opts: FixedVector, E::DataColumnCount>, } impl ProcessingComponents { @@ -57,6 +61,7 @@ impl ProcessingComponents { slot, block_commitments: None, blob_commitments: KzgCommitmentOpts::::default(), + data_column_opts: FixedVector::default(), } } } @@ -69,6 +74,7 @@ impl ProcessingComponents { slot: Slot::new(0), block_commitments: None, blob_commitments: KzgCommitmentOpts::::default(), + data_column_opts: FixedVector::default(), } } } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs new file mode 100644 index 00000000000..727584f5a76 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -0,0 +1,199 @@ +use crate::block_verification::{process_block_slash_info, BlockSlashInfo}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use derivative::Derivative; +use kzg::{Error as KzgError, Kzg}; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{BeaconStateError, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot}; + +/// An error occurred while validating a gossip data column. +#[derive(Debug)] +pub enum GossipDataColumnError { + /// There was an error whilst processing the data column. It is not known if it is + /// valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this blob due to an internal error. It's + /// unclear if the data column is valid. + BeaconChainError(BeaconChainError), + + /// The proposal signature in invalid. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + ProposalSignatureInvalid, + + /// The proposal_index corresponding to data column.beacon_block_root is not known. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. 
+ UnknownValidator(u64), + + /// The provided data column is not from a later slot than its parent. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + DataColumnIsNotLaterThanParent { + data_column_slot: Slot, + parent_slot: Slot, + }, + + /// `Kzg` struct hasn't been initialized. This is an internal error. + /// + /// ## Peer scoring + /// + /// The peer isn't faulty, This is an internal error. + KzgNotInitialized, + + /// The kzg verification failed. + /// + /// ## Peer scoring + /// + /// The data column sidecar is invalid and the peer is faulty. + KzgError(kzg::Error), + + /// The provided data column's parent block is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the data column without validating its parent, the peer isn't necessarily faulty. + DataColumnParentUnknown(Arc>), +} + +impl From for GossipDataColumnError { + fn from(e: BeaconChainError) -> Self { + GossipDataColumnError::BeaconChainError(e) + } +} + +impl From for GossipDataColumnError { + fn from(e: BeaconStateError) -> Self { + GossipDataColumnError::BeaconChainError(BeaconChainError::BeaconStateError(e)) + } +} + +/// A wrapper around a `DataColumnSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Debug)] +pub struct GossipVerifiedDataColumn { + block_root: Hash256, + data_column: KzgVerifiedDataColumn, +} + +impl GossipVerifiedDataColumn { + pub fn new( + column_sidecar: Arc>, + subnet_id: u64, + chain: &BeaconChain, + ) -> Result> { + let header = column_sidecar.signed_block_header.clone(); + // We only process slashing info if the gossip verification failed + // since we do not process the data column any further in that case. 
+ validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }) + } + + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root, + index: self.data_column.data_column_index(), + } + } + + pub fn as_data_column(&self) -> &DataColumnSidecar { + self.data_column.as_data_column() + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + + pub fn slot(&self) -> Slot { + self.data_column.data_column.slot() + } + + pub fn index(&self) -> ColumnIndex { + self.data_column.data_column.index + } + + pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.data_column.data_column.signed_block_header.clone() + } + + pub fn into_inner(self) -> KzgVerifiedDataColumn { + self.data_column + } +} + +/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct KzgVerifiedDataColumn { + data_column: Arc>, +} + +impl KzgVerifiedDataColumn { + pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + verify_kzg_for_data_column(data_column, kzg) + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data_column + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_data_column(&self) -> Arc> { + self.data_column.clone() + } + + pub fn data_column_index(&self) -> u64 { + self.data_column.index + } +} + +/// Complete kzg verification for a `DataColumnSidecar`. +/// +/// Returns an error if the kzg verification check fails. 
+pub fn verify_kzg_for_data_column( + data_column: Arc>, + _kzg: &Kzg, +) -> Result, KzgError> { + // TODO(das): validate data column + // validate_blob::( + // kzg, + // &data_column.blob, + // data_column.kzg_commitment, + // data_column.kzg_proof, + // )?; + Ok(KzgVerifiedDataColumn { data_column }) +} + +pub fn validate_data_column_sidecar_for_gossip( + data_column: Arc>, + _subnet: u64, + chain: &BeaconChain, +) -> Result, GossipDataColumnError> { + // TODO(das): validate gossip rules + let block_root = data_column.block_root(); + + // Kzg verification for gossip data column sidecar + let kzg = chain + .kzg + .as_ref() + .ok_or(GossipDataColumnError::KzgNotInitialized)?; + let kzg_verified_data_column = + KzgVerifiedDataColumn::new(data_column, kzg).map_err(GossipDataColumnError::KzgError)?; + + Ok(GossipVerifiedDataColumn { + block_root, + data_column: kzg_verified_data_column, + }) +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 522009b1b27..20023b2f299 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,6 +18,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; +pub mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; mod errors; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index fb76f2a2677..62a1216f13b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,10 +4,9 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{ - GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, -}; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; +use 
beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -869,12 +868,59 @@ impl NetworkBeaconProcessor { pub async fn process_gossip_verified_data_column( self: &Arc, - _peer_id: PeerId, - verified_data_column: GossipVerifiedDataColumnSidecar, + peer_id: PeerId, + verified_data_column: GossipVerifiedDataColumn, // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { - self.chain.process_gossip_data_column(verified_data_column); + let block_root = verified_data_column.block_root(); + let data_column_slot = verified_data_column.slot(); + let data_column_index = verified_data_column.id().index; + + match self + .chain + .process_gossip_data_column(verified_data_column) + .await + { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + // Note: Reusing block imported metric here + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + info!( + self.log, + "Gossipsub data column processed, imported fully available block"; + "block_root" => %block_root + ); + self.chain.recompute_head_at_current_slot().await; + } + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + self.log, + "Processed data column, waiting for other components"; + "slot" => %slot, + "data_column_index" => %data_column_index, + "block_root" => %block_root, + ); + } + Err(err) => { + debug!( + self.log, + "Invalid gossip data column"; + "outcome" => ?err, + "block root" => ?block_root, + "block slot" => data_column_slot, + "data column index" => data_column_index, + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_data_column_ssz", + ); + trace!( + self.log, + "Invalid gossip data column ssz"; + ); + } + } } /// Process the beacon block received from the gossip network and: diff --git 
a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index e0afc3b6e81..c6b23cbc031 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -299,12 +299,58 @@ impl NetworkBeaconProcessor { /// Handle a `DataColumnsByRoot` request from the peer. pub fn handle_data_columns_by_root_request( self: Arc, - _peer_id: PeerId, - _request_id: PeerRequestId, - _request: DataColumnsByRootRequest, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, ) { - // TODO(das): handle DataColumnsByRoot requests - unimplemented!() + let Some(requested_root) = request + .data_column_ids + .as_slice() + .first() + .map(|id| id.block_root) + else { + // No data column ids requested. + return; + }; + let requested_indices = request + .data_column_ids + .as_slice() + .iter() + .map(|id| id.index) + .collect::>(); + let mut send_data_column_count = 0; + + for id in request.data_column_ids.as_slice() { + // Attempt to get the data columns from the RPC cache. + // TODO(das): we have not yet implemented importing data columns, so they are not available on the beacon chain.
+ if let Ok(Some(data_column)) = self.chain.data_availability_checker.get_data_column(id) + { + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column)), + request_id, + ); + send_data_column_count += 1; + } else { + debug!( + self.log, + "Error fetching data column for peer"; + "peer" => %peer_id, + "request_root" => ?id.block_root, + ); + } + } + debug!( + self.log, + "Received DataColumnsByRoot Request"; + "peer" => %peer_id, + "request_root" => %requested_root, + "request_indices" => ?requested_indices, + "returned" => send_data_column_count + ); + + // send stream termination + self.send_response(peer_id, Response::DataColumnsByRoot(None), request_id); } /// Handle a `BlocksByRoot` request from the peer. diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 6a8bf3078a9..ced521426da 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -651,7 +651,7 @@ impl Router { _request_id: RequestId, _data_column_sidecar: Option>>, ) { - // TODO(das) implement handling of DataColumnsByRoot response + // TODO(das) implement `DataColumnsByRoot` response handling unimplemented!() } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 8c60621f1c7..989bfab00f0 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -179,11 +179,13 @@ impl SingleBlockLookup { block_root: _, downloaded_block, downloaded_blobs, + downloaded_data_columns, } = components; if let Some(block) = downloaded_block { existing_components.merge_block(block); } existing_components.merge_blobs(downloaded_blobs); + existing_components.merge_data_columns(downloaded_data_columns); } else { self.child_components = Some(components); } diff --git a/consensus/types/src/data_column_sidecar.rs 
b/consensus/types/src/data_column_sidecar.rs index 3330aa0b622..37dde1a7623 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -15,6 +15,7 @@ use ssz_derive::{Decode, Encode}; use ssz_types::typenum::Unsigned; use ssz_types::Error as SszError; use ssz_types::{FixedVector, VariableList}; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; @@ -136,6 +137,13 @@ impl DataColumnSidecar { Ok(column) } + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root(), + index: self.index, + } + } + pub fn slot(&self) -> Slot { self.signed_block_header.message.slot } @@ -208,6 +216,9 @@ impl From for DataColumnSidecarError { } } +pub type FixedDataColumnSidecarList = + FixedVector>>, ::DataColumnCount>; + #[cfg(test)] mod test { use crate::beacon_block::EmptyBlock;