From 7ecd32768e41fe8f497c16d69ea24392ec1c2a33 Mon Sep 17 00:00:00 2001 From: Filipe Azevedo Date: Thu, 20 Feb 2025 13:06:31 +0000 Subject: [PATCH] experiment --- graph/proto/tracing.proto | 20 ++-- graph/src/components/tracing.rs | 143 +++++++++++++++---------- graph/src/env/mod.rs | 5 + graph/src/grpc/pb/graph.tracing.v1.rs | 20 +++- node/src/main.rs | 24 ++--- node/src/opt.rs | 7 ++ server/grpc/src/lib.rs | 144 +++++++++++--------------- store/postgres/src/lib.rs | 3 +- store/postgres/src/relational.rs | 3 +- 9 files changed, 200 insertions(+), 169 deletions(-) diff --git a/graph/proto/tracing.proto b/graph/proto/tracing.proto index 9cf036d126e..455b2bef323 100644 --- a/graph/proto/tracing.proto +++ b/graph/proto/tracing.proto @@ -2,16 +2,18 @@ syntax = "proto3"; package graph.tracing.v1; -service Stream { - rpc QueryTrace(Request) returns (stream Trace); -} +service Stream { rpc QueryTrace(Request) returns (stream Response); } -message Request { - int32 deployment_id = 1; -} +message Request { int32 deployment_id = 1; } + +message Response { repeated Trace traces = 1; } message Trace { - int32 deployment_id = 1; - string query = 2; - uint64 duration_millis = 3; + int32 deployment_id = 1; + string query = 2; + uint64 duration_millis = 3; + uint32 children = 4; + optional uint64 conn_wait_millis = 5; + optional uint64 permit_wait_millis = 6; + optional uint64 entity_count = 7; } diff --git a/graph/src/components/tracing.rs b/graph/src/components/tracing.rs index c6ac44fd3d7..adc20aada3a 100644 --- a/graph/src/components/tracing.rs +++ b/graph/src/components/tracing.rs @@ -1,14 +1,24 @@ +use lazy_static::lazy_static; + use std::{collections::HashMap, sync::Arc, time::Duration}; -use futures03::TryFutureExt; -use slog::{o, Logger}; use tokio::sync::{mpsc, watch::Receiver, RwLock}; -use crate::prelude::LoggerFactory; - use super::store::DeploymentId; const DEFAULT_BUFFER_SIZE: usize = 100; +#[cfg(not(test))] +const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(10); +#[cfg(test)] +const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100); +lazy_static! { + pub static ref TRACING_RUNTIME: tokio::runtime::Runtime = + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); +} #[derive(Debug, Clone)] pub struct Subscriptions { @@ -33,59 +43,42 @@ pub struct TracingControl { default_buffer_size: usize, } -// impl Default for TracingControl { -// fn default() -> Self { -// let subscriptions = Subscriptions::default(); -// let subs = subscriptions.clone(); -// let watcher = std::thread::spawn(move || { -// let runtime = tokio::runtime::Builder::new_multi_thread() -// .enable_all() -// .build() -// .unwrap(); -// runtime.block_on() -// }) -// .join() -// .unwrap() -// .unwrap(); - -// Self { -// subscriptions, -// default_buffer_size: DEFAULT_BUFFER_SIZE, -// watcher, -// } -// } -// } - impl TracingControl { - pub async fn start() -> Self { + /// Starts a new tracing control instance.If an async runtime is not available, a new one will be created. + pub fn start() -> Self { + Self::new(DEFAULT_BUFFER_SIZE) + } + + pub fn new(buffer_size: usize) -> Self { let subscriptions = Subscriptions::default(); let subs = subscriptions.clone(); - let watcher = indexer_watcher::new_watcher( - #[cfg(test)] - Duration::from_millis(100), - #[cfg(not(test))] - Duration::from_secs(30), - move || { - let subs = subs.clone(); - - async move { Ok(subs.inner.read().await.clone()) } - }, - ) - .await + + let watcher = std::thread::spawn(move || { + let handle = + tokio::runtime::Handle::try_current().unwrap_or(TRACING_RUNTIME.handle().clone()); + + handle.block_on(async move { + indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || { + let subs = subs.clone(); + + async move { Ok(subs.inner.read().await.clone()) } + }) + .await + }) + }) + .join() + .unwrap() .unwrap(); + Self { watcher, subscriptions, - default_buffer_size: DEFAULT_BUFFER_SIZE, + default_buffer_size: buffer_size, } } - // pub fn new(default_buffer_size: Option) -> Self { - // Self { - // default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE), - // ..Default::default() - // } - // } + /// Returns a producer for a given deployment ID. If the producer is closed, it will return None. + /// The producer could still be closed in the meantime. pub fn producer(&self, key: DeploymentId) -> Option> { self.watcher .borrow() @@ -94,6 +87,8 @@ impl TracingControl { .filter(|sender| !sender.is_closed()) } + /// Creates a new subscription for a given deployment ID with a given buffer size. If a subscription already + /// exists, it will be replaced. pub async fn subscribe_with_chan_size( &self, key: DeploymentId, @@ -118,14 +113,41 @@ impl TracingControl { mod test { use anyhow::anyhow; + use tokio::time::{self, Instant}; use tokio_retry::Retry; use super::*; - use std::{future::IntoFuture, sync::Arc}; + use std::sync::Arc; + + #[tokio::test] + async fn test_watcher() { + let x = time::Instant::now(); + let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || { + let x = x.clone(); + + async move { + let now = Instant::now(); + Ok(now.duration_since(x)) + } + }) + .await + .unwrap(); + + Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || { + let x = x.clone(); + async move { + let count = x.borrow().clone(); + println!("{}", count.as_millis()); + Err::(anyhow!("millis: {}", count.as_millis())) + } + }) + .await + .unwrap(); + } #[tokio::test] async fn test_tracing_control() { - let control: TracingControl<()> = TracingControl::start().await; + let control: TracingControl<()> = TracingControl::start(); let control = Arc::new(control); // produce before subscription @@ -133,11 +155,11 @@ mod test { assert!(tx.is_none()); // drop the subscription - let rx = control.subscribe(DeploymentId(123)); + let rx = control.subscribe(DeploymentId(123)).await; let c = control.clone(); // check subscription is none because channel is closed - let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || { + let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || { let control = c.clone(); async move { match control.producer(DeploymentId(123)) { @@ -158,9 +180,22 @@ mod test { assert!(tx.is_none()); // re-create subscription - let _rx = control.subscribe(DeploymentId(123)); + let _rx = control.subscribe(DeploymentId(123)).await; + // check old subscription was replaced - let tx = control.producer(DeploymentId(123)); - assert!(!tx.unwrap().is_closed()) + let c = control.clone(); + let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || { + let tx = c.producer(DeploymentId(123)); + async move { + match tx { + Some(sender) if !sender.is_closed() => Ok(sender), + Some(_) => Err(anyhow!("Sender is closed")), + None => Err(anyhow!("Sender not created yet")), + } + } + }) + .await + .unwrap(); + assert!(!tx.is_closed()) } } diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 4383ce17b5c..ec895a4b489 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -247,6 +247,8 @@ pub struct EnvVars { /// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`. /// The default value is 60 seconds. pub firehose_block_fetch_timeout: u64, + /// Set by the environment variable `GRAPH_ENABLE_TRACING_GRPC_SERVER`. + pub enable_tracing_grpc_server: bool, } impl EnvVars { @@ -339,6 +341,7 @@ impl EnvVars { block_write_capacity: inner.block_write_capacity.0, firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit, firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout, + enable_tracing_grpc_server: inner.enable_tracing_grpc_server, }) } @@ -506,6 +509,8 @@ struct Inner { firehose_block_fetch_retry_limit: usize, #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")] firehose_block_fetch_timeout: u64, + #[envconfig(from = "GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC", default = "false")] + enable_tracing_grpc_server: EnvVarBoolean, } #[derive(Clone, Debug)] diff --git a/graph/src/grpc/pb/graph.tracing.v1.rs b/graph/src/grpc/pb/graph.tracing.v1.rs index 472c0d14a26..47e589c5bef 100644 --- a/graph/src/grpc/pb/graph.tracing.v1.rs +++ b/graph/src/grpc/pb/graph.tracing.v1.rs @@ -7,6 +7,12 @@ pub struct Request { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] +pub struct Response { + #[prost(message, repeated, tag = "1")] + pub traces: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Trace { #[prost(int32, tag = "1")] pub deployment_id: i32, @@ -14,6 +20,14 @@ pub struct Trace { pub query: ::prost::alloc::string::String, #[prost(uint64, tag = "3")] pub duration_millis: u64, + #[prost(uint32, tag = "4")] + pub children: u32, + #[prost(uint64, optional, tag = "5")] + pub conn_wait_millis: ::core::option::Option, + #[prost(uint64, optional, tag = "6")] + pub permit_wait_millis: ::core::option::Option, + #[prost(uint64, optional, tag = "7")] + pub entity_count: ::core::option::Option, } /// Generated client implementations. pub mod stream_client { @@ -104,7 +118,7 @@ pub mod stream_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< - tonic::Response>, + tonic::Response>, tonic::Status, > { self.inner @@ -136,7 +150,7 @@ pub mod stream_server { pub trait Stream: Send + Sync + 'static { /// Server streaming response type for the QueryTrace method. type QueryTraceStream: tonic::codegen::tokio_stream::Stream< - Item = std::result::Result, + Item = std::result::Result, > + Send + 'static; @@ -229,7 +243,7 @@ pub mod stream_server { struct QueryTraceSvc(pub Arc); impl tonic::server::ServerStreamingService for QueryTraceSvc { - type Response = super::Trace; + type Response = super::Response; type ResponseStream = T::QueryTraceStream; type Future = BoxFuture< tonic::Response, diff --git a/node/src/main.rs b/node/src/main.rs index 9c3f00d34ae..15404c90225 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -154,6 +154,9 @@ async fn main() { // Obtain metrics server port let metrics_port = opt.metrics_port; + // Obtain tracing server port + let tracing_grpc_port = opt.grpc_port; + // Obtain the fork base URL let fork_base = match &opt.fork_base { Some(url) => { @@ -517,9 +520,11 @@ async fn main() { // Run the index node server graph::spawn(async move { index_node_server.start(index_node_port).await }); - graph::spawn(async move { - graph_server_grpc::start(8888).await.unwrap(); - }); + if env_vars.enable_tracing_grpc_server { + graph::spawn(async move { + graph_server_grpc::start(tracing_grpc_port).await.unwrap(); + }); + } graph::spawn(async move { metrics_server @@ -564,19 +569,6 @@ async fn main() { } }); - let mut rx = TRACING_CONTROL.subscribe(DeploymentId(1)).await; - loop { - let trace = rx.recv().await; - match trace { - Some(trace) => { - info!(&logger, "#### trace: {:?}", trace); - } - None => { - break; - } - } - } - graph::futures03::future::pending::<()>().await; } diff --git a/node/src/opt.rs b/node/src/opt.rs index e4dc44ba92a..ebe74868cf5 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -154,6 +154,13 @@ pub struct Opt { help = "Port for the Prometheus metrics server" )] pub metrics_port: u16, + #[clap( + long, + default_value = "8060", + value_name = "PORT", + help = "Port for the query tracing GRPC Server" + )] + pub grpc_port: u16, #[clap( long, default_value = "default", diff --git a/server/grpc/src/lib.rs b/server/grpc/src/lib.rs index 85773a065d0..69df7a7592f 100644 --- a/server/grpc/src/lib.rs +++ b/server/grpc/src/lib.rs @@ -3,17 +3,17 @@ pub struct TracingServer; use std::pin::Pin; use graph::{ - futures03::Stream, + data::query::Trace, + futures03::{stream::unfold, Stream, StreamExt as _}, grpc::pb::graph::tracing::v1::{ stream_server::{Stream as StreamProto, StreamServer}, - Request, Trace as TraceProto, + Request, Response as ResponseProto, Trace as TraceProto, }, - tokio_stream::wrappers::ReceiverStream, }; use graph_store_postgres::TRACING_CONTROL; use tonic::{async_trait, Status}; -type ResponseStream = Pin> + Send>>; +type ResponseStream = Pin> + Send>>; #[async_trait] impl StreamProto for TracingServer { @@ -25,94 +25,72 @@ impl StreamProto for TracingServer { ) -> std::result::Result, tonic::Status> { let Request { deployment_id } = request.into_inner(); - let mut rx = TRACING_CONTROL + let rx = TRACING_CONTROL .subscribe(graph::components::store::DeploymentId(deployment_id)) .await; - // let stream: Pin>> = unfold(rx, |mut rx| async move { - // rx.recv().await.map(|trace| { - // let trace = match trace { - // graph::data::query::Trace::None => vec![], - // graph::data::query::Trace::Root { - // query, - // variables, - // query_id, - // setup, - // elapsed, - // query_parsing, - // blocks, - // } => vec![], - // graph::data::query::Trace::Block { - // block, - // elapsed, - // permit_wait, - // children, - // } => vec![], - // graph::data::query::Trace::Query { - // query, - // elapsed, - // conn_wait, - // permit_wait, - // entity_count, - // children, - // } => vec![TraceProto { - // deployment_id, - // query, - // duration_millis: elapsed.as_millis() as u64, - // }], - // }; - - // (trace, rx) - // }) - // }) - // .boxed(); - // .flatten(); - // - let (tx, rx2) = tokio::sync::mpsc::channel(100); - - tokio::spawn(async move { - while let Some(result) = rx.recv().await { - let out = match result { - graph::data::query::Trace::None => continue, - graph::data::query::Trace::Root { - query: _, - variables: _, - query_id: _, - setup: _, - elapsed: _, - query_parsing: _, - blocks: _, - } => continue, - graph::data::query::Trace::Block { - block: _, - elapsed: _, - permit_wait: _, - children: _, - } => continue, - graph::data::query::Trace::Query { - query, - elapsed, - conn_wait: _, - permit_wait: _, - entity_count: _, - children: _, - } => TraceProto { + fn query_traces(deployment_id: i32, trace: Trace, out: &mut Vec) { + match trace { + Trace::Root { + query, + variables: _, + query_id: _, + setup: _, + elapsed, + query_parsing: _, + blocks: _, + } => out.push(TraceProto { + deployment_id, + query: query.to_string(), + duration_millis: elapsed.as_millis() as u64, + children: 0, + conn_wait_millis: None, + permit_wait_millis: None, + entity_count: None, + }), + Trace::Query { + query, + elapsed, + conn_wait, + permit_wait, + entity_count, + children, + } => { + out.push(TraceProto { deployment_id, - query, + query: query.to_string(), duration_millis: elapsed.as_millis() as u64, - }, - }; + children: children.len() as u32, + conn_wait_millis: Some(conn_wait.as_millis() as u64), + permit_wait_millis: Some(permit_wait.as_millis() as u64), + entity_count: Some(entity_count as u64), + }); - tx.send(Ok(out)).await.unwrap(); + for (_key, child) in children { + query_traces(deployment_id, child, out); + } + } + _ => return, } - println!("\tstream ended"); - }); + } + + let stream: Pin> + Send>> = + unfold(rx, move |mut rx| async move { + rx.recv().await.and_then(|trace| { + let mut traces = vec![]; + query_traces(deployment_id, trace, &mut traces); - let out_stream = ReceiverStream::new(rx2); + if traces.is_empty() { + None + } else { + Some((ResponseProto { traces }, rx)) + } + }) + }) + .map(Ok) + .boxed(); - Ok(tonic::Response::new( - Box::pin(out_stream) as Self::QueryTraceStream - )) + Ok(tonic::Response::new(stream as Self::QueryTraceStream)) } } diff --git a/store/postgres/src/lib.rs b/store/postgres/src/lib.rs index 1e2dc912713..83ccd93a8eb 100644 --- a/store/postgres/src/lib.rs +++ b/store/postgres/src/lib.rs @@ -18,8 +18,7 @@ use graph::data::query::Trace; use graph::prelude::lazy_static; lazy_static! { - pub static ref TRACING_CONTROL: Arc> = - Arc::new(TracingControl::default()); + pub static ref TRACING_CONTROL: Arc> = Arc::new(TracingControl::start()); } mod advisory_lock; diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 9719d0cee26..6930978cea5 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -33,6 +33,7 @@ use graph::blockchain::BlockTime; use graph::cheap_clone::CheapClone; use graph::components::store::write::{RowGroup, WriteChunk}; use graph::components::subgraph::PoICausalityRegion; +use graph::constraint_violation; use graph::data::graphql::TypeExt as _; use graph::data::query::Trace; use graph::data::value::Word; @@ -42,7 +43,6 @@ use graph::schema::{ EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema, }; use graph::slog::warn; -use graph::{constraint_violation, tokio}; use index::IndexList; use inflector::Inflector; use itertools::Itertools; @@ -54,7 +54,6 @@ use std::fmt::{self, Write}; use std::ops::Range; use std::str::FromStr; use std::sync::{Arc, Mutex}; -use std::thread; use std::time::{Duration, Instant}; use crate::relational::value::{FromOidRow, OidRow};