Skip to content

Commit

Permalink
Implement unconditional reconstruction for supernodes (#5781)
Browse files Browse the repository at this point in the history
* Implement unconditional reconstruction for supernodes

* Move code into KzgVerifiedCustodyDataColumn

* Remove expect

* Add test

* Thanks justin
  • Loading branch information
dapplion authored May 15, 2024
1 parent c356c2e commit 562e9d0
Show file tree
Hide file tree
Showing 15 changed files with 460 additions and 80 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ bytes = "1"
# TODO(das): switch to c-kzg crate before merging back to unstable (and disable default-features) if possible
# Turn off c-kzg's default features which include `blst/portable`. We can turn on blst's portable
# feature ourselves when desired.
c-kzg = { git = "https://github.com/ethereum/c-kzg-4844", branch = "das" }
c-kzg = { git = "https://github.com/ethereum/c-kzg-4844", rev = "114fa0382990e9b74b1f90f3b0dc5f97c2f8a7ad" }
clap = "2"
compare_fields_derive = { path = "common/compare_fields_derive" }
criterion = "0.3"
Expand Down
66 changes: 56 additions & 10 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnsToPublish,
};
use crate::data_column_verification::{
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
Expand Down Expand Up @@ -3028,7 +3029,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_data_column(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let block_root = data_column.block_root();

// If this block has already been imported to forkchoice it must have been available, so
Expand All @@ -3044,7 +3051,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let r = self
.check_gossip_data_column_availability_and_import(data_column)
.await;
self.remove_notified(&block_root, r)
self.remove_notified_custody_columns(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3087,7 +3094,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
if self
Expand All @@ -3109,7 +3122,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
self.remove_notified_custody_columns(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
Expand All @@ -3127,6 +3140,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
r
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns<P>(
&self,
block_root: &Hash256,
r: Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>>,
) -> Result<(AvailabilityProcessingStatus, P), BlockError<T::EthSpec>> {
let has_missing_components = matches!(
r,
Ok((AvailabilityProcessingStatus::MissingComponents(_, _), _))
);
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
Expand Down Expand Up @@ -3362,16 +3392,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_data_column_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
}
let availability = self
let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_gossip_data_column(data_column)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand Down Expand Up @@ -3419,7 +3457,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<
(
AvailabilityProcessingStatus,
DataColumnsToPublish<T::EthSpec>,
),
BlockError<T::EthSpec>,
> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3443,11 +3487,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
let availability = self
let (availability, data_columns_to_publish) = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability)
.await
.map(|result| (result, data_columns_to_publish))
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand Down
7 changes: 1 addition & 6 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,12 +799,7 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
let mut gossip_verified_data_columns = vec![];
for sidecar in sidecars {
let subnet =
DataColumnSubnetId::try_from_column_index::<T::EthSpec>(sidecar.index as usize)
.map_err(|_| {
BlockContentsError::<T::EthSpec>::DataColumnSidecarError(
DataColumnSidecarError::DataColumnIndexOutOfBounds,
)
})?;
DataColumnSubnetId::from_column_index::<T::EthSpec>(sidecar.index as usize);
let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?;
gossip_verified_data_columns.push(column);
}
Expand Down
22 changes: 17 additions & 5 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh
use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList};
use types::non_zero_usize::new_non_zero_usize;

pub use self::overflow_lru_cache::DataColumnsToPublish;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
Expand Down Expand Up @@ -187,11 +189,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
#[allow(clippy::type_complexity)]
pub fn put_rpc_custody_columns(
&self,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
Expand All @@ -203,8 +207,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg))
.collect::<Result<Vec<_>, _>>()?;

self.availability_cache
.put_kzg_verified_data_columns(block_root, verified_custody_columns)
self.availability_cache.put_kzg_verified_data_columns(
kzg,
block_root,
verified_custody_columns,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -225,18 +232,23 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Otherwise cache the data column sidecar.
///
/// This should only accept gossip verified data columns, so we should not have to worry about dupes.
#[allow(clippy::type_complexity)]
pub fn put_gossip_data_column(
&self,
gossip_data_column: GossipVerifiedDataColumn<T>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
let Some(kzg) = self.kzg.as_ref() else {
return Err(AvailabilityCheckError::KzgNotInitialized);
};
let block_root = gossip_data_column.block_root();

// TODO(das): ensure that our custody requirements include this column
let custody_column =
KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner());

self.availability_cache
.put_kzg_verified_data_columns(block_root, vec![custody_column])
.put_kzg_verified_data_columns(kzg, block_root, vec![custody_column])
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down
Loading

0 comments on commit 562e9d0

Please sign in to comment.