diff --git a/helper_functions/src/misc.rs b/helper_functions/src/misc.rs index 16c0afab..da312e45 100644 --- a/helper_functions/src/misc.rs +++ b/helper_functions/src/misc.rs @@ -293,6 +293,25 @@ pub fn compute_subscribed_subnets( Ok((permutated_prefix..cutoff).map(|index| index % AttestationSubnetCount::U64)) } +pub fn next_subnet_subscription_epoch( + node_id: NodeId, + config: &Config, + current_epoch: Epoch, +) -> Result { + let current_subscribed_subnets = + compute_subscribed_subnets::

(node_id, config, current_epoch)?.collect_vec(); + + let mut epoch = current_epoch + 1; + + while compute_subscribed_subnets::

(node_id, config, epoch)?.collect_vec() + == current_subscribed_subnets + { + epoch += 1; + } + + Ok(epoch) +} + /// [`compute_time_at_slot`] and [`compute_timestamp_at_slot`]. /// /// The two functions do the same thing as long as [`GENESIS_SLOT`] is 0. @@ -578,7 +597,7 @@ mod tests { consts::{DOMAIN_BEACON_ATTESTER, FAR_FUTURE_EPOCH, GENESIS_EPOCH}, containers::Validator, }, - preset::Minimal, + preset::{Mainnet, Minimal}, }; use super::*; @@ -691,4 +710,36 @@ mod tests { compute_subscribed_subnets::(node_id, &config, epoch).ok(); } } + + #[test] + fn test_next_subnet_subscription_epoch() -> Result<()> { + let node_id = NodeId::from_u64(123_456); + let config = Config::mainnet(); + + for epoch in [100_000, 200_000, 300_000] { + let current_subscribed_subnets = + compute_subscribed_subnets::(node_id, &config, epoch)?.collect_vec(); + + let next_subscription_epoch = + next_subnet_subscription_epoch::(node_id, &config, epoch)?; + + assert_eq!( + current_subscribed_subnets, + compute_subscribed_subnets::( + node_id, + &config, + next_subscription_epoch - 1 + )? + .collect_vec(), + ); + + assert_ne!( + current_subscribed_subnets, + compute_subscribed_subnets::(node_id, &config, next_subscription_epoch)? + .collect_vec(), + ); + } + + Ok(()) + } } diff --git a/p2p/src/attestation_subnets.rs b/p2p/src/attestation_subnets.rs index 947b4248..f6456533 100644 --- a/p2p/src/attestation_subnets.rs +++ b/p2p/src/attestation_subnets.rs @@ -18,7 +18,6 @@ use crate::misc::{AttestationSubnetActions, BeaconCommitteeSubscription, SubnetP use AttestationSubnetState::{DiscoveringPeers, Irrelevant, Persistent, Subscribed}; -const DISCOVER_PEERS_IN_ADVANCE_SLOTS: u64 = 2; const SUBSCRIBE_IN_ADVANCE_SLOTS: u64 = 1; #[derive(Clone, Copy, Default, PartialEq, Eq)] @@ -35,24 +34,13 @@ pub enum AttestationSubnetState { Persistent { expiration: Slot }, } -impl AttestationSubnetState { - #[must_use] - pub fn max_expiration(self, other_expiration: Slot) -> Slot { - match self { - Irrelevant => other_expiration, - DiscoveringPeers { expiration } - | Subscribed { expiration } - | Persistent { expiration } => other_expiration.max(expiration), - } - } -} - #[derive(Clone, Copy)] pub struct AttestationSubnets

{ + active_discoveries: [Option; AttestationSubnetCount::USIZE], states: [AttestationSubnetState; AttestationSubnetCount::USIZE], // Needed to track if app is subscribed to persistent subnets, // and allow to subscribe to them mid epoch on app start. - initialized_persistent: bool, + persistent_subscriptions_expiration: Option, node_id: NodeId, phantom: PhantomData

, } @@ -60,8 +48,9 @@ pub struct AttestationSubnets

