Skip to content

Commit

Permalink
Subscribe to peerdas topics on Fulu fork
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jan 25, 2025
1 parent a1b7d61 commit 385b061
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 55 deletions.
8 changes: 7 additions & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,17 @@ impl<E: EthSpec> Network<E> {
}

// Subscribe to core topics for the new fork
for kind in fork_core_topics::<E>(&new_fork, &self.fork_context.spec) {
for kind in fork_core_topics::<E>(
&new_fork,
&self.fork_context.spec,
&self.network_globals.topic_config(),
) {
let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
self.subscribe(topic);
}

// TODO(das): unsubscribe from blob topics at the Fulu fork

// Register the new topics for metrics
let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
.map(|gossip_kind| {
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A collection of variables that are accessible outside of the network thread itself.
use super::TopicConfig;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV3};
use crate::types::{BackFillState, SyncState};
Expand Down Expand Up @@ -183,6 +184,14 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect::<Vec<_>>()
}

/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn topic_config(&self) -> TopicConfig {
TopicConfig {
subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
sampling_subnets: self.sampling_subnets.iter().copied().collect(),
}
}

/// TESTING ONLY. Build a dummy NetworkGlobals instance.
pub fn new_test_globals(
trusted_peers: Vec<PeerId>,
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ pub use subnet::{Subnet, SubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
43 changes: 36 additions & 7 deletions beacon_node/lighthouse_network/src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ pub const LIGHT_CLIENT_GOSSIP_TOPICS: [GossipKind; 2] = [
GossipKind::LightClientOptimisticUpdate,
];

pub struct TopicConfig {
pub subscribe_all_data_column_subnets: bool,
pub sampling_subnets: Vec<DataColumnSubnetId>,
}

/// Returns the core topics associated with each fork that are new to the previous fork
pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> Vec<GossipKind> {
pub fn fork_core_topics<E: EthSpec>(
fork_name: &ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
match fork_name {
ForkName::Base => BASE_CORE_TOPICS.to_vec(),
ForkName::Altair => ALTAIR_CORE_TOPICS.to_vec(),
Expand All @@ -64,7 +73,21 @@ pub fn fork_core_topics<E: EthSpec>(fork_name: &ForkName, spec: &ChainSpec) -> V
}
electra_blob_topics
}
ForkName::Fulu => vec![],
ForkName::Fulu => {
let mut topics = vec![];
if topic_config.subscribe_all_data_column_subnets {
for column_subnet in 0..spec.data_column_sidecar_subnet_count {
topics.push(GossipKind::DataColumnSidecar(DataColumnSubnetId::new(
column_subnet,
)));
}
} else {
for column_subnet in &topic_config.sampling_subnets {
topics.push(GossipKind::DataColumnSidecar(*column_subnet));
}
}
topics
}
}
}

Expand All @@ -84,10 +107,11 @@ pub fn attestation_sync_committee_topics<E: EthSpec>() -> impl Iterator<Item = G
pub fn core_topics_to_subscribe<E: EthSpec>(
mut current_fork: ForkName,
spec: &ChainSpec,
topic_config: &TopicConfig,
) -> Vec<GossipKind> {
let mut topics = fork_core_topics::<E>(&current_fork, spec);
let mut topics = fork_core_topics::<E>(&current_fork, spec, topic_config);
while let Some(previous_fork) = current_fork.previous_fork() {
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec);
let previous_fork_topics = fork_core_topics::<E>(&previous_fork, spec, topic_config);
topics.extend(previous_fork_topics);
current_fork = previous_fork;
}
Expand Down Expand Up @@ -475,16 +499,21 @@ mod tests {
type E = MainnetEthSpec;
let spec = E::default_spec();
let mut all_topics = Vec::new();
let mut electra_core_topics = fork_core_topics::<E>(&ForkName::Electra, &spec);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec);
let topic_config = TopicConfig {
subscribe_all_data_column_subnets: false,
sampling_subnets: vec![],
};
let mut electra_core_topics =
fork_core_topics::<E>(&ForkName::Electra, &spec, &topic_config);
let mut deneb_core_topics = fork_core_topics::<E>(&ForkName::Deneb, &spec, &topic_config);
all_topics.append(&mut electra_core_topics);
all_topics.append(&mut deneb_core_topics);
all_topics.extend(CAPELLA_CORE_TOPICS);
all_topics.extend(ALTAIR_CORE_TOPICS);
all_topics.extend(BASE_CORE_TOPICS);

let latest_fork = *ForkName::list_all().last().unwrap();
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec);
let core_topics = core_topics_to_subscribe::<E>(latest_fork, &spec, &topic_config);
// Need to check all the topics exist in an order independent manner
for topic in all_topics {
assert!(core_topics.contains(&topic));
Expand Down
49 changes: 4 additions & 45 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId,
Unsigned, ValidatorSubscription,
};

mod tests;
Expand Down Expand Up @@ -181,8 +181,6 @@ pub struct NetworkService<T: BeaconChainTypes> {
next_fork_subscriptions: Pin<Box<OptionFuture<Sleep>>>,
/// A delay that expires when we need to unsubscribe from old fork topics.
next_unsubscribe: Pin<Box<OptionFuture<Sleep>>>,
/// Subscribe to all the data column subnets.
subscribe_all_data_column_subnets: bool,
/// Subscribe to all the subnets once synced.
subscribe_all_subnets: bool,
/// Shutdown beacon node after sync is complete.
Expand Down Expand Up @@ -348,7 +346,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
next_fork_update,
next_fork_subscriptions,
next_unsubscribe,
subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets,
subscribe_all_subnets: config.subscribe_all_subnets,
shutdown_after_sync: config.shutdown_after_sync,
metrics_enabled: config.metrics_enabled,
Expand Down Expand Up @@ -716,6 +713,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
for topic_kind in core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.topic_config(),
) {
for fork_digest in self.required_gossip_fork_digests() {
let topic = GossipTopic::new(
Expand Down Expand Up @@ -750,15 +748,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

// TODO(das): This is added here for the purpose of testing, *without* having to
// activate Electra. This should happen as part of the Electra upgrade and we should
// move the subscription logic once it's ready to rebase PeerDAS on Electra, or if
// we decide to activate via the soft fork route:
// https://github.com/sigp/lighthouse/pull/5899
if self.fork_context.spec.is_peer_das_scheduled() {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}

// If we are to subscribe to all subnets we do it here
if self.subscribe_all_subnets {
for subnet_id in 0..<<T as BeaconChainTypes>::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() {
Expand Down Expand Up @@ -805,37 +794,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
if self.subscribe_all_data_column_subnets {
for column_subnet in 0..self.fork_context.spec.data_column_sidecar_subnet_count {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind =
Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
} else {
for column_subnet in &self.network_globals.sampling_subnets {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
}
}

/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
Expand Down Expand Up @@ -951,6 +909,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let core_topics = core_topics_to_subscribe::<T::EthSpec>(
self.fork_context.current_fork(),
&self.fork_context.spec,
&self.network_globals.topic_config(),
);
let core_topics: HashSet<&GossipKind> = HashSet::from_iter(&core_topics);
let subscriptions = self.network_globals.gossipsub_subscriptions.read();
Expand Down

0 comments on commit 385b061

Please sign in to comment.