Skip to content

Commit

Permalink
add signature collector cleanup (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
dknopik authored Jan 20, 2025
1 parent 0232bf5 commit 49f69c0
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,18 @@ impl Client {

// Create the processor-adjacent managers
let signature_collector =
Arc::new(SignatureCollectorManager::new(processor_senders.clone()));
let Ok(qbft_manager) =
SignatureCollectorManager::new(processor_senders.clone(), slot_clock.clone())
.map_err(|e| format!("Unable to initialize signature collector manager: {e:?}"))?;

let qbft_manager =
QbftManager::new(processor_senders.clone(), OperatorId(1), slot_clock.clone())
else {
return Err("Unable to initialize qbft manager".into());
};
.map_err(|e| format!("Unable to initialize qbft manager: {e:?}"))?;

let validator_store = Arc::new(AnchorValidatorStore::<_, E>::new(
signature_collector,
qbft_manager,
slashing_protection,
slot_clock.clone(),
spec.clone(),
genesis_validators_root,
OperatorId(123),
Expand Down
1 change: 1 addition & 0 deletions anchor/signature_collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
[dependencies]
dashmap = { workspace = true }
processor = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
Expand Down
61 changes: 51 additions & 10 deletions anchor/signature_collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use dashmap::DashMap;
use processor::{DropOnFinish, Senders, WorkItem};
use slot_clock::SlotClock;
use ssv_types::{ClusterId, OperatorId};
use std::collections::{hash_map, HashMap};
use std::mem;
Expand All @@ -8,24 +9,43 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tracing::error;
use types::{Hash256, SecretKey, Signature};
use types::{Hash256, SecretKey, Signature, Slot};

const COLLECTOR_NAME: &str = "signature_collector";
const COLLECTOR_MESSAGE_NAME: &str = "signature_collector_message";
const COLLECTOR_CLEANER_NAME: &str = "signature_collector_cleaner";
const SIGNER_NAME: &str = "signer";

pub struct SignatureCollectorManager {
/// number of slots to keep before the current slot
const SIGNATURE_COLLECTOR_RETAIN_SLOTS: u64 = 1;

struct SignatureCollector {
sender: UnboundedSender<CollectorMessage>,
for_slot: Slot,
}

pub struct SignatureCollectorManager<T: SlotClock> {
processor: Senders,
signature_collectors: DashMap<Hash256, UnboundedSender<CollectorMessage>>,
slot_clock: T,
signature_collectors: DashMap<Hash256, SignatureCollector>,
}

impl SignatureCollectorManager {
pub fn new(processor: Senders) -> Self {
Self {
impl<T: SlotClock + 'static> SignatureCollectorManager<T> {
pub fn new(processor: Senders, slot_clock: T) -> Result<Arc<Self>, CollectionError> {
let manager = Arc::new(Self {
processor,
slot_clock,
signature_collectors: DashMap::new(),
}
});

manager
.processor
.permitless
.send_async(Arc::clone(&manager).cleaner(), COLLECTOR_CLEANER_NAME)?;

Ok(manager)
}

pub async fn sign_and_collect(
Expand Down Expand Up @@ -95,25 +115,46 @@ impl SignatureCollectorManager {

fn get_or_spawn(&self, request: SignatureRequest) -> UnboundedSender<CollectorMessage> {
match self.signature_collectors.entry(request.signing_root) {
dashmap::Entry::Occupied(entry) => entry.get().clone(),
dashmap::Entry::Occupied(entry) => entry.get().sender.clone(),
dashmap::Entry::Vacant(entry) => {
let (tx, rx) = mpsc::unbounded_channel();
let tx = entry.insert(tx);
entry.insert(SignatureCollector {
sender: tx.clone(),
for_slot: request.slot,
});
let _ = self
.processor
.permitless
.send_async(Box::pin(signature_collector(rx, request)), COLLECTOR_NAME);
tx.clone()
tx
}
}
}

async fn cleaner(self: Arc<Self>) {
while !self.processor.permitless.is_closed() {
sleep(
self.slot_clock
.duration_to_next_slot()
.unwrap_or(self.slot_clock.slot_duration()),
)
.await;
let Some(slot) = self.slot_clock.now() else {
continue;
};
let cutoff = slot.saturating_sub(SIGNATURE_COLLECTOR_RETAIN_SLOTS);
self.signature_collectors
.retain(|_, collector| collector.for_slot >= cutoff)
}
}
}

#[derive(Debug, Clone)]
pub struct SignatureRequest {
pub cluster_id: ClusterId,
pub signing_root: Hash256,
pub threshold: u64,
pub slot: Slot,
}

pub struct CollectorMessage {
Expand Down
53 changes: 41 additions & 12 deletions anchor/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use ssv_types::message::{
use ssv_types::{Cluster, OperatorId, ValidatorMetadata};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, warn};
use types::attestation::Attestation;
use types::beacon_block::BeaconBlock;
Expand All @@ -29,6 +30,7 @@ use types::signed_aggregate_and_proof::SignedAggregateAndProof;
use types::signed_beacon_block::SignedBeaconBlock;
use types::signed_contribution_and_proof::SignedContributionAndProof;
use types::signed_voluntary_exit::SignedVoluntaryExit;
use types::slot_data::SlotData;
use types::slot_epoch::{Epoch, Slot};
use types::sync_committee_contribution::SyncCommitteeContribution;
use types::sync_committee_message::SyncCommitteeMessage;
Expand Down Expand Up @@ -62,20 +64,22 @@ struct InitializedCluster {

pub struct AnchorValidatorStore<T: SlotClock + 'static, E: EthSpec> {
clusters: DashMap<PublicKeyBytes, InitializedCluster>,
signature_collector: Arc<SignatureCollectorManager>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T, E>>,
slashing_protection: SlashingDatabase,
slashing_protection_last_prune: Mutex<Epoch>,
slot_clock: T,
spec: Arc<ChainSpec>,
genesis_validators_root: Hash256,
operator_id: OperatorId,
}

impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
pub fn new(
signature_collector: Arc<SignatureCollectorManager>,
signature_collector: Arc<SignatureCollectorManager<T>>,
qbft_manager: Arc<QbftManager<T, E>>,
slashing_protection: SlashingDatabase,
slot_clock: T,
spec: Arc<ChainSpec>,
genesis_validators_root: Hash256,
operator_id: OperatorId,
Expand All @@ -86,6 +90,7 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
qbft_manager,
slashing_protection,
slashing_protection_last_prune: Mutex::new(Epoch::new(0)),
slot_clock,
spec,
genesis_validators_root,
operator_id,
Expand Down Expand Up @@ -132,6 +137,7 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
&self,
cluster: InitializedCluster,
signing_root: Hash256,
for_slot: Slot,
) -> Result<Signature, Error> {
let collector = self.signature_collector.sign_and_collect(
SignatureRequest {
Expand All @@ -143,6 +149,7 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
.safe_mul(2)
.and_then(|x| x.safe_add(1))
.map_err(SpecificError::from)?,
slot: for_slot,
},
self.operator_id,
cluster.decrypted_key_share,
Expand Down Expand Up @@ -237,7 +244,7 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {

let signing_root = block.signing_root(domain_hash);
let signature = self
.collect_signature(self.cluster(validator_pubkey)?, signing_root)
.collect_signature(self.cluster(validator_pubkey)?, signing_root, block.slot())
.await?;
Ok(SignedBeaconBlock::from_block(block, signature))
}
Expand Down Expand Up @@ -272,7 +279,7 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {

let domain = self.get_domain(epoch, Domain::SyncCommittee);
let signing_root = data.block_root.signing_root(domain);
let signature = self.collect_signature(cluster, signing_root).await?;
let signature = self.collect_signature(cluster, signing_root, slot).await?;

Ok(SyncCommitteeMessage {
slot,
Expand Down Expand Up @@ -354,13 +361,14 @@ impl<T: SlotClock, E: EthSpec> AnchorValidatorStore<T, E> {
.map(|contribution| {
let cluster = cluster.clone();
async move {
let slot = contribution.contribution.slot;
let message = ContributionAndProof {
aggregator_index,
contribution: contribution.contribution,
selection_proof: contribution.selection_proof_sig,
};
let signing_root = message.signing_root(domain_hash);
self.collect_signature(cluster, signing_root)
self.collect_signature(cluster, signing_root, slot)
.await
.map(|signature| SignedContributionAndProof { message, signature })
}
Expand Down Expand Up @@ -500,8 +508,12 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
) -> Result<Signature, Error> {
let domain_hash = self.get_domain(signing_epoch, Domain::Randao);
let signing_root = signing_epoch.signing_root(domain_hash);
self.collect_signature(self.cluster(validator_pubkey)?, signing_root)
.await
self.collect_signature(
self.cluster(validator_pubkey)?,
signing_root,
signing_epoch.end_slot(E::slots_per_epoch()),
)
.await
}

fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) {
Expand Down Expand Up @@ -620,7 +632,9 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
)?;

let signing_root = attestation.data().signing_root(domain_hash);
let signature = self.collect_signature(cluster, signing_root).await?;
let signature = self
.collect_signature(cluster, signing_root, attestation.data().slot)
.await?;
attestation
.add_signature(&signature, validator_committee_position)
.map_err(Error::UnableToSignAttestation)?;
Expand All @@ -639,15 +653,28 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {

async fn sign_validator_registration_data(
&self,
validator_registration_data: ValidatorRegistrationData,
mut validator_registration_data: ValidatorRegistrationData,
) -> Result<SignedValidatorRegistrationData, Error> {
let domain_hash = self.spec.get_builder_domain();
let signing_root = validator_registration_data.signing_root(domain_hash);

// SSV always uses the start of the current epoch, so we need to convert to that
let epoch = self
.slot_clock
.slot_of(Duration::from_secs(validator_registration_data.timestamp))
.unwrap_or(self.spec.genesis_slot)
.epoch(E::slots_per_epoch());
let sign_slot = epoch.start_slot(E::slots_per_epoch());
let validity_slot = epoch.end_slot(E::slots_per_epoch());
if let Some(duration) = self.slot_clock.start_of(sign_slot) {
validator_registration_data.timestamp = duration.as_secs();
}

let signature = self
.collect_signature(
self.cluster(validator_registration_data.pubkey)?,
signing_root,
validity_slot,
)
.await?;

Expand Down Expand Up @@ -710,7 +737,9 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {

let domain_hash = self.get_domain(signing_epoch, Domain::AggregateAndProof);
let signing_root = message.signing_root(domain_hash);
let signature = self.collect_signature(cluster, signing_root).await?;
let signature = self
.collect_signature(cluster, signing_root, message.aggregate().get_slot())
.await?;

Ok(SignedAggregateAndProof::from_aggregate_and_proof(
message, signature,
Expand All @@ -726,7 +755,7 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
let domain_hash = self.get_domain(epoch, Domain::SelectionProof);
let signing_root = slot.signing_root(domain_hash);

self.collect_signature(self.cluster(validator_pubkey)?, signing_root)
self.collect_signature(self.cluster(validator_pubkey)?, signing_root, slot)
.await
.map(SelectionProof::from)
}
Expand All @@ -745,7 +774,7 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
}
.signing_root(domain_hash);

self.collect_signature(self.cluster(*validator_pubkey)?, signing_root)
self.collect_signature(self.cluster(*validator_pubkey)?, signing_root, slot)
.await
.map(SyncSelectionProof::from)
}
Expand Down

0 comments on commit 49f69c0

Please sign in to comment.