diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index ad6bea455ec..2f6200a836b 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -411,6 +411,27 @@ impl OldBlocksByRangeRequest { } } +impl From for OldBlocksByRangeRequest { + fn from(req: BlocksByRangeRequest) -> Self { + match req { + BlocksByRangeRequest::V1(ref req) => { + OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }) + } + BlocksByRangeRequest::V2(ref req) => { + OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }) + } + } + } +} + /// Request a number of beacon block bodies from a peer. #[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))] #[derive(Clone, Debug, PartialEq)] diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index e0c8593f29f..ae63e5cdb5a 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -217,7 +217,7 @@ mod tests { use crate::rpc::rate_limiter::Quota; use crate::rpc::self_limiter::SelfRateLimiter; use crate::rpc::{Ping, Protocol, RequestType}; - use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId}; + use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId}; use libp2p::PeerId; use std::time::Duration; use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot}; @@ -238,12 +238,16 @@ mod tests { let mut limiter: SelfRateLimiter = SelfRateLimiter::new(config, fork_context, log).unwrap(); let peer_id = PeerId::random(); + let lookup_id = 0; for i in 1..=5u32 { let _ = limiter.allows( peer_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id: i, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id, + req_id: i, + }, })), RequestType::Ping(Ping { data: i as u64 }), ); @@ -261,9 +265,9 @@ mod tests { for i in 2..=5u32 { assert!(matches!( iter.next().unwrap().request_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id, - })) if id == i + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + })) if req_id == i, )); } @@ -286,9 +290,9 @@ mod tests { for i in 3..=5 { assert!(matches!( iter.next().unwrap().request_id, - RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { - id - })) if id == i + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + })) if req_id == i, )); } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 85fabbb0c3c..800d988d1a9 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }; @@ -31,8 +31,12 @@ pub enum SyncRequestId { SingleBlob { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), - /// Range request that is composed by both a block range request and a blob range request. - RangeBlockAndBlobs { id: Id }, + /// Blocks by range request + BlocksByRange(BlocksByRangeRequestId), + /// Blobs by range request + BlobsByRange(BlobsByRangeRequestId), + /// Data columns by range request + DataColumnsByRange(DataColumnsByRangeRequestId), } /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. @@ -43,12 +47,60 @@ pub struct DataColumnsByRootRequestId { pub requester: DataColumnsByRootRequester, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlocksByRangeRequestId { + /// Id to identify this attempt at a blocks_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlobsByRangeRequestId { + /// Id to identify this attempt at a blobs_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRangeRequestId { + /// Id to identify this attempt at a data_columns_by_range request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Range request for block components. + pub parent_request_id: ComponentsByRangeRequestId, +} + +/// Block components by range request for range sync. Includes an ID for downstream consumers to +/// handle retries and tie all their sub requests together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct ComponentsByRangeRequestId { + /// Each `RangeRequestId` may request the same data in a later retry. This Id identifies the + /// current attempt. + pub id: Id, + /// What sync component is issuing a components by range request and expecting data back + pub requester: RangeRequestId, +} + +/// Range sync chain or backfill batch +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RangeRequestId { + RangeSync { chain_id: Id, batch_id: Epoch }, + BackfillSync { batch_id: Epoch }, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), Custody(CustodyId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum RangeRequester { + RangeSync { chain_id: u64, batch_id: Epoch }, + BackfillSync { batch_id: Epoch }, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub struct SamplingId { pub id: SamplingRequester, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 41b9f2c91ed..36e5c391e91 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -624,7 +624,7 @@ impl Router { ) { let request_id = match request_id { AppRequestId::Sync(sync_id) => match sync_id { - id @ SyncRequestId::RangeBlockAndBlobs { .. } => id, + id @ SyncRequestId::BlocksByRange { .. } => id, other => { crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other); return; diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 70a3fe4f5aa..6c8a8eab633 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,7 +1,6 @@ use beacon_chain::{ block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, }; -use lighthouse_network::PeerId; use std::{ collections::{HashMap, VecDeque}, sync::Arc, @@ -29,9 +28,6 @@ pub struct RangeBlockComponentsRequest { /// Used to determine if the number of data columns stream termination this accumulator should /// wait for. This may be less than the number of `expects_custody_columns` due to request batching. num_custody_column_requests: Option, - /// The peers the request was made to. - pub(crate) peer_ids: Vec, - max_blobs_per_block: usize, } impl RangeBlockComponentsRequest { @@ -39,8 +35,6 @@ impl RangeBlockComponentsRequest { expects_blobs: bool, expects_custody_columns: Option>, num_custody_column_requests: Option, - peer_ids: Vec, - max_blobs_per_block: usize, ) -> Self { Self { blocks: <_>::default(), @@ -52,50 +46,42 @@ impl RangeBlockComponentsRequest { expects_blobs, expects_custody_columns, num_custody_column_requests, - peer_ids, - max_blobs_per_block, } } - // TODO: This function should be deprecated when simplying the retry mechanism of this range - // requests. - pub fn get_requirements(&self) -> (bool, Option>) { - (self.expects_blobs, self.expects_custody_columns.clone()) - } - - pub fn add_block_response(&mut self, block_opt: Option>>) { - match block_opt { - Some(block) => self.blocks.push_back(block), - None => self.is_blocks_stream_terminated = true, + pub fn add_blocks(&mut self, blocks: Vec>>) { + for block in blocks { + self.blocks.push_back(block); } + self.is_blocks_stream_terminated = true; } - pub fn add_sidecar_response(&mut self, sidecar_opt: Option>>) { - match sidecar_opt { - Some(sidecar) => self.blobs.push_back(sidecar), - None => self.is_sidecars_stream_terminated = true, + pub fn add_blobs(&mut self, blobs: Vec>>) { + for blob in blobs { + self.blobs.push_back(blob); } + self.is_sidecars_stream_terminated = true; } - pub fn add_data_column(&mut self, column_opt: Option>>) { - match column_opt { - Some(column) => self.data_columns.push_back(column), - // TODO(das): this mechanism is dangerous, if somehow there are two requests for the - // same column index it can terminate early. This struct should track that all requests - // for all custody columns terminate. - None => self.custody_columns_streams_terminated += 1, + pub fn add_custody_columns(&mut self, columns: Vec>>) { + for column in columns { + self.data_columns.push_back(column); } + // TODO(das): this mechanism is dangerous, if somehow there are two requests for the + // same column index it can terminate early. This struct should track that all requests + // for all custody columns terminate. + self.custody_columns_streams_terminated += 1; } pub fn into_responses(self, spec: &ChainSpec) -> Result>, String> { if let Some(expects_custody_columns) = self.expects_custody_columns.clone() { self.into_responses_with_custody_columns(expects_custody_columns, spec) } else { - self.into_responses_with_blobs() + self.into_responses_with_blobs(spec) } } - fn into_responses_with_blobs(self) -> Result>, String> { + fn into_responses_with_blobs(self, spec: &ChainSpec) -> Result>, String> { let RangeBlockComponentsRequest { blocks, blobs, .. } = self; // There can't be more more blobs than blocks. i.e. sending any blob (empty @@ -103,7 +89,8 @@ impl RangeBlockComponentsRequest { let mut responses = Vec::with_capacity(blocks.len()); let mut blob_iter = blobs.into_iter().peekable(); for block in blocks.into_iter() { - let mut blob_list = Vec::with_capacity(self.max_blobs_per_block); + let max_blobs_per_block = spec.max_blobs_per_block(block.epoch()) as usize; + let mut blob_list = Vec::with_capacity(max_blobs_per_block); while { let pair_next_blob = blob_iter .peek() @@ -114,7 +101,7 @@ impl RangeBlockComponentsRequest { blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?); } - let mut blobs_buffer = vec![None; self.max_blobs_per_block]; + let mut blobs_buffer = vec![None; max_blobs_per_block]; for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { @@ -128,7 +115,7 @@ impl RangeBlockComponentsRequest { } let blobs = RuntimeVariableList::new( blobs_buffer.into_iter().flatten().collect::>(), - self.max_blobs_per_block, + max_blobs_per_block, ) .map_err(|_| "Blobs returned exceeds max length".to_string())?; responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?) @@ -246,30 +233,25 @@ mod tests { use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; - use lighthouse_network::PeerId; use rand::SeedableRng; - use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E}; + use std::sync::Arc; + use types::{test_utils::XorShiftRng, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; #[test] fn no_blobs_into_responses() { let spec = test_spec::(); - let peer_id = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng, &spec) .0 + .into() }) - .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize; - let mut info = - RangeBlockComponentsRequest::::new(false, None, None, vec![peer_id], max_len); + .collect::>>>(); + let mut info = RangeBlockComponentsRequest::::new(false, None, None); // Send blocks and complete terminate response - for block in blocks { - info.add_block_response(Some(block.into())); - } - info.add_block_response(None); + info.add_blocks(blocks); // Assert response is finished and RpcBlocks can be constructed assert!(info.is_finished()); @@ -279,7 +261,6 @@ mod tests { #[test] fn empty_blobs_into_responses() { let spec = test_spec::(); - let peer_id = PeerId::random(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -291,19 +272,15 @@ mod tests { &spec, ) .0 + .into() }) - .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().epoch()) as usize; - let mut info = - RangeBlockComponentsRequest::::new(true, None, None, vec![peer_id], max_len); + .collect::>>>(); + let mut info = RangeBlockComponentsRequest::::new(true, None, None); // Send blocks and complete terminate response - for block in blocks { - info.add_block_response(Some(block.into())); - } - info.add_block_response(None); + info.add_blocks(blocks); // Expect no blobs returned - info.add_sidecar_response(None); + info.add_blobs(vec![]); // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. // This makes sure we don't expect blobs here when they have expired. Checking this logic should @@ -316,7 +293,6 @@ mod tests { fn rpc_block_with_custody_columns() { let spec = test_spec::(); let expects_custody_columns = vec![1, 2, 3, 4]; - let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -328,34 +304,24 @@ mod tests { ) }) .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize; let mut info = RangeBlockComponentsRequest::::new( false, Some(expects_custody_columns.clone()), Some(expects_custody_columns.len()), - vec![PeerId::random()], - max_len, ); // Send blocks and complete terminate response - for block in &blocks { - info.add_block_response(Some(block.0.clone().into())); - } - info.add_block_response(None); + info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect()); // Assert response is not finished assert!(!info.is_finished()); - // Send data columns interleaved - for block in &blocks { - for column in &block.1 { - if expects_custody_columns.contains(&column.index) { - info.add_data_column(Some(column.clone())); - } - } - } - - // Terminate the requests - for (i, _column_index) in expects_custody_columns.iter().enumerate() { - info.add_data_column(None); + // Send data columns + for (i, &column_index) in expects_custody_columns.iter().enumerate() { + info.add_custody_columns( + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned()) + .collect(), + ); if i < expects_custody_columns.len() - 1 { assert!( @@ -377,8 +343,21 @@ mod tests { #[test] fn rpc_block_with_custody_columns_batched() { let spec = test_spec::(); - let expects_custody_columns = vec![1, 2, 3, 4]; - let num_of_data_column_requests = 2; + let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; + let expects_custody_columns = batched_column_requests + .iter() + .flatten() + .cloned() + .collect::>(); + let custody_column_request_ids = + (0..batched_column_requests.len() as u32).collect::>(); + let num_of_data_column_requests = custody_column_request_ids.len(); + + let mut info = RangeBlockComponentsRequest::::new( + false, + Some(expects_custody_columns.clone()), + Some(num_of_data_column_requests), + ); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) @@ -391,34 +370,25 @@ mod tests { ) }) .collect::>(); - let max_len = spec.max_blobs_per_block(blocks.first().unwrap().0.epoch()) as usize; - let mut info = RangeBlockComponentsRequest::::new( - false, - Some(expects_custody_columns.clone()), - Some(num_of_data_column_requests), - vec![PeerId::random()], - max_len, - ); + // Send blocks and complete terminate response - for block in &blocks { - info.add_block_response(Some(block.0.clone().into())); - } - info.add_block_response(None); + info.add_blocks(blocks.iter().map(|b| b.0.clone().into()).collect()); // Assert response is not finished assert!(!info.is_finished()); - // Send data columns interleaved - for block in &blocks { - for column in &block.1 { - if expects_custody_columns.contains(&column.index) { - info.add_data_column(Some(column.clone())); - } - } - } + for (i, column_indices) in batched_column_requests.iter().enumerate() { + // Send the set of columns in the same batch request + info.add_custody_columns( + blocks + .iter() + .flat_map(|b| { + b.1.iter() + .filter(|d| column_indices.contains(&d.index)) + .cloned() + }) + .collect::>(), + ); - // Terminate the requests - for i in 0..num_of_data_column_requests { - info.add_data_column(None); if i < num_of_data_column_requests - 1 { assert!( !info.is_finished(), diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fd91dc78b13..fc31e837277 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -36,7 +36,7 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; use super::network_context::{ - BlockOrBlob, CustodyByRootResult, RangeRequestId, RpcEvent, SyncNetworkContext, + CustodyByRootResult, RangeBlockComponent, RangeRequestId, RpcEvent, SyncNetworkContext, }; use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; @@ -47,7 +47,6 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; -use crate::sync::block_sidecar_coupling::RangeBlockComponentsRequest; use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::validator_monitor::timestamp_now; @@ -57,8 +56,9 @@ use beacon_chain::{ use futures::StreamExt; use lighthouse_network::rpc::RPCError; use lighthouse_network::service::api_types::{ - CustodyRequester, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SamplingId, - SamplingRequester, SingleLookupReqId, SyncRequestId, + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, + DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, + SamplingId, SamplingRequester, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::types::{NetworkGlobals, SyncState}; use lighthouse_network::SyncInfo; @@ -491,36 +491,14 @@ impl SyncManager { SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } - SyncRequestId::RangeBlockAndBlobs { id } => { - if let Some(sender_id) = self.network.range_request_failed(id) { - match sender_id { - RangeRequestId::RangeSync { chain_id, batch_id } => { - self.range_sync.inject_error( - &mut self.network, - peer_id, - batch_id, - chain_id, - id, - ); - self.update_sync_state(); - } - RangeRequestId::BackfillSync { batch_id } => match self - .backfill_sync - .inject_error(&mut self.network, batch_id, &peer_id, id) - { - Ok(_) => {} - Err(_) => self.update_sync_state(), - }, - } - } else { - debug!( - self.log, - "RPC error for range request has no associated entry in network context, ungraceful disconnect"; - "peer_id" => %peer_id, - "request_id" => %id, - "error" => ?error, - ); - } + SyncRequestId::BlocksByRange(req_id) => { + self.on_blocks_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::BlobsByRange(req_id) => { + self.on_blobs_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) + } + SyncRequestId::DataColumnsByRange(req_id) => { + self.on_data_columns_by_range_response(req_id, peer_id, RpcEvent::RPCError(error)) } } } @@ -1051,14 +1029,13 @@ impl SyncManager { SyncRequestId::SingleBlock { id } => self.on_single_block_response( id, peer_id, - match block { - Some(block) => RpcEvent::Response(block, seen_timestamp), - None => RpcEvent::StreamTermination, - }, + RpcEvent::from_chunk(block, seen_timestamp), + ), + SyncRequestId::BlocksByRange(id) => self.on_blocks_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(block, seen_timestamp), ), - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response(id, peer_id, block.into()) - } _ => { crit!(self.log, "bad request id for block"; "peer_id" => %peer_id ); } @@ -1094,14 +1071,13 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => self.on_single_blob_response( id, peer_id, - match blob { - Some(blob) => RpcEvent::Response(blob, seen_timestamp), - None => RpcEvent::StreamTermination, - }, + RpcEvent::from_chunk(blob, seen_timestamp), + ), + SyncRequestId::BlobsByRange(id) => self.on_blobs_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(blob, seen_timestamp), ), - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response(id, peer_id, blob.into()) - } _ => { crit!(self.log, "bad request id for blob"; "peer_id" => %peer_id); } @@ -1120,19 +1096,14 @@ impl SyncManager { self.on_data_columns_by_root_response( req_id, peer_id, - match data_column { - Some(data_column) => RpcEvent::Response(data_column, seen_timestamp), - None => RpcEvent::StreamTermination, - }, - ); - } - SyncRequestId::RangeBlockAndBlobs { id } => { - self.range_block_and_blobs_response( - id, - peer_id, - BlockOrBlob::CustodyColumns(data_column), + RpcEvent::from_chunk(data_column, seen_timestamp), ); } + SyncRequestId::DataColumnsByRange(id) => self.on_data_columns_by_range_response( + id, + peer_id, + RpcEvent::from_chunk(data_column, seen_timestamp), + ), _ => { crit!(self.log, "bad request id for data_column"; "peer_id" => %peer_id); } @@ -1188,6 +1159,54 @@ impl SyncManager { } } + fn on_blocks_by_range_response( + &mut self, + id: BlocksByRangeRequestId, + peer_id: PeerId, + block: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_blocks_by_range_response(id, peer_id, block) { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::Block(resp), + ); + } + } + + fn on_blobs_by_range_response( + &mut self, + id: BlobsByRangeRequestId, + peer_id: PeerId, + blob: RpcEvent>>, + ) { + if let Some(resp) = self.network.on_blobs_by_range_response(id, peer_id, blob) { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::Blob(resp), + ); + } + } + + fn on_data_columns_by_range_response( + &mut self, + id: DataColumnsByRangeRequestId, + peer_id: PeerId, + data_column: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_data_columns_by_range_response(id, peer_id, data_column) + { + self.on_range_components_response( + id.parent_request_id, + peer_id, + RangeBlockComponent::CustodyColumns(resp), + ); + } + } + fn on_custody_by_root_result( &mut self, requester: CustodyRequester, @@ -1230,27 +1249,26 @@ impl SyncManager { /// Handles receiving a response for a range sync request that should have both blocks and /// blobs. - fn range_block_and_blobs_response( + fn on_range_components_response( &mut self, - id: Id, + range_request_id: ComponentsByRangeRequestId, peer_id: PeerId, - block_or_blob: BlockOrBlob, + range_block_component: RangeBlockComponent, ) { if let Some(resp) = self .network - .range_block_and_blob_response(id, block_or_blob) + .range_block_component_response(range_request_id, range_block_component) { - let epoch = resp.sender_id.batch_id(); - match resp.responses { + match resp { Ok(blocks) => { - match resp.sender_id { + match range_request_id.requester { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.blocks_by_range_response( &mut self.network, peer_id, chain_id, batch_id, - id, + range_request_id.id, blocks, ); self.update_sync_state(); @@ -1260,7 +1278,7 @@ impl SyncManager { &mut self.network, batch_id, &peer_id, - id, + range_request_id.id, blocks, ) { Ok(ProcessResult::SyncCompleted) => self.update_sync_state(), @@ -1274,36 +1292,25 @@ impl SyncManager { } } } - Err(e) => { - // Re-insert the request so we can retry - self.network.insert_range_blocks_and_blobs_request( - id, - resp.sender_id, - RangeBlockComponentsRequest::new( - resp.expects_blobs, - resp.expects_custody_columns, - None, - vec![], - self.chain.spec.max_blobs_per_block(epoch) as usize, - ), - ); - // inform range that the request needs to be treated as failed - // With time we will want to downgrade this log - warn!( - self.log, - "Blocks and blobs request for range received invalid data"; - "peer_id" => %peer_id, - "sender_id" => ?resp.sender_id, - "error" => e.clone() - ); - let id = SyncRequestId::RangeBlockAndBlobs { id }; - self.network.report_peer( - peer_id, - PeerAction::MidToleranceError, - "block_blob_faulty_batch", - ); - self.inject_error(peer_id, id, RPCError::InvalidData(e)) - } + Err(_) => match range_request_id.requester { + RangeRequestId::RangeSync { chain_id, batch_id } => { + self.range_sync.inject_error( + &mut self.network, + peer_id, + batch_id, + chain_id, + range_request_id.id, + ); + self.update_sync_state(); + } + RangeRequestId::BackfillSync { batch_id } => match self + .backfill_sync + .inject_error(&mut self.network, batch_id, &peer_id, range_request_id.id) + { + Ok(_) => {} + Err(_) => self.update_sync_state(), + }, + }, } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 4135f901b1c..0cd21de7f41 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -5,7 +5,7 @@ use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; -use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; +use super::range_sync::ByRangeRequestType; use super::SyncMessage; use crate::metrics; use crate::network_beacon_processor::NetworkBeaconProcessor; @@ -17,13 +17,12 @@ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; -use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, - OldBlocksByRangeRequestV1, OldBlocksByRangeRequestV2, -}; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; +pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ - AppRequestId, CustodyId, CustodyRequester, DataColumnsByRootRequestId, + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + CustodyId, CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; @@ -32,8 +31,8 @@ use rand::prelude::IteratorRandom; use rand::thread_rng; pub use requests::LookupVerifyError; use requests::{ - ActiveRequests, BlobsByRootRequestItems, BlocksByRootRequestItems, - DataColumnsByRootRequestItems, + ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, + BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, }; use slog::{debug, error, warn}; use std::collections::hash_map::Entry; @@ -50,33 +49,6 @@ use types::{ pub mod custody; mod requests; -pub struct BlocksAndBlobsByRangeResponse { - pub sender_id: RangeRequestId, - pub responses: Result>, String>, - pub expects_blobs: bool, - pub expects_custody_columns: Option>, -} - -#[derive(Debug, Clone, Copy)] -pub enum RangeRequestId { - RangeSync { - chain_id: ChainId, - batch_id: BatchId, - }, - BackfillSync { - batch_id: BatchId, - }, -} - -impl RangeRequestId { - pub fn batch_id(&self) -> BatchId { - match self { - RangeRequestId::RangeSync { batch_id, .. } => *batch_id, - RangeRequestId::BackfillSync { batch_id, .. } => *batch_id, - } - } -} - #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -84,6 +56,15 @@ pub enum RpcEvent { RPCError(RPCError), } +impl RpcEvent { + pub fn from_chunk(chunk: Option, seen_timestamp: Duration) -> Self { + match chunk { + Some(item) => RpcEvent::Response(item, seen_timestamp), + None => RpcEvent::StreamTermination, + } + } +} + pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; pub type CustodyByRootResult = Result<(DataColumnSidecarList, PeerGroup), RpcResponseError>; @@ -93,6 +74,7 @@ pub enum RpcResponseError { RpcError(RPCError), VerifyError(LookupVerifyError), CustodyRequestError(CustodyRequestError), + BlockComponentCouplingError(String), } #[derive(Debug, PartialEq, Eq)] @@ -110,16 +92,6 @@ pub enum SendErrorProcessor { ProcessorNotAvailable, } -impl std::fmt::Display for RpcResponseError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - RpcResponseError::RpcError(e) => write!(f, "RPC Error: {:?}", e), - RpcResponseError::VerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), - RpcResponseError::CustodyRequestError(e) => write!(f, "Custody Request Error: {:?}", e), - } - } -} - impl From for RpcResponseError { fn from(e: RPCError) -> Self { RpcResponseError::RpcError(e) @@ -199,13 +171,22 @@ pub struct SyncNetworkContext { /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, + /// A mapping of active BlocksByRange requests + blocks_by_range_requests: + ActiveRequests>, + /// A mapping of active BlobsByRange requests + blobs_by_range_requests: + ActiveRequests>, + /// A mapping of active DataColumnsByRange requests + data_columns_by_range_requests: + ActiveRequests>, /// Mapping of active custody column requests for a block root custody_by_root_requests: FnvHashMap>, - /// BlocksByRange requests paired with BlobsByRange - range_block_components_requests: - FnvHashMap)>, + /// BlocksByRange requests paired with other ByRange requests for data components + components_by_range_requests: + FnvHashMap>, /// Whether the ee is online. If it's not, we don't allow access to the /// `beacon_processor_send`. @@ -223,22 +204,10 @@ pub struct SyncNetworkContext { } /// Small enumeration to make dealing with block and blob requests easier. -pub enum BlockOrBlob { - Block(Option>>), - Blob(Option>>), - CustodyColumns(Option>>), -} - -impl From>>> for BlockOrBlob { - fn from(block: Option>>) -> Self { - BlockOrBlob::Block(block) - } -} - -impl From>>> for BlockOrBlob { - fn from(blob: Option>>) -> Self { - BlockOrBlob::Blob(blob) - } +pub enum RangeBlockComponent { + Block(RpcResponseResult>>>), + Blob(RpcResponseResult>>>), + CustodyColumns(RpcResponseResult>>>), } impl SyncNetworkContext { @@ -256,8 +225,11 @@ impl SyncNetworkContext { blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), + blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), + blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), + data_columns_by_range_requests: ActiveRequests::new("data_columns_by_range"), custody_by_root_requests: <_>::default(), - range_block_components_requests: FnvHashMap::default(), + components_by_range_requests: FnvHashMap::default(), network_beacon_processor, chain, fork_context, @@ -272,37 +244,60 @@ impl SyncNetworkContext { /// Returns the ids of all the requests made to the given peer_id. pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Vec { - let failed_range_ids = - self.range_block_components_requests - .iter() - .filter_map(|(id, request)| { - if request.1.peer_ids.contains(peer_id) { - Some(SyncRequestId::RangeBlockAndBlobs { id: *id }) - } else { - None - } - }); - - let failed_block_ids = self - .blocks_by_root_requests + // Note: using destructuring pattern without a default case to make sure we don't forget to + // add new request types to this function. Otherwise, lookup sync can break and lookups + // will get stuck if a peer disconnects during an active requests. + let Self { + network_send: _, + request_id: _, + blocks_by_root_requests, + blobs_by_root_requests, + data_columns_by_root_requests, + blocks_by_range_requests, + blobs_by_range_requests, + data_columns_by_range_requests, + // custody_by_root_requests is a meta request of data_columns_by_root_requests + custody_by_root_requests: _, + // components_by_range_requests is a meta request of various _by_range requests + components_by_range_requests: _, + execution_engine_state: _, + network_beacon_processor: _, + chain: _, + fork_context: _, + log: _, + } = self; + + let blocks_by_root_ids = blocks_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlock { id: *id }); - let failed_blob_ids = self - .blobs_by_root_requests + let blobs_by_root_ids = blobs_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); - let failed_data_column_by_root_ids = self - .data_columns_by_root_requests + let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() .map(|req_id| SyncRequestId::DataColumnsByRoot(*req_id)); - - failed_range_ids - .chain(failed_block_ids) - .chain(failed_blob_ids) - .chain(failed_data_column_by_root_ids) + let blocks_by_range_ids = blocks_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::BlocksByRange(*req_id)); + let blobs_by_range_ids = blobs_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::BlobsByRange(*req_id)); + let data_column_by_range_ids = data_columns_by_range_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); + + blocks_by_root_ids + .chain(blobs_by_root_ids) + .chain(data_column_by_root_ids) + .chain(blocks_by_range_ids) + .chain(blobs_by_range_ids) + .chain(data_column_by_range_ids) .collect() } @@ -361,117 +356,62 @@ impl SyncNetworkContext { peer_id: PeerId, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, - sender_id: RangeRequestId, + requester: RangeRequestId, ) -> Result { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let id = self.next_id(); - let mut requested_peers = vec![peer_id]; - debug!( - self.log, - "Sending BlocksByRange request"; - "method" => "BlocksByRange", - "count" => request.count(), - "epoch" => epoch, - "peer" => %peer_id, - "id" => id, - ); - let rpc_request = match request { - BlocksByRangeRequest::V1(ref req) => { - RequestType::BlocksByRange(OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 { - start_slot: req.start_slot, - count: req.count, - step: 1, - })) - } - BlocksByRangeRequest::V2(ref req) => { - RequestType::BlocksByRange(OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 { - start_slot: req.start_slot, - count: req.count, - step: 1, - })) - } + // Create the overall components_by_range request ID before its individual components + let id = ComponentsByRangeRequestId { + id: self.next_id(), + requester, }; - self.network_send - .send(NetworkMessage::SendRequest { - peer_id, - request: rpc_request, - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; - let expected_blobs = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { - debug!( - self.log, - "Sending BlobsByRange requests"; - "method" => "BlobsByRange", - "count" => request.count(), - "epoch" => epoch, - "peer" => %peer_id, - ); + let _blocks_req_id = self.send_blocks_by_range_request(peer_id, request.clone(), id)?; - // Create the blob request based on the blocks request. - self.network_send - .send(NetworkMessage::SendRequest { - peer_id, - request: RequestType::BlobsByRange(BlobsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - }), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; - true + let blobs_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndBlobs) { + Some(self.send_blobs_by_range_request( + peer_id, + BlobsByRangeRequest { + start_slot: *request.start_slot(), + count: *request.count(), + }, + id, + )?) } else { - false + None }; - let (expects_columns, num_of_column_req) = + let (expects_columns, data_column_requests) = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { let column_indexes = self.network_globals().sampling_columns.clone(); - let mut num_of_custody_column_req = 0; - for (peer_id, columns_by_range_request) in - self.make_columns_by_range_requests(request, &column_indexes)? - { - requested_peers.push(peer_id); - - debug!( - self.log, - "Sending DataColumnsByRange requests"; - "method" => "DataColumnsByRange", - "count" => columns_by_range_request.count, - "epoch" => epoch, - "columns" => ?columns_by_range_request.columns, - "peer" => %peer_id, - "id" => id, - ); - - self.send_network_msg(NetworkMessage::SendRequest { - peer_id, - request: RequestType::DataColumnsByRange(columns_by_range_request), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + let data_column_requests = self + .make_columns_by_range_requests(request, &column_indexes)? + .into_iter() + .map(|(peer_id, columns_by_range_request)| { + self.send_data_columns_by_range_request( + peer_id, + columns_by_range_request, + id, + ) }) - .map_err(|_| RpcRequestSendError::NetworkSendError)?; + .collect::, _>>()?; - num_of_custody_column_req += 1; - } - - (Some(column_indexes), Some(num_of_custody_column_req)) + ( + Some(column_indexes.into_iter().collect::>()), + Some(data_column_requests), + ) } else { (None, None) }; - let max_blobs_len = self.chain.spec.max_blobs_per_block(epoch); + let expected_blobs = blobs_req_id.is_some(); let info = RangeBlockComponentsRequest::new( expected_blobs, - expects_columns.map(|c| c.into_iter().collect()), - num_of_column_req, - requested_peers, - max_blobs_len as usize, + expects_columns, + data_column_requests.map(|items| items.len()), ); - self.range_block_components_requests - .insert(id, (sender_id, info)); - Ok(id) + self.components_by_range_requests.insert(id, info); + + Ok(id.id) } fn make_columns_by_range_requests( @@ -508,54 +448,43 @@ impl SyncNetworkContext { Ok(peer_id_to_request_map) } - pub fn range_request_failed(&mut self, request_id: Id) -> Option { - let sender_id = self - .range_block_components_requests - .remove(&request_id) - .map(|(sender_id, _info)| sender_id); - if let Some(sender_id) = sender_id { - debug!( - self.log, - "Sync range request failed"; - "request_id" => request_id, - "sender_id" => ?sender_id - ); - Some(sender_id) - } else { - debug!(self.log, "Sync range request failed"; "request_id" => request_id); - None - } - } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' /// and blobs. - pub fn range_block_and_blob_response( + pub fn range_block_component_response( &mut self, - request_id: Id, - block_or_blob: BlockOrBlob, - ) -> Option> { - let Entry::Occupied(mut entry) = self.range_block_components_requests.entry(request_id) - else { + id: ComponentsByRangeRequestId, + range_block_component: RangeBlockComponent, + ) -> Option>, RpcResponseError>> { + let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else { metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); return None; }; - let (_, info) = entry.get_mut(); - match block_or_blob { - BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), - BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), - BlockOrBlob::CustodyColumns(column) => info.add_data_column(column), + if let Err(e) = { + let request = entry.get_mut(); + match range_block_component { + RangeBlockComponent::Block(resp) => resp.map(|(blocks, _)| { + request.add_blocks(blocks); + }), + RangeBlockComponent::Blob(resp) => resp.map(|(blobs, _)| { + request.add_blobs(blobs); + }), + RangeBlockComponent::CustodyColumns(resp) => resp.map(|(custody_columns, _)| { + request.add_custody_columns(custody_columns); + }), + } + } { + entry.remove(); + return Some(Err(e)); } - if info.is_finished() { + + if entry.get_mut().is_finished() { // If the request is finished, dequeue everything - let (sender_id, info) = entry.remove(); - let (expects_blobs, expects_custody_columns) = info.get_requirements(); - Some(BlocksAndBlobsByRangeResponse { - sender_id, - responses: info.into_responses(&self.chain.spec), - expects_blobs, - expects_custody_columns, - }) + let request = entry.remove(); + let blocks = request + .into_responses(&self.chain.spec) + .map_err(RpcResponseError::BlockComponentCouplingError); + Some(blocks) } else { None } @@ -831,6 +760,125 @@ impl SyncNetworkContext { } } + fn send_blocks_by_range_request( + &mut self, + peer_id: PeerId, + request: BlocksByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = BlocksByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + debug!( + self.log, + "Sending BlocksByRange request"; + "method" => "BlocksByRange", + "count" => request.count(), + "epoch" => Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()), + "peer" => %peer_id, + "id" => ?id, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlocksByRange(request.clone().into()), + request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + self.blocks_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + BlocksByRangeRequestItems::new(request), + ); + Ok(id) + } + + fn send_blobs_by_range_request( + &mut self, + peer_id: PeerId, + request: BlobsByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = BlobsByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + let request_epoch = Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()); + debug!( + self.log, + "Sending BlobsByRange requests"; + "method" => "BlobsByRange", + "count" => request.count, + "epoch" => request_epoch, + "peer" => %peer_id, + "id" => ?id, + ); + + // Create the blob request based on the blocks request. + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: RequestType::BlobsByRange(request.clone()), + request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + let max_blobs_per_block = self.chain.spec.max_blobs_per_block(request_epoch); + self.blobs_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + BlobsByRangeRequestItems::new(request, max_blobs_per_block), + ); + Ok(id) + } + + fn send_data_columns_by_range_request( + &mut self, + peer_id: PeerId, + request: DataColumnsByRangeRequest, + parent_request_id: ComponentsByRangeRequestId, + ) -> Result { + let id = DataColumnsByRangeRequestId { + id: self.next_id(), + parent_request_id, + }; + debug!( + self.log, + "Sending DataColumnsByRange requests"; + "method" => "DataColumnsByRange", + "count" => request.count, + "epoch" => Slot::new(request.start_slot).epoch(T::EthSpec::slots_per_epoch()), + "columns" => ?request.columns, + "peer" => %peer_id, + "id" => ?id, + ); + + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request: RequestType::DataColumnsByRange(request.clone()), + request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), + }) + .map_err(|_| RpcRequestSendError::NetworkSendError)?; + + self.data_columns_by_range_requests.insert( + id, + peer_id, + // false = do not enforce max_requests are returned for *_by_range methods. We don't + // know if there are missed blocks. + false, + DataColumnsByRangeRequestItems::new(request), + ); + Ok(id) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -929,16 +977,6 @@ impl SyncNetworkContext { } } - pub fn insert_range_blocks_and_blobs_request( - &mut self, - id: Id, - sender_id: RangeRequestId, - info: RangeBlockComponentsRequest, - ) { - self.range_block_components_requests - .insert(id, (sender_id, info)); - } - /// Attempt to make progress on all custody_by_root requests. Some request may be stale waiting /// for custody peers. Returns a Vec of results as zero or more requests may fail in this /// attempt. @@ -1037,6 +1075,41 @@ impl SyncNetworkContext { self.report_rpc_response_errors(resp, peer_id) } + #[allow(clippy::type_complexity)] + pub(crate) fn on_blocks_by_range_response( + &mut self, + id: BlocksByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self.blocks_by_range_requests.on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_blobs_by_range_response( + &mut self, + id: BlobsByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>>> { + let resp = self.blobs_by_range_requests.on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + + #[allow(clippy::type_complexity)] + pub(crate) fn on_data_columns_by_range_response( + &mut self, + id: DataColumnsByRangeRequestId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>> { + let resp = self + .data_columns_by_range_requests + .on_response(id, rpc_event); + self.report_rpc_response_errors(resp, peer_id) + } + fn report_rpc_response_errors( &mut self, resp: Option>, @@ -1191,21 +1264,27 @@ impl SyncNetworkContext { } pub(crate) fn register_metrics(&self) { - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["blocks_by_root"], - self.blocks_by_root_requests.len() as i64, - ); - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["blobs_by_root"], - self.blobs_by_root_requests.len() as i64, - ); - metrics::set_gauge_vec( - &metrics::SYNC_ACTIVE_NETWORK_REQUESTS, - &["range_blocks"], - self.range_block_components_requests.len() as i64, - ); + for (id, count) in [ + ("blocks_by_root", self.blocks_by_root_requests.len()), + ("blobs_by_root", self.blobs_by_root_requests.len()), + ( + "data_columns_by_root", + self.data_columns_by_root_requests.len(), + ), + ("blocks_by_range", self.blocks_by_range_requests.len()), + ("blobs_by_range", self.blobs_by_range_requests.len()), + ( + "data_columns_by_range", + self.data_columns_by_range_requests.len(), + ), + ("custody_by_root", self.custody_by_root_requests.len()), + ( + "components_by_range", + self.components_by_range_requests.len(), + ), + ] { + metrics::set_gauge_vec(&metrics::SYNC_ACTIVE_NETWORK_REQUESTS, &[id], count as i64); + } } } diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 4a5a16459d3..c9b85e47b69 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -4,10 +4,13 @@ use beacon_chain::validator_monitor::timestamp_now; use fnv::FnvHashMap; use lighthouse_network::PeerId; use strum::IntoStaticStr; -use types::Hash256; +use types::{Hash256, Slot}; +pub use blobs_by_range::BlobsByRangeRequestItems; pub use blobs_by_root::{BlobsByRootRequestItems, BlobsByRootSingleBlockRequest}; +pub use blocks_by_range::BlocksByRangeRequestItems; pub use blocks_by_root::{BlocksByRootRequestItems, BlocksByRootSingleRequest}; +pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; @@ -16,8 +19,11 @@ use crate::metrics; use super::{RpcEvent, RpcResponseResult}; +mod blobs_by_range; mod blobs_by_root; +mod blocks_by_range; mod blocks_by_root; +mod data_columns_by_range; mod data_columns_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -26,8 +32,9 @@ pub enum LookupVerifyError { TooManyResponses, UnrequestedBlockRoot(Hash256), UnrequestedIndex(u64), + UnrequestedSlot(Slot), InvalidInclusionProof, - DuplicateData, + DuplicatedData(Slot, u64), InternalError(String), } diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs new file mode 100644 index 00000000000..9c6f516199c --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_range.rs @@ -0,0 +1,56 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use std::sync::Arc; +use types::{BlobSidecar, EthSpec}; + +/// Accumulates results of a blobs_by_range request. Only returns items after receiving the +/// stream termination. +pub struct BlobsByRangeRequestItems { + request: BlobsByRangeRequest, + items: Vec>>, + max_blobs_per_block: u64, +} + +impl BlobsByRangeRequestItems { + pub fn new(request: BlobsByRangeRequest, max_blobs_per_block: u64) -> Self { + Self { + request, + items: vec![], + max_blobs_per_block, + } + } +} + +impl ActiveRequestItems for BlobsByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, blob: Self::Item) -> Result { + if blob.slot() < self.request.start_slot + || blob.slot() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(blob.slot())); + } + if blob.index >= self.max_blobs_per_block { + return Err(LookupVerifyError::UnrequestedIndex(blob.index)); + } + if !blob.verify_blob_sidecar_inclusion_proof() { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if self + .items + .iter() + .any(|existing| existing.slot() == blob.slot() && existing.index == blob.index) + { + return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index)); + } + + self.items.push(blob); + + // Skip check if blobs are ready as it's rare that all blocks have max blobs + Ok(false) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index a6702298845..547c51198e4 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -57,7 +57,7 @@ impl ActiveRequestItems for BlobsByRootRequestItems { return Err(LookupVerifyError::UnrequestedIndex(blob.index)); } if self.items.iter().any(|b| b.index == blob.index) { - return Err(LookupVerifyError::DuplicateData); + return Err(LookupVerifyError::DuplicatedData(blob.slot(), blob.index)); } self.items.push(blob); diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs new file mode 100644 index 00000000000..c7d2dda01ea --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_range.rs @@ -0,0 +1,48 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::BlocksByRangeRequest; +use std::sync::Arc; +use types::{EthSpec, SignedBeaconBlock}; + +/// Accumulates results of a blocks_by_range request. Only returns items after receiving the +/// stream termination. +pub struct BlocksByRangeRequestItems { + request: BlocksByRangeRequest, + items: Vec>>, +} + +impl BlocksByRangeRequestItems { + pub fn new(request: BlocksByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for BlocksByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, block: Self::Item) -> Result { + if block.slot().as_u64() < *self.request.start_slot() + || block.slot().as_u64() >= self.request.start_slot() + self.request.count() + { + return Err(LookupVerifyError::UnrequestedSlot(block.slot())); + } + if self + .items + .iter() + .any(|existing| existing.slot() == block.slot()) + { + // DuplicatedData is a common error for all components, default index to 0 + return Err(LookupVerifyError::DuplicatedData(block.slot(), 0)); + } + + self.items.push(block); + + Ok(self.items.len() >= *self.request.count() as usize) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs new file mode 100644 index 00000000000..9dabb2defa0 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_range.rs @@ -0,0 +1,54 @@ +use super::{ActiveRequestItems, LookupVerifyError}; +use lighthouse_network::rpc::methods::DataColumnsByRangeRequest; +use std::sync::Arc; +use types::{DataColumnSidecar, EthSpec}; + +/// Accumulates results of a data_columns_by_range request. Only returns items after receiving the +/// stream termination. +pub struct DataColumnsByRangeRequestItems { + request: DataColumnsByRangeRequest, + items: Vec>>, +} + +impl DataColumnsByRangeRequestItems { + pub fn new(request: DataColumnsByRangeRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for DataColumnsByRangeRequestItems { + type Item = Arc>; + + fn add(&mut self, data_column: Self::Item) -> Result { + if data_column.slot() < self.request.start_slot + || data_column.slot() >= self.request.start_slot + self.request.count + { + return Err(LookupVerifyError::UnrequestedSlot(data_column.slot())); + } + if !self.request.columns.contains(&data_column.index) { + return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); + } + if !data_column.verify_inclusion_proof() { + return Err(LookupVerifyError::InvalidInclusionProof); + } + if self.items.iter().any(|existing| { + existing.slot() == data_column.slot() && existing.index == data_column.index + }) { + return Err(LookupVerifyError::DuplicatedData( + data_column.slot(), + data_column.index, + )); + } + + self.items.push(data_column); + + Ok(self.items.len() >= self.request.count as usize * self.request.columns.len()) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +} diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 1b8d46ff072..4e02737f086 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -57,7 +57,10 @@ impl ActiveRequestItems for DataColumnsByRootRequestItems { return Err(LookupVerifyError::UnrequestedIndex(data_column.index)); } if self.items.iter().any(|d| d.index == data_column.index) { - return Err(LookupVerifyError::DuplicateData); + return Err(LookupVerifyError::DuplicatedData( + data_column.slot(), + data_column.index, + )); } self.items.push(data_column); diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 53fb55b14da..818fde07b83 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,4 +1,4 @@ -use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; +use beacon_chain::block_verification_types::RpcBlock; use lighthouse_network::rpc::methods::BlocksByRangeRequest; use lighthouse_network::service::api_types::Id; use lighthouse_network::PeerId; @@ -277,36 +277,6 @@ impl BatchInfo { > { match self.state.poison() { BatchState::Downloading(peer, _request_id) => { - // verify that blocks are in range - if let Some(last_slot) = blocks.last().map(|b| b.slot()) { - // the batch is non-empty - let first_slot = blocks[0].slot(); - - let failed_range = if first_slot < self.start_slot { - Some((self.start_slot, first_slot)) - } else if self.end_slot < last_slot { - Some((self.end_slot, last_slot)) - } else { - None - }; - - if let Some((expected, received)) = failed_range { - // this is a failed download, register the attempt and check if the batch - // can be tried again - self.failed_download_attempts.push(peer); - self.state = if self.failed_download_attempts.len() - >= B::max_batch_download_attempts() as usize - { - BatchState::Failed - } else { - // drop the blocks - BatchState::AwaitingDownload - }; - - return Err(Ok((expected, received, self.outcome()))); - } - } - let received = blocks.len(); self.state = BatchState::AwaitingProcessing(peer, blocks, Instant::now()); Ok(received) diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index cfd89f7b44e..f78b44308d1 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -11,7 +11,10 @@ use lighthouse_network::rpc::methods::{ OldBlocksByRangeRequestV2, }; use lighthouse_network::rpc::{RequestType, StatusMessage}; -use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId}; +use lighthouse_network::service::api_types::{ + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + SyncRequestId, +}; use lighthouse_network::{PeerId, SyncInfo}; use std::time::Duration; use types::{ @@ -28,8 +31,8 @@ pub(crate) enum DataSidecars { enum ByRangeDataRequestIds { PreDeneb, - PrePeerDAS(Id, PeerId), - PostPeerDAS(Vec<(Id, PeerId)>), + PrePeerDAS(BlobsByRangeRequestId, PeerId), + PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>), } /// Sync tests are usually written in the form: @@ -151,7 +154,7 @@ impl TestRig { fn find_blocks_by_range_request( &mut self, request_filter: RequestFilter, - ) -> ((Id, PeerId), ByRangeDataRequestIds) { + ) -> ((BlocksByRangeRequestId, PeerId), ByRangeDataRequestIds) { let filter_f = |peer: PeerId, start_slot: u64| { if let Some(expected_epoch) = request_filter.epoch { let epoch = Slot::new(start_slot).epoch(E::slots_per_epoch()).as_u64(); @@ -175,7 +178,7 @@ impl TestRig { RequestType::BlocksByRange(OldBlocksByRangeRequest::V2( OldBlocksByRangeRequestV2 { start_slot, .. }, )), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: AppRequestId::Sync(SyncRequestId::BlocksByRange(id)), } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) @@ -190,7 +193,7 @@ impl TestRig { RequestType::DataColumnsByRange(DataColumnsByRangeRequest { start_slot, .. }), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) { @@ -206,7 +209,7 @@ impl TestRig { NetworkMessage::SendRequest { peer_id, request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot, .. }), - request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + request_id: AppRequestId::Sync(SyncRequestId::BlobsByRange(id)), } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) @@ -225,10 +228,10 @@ impl TestRig { // Complete the request with a single stream termination self.log(&format!( - "Completing BlocksByRange request {blocks_req_id} with empty stream" + "Completing BlocksByRange request {blocks_req_id:?} with empty stream" )); self.send_sync_message(SyncMessage::RpcBlock { - request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id }, + request_id: SyncRequestId::BlocksByRange(blocks_req_id), peer_id: block_peer, beacon_block: None, seen_timestamp: D, @@ -239,10 +242,10 @@ impl TestRig { ByRangeDataRequestIds::PrePeerDAS(id, peer_id) => { // Complete the request with a single stream termination self.log(&format!( - "Completing BlobsByRange request {id} with empty stream" + "Completing BlobsByRange request {id:?} with empty stream" )); self.send_sync_message(SyncMessage::RpcBlob { - request_id: SyncRequestId::RangeBlockAndBlobs { id }, + request_id: SyncRequestId::BlobsByRange(id), peer_id, blob_sidecar: None, seen_timestamp: D, @@ -252,10 +255,10 @@ impl TestRig { // Complete the request with a single stream termination for (id, peer_id) in data_column_req_ids { self.log(&format!( - "Completing DataColumnsByRange request {id} with empty stream" + "Completing DataColumnsByRange request {id:?} with empty stream" )); self.send_sync_message(SyncMessage::RpcDataColumn { - request_id: SyncRequestId::RangeBlockAndBlobs { id }, + request_id: SyncRequestId::DataColumnsByRange(id), peer_id, data_column: None, seen_timestamp: D,