diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 3594b9415979e..2ab0aeac813a4 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -292,6 +292,8 @@ where network_manager.install_service(network_service).await; + info!("Authority start complete, took {:?}", start_time.elapsed()); + Self { context, start_time, diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index fb1f469c3c71e..0dcd9071edadc 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -5,7 +5,8 @@ use std::{sync::Arc, time::Duration}; use mysten_metrics::monitored_mpsc::UnboundedSender; use parking_lot::RwLock; -use tracing::info; +use tokio::time::Instant; +use tracing::{debug, info}; use crate::{ block::{BlockAPI, VerifiedBlock}, @@ -97,17 +98,19 @@ impl CommitObserver { } fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) { + let now = Instant::now(); // TODO: remove this check, to allow consensus to regenerate commits? let last_commit = self .store .read_last_commit() .expect("Reading the last commit should not fail"); - if let Some(last_commit) = last_commit { + if let Some(last_commit) = &last_commit { let last_commit_index = last_commit.index(); assert!(last_commit_index >= last_processed_commit_index); if last_commit_index == last_processed_commit_index { + debug!("Nothing to recover for commit observer as commit index {last_commit_index} = {last_processed_commit_index} last processed index"); return; } }; @@ -118,6 +121,8 @@ impl CommitObserver { .scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into()) .expect("Scanning commits should not fail"); + debug!("Recovering commit observer from {last_processed_commit_index} with last commit {:?} and {} unsent commits", last_commit, unsent_commits.len()); + // Resend all the committed subdags to the consensus output channel // for all the commits above the last processed index. let mut last_sent_commit_index = last_processed_commit_index; @@ -150,6 +155,11 @@ impl CommitObserver { last_sent_commit_index += 1; } + + debug!( + "Commit observer recovery complete, took {:?}", + now.elapsed() + ); } fn report_metrics(&self, committed: &[CommittedSubDag]) { diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 0d103dff702d8..17d3d34ab3522 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -204,6 +204,11 @@ impl Core { .unwrap(); } + debug!( + "Core recovery complete with last block {:?}", + self.last_proposed_block + ); + self } @@ -406,6 +411,15 @@ impl Core { .node_metrics .block_ancestors .observe(ancestors.len() as f64); + for ancestor in &ancestors { + let authority = &self.context.committee.authority(ancestor.author()).hostname; + self.context + .metrics + .node_metrics + .block_ancestors_depth + .with_label_values(&[authority]) + .observe(clock_round.saturating_sub(ancestor.round()).into()); + } // Ensure ancestor timestamps are not more advanced than the current time. // Also catch the issue if system's clock go backwards. diff --git a/consensus/core/src/metrics.rs b/consensus/core/src/metrics.rs index 5587e77d5bcef..22543807306a3 100644 --- a/consensus/core/src/metrics.rs +++ b/consensus/core/src/metrics.rs @@ -101,6 +101,7 @@ pub(crate) struct NodeMetrics { pub(crate) proposed_blocks: IntCounterVec, pub(crate) block_size: Histogram, pub(crate) block_ancestors: Histogram, + pub(crate) block_ancestors_depth: HistogramVec, pub(crate) highest_verified_authority_round: IntGaugeVec, pub(crate) lowest_verified_authority_round: IntGaugeVec, pub(crate) block_proposal_leader_wait_ms: IntCounterVec, @@ -190,6 +191,13 @@ impl NodeMetrics { exponential_buckets(1.0, 1.4, 20).unwrap(), registry, ).unwrap(), + block_ancestors_depth: register_histogram_vec_with_registry!( + "block_ancestors_depth", + "The depth in rounds of ancestors included in newly proposed blocks", + &["authority"], + exponential_buckets(1.0, 2.0, 14).unwrap(), + registry, + ).unwrap(), highest_verified_authority_round: register_int_gauge_vec_with_registry!( "highest_verified_authority_round", "The highest round of verified block for the corresponding authority", diff --git a/consensus/core/src/network/anemo_network.rs b/consensus/core/src/network/anemo_network.rs index 7bfeb07bdd676..54ffff9c15d16 100644 --- a/consensus/core/src/network/anemo_network.rs +++ b/consensus/core/src/network/anemo_network.rs @@ -448,6 +448,8 @@ impl NetworkManager for AnemoManager { .with_label_values(&["anemo"]) .set(1); + debug!("Starting anemo service"); + let server = ConsensusRpcServer::new(AnemoServiceProxy::new(self.context.clone(), service)); let authority = self.context.committee.authority(self.context.own_index); // Bind to localhost in unit tests since only local networking is needed. diff --git a/consensus/core/src/network/tonic_network.rs b/consensus/core/src/network/tonic_network.rs index 91f826c581c42..cc12d15dd00c5 100644 --- a/consensus/core/src/network/tonic_network.rs +++ b/consensus/core/src/network/tonic_network.rs @@ -16,6 +16,7 @@ use consensus_config::{AuthorityIndex, NetworkKeyPair, NetworkPublicKey}; use futures::{stream, Stream, StreamExt as _}; use hyper::server::conn::Http; use mysten_common::sync::notify_once::NotifyOnce; +use mysten_metrics::monitored_future; use mysten_network::{ callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler}, multiaddr::Protocol, @@ -654,6 +655,8 @@ impl NetworkManager for TonicManager { .with_label_values(&["tonic"]) .set(1); + debug!("Starting tonic service"); + let authority = self.context.committee.authority(self.context.own_index); // Bind to localhost in unit tests since only local networking is needed. // Bind to the unspecified address to allow the actual address to be assigned, @@ -699,7 +702,7 @@ impl NetworkManager for TonicManager { let tls_acceptor = TlsAcceptor::from(Arc::new(tls_server_config)); // Create listener to incoming connections. - let deadline = Instant::now() + Duration::from_secs(30); + let deadline = Instant::now() + Duration::from_secs(20); let listener = loop { if Instant::now() > deadline { panic!("Failed to start server: timeout"); @@ -764,7 +767,7 @@ impl NetworkManager for TonicManager { let shutdown_notif = self.shutdown_notif.clone(); - self.server.spawn(async move { + self.server.spawn(monitored_future!(async move { let mut connection_handlers = JoinSet::new(); loop { @@ -895,7 +898,7 @@ impl NetworkManager for TonicManager { Ok(()) }); } - }); + })); info!("Server started at: {own_address}"); } diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 3c1019e211591..237eec4b1b436 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -244,23 +244,30 @@ impl Synchronizer Synchronizer