diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index cb24b38a9dc..06c4da03fd4 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -439,13 +439,23 @@ impl ValidatorMonitor { /// Returns `true` when the validator count is sufficiently low enough to /// emit metrics and logs on a per-validator basis (rather than just an /// aggregated basis). - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn individual_tracking(&self) -> bool { self.validators.len() <= self.individual_tracking_threshold } /// Add some validators to `self` for additional monitoring. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn add_validator_pubkey(&mut self, pubkey: PublicKeyBytes) { let index_opt = self .indices @@ -463,7 +473,12 @@ impl ValidatorMonitor { } /// Add an unaggregated attestation - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn set_unaggregated_attestation(&mut self, attestation: Attestation) { let unaggregated_attestations = &mut self.unaggregated_attestations; @@ -477,14 +492,24 @@ impl ValidatorMonitor { self.unaggregated_attestations.insert(slot, attestation); } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn get_unaggregated_attestation(&self, slot: Slot) -> Option<&Attestation> { self.unaggregated_attestations.get(&slot) } /// Reads information 
from the given `state`. The `state` *must* be valid (i.e, able to be /// imported). - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn process_valid_state( &mut self, current_epoch: Epoch, @@ -597,7 +622,12 @@ impl ValidatorMonitor { } /// Add missed non-finalized blocks for the monitored validators - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn add_validators_missed_blocks(&mut self, state: &BeaconState) { // Define range variables let current_slot = state.slot(); @@ -694,7 +724,12 @@ impl ValidatorMonitor { } } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn get_proposers_by_epoch_from_cache( &mut self, epoch: Epoch, @@ -708,7 +743,12 @@ impl ValidatorMonitor { /// Process the unaggregated attestations generated by the service `attestation_simulator_service` /// and check if the attestation qualifies for a reward matching the flags source/target/head - #[instrument(level = "info", name = "validator_monitor", skip(self, state))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, state) + )] fn process_unaggregated_attestations(&mut self, state: &BeaconState, spec: &ChainSpec) { let current_slot = state.slot(); @@ -781,7 +821,12 @@ impl ValidatorMonitor { /// /// We allow disabling tracking metrics on an individual validator basis /// since it can result in untenable cardinality with high validator counts. 
- #[instrument(level = "info", name = "validator_monitor", skip(self, func))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, func) + )] fn aggregatable_metric(&self, individual_id: &str, func: F) { func(TOTAL_LABEL); @@ -790,7 +835,12 @@ impl ValidatorMonitor { } } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn process_validator_statuses( &self, epoch: Epoch, @@ -1068,7 +1118,12 @@ impl ValidatorMonitor { Ok(()) } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn get_validator(&self, validator_index: u64) -> Option<&MonitoredValidator> { self.indices .get(&validator_index) @@ -1076,18 +1131,33 @@ impl ValidatorMonitor { } /// Returns the number of validators monitored by `self`. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn num_validators(&self) -> usize { self.validators.len() } /// Return the `id`'s of all monitored validators. 
- #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn get_all_monitored_validators(&self) -> Vec { self.validators.values().map(|val| val.id.clone()).collect() } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn get_monitored_validator(&self, index: u64) -> Option<&MonitoredValidator> { if let Some(pubkey) = self.indices.get(&index) { self.validators.get(pubkey) @@ -1096,7 +1166,12 @@ impl ValidatorMonitor { } } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn get_monitored_validator_missed_block_count(&self, validator_index: u64) -> u64 { self.missed_blocks .iter() @@ -1104,14 +1179,24 @@ impl ValidatorMonitor { .count() as u64 } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn get_beacon_proposer_cache(&self) -> Arc> { self.beacon_proposer_cache.clone() } /// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`. /// Otherwise, do nothing. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn auto_register_local_validator(&mut self, validator_index: u64) { if !self.auto_register { return; @@ -1134,7 +1219,12 @@ impl ValidatorMonitor { } /// Process a block received on gossip. 
- #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_gossip_block( &self, seen_timestamp: Duration, @@ -1146,7 +1236,12 @@ impl ValidatorMonitor { } /// Process a block received on the HTTP API from a local validator. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_api_block( &self, seen_timestamp: Duration, @@ -1157,7 +1252,12 @@ impl ValidatorMonitor { self.register_beacon_block("api", seen_timestamp, block, block_root, slot_clock) } - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] fn register_beacon_block( &self, src: &str, @@ -1197,7 +1297,12 @@ impl ValidatorMonitor { } /// Register an attestation seen on the gossip network. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_gossip_unaggregated_attestation( &self, seen_timestamp: Duration, @@ -1213,7 +1318,12 @@ impl ValidatorMonitor { } /// Register an attestation seen on the HTTP API. 
- #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_api_unaggregated_attestation( &self, seen_timestamp: Duration, @@ -1228,7 +1338,12 @@ impl ValidatorMonitor { ) } - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] fn register_unaggregated_attestation( &self, src: &str, @@ -1315,7 +1430,12 @@ impl ValidatorMonitor { ) } - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] fn register_aggregated_attestation( &self, src: &str, @@ -1514,7 +1634,12 @@ impl ValidatorMonitor { } /// Register a sync committee message received over gossip. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_gossip_sync_committee_message( &self, seen_timestamp: Duration, @@ -1530,7 +1655,12 @@ impl ValidatorMonitor { } /// Register a sync committee message received over the http api. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_api_sync_committee_message( &self, seen_timestamp: Duration, @@ -1546,7 +1676,12 @@ impl ValidatorMonitor { } /// Register a sync committee message. 
- #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] fn register_sync_committee_message( &self, src: &str, @@ -1596,7 +1731,12 @@ impl ValidatorMonitor { } /// Register a sync committee contribution received over gossip. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_gossip_sync_committee_contribution( &self, seen_timestamp: Duration, @@ -1614,7 +1754,12 @@ impl ValidatorMonitor { } /// Register a sync committee contribution received over the http api. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn register_api_sync_committee_contribution( &self, seen_timestamp: Duration, @@ -1632,7 +1777,12 @@ impl ValidatorMonitor { } /// Register a sync committee contribution. - #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] fn register_sync_committee_contribution( &self, src: &str, @@ -1715,7 +1865,12 @@ impl ValidatorMonitor { } /// Register that the `sync_aggregate` was included in a *valid* `BeaconBlock`. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_sync_aggregate_in_block( &self, slot: Slot, @@ -1753,24 +1908,44 @@ impl ValidatorMonitor { } /// Register an exit from the gossip network. 
- #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_gossip_voluntary_exit(&self, exit: &VoluntaryExit) { self.register_voluntary_exit("gossip", exit) } /// Register an exit from the HTTP API. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_api_voluntary_exit(&self, exit: &VoluntaryExit) { self.register_voluntary_exit("api", exit) } /// Register an exit included in a *valid* beacon block. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_block_voluntary_exit(&self, exit: &VoluntaryExit) { self.register_voluntary_exit("block", exit) } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn register_voluntary_exit(&self, src: &str, exit: &VoluntaryExit) { if let Some(validator) = self.get_validator(exit.validator_index) { let id = &validator.id; @@ -1794,24 +1969,44 @@ impl ValidatorMonitor { } /// Register a proposer slashing from the gossip network. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_gossip_proposer_slashing(&self, slashing: &ProposerSlashing) { self.register_proposer_slashing("gossip", slashing) } /// Register a proposer slashing from the HTTP API. 
- #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_api_proposer_slashing(&self, slashing: &ProposerSlashing) { self.register_proposer_slashing("api", slashing) } /// Register a proposer slashing included in a *valid* `BeaconBlock`. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_block_proposer_slashing(&self, slashing: &ProposerSlashing) { self.register_proposer_slashing("block", slashing) } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn register_proposer_slashing(&self, src: &str, slashing: &ProposerSlashing) { let proposer = slashing.signed_header_1.message.proposer_index; let slot = slashing.signed_header_1.message.slot; @@ -1845,24 +2040,44 @@ impl ValidatorMonitor { } /// Register an attester slashing from the gossip network. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_gossip_attester_slashing(&self, slashing: AttesterSlashingRef<'_, E>) { self.register_attester_slashing("gossip", slashing) } /// Register an attester slashing from the HTTP API. 
- #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_api_attester_slashing(&self, slashing: AttesterSlashingRef<'_, E>) { self.register_attester_slashing("api", slashing) } /// Register an attester slashing included in a *valid* `BeaconBlock`. - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] pub fn register_block_attester_slashing(&self, slashing: AttesterSlashingRef<'_, E>) { self.register_attester_slashing("block", slashing) } - #[instrument(level = "info", name = "validator_monitor", skip(self))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self) + )] fn register_attester_slashing(&self, src: &str, slashing: AttesterSlashingRef<'_, E>) { let data = slashing.attestation_1().data(); let attestation_1_indices: HashSet = slashing @@ -1904,7 +2119,12 @@ impl ValidatorMonitor { /// Scrape `self` for metrics. /// /// Should be called whenever Prometheus is scraping Lighthouse. 
- #[instrument(level = "info", name = "validator_monitor", skip(self, slot_clock))] + #[instrument( + level = "info", + fields(service = "validator_monitor"), + name = "validator_monitor", + skip(self, slot_clock) + )] pub fn scrape_metrics(&self, slot_clock: &S, spec: &ChainSpec) { metrics::set_gauge( &metrics::VALIDATOR_MONITOR_VALIDATORS_TOTAL, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index a405edd27a6..04a1e8a736e 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -167,7 +167,12 @@ pub struct RPC { } impl RPC { - #[instrument(level = "trace", name = "libp2p_rpc", skip(network_params))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(network_params) + )] pub fn new( fork_context: Arc, enable_light_client_server: bool, @@ -200,7 +205,12 @@ impl RPC { /// Sends an RPC response. /// /// The peer must be connected for this to succeed. - #[instrument(level = "trace", name = "libp2p_rpc", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(self) + )] pub fn send_response( &mut self, peer_id: PeerId, @@ -218,7 +228,12 @@ impl RPC { /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - #[instrument(level = "trace", name = "libp2p_rpc", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(self) + )] pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) { let event = if let Some(self_limiter) = self.self_limiter.as_mut() { match self_limiter.allows(peer_id, request_id, req) { @@ -241,7 +256,12 @@ impl RPC { /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This /// gracefully terminates the RPC behaviour with a goodbye message. 
- #[instrument(level = "trace", name = "libp2p_rpc", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(self) + )] pub fn shutdown(&mut self, peer_id: PeerId, id: Id, reason: GoodbyeReason) { self.events.push(ToSwarm::NotifyHandler { peer_id, @@ -250,13 +270,23 @@ impl RPC { }); } - #[instrument(level = "trace", name = "libp2p_rpc", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(self) + )] pub fn update_seq_number(&mut self, seq_number: u64) { self.seq_number = seq_number } /// Send a Ping request to the destination `PeerId` via `ConnectionId`. - #[instrument(level = "trace", name = "libp2p_rpc", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p_rpc"), + name = "libp2p_rpc", + skip(self) + )] pub fn ping(&mut self, peer_id: PeerId, id: Id) { let ping = Ping { data: self.seq_number, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 0f0e29e8391..2ad5cc4f910 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -165,7 +165,12 @@ pub struct Network { /// Implements the combined behaviour for the libp2p service. impl Network { - #[instrument(level = "trace", name = "libp2p", skip(executor, ctx))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(executor, ctx) + )] pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, @@ -506,7 +511,12 @@ impl Network { /// - Starts listening in the given ports. /// - Dials boot-nodes and libp2p peers. /// - Subscribes to starting gossipsub topics. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] async fn start(&mut self, config: &crate::NetworkConfig) -> error::Result<()> { let enr = self.network_globals.local_enr(); info!( @@ -625,59 +635,114 @@ impl Network { /* Public Accessible Functions to interact with the behaviour */ /// The routing pub-sub mechanism for eth2. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn gossipsub_mut(&mut self) -> &mut Gossipsub { &mut self.swarm.behaviour_mut().gossipsub } /// The Eth2 RPC specified in the wire-0 protocol. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn eth2_rpc_mut(&mut self) -> &mut RPC { &mut self.swarm.behaviour_mut().eth2_rpc } /// Discv5 Discovery protocol. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn discovery_mut(&mut self) -> &mut Discovery { &mut self.swarm.behaviour_mut().discovery } /// Provides IP addresses and peer information. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn identify_mut(&mut self) -> &mut identify::Behaviour { &mut self.swarm.behaviour_mut().identify } /// The peer manager that keeps track of peer's reputation and status. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn peer_manager_mut(&mut self) -> &mut PeerManager { &mut self.swarm.behaviour_mut().peer_manager } /// The routing pub-sub mechanism for eth2. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn gossipsub(&self) -> &Gossipsub { &self.swarm.behaviour().gossipsub } /// The Eth2 RPC specified in the wire-0 protocol. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn eth2_rpc(&self) -> &RPC { &self.swarm.behaviour().eth2_rpc } /// Discv5 Discovery protocol. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn discovery(&self) -> &Discovery { &self.swarm.behaviour().discovery } /// Provides IP addresses and peer information. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn identify(&self) -> &identify::Behaviour { &self.swarm.behaviour().identify } /// The peer manager that keeps track of peer's reputation and status. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn peer_manager(&self) -> &PeerManager { &self.swarm.behaviour().peer_manager } /// Returns the local ENR of the node. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn local_enr(&self) -> Enr { self.network_globals.local_enr() } @@ -686,7 +751,12 @@ impl Network { /// Subscribes to a gossipsub topic kind, letting the network service determine the /// encoding and fork version. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn subscribe_kind(&mut self, kind: GossipKind) -> bool { let gossip_topic = GossipTopic::new( kind, @@ -699,7 +769,12 @@ impl Network { /// Unsubscribes from a gossipsub topic kind, letting the network service determine the /// encoding and fork version. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn unsubscribe_kind(&mut self, kind: GossipKind) -> bool { let gossip_topic = GossipTopic::new( kind, @@ -710,7 +785,12 @@ impl Network { } /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) { // Subscribe to existing topics with new fork digest let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); @@ -741,7 +821,12 @@ impl Network { } /// Unsubscribe from all topics that doesn't have the given fork_digest - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn unsubscribe_from_fork_topics_except(&mut self, except: [u8; 4]) { let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone(); for topic in subscriptions @@ -754,7 +839,12 @@ impl Network { } /// Remove topic weight from all topics that don't have the given fork digest. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn remove_topic_weight_except(&mut self, except: [u8; 4]) { let new_param = TopicScoreParams { topic_weight: 0.0, @@ -779,7 +869,12 @@ impl Network { } /// Returns the scoring parameters for a topic if set. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn get_topic_params(&self, topic: GossipTopic) -> Option<&TopicScoreParams> { self.swarm .behaviour() @@ -790,7 +885,12 @@ impl Network { /// Subscribes to a gossipsub topic. /// /// Returns `true` if the subscription was successful and `false` otherwise. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn subscribe(&mut self, topic: GossipTopic) -> bool { // update the network globals self.network_globals @@ -813,7 +913,12 @@ impl Network { } /// Unsubscribe from a gossipsub topic. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn unsubscribe(&mut self, topic: GossipTopic) -> bool { // update the network globals self.network_globals @@ -838,7 +943,12 @@ impl Network { } /// Publishes a list of messages on the pubsub (gossipsub) behaviour, choosing the encoding. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn publish(&mut self, messages: Vec>) { for message in messages { for topic in message.topics(GossipEncoding::default(), self.enr_fork_id.fork_digest) { @@ -893,7 +1003,12 @@ impl Network { /// Informs the gossipsub about the result of a message validation. 
/// If the message is valid it will get propagated by gossipsub. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn report_message_validation_result( &mut self, propagation_source: &PeerId, @@ -930,7 +1045,12 @@ impl Network { /// Updates the current gossipsub scoring parameters based on the validator count and current /// slot. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn update_gossipsub_parameters( &mut self, active_validators: usize, @@ -974,7 +1094,12 @@ impl Network { /* Eth2 RPC behaviour functions */ /// Send a request to a peer over RPC. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn send_request( &mut self, peer_id: PeerId, @@ -992,7 +1117,12 @@ impl Network { } /// Send a successful response to a peer over RPC. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn send_response( &mut self, peer_id: PeerId, @@ -1005,7 +1135,12 @@ impl Network { } /// Inform the peer that their request produced an error. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn send_error_response( &mut self, peer_id: PeerId, @@ -1023,12 +1158,22 @@ impl Network { } /* Peer management functions */ - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn testing_dial(&mut self, addr: Multiaddr) -> Result<(), libp2p::swarm::DialError> { self.swarm.dial(addr) } - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn report_peer( &mut self, peer_id: &PeerId, @@ -1044,7 +1189,12 @@ impl Network { /// /// This will send a goodbye, disconnect and then ban the peer. /// This is fatal for a peer, and should be used in unrecoverable circumstances. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { self.peer_manager_mut() .goodbye_peer(peer_id, reason, source); @@ -1052,19 +1202,34 @@ impl Network { /// Hard (ungraceful) disconnect for testing purposes only /// Use goodbye_peer for disconnections, do not use this function. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) { let _ = self.swarm.disconnect_peer_id(peer_id); } /// Returns an iterator over all enr entries in the DHT. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn enr_entries(&self) -> Vec { self.discovery().table_entries_enr() } /// Add an ENR to the routing table of the discovery mechanism. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn add_enr(&mut self, enr: Enr) { self.discovery_mut().add_enr(enr); } @@ -1072,7 +1237,12 @@ impl Network { /// Updates a subnet value to the ENR attnets/syncnets bitfield. /// /// The `value` is `true` if a subnet is being added and false otherwise. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn update_enr_subnet(&mut self, subnet_id: Subnet, value: bool) { if let Err(e) = self.discovery_mut().update_enr_bitfield(subnet_id, value) { crit!(error = e, "Could not update ENR bitfield"); @@ -1083,7 +1253,12 @@ impl Network { /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { // If discovery is not started or disabled, ignore the request if !self.discovery().started { @@ -1138,7 +1313,12 @@ impl Network { } /// Updates the local ENR's "eth2" field with the latest EnrForkId. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub fn update_fork_version(&mut self, enr_fork_id: EnrForkId) { self.discovery_mut().update_eth2_enr(enr_fork_id.clone()); @@ -1149,7 +1329,12 @@ impl Network { /* Private internal functions */ /// Updates the current meta data of the node to match the local ENR. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn update_metadata_bitfields(&mut self) { let local_attnets = self .discovery_mut() @@ -1181,13 +1366,23 @@ impl Network { } /// Sends a Ping request to the peer. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn ping(&mut self, peer_id: PeerId) { self.eth2_rpc_mut().ping(peer_id, RequestId::Internal); } /// Sends a METADATA request to a peer. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn send_meta_data_request(&mut self, peer_id: PeerId) { let event = if self.fork_context.spec.is_peer_das_scheduled() { // Nodes with higher custody will probably start advertising it @@ -1202,7 +1397,12 @@ impl Network { } /// Sends a METADATA response to a peer. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn send_meta_data_response( &mut self, _req: MetadataRequest, @@ -1220,7 +1420,12 @@ impl Network { // RPC Propagation methods /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. 
#[must_use = "return the response"] - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn build_response( &mut self, id: RequestId, @@ -1239,7 +1444,12 @@ impl Network { /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet, spec: Arc) { let predicate = subnet_predicate::(vec![subnet], spec); let peers_to_dial: Vec = self @@ -1267,7 +1477,12 @@ impl Network { /* Sub-behaviour event handling functions */ /// Handle a gossipsub event. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn inject_gs_event(&mut self, event: gossipsub::Event) -> Option> { match event { gossipsub::Event::Message { @@ -1401,7 +1616,12 @@ impl Network { } /// Handle an RPC event. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn inject_rpc_event(&mut self, event: RPCMessage) -> Option> { let peer_id = event.peer_id; @@ -1688,7 +1908,12 @@ impl Network { } /// Handle an identify event. - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn inject_identify_event(&mut self, event: identify::Event) -> Option> { match event { identify::Event::Received { @@ -1711,7 +1936,12 @@ impl Network { } /// Handle a peer manager event. 
- #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn inject_pm_event(&mut self, event: PeerManagerEvent) -> Option> { match event { PeerManagerEvent::PeerConnectedIncoming(peer_id) => { @@ -1765,7 +1995,12 @@ impl Network { } } - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn inject_upnp_event(&mut self, event: libp2p::upnp::Event) { match event { libp2p::upnp::Event::NewExternalAddr(addr) => { @@ -1809,7 +2044,12 @@ impl Network { } /* Networking polling */ - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] pub async fn next_event(&mut self) -> NetworkEvent { loop { tokio::select! { @@ -1843,7 +2083,12 @@ impl Network { } } - #[instrument(level = "trace", name = "libp2p", skip(self))] + #[instrument( + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip(self) + )] fn parse_swarm_event( &mut self, event: SwarmEvent>, diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index c4bfcc14d52..f348d46f768 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -87,13 +87,14 @@ pub async fn build_libp2p_instance( boot_nodes: Vec, fork_name: ForkName, chain_spec: Arc, + service_name: String, ) -> Libp2pInstance { let config = build_config(boot_nodes); // launch libp2p service let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx); + let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name); let libp2p_context = lighthouse_network::Context { config, 
enr_fork_id: EnrForkId::default(), @@ -130,8 +131,16 @@ pub async fn build_node_pair( spec: Arc, protocol: Protocol, ) -> (Libp2pInstance, Libp2pInstance) { - let mut sender = build_libp2p_instance(rt.clone(), vec![], fork_name, spec.clone()).await; - let mut receiver = build_libp2p_instance(rt, vec![], fork_name, spec.clone()).await; + let mut sender = build_libp2p_instance( + rt.clone(), + vec![], + fork_name, + spec.clone(), + "sender".to_string(), + ) + .await; + let mut receiver = + build_libp2p_instance(rt, vec![], fork_name, spec.clone(), "receiver".to_string()).await; // let the two nodes set up listeners let sender_fut = async { @@ -205,7 +214,16 @@ pub async fn build_linear( ) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], fork_name, spec.clone()).await); + nodes.push( + build_libp2p_instance( + rt.clone(), + vec![], + fork_name, + spec.clone(), + "linear".to_string(), + ) + .await, + ); } let multiaddrs: Vec = nodes diff --git a/beacon_node/network/src/subnet_service/mod.rs b/beacon_node/network/src/subnet_service/mod.rs index d44e66c7a46..1c5748cb2a4 100644 --- a/beacon_node/network/src/subnet_service/mod.rs +++ b/beacon_node/network/src/subnet_service/mod.rs @@ -113,7 +113,12 @@ impl SubnetService { /* Public functions */ /// Establish the service based on the passed configuration. - #[instrument(level = "info", name = "subnet_service", skip(beacon_chain))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(beacon_chain) + )] pub fn new(beacon_chain: Arc>, node_id: NodeId, config: &NetworkConfig) -> Self { let slot_duration = beacon_chain.slot_clock.slot_duration(); @@ -216,7 +221,12 @@ impl SubnetService { /// /// This returns a result simply for the ergonomics of using ?. The result can be /// safely dropped. 
- #[instrument(level = "info", name = "subnet_service", skip(self, subscriptions))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self, subscriptions) + )] pub fn validator_subscriptions(&mut self, subscriptions: impl Iterator) { // If the node is in a proposer-only state, we ignore all subnet subscriptions. if self.proposer_only { @@ -351,7 +361,12 @@ impl SubnetService { /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// verification, re-propagates and returns false. - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] pub fn should_process_attestation( &self, subnet: Subnet, @@ -376,7 +391,12 @@ impl SubnetService { /// Adds an event to the event queue and notifies that this service is ready to be polled /// again. - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn queue_event(&mut self, ev: SubnetServiceMessage) { self.events.push_back(ev); if let Some(waker) = &self.waker { @@ -440,7 +460,12 @@ impl SubnetService { } // Subscribes to the subnet if it should be done immediately, or schedules it if required. - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn subscribe_to_subnet( &mut self, ExactSubnet { subnet, slot }: ExactSubnet, @@ -491,7 +516,12 @@ impl SubnetService { } /// Adds a subscription event to the sync subnet. 
- #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn subscribe_to_sync_subnet( &mut self, subnet: Subnet, @@ -541,7 +571,12 @@ impl SubnetService { /// Checks that the time in which the subscription would end is not in the past. If we are /// already subscribed, extends the timeout if necessary. If this is a new subscription, we send /// out the appropriate events. - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn subscribe_to_subnet_immediately( &mut self, subnet: Subnet, @@ -597,7 +632,12 @@ impl SubnetService { } // Unsubscribes from a subnet that was removed. - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn handle_removed_subnet(&mut self, subnet: Subnet) { if !self.subscriptions.contains_key(&subnet) { // Subscription no longer exists as short lived subnet @@ -615,7 +655,12 @@ impl SubnetService { impl Stream for SubnetService { type Item = SubnetServiceMessage; - #[instrument(level = "info", name = "subnet_service", skip(self))] + #[instrument( + level = "info", + fields(service = "subnet_service"), + name = "subnet_service", + skip(self) + )] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Update the waker if needed. if let Some(waker) = &self.waker { diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0e6be32d90f..e417c0712e2 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -195,7 +195,12 @@ impl BackFillSync { } /// Pauses the backfill sync if it's currently syncing. 
- #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] pub fn pause(&mut self) { if let BackFillState::Syncing = self.state() { debug!(processed_epochs = %self.validated_batches, to_be_processed = %self.current_start,"Backfill sync paused"); @@ -207,7 +212,12 @@ impl BackFillSync { /// /// If resuming is successful, reports back the current syncing metrics. #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] pub fn start( &mut self, network: &mut SyncNetworkContext, @@ -283,7 +293,12 @@ impl BackFillSync { /// A fully synced peer has joined us. /// If we are in a failed state, update a local variable to indicate we are able to restart /// the failed sync on the next attempt. - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] pub fn fully_synced_peer_joined(&mut self) { if matches!(self.state(), BackFillState::Failed) { self.restart_failed_sync = true; @@ -292,7 +307,12 @@ impl BackFillSync { /// A peer has disconnected. /// If the peer has active batches, those are considered failed and re-requested. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn peer_disconnected( &mut self, @@ -341,7 +361,12 @@ impl BackFillSync { /// An RPC error has occurred. 
/// /// If the batch exists it is re-requested. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn inject_error( &mut self, @@ -381,7 +406,12 @@ impl BackFillSync { /// If this returns an error, the backfill sync has failed and will be restarted once new peers /// join the system. /// The sync manager should update the global sync state on failure. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_block_response( &mut self, @@ -459,7 +489,12 @@ impl BackFillSync { /// The syncing process has failed. /// /// This resets past variables, to allow for a fresh start when resuming. - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn fail_sync(&mut self, error: BackFillError) -> Result<(), BackFillError> { // Some errors shouldn't fail the chain. if matches!(error, BackFillError::Paused) { @@ -492,7 +527,12 @@ impl BackFillSync { /// Processes the batch with the given id. /// The batch must exist and be ready for processing - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn process_batch( &mut self, network: &mut SyncNetworkContext, @@ -546,7 +586,12 @@ impl BackFillSync { /// The block processor has completed processing a batch. 
This function handles the result /// of the batch processor. /// If an error is returned the BackFill sync has failed. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] pub fn on_batch_process_result( &mut self, @@ -690,7 +735,12 @@ impl BackFillSync { } /// Processes the next ready batch. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn process_completed_batches( &mut self, network: &mut SyncNetworkContext, @@ -753,7 +803,12 @@ impl BackFillSync { /// /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch >= self.current_start { @@ -841,7 +896,12 @@ impl BackFillSync { /// These events occur when a peer has successfully responded with blocks, but the blocks we /// have received are incorrect or invalid. This indicates the peer has not performed as /// intended and can result in downvoting a peer. 
- #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn handle_invalid_batch( &mut self, network: &mut SyncNetworkContext, @@ -893,7 +953,12 @@ impl BackFillSync { } /// Sends and registers the request of a batch awaiting download. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn retry_batch_download( &mut self, network: &mut SyncNetworkContext, @@ -935,7 +1000,12 @@ impl BackFillSync { } /// Requests the batch assigned to the given id from a given peer. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn send_batch( &mut self, network: &mut SyncNetworkContext, @@ -995,7 +1065,12 @@ impl BackFillSync { /// When resuming a chain, this function searches for batches that need to be re-downloaded and /// transitions their state to redownload the batch. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { let batch_ids_to_retry = self .batches @@ -1020,7 +1095,12 @@ impl BackFillSync { /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. 
- #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn request_batches( &mut self, network: &mut SyncNetworkContext, @@ -1063,7 +1143,12 @@ impl BackFillSync { /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. - #[instrument(level = "info", name = "backfill_sync", skip(self, network))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self, network) + )] fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond genesis; if self.last_batch_downloaded { @@ -1125,7 +1210,12 @@ impl BackFillSync { /// /// This errors if the beacon chain indicates that backfill sync has already completed or is /// not required. - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> { let anchor_info = self.beacon_chain.store.get_anchor_info(); if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { @@ -1139,7 +1229,12 @@ impl BackFillSync { } /// Checks with the beacon chain if backfill sync has completed. - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn check_completed(&mut self) -> bool { if self.would_complete(self.current_start) { // Check that the beacon chain agrees @@ -1155,7 +1250,12 @@ impl BackFillSync { } /// Checks if backfill would complete by syncing to `start_epoch`. 
- #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn would_complete(&self, start_epoch: Epoch) -> bool { start_epoch <= self @@ -1165,12 +1265,22 @@ impl BackFillSync { } /// Updates the global network state indicating the current state of a backfill sync. - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn set_state(&self, state: BackFillState) { *self.network_globals.backfill_state.write() = state; } - #[instrument(level = "info", name = "backfill_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "backfill_sync"), + name = "backfill_sync", + skip(self) + )] fn state(&self) -> BackFillState { self.network_globals.backfill_state.read().clone() } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index ef126f39de7..0512ca3bae7 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -126,7 +126,7 @@ use lighthouse_network::service::api_types::Id; pub(crate) type BlockLookupSummary = (Id, Hash256, Option, Vec); impl BlockLookups { - #[instrument(level = "info", name = "lookup_sync")] + #[instrument(level = "info", fields(service = "lookup_sync"), name = "lookup_sync")] pub fn new() -> Self { Self { failed_chains: LRUTimeCache::new(Duration::from_secs( @@ -137,19 +137,34 @@ impl BlockLookups { } #[cfg(test)] - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub(crate) fn insert_failed_chain(&mut self, block_root: Hash256) { self.failed_chains.insert(block_root); } #[cfg(test)] - #[instrument(level = "info", name = "lookup_sync", 
skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub(crate) fn get_failed_chains(&mut self) -> Vec { self.failed_chains.keys().cloned().collect() } #[cfg(test)] - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub(crate) fn active_single_lookups(&self) -> Vec { self.single_block_lookups .iter() @@ -165,7 +180,12 @@ impl BlockLookups { } /// Returns a vec of all parent lookup chains by tip, in descending slot order (tip first) - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub(crate) fn active_parent_lookups(&self) -> Vec { compute_parent_chains( &self @@ -180,7 +200,12 @@ impl BlockLookups { /// Creates a parent lookup for the block with the given `block_root` and immediately triggers it. /// If a parent lookup exists or is triggered, a current lookup will be created. - #[instrument(level = "info", name = "lookup_sync", skip(self, block_component, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, block_component, cx) + )] pub fn search_child_and_parent( &mut self, block_root: Hash256, @@ -210,7 +235,12 @@ impl BlockLookups { /// Seach a block whose parent root is unknown. 
/// Returns true if the lookup is created or already exists - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn search_unknown_block( &mut self, block_root: Hash256, @@ -226,7 +256,12 @@ impl BlockLookups { /// - `block_root_to_search` is a failed chain /// /// Returns true if the lookup is created or already exists - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn search_parent_of_child( &mut self, block_root_to_search: Hash256, @@ -328,7 +363,12 @@ impl BlockLookups { /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is /// constructed. /// Returns true if the lookup is created or already exists - #[instrument(level = "info", name = "lookup_sync", skip(self, block_component, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, block_component, cx) + )] fn new_current_lookup( &mut self, block_root: Hash256, @@ -431,7 +471,12 @@ impl BlockLookups { /* Lookup responses */ /// Process a block or blob response received from a single lookup request. 
- #[instrument(level = "info", name = "lookup_sync", skip(self, response, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, response, cx) + )] pub fn on_download_response>( &mut self, id: SingleLookupReqId, @@ -517,7 +562,12 @@ impl BlockLookups { /* Error responses */ - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub fn peer_disconnected(&mut self, peer_id: &PeerId) { for (_, lookup) in self.single_block_lookups.iter_mut() { lookup.remove_peer(peer_id); @@ -526,7 +576,12 @@ impl BlockLookups { /* Processing responses */ - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn on_processing_result( &mut self, process_type: BlockProcessType, @@ -547,7 +602,12 @@ impl BlockLookups { self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); } - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn on_processing_result_inner>( &mut self, lookup_id: SingleLookupId, @@ -712,7 +772,12 @@ impl BlockLookups { } } - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn on_external_processing_result( &mut self, block_root: Hash256, @@ -738,7 +803,12 @@ impl BlockLookups { } /// Makes progress on the immediate children of `block_root` - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] pub fn continue_child_lookups(&mut self, 
block_root: Hash256, cx: &mut SyncNetworkContext) { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self @@ -759,7 +829,12 @@ impl BlockLookups { /// Drops `dropped_id` lookup and all its children recursively. Lookups awaiting a parent need /// the parent to make progress to resolve, therefore we must drop them if the parent is /// dropped. - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub fn drop_lookup_and_children(&mut self, dropped_id: SingleLookupId) { if let Some(dropped_lookup) = self.single_block_lookups.remove(&dropped_id) { debug!( @@ -784,7 +859,12 @@ impl BlockLookups { /// Common handler a lookup request error, drop it and update metrics /// Returns true if the lookup is created or already exists - #[instrument(level = "info", name = "lookup_sync", skip(self, result, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, result, cx) + )] fn on_lookup_result( &mut self, id: SingleLookupId, @@ -822,14 +902,24 @@ impl BlockLookups { /* Helper functions */ /// Drops all the single block requests and returns how many requests were dropped. 
- #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub fn drop_single_block_requests(&mut self) -> usize { let requests_to_drop = self.single_block_lookups.len(); self.single_block_lookups.clear(); requests_to_drop } - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub fn update_metrics(&self) { metrics::set_gauge( &metrics::SYNC_SINGLE_BLOCK_LOOKUPS, @@ -838,7 +928,12 @@ impl BlockLookups { } /// Perform some prune operations on lookups on some interval - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] pub fn prune_lookups(&mut self) { self.drop_lookups_without_peers(); self.drop_stuck_lookups(); @@ -862,7 +957,12 @@ impl BlockLookups { /// /// Instead there's no negative for keeping lookups with no peers around for some time. If we /// regularly prune them, it should not be a memory concern (TODO: maybe yes!). - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] fn drop_lookups_without_peers(&mut self) { for (lookup_id, block_root) in self .single_block_lookups @@ -900,7 +1000,12 @@ impl BlockLookups { /// /// - One single clear warn level log per stuck incident /// - If the original bug is sporadic, it reduces the time a node is stuck from forever to 15 min - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] fn drop_stuck_lookups(&mut self) { // While loop to find and drop all disjoint trees of potentially stuck lookups. 
while let Some(stuck_lookup) = self.single_block_lookups.values().find(|lookup| { @@ -938,7 +1043,12 @@ impl BlockLookups { } /// Recursively find the oldest ancestor lookup of another lookup - #[instrument(level = "info", name = "lookup_sync", skip(self))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self) + )] fn find_oldest_ancestor_lookup<'a>( &'a self, lookup: &'a SingleBlockLookup, @@ -963,7 +1073,12 @@ impl BlockLookups { /// Adds peers to a lookup and its ancestors recursively. /// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having /// to duplicate the code to add peers to a lookup - #[instrument(level = "info", name = "lookup_sync", skip(self, cx))] + #[instrument( + level = "info", + fields(service = "lookup_sync"), + name = "lookup_sync", + skip(self, cx) + )] fn add_peers_to_lookup_and_ancestors( &mut self, lookup_id: SingleLookupId, diff --git a/beacon_node/network/src/sync/peer_sampling.rs b/beacon_node/network/src/sync/peer_sampling.rs index e817f46f94a..d5a0f0aa00f 100644 --- a/beacon_node/network/src/sync/peer_sampling.rs +++ b/beacon_node/network/src/sync/peer_sampling.rs @@ -16,7 +16,7 @@ use std::{ collections::hash_map::Entry, collections::HashMap, marker::PhantomData, sync::Arc, time::Duration, }; -use tracing::{debug, error, warn}; +use tracing::{debug, error, instrument, warn}; use types::{data_column_sidecar::ColumnIndex, ChainSpec, DataColumnSidecar, Hash256}; pub type SamplingResult = Result<(), SamplingError>; @@ -29,6 +29,7 @@ pub struct Sampling { } impl Sampling { + #[instrument(level = "info", fields(service = "sampling"), name = "sampling")] pub fn new(sampling_config: SamplingConfig) -> Self { Self { requests: <_>::default(), @@ -37,11 +38,23 @@ impl Sampling { } #[cfg(test)] + #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self) + )] pub fn active_sampling_requests(&self) -> Vec { 
self.requests.values().map(|r| r.block_root).collect() } #[cfg(test)] + #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self) + )] pub fn get_request_status( &self, block_root: Hash256, @@ -59,6 +72,12 @@ impl Sampling { /// /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. /// - `None`: Request still active, requester should do no action + #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self, cx) + )] pub fn on_new_sample_request( &mut self, block_root: Hash256, @@ -104,6 +123,12 @@ impl Sampling { /// /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. /// - `None`: Request still active, requester should do no action + #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self, resp, cx) + )] pub fn on_sample_downloaded( &mut self, id: SamplingId, @@ -128,6 +153,12 @@ impl Sampling { /// /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. /// - `None`: Request still active, requester should do no action + #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self, cx) + )] pub fn on_sample_verified( &mut self, id: SamplingId, @@ -147,6 +178,12 @@ impl Sampling { /// Converts a result from the internal format of `ActiveSamplingRequest` (error first to use ? /// conveniently), to an Option first format to use an `if let Some() { act on result }` pattern /// in the sync manager. 
+ #[instrument( + level = "info", + fields(service = "sampling"), + name = "sampling", + skip(self) + )] fn handle_sampling_result( &mut self, result: Result, SamplingError>, diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index a4f4e04497c..fefa307f0eb 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -18,7 +18,7 @@ use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::fmt; use std::hash::{Hash, Hasher}; use strum::IntoStaticStr; -use tracing::{debug, warn}; +use tracing::{debug, instrument, warn}; use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of @@ -175,21 +175,25 @@ impl SyncingChain { } /// Check if the chain has peers from which to process batches. + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn available_peers(&self) -> usize { self.peers.len() } /// Get the chain's id. + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn get_id(&self) -> ChainId { self.id } /// Peers currently syncing this chain. + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn peers(&self) -> impl Iterator + '_ { self.peers.keys().cloned() } /// Progress in epochs made by the chain + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn processed_epochs(&self) -> u64 { self.processing_target .saturating_sub(self.start_epoch) @@ -197,6 +201,7 @@ impl SyncingChain { } /// Returns the total count of pending blocks in all the batches of this chain + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn pending_blocks(&self) -> usize { self.batches .values() @@ -206,6 +211,7 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. 
+ #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn remove_peer( &mut self, peer_id: &PeerId, @@ -238,6 +244,7 @@ impl SyncingChain { } /// Returns the latest slot number that has been processed. + #[instrument(level = "info", fields(chain = self.id), skip(self))] fn current_processed_slot(&self) -> Slot { // the last slot we processed was included in the previous batch, and corresponds to the // first slot of the current target epoch @@ -247,6 +254,7 @@ impl SyncingChain { /// A block has been received for a batch on this chain. /// If the block correctly completes the batch it will be processed if possible. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn on_block_response( &mut self, network: &mut SyncNetworkContext, @@ -313,6 +321,7 @@ impl SyncingChain { /// Processes the batch with the given id. /// The batch must exist and be ready for processing + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn process_batch( &mut self, network: &mut SyncNetworkContext, @@ -360,6 +369,7 @@ impl SyncingChain { } /// Processes the next ready batch, prioritizing optimistic batches over the processing target. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn process_completed_batches( &mut self, network: &mut SyncNetworkContext, @@ -466,6 +476,7 @@ impl SyncingChain { /// The block processor has completed processing a batch. This function handles the result /// of the batch processor. 
+ #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn on_batch_process_result( &mut self, network: &mut SyncNetworkContext, @@ -620,6 +631,7 @@ } } + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn reject_optimistic_batch( &mut self, network: &mut SyncNetworkContext, @@ -654,6 +666,7 @@ /// If a previous batch has been validated and it had been re-processed, penalize the original /// peer. #[allow(clippy::modulo_one)] + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn advance_chain(&mut self, network: &mut SyncNetworkContext, validating_epoch: Epoch) { // make sure this epoch produces an advancement if validating_epoch <= self.start_epoch { @@ -760,6 +773,7 @@ /// These events occur when a peer has successfully responded with blocks, but the blocks we /// have received are incorrect or invalid. This indicates the peer has not performed as /// intended and can result in downvoting a peer. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn handle_invalid_batch( &mut self, network: &mut SyncNetworkContext, @@ -819,6 +833,7 @@ /// This chain has been requested to start syncing. /// /// This could be new chain, or an old chain that is being resumed. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn start_syncing( &mut self, network: &mut SyncNetworkContext, @@ -857,6 +872,7 @@ /// Add a peer to the chain. /// /// If the chain is active, this starts requesting batches from this peer. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn add_peer( &mut self, network: &mut SyncNetworkContext, @@ -874,6 +890,7 @@ /// An RPC error has occurred. /// /// If the batch exists it is re-requested. 
+ #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn inject_error( &mut self, network: &mut SyncNetworkContext, @@ -930,6 +947,7 @@ impl SyncingChain { } /// Sends and registers the request of a batch awaiting download. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn retry_batch_download( &mut self, network: &mut SyncNetworkContext, @@ -966,6 +984,7 @@ impl SyncingChain { } /// Requests the batch assigned to the given id from a given peer. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn send_batch( &mut self, network: &mut SyncNetworkContext, @@ -1038,6 +1057,7 @@ impl SyncingChain { } /// Returns true if this chain is currently syncing. + #[instrument(level = "info", fields(chain = self.id), skip(self))] pub fn is_syncing(&self) -> bool { match self.state { ChainSyncingState::Syncing => true, @@ -1047,6 +1067,7 @@ impl SyncingChain { /// Kickstarts the chain by sending for processing batches that are ready and requesting more /// batches if needed. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] pub fn resume( &mut self, network: &mut SyncNetworkContext, @@ -1059,6 +1080,7 @@ impl SyncingChain { /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. + #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn request_batches(&mut self, network: &mut SyncNetworkContext) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); @@ -1149,6 +1171,7 @@ impl SyncingChain { /// Creates the next required batch from the chain. If there are no more batches required, /// `false` is returned. 
+ #[instrument(level = "info", fields(chain = self.id), skip(self, network))] fn include_next_batch(&mut self, network: &mut SyncNetworkContext) -> Option { // don't request batches beyond the target head slot if self @@ -1209,6 +1232,7 @@ impl SyncingChain { /// This produces a string of the form: [D,E,E,E,E] /// to indicate the current buffer state of the chain. The symbols are defined on each of the /// batch states. See [BatchState::visualize] for symbol definitions. + #[instrument(level = "info", fields(chain = self.id), skip(self))] fn visualize_batch_state(&self) -> String { let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize); diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index cc4cfc7f491..a2c3b99674f 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -83,7 +83,12 @@ where C: BlockStorage + ToStatusMessage, T: BeaconChainTypes, { - #[instrument(level = "info", name = "range_sync", skip(beacon_chain))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(beacon_chain) + )] pub fn new(beacon_chain: Arc) -> Self { RangeSync { beacon_chain: beacon_chain.clone(), @@ -95,7 +100,12 @@ where } } - #[instrument(level = "info", name = "range_sync", skip(self))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self) + )] pub fn state( &self, ) -> Result, &'static str> { @@ -107,7 +117,12 @@ where /// may need to be synced as a result. A new peer, may increase the peer pool of a finalized /// chain, this may result in a different finalized chain from syncing as finalized chains are /// prioritised by peer-pool size. 
- #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn add_peer( &mut self, network: &mut SyncNetworkContext, @@ -202,7 +217,12 @@ where /// /// This function finds the chain that made this request. Once found, processes the result. /// This request could complete a chain or simply add to its progress. - #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn blocks_by_range_response( &mut self, network: &mut SyncNetworkContext, @@ -233,7 +253,12 @@ where } } - #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn handle_block_process_result( &mut self, network: &mut SyncNetworkContext, @@ -266,7 +291,12 @@ where /// A peer has disconnected. This removes the peer from any ongoing chains and mappings. A /// disconnected peer could remove a chain - #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn peer_disconnect(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { // if the peer is in the awaiting head mapping, remove it self.awaiting_head_peers.remove(peer_id); @@ -279,7 +309,12 @@ where /// which pool the peer is in. The chain may also have a batch or batches awaiting /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. 
- #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { for (removed_chain, sync_type, remove_reason) in self .chains @@ -299,7 +334,12 @@ where /// /// Check to see if the request corresponds to a pending batch. If so, re-request it if possible, if there have /// been too many failed attempts for the batch, remove the chain. - #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn inject_error( &mut self, network: &mut SyncNetworkContext, @@ -329,7 +369,12 @@ where } } - #[instrument(level = "info", name = "range_sync", skip(self, chain, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, chain, network) + )] fn on_chain_removed( &mut self, chain: SyncingChain, @@ -373,7 +418,12 @@ where } /// Kickstarts sync. 
- #[instrument(level = "info", name = "range_sync", skip(self, network))] + #[instrument( + level = "info", + fields(component = "range_sync"), + name = "range_sync", + skip(self, network) + )] pub fn resume(&mut self, network: &mut SyncNetworkContext) { for (removed_chain, sync_type, remove_reason) in self.chains.call_all(|chain| chain.resume(network)) diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 5ab788e4fd4..7da73596b5b 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -5,7 +5,7 @@ use futures::channel::mpsc::Sender; use futures::prelude::*; use std::sync::Weak; use tokio::runtime::{Handle, Runtime}; -use tracing::{debug, span, Level}; +use tracing::{debug, instrument}; pub use tokio::task::JoinHandle; @@ -79,6 +79,9 @@ pub struct TaskExecutor { /// /// The task must provide a reason for shutting down. signal_tx: Sender, + + /// The name of the service for inclusion in the logger output. + service_name: String, } impl TaskExecutor { @@ -89,27 +92,29 @@ impl TaskExecutor { /// This function should only be used during testing. In production, prefer to obtain an /// instance of `Self` via a `environment::RuntimeContext` (see the `lighthouse/environment` /// crate). + #[instrument(level = "info", fields(service = service_name), name = "task_executor", skip(handle, exit, signal_tx))] pub fn new>( handle: T, exit: async_channel::Receiver<()>, signal_tx: Sender, + service_name: String, ) -> Self { Self { handle_provider: handle.into(), exit, signal_tx, + service_name, } } /// Clones the task executor adding a service name. 
+ #[instrument(level = "info", fields(service = service_name), name = "task_executor", skip(self))] pub fn clone_with_name(&self, service_name: String) -> Self { - let span = span!(Level::INFO, "TaskExecutor", service = service_name); - let _enter = span.enter(); - TaskExecutor { handle_provider: self.handle_provider.clone(), exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), + service_name, } } @@ -119,6 +124,7 @@ impl TaskExecutor { /// The purpose of this function is to create a compile error if some function which previously /// returned `()` starts returning something else. Such a case may otherwise result in /// accidental error suppression. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task))] pub fn spawn_ignoring_error( &self, task: impl Future> + Send + 'static, @@ -130,6 +136,7 @@ impl TaskExecutor { /// Spawn a task to monitor the completion of another task. /// /// If the other task exits by panicking, then the monitor task will shut down the executor. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task_handle))] fn spawn_monitor( &self, task_handle: impl Future> + Send + 'static, @@ -161,6 +168,7 @@ impl TaskExecutor { /// of a panic, the executor will be shut down via `self.signal_tx`. /// /// This function generates prometheus metrics on number of tasks and task duration. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task))] pub fn spawn(&self, task: impl Future + Send + 'static, name: &'static str) { if let Some(task_handle) = self.spawn_handle(task, name) { self.spawn_monitor(task_handle, name) @@ -176,6 +184,7 @@ impl TaskExecutor { /// This is useful in cases where the future to be spawned needs to do additional cleanup work when /// the task is completed/canceled (e.g. 
writing local variables to disk) or the task is created from /// some framework which does its own cleanup (e.g. a hyper server). + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task))] pub fn spawn_without_exit( &self, task: impl Future + Send + 'static, @@ -213,6 +222,7 @@ impl TaskExecutor { /// The task is cancelled when the corresponding async-channel is dropped. /// /// This function generates prometheus metrics on number of tasks and task duration. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task))] pub fn spawn_handle( &self, task: impl Future + Send + 'static, @@ -251,6 +261,7 @@ impl TaskExecutor { /// The Future returned behaves like the standard JoinHandle which can return an error if the /// task failed. /// This function generates prometheus metrics on number of tasks and task duration. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, task))] pub fn spawn_blocking_handle( &self, task: F, @@ -299,6 +310,7 @@ impl TaskExecutor { /// a `tokio` context present in the thread-local storage due to some `rayon` funkiness. Talk to /// @paulhauner if you plan to use this function in production. He has put metrics in here to /// track any use of it, so don't think you can pull a sneaky one on him. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self, future))] pub fn block_on_dangerous( &self, future: F, @@ -334,6 +346,7 @@ impl TaskExecutor { } /// Returns a `Handle` to the current runtime. + #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self))] pub fn handle(&self) -> Option { self.handle_provider.handle() } @@ -348,6 +361,7 @@ impl TaskExecutor { } /// Get a channel to request shutting down. 
+ #[instrument(level = "info", fields(service = self.service_name), name = "task_executor", skip(self))] pub fn shutdown_sender(&self) -> Sender { self.signal_tx.clone() } diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index 1d259b152c3..698152f6c13 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -37,7 +37,7 @@ impl Default for TestRuntime { (Some(runtime), handle) }; - let task_executor = TaskExecutor::new(handle, exit, shutdown_tx); + let task_executor = TaskExecutor::new(handle, exit, shutdown_tx, "test".to_string()); Self { runtime, diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index ff8f0770390..e3e288ceef2 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use std::sync::Arc; use task_executor::{ShutdownReason, TaskExecutor}; use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; -use tracing::{error, info, span, warn, Level}; +use tracing::{error, info, warn}; use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_subscriber::filter::LevelFilter; use types::{EthSpec, GnosisEthSpec, MainnetEthSpec, MinimalEthSpec}; @@ -314,13 +314,14 @@ impl Environment { &self.runtime } - /// Returns a `Context` where no "service" has been added to the logger output. + /// Returns a `Context` where a "core" service has been added to the logger output. pub fn core_context(&self) -> RuntimeContext { RuntimeContext { executor: TaskExecutor::new( Arc::downgrade(self.runtime()), self.exit.clone(), self.signal_tx.clone(), + "core".to_string(), ), eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), @@ -331,14 +332,12 @@ impl Environment { /// Returns a `Context` where the `service_name` is added to the logger output. 
pub fn service_context(&self, service_name: String) -> RuntimeContext { - let span = span!(Level::INFO, "", service = service_name); - let _enter = span.enter(); - RuntimeContext { executor: TaskExecutor::new( Arc::downgrade(self.runtime()), self.exit.clone(), self.signal_tx.clone(), + service_name, ), eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 3023a227fed..e8459c533c0 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -112,7 +112,12 @@ impl TestRig { ); let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx); + let executor = TaskExecutor::new( + Arc::downgrade(&runtime), + exit, + shutdown_tx, + "test".to_string(), + ); let mut spec = TEST_FORK.make_genesis_spec(MainnetEthSpec::default_spec()); spec.terminal_total_difficulty = Uint256::ZERO; diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index 1dad2c0d0d5..436974d3d2f 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -338,7 +338,12 @@ mod tests { ); let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let executor = TaskExecutor::new(Arc::downgrade(&runtime), exit, shutdown_tx); + let executor = TaskExecutor::new( + Arc::downgrade(&runtime), + exit, + shutdown_tx, + "test".to_string(), + ); let slashing_db_path = validator_dir.path().join(SLASHING_PROTECTION_FILENAME); let slashing_protection = SlashingDatabase::open_or_create(&slashing_db_path).unwrap();