From 812eb18883cc70e069de632ffef4724eb0a4064c Mon Sep 17 00:00:00 2001
From: nikurt <86772482+nikurt@users.noreply.github.com>
Date: Tue, 15 Nov 2022 11:31:08 +0100
Subject: [PATCH] Ping tool exports prometheus metrics (#8023)

Export prometheus metrics from the ping tool.
The ping tool is not monitored by prometheus explicitly, therefore the
metrics can be pushed to a metrics gateway.
---
 Cargo.lock                |  8 +++++--
 tools/ping/Cargo.toml     |  4 ++++
 tools/ping/src/cli.rs     |  4 ++++
 tools/ping/src/lib.rs     | 49 +++++++++++++++++++++++++++++----------
 tools/ping/src/metrics.rs | 33 ++++++++++++++++++++++++++
 5 files changed, 84 insertions(+), 14 deletions(-)
 create mode 100644 tools/ping/src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 83932b42c1e..a1d58a90577 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3357,12 +3357,16 @@ dependencies = [
 name = "near-ping"
 version = "0.0.0"
 dependencies = [
+ "actix-web",
  "anyhow",
  "chrono",
  "clap 3.1.18",
+ "near-jsonrpc",
  "near-network",
  "near-o11y",
  "near-primitives",
+ "once_cell",
+ "prometheus",
  "tokio",
  "tracing",
 ]
@@ -4351,9 +4355,9 @@ dependencies = [
 
 [[package]]
 name = "prometheus"
-version = "0.13.1"
+version = "0.13.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cface98dfa6d645ea4c789839f176e4b072265d085bfcc48eaa8d137f58d3c39"
+checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
 dependencies = [
  "cfg-if 1.0.0",
  "fnv",
diff --git a/tools/ping/Cargo.toml b/tools/ping/Cargo.toml
index d62d9804f5f..8511445863b 100644
--- a/tools/ping/Cargo.toml
+++ b/tools/ping/Cargo.toml
@@ -7,12 +7,16 @@ rust-version.workspace = true
 edition = "2021"
 
 [dependencies]
+actix-web.workspace = true
 anyhow.workspace = true
 chrono.workspace = true
 clap.workspace = true
+once_cell.workspace = true
+prometheus.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 
+near-jsonrpc = { path = "../../chain/jsonrpc" }
 near-network = { path = "../../chain/network" }
 near-o11y = { path = "../../core/o11y" }
 near-primitives = { path = "../../core/primitives" }
diff --git a/tools/ping/src/cli.rs b/tools/ping/src/cli.rs
index 1ca13726a9b..05933d763b9 100644
--- a/tools/ping/src/cli.rs
+++ b/tools/ping/src/cli.rs
@@ -48,6 +48,9 @@ pub struct PingCommand {
     /// number of seconds to wait for incoming data before timing out
     #[clap(long)]
     recv_timeout_seconds: Option,
+    /// Listen address for prometheus metrics.
+    #[clap(long, default_value = "0.0.0.0:9000")]
+    prometheus_addr: String,
 }
 
 fn display_stats(stats: &mut [(crate::PeerIdentifier, crate::PingStats)], peer_id: &PeerId) {
@@ -212,6 +215,7 @@ impl PingCommand {
                 filter,
                 csv,
                 &mut stats,
+                &self.prometheus_addr,
             )
             .await?;
             display_stats(&mut stats, &peer.id);
diff --git a/tools/ping/src/lib.rs b/tools/ping/src/lib.rs
index c3d24b831ca..faf55deb4d2 100644
--- a/tools/ping/src/lib.rs
+++ b/tools/ping/src/lib.rs
@@ -1,4 +1,6 @@
+use actix_web::{web, App, HttpServer};
 use anyhow::Context;
+pub use cli::PingCommand;
 use near_network::raw::{ConnectError, Connection, ReceivedMessage};
 use near_network::time;
 use near_network::types::HandshakeFailureReason;
@@ -14,8 +16,7 @@ use std::pin::Pin;
 
 pub mod cli;
 mod csv;
-
-pub use cli::PingCommand;
+mod metrics;
 
 // TODO: also log number of bytes/other messages (like Blocks) received?
 #[derive(Debug, Default)]
@@ -122,15 +123,17 @@ struct AppInfo {
     requests: BTreeMap>,
     timeouts: BTreeSet,
     account_filter: Option>,
+    chain_id: String,
 }
 
 impl AppInfo {
-    fn new(account_filter: Option>) -> Self {
+    fn new(account_filter: Option>, chain_id: &str) -> Self {
         Self {
             stats: HashMap::new(),
             requests: BTreeMap::new(),
             timeouts: BTreeSet::new(),
             account_filter,
+            chain_id: chain_id.to_owned(),
         }
     }
 
@@ -143,10 +146,15 @@ impl AppInfo {
         None
     }
 
-    fn ping_sent(&mut self, peer_id: &PeerId, nonce: u64) {
+    fn ping_sent(&mut self, peer_id: &PeerId, nonce: u64, chain_id: &str) {
         let timestamp = time::Instant::now();
         let timeout = timestamp + PING_TIMEOUT;
 
+        let account_id = self.peer_id_to_account_id(&peer_id);
+        crate::metrics::PING_SENT
+            .with_label_values(&[&chain_id, &peer_str(peer_id, account_id)])
+            .inc();
+
         match self.stats.entry(peer_id.clone()) {
             Entry::Occupied(mut e) => {
                 let state = e.get_mut();
@@ -212,7 +220,7 @@ impl AppInfo {
         assert!(self.timeouts.remove(&PingTimeout {
             peer_id: peer_id.clone(),
             nonce,
-            timeout: times.timeout
+            timeout: times.timeout,
         }));
 
         let l: std::time::Duration = latency.try_into().unwrap();
@@ -322,8 +330,12 @@ fn handle_message(
 ) -> anyhow::Result<()> {
     match &msg {
         ReceivedMessage::Pong { nonce, source } => {
+            let chain_id = app_info.chain_id.clone(); // Avoid an immutable borrow during a mutable borrow.
             if let Some((latency, account_id)) = app_info.pong_received(source, *nonce, received_at)
             {
+                crate::metrics::PONG_RECEIVED
+                    .with_label_values(&[&chain_id, &peer_str(&source, account_id)])
+                    .observe(latency.as_seconds_f64());
                 if let Some(csv) = latencies_csv {
                     csv.write(source, account_id, latency).context("Failed writing to CSV file")?;
                 }
@@ -380,8 +392,9 @@ async fn ping_via_node(
     account_filter: Option>,
     mut latencies_csv: Option,
     ping_stats: &mut Vec<(PeerIdentifier, PingStats)>,
+    prometheus_addr: &str,
 ) -> anyhow::Result<()> {
-    let mut app_info = AppInfo::new(account_filter);
+    let mut app_info = AppInfo::new(account_filter, chain_id);
 
     app_info.add_peer(&peer_id, None);
 
@@ -392,10 +405,7 @@ async fn ping_via_node(
         chain_id,
         genesis_hash,
         head_height,
-        time::Duration::seconds(recv_timeout_seconds.into()),
-    )
-    .await
-    {
+        time::Duration::seconds(recv_timeout_seconds.into())).await {
         Ok(p) => p,
         Err(ConnectError::HandshakeFailure(reason)) => {
             match reason {
@@ -425,6 +435,19 @@
     let next_timeout = tokio::time::sleep(std::time::Duration::ZERO);
     tokio::pin!(next_timeout);
 
+    let server = HttpServer::new(move || {
+        App::new().service(
+            web::resource("/metrics").route(web::get().to(near_jsonrpc::prometheus_handler)),
+        )
+    })
+    .bind(prometheus_addr)
+    .unwrap()
+    .workers(1)
+    .shutdown_timeout(3)
+    .disable_signals()
+    .run();
+    tokio::spawn(server);
+
     loop {
         let target = app_info.pick_next_target();
         let pending_timeout = prepare_timeout(next_timeout.as_mut(), &app_info);
@@ -436,7 +459,7 @@
                 if result.is_err() {
                     break;
                 }
-                app_info.ping_sent(&target, nonce);
+                app_info.ping_sent(&target, nonce, &chain_id);
                 nonce += 1;
                 next_ping.as_mut().reset(tokio::time::Instant::now() + std::time::Duration::from_millis(ping_frequency_millis));
             }
@@ -461,10 +484,12 @@
             _ = &mut next_timeout, if pending_timeout.is_some() => {
                 let t = pending_timeout.unwrap();
                 app_info.pop_timeout(&t);
+                let account_id = app_info.peer_id_to_account_id(&t.peer_id);
+                crate::metrics::PONG_TIMEOUTS.with_label_values(&[&chain_id, &peer_str(&t.peer_id, account_id)]).inc();
                 if let Some(csv) = latencies_csv.as_mut() {
                     result = csv.write_timeout(
                         &t.peer_id,
-                        app_info.peer_id_to_account_id(&t.peer_id)
+                        account_id,
                     )
                     .context("Failed writing to CSV file");
                     if result.is_err() {
diff --git a/tools/ping/src/metrics.rs b/tools/ping/src/metrics.rs
new file mode 100644
index 00000000000..1ae364a692c
--- /dev/null
+++ b/tools/ping/src/metrics.rs
@@ -0,0 +1,33 @@
+use near_o11y::metrics::{
+    exponential_buckets, try_create_histogram_vec, try_create_int_counter_vec, HistogramVec,
+    IntCounterVec,
+};
+use once_cell::sync::Lazy;
+
+pub(crate) static PONG_RECEIVED: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "ping_pong_received",
+        "Round-trip time of ping-pong",
+        &["chain_id", "account_id"],
+        Some(exponential_buckets(0.00001, 1.6, 40).unwrap()),
+    )
+    .unwrap()
+});
+
+pub(crate) static PONG_TIMEOUTS: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "ping_pong_timeout",
+        "Number of pongs that were not received",
+        &["chain_id", "account_id"],
+    )
+    .unwrap()
+});
+
+pub(crate) static PING_SENT: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "ping_ping_sent",
+        "Number of pings sent",
+        &["chain_id", "account_id"],
+    )
+    .unwrap()
+});
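
Note (not part of the patch): the change above exposes a pull-style /metrics endpoint on
prometheus_addr (default 0.0.0.0:9000), so the metrics can be checked locally with a plain
HTTP GET to http://127.0.0.1:9000/metrics. If the metrics really are to be pushed to a
metrics gateway, as the commit message suggests, a sketch along the following lines could
sit on top of the same registry. This is only an illustration under assumptions: it relies
on the prometheus crate's optional "push" feature, it assumes the near_o11y try_create_*
helpers register into the default prometheus registry, and the job name and Pushgateway
address are placeholders rather than values taken from this patch.

// Sketch only: push the ping tool's metrics to a Prometheus Pushgateway.
// Assumes the `prometheus` crate is built with its `push` feature enabled
// and that the metrics are registered in the default registry.
fn push_to_gateway(chain_id: &str) -> prometheus::Result<()> {
    // Gather everything currently registered in the default registry.
    let metric_families = prometheus::gather();
    prometheus::push_metrics(
        "near_ping", // hypothetical job name
        prometheus::labels! { "chain_id".to_owned() => chain_id.to_owned(), },
        "127.0.0.1:9091", // hypothetical Pushgateway address
        metric_families,
        None, // no basic auth
    )
}

Calling something like this periodically, or once before the tool exits, would cover the
push-to-gateway case without changing the /metrics endpoint added by this patch.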