Skip to content

Commit

Permalink
[consensus] Send excluded ancestors with new block messages (#20896)
Browse files Browse the repository at this point in the history
## Description 

In an effort to continue using smart ancestor selection we have to
ensure that we are not sacrificing on block propagation. This PR adds
excluded ancestors as part of the message sent when a new block is
created which can then be optimistically fetched by peers if they don't
have these block refs.

## Test plan 

Pending PTN experiments

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:

---------

Co-authored-by: Mingwei Tian <mingwei@mystenlabs.com>
  • Loading branch information
arun-koshy and mwtian authored Jan 19, 2025
1 parent 4d7b082 commit f31228f
Show file tree
Hide file tree
Showing 17 changed files with 638 additions and 163 deletions.
112 changes: 97 additions & 15 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
block::{BlockAPI as _, BlockRef, SignedBlock, VerifiedBlock, GENESIS_ROUND},
block::{BlockAPI as _, BlockRef, ExtendedBlock, SignedBlock, VerifiedBlock, GENESIS_ROUND},
block_verifier::BlockVerifier,
commit::{CommitAPI as _, CommitRange, TrustedCommit},
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::CoreThreadDispatcher,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
network::{BlockStream, NetworkService},
network::{BlockStream, ExtendedSerializedBlock, NetworkService},
stake_aggregator::{QuorumThreshold, StakeAggregator},
storage::Store,
synchronizer::SynchronizerHandle,
Expand All @@ -38,7 +38,7 @@ pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
block_verifier: Arc<dyn BlockVerifier>,
synchronizer: Arc<SynchronizerHandle>,
core_dispatcher: Arc<C>,
rx_block_broadcaster: broadcast::Receiver<VerifiedBlock>,
rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
subscription_counter: Arc<SubscriptionCounter>,
dag_state: Arc<RwLock<DagState>>,
store: Arc<dyn Store>,
Expand All @@ -51,7 +51,7 @@ impl<C: CoreThreadDispatcher> AuthorityService<C> {
commit_vote_monitor: Arc<CommitVoteMonitor>,
synchronizer: Arc<SynchronizerHandle>,
core_dispatcher: Arc<C>,
rx_block_broadcaster: broadcast::Receiver<VerifiedBlock>,
rx_block_broadcaster: broadcast::Receiver<ExtendedBlock>,
dag_state: Arc<RwLock<DagState>>,
store: Arc<dyn Store>,
) -> Self {
Expand All @@ -78,15 +78,15 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
async fn handle_send_block(
&self,
peer: AuthorityIndex,
serialized_block: Bytes,
serialized_block: ExtendedSerializedBlock,
) -> ConsensusResult<()> {
fail_point_async!("consensus-rpc-response");

let peer_hostname = &self.context.committee.authority(peer).hostname;

// TODO: dedup block verifications, here and with fetched blocks.
let signed_block: SignedBlock =
bcs::from_bytes(&serialized_block).map_err(ConsensusError::MalformedBlock)?;
bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

// Reject blocks not produced by the peer.
if peer != signed_block.author() {
Expand All @@ -113,7 +113,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
info!("Invalid block from {}: {}", peer, e);
return Err(e);
}
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block);
let verified_block = VerifiedBlock::new_verified(signed_block, serialized_block.block);
let block_ref = verified_block.reference();
debug!("Received block {} via send block.", block_ref);

Expand Down Expand Up @@ -225,6 +225,75 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
}
}

// ------------ After processing the block, process the excluded ancestors ------------

let mut excluded_ancestors = serialized_block
.excluded_ancestors
.into_iter()
.map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
.collect::<Result<Vec<BlockRef>, bcs::Error>>()
.map_err(ConsensusError::MalformedBlock)?;

let excluded_ancestors_limit = self.context.committee.size() * 2;
if excluded_ancestors.len() > excluded_ancestors_limit {
debug!(
"Dropping {} excluded ancestor(s) from {} {} due to size limit",
excluded_ancestors.len() - excluded_ancestors_limit,
peer,
peer_hostname,
);
excluded_ancestors.truncate(excluded_ancestors_limit);
}

self.context
.metrics
.node_metrics
.network_received_excluded_ancestors_from_authority
.with_label_values(&[peer_hostname])
.inc_by(excluded_ancestors.len() as u64);

