Skip to content

Commit

Permalink
fix(hubble): improve reorg handling - use indexer_id
Browse files Browse the repository at this point in the history
  • Loading branch information
qlp committed Sep 12, 2024
1 parent 4558679 commit db4ac33
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 97 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

4 changes: 2 additions & 2 deletions hubble/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl IndexerConfig {
Self::EthFork(cfg) => &cfg.label,
Self::Arb(cfg) => &cfg.label,
Self::Scroll(cfg) => &cfg.label,
Self::DummyFetcher(cfg) => &cfg.label,
Self::EthFetcher(cfg) => &cfg.label,
Self::DummyFetcher(cfg) => &cfg.indexer_id,
Self::EthFetcher(cfg) => &cfg.indexer_id,
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions hubble/src/fetcher/dummy/config.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use color_eyre::eyre::Report;
use sqlx::PgPool;
use unionlabs::aptos::block_info::BlockHeight;

use super::dummy::{DummyContext, DummyFetcherClient};
use crate::fetcher::fetcher::Indexer;
use crate::fetcher::fetcher::{Indexer, IndexerId};

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
pub label: String,
pub start_height: u64,
pub indexer_id: IndexerId,
pub start_height: BlockHeight,
}

impl Config {
pub async fn build(self, pg_pool: PgPool) -> Result<Indexer<DummyFetcherClient>, Report> {
Ok(Indexer::new(
pg_pool,
self.label,
self.indexer_id,
self.start_height,
5,
DummyContext { bla: 42 },
Expand Down
10 changes: 5 additions & 5 deletions hubble/src/fetcher/eth/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use sqlx::PgPool;
use url::Url;

use super::eth::{EthContext, EthFetcherClient};
use crate::fetcher::fetcher::{BlockHeight, Indexer};
use crate::fetcher::fetcher::{BlockHeight, Indexer, IndexerId};

const DEFAULT_CHUNK_SIZE: usize = 200;

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config {
pub label: String,
pub start_height: u64,
pub indexer_id: IndexerId,
pub start_height: BlockHeight,
pub chunk_size: Option<usize>,
pub urls: Vec<Url>,
pub internal_chain_id: Option<i32>,
Expand All @@ -19,7 +19,7 @@ pub struct Config {

impl Config {
pub async fn build(self, pg_pool: PgPool) -> Result<Indexer<EthFetcherClient>, Report> {
// temporary safety-measure to fetch the start height from the contracts table,
// temporary safety-measure to fetch the start height from the contracts table,
// because there will be no chain_state record after migrations

let start_height = match self.internal_chain_id {
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Config {

Ok(Indexer::new(
pg_pool,
self.label,
self.indexer_id,
start_height,
self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE),
EthContext { urls: self.urls },
Expand Down
37 changes: 20 additions & 17 deletions hubble/src/fetcher/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub enum FetcherError {
NoBlock(BlockHeight),
}

pub type IndexerId = String;
pub type BlockHeight = u64;
pub type BlockHash = String;
pub type BlockTimestamp = OffsetDateTime;
Expand Down Expand Up @@ -106,7 +107,7 @@ pub enum FetchMessage<T: BlockHandle> {
#[derive(Clone)]
pub struct Indexer<T: FetcherClient> {
pg_pool: sqlx::PgPool,
label: String,
indexer_id: IndexerId,
start_height: BlockHeight,
chunk_size: usize,
context: T::Context,
Expand All @@ -115,14 +116,14 @@ pub struct Indexer<T: FetcherClient> {
impl<T: FetcherClient> Indexer<T> {
pub fn new(
pg_pool: sqlx::PgPool,
label: String,
indexer_id: IndexerId,
start_height: BlockHeight,
chunk_size: usize,
context: T::Context,
) -> Self {
Indexer {
pg_pool,
label,
indexer_id,
start_height,
chunk_size,
context,
Expand Down Expand Up @@ -279,7 +280,7 @@ impl<T: FetcherClient> Indexer<T> {

update_current_height(
&mut tx,
self.label.clone(),
self.indexer_id.clone(),
block_handle.height()?,
block_handle.timestamp()?,
)
Expand Down Expand Up @@ -330,7 +331,7 @@ impl<T: FetcherClient> Indexer<T> {
debug!("{}: update height", block_handle.height()?);
update_current_height(
&mut tx,
self.label.clone(),
self.indexer_id.clone(),
block_handle.height()?,
block_handle.timestamp()?,
)
Expand All @@ -339,7 +340,7 @@ impl<T: FetcherClient> Indexer<T> {
debug!("{}: update status", block_handle.height()?);
update_block_status(
&mut tx,
self.label.clone(),
self.indexer_id.clone(),
block_handle.height()?,
block_handle.hash()?,
block_handle.timestamp()?,
Expand Down Expand Up @@ -482,10 +483,12 @@ impl<T: FetcherClient> Indexer<T> {

if let Some(old_hash) = match finalized {
true => {
delete_block_status(&mut tx, self.label.clone(), block_height).await?
delete_block_status(&mut tx, self.indexer_id.clone(), block_height)
.await?
}
false => {
get_block_status_hash(&mut tx, self.label.clone(), block_height).await?
get_block_status_hash(&mut tx, self.indexer_id.clone(), block_height)
.await?
}
} {
if old_hash != block_handle.hash()? {
Expand All @@ -505,7 +508,7 @@ impl<T: FetcherClient> Indexer<T> {
debug!("{}: update status", block_height);
update_block_status(
&mut tx,
self.label.clone(),
self.indexer_id.clone(),
block_height,
block_handle.hash()?,
block_handle.timestamp()?,
Expand Down Expand Up @@ -538,7 +541,7 @@ impl<T: FetcherClient> Indexer<T> {

async fn block_range_to_finalize(&self) -> Result<Option<BlockRange>, Report> {
let mut tx = self.pg_pool.begin().await?;
let result = get_block_range_to_finalize(&mut tx, self.label.clone()).await?;
let result = get_block_range_to_finalize(&mut tx, self.indexer_id.clone()).await?;
tx.commit().await?;

Ok(result)
Expand All @@ -550,7 +553,7 @@ impl<T: FetcherClient> Indexer<T> {
) -> Result<Option<BlockHeight>, Report> {
let mut tx = self.pg_pool.begin().await?;
let result =
get_next_block_to_refresh(&mut tx, self.label.clone(), consensus_height).await?;
get_next_block_to_refresh(&mut tx, self.indexer_id.clone(), consensus_height).await?;
tx.commit().await?;

Ok(result)
Expand Down Expand Up @@ -673,23 +676,23 @@ impl<T: FetcherClient> Indexer<T> {

async fn block_range_to_fix(&self) -> Result<Option<BlockRange>, Report> {
let mut tx = self.pg_pool.begin().await?;
let result = get_block_range_to_fix(&mut tx, self.label.clone()).await?;
let result = get_block_range_to_fix(&mut tx, self.indexer_id.clone()).await?;
tx.commit().await?;

Ok(result)
}

async fn remove_blocks_to_fix(&self, range: BlockRange) -> Result<(), Report> {
let mut tx = self.pg_pool.begin().await?;
update_block_range_to_fix(&mut tx, self.label.clone(), range).await?;
update_block_range_to_fix(&mut tx, self.indexer_id.clone(), range).await?;
tx.commit().await?;

Ok(())
}

async fn next_height(&self) -> Result<BlockHeight, Report> {
let mut tx = self.pg_pool.begin().await?;
let result = get_current_height(&mut tx, self.label.clone())
let result = get_current_height(&mut tx, self.indexer_id.clone())
.await?
.map(|current_height| current_height + 1) // we store the last indexed height, so next is one heigher
.unwrap_or(self.start_height);
Expand All @@ -708,16 +711,16 @@ impl<T: FetcherClient> Indexer<T> {
Ok(Err(err)) => {
error!(
"{}: error: {:?}. re-initialize (client: {}, context: {})",
self.label, err, fetcher_client, self.context
self.indexer_id, err, fetcher_client, self.context
);
join_set.abort_all();
sleep(Duration::from_secs(1)).await;
info!("{}: restarting", self.label);
info!("{}: restarting", self.indexer_id);
return Ok(EndOfRunResult::Restart);
}
Err(err) => return Err(err.into()),
Ok(Ok(_)) => {
info!("{}: indexer exited gracefully", self.label);
info!("{}: indexer exited gracefully", self.indexer_id);
}
}
}
Expand Down
Loading

0 comments on commit db4ac33

Please sign in to comment.