Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Feb 24, 2025
1 parent ae551e6 commit 7ecd327
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 169 deletions.
20 changes: 11 additions & 9 deletions graph/proto/tracing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ syntax = "proto3";

package graph.tracing.v1;

service Stream {
rpc QueryTrace(Request) returns (stream Trace);
}
service Stream { rpc QueryTrace(Request) returns (stream Response); }

message Request {
int32 deployment_id = 1;
}
message Request { int32 deployment_id = 1; }

message Response { repeated Trace traces = 1; }

message Trace {
int32 deployment_id = 1;
string query = 2;
uint64 duration_millis = 3;
int32 deployment_id = 1;
string query = 2;
uint64 duration_millis = 3;
uint32 children = 4;
optional uint64 conn_wait_millis = 5;
optional uint64 permit_wait_millis = 6;
optional uint64 entity_count = 7;
}
143 changes: 89 additions & 54 deletions graph/src/components/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
use lazy_static::lazy_static;

use std::{collections::HashMap, sync::Arc, time::Duration};

use futures03::TryFutureExt;
use slog::{o, Logger};
use tokio::sync::{mpsc, watch::Receiver, RwLock};

use crate::prelude::LoggerFactory;

use super::store::DeploymentId;

const DEFAULT_BUFFER_SIZE: usize = 100;
#[cfg(not(test))]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(test)]
const INDEXER_WATCHER_INTERVAL: Duration = Duration::from_millis(100);
lazy_static! {
    /// Lazily built multi-threaded Tokio runtime with a single worker thread and
    /// all runtime drivers enabled.
    ///
    /// Panics on first access if the runtime cannot be built.
    pub static ref TRACING_RUNTIME: tokio::runtime::Runtime = {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.worker_threads(1).enable_all();
        builder.build().unwrap()
    };
}