for excluded_ancestor in &excluded_ancestors {
let excluded_ancestor_hostname = &self
.context
.committee
.authority(excluded_ancestor.author)
.hostname;
self.context
.metrics
.node_metrics
.network_excluded_ancestors_count_by_authority
.with_label_values(&[excluded_ancestor_hostname])
.inc();
}

let missing_excluded_ancestors = self
.core_dispatcher
.check_block_refs(excluded_ancestors)
.await
.map_err(|_| ConsensusError::Shutdown)?;

if !missing_excluded_ancestors.is_empty() {
self.context
.metrics
.node_metrics
.network_excluded_ancestors_sent_to_fetch
.with_label_values(&[peer_hostname])
.inc_by(missing_excluded_ancestors.len() as u64);

let synchronizer = self.synchronizer.clone();
tokio::spawn(async move {
// schedule the fetching of them from this peer in the background
if let Err(err) = synchronizer
.fetch_blocks(missing_excluded_ancestors, peer)
.await
{
warn!(
"Errored while trying to fetch missing excluded ancestors via synchronizer: {err}"
);
}
});
}

Ok(())
}

Expand All @@ -243,7 +312,10 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
dag_state
.get_cached_blocks(self.context.own_index, last_received + 1)
.into_iter()
.map(|block| block.serialized().clone()),
.map(|block| ExtendedSerializedBlock {
block: block.serialized().clone(),
excluded_ancestors: vec![],
}),
);

let broadcasted_blocks = BroadcastedBlockStream::new(
Expand All @@ -254,7 +326,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {

// Return a stream of blocks that first yields missed blocks as requested, then new blocks.
Ok(Box::pin(missed_blocks.chain(
broadcasted_blocks.map(|block| block.serialized().clone()),
broadcasted_blocks.map(ExtendedSerializedBlock::from),
)))
}

Expand Down Expand Up @@ -423,7 +495,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
.get_last_cached_block_per_authority(Round::MAX);
let highest_accepted_rounds = blocks
.into_iter()
.map(|block| block.round())
.map(|(block, _)| block.round())
.collect::<Vec<_>>();

// Own blocks do not go through the core dispatcher, so they need to be set separately.
Expand Down Expand Up @@ -516,7 +588,7 @@ impl SubscriptionCounter {

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<VerifiedBlock>;
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
Expand Down Expand Up @@ -612,15 +684,14 @@ async fn make_recv_future<T: Clone>(
mod tests {
use crate::{
authority_service::AuthorityService,
block::BlockAPI,
block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock},
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
error::ConsensusResult,
network::{BlockStream, NetworkClient, NetworkService},
network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
round_prober::QuorumRound,
storage::mem_store::MemStore,
synchronizer::Synchronizer,
Expand Down Expand Up @@ -664,6 +735,13 @@ mod tests {
Ok(block_refs)
}

async fn check_block_refs(
&self,
_block_refs: Vec<BlockRef>,
) -> Result<BTreeSet<BlockRef>, CoreError> {
Ok(BTreeSet::new())
}

async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
Ok(())
}
Expand Down Expand Up @@ -797,7 +875,11 @@ mod tests {
);

let service = authority_service.clone();
let serialized = input_block.serialized().clone();
let serialized = ExtendedSerializedBlock {
block: input_block.serialized().clone(),
excluded_ancestors: vec![],
};

tokio::spawn(async move {
service
.handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
Expand Down
14 changes: 13 additions & 1 deletion consensus/core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use serde::{Deserialize, Serialize};
use shared_crypto::intent::{Intent, IntentMessage, IntentScope};

use crate::{
commit::CommitVote, context::Context, ensure, error::ConsensusError, error::ConsensusResult,
commit::CommitVote,
context::Context,
ensure,
error::{ConsensusError, ConsensusResult},
};

/// Round number of a block.
Expand Down Expand Up @@ -638,6 +641,15 @@ impl fmt::Debug for VerifiedBlock {
}
}

/// Block with extended additional information, such as
/// local blocks that are excluded from the block's ancestors.
/// The extended information do not need to be certified or forwarded to other authorities.
#[derive(Clone, Debug)]
pub(crate) struct ExtendedBlock {
pub block: VerifiedBlock,
pub excluded_ancestors: Vec<BlockRef>,
}

/// Generates the genesis blocks for the current Committee.
/// The blocks are returned in authority index order.
pub(crate) fn genesis_blocks(context: Arc<Context>) -> Vec<VerifiedBlock> {
Expand Down
Loading

0 comments on commit f31228f

Please sign in to comment.