{ impl AttestationSubnets

{ pub fn new(node_id: NodeId) -> Self { Self { + active_discoveries: [None; AttestationSubnetCount::USIZE], states: [AttestationSubnetState::default(); AttestationSubnetCount::USIZE], - initialized_persistent: false, + persistent_subscriptions_expiration: None, node_id, phantom: PhantomData, } @@ -77,7 +66,7 @@ impl AttestationSubnets

{ let old = *self; if self.subscribe_to_all_if_needed(current_epoch) { - return Ok(self.actions(old)); + return Ok(self.actions(old, slot)); } // Advance subnet states to the current slot. @@ -94,25 +83,30 @@ impl AttestationSubnets

{ } } - if !self.initialized_persistent || misc::is_epoch_start::

(slot) { - self.initialized_persistent = true; + let persistent_subscriptions_expired = self + .persistent_subscriptions_expiration + .map(|expiration| expiration <= slot) + .unwrap_or(true); - let expiration = misc::compute_start_slot_at_epoch::

(current_epoch + 1); + if persistent_subscriptions_expired { + let expiration_epoch = + misc::next_subnet_subscription_epoch::

(self.node_id, config, current_epoch)?; + let expiration = misc::compute_start_slot_at_epoch::

(expiration_epoch); for subnet_id in misc::compute_subscribed_subnets::

(self.node_id, config, current_epoch)? { let position = usize::try_from(subnet_id)?; - self.states[position] = Persistent { - expiration: self.states[position].max_expiration(expiration), - }; + self.states[position] = Persistent { expiration }; } + + self.persistent_subscriptions_expiration = Some(expiration); } self.update_states(slot, subscriptions)?; - Ok(self.actions(old)) + Ok(self.actions(old, slot)) } pub fn update( @@ -124,46 +118,54 @@ impl AttestationSubnets

{ let old = *self; if self.subscribe_to_all_if_needed(current_epoch) { - return Ok(self.actions(old)); + return Ok(self.actions(old, current_slot)); } self.update_states(current_slot, subscriptions)?; - Ok(self.actions(old)) + Ok(self.actions(old, current_slot)) } - fn actions(self, old: Self) -> AttestationSubnetActions { - let new = self; + fn actions(&mut self, old: Self, slot: Slot) -> AttestationSubnetActions { + let discoveries = izip!(0.., self.states) + .filter_map(|(subnet_id, new_state)| { + let expiration = match new_state { + DiscoveringPeers { expiration } + | Subscribed { expiration } + | Persistent { expiration } => { + if let Some(active_discovery_expiration) = + self.active_discoveries[subnet_id] + { + if slot < active_discovery_expiration { + // Avoid making a new discovery request in subnet while previous peer discovery is not expired + return None; + } + } + + self.active_discoveries[subnet_id] = Some(expiration); - let discoveries = izip!(0.., old.states, new.states) - .filter(|(_, old_state, new_state)| old_state != new_state) - .filter_map(|(subnet_id, old_state, new_state)| { - let expiration = match (old_state, new_state) { - (_, Persistent { .. }) => None, - (_, DiscoveringPeers { expiration } | Subscribed { expiration }) => { Some(expiration) } - _ => return None, + Irrelevant => { + self.active_discoveries[subnet_id] = None; + + return None; + } }; Some(SubnetPeerDiscovery { - subnet_id, + subnet_id: subnet_id.try_into().expect("subnet id should fit into u64"), expiration, }) }) .collect(); - let enr = izip!(0.., old.states, new.states) + let enr = izip!(0.., old.states, self.states) .filter_map(|(subnet_id, old_state, new_state)| { let add_to_enr = match (old_state, new_state) { - ( - Irrelevant | DiscoveringPeers { .. } | Subscribed { .. }, - Persistent { .. }, - ) => true, - ( - Persistent { .. }, - Irrelevant | DiscoveringPeers { .. } | Subscribed { .. }, - ) => false, + (Persistent { .. }, Persistent { .. }) => return None, + (_, Persistent { .. }) => true, + (Persistent { .. }, _) => false, _ => return None, }; @@ -171,7 +173,7 @@ impl AttestationSubnets

{ }) .collect(); - let subscriptions = izip!(0.., old.states, new.states) + let subscriptions = izip!(0.., old.states, self.states) .filter_map(|(subnet_id, old_state, new_state)| { let subscribe = match (old_state, new_state) { ( @@ -222,16 +224,6 @@ impl AttestationSubnets

{ is_aggregator, } = subscription; - let in_advance_slots = if is_aggregator { - SUBSCRIBE_IN_ADVANCE_SLOTS - } else { - DISCOVER_PEERS_IN_ADVANCE_SLOTS - }; - - if current_slot + in_advance_slots < slot || current_slot > slot { - continue; - } - let subnet_id: usize = misc::compute_subnet_for_attestation::

( committees_at_slot, slot, @@ -241,29 +233,42 @@ impl AttestationSubnets

{ let subnet_state = &mut self.states[subnet_id]; - *subnet_state = match subnet_state { - // If persistent subscription exists at current slot, do not change anything - Persistent { expiration } => Persistent { - expiration: *expiration, - }, - // If validator is aggregator, subscribe to subnet or extend existing subscription - // (except if persistent subscription already exists) - Subscribed { expiration } if is_aggregator => Subscribed { - expiration: (*expiration).max(slot + 1), - }, - // Ignore DiscoveringPeers expiration for the new subscription - Irrelevant | DiscoveringPeers { .. } if is_aggregator => Subscribed { - expiration: slot + 1, - }, - // If validator is not an aggregator, and subscription exists at current slot, do not change anything - Subscribed { expiration } => Subscribed { - expiration: *expiration, - }, - // If validator is not an aggregator, discover peers in subnet - Irrelevant | DiscoveringPeers { .. } => DiscoveringPeers { - expiration: subnet_state.max_expiration(slot + 1), - }, - }; + if is_aggregator + && current_slot + SUBSCRIBE_IN_ADVANCE_SLOTS >= slot + && current_slot <= slot + { + match subnet_state { + // Beacon committee subscriptions have no effect on persistent subnet subscriptions + Persistent { .. } => {} + // If there is a subnet subscription at current slot, extend its expiration + Subscribed { expiration } => { + *subnet_state = Subscribed { + expiration: (*expiration).max(slot + 1), + }; + } + // Make a new subnet subscription + Irrelevant | DiscoveringPeers { .. } => { + *subnet_state = Subscribed { + expiration: slot + 1, + }; + } + }; + } else { + // Discover peers instantly after processing beacon committee subscriptions + match subnet_state { + DiscoveringPeers { expiration } => { + *subnet_state = DiscoveringPeers { + expiration: (*expiration).max(slot + 1), + }; + } + Irrelevant => { + *subnet_state = DiscoveringPeers { + expiration: slot + 1, + }; + } + _ => {} + }; + } } Ok(())