Skip to content

Commit

Permalink
[bridge-indexer] - Bridge indexer cleanup refactoring 1/n (#18439)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
patrickkuo authored Jul 1, 2024
1 parent 5428392 commit f262288
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 84 deletions.
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ sui-types.workspace = true
telemetry-subscribers.workspace = true
tracing.workspace = true
backoff.workspace = true
sui-config.workspace = true

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
sui-config.workspace = true
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"
Expand Down
29 changes: 10 additions & 19 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use serde::Deserialize;
use std::{env, fs, path::Path};
use serde::{Deserialize, Serialize};
use std::env;

/// config as loaded from `config.yaml`.
#[derive(Debug, Clone, Deserialize)]
pub struct Config {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct IndexerConfig {
pub remote_store_url: String,
pub eth_rpc_url: String,
pub db_url: Option<String>,
#[serde(default = "default_db_url")]
pub db_url: String,
pub checkpoints_path: String,
pub concurrency: u64,
pub bridge_genesis_checkpoint: u64,
Expand All @@ -21,17 +21,8 @@ pub struct Config {
pub sui_rpc_url: Option<String>,
}

/// Load the config to run.
pub fn load_config(path: &Path) -> Result<Config> {
let reader = fs::File::open(path)?;
let mut config: Config = serde_yaml::from_reader(reader)?;
if let Ok(db_url) = env::var("DB_URL") {
config.db_url = Some(db_url);
} else {
match config.db_url.as_ref() {
Some(_) => (),
None => panic!("db_url must be set in config or via the $DB_URL env var"),
}
}
Ok(config.clone())
impl sui_config::Config for IndexerConfig {}

pub fn default_db_url() -> String {
env::var("DB_URL").expect("db_url must be set in config or via the $DB_URL env var")
}
14 changes: 7 additions & 7 deletions crates/sui-bridge-indexer/src/eth_worker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::Config;
use crate::config::IndexerConfig;
use crate::latest_eth_syncer::LatestEthSyncer;
use crate::metrics::BridgeIndexerMetrics;
use crate::postgres_manager::get_latest_eth_token_transfer;
use crate::postgres_manager::{write, PgPool};
use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus};
use anyhow::Result;
use anyhow::{anyhow, Result};
use ethers::providers::Provider;
use ethers::providers::{Http, Middleware};
use ethers::types::Address as EthAddress;
Expand All @@ -30,16 +30,16 @@ pub struct EthBridgeWorker {
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
bridge_address: EthAddress,
config: Config,
config: IndexerConfig,
}

impl EthBridgeWorker {
pub fn new(
pg_pool: PgPool,
bridge_metrics: Arc<BridgeMetrics>,
metrics: BridgeIndexerMetrics,
config: Config,
) -> Result<Self, Box<dyn std::error::Error>> {
config: IndexerConfig,
) -> Result<Self, anyhow::Error> {
let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;

let provider = Arc::new(
Expand Down Expand Up @@ -75,7 +75,7 @@ impl EthBridgeWorker {
EthSyncer::new(eth_client, finalized_contract_addresses)
.run(self.bridge_metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;
.map_err(|e| anyhow!(format!("{e:?}")))?;

let provider_clone = self.provider.clone();
let pg_pool_clone = self.pg_pool.clone();
Expand Down Expand Up @@ -120,7 +120,7 @@ impl EthBridgeWorker {
)
.run(self.metrics.clone())
.await
.map_err(|e| anyhow::anyhow!(format!("{:?}", e)))?;
.map_err(|e| anyhow!(format!("{e:?}")))?;

let provider_clone = self.provider.clone();
let pg_pool_clone = self.pg_pool.clone();
Expand Down
32 changes: 15 additions & 17 deletions crates/sui-bridge-indexer/src/latest_eth_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
//! only query from that block number onwards. The syncer also keeps track of the last
//! block on Ethereum and will only query for events up to that block number.
use ethers::providers::{Http, Middleware, Provider};
use ethers::providers::{Http, JsonRpcClient, Middleware, Provider};
use ethers::types::Address as EthAddress;
use mysten_metrics::metered_channel::{channel, Receiver, Sender};
use mysten_metrics::spawn_logged_monitored_task;
use std::cmp::min;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -38,7 +40,7 @@ pub type EthTargetAddresses = HashMap<EthAddress, u64>;
#[allow(clippy::new_without_default)]
impl<P> LatestEthSyncer<P>
where
P: ethers::providers::JsonRpcClient + 'static,
P: JsonRpcClient + 'static,
{
pub fn new(
eth_client: Arc<EthClient<P>>,
Expand All @@ -57,9 +59,9 @@ where
metrics: BridgeIndexerMetrics,
) -> BridgeResult<(
Vec<JoinHandle<()>>,
mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec<RawEthLog>)>,
Receiver<(EthAddress, u64, Vec<RawEthLog>)>,
)> {
let (eth_evnets_tx, eth_events_rx) = mysten_metrics::metered_channel::channel(
let (eth_evnets_tx, eth_events_rx) = channel(
ETH_EVENTS_CHANNEL_SIZE,
&mysten_metrics::get_metrics()
.unwrap()
Expand Down Expand Up @@ -91,27 +93,26 @@ where
contract_address: EthAddress,
mut start_block: u64,
provider: Arc<Provider<Http>>,
events_sender: mysten_metrics::metered_channel::Sender<(EthAddress, u64, Vec<RawEthLog>)>,
events_sender: Sender<(EthAddress, u64, Vec<RawEthLog>)>,
eth_client: Arc<EthClient<P>>,
metrics: BridgeIndexerMetrics,
) {
tracing::info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
info!(contract_address=?contract_address, "Starting eth events listening task from block {start_block}");
let mut interval = time::interval(BLOCK_QUERY_INTERVAL);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let Ok(Ok(new_block)) = retry_with_max_elapsed_time!(
provider.get_block_number(),
time::Duration::from_secs(600)
) else {
let Ok(Ok(new_block)) =
retry_with_max_elapsed_time!(provider.get_block_number(), Duration::from_secs(600))
else {
error!("Failed to get latest block from eth client after retry");
continue;
};

let new_block: u64 = new_block.as_u64();

if new_block < start_block {
tracing::info!(
info!(
contract_address=?contract_address,
"New block {} is smaller than start block {}, ignore",
new_block,
Expand All @@ -121,8 +122,7 @@ where
}

// Each query does at most ETH_LOG_QUERY_MAX_BLOCK_RANGE blocks.
let end_block =
std::cmp::min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
let end_block = min(start_block + ETH_LOG_QUERY_MAX_BLOCK_RANGE - 1, new_block);
let timer = Instant::now();
let Ok(Ok(events)) = retry_with_max_elapsed_time!(
eth_client.get_raw_events_in_range(contract_address, start_block, end_block),
Expand Down Expand Up @@ -152,11 +152,9 @@ where
.await
.expect("All Eth event channel receivers are closed");
if len != 0 {
tracing::info!(
info!(
?contract_address,
start_block,
end_block,
"Observed {len} new Eth events",
start_block, end_block, "Observed {len} new Eth events",
);
}
if let Some(last_block) = last_block {
Expand Down
46 changes: 21 additions & 25 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@ use anyhow::Result;
use clap::*;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{
get_connection_pool, read_sui_progress_store, PgProgressStore,
};
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transcations_loop;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_sdk::SuiClientBuilder;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::task::JoinHandle;

use mysten_metrics::metered_channel::channel;
use sui_bridge_indexer::config::IndexerConfig;
use sui_config::Config;
use tokio::sync::oneshot;
use tracing::info;

Expand All @@ -51,7 +53,7 @@ async fn main() -> Result<()> {
.expect("Couldn't get current directory")
.join("config.yaml")
};
let config = load_config(&config_path).unwrap();
let config = IndexerConfig::load(&config_path)?;
let config_clone = config.clone();

// Init metrics server
Expand All @@ -60,7 +62,7 @@ async fn main() -> Result<()> {
.parse()
.unwrap_or_else(|err| panic!("Failed to parse metric address: {}", err)),
);
let registry: Registry = registry_service.default_registry();
let registry = registry_service.default_registry();

mysten_metrics::init_metrics(&registry);

Expand All @@ -73,64 +75,58 @@ async fn main() -> Result<()> {
let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));

// unwrap safe: db_url must be set in `load_config` above
let db_url = config.db_url.clone().unwrap();
let db_url = config.db_url.clone();

// TODO: retry_with_max_elapsed_time
let eth_worker = EthBridgeWorker::new(
get_connection_pool(db_url.clone()),
bridge_metrics.clone(),
indexer_meterics.clone(),
config.clone(),
)
.unwrap();
)?;

let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![eth_worker.bridge_address()]),
bridge_metrics.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
.await?,
);

let unfinalized_handle = eth_worker
.start_indexing_unfinalized_events(eth_client.clone())
.await
.unwrap();
.await?;
let finalized_handle = eth_worker
.start_indexing_finalized_events(eth_client.clone())
.await
.unwrap();
.await?;
let handles = vec![unfinalized_handle, finalized_handle];

if let Some(sui_rpc_url) = config.sui_rpc_url.clone() {
start_processing_sui_checkpoints_by_querying_txes(
start_processing_sui_checkpoints_by_querying_txns(
sui_rpc_url,
db_url.clone(),
indexer_meterics.clone(),
bridge_metrics,
)
.await
.unwrap();
.await?;
} else {
let _ = start_processing_sui_checkpoints(
start_processing_sui_checkpoints(
&config_clone,
db_url,
indexer_meterics,
ingestion_metrics,
)
.await;
.await?;
}

// We are not waiting for the sui tasks to finish here, which is ok.
let _ = futures::future::join_all(handles).await;
futures::future::join_all(handles).await;

Ok(())
}

async fn start_processing_sui_checkpoints(
config: &sui_bridge_indexer::config::Config,
config: &sui_bridge_indexer::config::IndexerConfig,
db_url: String,
indexer_meterics: BridgeIndexerMetrics,
ingestion_metrics: DataIngestionMetrics,
Expand Down Expand Up @@ -165,14 +161,14 @@ async fn start_processing_sui_checkpoints(
.await
}

async fn start_processing_sui_checkpoints_by_querying_txes(
async fn start_processing_sui_checkpoints_by_querying_txns(
sui_rpc_url: String,
db_url: String,
indexer_metrics: BridgeIndexerMetrics,
bridge_metrics: Arc<BridgeMetrics>,
) -> Result<Vec<JoinHandle<()>>> {
let pg_pool = get_connection_pool(db_url.clone());
let (tx, rx) = mysten_metrics::metered_channel::channel(
let (tx, rx) = channel(
100,
&mysten_metrics::get_metrics()
.unwrap()
Expand All @@ -188,7 +184,7 @@ async fn start_processing_sui_checkpoints_by_querying_txes(
"start_sui_tx_polling_task"
));
handles.push(spawn_logged_monitored_task!(
handle_sui_transcations_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
handle_sui_transactions_loop(pg_pool.clone(), rx, indexer_metrics.clone()),
"handle_sui_transcations_loop"
));
Ok(handles)
Expand Down
Loading

0 comments on commit f262288

Please sign in to comment.