Skip to content

Commit

Permalink
Changes to attestation subnet discovery:
Browse files Browse the repository at this point in the history
* Avoid making new discovery requests in subnets while previous peer discoveries are not expired

* Make peer discovery requests instantly after receiving beacon committee subscriptions

* Make peer discovery requests for persistent subnet subscriptions only once per subscription period
  • Loading branch information
povi committed Dec 19, 2024
1 parent 6a409da commit e5f719c
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 79 deletions.
53 changes: 52 additions & 1 deletion helper_functions/src/misc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ pub fn compute_subscribed_subnets<P: Preset>(
Ok((permutated_prefix..cutoff).map(|index| index % AttestationSubnetCount::U64))
}

pub fn next_subnet_subscription_epoch<P: Preset>(
node_id: NodeId,
config: &Config,
current_epoch: Epoch,
) -> Result<Epoch> {
let current_subscribed_subnets =
compute_subscribed_subnets::<P>(node_id, config, current_epoch)?.collect_vec();

let mut epoch = current_epoch + 1;

while compute_subscribed_subnets::<P>(node_id, config, epoch)?.collect_vec()
== current_subscribed_subnets
{
epoch += 1;
}

Ok(epoch)
}

/// [`compute_time_at_slot`] and [`compute_timestamp_at_slot`].
///
/// The two functions do the same thing as long as [`GENESIS_SLOT`] is 0.
Expand Down Expand Up @@ -578,7 +597,7 @@ mod tests {
consts::{DOMAIN_BEACON_ATTESTER, FAR_FUTURE_EPOCH, GENESIS_EPOCH},
containers::Validator,
},
preset::Minimal,
preset::{Mainnet, Minimal},
};

use super::*;
Expand Down Expand Up @@ -691,4 +710,36 @@ mod tests {
compute_subscribed_subnets::<Minimal>(node_id, &config, epoch).ok();
}
}

