From f26852c5a801ebc06c81758ffaee50ccd4458fbe Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Wed, 20 Dec 2023 15:11:06 +0000 Subject: [PATCH] [feat] request missing blocks directly from the proposer of a block. --- mysticeti-core/src/block_manager.rs | 72 +++++++++++++++++++-- mysticeti-core/src/core.rs | 10 +-- mysticeti-core/src/core_thread/simulated.rs | 4 +- mysticeti-core/src/core_thread/spawned.rs | 14 ++-- mysticeti-core/src/net_sync.rs | 47 +++++++++----- mysticeti-core/src/network.rs | 5 +- mysticeti-core/src/syncer.rs | 8 ++- mysticeti-core/src/synchronizer.rs | 6 +- mysticeti-core/src/types.rs | 24 +++++++ 9 files changed, 150 insertions(+), 40 deletions(-) diff --git a/mysticeti-core/src/block_manager.rs b/mysticeti-core/src/block_manager.rs index 05b48c48..4ac732dd 100644 --- a/mysticeti-core/src/block_manager.rs +++ b/mysticeti-core/src/block_manager.rs @@ -39,18 +39,32 @@ impl BlockManager { } } + /// Attempts to process (accept) the provided blocks and stores them only when the causal history + /// is already present. If a block can't be processed, then it is parked in the `blocks_pending` map + /// and any missing references are recorded in the `block_references_waiting` and in the `missing` vector. + /// The method returns a tuple where the newly accepted/processed blocks are returned and the missing references + /// of the provided blocks. Keep in mind that the missing references are returned only the first one for a specific + /// provided block. If we attempt to add the same block again, then its missing references won't be returned again. pub fn add_blocks( &mut self, - blocks: Vec>, + mut blocks: Vec>, block_writer: &mut impl BlockWriter, - ) -> Vec<(WalPosition, Data)> { + ) -> ( + Vec<(WalPosition, Data)>, + HashSet, + ) { + // process the blocks in round order ascending to ensure that we do not create unnecessary missing references + blocks.sort_by_key(|b1| b1.round()); let mut blocks: VecDeque> = blocks.into(); let mut newly_blocks_processed: Vec<(WalPosition, Data)> = vec![]; + // missing references that we see them for first time + let mut missing_references = HashSet::new(); while let Some(block) = blocks.pop_front() { // Update the highest known round number. // check whether we have already processed this block and skip it if so. let block_reference = block.reference(); + if self.block_store.block_exists(*block_reference) || self.blocks_pending.contains_key(block_reference) { @@ -62,6 +76,17 @@ impl BlockManager { // If we are missing a reference then we insert into pending and update the waiting index if !self.block_store.block_exists(*included_reference) { processed = false; + + // we inserted the missing reference for the first time and the block has not been + // fetched already. + if !self + .block_references_waiting + .contains_key(included_reference) + && !self.blocks_pending.contains_key(included_reference) + { + missing_references.insert(*included_reference); + } + self.block_references_waiting .entry(*included_reference) .or_default() @@ -107,7 +132,7 @@ impl BlockManager { } } - newly_blocks_processed + (newly_blocks_processed, missing_references) } pub fn missing_blocks(&self) -> &[HashSet] { @@ -145,7 +170,7 @@ mod tests { ); let mut processed_blocks = HashSet::new(); for block in iter { - let processed = bm.add_blocks(vec![block.clone()], &mut block_writer); + let (processed, _missing) = bm.add_blocks(vec![block.clone()], &mut block_writer); print!("Adding {:?}:", block.reference()); for (_, p) in processed { print!("{:?},", p.reference()); @@ -163,6 +188,45 @@ mod tests { } } + #[test] + fn test_block_manager_add_block_missing_references() { + let (metrics, _reporter) = Metrics::new(&Registry::new(), None); + let dag = + Dag::draw("A1:[A0, B0]; B1:[A0, B0]; B2:[A0, B1]; A2:[A1, B1]").add_genesis_blocks(); + assert_eq!(dag.len(), 6); // 4 blocks in dag + 2 genesis + + let mut block_writer = TestBlockWriter::new(&dag.committee()); + let mut iter = dag.iter_rev(); + let mut bm = BlockManager::new( + block_writer.block_store(), + &dag.committee(), + metrics.clone(), + ); + + let a2 = iter.next().unwrap(); + + // WHEN processing block A2 for first time we should get back 2 missing references + let (_processed, missing) = bm.add_blocks(vec![a2.clone()], &mut block_writer); + assert_eq!(missing.len(), 2); + + // WHEN processing again block A2, then now missing should be empty + let (_processed, missing) = bm.add_blocks(vec![a2.clone()], &mut block_writer); + assert!(missing.is_empty()); + + // WHEN processing block B2, should now yield one missing blocks + let b2 = iter.next().unwrap(); + let (_processed, missing) = bm.add_blocks(vec![b2.clone()], &mut block_writer); + assert_eq!(missing.len(), 1); + + // Now processing all the rest of the blocks should yield as missing zero + let (_processed, missing) = bm.add_blocks(iter.cloned().collect(), &mut block_writer); + + assert!(missing.is_empty()); + assert_eq!(bm.block_references_waiting.len(), 0); + assert_eq!(bm.blocks_pending.len(), 0); + assert_eq!(bm.block_store.len_expensive(), dag.len()); + } + fn rng(s: u8) -> StdRng { let mut seed = [0; 32]; seed[0] = s; diff --git a/mysticeti-core/src/core.rs b/mysticeti-core/src/core.rs index 95f8c356..d4604d23 100644 --- a/mysticeti-core/src/core.rs +++ b/mysticeti-core/src/core.rs @@ -163,14 +163,16 @@ impl Core { self } - // Note that generally when you update this function you also want to change genesis initialization above - pub fn add_blocks(&mut self, blocks: Vec>) -> Vec> { + // Note that generally when you update this function you also want to change genesis initialization above. + // The method returns the missing references in order to successfully process the provided blocks. The missing + // references though will be returned only the first time that a block is provided for processing. + pub fn add_blocks(&mut self, blocks: Vec>) -> Vec { let _timer = self .metrics .utilization_timer .utilization_timer("Core::add_blocks"); let now = timestamp_utc(); - let processed = self + let (processed, missing_references) = self .block_manager .add_blocks(blocks, &mut (&mut self.wal_writer, &self.block_store)); let mut result = Vec::with_capacity(processed.len()); @@ -197,7 +199,7 @@ impl Core { .threshold_clock_round .set(self.threshold_clock.get_round() as i64); self.run_block_handler(&result); - result + missing_references.into_iter().collect() } fn run_block_handler(&mut self, processed: &[Data]) { diff --git a/mysticeti-core/src/core_thread/simulated.rs b/mysticeti-core/src/core_thread/simulated.rs index 6e9750f2..d1545cba 100644 --- a/mysticeti-core/src/core_thread/simulated.rs +++ b/mysticeti-core/src/core_thread/simulated.rs @@ -41,8 +41,8 @@ impl< &self, blocks: Vec>, connected_authorities: AuthoritySet, - ) { - self.syncer.lock().add_blocks(blocks, connected_authorities); + ) -> Vec { + self.syncer.lock().add_blocks(blocks, connected_authorities) } pub async fn force_new_block(&self, round: RoundNumber, connected_authorities: AuthoritySet) { diff --git a/mysticeti-core/src/core_thread/spawned.rs b/mysticeti-core/src/core_thread/spawned.rs index 321aef19..287182c1 100644 --- a/mysticeti-core/src/core_thread/spawned.rs +++ b/mysticeti-core/src/core_thread/spawned.rs @@ -30,7 +30,11 @@ pub struct CoreThread>, AuthoritySet, oneshot::Sender<()>), + AddBlocks( + Vec>, + AuthoritySet, + oneshot::Sender>, + ), ForceNewBlock(RoundNumber, AuthoritySet, oneshot::Sender<()>), Cleanup(oneshot::Sender<()>), /// Request missing blocks that need to be synched. @@ -72,7 +76,7 @@ impl< &self, blocks: Vec>, connected_authorities: AuthoritySet, - ) { + ) -> Vec { let (sender, receiver) = oneshot::channel(); self.send(CoreThreadCommand::AddBlocks( blocks, @@ -80,7 +84,7 @@ impl< sender, )) .await; - receiver.await.expect("core thread is not expected to stop"); + receiver.await.expect("core thread is not expected to stop") } pub async fn force_new_block(&self, round: RoundNumber, connected_authorities: AuthoritySet) { @@ -130,8 +134,8 @@ impl { - self.syncer.add_blocks(blocks, connected_authorities); - sender.send(()).ok(); + let missing_blocks = self.syncer.add_blocks(blocks, connected_authorities); + sender.send(missing_blocks).ok(); } CoreThreadCommand::ForceNewBlock(round, connected_authorities, sender) => { self.syncer.force_new_block(round, connected_authorities); diff --git a/mysticeti-core/src/net_sync.rs b/mysticeti-core/src/net_sync.rs index f6869667..29880a9a 100644 --- a/mysticeti-core/src/net_sync.rs +++ b/mysticeti-core/src/net_sync.rs @@ -9,7 +9,7 @@ use crate::runtime::Handle; use crate::runtime::{self, timestamp_utc}; use crate::runtime::{JoinError, JoinHandle}; use crate::syncer::{Syncer, SyncerSignals}; -use crate::types::{AuthorityIndex, StatementBlock}; +use crate::types::{AuthorityIndex, BlockReference, StatementBlock}; use crate::types::{AuthoritySet, RoundNumber}; use crate::wal::WalSyncer; use crate::{block_handler::BlockHandler, metrics::Metrics}; @@ -267,18 +267,14 @@ impl NetworkSyncer disseminator.disseminate_own_blocks(round).await } NetworkMessage::Blocks(blocks) => { - if Self::process_blocks(&inner, &block_verifier, &metrics, blocks) - .await - .is_err() - { - break; - } - } - NetworkMessage::Block(block) => { - if Self::process_blocks(&inner, &block_verifier, &metrics, vec![block]) - .await - .is_err() + if let Ok(missing_blocks) = + Self::process_blocks(&inner, &block_verifier, &metrics, blocks).await { + // we only want to request missing blocks when a validator is sending us their block + // proposals, and not during a bulk catchup via our request (RequestBlocks) to avoid + // overwhelming the peer. + Self::request_missing_blocks(missing_blocks, &connection.sender); + } else { break; } } @@ -295,6 +291,14 @@ impl NetworkSyncer break; } } + NetworkMessage::RequestBlocksResponse(blocks) => { + if Self::process_blocks(&inner, &block_verifier, &metrics, blocks) + .await + .is_err() + { + break; + } + } NetworkMessage::BlockNotFound(_references) => { // TODO: leverage this signal to request blocks from other peers } @@ -312,9 +316,9 @@ impl NetworkSyncer block_verifier: &Arc, metrics: &Arc, blocks: Vec>, - ) -> Result<(), eyre::Report> { + ) -> Result, eyre::Report> { if blocks.is_empty() { - return Ok(()); + return Ok(vec![]); } let now = timestamp_utc(); @@ -372,13 +376,22 @@ impl NetworkSyncer if !to_process.is_empty() { let connected_authorities = inner.connected_authorities.lock().authorities.clone(); - inner + return Ok(inner .syncer .add_blocks(to_process, connected_authorities) - .await; + .await); } - Ok(()) + Ok(vec![]) + } + + fn request_missing_blocks( + missing_blocks: Vec, + sender: &mpsc::Sender, + ) { + if let Ok(permit) = sender.try_reserve() { + permit.send(NetworkMessage::RequestBlocks(missing_blocks)) + } } async fn leader_timeout_task( diff --git a/mysticeti-core/src/network.rs b/mysticeti-core/src/network.rs index 59210c22..b3acecb9 100644 --- a/mysticeti-core/src/network.rs +++ b/mysticeti-core/src/network.rs @@ -34,11 +34,12 @@ const PING_INTERVAL: Duration = Duration::from_secs(30); #[derive(Debug, Serialize, Deserialize)] pub enum NetworkMessage { SubscribeOwnFrom(RoundNumber), // subscribe from round number excluding - Block(Data), - // Sending multiple blocks at once + /// Sending multiple blocks at once Blocks(Vec>), /// Request a few specific block references (this is not indented for large requests). RequestBlocks(Vec), + /// The response to the request blocks + RequestBlocksResponse(Vec>), /// Indicate that a requested block is not found. BlockNotFound(Vec), } diff --git a/mysticeti-core/src/syncer.rs b/mysticeti-core/src/syncer.rs index bc8d2301..50d0acff 100644 --- a/mysticeti-core/src/syncer.rs +++ b/mysticeti-core/src/syncer.rs @@ -6,7 +6,7 @@ use crate::core::Core; use crate::data::Data; use crate::metrics::UtilizationTimerVecExt; use crate::runtime::timestamp_utc; -use crate::types::{AuthoritySet, RoundNumber, StatementBlock}; +use crate::types::{AuthoritySet, BlockReference, RoundNumber, StatementBlock}; use crate::{block_handler::BlockHandler, metrics::Metrics}; use std::sync::Arc; use tokio::sync::watch::Sender; @@ -67,13 +67,13 @@ impl>, connected_authorities: AuthoritySet, - ) { + ) -> Vec { let _timer = self .metrics .utilization_timer .utilization_timer("Syncer::add_blocks"); let previous_round = self.core().current_round(); - self.core.add_blocks(blocks); + let missing_references = self.core.add_blocks(blocks); let new_round = self.core().current_round(); // we got a new quorum of blocks. Let leader timeout task know about it. @@ -82,6 +82,8 @@ impl DagIter { + let mut v: Vec<_> = self.0.keys().cloned().collect(); + v.sort_by(|b1, b2| { + let ordering = b2.round.cmp(&b1.round); + if ordering == Ordering::Equal { + return b1.authority.cmp(&b2.authority); + } + ordering + }); + DagIter(self, v.into_iter()) + } + pub fn len(&self) -> usize { self.0.len() } @@ -864,6 +877,17 @@ mod test { } } + pub struct DagIter<'a>(&'a Dag, std::vec::IntoIter); + + impl<'a> Iterator for DagIter<'a> { + type Item = &'a Data; + + fn next(&mut self) -> Option { + let next = self.1.next()?; + Some(self.0 .0.get(&next).unwrap()) + } + } + #[test] fn test_draw_dag() { let d = Dag::draw("A1:[A0, B1]; B2:[B1]").0;