Skip to content

Commit

Permalink
[refactor] small improvements and fixes to the proposing logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Dec 15, 2023
1 parent e8b8c07 commit 396bab3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 20 deletions.
65 changes: 57 additions & 8 deletions mysticeti-core/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::committee::Committee;
use crate::committee::{Authority, Committee};
use crate::crypto::Signer;
use crate::data::Data;
use crate::epoch_close::EpochManager;
Expand All @@ -26,11 +26,13 @@ use crate::{
consensus::universal_committer::UniversalCommitter,
};
use crate::{config::Parameters, consensus::linearizer::CommittedSubDag};
use itertools::Itertools;
use minibytes::Bytes;
use std::collections::{HashSet, VecDeque};
use std::mem;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;

pub struct Core<H: BlockHandler> {
block_manager: BlockManager,
Expand Down Expand Up @@ -83,7 +85,7 @@ impl<H: BlockHandler> Core<H> {
unprocessed_blocks,
last_committed_leader,
} = recovered;
let mut threshold_clock = ThresholdClockAggregator::new(0);
let mut threshold_clock = ThresholdClockAggregator::new(0, metrics.clone());
let last_own_block = if let Some(own_block) = last_own_block {
for (_, pending_block) in pending.iter() {
if let MetaStatement::Include(include) = pending_block {
Expand Down Expand Up @@ -174,7 +176,10 @@ impl<H: BlockHandler> Core<H> {
.block_manager
.add_blocks(blocks, &mut (&mut self.wal_writer, &self.block_store));
let mut result = Vec::with_capacity(processed.len());
for (position, processed) in processed.into_iter() {
for (position, processed) in processed
.into_iter()
.sorted_by(|b1, b2| b1.1.round().cmp(&b2.1.round()))
{
// report latency
let hostname = self.committee.authority_safe(processed.author()).hostname();
self.metrics
Expand All @@ -185,7 +190,6 @@ impl<H: BlockHandler> Core<H> {
.unwrap_or_default()
.as_secs_f64(),
);

self.threshold_clock
.add_block(*processed.reference(), &self.committee);
self.pending
Expand Down Expand Up @@ -321,6 +325,15 @@ impl<H: BlockHandler> Core<H> {
Some(block)
}

/// Resolves the leaders of `leader_round` to their committee entries.
///
/// The committer supplies the leader ids for the round; each id is looked up
/// in the committee and the results are returned in the committer's order.
pub fn leaders(&self, leader_round: RoundNumber) -> Vec<&Authority> {
    self.committer
        .get_leaders(leader_round)
        .into_iter()
        .map(|authority_index| self.committee.authority_safe(authority_index))
        .collect()
}

pub fn wal_syncer(&self) -> WalSyncer {
self.wal_writer
.syncer()
Expand Down Expand Up @@ -385,10 +398,46 @@ impl<H: BlockHandler> Core<H> {
// Leader round we check if we have a leader block
if quorum_round > self.last_decided_leader.round().max(period - 1) {
let leader_round = quorum_round - 1;
let mut leaders = self.committer.get_leaders(leader_round);
leaders.retain(|leader| connected_authorities.contains(*leader));
self.block_store
.all_blocks_exists_at_authority_round(&leaders, leader_round)
let leaders = self.committer.get_leaders(leader_round);
if leaders.is_empty() {
self.metrics
.ready_new_block
.with_label_values(&["non_leader_round"])
.inc();
return true;
}

let connected_leaders: Vec<_> = leaders
.iter()
.filter(|leader| connected_authorities.contains(**leader))
.cloned()
.collect();

if connected_leaders.is_empty() {
self.metrics
.ready_new_block
.with_label_values(&["leader_not_connected"])
.inc();
return true;
}

if self
.block_store
.all_blocks_exists_at_authority_round(&connected_leaders, leader_round)
{
// time to receive leader
let now = Instant::now();
tracing::debug!(
"Leader receive time for round {}: {:?} , {:?}",
leader_round,
now.duration_since(self.threshold_clock.last_quorum_ts()),
leaders
);

return true;
}

false
} else {
false
}
Expand Down
22 changes: 19 additions & 3 deletions mysticeti-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use crate::stat::{histogram, DivUsize, HistogramSender, PreciseHistogram};
use crate::types::{format_authority_index, AuthorityIndex};
use prometheus::{
register_counter_vec_with_registry, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, CounterVec,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_counter_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, CounterVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use std::net::SocketAddr;
use std::ops::AddAssign;
Expand Down Expand Up @@ -81,6 +82,8 @@ pub struct Metrics {
pub block_commit_latency: HistogramSender<Duration>,
pub block_receive_latency: HistogramVec,
pub add_block_latency: HistogramVec,
pub quorum_receive_latency: Histogram,
pub ready_new_block: IntCounterVec,
}

pub struct MetricReporter {
Expand Down Expand Up @@ -390,6 +393,19 @@ impl Metrics {
registry
).unwrap(),

quorum_receive_latency: register_histogram_with_registry!(
"quorum_receive_latency",
"The time it took to reach a round quorum",
registry
).unwrap(),

ready_new_block: register_int_counter_vec_with_registry!(
"ready_new_block",
"Report when it's ready to propose a new block",
&["type"],
registry,
).unwrap(),

transaction_certified_latency,
certificate_committed_latency,
transaction_committed_latency,
Expand Down
26 changes: 24 additions & 2 deletions mysticeti-core/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::metrics::UtilizationTimerVecExt;
use crate::runtime::timestamp_utc;
use crate::types::{AuthoritySet, RoundNumber, StatementBlock};
use crate::{block_handler::BlockHandler, metrics::Metrics};
use itertools::Itertools;
use std::sync::Arc;

pub struct Syncer<H: BlockHandler, S: SyncerSignals, C: CommitObserver> {
Expand Down Expand Up @@ -50,8 +51,13 @@ impl<H: BlockHandler, S: SyncerSignals, C: CommitObserver> Syncer<H, S, C> {
.metrics
.utilization_timer
.utilization_timer("Syncer::add_blocks");
self.core.add_blocks(blocks);
self.try_new_block(connected_authorities);
for block in blocks
.into_iter()
.sorted_by(|b1, b2| b1.round().cmp(&b2.round()))
{
self.core.add_blocks(vec![block]);
self.try_new_block(connected_authorities.clone());
}
}

pub fn force_new_block(
Expand Down Expand Up @@ -82,6 +88,22 @@ impl<H: BlockHandler, S: SyncerSignals, C: CommitObserver> Syncer<H, S, C> {
if self.core.try_new_block().is_none() {
return;
}

if self.force_new_block {
let leaders = self
.core
.leaders(self.core.last_proposed().saturating_sub(1));
tracing::debug!(
"Proposed with timeout for round {} missing {:?} ",
self.core.last_proposed(),
leaders
);
self.metrics
.ready_new_block
.with_label_values(&["leader_timeout"])
.inc();
}

self.signals.new_block_ready();
self.force_new_block = false;

Expand Down
35 changes: 28 additions & 7 deletions mysticeti-core/src/threshold_clock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::cmp::Ordering;
use std::sync::Arc;
use std::time::Instant;

use crate::committee::{Committee, QuorumThreshold, StakeAggregator};
use crate::metrics::Metrics;
use crate::types::{BlockReference, RoundNumber, StatementBlock};

// A block is threshold clock valid if:
Expand Down Expand Up @@ -34,16 +37,24 @@ pub fn threshold_clock_valid_non_genesis(block: &StatementBlock, committee: &Com
/// Tracks the round currently being built and advances it once blocks from a
/// quorum (2f+1) have been seen for that round.
pub struct ThresholdClockAggregator {
/// Accumulates stake toward the quorum threshold; cleared every time the
/// round advances.
aggregator: StakeAggregator<QuorumThreshold>,
/// The round currently being built.
round: RoundNumber,
/// Set to `Instant::now()` at construction and refreshed each time a quorum
/// advances the round.
last_quorum_ts: Instant,
/// Metrics handle; `quorum_receive_latency` is observed with the time
/// elapsed since `last_quorum_ts` whenever the round advances.
metrics: Arc<Metrics>,
}

impl ThresholdClockAggregator {
pub fn new(round: RoundNumber) -> Self {
pub fn new(round: RoundNumber, metrics: Arc<Metrics>) -> Self {
Self {
aggregator: StakeAggregator::new(),
round,
last_quorum_ts: Instant::now(),
metrics,
}
}

/// Returns the instant at which a quorum last advanced the round, or the
/// construction time of this aggregator if the round has not advanced yet.
pub fn last_quorum_ts(&self) -> Instant {
self.last_quorum_ts
}

pub fn add_block(&mut self, block: BlockReference, committee: &Committee) {
match block.round.cmp(&self.round) {
// Blocks with round less than what we currently build are irrelevant here
Expand All @@ -59,13 +70,22 @@ impl ThresholdClockAggregator {
self.aggregator.clear();
// We have seen 2f+1 blocks for current round, advance
self.round = block.round + 1;

// now record the time of receipt from last quorum
let now = Instant::now();
self.metrics
.quorum_receive_latency
.observe(now.duration_since(self.last_quorum_ts).as_secs_f64());
tracing::debug!(
"Time since last quorum for round {}: {}",
self.round,
self.last_quorum_ts.elapsed().as_secs_f64()
);

self.last_quorum_ts = now;
}
}
}
if block.round > self.round {
// If we processed block for round r, we also have stored 2f+1 blocks from r-1
self.round = block.round;
}
}

pub fn get_round(&self) -> RoundNumber {
Expand All @@ -75,7 +95,7 @@ impl ThresholdClockAggregator {

#[cfg(test)]
mod tests {

use crate::test_util::test_metrics;
use crate::types::Dag;

use super::*;
Expand Down Expand Up @@ -115,7 +135,8 @@ mod tests {
#[test]
fn test_threshold_clock_aggregator() {
let committee = Committee::new_test(vec![1, 1, 1, 1]);
let mut aggregator = ThresholdClockAggregator::new(0);
let metrics = test_metrics();
let mut aggregator = ThresholdClockAggregator::new(0, metrics);

aggregator.add_block(BlockReference::new_test(0, 0), &committee);
assert_eq!(aggregator.get_round(), 0);
Expand Down

0 comments on commit 396bab3

Please sign in to comment.