Skip to content

Commit

Permalink
[refactor] modify timeout process so a block is proposed after a quor…
Browse files Browse the repository at this point in the history
…um received on max timeout. A new signal has been introduced for that.
  • Loading branch information
akichidis committed Dec 20, 2023
1 parent f4bae7f commit cf3ea90
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 49 deletions.
4 changes: 4 additions & 0 deletions mysticeti-core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ impl<H: BlockHandler> Core<H> {
.expect("Failed to create wal syncer")
}

/// Returns the round currently reported by this core's threshold clock.
///
/// This is a thin read-only accessor: it forwards to
/// `threshold_clock.get_round()` and does not modify any state.
pub fn current_round(&self) -> RoundNumber {
self.threshold_clock.get_round()
}

fn proposed_block_stats(&self, block: &Data<StatementBlock>) {
self.metrics
.proposed_block_size_bytes
Expand Down
23 changes: 16 additions & 7 deletions mysticeti-core/src/core_thread/simulated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,34 @@ use std::collections::HashSet;
use crate::block_handler::BlockHandler;
use crate::commit_observer::CommitObserver;
use crate::data::Data;
use crate::syncer::{Syncer, SyncerSignals};
use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals};
use crate::types::{AuthoritySet, BlockReference};
use crate::types::{RoundNumber, StatementBlock};
use parking_lot::Mutex;

pub struct CoreThreadDispatcher<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
syncer: Mutex<Syncer<H, S, C>>,
pub struct CoreThreadDispatcher<
H: BlockHandler,
S: SyncerSignals,
R: RoundAdvancedSignal,
C: CommitObserver,
> {
syncer: Mutex<Syncer<H, S, R, C>>,
}

impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver + 'static>
CoreThreadDispatcher<H, S, C>
impl<
H: BlockHandler + 'static,
S: SyncerSignals + 'static,
R: RoundAdvancedSignal + 'static,
C: CommitObserver + 'static,
> CoreThreadDispatcher<H, S, R, C>
{
pub fn start(syncer: Syncer<H, S, C>) -> Self {
pub fn start(syncer: Syncer<H, S, R, C>) -> Self {
Self {
syncer: Mutex::new(syncer),
}
}

pub fn stop(self) -> Syncer<H, S, C> {
pub fn stop(self) -> Syncer<H, S, R, C> {
self.syncer.into_inner()
}

Expand Down
34 changes: 23 additions & 11 deletions mysticeti-core/src/core_thread/spawned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,28 @@
use crate::block_handler::BlockHandler;
use crate::commit_observer::CommitObserver;
use crate::metrics::{Metrics, UtilizationTimerExt};
use crate::syncer::{Syncer, SyncerSignals};
use crate::syncer::{RoundAdvancedSignal, Syncer, SyncerSignals};
use crate::types::AuthoritySet;
use crate::types::{RoundNumber, StatementBlock};
use crate::{data::Data, types::BlockReference};
use std::sync::Arc;
use std::{collections::HashSet, thread};
use tokio::sync::{mpsc, oneshot};

pub struct CoreThreadDispatcher<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
pub struct CoreThreadDispatcher<
H: BlockHandler,
S: SyncerSignals,
R: RoundAdvancedSignal,
C: CommitObserver,
> {
sender: mpsc::Sender<CoreThreadCommand>,
join_handle: thread::JoinHandle<Syncer<H, S, C>>,
join_handle: thread::JoinHandle<Syncer<H, S, R, C>>,
metrics: Arc<Metrics>,
}

pub struct CoreThread<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
syncer: Syncer<H, S, C>,
pub struct CoreThread<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver>
{
syncer: Syncer<H, S, R, C>,
receiver: mpsc::Receiver<CoreThreadCommand>,
}

Expand All @@ -35,10 +41,14 @@ enum CoreThreadCommand {
),
}

impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver + 'static>
CoreThreadDispatcher<H, S, C>
impl<
H: BlockHandler + 'static,
S: SyncerSignals + 'static,
R: RoundAdvancedSignal + 'static,
C: CommitObserver + 'static,
> CoreThreadDispatcher<H, S, R, C>
{
pub fn start(syncer: Syncer<H, S, C>) -> Self {
pub fn start(syncer: Syncer<H, S, R, C>) -> Self {
let (sender, receiver) = mpsc::channel(32);
let metrics = syncer.core().metrics.clone();
let core_thread = CoreThread { syncer, receiver };
Expand All @@ -53,7 +63,7 @@ impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver +
}
}

pub fn stop(self) -> Syncer<H, S, C> {
pub fn stop(self) -> Syncer<H, S, R, C> {
drop(self.sender);
self.join_handle.join().unwrap()
}
Expand Down Expand Up @@ -109,8 +119,10 @@ impl<H: BlockHandler + 'static, S: SyncerSignals + 'static, C: CommitObserver +
}
}

impl<H: BlockHandler, S: SyncerSignals, C: CommitObserver> CoreThread<H, S, C> {
pub fn run(mut self) -> Syncer<H, S, C> {
impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserver>
CoreThread<H, S, R, C>
{
pub fn run(mut self) -> Syncer<H, S, R, C> {
tracing::info!("Started core thread with tid {}", gettid::gettid());
let metrics = self.syncer.core().metrics.clone();
while let Some(command) = self.receiver.blocking_recv() {
Expand Down
40 changes: 26 additions & 14 deletions mysticeti-core/src/net_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::runtime::Handle;
use crate::runtime::{self, timestamp_utc};
use crate::runtime::{JoinError, JoinHandle};
use crate::syncer::{Syncer, SyncerSignals};
use crate::types::AuthoritySet;
use crate::types::{AuthorityIndex, StatementBlock};
use crate::types::{AuthoritySet, RoundNumber};
use crate::wal::WalSyncer;
use crate::{block_handler::BlockHandler, metrics::Metrics};
use crate::{block_store::BlockStore, synchronizer::BlockDisseminator};
Expand All @@ -23,6 +23,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::watch::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Notify};

/// The maximum number of blocks that can be requested in a single message.
Expand Down Expand Up @@ -66,7 +67,7 @@ pub struct NetworkSyncer<H: BlockHandler, C: CommitObserver> {
}

pub struct NetworkSyncerInner<H: BlockHandler, C: CommitObserver> {
pub syncer: CoreThreadDispatcher<H, Arc<Notify>, C>,
pub syncer: CoreThreadDispatcher<H, Arc<Notify>, Sender<RoundNumber>, C>,
pub block_store: BlockStore,
pub notify: Arc<Notify>,
committee: Arc<Committee>,
Expand All @@ -77,6 +78,7 @@ pub struct NetworkSyncerInner<H: BlockHandler, C: CommitObserver> {
}

impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C> {
#[allow(clippy::too_many_arguments)]
pub fn start(
network: Network,
core: Core<H>,
Expand All @@ -96,14 +98,17 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
let wal_syncer = core.wal_syncer();
let block_store = core.block_store().clone();
let epoch_closing_time = core.epoch_closing_time();
let (round_advanced_sender, round_advanced_receiver) =
tokio::sync::watch::channel(0 as RoundNumber);
let mut syncer = Syncer::new(
core,
commit_period,
notify.clone(),
commit_observer,
metrics.clone(),
round_advanced_sender,
);
syncer.force_new_block(0, Default::default());
syncer.force_new_block(1, Default::default());
let syncer = CoreThreadDispatcher::start(syncer);
let (stop_sender, stop_receiver) = mpsc::channel(1);
stop_sender.try_send(()).unwrap(); // occupy the only available permit, so that all other calls to send() will block
Expand Down Expand Up @@ -138,6 +143,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
leader_timeout,
parameters,
cleanup_enabled,
round_advanced_receiver,
));
let syncer_task = AsyncWalSyncer::start(wal_syncer, stop_sender, epoch_sender);
Self {
Expand All @@ -148,7 +154,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
}
}

pub async fn shutdown(self) -> Syncer<H, Arc<Notify>, C> {
pub async fn shutdown(self) -> Syncer<H, Arc<Notify>, Sender<RoundNumber>, C> {
drop(self.stop);
// todo - wait for network shutdown as well
self.main_task.await.ok();
Expand All @@ -159,6 +165,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
inner.syncer.stop()
}

#[allow(clippy::too_many_arguments)]
async fn run(
mut network: Network,
inner: Arc<NetworkSyncerInner<H, C>>,
Expand All @@ -170,6 +177,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
leader_timeout: Duration,
parameters: SynchronizerParameters,
cleanup_enabled: bool,
round_advanced_receiver: Receiver<RoundNumber>,
) {
let mut connections: HashMap<usize, JoinHandle<Option<()>>> = HashMap::new();
let handle = Handle::current();
Expand All @@ -178,6 +186,7 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
epoch_close_signal,
shutdown_grace_period,
leader_timeout,
round_advanced_receiver,
));
let cleanup_task = if cleanup_enabled {
handle.spawn(Self::cleanup_task(inner.clone()))
Expand Down Expand Up @@ -377,14 +386,12 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
mut epoch_close_signal: mpsc::Receiver<()>,
shutdown_grace_period: Duration,
leader_timeout: Duration,
mut round_advanced_receiver: Receiver<RoundNumber>,
) -> Option<()> {
let mut round = *round_advanced_receiver.borrow();
let mut timed_out = false;

loop {
let notified = inner.notify.notified();
let round = inner
.block_store
.last_own_block_ref()
.map(|b| b.round())
.unwrap_or_default();
let closing_time = inner.epoch_closing_time.load(Ordering::Relaxed);
let shutdown_duration = if closing_time != 0 {
shutdown_grace_period.saturating_sub(
Expand All @@ -397,14 +404,17 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
return None;
}
select! {
_sleep = runtime::sleep(leader_timeout) => {
_sleep = runtime::sleep(leader_timeout), if !timed_out => {
tracing::debug!("Timeout {round}");
// todo - more than one round timeout can happen, need to fix this
let connected_authorities = inner.connected_authorities.lock().authorities.clone();
inner.syncer.force_new_block(round, connected_authorities).await;
timed_out = true;
}
_notified = notified => {
// restart loop
_changed = round_advanced_receiver.changed() => {
// restart loop as we have received a new quorum
round = *round_advanced_receiver.borrow();
timed_out = false;
}
_epoch_shutdown = runtime::sleep(shutdown_duration) => {
tracing::info!("Shutting down sync after epoch close");
Expand Down Expand Up @@ -576,9 +586,11 @@ mod sim_tests {
check_commits, print_stats, rng_at_seed, simulated_network_syncers,
simulated_network_syncers_with_epoch_duration,
};
use crate::types::RoundNumber;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch::Sender;
use tokio::sync::Notify;

#[test]
Expand All @@ -604,7 +616,7 @@ mod sim_tests {

async fn wait_for_epoch_to_close(
network_syncers: Vec<NetworkSyncer<TestBlockHandler, TestCommitObserver>>,
) -> Vec<Syncer<TestBlockHandler, Arc<Notify>, TestCommitObserver>> {
) -> Vec<Syncer<TestBlockHandler, Arc<Notify>, Sender<RoundNumber>, TestCommitObserver>> {
let mut any_closed = false;
while !any_closed {
for net_sync in network_syncers.iter() {
Expand Down
Loading

0 comments on commit cf3ea90

Please sign in to comment.