#[test]
fn test_next_subnet_subscription_epoch() -> Result<()> {
let node_id = NodeId::from_u64(123_456);
let config = Config::mainnet();

for epoch in [100_000, 200_000, 300_000] {
let current_subscribed_subnets =
compute_subscribed_subnets::<Mainnet>(node_id, &config, epoch)?.collect_vec();

let next_subscription_epoch =
next_subnet_subscription_epoch::<Mainnet>(node_id, &config, epoch)?;

assert_eq!(
current_subscribed_subnets,
compute_subscribed_subnets::<Mainnet>(
node_id,
&config,
next_subscription_epoch - 1
)?
.collect_vec(),
);

assert_ne!(
current_subscribed_subnets,
compute_subscribed_subnets::<Mainnet>(node_id, &config, next_subscription_epoch)?
.collect_vec(),
);
}

Ok(())
}
}
161 changes: 83 additions & 78 deletions p2p/src/attestation_subnets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::misc::{AttestationSubnetActions, BeaconCommitteeSubscription, SubnetP

use AttestationSubnetState::{DiscoveringPeers, Irrelevant, Persistent, Subscribed};

const DISCOVER_PEERS_IN_ADVANCE_SLOTS: u64 = 2;
const SUBSCRIBE_IN_ADVANCE_SLOTS: u64 = 1;

#[derive(Clone, Copy, Default, PartialEq, Eq)]
Expand All @@ -35,33 +34,23 @@ pub enum AttestationSubnetState {
Persistent { expiration: Slot },
}

impl AttestationSubnetState {
#[must_use]
pub fn max_expiration(self, other_expiration: Slot) -> Slot {
match self {
Irrelevant => other_expiration,
DiscoveringPeers { expiration }
| Subscribed { expiration }
| Persistent { expiration } => other_expiration.max(expiration),
}
}
}

#[derive(Clone, Copy)]
pub struct AttestationSubnets<P> {
active_discoveries: [Option<Slot>; AttestationSubnetCount::USIZE],
states: [AttestationSubnetState; AttestationSubnetCount::USIZE],
// Needed to track if app is subscribed to persistent subnets,
// and allow to subscribe to them mid epoch on app start.
initialized_persistent: bool,
persistent_subscriptions_expiration: Option<Slot>,
node_id: NodeId,
phantom: PhantomData<P>,
}

impl<P: Preset> AttestationSubnets<P> {
pub fn new(node_id: NodeId) -> Self {
Self {
active_discoveries: [None; AttestationSubnetCount::USIZE],
states: [AttestationSubnetState::default(); AttestationSubnetCount::USIZE],
initialized_persistent: false,
persistent_subscriptions_expiration: None,
node_id,
phantom: PhantomData,
}
Expand All @@ -77,7 +66,7 @@ impl<P: Preset> AttestationSubnets<P> {
let old = *self;

if self.subscribe_to_all_if_needed(current_epoch) {
return Ok(self.actions(old));
return Ok(self.actions(old, slot));
}

// Advance subnet states to the current slot.
Expand All @@ -94,25 +83,30 @@ impl<P: Preset> AttestationSubnets<P> {
}
}

if !self.initialized_persistent || misc::is_epoch_start::<P>(slot) {
self.initialized_persistent = true;
let persistent_subscriptions_expired = self
.persistent_subscriptions_expiration
.map(|expiration| expiration <= slot)
.unwrap_or(true);

let expiration = misc::compute_start_slot_at_epoch::<P>(current_epoch + 1);
if persistent_subscriptions_expired {
let expiration_epoch =
misc::next_subnet_subscription_epoch::<P>(self.node_id, config, current_epoch)?;
let expiration = misc::compute_start_slot_at_epoch::<P>(expiration_epoch);

for subnet_id in
misc::compute_subscribed_subnets::<P>(self.node_id, config, current_epoch)?
{
let position = usize::try_from(subnet_id)?;

self.states[position] = Persistent {
expiration: self.states[position].max_expiration(expiration),
};
self.states[position] = Persistent { expiration };
}

self.persistent_subscriptions_expiration = Some(expiration);
}

self.update_states(slot, subscriptions)?;

Ok(self.actions(old))
Ok(self.actions(old, slot))
}

pub fn update(
Expand All @@ -124,54 +118,62 @@ impl<P: Preset> AttestationSubnets<P> {
let old = *self;

if self.subscribe_to_all_if_needed(current_epoch) {
return Ok(self.actions(old));
return Ok(self.actions(old, current_slot));
}

self.update_states(current_slot, subscriptions)?;

Ok(self.actions(old))
Ok(self.actions(old, current_slot))
}

fn actions(self, old: Self) -> AttestationSubnetActions {
let new = self;
fn actions(&mut self, old: Self, slot: Slot) -> AttestationSubnetActions {
let discoveries = izip!(0.., self.states)
.filter_map(|(subnet_id, new_state)| {
let expiration = match new_state {
DiscoveringPeers { expiration }
| Subscribed { expiration }
| Persistent { expiration } => {
if let Some(active_discovery_expiration) =
self.active_discoveries[subnet_id]
{
if slot < active_discovery_expiration {
// Avoid making a new discovery request in subnet while previous peer discovery is not expired
return None;
}
}

self.active_discoveries[subnet_id] = Some(expiration);

let discoveries = izip!(0.., old.states, new.states)
.filter(|(_, old_state, new_state)| old_state != new_state)
.filter_map(|(subnet_id, old_state, new_state)| {
let expiration = match (old_state, new_state) {
(_, Persistent { .. }) => None,
(_, DiscoveringPeers { expiration } | Subscribed { expiration }) => {
Some(expiration)
}
_ => return None,
Irrelevant => {
self.active_discoveries[subnet_id] = None;

return None;
}
};

Some(SubnetPeerDiscovery {
subnet_id,
subnet_id: subnet_id.try_into().expect("subnet id should fit into u64"),
expiration,
})
})
.collect();

let enr = izip!(0.., old.states, new.states)
let enr = izip!(0.., old.states, self.states)
.filter_map(|(subnet_id, old_state, new_state)| {
let add_to_enr = match (old_state, new_state) {
(
Irrelevant | DiscoveringPeers { .. } | Subscribed { .. },
Persistent { .. },
) => true,
(
Persistent { .. },
Irrelevant | DiscoveringPeers { .. } | Subscribed { .. },
) => false,
(Persistent { .. }, Persistent { .. }) => return None,
(_, Persistent { .. }) => true,
(Persistent { .. }, _) => false,
_ => return None,
};

Some((subnet_id, add_to_enr))
})
.collect();

let subscriptions = izip!(0.., old.states, new.states)
let subscriptions = izip!(0.., old.states, self.states)
.filter_map(|(subnet_id, old_state, new_state)| {
let subscribe = match (old_state, new_state) {
(
Expand Down Expand Up @@ -222,16 +224,6 @@ impl<P: Preset> AttestationSubnets<P> {
is_aggregator,
} = subscription;

let in_advance_slots = if is_aggregator {
SUBSCRIBE_IN_ADVANCE_SLOTS
} else {
DISCOVER_PEERS_IN_ADVANCE_SLOTS
};

if current_slot + in_advance_slots < slot || current_slot > slot {
continue;
}

let subnet_id: usize = misc::compute_subnet_for_attestation::<P>(
committees_at_slot,
slot,
Expand All @@ -241,29 +233,42 @@ impl<P: Preset> AttestationSubnets<P> {

let subnet_state = &mut self.states[subnet_id];

*subnet_state = match subnet_state {
// If persistent subscription exists at current slot, do not change anything
Persistent { expiration } => Persistent {
expiration: *expiration,
},
// If validator is aggregator, subscribe to subnet or extend existing subscription
// (except if persistent subscription already exists)
Subscribed { expiration } if is_aggregator => Subscribed {
expiration: (*expiration).max(slot + 1),
},
// Ignore DiscoveringPeers expiration for the new subscription
Irrelevant | DiscoveringPeers { .. } if is_aggregator => Subscribed {
expiration: slot + 1,
},
// If validator is not an aggregator, and subscription exists at current slot, do not change anything
Subscribed { expiration } => Subscribed {
expiration: *expiration,
},
// If validator is not an aggregator, discover peers in subnet
Irrelevant | DiscoveringPeers { .. } => DiscoveringPeers {
expiration: subnet_state.max_expiration(slot + 1),
},
};
if is_aggregator
&& current_slot + SUBSCRIBE_IN_ADVANCE_SLOTS >= slot
&& current_slot <= slot
{
match subnet_state {
// Beacon committee subscriptions have no effect on persistent subnet subscriptions
Persistent { .. } => {}
// If there is a subnet subscription at current slot, extend its expiration
Subscribed { expiration } => {
*subnet_state = Subscribed {
expiration: (*expiration).max(slot + 1),
};
}
// Make a new subnet subscription
Irrelevant | DiscoveringPeers { .. } => {
*subnet_state = Subscribed {
expiration: slot + 1,
};
}
};
} else {
// Discover peers instantly after processing beacon committee subscriptions
match subnet_state {
DiscoveringPeers { expiration } => {
*subnet_state = DiscoveringPeers {
expiration: (*expiration).max(slot + 1),
};
}
Irrelevant => {
*subnet_state = DiscoveringPeers {
expiration: slot + 1,
};
}
_ => {}
};
}
}

Ok(())
Expand Down

0 comments on commit e5f719c

Please sign in to comment.