From cf3ea901cf2212ab245a0c5305566d93d4526d31 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Wed, 20 Dec 2023 13:29:42 +0000 Subject: [PATCH] [refactor] modify timeout process so a block is proposed after a quorum received on max timeout. A new signal has been introduced for that. --- mysticeti-core/src/core.rs | 4 ++ mysticeti-core/src/core_thread/simulated.rs | 23 ++++++--- mysticeti-core/src/core_thread/spawned.rs | 34 ++++++++----- mysticeti-core/src/net_sync.rs | 40 +++++++++------ mysticeti-core/src/syncer.rs | 55 +++++++++++++++++---- mysticeti-core/src/test_util.rs | 21 +++++--- 6 files changed, 128 insertions(+), 49 deletions(-) diff --git a/mysticeti-core/src/core.rs b/mysticeti-core/src/core.rs index 0cf90a9d..95f8c356 100644 --- a/mysticeti-core/src/core.rs +++ b/mysticeti-core/src/core.rs @@ -327,6 +327,10 @@ impl Core { .expect("Failed to create wal syncer") } + pub fn current_round(&self) -> RoundNumber { + self.threshold_clock.get_round() + } + fn proposed_block_stats(&self, block: &Data) { self.metrics .proposed_block_size_bytes diff --git a/mysticeti-core/src/core_thread/simulated.rs b/mysticeti-core/src/core_thread/simulated.rs index 7577dff5..6e9750f2 100644 --- a/mysticeti-core/src/core_thread/simulated.rs +++ b/mysticeti-core/src/core_thread/simulated.rs @@ -6,25 +6,34 @@ use std::collections::HashSet; use crate::block_handler::BlockHandler; use crate::commit_observer::CommitObserver; use crate::data::Data; -use crate::syncer::{Syncer, SyncerSignals}; +use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals}; use crate::types::{AuthoritySet, BlockReference}; use crate::types::{RoundNumber, StatementBlock}; use parking_lot::Mutex; -pub struct CoreThreadDispatcher { - syncer: Mutex>, +pub struct CoreThreadDispatcher< + H: BlockHandler, + S: SyncerSignals, + R: RoundAdvancedSignal, + C: CommitObserver, +> { + syncer: Mutex>, } -impl - CoreThreadDispatcher +impl< + H: BlockHandler + 'static, + S: SyncerSignals + 'static, + R: RoundAdvancedSignal + 'static, + C: CommitObserver + 'static, + > CoreThreadDispatcher { - pub fn start(syncer: Syncer) -> Self { + pub fn start(syncer: Syncer) -> Self { Self { syncer: Mutex::new(syncer), } } - pub fn stop(self) -> Syncer { + pub fn stop(self) -> Syncer { self.syncer.into_inner() } diff --git a/mysticeti-core/src/core_thread/spawned.rs b/mysticeti-core/src/core_thread/spawned.rs index b2fa04a5..321aef19 100644 --- a/mysticeti-core/src/core_thread/spawned.rs +++ b/mysticeti-core/src/core_thread/spawned.rs @@ -4,7 +4,7 @@ use crate::block_handler::BlockHandler; use crate::commit_observer::CommitObserver; use crate::metrics::{Metrics, UtilizationTimerExt}; -use crate::syncer::{Syncer, SyncerSignals}; +use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals}; use crate::types::AuthoritySet; use crate::types::{RoundNumber, StatementBlock}; use crate::{data::Data, types::BlockReference}; @@ -12,14 +12,20 @@ use std::sync::Arc; use std::{collections::HashSet, thread}; use tokio::sync::{mpsc, oneshot}; -pub struct CoreThreadDispatcher { +pub struct CoreThreadDispatcher< + H: BlockHandler, + S: SyncerSignals, + R: RoundAdvancedSignal, + C: CommitObserver, +> { sender: mpsc::Sender, - join_handle: thread::JoinHandle>, + join_handle: thread::JoinHandle>, metrics: Arc, } -pub struct CoreThread { - syncer: Syncer, +pub struct CoreThread +{ + syncer: Syncer, receiver: mpsc::Receiver, } @@ -35,10 +41,14 @@ enum CoreThreadCommand { ), } -impl - CoreThreadDispatcher +impl< + H: BlockHandler + 'static, + S: SyncerSignals + 'static, + R: RoundAdvancedSignal + 'static, + C: CommitObserver + 'static, + > CoreThreadDispatcher { - pub fn start(syncer: Syncer) -> Self { + pub fn start(syncer: Syncer) -> Self { let (sender, receiver) = mpsc::channel(32); let metrics = syncer.core().metrics.clone(); let core_thread = CoreThread { syncer, receiver }; @@ -53,7 +63,7 @@ impl Syncer { + pub fn stop(self) -> Syncer { drop(self.sender); self.join_handle.join().unwrap() } @@ -109,8 +119,10 @@ impl CoreThread { - pub fn run(mut self) -> Syncer { +impl + CoreThread +{ + pub fn run(mut self) -> Syncer { tracing::info!("Started core thread with tid {}", gettid::gettid()); let metrics = self.syncer.core().metrics.clone(); while let Some(command) = self.receiver.blocking_recv() { diff --git a/mysticeti-core/src/net_sync.rs b/mysticeti-core/src/net_sync.rs index 219ce5c3..f6869667 100644 --- a/mysticeti-core/src/net_sync.rs +++ b/mysticeti-core/src/net_sync.rs @@ -9,8 +9,8 @@ use crate::runtime::Handle; use crate::runtime::{self, timestamp_utc}; use crate::runtime::{JoinError, JoinHandle}; use crate::syncer::{Syncer, SyncerSignals}; -use crate::types::AuthoritySet; use crate::types::{AuthorityIndex, StatementBlock}; +use crate::types::{AuthoritySet, RoundNumber}; use crate::wal::WalSyncer; use crate::{block_handler::BlockHandler, metrics::Metrics}; use crate::{block_store::BlockStore, synchronizer::BlockDisseminator}; @@ -23,6 +23,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::select; +use tokio::sync::watch::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot, Notify}; /// The maximum number of blocks that can be requested in a single message. @@ -66,7 +67,7 @@ pub struct NetworkSyncer { } pub struct NetworkSyncerInner { - pub syncer: CoreThreadDispatcher, C>, + pub syncer: CoreThreadDispatcher, Sender, C>, pub block_store: BlockStore, pub notify: Arc, committee: Arc, @@ -77,6 +78,7 @@ pub struct NetworkSyncerInner { } impl NetworkSyncer { + #[allow(clippy::too_many_arguments)] pub fn start( network: Network, core: Core, @@ -96,14 +98,17 @@ impl NetworkSyncer let wal_syncer = core.wal_syncer(); let block_store = core.block_store().clone(); let epoch_closing_time = core.epoch_closing_time(); + let (round_advanced_sender, round_advanced_receiver) = + tokio::sync::watch::channel(0 as RoundNumber); let mut syncer = Syncer::new( core, commit_period, notify.clone(), commit_observer, metrics.clone(), + round_advanced_sender, ); - syncer.force_new_block(0, Default::default()); + syncer.force_new_block(1, Default::default()); let syncer = CoreThreadDispatcher::start(syncer); let (stop_sender, stop_receiver) = mpsc::channel(1); stop_sender.try_send(()).unwrap(); // occupy the only available permit, so that all other calls to send() will block @@ -138,6 +143,7 @@ impl NetworkSyncer leader_timeout, parameters, cleanup_enabled, + round_advanced_receiver, )); let syncer_task = AsyncWalSyncer::start(wal_syncer, stop_sender, epoch_sender); Self { @@ -148,7 +154,7 @@ impl NetworkSyncer } } - pub async fn shutdown(self) -> Syncer, C> { + pub async fn shutdown(self) -> Syncer, Sender, C> { drop(self.stop); // todo - wait for network shutdown as well self.main_task.await.ok(); @@ -159,6 +165,7 @@ impl NetworkSyncer inner.syncer.stop() } + #[allow(clippy::too_many_arguments)] async fn run( mut network: Network, inner: Arc>, @@ -170,6 +177,7 @@ impl NetworkSyncer leader_timeout: Duration, parameters: SynchronizerParameters, cleanup_enabled: bool, + round_advanced_receiver: Receiver, ) { let mut connections: HashMap>> = HashMap::new(); let handle = Handle::current(); @@ -178,6 +186,7 @@ impl NetworkSyncer epoch_close_signal, shutdown_grace_period, leader_timeout, + round_advanced_receiver, )); let cleanup_task = if cleanup_enabled { handle.spawn(Self::cleanup_task(inner.clone())) @@ -377,14 +386,12 @@ impl NetworkSyncer mut epoch_close_signal: mpsc::Receiver<()>, shutdown_grace_period: Duration, leader_timeout: Duration, + mut round_advanced_receiver: Receiver, ) -> Option<()> { + let mut round = *round_advanced_receiver.borrow(); + let mut timed_out = false; + loop { - let notified = inner.notify.notified(); - let round = inner - .block_store - .last_own_block_ref() - .map(|b| b.round()) - .unwrap_or_default(); let closing_time = inner.epoch_closing_time.load(Ordering::Relaxed); let shutdown_duration = if closing_time != 0 { shutdown_grace_period.saturating_sub( @@ -397,14 +404,17 @@ impl NetworkSyncer return None; } select! { - _sleep = runtime::sleep(leader_timeout) => { + _sleep = runtime::sleep(leader_timeout), if !timed_out => { tracing::debug!("Timeout {round}"); // todo - more then one round timeout can happen, need to fix this let connected_authorities = inner.connected_authorities.lock().authorities.clone(); inner.syncer.force_new_block(round, connected_authorities).await; + timed_out = true; } - _notified = notified => { - // restart loop + _changed = round_advanced_receiver.changed() => { + // restart loop as we have received a new quorum + round = *round_advanced_receiver.borrow(); + timed_out = false; } _epoch_shutdown = runtime::sleep(shutdown_duration) => { tracing::info!("Shutting down sync after epoch close"); @@ -576,9 +586,11 @@ mod sim_tests { check_commits, print_stats, rng_at_seed, simulated_network_syncers, simulated_network_syncers_with_epoch_duration, }; + use crate::types::RoundNumber; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; + use tokio::sync::watch::Sender; use tokio::sync::Notify; #[test] @@ -604,7 +616,7 @@ mod sim_tests { async fn wait_for_epoch_to_close( network_syncers: Vec>, - ) -> Vec, TestCommitObserver>> { + ) -> Vec, Sender, TestCommitObserver>> { let mut any_closed = false; while !any_closed { for net_sync in network_syncers.iter() { diff --git a/mysticeti-core/src/syncer.rs b/mysticeti-core/src/syncer.rs index 2ec8a4c0..bc8d2301 100644 --- a/mysticeti-core/src/syncer.rs +++ b/mysticeti-core/src/syncer.rs @@ -9,27 +9,48 @@ use crate::runtime::timestamp_utc; use crate::types::{AuthoritySet, RoundNumber, StatementBlock}; use crate::{block_handler::BlockHandler, metrics::Metrics}; use std::sync::Arc; +use tokio::sync::watch::Sender; -pub struct Syncer { +pub struct Syncer { core: Core, force_new_block: bool, commit_period: u64, signals: S, commit_observer: C, metrics: Arc, + round_advanced_signal: R, } pub trait SyncerSignals: Send + Sync { fn new_block_ready(&mut self); } -impl Syncer { +pub trait RoundAdvancedSignal: Send + Sync { + fn new_round(&mut self, round_number: RoundNumber); +} + +impl RoundAdvancedSignal for Sender { + fn new_round(&mut self, round_number: RoundNumber) { + self.send(round_number).ok(); + } +} + +impl RoundAdvancedSignal for Option { + fn new_round(&mut self, round_number: RoundNumber) { + *self = Some(round_number) + } +} + +impl + Syncer +{ pub fn new( core: Core, commit_period: u64, signals: S, commit_observer: C, metrics: Arc, + round_advanced_signal: R, ) -> Self { Self { core, @@ -38,6 +59,7 @@ impl Syncer { signals, commit_observer, metrics, + round_advanced_signal, } } @@ -50,7 +72,15 @@ impl Syncer { .metrics .utilization_timer .utilization_timer("Syncer::add_blocks"); + let previous_round = self.core().current_round(); self.core.add_blocks(blocks); + let new_round = self.core().current_round(); + + // we got a new quorum of blocks. Let leader timeout task know about it. + if new_round > previous_round { + self.round_advanced_signal.new_round(new_round); + } + self.try_new_block(connected_authorities); } @@ -59,7 +89,7 @@ impl Syncer { round: RoundNumber, connected_authorities: AuthoritySet, ) -> bool { - if self.core.last_proposed() == round { + if self.core.last_proposed() < round { self.metrics.leader_timeout_total.inc(); self.force_new_block = true; self.try_new_block(connected_authorities); @@ -152,7 +182,7 @@ mod tests { DeliverBlock(Data), } - impl SimulatorState for Syncer { + impl SimulatorState for Syncer, TestCommitObserver> { type Event = SyncerEvent; fn handle_event(&mut self, event: Self::Event) { @@ -168,15 +198,20 @@ mod tests { } } - // New block was created - if self.signals { - self.signals = false; - let last_block = self.core.last_own_block().clone(); + // New quorum has been received and round has advanced + if let Some(new_round) = self.round_advanced_signal { + self.round_advanced_signal = None; Scheduler::schedule_event( ROUND_TIMEOUT, self.scheduler_state_id(), - SyncerEvent::ForceNewBlock(last_block.round()), + SyncerEvent::ForceNewBlock(new_round), ); + } + + // New block was created + if self.signals { + self.signals = false; + let last_block = self.core.last_own_block().clone(); for authority in self.core.committee().authorities() { if authority == self.core.authority() { continue; @@ -211,7 +246,7 @@ mod tests { simulator.schedule_event( Duration::ZERO, authority as usize, - SyncerEvent::ForceNewBlock(0), + SyncerEvent::ForceNewBlock(1), ); } // Simulation awaits for first num_txn transactions proposed by each authority to certify diff --git a/mysticeti-core/src/test_util.rs b/mysticeti-core/src/test_util.rs index e02774bb..12c2f6bb 100644 --- a/mysticeti-core/src/test_util.rs +++ b/mysticeti-core/src/test_util.rs @@ -18,7 +18,7 @@ use crate::net_sync::NetworkSyncer; use crate::network::Network; #[cfg(feature = "simulator")] use crate::simulated_network::SimulatedNetwork; -use crate::syncer::{Syncer, SyncerSignals}; +use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals}; use crate::types::{ format_authority_index, AuthorityIndex, BlockReference, RoundNumber, StatementBlock, }; @@ -150,7 +150,7 @@ pub fn committee_and_syncers( n: usize, ) -> ( Arc, - Vec>, + Vec, TestCommitObserver>>, ) { let (committee, cores, commit_observers, _) = committee_and_cores(n); ( @@ -159,7 +159,14 @@ pub fn committee_and_syncers( .into_iter() .zip(commit_observers) .map(|(core, commit_observer)| { - Syncer::new(core, 3, Default::default(), commit_observer, test_metrics()) + Syncer::new( + core, + 3, + Default::default(), + commit_observer, + test_metrics(), + None, + ) }) .collect(), ) @@ -267,8 +274,8 @@ pub fn rng_at_seed(seed: u64) -> StdRng { StdRng::from_seed(seed) } -pub fn check_commits( - syncers: &[Syncer], +pub fn check_commits( + syncers: &[Syncer], ) { let commits = syncers .iter() @@ -292,8 +299,8 @@ pub fn check_commits( } #[allow(dead_code)] -pub fn print_stats( - syncers: &[Syncer], +pub fn print_stats( + syncers: &[Syncer], reporters: &mut [MetricReporter], ) { assert_eq!(syncers.len(), reporters.len());