Skip to content

Commit

Permalink
Make range sync chain Id sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jan 27, 2025
1 parent a1b7d61 commit 2f23c1e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
19 changes: 8 additions & 11 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use rand::seq::SliceRandom;
use rand::Rng;
use slog::{crit, debug, o, warn};
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use std::hash::{Hash, Hasher};
use strum::IntoStaticStr;
use types::{Epoch, EthSpec, Hash256, Slot};

Expand Down Expand Up @@ -56,7 +55,7 @@ pub enum RemoveChain {
pub struct KeepChain;

/// A chain identifier
pub type ChainId = u64;
pub type ChainId = Id;
pub type BatchId = Epoch;

#[derive(Debug, Copy, Clone, IntoStaticStr)]
Expand Down Expand Up @@ -127,14 +126,9 @@ pub enum ChainSyncingState {
}

impl<T: BeaconChainTypes> SyncingChain<T> {
/// Derives a deterministic identifier for a chain from its sync target.
///
/// Chains that share the same `(target_root, target_slot)` pair always
/// produce the same id, so peers syncing to an identical target are
/// grouped onto one chain.
pub fn id(target_root: &Hash256, target_slot: &Slot) -> u64 {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    // Feeding the fields one after another is equivalent to hashing the
    // tuple `(target_root, target_slot)`: tuples hash element-wise in order.
    target_root.hash(&mut state);
    target_slot.hash(&mut state);
    state.finish()
}

#[allow(clippy::too_many_arguments)]
pub fn new(
id: Id,
start_epoch: Epoch,
target_head_slot: Slot,
target_head_root: Hash256,
Expand All @@ -145,8 +139,6 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let mut peers = FnvHashMap::default();
peers.insert(peer_id, Default::default());

let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);

SyncingChain {
id,
chain_type,
Expand All @@ -165,6 +157,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}

/// Returns `true` when the supplied target matches this chain's target,
/// i.e. both the target head slot and the target head root are equal.
pub fn has_same_target(&self, target_head_slot: Slot, target_head_root: Hash256) -> bool {
    (self.target_head_slot, self.target_head_root) == (target_head_slot, target_head_root)
}

/// Check if the chain has peers from which to process batches.
pub fn available_peers(&self) -> usize {
self.peers.len()
Expand Down Expand Up @@ -1258,7 +1255,7 @@ impl<T: BeaconChainTypes> slog::KV for SyncingChain<T> {
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
use slog::Value;
serializer.emit_u64("id", self.id)?;
serializer.emit_u32("id", self.id)?;
Value::serialize(&self.start_epoch, record, "from", serializer)?;
Value::serialize(
&self.target_head_slot.epoch(T::EthSpec::slots_per_epoch()),
Expand Down
31 changes: 18 additions & 13 deletions beacon_node/network/src/sync/range_sync/chain_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::metrics;
use crate::sync::network_context::SyncNetworkContext;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use fnv::FnvHashMap;
use lighthouse_network::service::api_types::Id;
use lighthouse_network::PeerId;
use lighthouse_network::SyncInfo;
use slog::{crit, debug, error};
Expand All @@ -29,9 +30,9 @@ const MIN_FINALIZED_CHAIN_PROCESSED_EPOCHS: u64 = 10;
#[derive(Clone)]
pub enum RangeSyncState {
/// A finalized chain is being synced.
Finalized(u64),
Finalized(Id),
/// There are no finalized chains and we are syncing one or more head chains.
Head(SmallVec<[u64; PARALLEL_HEAD_CHAINS]>),
Head(SmallVec<[Id; PARALLEL_HEAD_CHAINS]>),
/// There are no head or finalized chains and no long range sync is in progress.
Idle,
}
Expand Down Expand Up @@ -74,7 +75,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
if syncing_id == id {
// the finalized chain that was syncing was removed
debug_assert!(was_syncing && sync_type == RangeSyncType::Finalized);
let syncing_head_ids: SmallVec<[u64; PARALLEL_HEAD_CHAINS]> = self
let syncing_head_ids: SmallVec<[Id; PARALLEL_HEAD_CHAINS]> = self
.head_chains
.iter()
.filter(|(_id, chain)| chain.is_syncing())
Expand Down Expand Up @@ -355,7 +356,7 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
.collect::<Vec<_>>();
preferred_ids.sort_unstable();

let mut syncing_chains = SmallVec::<[u64; PARALLEL_HEAD_CHAINS]>::new();
let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new();
for (_, _, id) in preferred_ids {
let chain = self.head_chains.get_mut(&id).expect("known chain");
if syncing_chains.len() < PARALLEL_HEAD_CHAINS {
Expand Down Expand Up @@ -465,15 +466,17 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
sync_type: RangeSyncType,
network: &mut SyncNetworkContext<T>,
) {
let id = SyncingChain::<T>::id(&target_head_root, &target_head_slot);
let collection = if let RangeSyncType::Finalized = sync_type {
&mut self.finalized_chains
} else {
&mut self.head_chains
};
match collection.entry(id) {
Entry::Occupied(mut entry) => {
let chain = entry.get_mut();

match collection
.iter_mut()
.find(|(_, chain)| chain.has_same_target(target_head_slot, target_head_root))
{
Some((&id, chain)) => {
debug!(self.log, "Adding peer to known chain"; "peer_id" => %peer, "sync_type" => ?sync_type, &chain);
debug_assert_eq!(chain.target_head_root, target_head_root);
debug_assert_eq!(chain.target_head_slot, target_head_slot);
Expand All @@ -483,23 +486,25 @@ impl<T: BeaconChainTypes> ChainCollection<T> {
} else {
error!(self.log, "Chain removed after adding peer"; "chain" => id, "reason" => ?remove_reason);
}
let chain = entry.remove();
self.on_chain_removed(&id, chain.is_syncing(), sync_type);
let is_syncing = chain.is_syncing();
collection.remove(&id);
self.on_chain_removed(&id, is_syncing, sync_type);
}
}
Entry::Vacant(entry) => {
None => {
let peer_rpr = peer.to_string();
let id = network.next_id();
let new_chain = SyncingChain::new(
id,
start_epoch,
target_head_slot,
target_head_root,
peer,
sync_type.into(),
&self.log,
);
debug_assert_eq!(new_chain.get_id(), id);
debug!(self.log, "New chain added to sync"; "peer_id" => peer_rpr, "sync_type" => ?sync_type, &new_chain);
entry.insert(new_chain);
collection.insert(id, new_chain);
metrics::inc_counter_vec(&metrics::SYNCING_CHAINS_ADDED, &[sync_type.as_str()]);
self.update_metrics();
}
Expand Down

0 comments on commit 2f23c1e

Please sign in to comment.