Skip to content

Commit

Permalink
Ping tool exports prometheus metrics (#8023)
Browse files Browse the repository at this point in the history
Export prometheus metrics from the ping tool.
The ping tool is not monitored by prometheus explicitly, therefore the metrics can be pushed to a metrics gateway.
  • Loading branch information
nikurt authored Nov 15, 2022
1 parent 1dcfe52 commit 812eb18
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 14 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions tools/ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ rust-version.workspace = true
edition = "2021"

[dependencies]
actix-web.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
once_cell.workspace = true
prometheus.workspace = true
tokio.workspace = true
tracing.workspace = true

near-jsonrpc = { path = "../../chain/jsonrpc" }
near-network = { path = "../../chain/network" }
near-o11y = { path = "../../core/o11y" }
near-primitives = { path = "../../core/primitives" }
4 changes: 4 additions & 0 deletions tools/ping/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub struct PingCommand {
/// number of seconds to wait for incoming data before timing out
#[clap(long)]
recv_timeout_seconds: Option<u32>,
/// Listen address for prometheus metrics.
#[clap(long, default_value = "0.0.0.0:9000")]
prometheus_addr: String,
}

fn display_stats(stats: &mut [(crate::PeerIdentifier, crate::PingStats)], peer_id: &PeerId) {
Expand Down Expand Up @@ -212,6 +215,7 @@ impl PingCommand {
filter,
csv,
&mut stats,
&self.prometheus_addr,
)
.await?;
display_stats(&mut stats, &peer.id);
Expand Down
49 changes: 37 additions & 12 deletions tools/ping/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use actix_web::{web, App, HttpServer};
use anyhow::Context;
pub use cli::PingCommand;
use near_network::raw::{ConnectError, Connection, ReceivedMessage};
use near_network::time;
use near_network::types::HandshakeFailureReason;
Expand All @@ -14,8 +16,7 @@ use std::pin::Pin;

pub mod cli;
mod csv;

pub use cli::PingCommand;
mod metrics;

// TODO: also log number of bytes/other messages (like Blocks) received?
#[derive(Debug, Default)]
Expand Down Expand Up @@ -122,15 +123,17 @@ struct AppInfo {
requests: BTreeMap<PingTarget, HashMap<Nonce, PingTimes>>,
timeouts: BTreeSet<PingTimeout>,
account_filter: Option<HashSet<AccountId>>,
chain_id: String,
}

impl AppInfo {
fn new(account_filter: Option<HashSet<AccountId>>) -> Self {
fn new(account_filter: Option<HashSet<AccountId>>, chain_id: &str) -> Self {
Self {
stats: HashMap::new(),
requests: BTreeMap::new(),
timeouts: BTreeSet::new(),
account_filter,
chain_id: chain_id.to_owned(),
}
}

Expand All @@ -143,10 +146,15 @@ impl AppInfo {
None
}

fn ping_sent(&mut self, peer_id: &PeerId, nonce: u64) {
fn ping_sent(&mut self, peer_id: &PeerId, nonce: u64, chain_id: &str) {
let timestamp = time::Instant::now();
let timeout = timestamp + PING_TIMEOUT;

let account_id = self.peer_id_to_account_id(&peer_id);
crate::metrics::PING_SENT
.with_label_values(&[&chain_id, &peer_str(peer_id, account_id)])
.inc();

match self.stats.entry(peer_id.clone()) {
Entry::Occupied(mut e) => {
let state = e.get_mut();
Expand Down Expand Up @@ -212,7 +220,7 @@ impl AppInfo {
assert!(self.timeouts.remove(&PingTimeout {
peer_id: peer_id.clone(),
nonce,
timeout: times.timeout
timeout: times.timeout,
}));

let l: std::time::Duration = latency.try_into().unwrap();
Expand Down Expand Up @@ -322,8 +330,12 @@ fn handle_message(
) -> anyhow::Result<()> {
match &msg {
ReceivedMessage::Pong { nonce, source } => {
let chain_id = app_info.chain_id.clone(); // Avoid an immutable borrow during a mutable borrow.
if let Some((latency, account_id)) = app_info.pong_received(source, *nonce, received_at)
{
crate::metrics::PONG_RECEIVED
.with_label_values(&[&chain_id, &peer_str(&source, account_id)])
.observe(latency.as_seconds_f64());
if let Some(csv) = latencies_csv {
csv.write(source, account_id, latency).context("Failed writing to CSV file")?;
}
Expand Down Expand Up @@ -380,8 +392,9 @@ async fn ping_via_node(
account_filter: Option<HashSet<AccountId>>,
mut latencies_csv: Option<crate::csv::LatenciesCsv>,
ping_stats: &mut Vec<(PeerIdentifier, PingStats)>,
prometheus_addr: &str,
) -> anyhow::Result<()> {
let mut app_info = AppInfo::new(account_filter);
let mut app_info = AppInfo::new(account_filter, chain_id);

app_info.add_peer(&peer_id, None);

Expand All @@ -392,10 +405,7 @@ async fn ping_via_node(
chain_id,
genesis_hash,
head_height,
time::Duration::seconds(recv_timeout_seconds.into()),
)
.await
{
time::Duration::seconds(recv_timeout_seconds.into())).await {
Ok(p) => p,
Err(ConnectError::HandshakeFailure(reason)) => {
match reason {
Expand Down Expand Up @@ -425,6 +435,19 @@ async fn ping_via_node(
let next_timeout = tokio::time::sleep(std::time::Duration::ZERO);
tokio::pin!(next_timeout);

let server = HttpServer::new(move || {
App::new().service(
web::resource("/metrics").route(web::get().to(near_jsonrpc::prometheus_handler)),
)
})
.bind(prometheus_addr)
.unwrap()
.workers(1)
.shutdown_timeout(3)
.disable_signals()
.run();
tokio::spawn(server);

loop {
let target = app_info.pick_next_target();
let pending_timeout = prepare_timeout(next_timeout.as_mut(), &app_info);
Expand All @@ -436,7 +459,7 @@ async fn ping_via_node(
if result.is_err() {
break;
}
app_info.ping_sent(&target, nonce);
app_info.ping_sent(&target, nonce, &chain_id);
nonce += 1;
next_ping.as_mut().reset(tokio::time::Instant::now() + std::time::Duration::from_millis(ping_frequency_millis));
}
Expand All @@ -461,10 +484,12 @@ async fn ping_via_node(
_ = &mut next_timeout, if pending_timeout.is_some() => {
let t = pending_timeout.unwrap();
app_info.pop_timeout(&t);
let account_id = app_info.peer_id_to_account_id(&t.peer_id);
crate::metrics::PONG_TIMEOUTS.with_label_values(&[&chain_id, &peer_str(&t.peer_id, account_id)]).inc();
if let Some(csv) = latencies_csv.as_mut() {
result = csv.write_timeout(
&t.peer_id,
app_info.peer_id_to_account_id(&t.peer_id)
account_id,
)
.context("Failed writing to CSV file");
if result.is_err() {
Expand Down
33 changes: 33 additions & 0 deletions tools/ping/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use near_o11y::metrics::{
exponential_buckets, try_create_histogram_vec, try_create_int_counter_vec, HistogramVec,
IntCounterVec,
};
use once_cell::sync::Lazy;

pub(crate) static PONG_RECEIVED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"ping_pong_received",
"Round-trip time of ping-pong",
&["chain_id", "account_id"],
Some(exponential_buckets(0.00001, 1.6, 40).unwrap()),
)
.unwrap()
});

pub(crate) static PONG_TIMEOUTS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"ping_pong_timeout",
"Number of pongs that were not received",
&["chain_id", "account_id"],
)
.unwrap()
});

pub(crate) static PING_SENT: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"ping_ping_sent",
"Number of pings sent",
&["chain_id", "account_id"],
)
.unwrap()
});

0 comments on commit 812eb18

Please sign in to comment.