Fix misc PeerDAS todos #6862

Open
wants to merge 12 commits into unstable
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2924,10 +2924,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Only completed sampling results are received. Blocks are unavailable by default and should
/// be pruned on finalization, on a timeout or by a max count.
pub async fn process_sampling_completed(self: &Arc<Self>, block_root: Hash256) {
// TODO(das): update fork-choice
// TODO(das): update fork-choice, act on sampling result, adjust log level
// NOTE: It is possible that sampling completes before the block is imported into fork choice;
// in that case we may need to update the availability cache.
// TODO(das): These log levels are too high, reduce once DAS matures
info!(self.log, "Sampling completed"; "block_root" => %block_root);
}

@@ -110,15 +110,6 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}

/// Checks if a data column of a given index exists in the cache.
///
/// Returns:
/// - `true` if a data column for the given index exists.
/// - `false` otherwise.
fn data_column_exists(&self, data_column_index: u64) -> bool {
self.get_cached_data_column(data_column_index).is_some()
}

/// Returns the number of data columns that have been received and are stored in the cache.
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
@@ -182,8 +173,7 @@ impl<E: EthSpec> PendingComponents<E> {
kzg_verified_data_columns: I,
) -> Result<(), AvailabilityCheckError> {
for data_column in kzg_verified_data_columns {
// TODO(das): Add equivalent checks for data columns if necessary
if !self.data_column_exists(data_column.index()) {
if self.get_cached_data_column(data_column.index()).is_none() {
Review comment (author): Checks aren't necessary; we verify the inclusion proof both via RPC and gossip. Inlined the function since that makes it easier to follow what the data_column_exists check is actually doing.

self.verified_data_columns.push(data_column);
}
}
@@ -1328,13 +1328,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
return None;
}
Err(e @ BlockError::BlobNotRequired(_)) => {
// TODO(das): penalty not implemented yet as other clients may still send us blobs
// during early stage of implementation.
debug!(self.log, "Received blobs for slot after PeerDAS epoch from peer";
"error" => %e,
"peer_id" => %peer_id,
);
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
// Receiving blobs when they are not required just wastes bandwidth on our side. If the
// peer is malicious the damage is mild. If the peer is faulty, it's likely a non-updated
// node which will be quickly disconnected due to other faults.
self.gossip_penalize_peer(
peer_id,
PeerAction::MidToleranceError,
"blob_not_required",
);
Review comment (author): Implemented the penalty; no implementation should still be sending blobs by now @jimmygchen

return None;
}
};
@@ -1447,9 +1453,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;

// TODO(das) Might be too early to issue a request here. We haven't checked that the block
// actually includes blob transactions and thus has data. A peer could send a block with
// garbage commitments, and make us trigger sampling for a block that does not have data.
// Note: okay to issue sampling request before the block is execution verified. If the
// proposer sends us a block with invalid blob transactions it can trigger us to issue
// sampling queries that will never resolve. This attack is equivalent to withholding data.
// Dismissed proposal to move this block to post-execution: https://github.com/sigp/lighthouse/pull/6492
if block.num_expected_blobs() > 0 {
// Trigger sampling for block not yet execution valid. At this point column custodials are
// unlikely to have received their columns. Triggering sampling so early is only viable with
@@ -336,9 +336,31 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: Arc<NetworkBeaconProcessor<T>>,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
_seen_timestamp: Duration,
seen_timestamp: Duration,
process_type: BlockProcessType,
) {
// custody_columns must always have at least one element
let Some(slot) = custody_columns.iter().map(|d| d.slot()).next() else {
return;
};

if let Ok(current_slot) = self.chain.slot() {
if current_slot == slot {
let delay = get_slot_delay_ms(seen_timestamp, slot, &self.chain.slot_clock);
metrics::observe_duration(&metrics::BEACON_BLOB_RPC_SLOT_START_DELAY_TIME, delay);
}
}
Review comment (author): Uses seen_timestamp with the same metric as blobs. Should we generalize the metric name?

let mut indices = custody_columns.iter().map(|d| d.index).collect::<Vec<_>>();
indices.sort_unstable();
debug!(
self.log,
"RPC custody data columns received";
"indices" => ?indices,
"block_root" => %block_root,
"slot" => %slot,
);

let mut result = self
.chain
.process_rpc_custody_columns(custody_columns)
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -478,8 +478,8 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// continue_request will send for processing as the request state is AwaitingProcessing
}
Err(e) => {
// TODO(das): is it okay to not log the peer source of request failures? Then we
// should log individual request failures in the SyncNetworkContext
// No need to log the peer source here. When sending a DataColumnsByRoot request we log
// the peer and the request ID, which is linked to this `id` value.
debug!(self.log,
"Received lookup download failure";
"block_root" => ?block_root,
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
@@ -1187,12 +1187,10 @@ impl<T: BeaconChainTypes> SyncManager<T> {
requester: CustodyRequester,
response: CustodyByRootResult<T::EthSpec>,
) {
// TODO(das): get proper timestamp
let seen_timestamp = timestamp_now();
self.block_lookups
.on_download_response::<CustodyRequestState<T::EthSpec>>(
requester.0,
response.map(|(columns, peer_group)| (columns, peer_group, seen_timestamp)),
response,
&mut self.network,
);
}
18 changes: 10 additions & 8 deletions beacon_node/network/src/sync/network_context.rs
@@ -86,7 +86,9 @@ pub enum RpcEvent<T> {

pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;

pub type CustodyByRootResult<T> = Result<(DataColumnSidecarList<T>, PeerGroup), RpcResponseError>;
/// Duration = latest seen timestamp of all received data columns
pub type CustodyByRootResult<T> =
Result<(DataColumnSidecarList<T>, PeerGroup, Duration), RpcResponseError>;

#[derive(Debug)]
pub enum RpcResponseError {
@@ -1084,7 +1086,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to
// an Option first to use in an `if let Some() { act on result }` block.
match result.as_ref() {
Some(Ok((columns, peer_group))) => {
Some(Ok((columns, peer_group, _))) => {
debug!(self.log, "Custody request success, removing"; "id" => ?id, "count" => columns.len(), "peers" => ?peer_group)
}
Some(Err(e)) => {
@@ -1102,7 +1104,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: Id,
block_root: Hash256,
block: RpcBlock<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
@@ -1115,7 +1117,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send_rpc_beacon_block(
block_root,
block,
duration,
seen_timestamp,
BlockProcessType::SingleBlock { id },
)
.map_err(|e| {
@@ -1133,7 +1135,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
id: Id,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
.beacon_processor_if_enabled()
@@ -1146,7 +1148,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.send_rpc_blobs(
block_root,
blobs,
duration,
seen_timestamp,
BlockProcessType::SingleBlob { id },
)
.map_err(|e| {
@@ -1164,7 +1166,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
_id: Id,
block_root: Hash256,
custody_columns: DataColumnSidecarList<T::EthSpec>,
duration: Duration,
seen_timestamp: Duration,
process_type: BlockProcessType,
) -> Result<(), SendErrorProcessor> {
let beacon_processor = self
@@ -1174,7 +1176,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type);

beacon_processor
.send_rpc_custody_columns(block_root, custody_columns, duration, process_type)
.send_rpc_custody_columns(block_root, custody_columns, seen_timestamp, process_type)
.map_err(|e| {
error!(
self.log,
34 changes: 22 additions & 12 deletions beacon_node/network/src/sync/network_context/custody.rs
@@ -1,7 +1,7 @@
use crate::sync::network_context::{
DataColumnsByRootRequestId, DataColumnsByRootSingleBlockRequest,
};

use beacon_chain::validator_monitor::timestamp_now;
use beacon_chain::BeaconChainTypes;
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
@@ -61,7 +61,8 @@ struct ActiveBatchColumnsRequest {
indices: Vec<ColumnIndex>,
}

pub type CustodyRequestResult<E> = Result<Option<(DataColumnSidecarList<E>, PeerGroup)>, Error>;
pub type CustodyRequestResult<E> =
Result<Option<(DataColumnSidecarList<E>, PeerGroup, Duration)>, Error>;

impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
pub(crate) fn new(
@@ -102,8 +103,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
resp: RpcResponseResult<DataColumnSidecarList<T::EthSpec>>,
cx: &mut SyncNetworkContext<T>,
) -> CustodyRequestResult<T::EthSpec> {
// TODO(das): Should downscore peers for verify errors here
Review comment (author): No, verify errors are handled in network_context.rs and peers are downscored there.

let Some(batch_request) = self.active_batch_columns_requests.get_mut(&req_id) else {
warn!(self.log,
"Received custody column response for unrequested index";
@@ -115,7 +114,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
};

match resp {
Ok((data_columns, _seen_timestamp)) => {
Ok((data_columns, seen_timestamp)) => {
debug!(self.log,
"Custody column download success";
"id" => ?self.custody_id,
@@ -141,7 +140,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.ok_or(Error::BadState("unknown column_index".to_owned()))?;

if let Some(data_column) = data_columns.remove(column_index) {
column_request.on_download_success(req_id, peer_id, data_column)?;
column_request.on_download_success(
req_id,
peer_id,
data_column,
seen_timestamp,
)?;
} else {
// Peer does not have the requested data.
// TODO(das) do not consider this case a success. We know for sure the block has
@@ -204,20 +208,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
if self.column_requests.values().all(|r| r.is_downloaded()) {
// All requests have completed successfully.
let mut peers = HashMap::<PeerId, Vec<usize>>::new();
let mut seen_timestamps = vec![];
let columns = std::mem::take(&mut self.column_requests)
.into_values()
.map(|request| {
let (peer, data_column) = request.complete()?;
let (peer, data_column, seen_timestamp) = request.complete()?;
peers
.entry(peer)
.or_default()
.push(data_column.index as usize);
seen_timestamps.push(seen_timestamp);
Ok(data_column)
})
.collect::<Result<Vec<_>, _>>()?;

let peer_group = PeerGroup::from_set(peers);
return Ok(Some((columns, peer_group)));
let max_seen_timestamp = seen_timestamps.into_iter().max().unwrap_or(timestamp_now());
return Ok(Some((columns, peer_group, max_seen_timestamp)));
}

let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
@@ -335,7 +342,7 @@ struct ColumnRequest<E: EthSpec> {
enum Status<E: EthSpec> {
NotStarted(Instant),
Downloading(DataColumnsByRootRequestId),
Downloaded(PeerId, Arc<DataColumnSidecar<E>>),
Downloaded(PeerId, Arc<DataColumnSidecar<E>>, Duration),
}

impl<E: EthSpec> ColumnRequest<E> {
@@ -404,6 +411,7 @@ impl<E: EthSpec> ColumnRequest<E> {
req_id: DataColumnsByRootRequestId,
peer_id: PeerId,
data_column: Arc<DataColumnSidecar<E>>,
seen_timestamp: Duration,
) -> Result<(), Error> {
match &self.status {
Status::Downloading(expected_req_id) => {
Expand All @@ -413,7 +421,7 @@ impl<E: EthSpec> ColumnRequest<E> {
req_id,
});
}
self.status = Status::Downloaded(peer_id, data_column);
self.status = Status::Downloaded(peer_id, data_column, seen_timestamp);
Ok(())
}
other => Err(Error::BadState(format!(
@@ -422,9 +430,11 @@
}
}

fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>), Error> {
fn complete(self) -> Result<(PeerId, Arc<DataColumnSidecar<E>>, Duration), Error> {
match self.status {
Status::Downloaded(peer_id, data_column) => Ok((peer_id, data_column)),
Status::Downloaded(peer_id, data_column, seen_timestamp) => {
Ok((peer_id, data_column, seen_timestamp))
}
other => Err(Error::BadState(format!(
"bad state complete expected Downloaded got {other:?}"
))),
2 changes: 0 additions & 2 deletions beacon_node/network/src/sync/tests/lookups.rs
@@ -711,7 +711,6 @@ impl TestRig {
self.complete_data_columns_by_root_request(id, data_columns);

// Expect work event
// TODO(das): worth it to append sender id to the work event for stricter assertion?
self.expect_rpc_sample_verify_work_event();

// Respond with valid result
@@ -753,7 +752,6 @@ impl TestRig {
}

// Expect work event
// TODO(das): worth it to append sender id to the work event for stricter assertion?
Review comment (author): Not needed, there's a single request in this test.

self.expect_rpc_custody_column_work_event();

// Respond with valid result
2 changes: 2 additions & 0 deletions consensus/types/src/data_column_custody_group.rs
@@ -17,6 +17,8 @@ pub enum DataColumnCustodyGroupError {
/// The `get_custody_groups` function is used to determine the custody groups that a node is
/// assigned to.
///
/// Note: `get_custody_groups(node_id, x)` is a subset of `get_custody_groups(node_id, y)` if `x < y`.
///
/// spec: https://github.com/ethereum/consensus-specs/blob/8e0d0d48e81d6c7c5a8253ab61340f5ea5bac66a/specs/fulu/das-core.md#get_custody_groups
pub fn get_custody_groups(
raw_node_id: [u8; 32],
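The subset note added to the doc comment above is easy to sanity-check. Below is a minimal, dependency-free Rust sketch, not Lighthouse's actual implementation (which follows the consensus-specs routine and hashes with SHA-256): `toy_hash`, `node_id_seed`, and the `NUMBER_OF_CUSTODY_GROUPS` value of 128 are assumptions made for illustration only. The property holds for the same structural reason in the real function: candidate groups are drawn from a stream determined solely by the node id, and a larger `custody_group_count` merely consumes more of that stream.

```rust
use std::collections::BTreeSet;

/// Total number of custody groups (assumed value for this sketch; the real
/// constant comes from the chain spec).
const NUMBER_OF_CUSTODY_GROUPS: u64 = 128;

/// Toy mixer standing in for the spec's SHA-256 hash (assumption, NOT the
/// real hash). SplitMix64-style finalizer: deterministic and well distributed.
fn toy_hash(x: u64) -> u64 {
    let mut z = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Simplified custody-group selection mirroring the spec's shape: walk a
/// counter seeded by the node id, hash it, reduce modulo the number of
/// groups, and collect distinct groups until `custody_group_count` are found.
fn get_custody_groups(node_id_seed: u64, custody_group_count: u64) -> BTreeSet<u64> {
    assert!(custody_group_count <= NUMBER_OF_CUSTODY_GROUPS);
    let mut groups = BTreeSet::new();
    let mut current_id = node_id_seed;
    while (groups.len() as u64) < custody_group_count {
        groups.insert(toy_hash(current_id) % NUMBER_OF_CUSTODY_GROUPS);
        current_id = current_id.wrapping_add(1);
    }
    groups
}

fn main() {
    // The doc-comment property: for x < y, get_custody_groups(id, x) is a
    // subset of get_custody_groups(id, y). Checking consecutive counts is
    // enough because subset inclusion is transitive.
    let node_id_seed = 0xDEAD_BEEF_u64;
    for x in 0..NUMBER_OF_CUSTODY_GROUPS {
        let small = get_custody_groups(node_id_seed, x);
        let large = get_custody_groups(node_id_seed, x + 1);
        assert!(small.is_subset(&large));
    }
    println!("subset property holds for all custody group counts");
}
```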