Skip to content

Commit

Permalink
[feat] request missing blocks directly from the proposer of a block.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Dec 20, 2023
1 parent cf3ea90 commit f26852c
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 40 deletions.
72 changes: 68 additions & 4 deletions mysticeti-core/src/block_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,32 @@ impl BlockManager {
}
}

/// Attempts to process (accept) the provided blocks and stores them only when the causal history
/// is already present. If a block can't be processed, then it is parked in the `blocks_pending` map
/// and any missing references are recorded in the `block_references_waiting` and in the `missing` vector.
/// The method returns a tuple where the newly accepted/processed blocks are returned and the missing references
/// of the provided blocks. Keep in mind that the missing references are returned only the first one for a specific
/// provided block. If we attempt to add the same block again, then its missing references won't be returned again.
pub fn add_blocks(
&mut self,
blocks: Vec<Data<StatementBlock>>,
mut blocks: Vec<Data<StatementBlock>>,
block_writer: &mut impl BlockWriter,
) -> Vec<(WalPosition, Data<StatementBlock>)> {
) -> (
Vec<(WalPosition, Data<StatementBlock>)>,
HashSet<BlockReference>,
) {
// process the blocks in round order ascending to ensure that we do not create unnecessary missing references
blocks.sort_by_key(|b1| b1.round());
let mut blocks: VecDeque<Data<StatementBlock>> = blocks.into();
let mut newly_blocks_processed: Vec<(WalPosition, Data<StatementBlock>)> = vec![];
// missing references that we see them for first time
let mut missing_references = HashSet::new();
while let Some(block) = blocks.pop_front() {
// Update the highest known round number.

// check whether we have already processed this block and skip it if so.
let block_reference = block.reference();

if self.block_store.block_exists(*block_reference)
|| self.blocks_pending.contains_key(block_reference)
{
Expand All @@ -62,6 +76,17 @@ impl BlockManager {
// If we are missing a reference then we insert into pending and update the waiting index
if !self.block_store.block_exists(*included_reference) {
processed = false;

// we inserted the missing reference for the first time and the block has not been
// fetched already.
if !self
.block_references_waiting
.contains_key(included_reference)
&& !self.blocks_pending.contains_key(included_reference)
{
missing_references.insert(*included_reference);
}

self.block_references_waiting
.entry(*included_reference)
.or_default()
Expand Down Expand Up @@ -107,7 +132,7 @@ impl BlockManager {
}
}

newly_blocks_processed
(newly_blocks_processed, missing_references)
}

pub fn missing_blocks(&self) -> &[HashSet<BlockReference>] {
Expand Down Expand Up @@ -145,7 +170,7 @@ mod tests {
);
let mut processed_blocks = HashSet::new();
for block in iter {
let processed = bm.add_blocks(vec![block.clone()], &mut block_writer);
let (processed, _missing) = bm.add_blocks(vec![block.clone()], &mut block_writer);
print!("Adding {:?}:", block.reference());
for (_, p) in processed {
print!("{:?},", p.reference());
Expand All @@ -163,6 +188,45 @@ mod tests {
}
}

#[test]
fn test_block_manager_add_block_missing_references() {
let (metrics, _reporter) = Metrics::new(&Registry::new(), None);
let dag =
Dag::draw("A1:[A0, B0]; B1:[A0, B0]; B2:[A0, B1]; A2:[A1, B1]").add_genesis_blocks();
assert_eq!(dag.len(), 6); // 4 blocks in dag + 2 genesis

let mut block_writer = TestBlockWriter::new(&dag.committee());
let mut iter = dag.iter_rev();
let mut bm = BlockManager::new(
block_writer.block_store(),
&dag.committee(),
metrics.clone(),
);

let a2 = iter.next().unwrap();

// WHEN processing block A2 for first time we should get back 2 missing references
let (_processed, missing) = bm.add_blocks(vec![a2.clone()], &mut block_writer);
assert_eq!(missing.len(), 2);

// WHEN processing again block A2, then now missing should be empty
let (_processed, missing) = bm.add_blocks(vec![a2.clone()], &mut block_writer);
assert!(missing.is_empty());

// WHEN processing block B2, should now yield one missing blocks
let b2 = iter.next().unwrap();
let (_processed, missing) = bm.add_blocks(vec![b2.clone()], &mut block_writer);
assert_eq!(missing.len(), 1);

// Now processing all the rest of the blocks should yield as missing zero
let (_processed, missing) = bm.add_blocks(iter.cloned().collect(), &mut block_writer);

assert!(missing.is_empty());
assert_eq!(bm.block_references_waiting.len(), 0);
assert_eq!(bm.blocks_pending.len(), 0);
assert_eq!(bm.block_store.len_expensive(), dag.len());
}

fn rng(s: u8) -> StdRng {
let mut seed = [0; 32];
seed[0] = s;
Expand Down
10 changes: 6 additions & 4 deletions mysticeti-core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,16 @@ impl<H: BlockHandler> Core<H> {
self
}

// Note that generally when you update this function you also want to change genesis initialization above
pub fn add_blocks(&mut self, blocks: Vec<Data<StatementBlock>>) -> Vec<Data<StatementBlock>> {
// Note that generally when you update this function you also want to change genesis initialization above.
// The method returns the missing references in order to successfully process the provided blocks. The missing
// references though will be returned only the first time that a block is provided for processing.
pub fn add_blocks(&mut self, blocks: Vec<Data<StatementBlock>>) -> Vec<BlockReference> {
let _timer = self
.metrics
.utilization_timer
.utilization_timer("Core::add_blocks");
let now = timestamp_utc();
let processed = self
let (processed, missing_references) = self
.block_manager
.add_blocks(blocks, &mut (&mut self.wal_writer, &self.block_store));
let mut result = Vec::with_capacity(processed.len());
Expand All @@ -197,7 +199,7 @@ impl<H: BlockHandler> Core<H> {
.threshold_clock_round
.set(self.threshold_clock.get_round() as i64);
self.run_block_handler(&result);
result
missing_references.into_iter().collect()
}

fn run_block_handler(&mut self, processed: &[Data<StatementBlock>]) {
Expand Down
4 changes: 2 additions & 2 deletions mysticeti-core/src/core_thread/simulated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl<
&self,
blocks: Vec<Data<StatementBlock>>,
connected_authorities: AuthoritySet,
) {
self.syncer.lock().add_blocks(blocks, connected_authorities);
) -> Vec<BlockReference> {
self.syncer.lock().add_blocks(blocks, connected_authorities)
}

pub async fn force_new_block(&self, round: RoundNumber, connected_authorities: AuthoritySet) {
Expand Down
14 changes: 9 additions & 5 deletions mysticeti-core/src/core_thread/spawned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ pub struct CoreThread<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal,
}

enum CoreThreadCommand {
AddBlocks(Vec<Data<StatementBlock>>, AuthoritySet, oneshot::Sender<()>),
AddBlocks(
Vec<Data<StatementBlock>>,
AuthoritySet,
oneshot::Sender<Vec<BlockReference>>,
),
ForceNewBlock(RoundNumber, AuthoritySet, oneshot::Sender<()>),
Cleanup(oneshot::Sender<()>),
/// Request missing blocks that need to be synched.
Expand Down Expand Up @@ -72,15 +76,15 @@ impl<
&self,
blocks: Vec<Data<StatementBlock>>,
connected_authorities: AuthoritySet,
) {
) -> Vec<BlockReference> {
let (sender, receiver) = oneshot::channel();
self.send(CoreThreadCommand::AddBlocks(
blocks,
connected_authorities,
sender,
))
.await;
receiver.await.expect("core thread is not expected to stop");
receiver.await.expect("core thread is not expected to stop")
}

pub async fn force_new_block(&self, round: RoundNumber, connected_authorities: AuthoritySet) {
Expand Down Expand Up @@ -130,8 +134,8 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve
metrics.core_lock_dequeued.inc();
match command {
CoreThreadCommand::AddBlocks(blocks, connected_authorities, sender) => {
self.syncer.add_blocks(blocks, connected_authorities);
sender.send(()).ok();
let missing_blocks = self.syncer.add_blocks(blocks, connected_authorities);
sender.send(missing_blocks).ok();
}
CoreThreadCommand::ForceNewBlock(round, connected_authorities, sender) => {
self.syncer.force_new_block(round, connected_authorities);
Expand Down
47 changes: 30 additions & 17 deletions mysticeti-core/src/net_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::runtime::Handle;
use crate::runtime::{self, timestamp_utc};
use crate::runtime::{JoinError, JoinHandle};
use crate::syncer::{Syncer, SyncerSignals};
use crate::types::{AuthorityIndex, StatementBlock};
use crate::types::{AuthorityIndex, BlockReference, StatementBlock};
use crate::types::{AuthoritySet, RoundNumber};
use crate::wal::WalSyncer;
use crate::{block_handler::BlockHandler, metrics::Metrics};
Expand Down Expand Up @@ -267,18 +267,14 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
disseminator.disseminate_own_blocks(round).await
}
NetworkMessage::Blocks(blocks) => {
if Self::process_blocks(&inner, &block_verifier, &metrics, blocks)
.await
.is_err()
{
break;
}
}
NetworkMessage::Block(block) => {
if Self::process_blocks(&inner, &block_verifier, &metrics, vec![block])
.await
.is_err()
if let Ok(missing_blocks) =
Self::process_blocks(&inner, &block_verifier, &metrics, blocks).await
{
// we only want to request missing blocks when a validator is sending us their block
// proposals, and not during a bulk catchup via our request (RequestBlocks) to avoid
// overwhelming the peer.
Self::request_missing_blocks(missing_blocks, &connection.sender);
} else {
break;
}
}
Expand All @@ -295,6 +291,14 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
break;
}
}
NetworkMessage::RequestBlocksResponse(blocks) => {
if Self::process_blocks(&inner, &block_verifier, &metrics, blocks)
.await
.is_err()
{
break;
}
}
NetworkMessage::BlockNotFound(_references) => {
// TODO: leverage this signal to request blocks from other peers
}
Expand All @@ -312,9 +316,9 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>
block_verifier: &Arc<impl BlockVerifier>,
metrics: &Arc<Metrics>,
blocks: Vec<Data<StatementBlock>>,
) -> Result<(), eyre::Report> {
) -> Result<Vec<BlockReference>, eyre::Report> {
if blocks.is_empty() {
return Ok(());
return Ok(vec![]);
}

let now = timestamp_utc();
Expand Down Expand Up @@ -372,13 +376,22 @@ impl<H: BlockHandler + 'static, C: CommitObserver + 'static> NetworkSyncer<H, C>

if !to_process.is_empty() {
let connected_authorities = inner.connected_authorities.lock().authorities.clone();
inner
return Ok(inner
.syncer
.add_blocks(to_process, connected_authorities)
.await;
.await);
}

Ok(())
Ok(vec![])
}

fn request_missing_blocks(
missing_blocks: Vec<BlockReference>,
sender: &mpsc::Sender<NetworkMessage>,
) {
if let Ok(permit) = sender.try_reserve() {
permit.send(NetworkMessage::RequestBlocks(missing_blocks))
}
}

async fn leader_timeout_task(
Expand Down
5 changes: 3 additions & 2 deletions mysticeti-core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ const PING_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Debug, Serialize, Deserialize)]
pub enum NetworkMessage {
SubscribeOwnFrom(RoundNumber), // subscribe from round number excluding
Block(Data<StatementBlock>),
// Sending multiple blocks at once
/// Sending multiple blocks at once
Blocks(Vec<Data<StatementBlock>>),
/// Request a few specific block references (this is not indented for large requests).
RequestBlocks(Vec<BlockReference>),
/// The response to the request blocks
RequestBlocksResponse(Vec<Data<StatementBlock>>),
/// Indicate that a requested block is not found.
BlockNotFound(Vec<BlockReference>),
}
Expand Down
8 changes: 5 additions & 3 deletions mysticeti-core/src/syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::core::Core;
use crate::data::Data;
use crate::metrics::UtilizationTimerVecExt;
use crate::runtime::timestamp_utc;
use crate::types::{AuthoritySet, RoundNumber, StatementBlock};
use crate::types::{AuthoritySet, BlockReference, RoundNumber, StatementBlock};
use crate::{block_handler::BlockHandler, metrics::Metrics};
use std::sync::Arc;
use tokio::sync::watch::Sender;
Expand Down Expand Up @@ -67,13 +67,13 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve
&mut self,
blocks: Vec<Data<StatementBlock>>,
connected_authorities: AuthoritySet,
) {
) -> Vec<BlockReference> {
let _timer = self
.metrics
.utilization_timer
.utilization_timer("Syncer::add_blocks");
let previous_round = self.core().current_round();
self.core.add_blocks(blocks);
let missing_references = self.core.add_blocks(blocks);
let new_round = self.core().current_round();

// we got a new quorum of blocks. Let leader timeout task know about it.
Expand All @@ -82,6 +82,8 @@ impl<H: BlockHandler, S: SyncerSignals, R: RoundAdvancedSignal, C: CommitObserve
}

self.try_new_block(connected_authorities);

missing_references
}

pub fn force_new_block(
Expand Down
6 changes: 3 additions & 3 deletions mysticeti-core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ where
let blocks = inner
.block_store
.get_others_blocks(round, author, batch_size);
for block in blocks {
round = block.round();
to.send(NetworkMessage::Block(block)).await.ok()?;
if !blocks.is_empty() {
round = blocks.last().unwrap().round();
to.send(NetworkMessage::Blocks(blocks)).await.ok()?;
}
sleep(stream_interval).await;
}
Expand Down
24 changes: 24 additions & 0 deletions mysticeti-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ mod test {
use super::*;
use rand::prelude::SliceRandom;
use rand::Rng;
use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -833,6 +834,18 @@ mod test {
RandomDagIter(self, v.into_iter())
}

pub fn iter_rev(&self) -> DagIter {
let mut v: Vec<_> = self.0.keys().cloned().collect();
v.sort_by(|b1, b2| {
let ordering = b2.round.cmp(&b1.round);
if ordering == Ordering::Equal {
return b1.authority.cmp(&b2.authority);
}
ordering
});
DagIter(self, v.into_iter())
}

pub fn len(&self) -> usize {
self.0.len()
}
Expand Down Expand Up @@ -864,6 +877,17 @@ mod test {
}
}

pub struct DagIter<'a>(&'a Dag, std::vec::IntoIter<BlockReference>);

impl<'a> Iterator for DagIter<'a> {
type Item = &'a Data<StatementBlock>;

fn next(&mut self) -> Option<Self::Item> {
let next = self.1.next()?;
Some(self.0 .0.get(&next).unwrap())
}
}

#[test]
fn test_draw_dag() {
let d = Dag::draw("A1:[A0, B1]; B2:[B1]").0;
Expand Down

0 comments on commit f26852c

Please sign in to comment.