From af4a297a934e3e997c76d8b7f2f1bb26eaf923cd Mon Sep 17 00:00:00 2001 From: Maria Kuklina Date: Wed, 2 Oct 2024 14:33:02 +0200 Subject: [PATCH] feat(metrics): add CC metrics --- crates/chain-listener/src/listener.rs | 39 +++++++- crates/peer-metrics/src/chain_listener.rs | 107 +++++++++++++++++++++- 2 files changed, 142 insertions(+), 4 deletions(-) diff --git a/crates/chain-listener/src/listener.rs b/crates/chain-listener/src/listener.rs index b8c0617412..ba9bb650d5 100644 --- a/crates/chain-listener/src/listener.rs +++ b/crates/chain-listener/src/listener.rs @@ -367,6 +367,18 @@ impl ChainListener { self.set_current_epoch(init_params.current_epoch).await; self.set_global_nonce(init_params.global_nonce).await; + self.observe(|m| { + m.observe_epoch_settings( + truncate_to_u64(&self.init_timestamp) as i64, + truncate_to_u64(&self.epoch_duration) as i64, + ); + + m.observe_allowed_proofs_settings( + truncate_to_u64(&self.max_proofs_per_epoch) as i64, + truncate_to_u64(&self.min_proofs_per_epoch) as i64, + ) + }); + Ok(()) } @@ -556,8 +568,12 @@ impl ChainListener { async fn refresh_compute_units(&mut self) -> eyre::Result<()> { let mut units = self.chain_connector.get_compute_units().await?; + self.observe(|m| m.observe_cus_total(units.len() as i64)); + let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect(); + self.observe(|m| m.observe_cus_in_deals(in_deal.len() as i64)); + let current_units: Vec = units.iter().map(|unit| CUID::new(unit.id.0)).collect(); self.core_distributor .cleanup_cache(current_units.as_slice()); @@ -850,12 +866,12 @@ impl ChainListener { deal_event.deal ); - let cu_ids = deal_event + let cu_ids: Vec<_> = deal_event .cuIds .into_iter() .map(|cu| CUID::new(cu.0)) .collect(); - + let cu_ids_len = cu_ids.len(); self.active_deals.insert( deal_event.deal.to_string().into(), OnChainWorker { @@ -863,6 +879,9 @@ impl ChainListener { cu_ids, }, ); + + self.observe(|m| m.observe_cus_in_deals_added(cu_ids_len as i64)); + Ok(()) } @@ -1082,6 +1101,9 @@ impl ChainListener { self.active_deals.clear(); self.current_commitment = None; self.stop_commitment().await?; + + self.observe(|m| m.observe_cus_in_deals(0)); + Ok(()) } @@ -1342,7 +1364,12 @@ impl ChainListener { }) .await?; - self.active_deals.remove(deal_id); + let removed_deal = self.active_deals.remove(deal_id); + + if let Some(removed_deal) = removed_deal { + self.observe(|m| m.observe_cus_in_deals_removed(removed_deal.cu_ids.len() as i64)); + } + Ok(()) } @@ -1417,6 +1444,8 @@ impl ChainListener { } async fn set_current_epoch(&mut self, epoch_number: U256) { + self.observe(|m| m.observe_current_epoch(truncate_to_u64(&epoch_number) as i64)); + self.current_epoch = epoch_number; self.proof_tracker.set_current_epoch(epoch_number).await; } @@ -1485,3 +1514,7 @@ where .inspect(|m| m.observe_ccp_reply(elapsed.as_millis() as f64)); result } + +fn truncate_to_u64(value: &U256) -> u64 { + value.as_limbs()[0] +} diff --git a/crates/peer-metrics/src/chain_listener.rs b/crates/peer-metrics/src/chain_listener.rs index c742e56c1a..de5240aa7f 100644 --- a/crates/peer-metrics/src/chain_listener.rs +++ b/crates/peer-metrics/src/chain_listener.rs @@ -17,7 +17,6 @@ * along with this program. If not, see . */ -use crate::{execution_time_buckets, register}; use prometheus_client::encoding::EncodeLabelSet; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::exemplar::CounterWithExemplar; @@ -26,6 +25,8 @@ use prometheus_client::metrics::gauge::Gauge; use prometheus_client::metrics::histogram::Histogram; use prometheus_client::registry::Registry; +use crate::{execution_time_buckets, register}; + #[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)] struct TxLabel { tx_hash: String, @@ -53,14 +54,32 @@ pub struct ChainListenerMetrics { ccp_proofs_tx_success: Counter, // how many proofs transaction are failed ccp_proofs_tx_failed: CounterWithExemplar, + // max amount of proofs are allowed + ccp_proofs_per_epoch_allowed_max: Gauge, + // min amount of proofs are allowed + ccp_proofs_per_epoch_allowed_min: Gauge, + // How many blocks we have received from the newHead subscription blocks_seen: Counter, last_seen_block: Gauge, // How many block we manage to process while processing the block blocks_processed: Counter, + // The number of the latest block last_process_block: Gauge, + + // Current commitment status current_commitment_status: Gauge, + // Current commitment id current_commitment: Family, + + // CUs metrics + cus_total: Gauge, + cus_in_deals: Gauge, + + // Epoch Metrics + current_epoch: Gauge, + current_epoch_start_timestamp_sec: Gauge, + current_epoch_duration_sec: Gauge, } impl ChainListenerMetrics { @@ -157,6 +176,55 @@ impl ChainListenerMetrics { "Current commitment", ); + let cus_total = register( + sub_registry, + Gauge::default(), + "cus_total", + "Total number of CUs", + ); + + let cus_in_deals = register( + sub_registry, + Gauge::default(), + "cus_in_deals", + "Total number of CUs in deals", + ); + + let current_epoch = register( + sub_registry, + Gauge::default(), + "current_epoch", + "Current epoch", + ); + + let current_epoch_start_timestamp_sec = register( + sub_registry, + Gauge::default(), + "current_epoch_start_timestamp_sec", + "Current epoch start timestamp", + ); + + let current_epoch_duration_sec = register( + sub_registry, + Gauge::default(), + "current_epoch_duration_sec", + "Current epoch duration", + ); + + let ccp_proofs_per_epoch_allowed_max = register( + sub_registry, + Gauge::default(), + "ccp_proofs_per_epoch_allowed_max", + "Max amount of proofs are allowed per epoch", + ); + + let ccp_proofs_per_epoch_allowed_min = register( + sub_registry, + Gauge::default(), + "ccp_proofs_per_epoch_allowed_min", + "Min amount of proofs are allowed per epoch", + ); + Self { ccp_requests_total, ccp_replies_total, @@ -165,12 +233,19 @@ impl ChainListenerMetrics { ccp_proofs_submit_failed, ccp_proofs_tx_success, ccp_proofs_tx_failed, + ccp_proofs_per_epoch_allowed_max, + ccp_proofs_per_epoch_allowed_min, blocks_seen, last_seen_block, blocks_processed, last_process_block, + cus_total, + cus_in_deals, current_commitment_status, current_commitment, + current_epoch, + current_epoch_start_timestamp_sec, + current_epoch_duration_sec, } } @@ -225,4 +300,34 @@ impl ChainListenerMetrics { .get_or_create(&CommitmentLabel { commitment_id }) .set(0); } + + pub fn observe_allowed_proofs_settings(&self, max_allowed: i64, min_allowed: i64) { + self.ccp_proofs_per_epoch_allowed_max.set(max_allowed); + self.ccp_proofs_per_epoch_allowed_min.set(min_allowed); + } + + pub fn observe_current_epoch(&self, epoch: i64) { + self.current_epoch.set(epoch); + } + + pub fn observe_epoch_settings(&self, start_timestamp: i64, duration: i64) { + self.current_epoch_start_timestamp_sec.set(start_timestamp); + self.current_epoch_duration_sec.set(duration); + } + + pub fn observe_cus_total(&self, n: i64) { + self.cus_total.set(n); + } + + pub fn observe_cus_in_deals(&self, n: i64) { + self.cus_in_deals.set(n); + } + + pub fn observe_cus_in_deals_added(&self, n: i64) { + self.cus_in_deals.inc_by(n); + } + + pub fn observe_cus_in_deals_removed(&self, n: i64) { + self.cus_in_deals.dec_by(n); + } }