Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): add CC metrics #2390

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ impl ChainListener {
self.set_current_epoch(init_params.current_epoch).await;
self.set_global_nonce(init_params.global_nonce).await;

self.observe(|m| {
m.observe_epoch_settings(
truncate_to_u64(&self.init_timestamp) as i64,
truncate_to_u64(&self.epoch_duration) as i64,
);

m.observe_allowed_proofs_settings(
truncate_to_u64(&self.max_proofs_per_epoch) as i64,
truncate_to_u64(&self.min_proofs_per_epoch) as i64,
)
});

Ok(())
}

Expand Down Expand Up @@ -556,8 +568,12 @@ impl ChainListener {
async fn refresh_compute_units(&mut self) -> eyre::Result<()> {
let mut units = self.chain_connector.get_compute_units().await?;

self.observe(|m| m.observe_cus_total(units.len() as i64));

let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();

self.observe(|m| m.observe_cus_in_deals(in_deal.len() as i64));

let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect();
self.core_distributor
.cleanup_cache(current_units.as_slice());
Expand Down Expand Up @@ -850,19 +866,22 @@ impl ChainListener {
deal_event.deal
);

let cu_ids = deal_event
let cu_ids: Vec<_> = deal_event
.cuIds
.into_iter()
.map(|cu| CUID::new(cu.0))
.collect();

let cu_ids_len = cu_ids.len();
self.active_deals.insert(
deal_event.deal.to_string().into(),
OnChainWorker {
id: deal_event.onchainWorkerId,
cu_ids,
},
);

self.observe(|m| m.observe_cus_in_deals_added(cu_ids_len as i64));

Ok(())
}

Expand Down Expand Up @@ -1082,6 +1101,9 @@ impl ChainListener {
self.active_deals.clear();
self.current_commitment = None;
self.stop_commitment().await?;

self.observe(|m| m.observe_cus_in_deals(0));

Ok(())
}

Expand Down Expand Up @@ -1342,7 +1364,12 @@ impl ChainListener {
})
.await?;

self.active_deals.remove(deal_id);
let removed_deal = self.active_deals.remove(deal_id);

if let Some(removed_deal) = removed_deal {
self.observe(|m| m.observe_cus_in_deals_removed(removed_deal.cu_ids.len() as i64));
}

Ok(())
}

Expand Down Expand Up @@ -1417,6 +1444,8 @@ impl ChainListener {
}

async fn set_current_epoch(&mut self, epoch_number: U256) {
self.observe(|m| m.observe_current_epoch(truncate_to_u64(&epoch_number) as i64));

self.current_epoch = epoch_number;
self.proof_tracker.set_current_epoch(epoch_number).await;
}
Expand Down Expand Up @@ -1485,3 +1514,7 @@ where
.inspect(|m| m.observe_ccp_reply(elapsed.as_millis() as f64));
result
}

fn truncate_to_u64(value: &U256) -> u64 {
value.as_limbs()[0]
}
107 changes: 106 additions & 1 deletion crates/peer-metrics/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use crate::{execution_time_buckets, register};
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::exemplar::CounterWithExemplar;
Expand All @@ -26,6 +25,8 @@ use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

use crate::{execution_time_buckets, register};

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct TxLabel {
tx_hash: String,
Expand Down Expand Up @@ -53,14 +54,32 @@ pub struct ChainListenerMetrics {
ccp_proofs_tx_success: Counter,
// how many proofs transaction are failed
ccp_proofs_tx_failed: CounterWithExemplar<TxLabel>,
// max amount of proofs are allowed
ccp_proofs_per_epoch_allowed_max: Gauge,
// min amount of proofs are allowed
ccp_proofs_per_epoch_allowed_min: Gauge,

// How many blocks we have received from the newHead subscription
blocks_seen: Counter,
last_seen_block: Gauge,
// How many block we manage to process while processing the block
blocks_processed: Counter,
// The number of the latest block
last_process_block: Gauge,

// Current commitment status
current_commitment_status: Gauge,
// Current commitment id
current_commitment: Family<CommitmentLabel, Gauge>,

// CUs metrics
cus_total: Gauge,
cus_in_deals: Gauge,

// Epoch Metrics
current_epoch: Gauge,
current_epoch_start_timestamp_sec: Gauge,
current_epoch_duration_sec: Gauge,
kmd-fl marked this conversation as resolved.
Show resolved Hide resolved
}

impl ChainListenerMetrics {
Expand Down Expand Up @@ -157,6 +176,55 @@ impl ChainListenerMetrics {
"Current commitment",
);

let cus_total = register(
sub_registry,
Gauge::default(),
"cus_total",
"Total number of CUs",
);

let cus_in_deals = register(
sub_registry,
Gauge::default(),
"cus_in_deals",
"Total number of CUs in deals",
);

let current_epoch = register(
sub_registry,
Gauge::default(),
"current_epoch",
"Current epoch",
);

let current_epoch_start_timestamp_sec = register(
sub_registry,
Gauge::default(),
"current_epoch_start_timestamp_sec",
"Current epoch start timestamp",
);

let current_epoch_duration_sec = register(
sub_registry,
Gauge::default(),
"current_epoch_duration_sec",
"Current epoch duration",
);

let ccp_proofs_per_epoch_allowed_max = register(
sub_registry,
Gauge::default(),
"ccp_proofs_per_epoch_allowed_max",
"Max amount of proofs are allowed per epoch",
);

let ccp_proofs_per_epoch_allowed_min = register(
sub_registry,
Gauge::default(),
"ccp_proofs_per_epoch_allowed_min",
"Min amount of proofs are allowed per epoch",
);

Self {
ccp_requests_total,
ccp_replies_total,
Expand All @@ -165,12 +233,19 @@ impl ChainListenerMetrics {
ccp_proofs_submit_failed,
ccp_proofs_tx_success,
ccp_proofs_tx_failed,
ccp_proofs_per_epoch_allowed_max,
ccp_proofs_per_epoch_allowed_min,
blocks_seen,
last_seen_block,
blocks_processed,
last_process_block,
cus_total,
cus_in_deals,
current_commitment_status,
current_commitment,
current_epoch,
current_epoch_start_timestamp_sec,
current_epoch_duration_sec,
}
}

Expand Down Expand Up @@ -225,4 +300,34 @@ impl ChainListenerMetrics {
.get_or_create(&CommitmentLabel { commitment_id })
.set(0);
}

pub fn observe_allowed_proofs_settings(&self, max_allowed: i64, min_allowed: i64) {
self.ccp_proofs_per_epoch_allowed_max.set(max_allowed);
self.ccp_proofs_per_epoch_allowed_min.set(min_allowed);
}

pub fn observe_current_epoch(&self, epoch: i64) {
self.current_epoch.set(epoch);
}

pub fn observe_epoch_settings(&self, start_timestamp: i64, duration: i64) {
self.current_epoch_start_timestamp_sec.set(start_timestamp);
self.current_epoch_duration_sec.set(duration);
}

pub fn observe_cus_total(&self, n: i64) {
self.cus_total.set(n);
}

pub fn observe_cus_in_deals(&self, n: i64) {
self.cus_in_deals.set(n);
}

pub fn observe_cus_in_deals_added(&self, n: i64) {
self.cus_in_deals.inc_by(n);
}

pub fn observe_cus_in_deals_removed(&self, n: i64) {
self.cus_in_deals.dec_by(n);
}
}
Loading