Skip to content

Commit

Permalink
graph: Add retries for FirehoseEndpoint::load_blocks_by_numbers
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Feb 21, 2025
1 parent 3c448de commit b27502c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
13 changes: 13 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ pub struct EnvVars {
pub firehose_disable_extended_blocks_for_chains: Vec<String>,

pub block_write_capacity: usize,

/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT`.
/// The default value is 10.
pub firehose_block_fetch_retry_limit: usize,
/// Set by the environment variable `GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS`.
/// The default value is 60 seconds.
pub firehose_block_fetch_timeout: u64,
}

impl EnvVars {
Expand Down Expand Up @@ -330,6 +337,8 @@ impl EnvVars {
inner.firehose_disable_extended_blocks_for_chains,
),
block_write_capacity: inner.block_write_capacity.0,
firehose_block_fetch_retry_limit: inner.firehose_block_fetch_retry_limit,
firehose_block_fetch_timeout: inner.firehose_block_fetch_timeout,
})
}

Expand Down Expand Up @@ -493,6 +502,10 @@ struct Inner {
firehose_disable_extended_blocks_for_chains: Option<String>,
#[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")]
block_write_capacity: NoUnderscores<usize>,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_RETRY_LIMIT", default = "10")]
firehose_block_fetch_retry_limit: usize,
#[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_TIMEOUT_SECS", default = "60")]
firehose_block_fetch_timeout: u64,
}

#[derive(Clone, Debug)]
Expand Down
33 changes: 26 additions & 7 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_trait::async_trait;
use futures03::StreamExt;
use http0::uri::{Scheme, Uri};
use itertools::Itertools;
use slog::{error, info, Logger};
use slog::{error, info, trace, Logger};
use std::{collections::HashMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration};
use tokio::sync::OnceCell;
use tonic::codegen::InterceptedService;
Expand All @@ -33,6 +33,7 @@ use crate::components::network_provider::NetworkDetails;
use crate::components::network_provider::ProviderCheckStrategy;
use crate::components::network_provider::ProviderManager;
use crate::components::network_provider::ProviderName;
use crate::prelude::retry;

/// This is constant because we found this magic number of connections after
/// which the grpc connections start to hang.
Expand Down Expand Up @@ -425,7 +426,7 @@ impl FirehoseEndpoint {
}

pub async fn load_blocks_by_numbers<M>(
&self,
self: Arc<Self>,
numbers: Vec<u64>,
logger: &Logger,
) -> Result<Vec<M>, anyhow::Error>
Expand All @@ -435,21 +436,39 @@ impl FirehoseEndpoint {
let mut blocks = Vec::with_capacity(numbers.len());

for number in numbers {
debug!(
let provider_name = self.provider.as_str();

trace!(
logger,
"Loading block for block number {}", number;
"provider" => self.provider.as_str(),
"provider" => provider_name,
);

match self.get_block_by_number::<M>(number, logger).await {
let retry_log_message = format!("get_block_by_number for block {}", number);
let endpoint_for_retry = self.cheap_clone();

let logger_for_retry = logger.clone();
let logger_for_error = logger.clone();

let block = retry(retry_log_message, &logger_for_retry)
.limit(ENV_VARS.firehose_block_fetch_retry_limit)
.timeout_secs(ENV_VARS.firehose_block_fetch_timeout)
.run(move || {
let e = endpoint_for_retry.cheap_clone();
let l = logger_for_retry.clone();
async move { e.get_block_by_number::<M>(number, &l).await }
})
.await;

match block {
Ok(block) => {
blocks.push(block);
}
Err(e) => {
error!(
logger,
logger_for_error,
"Failed to load block number {}: {}", number, e;
"provider" => self.provider.as_str(),
"provider" => provider_name,
);
return Err(anyhow::format_err!(
"failed to load block number {}: {}",
Expand Down

0 comments on commit b27502c

Please sign in to comment.