Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add individual by_range sync requests #6497

Merged
merged 9 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,27 @@ impl OldBlocksByRangeRequest {
}
}

impl From<BlocksByRangeRequest> for OldBlocksByRangeRequest {
fn from(req: BlocksByRangeRequest) -> Self {
match req {
BlocksByRangeRequest::V1(ref req) => {
OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
BlocksByRangeRequest::V2(ref req) => {
OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 {
start_slot: req.start_slot,
count: req.count,
step: 1,
})
}
}
}
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
Expand Down
22 changes: 13 additions & 9 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RequestType};
use crate::service::api_types::{AppRequestId, RequestId, SyncRequestId};
use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use std::time::Duration;
use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};
Expand All @@ -238,12 +238,16 @@ mod tests {
let mut limiter: SelfRateLimiter<RequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, fork_context, log).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;

for i in 1..=5u32 {
let _ = limiter.allows(
peer_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id: i,
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id,
req_id: i,
},
})),
RequestType::Ping(Ping { data: i as u64 }),
);
Expand All @@ -261,9 +265,9 @@ mod tests {
for i in 2..=5u32 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id,
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand All @@ -286,9 +290,9 @@ mod tests {
for i in 3..=5 {
assert!(matches!(
iter.next().unwrap().request_id,
RequestId::Application(AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs {
id
})) if id == i
RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
})) if req_id == i,
));
}

Expand Down
58 changes: 55 additions & 3 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use libp2p::swarm::ConnectionId;
use types::{
BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap,
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};

Expand Down Expand Up @@ -31,8 +31,12 @@ pub enum SyncRequestId {
SingleBlob { id: SingleLookupReqId },
/// Request searching for a set of data columns given a hash and list of column indices.
DataColumnsByRoot(DataColumnsByRootRequestId),
/// Range request that is composed by both a block range request and a blob range request.
RangeBlockAndBlobs { id: Id },
/// Blocks by range request
BlocksByRange(BlocksByRangeRequestId),
/// Blobs by range request
BlobsByRange(BlobsByRangeRequestId),
/// Data columns by range request
DataColumnsByRange(DataColumnsByRangeRequestId),
}

/// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly.
Expand All @@ -43,12 +47,60 @@ pub struct DataColumnsByRootRequestId {
pub requester: DataColumnsByRootRequester,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlocksByRangeRequestId {
/// Id to identify this attempt at a blocks_by_range request for `parent_request_id`
pub id: Id,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct BlobsByRangeRequestId {
/// Id to identify this attempt at a blobs_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct DataColumnsByRangeRequestId {
/// Id to identify this attempt at a data_columns_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}

/// Block components by range request for range sync. Includes an ID for downstream consumers to
/// handle retries and tie all their sub requests together.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct ComponentsByRangeRequestId {
/// Each `RangeRequestId` may request the same data in a later retry. This Id identifies the
/// current attempt.
pub id: Id,
/// What sync component is issuing a components by range request and expecting data back
pub requester: RangeRequestId,
}

/// Range sync chain or backfill batch
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequestId {
RangeSync { chain_id: Id, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRootRequester {
Sampling(SamplingId),
Custody(CustodyId),
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum RangeRequester {
RangeSync { chain_id: u64, batch_id: Epoch },
BackfillSync { batch_id: Epoch },
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct SamplingId {
pub id: SamplingRequester,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ impl<T: BeaconChainTypes> Router<T> {
) {
let request_id = match request_id {
AppRequestId::Sync(sync_id) => match sync_id {
id @ SyncRequestId::RangeBlockAndBlobs { .. } => id,
id @ SyncRequestId::BlocksByRange { .. } => id,
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
other => {
crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other);
return;
Expand Down
Loading