Skip to content

Commit

Permalink
Merge pull request #2303 from input-output-hk/ensemble/2222/aggregato…
Browse files Browse the repository at this point in the history
…r/unaivalable_stake_distribution_in_epoch_service

Fix mithril-aggregator genesis bootstrap flakiness in e2e tests
  • Loading branch information
Alenar authored Feb 13, 2025
2 parents 31caf35 + 98fcb5b commit 562c26b
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 70 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.7.2"
version = "0.7.3"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ impl DependenciesBuilder {
let epoch_settings_storer = self.get_epoch_settings_store().await?;
let chain_observer = self.get_chain_observer().await?;
let era_checker = self.get_era_checker().await?;
let stake_distribution_service = self.get_stake_distribution_service().await?;
let stake_store = self.get_stake_store().await?;
let epoch_settings = self.configuration.get_epoch_settings_configuration();
let allowed_discriminants = self
.configuration
Expand All @@ -23,7 +23,7 @@ impl DependenciesBuilder {
verification_key_store,
chain_observer,
era_checker,
stake_distribution_service,
stake_store,
),
allowed_discriminants,
self.root_logger(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ impl DependenciesBuilder {
message: "cannot build aggregator runner: no epoch returned.".to_string(),
error: None,
})?;
let retrieval_epoch = current_epoch
.offset_to_signer_retrieval_epoch()
.map_err(|e| DependenciesBuilderError::Initialization {
message: format!("cannot create aggregator runner: failed to offset current epoch '{current_epoch}' to signer retrieval epoch."),
error: Some(e.into()),
})?;

{
// Temporary fix, should be removed
Expand All @@ -136,11 +142,11 @@ impl DependenciesBuilder {
let epoch_settings_configuration = self.configuration.get_epoch_settings_configuration();
debug!(
logger,
"Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {current_epoch}";
"Handle discrepancies at startup of epoch settings store, will record epoch settings from the configuration for epoch {retrieval_epoch}";
"epoch_settings_configuration" => ?epoch_settings_configuration,
);
epoch_settings_store
.handle_discrepancies_at_startup(current_epoch, &epoch_settings_configuration)
.handle_discrepancies_at_startup(retrieval_epoch, &epoch_settings_configuration)
.await
.map_err(|e| DependenciesBuilderError::Initialization {
message: "can not create aggregator runner".to_string(),
Expand Down
22 changes: 20 additions & 2 deletions mithril-aggregator/src/http_server/routes/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ async fn get_aggregator_status_message(
let total_next_signers = epoch_service.next_signers()?.len();
let total_stakes_signers = epoch_service.total_stakes_signers()?;
let total_next_stakes_signers = epoch_service.total_next_stakes_signers()?;
let total_cardano_spo = epoch_service.total_spo()?;
let total_cardano_stake = epoch_service.total_stake()?;
let total_cardano_spo = epoch_service.total_spo()?.unwrap_or_default();
let total_cardano_stake = epoch_service.total_stake()?.unwrap_or_default();

let message = AggregatorStatusMessage {
epoch,
Expand Down Expand Up @@ -232,6 +232,24 @@ mod tests {
);
}

#[tokio::test]
async fn total_cardano_stakes_and_spo_are_0_if_none_in_epoch_service() {
    // Epoch service that holds no Cardano totals.
    let builder = FakeEpochServiceBuilder {
        total_spo: None,
        total_stake: None,
        ..FakeEpochServiceBuilder::dummy(Epoch(3))
    };
    let shared_epoch_service = Arc::new(RwLock::new(builder.build()));

    let status_message =
        get_aggregator_status_message(shared_epoch_service, String::new(), String::new())
            .await
            .unwrap();

    // Missing totals must be reported as zero in the status message.
    assert_eq!(status_message.total_cardano_spo, 0);
    assert_eq!(status_message.total_cardano_stake, 0);
}

#[tokio::test]
async fn retrieves_correct_total_signers_from_epoch_service() {
let total_signers = 5;
Expand Down
102 changes: 52 additions & 50 deletions mithril-aggregator/src/services/epoch_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ use mithril_common::entities::{
use mithril_common::logging::LoggerExtensions;
use mithril_common::protocol::{MultiSigner as ProtocolMultiSigner, SignerBuilder};
use mithril_common::StdResult;
use mithril_persistence::store::StakeStorer;

use crate::{
entities::AggregatorEpochSettings, services::StakeDistributionService, EpochSettingsStorer,
VerificationKeyStorer,
};
use crate::{entities::AggregatorEpochSettings, EpochSettingsStorer, VerificationKeyStorer};

/// Errors dedicated to the CertifierService.
#[derive(Debug, Error)]
Expand Down Expand Up @@ -118,10 +116,14 @@ pub trait EpochService: Sync + Send {
fn signed_entity_config(&self) -> StdResult<&SignedEntityConfig>;

/// Get the total number of SPOs for the current epoch in the Cardano stake distribution.
fn total_spo(&self) -> StdResult<TotalSPOs>;
///
/// Optional as the stake distribution is not available at Aggregator first startup.
fn total_spo(&self) -> StdResult<Option<TotalSPOs>>;

/// Get the total stake for the current epoch in the Cardano stake distribution.
fn total_stake(&self) -> StdResult<Stake>;
///
/// Optional as the stake distribution is not available at Aggregator first startup.
fn total_stake(&self) -> StdResult<Option<Stake>>;
}

struct EpochData {
Expand All @@ -138,8 +140,8 @@ struct EpochData {
total_stakes_signers: Stake,
total_next_stakes_signers: Stake,
signed_entity_config: SignedEntityConfig,
total_spo: TotalSPOs,
total_stake: Stake,
total_spo: Option<TotalSPOs>,
total_stake: Option<Stake>,
}

struct ComputedEpochData {
Expand All @@ -155,7 +157,7 @@ pub struct EpochServiceDependencies {
verification_key_store: Arc<dyn VerificationKeyStorer>,
chain_observer: Arc<dyn ChainObserver>,
era_checker: Arc<EraChecker>,
stake_distribution_service: Arc<dyn StakeDistributionService>,
stake_store: Arc<dyn StakeStorer>,
}

impl EpochServiceDependencies {
Expand All @@ -165,14 +167,14 @@ impl EpochServiceDependencies {
verification_key_store: Arc<dyn VerificationKeyStorer>,
chain_observer: Arc<dyn ChainObserver>,
era_checker: Arc<EraChecker>,
stake_distribution_service: Arc<dyn StakeDistributionService>,
stake_store: Arc<dyn StakeStorer>,
) -> Self {
Self {
epoch_settings_storer,
verification_key_store,
chain_observer,
era_checker,
stake_distribution_service,
stake_store,
}
}
}
Expand All @@ -187,7 +189,7 @@ pub struct MithrilEpochService {
verification_key_store: Arc<dyn VerificationKeyStorer>,
chain_observer: Arc<dyn ChainObserver>,
era_checker: Arc<EraChecker>,
stake_distribution_service: Arc<dyn StakeDistributionService>,
stake_store: Arc<dyn StakeStorer>,
allowed_signed_entity_discriminants: BTreeSet<SignedEntityTypeDiscriminants>,
logger: Logger,
}
Expand All @@ -208,7 +210,7 @@ impl MithrilEpochService {
verification_key_store: dependencies.verification_key_store,
chain_observer: dependencies.chain_observer,
era_checker: dependencies.era_checker,
stake_distribution_service: dependencies.stake_distribution_service,
stake_store: dependencies.stake_store,
allowed_signed_entity_discriminants: allowed_discriminants,
logger: logger.new_with_component_name::<Self>(),
}
Expand All @@ -224,19 +226,16 @@ impl MithrilEpochService {
Ok(cardano_era)
}

/// Fetch the stake distribution for `epoch` from the stake distribution service and return
/// its number of entries (as `TotalSPOs`) together with the sum of its values.
///
/// An error returned by the service is propagated with the epoch added as context.
async fn get_total_spo_and_total_stake(&self, epoch: Epoch) -> StdResult<(TotalSPOs, Stake)> {
let stake_distribution = self
.stake_distribution_service
.get_stake_distribution(epoch)
.await
.with_context(|| {
format!("Epoch service failed to obtain the stake distribution for epoch: {epoch}")
})?;

Ok((
stake_distribution.len() as TotalSPOs,
stake_distribution.values().sum(),
))
/// Compute the number of SPOs and the total stake from the stakes stored for `epoch`.
///
/// Returns `(None, None)` when the stake store holds no stakes for that epoch. A failure of
/// the store lookup is propagated with the epoch added as context.
async fn get_total_spo_and_total_stake(
    &self,
    epoch: Epoch,
) -> StdResult<(Option<TotalSPOs>, Option<Stake>)> {
    let stored_stakes = self.stake_store.get_stakes(epoch).await.with_context(|| {
        format!("Epoch service failed to obtain the stake distribution for epoch: {epoch}")
    })?;

    // Nothing stored for this epoch: report both totals as absent.
    let Some(stake_distribution) = stored_stakes else {
        return Ok((None, None));
    };

    let total_spo = stake_distribution.len() as TotalSPOs;
    let total_stake: Stake = stake_distribution.values().sum();

    Ok((Some(total_spo), Some(total_stake)))
}

async fn get_signers_with_stake_at_epoch(
Expand Down Expand Up @@ -348,7 +347,9 @@ impl EpochService for MithrilEpochService {
.clone(),
};

let (total_spo, total_stake) = self.get_total_spo_and_total_stake(epoch).await?;
let (total_spo, total_stake) = self
.get_total_spo_and_total_stake(signer_retrieval_epoch)
.await?;

self.epoch_data = Some(EpochData {
cardano_era,
Expand Down Expand Up @@ -506,11 +507,11 @@ impl EpochService for MithrilEpochService {
Ok(&self.unwrap_data()?.signed_entity_config)
}

fn total_spo(&self) -> StdResult<TotalSPOs> {
fn total_spo(&self) -> StdResult<Option<TotalSPOs>> {
Ok(self.unwrap_data()?.total_spo)
}

fn total_stake(&self) -> StdResult<Stake> {
fn total_stake(&self) -> StdResult<Option<Stake>> {
Ok(self.unwrap_data()?.total_stake)
}
}
Expand All @@ -535,8 +536,8 @@ pub(crate) struct FakeEpochServiceBuilder {
pub current_signers_with_stake: Vec<SignerWithStake>,
pub next_signers_with_stake: Vec<SignerWithStake>,
pub signed_entity_config: SignedEntityConfig,
pub total_spo: TotalSPOs,
pub total_stake: Stake,
pub total_spo: Option<TotalSPOs>,
pub total_stake: Option<Stake>,
}

#[cfg(test)]
Expand All @@ -555,8 +556,8 @@ impl FakeEpochServiceBuilder {
current_signers_with_stake: signers.clone(),
next_signers_with_stake: signers,
signed_entity_config: SignedEntityConfig::dummy(),
total_spo: 0,
total_stake: 0,
total_spo: None,
total_stake: None,
}
}

Expand Down Expand Up @@ -805,11 +806,11 @@ impl EpochService for FakeEpochService {
Ok(&self.unwrap_data()?.signed_entity_config)
}

fn total_spo(&self) -> StdResult<u32> {
fn total_spo(&self) -> StdResult<Option<u32>> {
Ok(self.unwrap_data()?.total_spo)
}

fn total_stake(&self) -> StdResult<u64> {
fn total_stake(&self) -> StdResult<Option<u64>> {
Ok(self.unwrap_data()?.total_stake)
}
}
Expand All @@ -826,9 +827,9 @@ mod tests {
};
use mockall::predicate::eq;

use crate::services::MockStakeDistributionService;
use crate::store::{FakeEpochSettingsStorer, MockVerificationKeyStorer};
use crate::test_tools::TestLogger;
use crate::tools::mocks::MockStakeStore;

use super::*;

Expand Down Expand Up @@ -859,8 +860,8 @@ mod tests {
current_signers: BTreeSet<Signer>,
next_signers: BTreeSet<Signer>,
signed_entity_config: SignedEntityConfig,
total_spo: TotalSPOs,
total_stake: Stake,
total_spo: Option<TotalSPOs>,
total_stake: Option<Stake>,
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -998,22 +999,23 @@ mod tests {
chain_observer.set_current_era(self.cardano_era).await;
let era_checker = EraChecker::new(self.mithril_era, Epoch::default());

let stake_distribution_service = {
let stake_store = {
assert!(
self.total_stake % self.total_spo as u64 == 0,
"'total_stake' must be a multiple of 'total_spo' to create a uniform stake distribution"
);
self.total_stake % self.total_spo as u64 == 0,
"'total_stake' must be a multiple of 'total_spo' to create a uniform stake distribution"
);
let stake_per_spo = self.total_stake / self.total_spo as u64;

let stake_distribution =
build_uniform_stake_distribution(self.total_spo, stake_per_spo);

let mut stake_distribution_service = MockStakeDistributionService::new();
stake_distribution_service
.expect_get_stake_distribution()
.returning(move |_| Ok(stake_distribution.clone()));
let mut stake_store = MockStakeStore::new();
stake_store
.expect_get_stakes()
.with(eq(signer_retrieval_epoch))
.returning(move |_| Ok(Some(stake_distribution.clone())));

stake_distribution_service
stake_store
};

MithrilEpochService::new(
Expand All @@ -1026,7 +1028,7 @@ mod tests {
Arc::new(verification_key_store),
Arc::new(chain_observer),
Arc::new(era_checker),
Arc::new(stake_distribution_service),
Arc::new(stake_store),
),
self.allowed_discriminants,
TestLogger::stdout(),
Expand Down Expand Up @@ -1095,8 +1097,8 @@ mod tests {
current_signers: current_epoch_fixture.signers().into_iter().collect(),
next_signers: next_epoch_fixture.signers().into_iter().collect(),
signed_entity_config: SignedEntityConfig::dummy(),
total_spo: 10,
total_stake: 20_000_000,
total_spo: Some(10),
total_stake: Some(20_000_000),
}
);
}
Expand Down
14 changes: 11 additions & 3 deletions mithril-aggregator/src/store/epoch_settings_storer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ pub trait EpochSettingsStorer: EpochPruningTask + Sync + Send {
async fn get_epoch_settings(&self, epoch: Epoch) -> StdResult<Option<AggregatorEpochSettings>>;

/// Handle discrepancies at startup in the epoch settings store.
///
/// In case an aggregator has been launched after some epochs of not being up or at initial startup,
/// the discrepancies in the epoch settings store need to be fixed.
/// The epoch settings needs to be recorded for the working epoch and the next 2 epochs.
///
/// The epoch settings need to be recorded for the working epoch and the next 3 epochs.
/// We need data over four epochs because the epoch service uses epoch settings over a window of
/// three epochs, and there may be an epoch change between this `handle_discrepancies_at_startup`
/// call and the epoch service call.
async fn handle_discrepancies_at_startup(
&self,
current_epoch: Epoch,
epoch_settings_configuration: &AggregatorEpochSettings,
) -> StdResult<()> {
for epoch_offset in 0..=2 {
for epoch_offset in 0..=3 {
let epoch = current_epoch + epoch_offset;
if self.get_epoch_settings(epoch).await?.is_none() {
self.save_epoch_settings(epoch, epoch_settings_configuration.clone())
Expand Down Expand Up @@ -158,7 +163,7 @@ mod tests {
}

#[tokio::test]
async fn test_handle_discrepancies_at_startup_should_complete_at_least_two_epochs() {
async fn test_handle_discrepancies_at_startup_should_complete_at_least_four_epochs() {
let epoch_settings = AggregatorEpochSettings::dummy();
let epoch_settings_new = AggregatorEpochSettings {
protocol_parameters: ProtocolParameters {
Expand Down Expand Up @@ -191,6 +196,9 @@ mod tests {
assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);

let epoch_settings_stored = store.get_epoch_settings(epoch + 3).await.unwrap();
assert_eq!(Some(epoch_settings_new.clone()), epoch_settings_stored);

let epoch_settings_stored = store.get_epoch_settings(epoch + 4).await.unwrap();
assert!(epoch_settings_stored.is_none());
}
}
Loading

0 comments on commit 562c26b

Please sign in to comment.