#[derive(Debug, Clone)]
pub struct Subscriptions<T> {
Expand All @@ -33,59 +43,42 @@ pub struct TracingControl<T> {
default_buffer_size: usize,
}

// impl<T: Send + Clone + 'static> Default for TracingControl<T> {
// fn default() -> Self {
// let subscriptions = Subscriptions::default();
// let subs = subscriptions.clone();
// let watcher = std::thread::spawn(move || {
// let runtime = tokio::runtime::Builder::new_multi_thread()
// .enable_all()
// .build()
// .unwrap();
// runtime.block_on()
// })
// .join()
// .unwrap()
// .unwrap();

// Self {
// subscriptions,
// default_buffer_size: DEFAULT_BUFFER_SIZE,
// watcher,
// }
// }
// }

impl<T: Send + Clone + 'static> TracingControl<T> {
pub async fn start() -> Self {
/// Starts a new tracing control instance. If an async runtime is not available, a new one will be created.
pub fn start() -> Self {
Self::new(DEFAULT_BUFFER_SIZE)
}

pub fn new(buffer_size: usize) -> Self {
let subscriptions = Subscriptions::default();
let subs = subscriptions.clone();
let watcher = indexer_watcher::new_watcher(
#[cfg(test)]
Duration::from_millis(100),
#[cfg(not(test))]
Duration::from_secs(30),
move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
},
)
.await

let watcher = std::thread::spawn(move || {
let handle =
tokio::runtime::Handle::try_current().unwrap_or(TRACING_RUNTIME.handle().clone());

handle.block_on(async move {
indexer_watcher::new_watcher(INDEXER_WATCHER_INTERVAL, move || {
let subs = subs.clone();

async move { Ok(subs.inner.read().await.clone()) }
})
.await
})
})
.join()
.unwrap()
.unwrap();

Self {
watcher,
subscriptions,
default_buffer_size: DEFAULT_BUFFER_SIZE,
default_buffer_size: buffer_size,
}
}
// pub fn new(default_buffer_size: Option<usize>) -> Self {
// Self {
// default_buffer_size: default_buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE),
// ..Default::default()
// }
// }

/// Returns a producer for a given deployment ID. If the producer is closed, it will return None.
/// The producer could still be closed in the meantime.
pub fn producer(&self, key: DeploymentId) -> Option<mpsc::Sender<T>> {
self.watcher
.borrow()
Expand All @@ -94,6 +87,8 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
.filter(|sender| !sender.is_closed())
}

/// Creates a new subscription for a given deployment ID with a given buffer size. If a subscription already
/// exists, it will be replaced.
pub async fn subscribe_with_chan_size(
&self,
key: DeploymentId,
Expand All @@ -118,26 +113,53 @@ impl<T: Send + Clone + 'static> TracingControl<T> {
mod test {

use anyhow::anyhow;
use tokio::time::{self, Instant};
use tokio_retry::Retry;

use super::*;
use std::{future::IntoFuture, sync::Arc};
use std::sync::Arc;

#[tokio::test]
async fn test_watcher() {
let x = time::Instant::now();
let x = indexer_watcher::new_watcher(Duration::from_millis(10), move || {
let x = x.clone();

async move {
let now = Instant::now();
Ok(now.duration_since(x))
}
})
.await
.unwrap();

Retry::spawn(vec![Duration::from_secs(10); 3].into_iter(), move || {
let x = x.clone();
async move {
let count = x.borrow().clone();
println!("{}", count.as_millis());
Err::<Duration, anyhow::Error>(anyhow!("millis: {}", count.as_millis()))
}
})
.await
.unwrap();
}

#[tokio::test]
async fn test_tracing_control() {
let control: TracingControl<()> = TracingControl::start().await;
let control: TracingControl<()> = TracingControl::start();
let control = Arc::new(control);

// produce before subscription
let tx = control.producer(DeploymentId(123));
assert!(tx.is_none());

// drop the subscription
let rx = control.subscribe(DeploymentId(123));
let rx = control.subscribe(DeploymentId(123)).await;

let c = control.clone();
// check subscription is none because channel is closed
let tx = Retry::spawn(vec![Duration::from_secs(5); 10].into_iter(), move || {
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let control = c.clone();
async move {
match control.producer(DeploymentId(123)) {
Expand All @@ -158,9 +180,22 @@ mod test {
assert!(tx.is_none());

// re-create subscription
let _rx = control.subscribe(DeploymentId(123));
let _rx = control.subscribe(DeploymentId(123)).await;

// check old subscription was replaced
let tx = control.producer(DeploymentId(123));
assert!(!tx.unwrap().is_closed())
let c = control.clone();
let tx = Retry::spawn(vec![INDEXER_WATCHER_INTERVAL; 2].into_iter(), move || {
let tx = c.producer(DeploymentId(123));
async move {
match tx {
Some(sender) if !sender.is_closed() => Ok(sender),
Some(_) => Err(anyhow!("Sender is closed")),
None => Err(anyhow!("Sender not created yet")),
}
}
})
.await
.unwrap();
assert!(!tx.is_closed())
}
}
5 changes: 5 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ pub struct EnvVars {
/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`.
/// The default value is 60 seconds.
pub firehose_block_fetch_timeout: u64,
/// Set by the environment variable `GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC`.
/// The default value is `false`.
pub enable_tracing_grpc_server: bool,
}

impl EnvVars {
Expand Down Expand Up @@ -339,6 +341,7 @@ impl EnvVars {
block_write_capacity: inner.block_write_capacity.0,
firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
enable_tracing_grpc_server: inner.enable_tracing_grpc_server,
})
}

Expand Down Expand Up @@ -506,6 +509,8 @@ struct Inner {
firehose_block_fetch_retry_limit: usize,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")]
firehose_block_fetch_timeout: u64,
#[envconfig(from = "GRAPH_NODE_ENABLE_QUERY_TRACING_GRPC", default = "false")]
enable_tracing_grpc_server: EnvVarBoolean,
}

#[derive(Clone, Debug)]
Expand Down
20 changes: 17 additions & 3 deletions graph/src/grpc/pb/graph.tracing.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,27 @@ pub struct Request {
}
/// Message carrying a list of [`Trace`] entries; it is the item type of the
/// `QueryTrace` response stream declared in this file.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Response {
/// Repeated `Trace` messages (protobuf field 1).
#[prost(message, repeated, tag = "1")]
pub traces: ::prost::alloc::vec::Vec<Trace>,
}
/// A single trace record as encoded on the wire.
///
/// NOTE(review): the field meanings below are inferred from the field names only;
/// confirm against the code that populates this message.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Trace {
/// Protobuf field 1. Presumably the deployment the trace belongs to — confirm.
#[prost(int32, tag = "1")]
pub deployment_id: i32,
/// Protobuf field 2. Presumably the text of the traced query — confirm.
#[prost(string, tag = "2")]
pub query: ::prost::alloc::string::String,
/// Protobuf field 3. Presumably a duration in milliseconds — confirm.
#[prost(uint64, tag = "3")]
pub duration_millis: u64,
/// Protobuf field 4. Presumably the number of child traces — confirm.
#[prost(uint32, tag = "4")]
pub children: u32,
/// Protobuf field 5, optional: `None` when the field is not set.
#[prost(uint64, optional, tag = "5")]
pub conn_wait_millis: ::core::option::Option<u64>,
/// Protobuf field 6, optional: `None` when the field is not set.
#[prost(uint64, optional, tag = "6")]
pub permit_wait_millis: ::core::option::Option<u64>,
/// Protobuf field 7, optional: `None` when the field is not set.
#[prost(uint64, optional, tag = "7")]
pub entity_count: ::core::option::Option<u64>,
}
/// Generated client implementations.
pub mod stream_client {
Expand Down Expand Up @@ -104,7 +118,7 @@ pub mod stream_client {
&mut self,
request: impl tonic::IntoRequest<super::Request>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::Trace>>,
tonic::Response<tonic::codec::Streaming<super::Response>>,
tonic::Status,
> {
self.inner
Expand Down Expand Up @@ -136,7 +150,7 @@ pub mod stream_server {
pub trait Stream: Send + Sync + 'static {
/// Server streaming response type for the QueryTrace method.
type QueryTraceStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::Trace, tonic::Status>,
Item = std::result::Result<super::Response, tonic::Status>,
>
+ Send
+ 'static;
Expand Down Expand Up @@ -229,7 +243,7 @@ pub mod stream_server {
struct QueryTraceSvc<T: Stream>(pub Arc<T>);
impl<T: Stream> tonic::server::ServerStreamingService<super::Request>
for QueryTraceSvc<T> {
type Response = super::Trace;
type Response = super::Response;
type ResponseStream = T::QueryTraceStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
Expand Down
24 changes: 8 additions & 16 deletions node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ async fn main() {
// Obtain metrics server port
let metrics_port = opt.metrics_port;

// Obtain tracing server port
let tracing_grpc_port = opt.grpc_port;

// Obtain the fork base URL
let fork_base = match &opt.fork_base {
Some(url) => {
Expand Down Expand Up @@ -517,9 +520,11 @@ async fn main() {
// Run the index node server
graph::spawn(async move { index_node_server.start(index_node_port).await });

graph::spawn(async move {
graph_server_grpc::start(8888).await.unwrap();
});
if env_vars.enable_tracing_grpc_server {
graph::spawn(async move {
graph_server_grpc::start(tracing_grpc_port).await.unwrap();
});
}

graph::spawn(async move {
metrics_server
Expand Down Expand Up @@ -564,19 +569,6 @@ async fn main() {
}
});

let mut rx = TRACING_CONTROL.subscribe(DeploymentId(1)).await;
loop {
let trace = rx.recv().await;
match trace {
Some(trace) => {
info!(&logger, "#### trace: {:?}", trace);
}
None => {
break;
}
}
}

graph::futures03::future::pending::<()>().await;
}

Expand Down
7 changes: 7 additions & 0 deletions node/src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ pub struct Opt {
help = "Port for the Prometheus metrics server"
)]
pub metrics_port: u16,
#[clap(
long,
default_value = "8060",
value_name = "PORT",
help = "Port for the query tracing GRPC Server"
)]
pub grpc_port: u16,
#[clap(
long,
default_value = "default",
Expand Down
Loading

0 comments on commit 7ecd327

Please sign in to comment.