Skip to content

Commit

Permalink
lookup sync span
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Nov 27, 2024
1 parent de7b04b commit 77fc2d4
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use tracing::{debug, error, warn};
use tracing::{debug, error, instrument, warn};
use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock};

pub mod common;
Expand Down Expand Up @@ -126,6 +126,7 @@ use lighthouse_network::service::api_types::Id;
pub(crate) type BlockLookupSummary = (Id, Hash256, Option<Hash256>, Vec<PeerId>);

impl<T: BeaconChainTypes> BlockLookups<T> {
#[instrument(level = "info", name = "lookup_sync")]
pub fn new() -> Self {
Self {
failed_chains: LRUTimeCache::new(Duration::from_secs(
Expand All @@ -136,16 +137,19 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Test-only helper: inserts `block_root` into the `failed_chains` cache.
#[cfg(test)]
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub(crate) fn insert_failed_chain(&mut self, block_root: Hash256) {
    let failed_chains = &mut self.failed_chains;
    failed_chains.insert(block_root);
}

/// Test-only helper: returns a copy of every block root currently held in the
/// `failed_chains` cache.
#[cfg(test)]
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub(crate) fn get_failed_chains(&mut self) -> Vec<Hash256> {
    let cached_roots = self.failed_chains.keys();
    let failed_roots: Vec<Hash256> = cached_roots.cloned().collect();
    failed_roots
}

#[cfg(test)]
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub(crate) fn active_single_lookups(&self) -> Vec<BlockLookupSummary> {
self.single_block_lookups
.iter()
Expand All @@ -161,6 +165,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Returns a vec of all parent lookup chains by tip, in descending slot order (tip first)
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub(crate) fn active_parent_lookups(&self) -> Vec<NodeChain> {
compute_parent_chains(
&self
Expand All @@ -175,6 +180,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Creates a parent lookup for the block with the given `block_root` and immediately triggers it.
/// If a parent lookup exists or is triggered, a current lookup will be created.
#[instrument(level = "info", name = "lookup_sync", skip(self, block_component, cx))]
pub fn search_child_and_parent(
&mut self,
block_root: Hash256,
Expand Down Expand Up @@ -204,6 +210,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Search for a block whose parent root is unknown.
/// Returns true if the lookup is created or already exists
#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn search_unknown_block(
&mut self,
block_root: Hash256,
Expand All @@ -219,6 +226,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// - `block_root_to_search` is a failed chain
///
/// Returns true if the lookup is created or already exists
#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn search_parent_of_child(
&mut self,
block_root_to_search: Hash256,
Expand Down Expand Up @@ -320,6 +328,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Searches for a single block hash. If the block's parent is unknown, a chain of blocks is
/// constructed.
/// Returns true if the lookup is created or already exists
#[instrument(level = "info", name = "lookup_sync", skip(self, block_component, cx))]
fn new_current_lookup(
&mut self,
block_root: Hash256,
Expand Down Expand Up @@ -422,6 +431,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Lookup responses */

/// Process a block or blob response received from a single lookup request.
#[instrument(level = "info", name = "lookup_sync", skip(self, response, cx))]
pub fn on_download_response<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
Expand Down Expand Up @@ -507,6 +517,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/* Error responses */

#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
for (_, lookup) in self.single_block_lookups.iter_mut() {
lookup.remove_peer(peer_id);
Expand All @@ -515,6 +526,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/* Processing responses */

#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn on_processing_result(
&mut self,
process_type: BlockProcessType,
Expand All @@ -535,6 +547,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}

#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn on_processing_result_inner<R: RequestState<T>>(
&mut self,
lookup_id: SingleLookupId,
Expand Down Expand Up @@ -699,6 +712,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn on_external_processing_result(
&mut self,
block_root: Hash256,
Expand All @@ -724,6 +738,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Makes progress on the immediate children of `block_root`
#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
pub fn continue_child_lookups(&mut self, block_root: Hash256, cx: &mut SyncNetworkContext<T>) {
let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self

Expand All @@ -744,6 +759,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need
/// the parent to make progress to resolve, therefore we must drop them if the parent is
/// dropped.
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) {
if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) {
debug!(
Expand All @@ -768,6 +784,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

/// Common handler for a lookup request error: drops the lookup and updates metrics.
/// Returns true if the lookup is created or already exists
#[instrument(level = "info", name = "lookup_sync", skip(self, result, cx))]
fn on_lookup_result(
&mut self,
id: SingleLookupId,
Expand Down Expand Up @@ -805,12 +822,14 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Helper functions */

/// Removes every single block lookup and reports how many were discarded.
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub fn drop_single_block_requests(&mut self) -> usize {
    // Capture the count before clearing; afterwards the collection is empty.
    let dropped_count = self.single_block_lookups.len();
    self.single_block_lookups.clear();
    dropped_count
}

#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub fn update_metrics(&self) {
metrics::set_gauge(
&metrics::SYNC_SINGLE_BLOCK_LOOKUPS,
Expand All @@ -819,6 +838,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Perform some prune operations on lookups on some interval
#[instrument(level = "info", name = "lookup_sync", skip(self))]
pub fn prune_lookups(&mut self) {
self.drop_lookups_without_peers();
self.drop_stuck_lookups();
Expand All @@ -842,6 +862,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
///
/// Instead there's no negative for keeping lookups with no peers around for some time. If we
/// regularly prune them, it should not be a memory concern (TODO: maybe yes!).
#[instrument(level = "info", name = "lookup_sync", skip(self))]
fn drop_lookups_without_peers(&mut self) {
for (lookup_id, block_root) in self
.single_block_lookups
Expand Down Expand Up @@ -879,6 +900,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
///
/// - One single clear warn level log per stuck incident
/// - If the original bug is sporadic, it reduces the time a node is stuck from forever to 15 min
#[instrument(level = "info", name = "lookup_sync", skip(self))]
fn drop_stuck_lookups(&mut self) {
// While loop to find and drop all disjoint trees of potentially stuck lookups.
while let Some(stuck_lookup) = self.single_block_lookups.values().find(|lookup| {
Expand Down Expand Up @@ -916,6 +938,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}

/// Recursively find the oldest ancestor lookup of another lookup
#[instrument(level = "info", name = "lookup_sync", skip(self))]
fn find_oldest_ancestor_lookup<'a>(
&'a self,
lookup: &'a SingleBlockLookup<T>,
Expand All @@ -940,6 +963,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/// Adds peers to a lookup and its ancestors recursively.
/// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having
/// to duplicate the code to add peers to a lookup
#[instrument(level = "info", name = "lookup_sync", skip(self, cx))]
fn add_peers_to_lookup_and_ancestors(
&mut self,
lookup_id: SingleLookupId,
Expand Down

0 comments on commit 77fc2d4

Please sign in to comment.