diff --git a/Cargo.lock b/Cargo.lock index 58f20a4372..d192e9ae67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1827,9 +1827,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.79" +version = "0.1.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" +checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", @@ -6218,6 +6218,7 @@ name = "hubble" version = "0.1.0" dependencies = [ "alloy", + "async-trait", "axum 0.6.20", "backon", "base64 0.21.7", diff --git a/dictionary.txt b/dictionary.txt index 1265017312..9e3c7ff22c 100644 --- a/dictionary.txt +++ b/dictionary.txt @@ -540,6 +540,7 @@ fetchurl fetchzip fields fileset +finalizer flamegraph fleek floorlog diff --git a/hubble/.sqlx/query-01c82e1e1eb476c1285d3c24347115e794f29318aaca341c04261f81bcd1a903.json b/hubble/.sqlx/query-01c82e1e1eb476c1285d3c24347115e794f29318aaca341c04261f81bcd1a903.json new file mode 100644 index 0000000000..78f5e639f4 --- /dev/null +++ b/hubble/.sqlx/query-01c82e1e1eb476c1285d3c24347115e794f29318aaca341c04261f81bcd1a903.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM v0.logs WHERE chain_id = $1 AND height = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Int4" + ] + }, + "nullable": [] + }, + "hash": "01c82e1e1eb476c1285d3c24347115e794f29318aaca341c04261f81bcd1a903" +} diff --git a/hubble/.sqlx/query-28002e29c0654545286e0c1aff7fefc27655e8c4f347b5ab79a33f6739b6f31d.json b/hubble/.sqlx/query-28002e29c0654545286e0c1aff7fefc27655e8c4f347b5ab79a33f6739b6f31d.json new file mode 100644 index 0000000000..694969f7a2 --- /dev/null +++ b/hubble/.sqlx/query-28002e29c0654545286e0c1aff7fefc27655e8c4f347b5ab79a33f6739b6f31d.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT start_height, max(end_height) as end_height\n FROM hubble.block_fix\n WHERE start_height = (\n SELECT min(start_height)\n FROM hubble.block_fix\n WHERE indexer_id = $1\n )\n GROUP BY start_height\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "start_height", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "end_height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "28002e29c0654545286e0c1aff7fefc27655e8c4f347b5ab79a33f6739b6f31d" +} diff --git a/hubble/.sqlx/query-2b8671fae8d787b0815d58fac90c8cc344b4d2d666acba2ee980b3d7fce6b2f1.json b/hubble/.sqlx/query-2b8671fae8d787b0815d58fac90c8cc344b4d2d666acba2ee980b3d7fce6b2f1.json new file mode 100644 index 0000000000..1ba7498425 --- /dev/null +++ b/hubble/.sqlx/query-2b8671fae8d787b0815d58fac90c8cc344b4d2d666acba2ee980b3d7fce6b2f1.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM hubble.block_status\n WHERE indexer_id = $1 AND height = $2\n RETURNING hash\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hash", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2b8671fae8d787b0815d58fac90c8cc344b4d2d666acba2ee980b3d7fce6b2f1" +} diff --git a/hubble/.sqlx/query-449af1f5d6d35ac5ec7600bc85fc0fdd9786d09a13fde5de6b525ceb47eecd11.json b/hubble/.sqlx/query-449af1f5d6d35ac5ec7600bc85fc0fdd9786d09a13fde5de6b525ceb47eecd11.json new file mode 100644 index 0000000000..d41b10580a --- /dev/null +++ 
b/hubble/.sqlx/query-449af1f5d6d35ac5ec7600bc85fc0fdd9786d09a13fde5de6b525ceb47eecd11.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT MAX(height) as height\n FROM hubble.contract_status\n WHERE internal_chain_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + null + ] + }, + "hash": "449af1f5d6d35ac5ec7600bc85fc0fdd9786d09a13fde5de6b525ceb47eecd11" +} diff --git a/hubble/.sqlx/query-5456a84ffdf5ebecc099b99333561c6999c4dbebb73e39d6532d03951dad244a.json b/hubble/.sqlx/query-5456a84ffdf5ebecc099b99333561c6999c4dbebb73e39d6532d03951dad244a.json new file mode 100644 index 0000000000..10bbb1dccf --- /dev/null +++ b/hubble/.sqlx/query-5456a84ffdf5ebecc099b99333561c6999c4dbebb73e39d6532d03951dad244a.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT height height\n FROM hubble.block_status\n WHERE indexer_id = $1 AND height > $2\n ORDER BY updated_at\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "5456a84ffdf5ebecc099b99333561c6999c4dbebb73e39d6532d03951dad244a" +} diff --git a/hubble/.sqlx/query-59bfa1ad1e07273027c8569988beef9f0b0d3eafb44d987130d06a38c034fef6.json b/hubble/.sqlx/query-59bfa1ad1e07273027c8569988beef9f0b0d3eafb44d987130d06a38c034fef6.json new file mode 100644 index 0000000000..e9c2608377 --- /dev/null +++ b/hubble/.sqlx/query-59bfa1ad1e07273027c8569988beef9f0b0d3eafb44d987130d06a38c034fef6.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT height\n FROM hubble.indexer_status\n WHERE indexer_id = $1\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "59bfa1ad1e07273027c8569988beef9f0b0d3eafb44d987130d06a38c034fef6" +} diff --git a/hubble/.sqlx/query-64266fb4ad7fa4dfe3489573740897685c23f7ffdb423457747f6e38c984035a.json b/hubble/.sqlx/query-64266fb4ad7fa4dfe3489573740897685c23f7ffdb423457747f6e38c984035a.json new file mode 100644 index 0000000000..a956b9174a --- /dev/null +++ b/hubble/.sqlx/query-64266fb4ad7fa4dfe3489573740897685c23f7ffdb423457747f6e38c984035a.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT MIN(height) min_height, MAX(height) max_height\n FROM hubble.block_status\n WHERE indexer_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "min_height", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "max_height", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "64266fb4ad7fa4dfe3489573740897685c23f7ffdb423457747f6e38c984035a" +} diff --git a/hubble/.sqlx/query-66098f24ec521e0b7ae5baed7273767a0acbcaccb4907ea358b1417fcdfb9f1a.json b/hubble/.sqlx/query-66098f24ec521e0b7ae5baed7273767a0acbcaccb4907ea358b1417fcdfb9f1a.json new file mode 100644 index 0000000000..26b897262e --- /dev/null +++ b/hubble/.sqlx/query-66098f24ec521e0b7ae5baed7273767a0acbcaccb4907ea358b1417fcdfb9f1a.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT cl.transaction_hash, cl.height, cl.log_index, cl.client_id\n FROM v1_evm.client_created cl\n WHERE cl.internal_chain_id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": 
"transaction_hash", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "height", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "log_index", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "client_id", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + true, + true, + true, + true + ] + }, + "hash": "66098f24ec521e0b7ae5baed7273767a0acbcaccb4907ea358b1417fcdfb9f1a" +} diff --git a/hubble/.sqlx/query-6b41e53bdcfd1235f3cdc46a8d4ed6627f1d79b682ba4c0f5f47732df3bf2b0c.json b/hubble/.sqlx/query-6b41e53bdcfd1235f3cdc46a8d4ed6627f1d79b682ba4c0f5f47732df3bf2b0c.json new file mode 100644 index 0000000000..91b951a388 --- /dev/null +++ b/hubble/.sqlx/query-6b41e53bdcfd1235f3cdc46a8d4ed6627f1d79b682ba4c0f5f47732df3bf2b0c.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM hubble.block_fix\n WHERE indexer_id = $1\n AND start_height = $2\n AND end_height <= $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "6b41e53bdcfd1235f3cdc46a8d4ed6627f1d79b682ba4c0f5f47732df3bf2b0c" +} diff --git a/hubble/.sqlx/query-8720e0b358cd12588bcf09347a506229f032457d0159324bc7d9f476ffa9e4b9.json b/hubble/.sqlx/query-8720e0b358cd12588bcf09347a506229f032457d0159324bc7d9f476ffa9e4b9.json new file mode 100644 index 0000000000..722b5e7268 --- /dev/null +++ b/hubble/.sqlx/query-8720e0b358cd12588bcf09347a506229f032457d0159324bc7d9f476ffa9e4b9.json @@ -0,0 +1,17 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO hubble.block_status (indexer_id, height, hash, timestamp)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (indexer_id, height) DO \n UPDATE SET\n hash = excluded.hash,\n timestamp = excluded.timestamp\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Text", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "8720e0b358cd12588bcf09347a506229f032457d0159324bc7d9f476ffa9e4b9" +} diff --git a/hubble/.sqlx/query-8b2d541673102e4e7010d5ed81c6f9f30476642596872a2d1abe1d13a7fde266.json b/hubble/.sqlx/query-8b2d541673102e4e7010d5ed81c6f9f30476642596872a2d1abe1d13a7fde266.json new file mode 100644 index 0000000000..82bbdc9662 --- /dev/null +++ b/hubble/.sqlx/query-8b2d541673102e4e7010d5ed81c6f9f30476642596872a2d1abe1d13a7fde266.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT hash FROM hubble.block_status\n WHERE indexer_id = $1 AND height = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hash", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8b2d541673102e4e7010d5ed81c6f9f30476642596872a2d1abe1d13a7fde266" +} diff --git a/hubble/.sqlx/query-c6e7eceed358235902b07e91e149c29178aa7ba7c4bbd704cb9c4080240be4c4.json b/hubble/.sqlx/query-c6e7eceed358235902b07e91e149c29178aa7ba7c4bbd704cb9c4080240be4c4.json new file mode 100644 index 0000000000..727094e927 --- /dev/null +++ b/hubble/.sqlx/query-c6e7eceed358235902b07e91e149c29178aa7ba7c4bbd704cb9c4080240be4c4.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE hubble.block_fix\n SET start_height = $3\n WHERE indexer_id = $1\n AND start_height = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "c6e7eceed358235902b07e91e149c29178aa7ba7c4bbd704cb9c4080240be4c4" +} diff --git 
a/hubble/.sqlx/query-d63f763014520aa4298d9602c36964f44613152ce4a06a204c17a7987bcde776.json b/hubble/.sqlx/query-d63f763014520aa4298d9602c36964f44613152ce4a06a204c17a7987bcde776.json new file mode 100644 index 0000000000..3b1d7a8555 --- /dev/null +++ b/hubble/.sqlx/query-d63f763014520aa4298d9602c36964f44613152ce4a06a204c17a7987bcde776.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO v0.clients (chain_id, client_id, counterparty_chain_id)\n VALUES ($1, $2, $3)\n ON CONFLICT DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int4", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "d63f763014520aa4298d9602c36964f44613152ce4a06a204c17a7987bcde776" +} diff --git a/hubble/.sqlx/query-d7bd3a8550dd532755fedb002f87be7bd2ff363f1b76794094690c6e022bec1c.json b/hubble/.sqlx/query-d7bd3a8550dd532755fedb002f87be7bd2ff363f1b76794094690c6e022bec1c.json new file mode 100644 index 0000000000..9502324410 --- /dev/null +++ b/hubble/.sqlx/query-d7bd3a8550dd532755fedb002f87be7bd2ff363f1b76794094690c6e022bec1c.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO hubble.indexer_status (indexer_id, height, timestamp)\n VALUES ($1, $2, $3)\n ON CONFLICT (indexer_id) DO \n UPDATE SET\n height = excluded.height,\n timestamp = excluded.timestamp\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "d7bd3a8550dd532755fedb002f87be7bd2ff363f1b76794094690c6e022bec1c" +} diff --git a/hubble/Cargo.toml b/hubble/Cargo.toml index 947926a899..5ad632cae0 100644 --- a/hubble/Cargo.toml +++ b/hubble/Cargo.toml @@ -19,6 +19,7 @@ path = "src/main.rs" [dependencies] alloy = { version = "0.1", features = ["full"] } +async-trait = "0.1.82" axum = { workspace = true, features = ["macros", "tokio"] } backon = "0.4.4" base64 = { workspace = true } diff --git a/hubble/hubble.nix b/hubble/hubble.nix index 01b6132a34..92814f4b73 100644 --- a/hubble/hubble.nix +++ b/hubble/hubble.nix @@ -55,6 +55,24 @@ indexers = mkOption { type = types.listOf ( types.submodule { + options.indexer_id = mkOption { + type = types.nullOr types.str; + description = "Id of the indexer which is used by the internal administration of Hubble. 
Should never change."; + example = "amazing-testnet"; + default = null; + }; + options.internal_chain_id = mkOption { + type = types.nullOr types.number; + description = "Hubble internal chain id, used to fetch the current height when migrating to fetchers."; + example = 4; + default = null; + }; + options.new_chain_override = mkOption { + type = types.nullOr types.bool; + description = "Indicator that this is a new chain, so the current height must not be used when migrating to fetchers."; + example = false; + default = null; + }; options.label = mkOption { type = types.str; example = "something-custom"; }; options.filter = mkOption { type = types.nullOr types.str; @@ -97,7 +115,7 @@ options.chain_id = mkOption { type = types.nullOr types.str; example = "union-testnet-8"; default = null; }; options.grpc_url = mkOption { type = types.nullOr types.str; example = "https://grpc.example.com"; default = null; }; - options.type = mkOption { type = types.enum [ "tendermint" "ethereum" "beacon" "bera" "ethereum-fork" "arb" "scroll" ]; }; + options.type = mkOption { type = types.enum [ "tendermint" "ethereum" "beacon" "bera" "ethereum-fork" "arb" "scroll" "eth-fetcher" ]; }; options.start_height = mkOption { type = types.int; example = 1; default = 0; }; options.chunk_size = mkOption { type = types.int; example = 1; default = 200; }; options.until = mkOption { type = types.int; example = 1; default = 1000000000000; }; diff --git a/hubble/src/chain_id_query.rs index fa12d8bc9e..acc2117e42 100644 --- a/hubble/src/chain_id_query.rs +++ b/hubble/src/chain_id_query.rs @@ -41,6 +41,8 @@ pub async fn tx(db: PgPool, indexers: Indexers) { for indexer in indexers { match indexer { + IndexerConfig::DummyFetcher(_) => {} + IndexerConfig::EthFetcher(_) => {} IndexerConfig::Scroll(_) => {} IndexerConfig::Arb(_) => {} IndexerConfig::Beacon(_) => {} diff --git a/hubble/src/cli.rs index 81ba0b93ce..e2891cac45 100644 --- a/hubble/src/cli.rs +++ b/hubble/src/cli.rs @@ -78,6 +78,10 @@ pub enum IndexerConfig { Arb(crate::arb::Config), #[serde(rename = "scroll")] Scroll(crate::scroll::Config), + #[serde(rename = "dummy-fetcher")] + DummyFetcher(crate::indexer::dummy::config::Config), + #[serde(rename = "eth-fetcher")] + EthFetcher(crate::indexer::eth::config::Config), } impl IndexerConfig { @@ -90,6 +94,8 @@ impl IndexerConfig { Self::EthFork(cfg) => &cfg.label, Self::Arb(cfg) => &cfg.label, Self::Scroll(cfg) => &cfg.label, + Self::DummyFetcher(cfg) => &cfg.indexer_id, + Self::EthFetcher(cfg) => &cfg.indexer_id, } } } @@ -151,6 +157,22 @@ impl IndexerConfig { .instrument(indexer_span) .await } + Self::DummyFetcher(cfg) => { + cfg.build(db) + .instrument(initializer_span) + .await? + .index() + .instrument(indexer_span) + .await + } + Self::EthFetcher(cfg) => { + cfg.build(db) + .instrument(initializer_span) + .await?
+ .index() + .instrument(indexer_span) + .await + } } } } diff --git a/hubble/src/eth/client.rs index 515eaf0362..ce20a948ec 100644 --- a/hubble/src/eth/client.rs +++ b/hubble/src/eth/client.rs @@ -1,6 +1,8 @@ #![allow(clippy::disallowed_types)] use alloy::{ eips::{BlockId, BlockNumberOrTag}, + network::{Ethereum, Network}, + primitives::TxHash, providers::{Provider, RootProvider}, rpc::types::{Block, BlockTransactionsKind, Filter, Log, TransactionReceipt}, transports::{ @@ -37,4 +39,12 @@ impl RaceClient<RootProvider<Http<Client>>> { ) -> Result<Vec<Log>, RpcError<TransportErrorKind>> { self.race(|c| c.get_logs(filter)).await } + + pub async fn get_transaction_by_hash( + &self, + tx_hash: TxHash, + ) -> Result<Option<<Ethereum as Network>::TransactionResponse>, RpcError<TransportErrorKind>> + { + self.race_some(|c| c.get_transaction_by_hash(tx_hash)).await + } } diff --git a/hubble/src/eth/indexer.rs index 733e2ac916..7013e8777c 100644 --- a/hubble/src/eth/indexer.rs +++ b/hubble/src/eth/indexer.rs @@ -593,16 +593,16 @@ impl BlockInsert { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TransactionInsert { - hash: String, - index: i32, - events: Vec<EventInsert>, + pub hash: String, + pub index: i32, + pub events: Vec<EventInsert>, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EventInsert { - data: serde_json::Value, - log_index: usize, - transaction_log_index: i32, + pub data: serde_json::Value, + pub log_index: usize, + pub transaction_log_index: i32, } pub trait ToLowerHex { diff --git a/hubble/src/indexer/api.rs new file mode 100644 index 0000000000..97cf570a81 --- /dev/null +++ b/hubble/src/indexer/api.rs @@ -0,0 +1,175 @@ +use std::{fmt::Display, ops::Range}; + +use axum::async_trait; +use color_eyre::eyre::Report; +use futures::Stream; +use sqlx::Postgres; +use time::OffsetDateTime; +use tokio::task::JoinSet; +use tracing::error; + +#[derive(Debug, thiserror::Error)] +pub enum IndexerError { + #[error("received unexpected height {0}: expecting {1}")] + UnexpectedHeightSingle(BlockHeight, BlockHeight), + #[error("received unexpected height {0} (range {1}): expecting {2}")] + UnexpectedHeightRange(BlockHeight, BlockRange, BlockHeight), + #[error("error reading block {0} (range {1}): {2}")] + ErrorReadingBlock(BlockHeight, BlockRange, Report), + #[error("expected to receive block {0} (range {1})")] + MissingBlock(BlockHeight, BlockRange), + #[error("received block while not expecting more (range {0}): {1}")] + TooManyBlocks(BlockRange, BlockReference), + #[error("received error while not expecting more (range {0}): {1}")] + TooManyBlocksError(BlockRange, Report), + #[error("no block at: {0}")] + NoBlock(BlockSelection), + #[error("database error: {0}")] + DatabaseError(sqlx::Error), + #[error("provider error: {0}")] + ProviderError(Report), + #[error("internal error: {0}")] + InternalError(Report), +} + +impl From<Report> for IndexerError { + fn from(error: Report) -> Self { + Self::InternalError(error) + } +} + +impl From<sqlx::Error> for IndexerError { + fn from(error: sqlx::Error) -> Self { + Self::DatabaseError(error) + } +} + +pub type IndexerId = String; +pub type BlockHeight = u64; +pub type BlockHash = String; +pub type BlockTimestamp = OffsetDateTime; + +#[derive(Clone, Debug)] +pub struct BlockRange { + pub start_inclusive: BlockHeight, + pub end_exclusive: BlockHeight, +} + +impl BlockRange { + pub fn range_chunks(self, chunk_size: usize) -> impl Iterator<Item = BlockRange> { + let range: Range<BlockHeight> = self.clone().into(); + + range.step_by(chunk_size).map(move |start_inclusive| { + let end_exclusive = (start_inclusive + chunk_size as
u64).min(self.end_exclusive); + (start_inclusive..end_exclusive).into() + }) + } + + pub fn len(&self) -> u64 { + self.end_exclusive - self.start_inclusive + } +} + +impl From<Range<BlockHeight>> for BlockRange { + fn from(range: Range<BlockHeight>) -> Self { + Self { + start_inclusive: range.start, + end_exclusive: range.end, + } + } +} + +impl Into<Range<BlockHeight>> for BlockRange { + fn into(self) -> Range<BlockHeight> { + self.start_inclusive..self.end_exclusive + } +} + +impl Display for BlockRange { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[{},{})", self.start_inclusive, self.end_exclusive) + } +} + +impl IntoIterator for BlockRange { + type Item = BlockHeight; + type IntoIter = Range<BlockHeight>; + + fn into_iter(self) -> Self::IntoIter { + self.into() + } +} + +#[derive(Clone, Copy)] +pub enum FetchMode { + Eager, + Lazy, +} + +#[derive(Debug)] +pub enum BlockSelection { + LastFinalized, + Height(BlockHeight), +} + +impl Display for BlockSelection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BlockSelection::LastFinalized => write!(f, "last-finalized"), + BlockSelection::Height(height) => write!(f, "{}", height), + } + } +} + +#[async_trait] +pub trait FetcherClient: Display + Send + Sync + Clone + Sized + 'static { + type BlockHandle: BlockHandle; + type Context: Display + Send + Sync + Clone + 'static; + + async fn create( + pg_pool: sqlx::PgPool, + join_set: &mut JoinSet<Result<(), IndexerError>>, + context: Self::Context, + ) -> Result<Self, IndexerError>; + + async fn fetch_single( + &self, + selection: BlockSelection, + mode: FetchMode, + ) -> Result<Self::BlockHandle, IndexerError>; +} + +#[derive(Clone, Debug)] +pub struct BlockReference { + pub height: BlockHeight, + pub hash: BlockHash, + pub timestamp: BlockTimestamp, +} + +impl BlockReference { + pub fn new(height: BlockHeight, hash: BlockHash, timestamp: BlockTimestamp) -> BlockReference { + BlockReference { + height, + hash, + timestamp, + } + } +} + +impl Display for BlockReference { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.height) + } +} + +#[async_trait] +pub trait BlockHandle: Send + Sync + Sized { + fn reference(&self) -> BlockReference; + fn fetch_range( + &self, + range: BlockRange, + mode: FetchMode, + ) -> Result<impl Stream<Item = Result<Self, IndexerError>> + Send + Unpin, IndexerError>; + async fn insert(&self, tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError>; + async fn update(&self, tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError>; +} diff --git a/hubble/src/indexer/dummy/config.rs new file mode 100644 index 0000000000..7e49778e39 --- /dev/null +++ b/hubble/src/indexer/dummy/config.rs @@ -0,0 +1,24 @@ +use color_eyre::eyre::Report; +use sqlx::PgPool; +use unionlabs::aptos::block_info::BlockHeight; + +use super::dummy::{DummyContext, DummyFetcherClient}; +use crate::indexer::{api::IndexerId, Indexer}; + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct Config { + pub indexer_id: IndexerId, + pub start_height: BlockHeight, +} + +impl Config { + pub async fn build(self, pg_pool: PgPool) -> Result<Indexer<DummyFetcherClient>, Report> { + Ok(Indexer::new( + pg_pool, + self.indexer_id, + self.start_height, + 5, + DummyContext { bla: 42 }, + )) + } +}
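Reviewer sketch (not part of the patch): a minimal illustration of how the `FetcherClient` and `BlockHandle` traits from `hubble/src/indexer/api.rs` above are meant to compose. The helper name `log_last_finalized` is hypothetical; it assumes only the trait definitions introduced in this diff.

    use crate::indexer::api::{BlockHandle, BlockSelection, FetchMode, FetcherClient, IndexerError};

    // Ask the client for the last finalized block without fetching its details
    // (FetchMode::Lazy), then log the height via BlockReference's Display impl.
    async fn log_last_finalized<T: FetcherClient>(client: &T) -> Result<(), IndexerError> {
        let handle = client
            .fetch_single(BlockSelection::LastFinalized, FetchMode::Lazy)
            .await?;
        tracing::info!("last finalized: {}", handle.reference());
        Ok(())
    }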
diff --git a/hubble/src/indexer/dummy/dummy.rs new file mode 100644 index 0000000000..88aca576d2 --- /dev/null +++ b/hubble/src/indexer/dummy/dummy.rs @@ -0,0 +1,178 @@ +use std::fmt::{Display, Formatter}; + +use axum::async_trait; +use color_eyre::eyre::Report; +use futures::{stream::FuturesOrdered, Stream}; +use sqlx::Postgres; +use time::OffsetDateTime; +use tokio::task::JoinSet; +use tracing::{debug, info}; + +use crate::indexer::api::{ + BlockHandle, BlockRange, BlockReference, BlockSelection, FetchMode, FetcherClient, IndexerError, +}; + +#[derive(Clone)] +pub struct DummyBlock { + reference: BlockReference, + dummy_client: DummyFetcherClient, + content: Option<String>, +} + +#[async_trait] +impl BlockHandle for DummyBlock { + fn reference(&self) -> BlockReference { + self.reference.clone() + } + + fn fetch_range( + &self, + block_range: BlockRange, + mode: FetchMode, + ) -> Result<impl Stream<Item = Result<DummyBlock, IndexerError>>, IndexerError> { + debug!("fetch_range => {}", block_range); + + Ok(FuturesOrdered::from_iter(block_range.into_iter().map( + |height| async move { + let message = match self + .dummy_client + .fetch_single(BlockSelection::Height(height), mode) + .await + { + Ok(block) => { + info!("fetch_range - fetched: {}", height); + + Ok(block) + } + Err(report) => { + info!("fetch_range - error at {}: {}", height, report); + + Err(report) + } + }; + debug!("sending message for {}", height); + + message + }, + ))) + } + + async fn insert(&self, _tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError> { + let content = match self.content.clone() { + Some(content) => content, + None => { + self.dummy_client + .fetch_content(self.reference.clone()) + .await? + } + }; + + info!( + "chain: {} - insert: {} - {}", + self.dummy_client.internal_chain_id, + self.reference(), + content, + ); + + // sleep(Duration::from_millis(50)).await; + + Ok(()) + } + + async fn update(&self, _tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError> { + info!( + "chain: {} - update: {} - {}", + self.dummy_client.internal_chain_id, + self.reference(), + match self.content.clone() { + Some(content) => content, + None => + self.dummy_client + .fetch_content(self.reference.clone()) + .await?, + }, + ); + + // sleep(Duration::from_millis(50)).await; + + Ok(()) + } +} + +#[derive(Clone)] +pub struct DummyFetcherClient { + internal_chain_id: u32, +} + +impl Display for DummyFetcherClient { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "dummy: internal-chain-id: {}", self.internal_chain_id) + } +} + +impl DummyFetcherClient { + fn new(internal_chain_id: u32) -> Self { + DummyFetcherClient { internal_chain_id } + } + + async fn fetch_content(&self, block_reference: BlockReference) -> Result<String, Report> { + // sleep(Duration::from_millis(10)).await; + Ok(format!("{}", block_reference)) + } +} + +#[derive(Clone)] +pub struct DummyContext { + pub bla: u64, +} + +impl Display for DummyContext { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "dummy: bla: {}", self.bla) + } +} + +#[async_trait] +impl FetcherClient for DummyFetcherClient { + type BlockHandle = DummyBlock; + type Context = DummyContext; + + async fn create( + _pg_pool: sqlx::PgPool, + _join_set: &mut JoinSet<Result<(), IndexerError>>, + _context: DummyContext, + ) -> Result<Self, IndexerError> { + let fetcher_client = DummyFetcherClient::new(8); + + Ok(fetcher_client) + } + + async fn fetch_single( + &self, + selection: BlockSelection, + mode: FetchMode, + ) -> Result<DummyBlock, IndexerError> { + // sleep(Duration::from_millis(10)).await; + let reference = BlockReference::new( + match selection { + BlockSelection::LastFinalized => 43000 as u64, + BlockSelection::Height(height) => height, + }, + match selection { + BlockSelection::LastFinalized => "42".to_string(), + BlockSelection::Height(height) => format!("{}", height), + // BlockReference::Height(height) => format!("{}-{}", height, OffsetDateTime::now_utc()),
+ }, + OffsetDateTime::now_utc(), + ); + + Ok(DummyBlock { + reference: reference.clone(), + dummy_client: self.clone(), + content: match mode { + FetchMode::Eager => Some(self.fetch_content(reference.clone()).await?), + FetchMode::Lazy => None, + }, + }) + } +} diff --git a/hubble/src/indexer/dummy/mod.rs new file mode 100644 index 0000000000..3367c39dec --- /dev/null +++ b/hubble/src/indexer/dummy/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +mod dummy; diff --git a/hubble/src/indexer/eth/block_handle.rs new file mode 100644 index 0000000000..a37230b93f --- /dev/null +++ b/hubble/src/indexer/eth/block_handle.rs @@ -0,0 +1,135 @@ +use alloy::rpc::types::Block; +use axum::async_trait; +use color_eyre::eyre::Report; +use const_hex::ToHexExt; +use futures::{stream::FuturesOrdered, Stream}; +use sqlx::Postgres; +use tracing::debug; + +use super::fetcher_client::EthFetcherClient; +use crate::{ + eth::BlockInsert, + indexer::{ + api::{BlockHandle, BlockRange, BlockReference, BlockSelection, FetchMode, IndexerError}, + eth::postgres::delete_eth_log, + }, + postgres::{insert_batch_logs, update_contracts_indexed_heights, InsertMode}, +}; + +#[derive(Clone)] +pub enum BlockDetails { + Lazy(Block), + Eager(Option<BlockInsert>), +} + +#[derive(Clone)] +pub struct EthBlockHandle { + pub reference: BlockReference, + pub details: BlockDetails, + pub eth_client: EthFetcherClient, + pub provider_index: usize, +} + +impl EthBlockHandle { + async fn get_block_insert(&self) -> Result<Option<BlockInsert>, Report> { + Ok(match self.details.clone() { + BlockDetails::Eager(block_insert) => block_insert, + BlockDetails::Lazy(block) => { + self.eth_client + .fetch_details(&block, self.provider_index) + .await? + } + }) + } +} + +#[async_trait] +impl BlockHandle for EthBlockHandle { + fn reference(&self) -> BlockReference { + self.reference.clone() + } + + fn fetch_range( + &self, + block_range: BlockRange, + fetch_mode: FetchMode, + ) -> Result<impl Stream<Item = Result<EthBlockHandle, IndexerError>>, IndexerError> { + debug!("{}: fetching", block_range); + + Ok(FuturesOrdered::from_iter( + block_range.clone().into_iter().map(|height| async move { + self.eth_client + .fetch_single_with_provider( + BlockSelection::Height(height), + fetch_mode, + Some(self.provider_index), + ) + .await + }), + )) + } + + async fn insert(&self, tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError> { + let reference = self.reference(); + debug!("{}", reference); + + let block_to_insert = self.get_block_insert().await?; + + match block_to_insert { + Some(block_to_insert) => { + debug!( + "{}: block with transactions ({}) => insert", + reference, + block_to_insert.transactions.len() + ); + + // TODO: remove to this module once legacy eth is removed + insert_batch_logs(tx, vec![block_to_insert.into()], InsertMode::Insert).await?; + } + None => { + debug!("{}: block without transactions => ignore", reference); + } + } + + // TODO: remove once all data based on new hubble tables + debug!("{}: updating contract heights", reference); + update_contracts_indexed_heights( + tx, + self.eth_client + .contracts + .iter() + .map(|addr| format!("0x{}", addr.encode_hex())) + .collect(), + reference.height as i64, + reference.timestamp, + self.eth_client.chain_id, + ) + .await?; + + debug!("{}: done", reference); + + Ok(()) + } + + async fn update(&self, tx: &mut sqlx::Transaction<'_, Postgres>) -> Result<(), IndexerError> { + let reference = self.reference(); + debug!("{}", reference); + + let block_to_insert = self.get_block_insert().await?; + + if let Some(block_to_insert) = block_to_insert { + debug!( + "{}: block with transactions ({}) => upsert", + reference, + block_to_insert.transactions.len() + ); + insert_batch_logs(tx, vec![block_to_insert.into()], InsertMode::Upsert).await?; + } else { + debug!("{}: block without transactions => delete", reference); + delete_eth_log(tx, self.eth_client.chain_id.db, reference.height).await?; + } + + debug!("{}: done", reference); + Ok(()) + } +}
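Reviewer sketch (not part of the patch): the `Config` in `eth/config.rs` below is deserialized from the indexer settings; this shows the expected shape, assuming `serde_json` and the `url` crate's `serde` feature (both already dependencies of Hubble). Values are illustrative.

    // Optional fields (chunk_size, internal_chain_id) may be omitted and
    // default to None; new_chain_override must then be true, since
    // Config::build below asserts it when no internal_chain_id is given.
    fn parse_example() -> Result<crate::indexer::eth::config::Config, serde_json::Error> {
        serde_json::from_str(
            r#"{
                "indexer_id": "ethereum-example",
                "start_height": 1,
                "urls": ["https://rpc.example.com"],
                "new_chain_override": true
            }"#,
        )
    }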
diff --git a/hubble/src/indexer/eth/config.rs new file mode 100644 index 0000000000..92c48f4869 --- /dev/null +++ b/hubble/src/indexer/eth/config.rs @@ -0,0 +1,66 @@ +use color_eyre::eyre::Report; +use sqlx::PgPool; +use url::Url; + +use super::{context::EthContext, fetcher_client::EthFetcherClient}; +use crate::indexer::{ + api::{BlockHeight, IndexerId}, + Indexer, +}; + +const DEFAULT_CHUNK_SIZE: usize = 200; + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct Config { + pub indexer_id: IndexerId, + pub start_height: BlockHeight, + pub chunk_size: Option<usize>, + pub urls: Vec<Url>, + pub internal_chain_id: Option<i32>, + pub new_chain_override: Option<bool>, +} + +impl Config { + pub async fn build(self, pg_pool: PgPool) -> Result<Indexer<EthFetcherClient>, Report> { + // temporary safety-measure to fetch the start height from the contracts table, + // because there will be no chain_state record after migrations + + let start_height = match self.internal_chain_id { + Some(internal_chain_id) => { + let record = sqlx::query!( + r#" + SELECT MAX(height) as height + FROM hubble.contract_status + WHERE internal_chain_id = $1 + "#, + internal_chain_id, + ) + .fetch_optional(&pg_pool) + .await?; + + record + .expect("record when internal chain id is configured") + .height + .map(|h| h as BlockHeight) + .expect("expecting height when existing chain is configured") + } + None => { + assert!( + self.new_chain_override + .expect("new chain override to be configured"), + "new chain override to be true" + ); + + self.start_height + } + }; + + Ok(Indexer::new( + pg_pool, + self.indexer_id, + start_height, + self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE), + EthContext { urls: self.urls }, + )) + } +} diff --git a/hubble/src/indexer/eth/context.rs new file mode 100644 index 0000000000..6092f12651 --- /dev/null +++ b/hubble/src/indexer/eth/context.rs @@ -0,0 +1,23 @@ +use std::fmt::Display; + +use url::Url; + +#[derive(Clone)] +pub struct EthContext { + pub urls: Vec<Url>, +} + +impl Display for EthContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "urls: {}", + self.urls + .iter() + .enumerate() + .map(|(index, url)| format!("{}: {}", index, url.as_str().to_string())) + .collect::<Vec<String>>() + .join(", ") + ) + } +} diff --git a/hubble/src/indexer/eth/create_client_tracker.rs new file mode 100644 index 0000000000..a3e8fba8fb --- /dev/null +++ b/hubble/src/indexer/eth/create_client_tracker.rs @@ -0,0 +1,92 @@ +use std::{str::FromStr, time::Duration}; + +use alloy::{ + primitives::FixedBytes, + providers::RootProvider, + transports::http::{Client, Http}, +}; +use tokio::{task::JoinSet, time::interval}; +use tracing::{debug, info, info_span, warn, Instrument}; +use unionlabs::{ + encoding::{DecodeAs, EthAbi}, + ibc::lightclients::cometbls::client_state::ClientState, +}; + +use crate::{chain_id_query::IbcHandler, indexer::api::IndexerError, race_client::RaceClient}; + +pub fn schedule_create_client_checker( + pg_pool: sqlx::PgPool, + join_set: &mut JoinSet<Result<(), IndexerError>>,
provider: RaceClient<RootProvider<Http<Client>>>, + internal_chain_id: i32, +) { + join_set.spawn(async move { + let mut interval = interval(Duration::from_secs(10 * 60)); + + loop { + info!("check"); + + let eth_clients = sqlx::query!( + r#" + SELECT cl.transaction_hash, cl.height, cl.log_index, cl.client_id + FROM v1_evm.client_created cl + WHERE cl.internal_chain_id = $1 + "#, + internal_chain_id + ) + .fetch_all(&pg_pool) + .await + .unwrap(); + + for record in eth_clients { + let height = record.height.expect("block height"); + let transaction_hash = record.transaction_hash.expect("transaction hash"); + + info!("{}-{}: checking", height, transaction_hash); + + let Some(client_id) = record.client_id else { + debug!("{}-{}: no client id => skipping", height, transaction_hash); + continue; + }; + + let tx = provider + .get_transaction_by_hash(FixedBytes::from_str(&transaction_hash).expect("valid transaction hash")) + .await? + .expect("transaction"); + + let msg = match <IbcHandler::createClientCall as alloy::sol_types::SolCall>::abi_decode(&tx.input, true) { + Ok(msg) => msg, + Err(err) => { + warn!("{}-{}: cannot decode, most likely due to ABI change: {} => skipping", height, transaction_hash, err); + continue + } + }; + + match &*msg._0.client_type { + "cometbls" => { + let cs = ClientState::decode_as::<EthAbi>(&msg._0.client_state_bytes).unwrap(); + + sqlx::query!( + r#" + INSERT INTO v0.clients (chain_id, client_id, counterparty_chain_id) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + "#, + internal_chain_id, + client_id, + cs.chain_id.to_string(), + ) + .execute(&pg_pool) + .await?; + } + ty => { + warn!("{}-{}: unknown evm client type `{}` => skipping", height, transaction_hash, ty); + continue + } + } + } + + interval.tick().await; + } + }.instrument(info_span!("clients"))); +} diff --git a/hubble/src/indexer/eth/fetcher_client.rs new file mode 100644 index 0000000000..3266425ade --- /dev/null +++ b/hubble/src/indexer/eth/fetcher_client.rs @@ -0,0 +1,336 @@ +use std::{collections::HashMap, fmt::Display, time::Duration}; + +use alloy::{ + eips::BlockId, + primitives::{Address, BloomInput}, + providers::{Provider, ProviderBuilder, RootProvider}, + rpc::types::{Block, BlockTransactionsKind, Filter, Log}, + transports::http::{Client, Http}, +}; +use axum::async_trait; +use color_eyre::eyre::Report; +use time::OffsetDateTime; +use tokio::task::JoinSet; +use tracing::{debug, info, info_span, trace, warn, Instrument}; + +use super::{block_handle::EthBlockHandle, context::EthContext}; +use crate::{ + eth::{BlockInsert, EventInsert, FromProviderError, ToLowerHex, TransactionInsert}, + indexer::{ + api::{BlockReference, BlockSelection, FetchMode, FetcherClient, IndexerError}, + eth::{block_handle::BlockDetails, create_client_tracker::schedule_create_client_checker}, + }, + postgres::{fetch_or_insert_chain_id_tx, ChainId}, + race_client::RaceClient, +}; + +trait BlockReferenceProvider { + fn block_reference(&self) -> Result<BlockReference, Report>; +} + +impl BlockReferenceProvider for Block { + fn block_reference(&self) -> Result<BlockReference, Report> { + Ok(BlockReference { + height: self + .header + .number + .ok_or(Report::msg("block without a number"))?, + hash: self + .header + .hash + .map(|h| h.to_lower_hex()) + .ok_or(Report::msg("block without a hash"))?, + timestamp: OffsetDateTime::from_unix_timestamp(self.header.timestamp as i64) + .map_err(FromProviderError::from)?, + }) + } +} + +#[derive(Clone)] +pub struct EthFetcherClient { + pub chain_id: ChainId, + pub provider: RaceClient<RootProvider<Http<Client>>>, + pub contracts: Vec<Address>
, +} + +impl Display for EthFetcherClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "chain_id: {}", self.chain_id) + } +} + +impl EthFetcherClient { + pub async fn fetch_single_with_provider( + &self, + selection: BlockSelection, + mode: FetchMode, + provider_index: Option<usize>, + ) -> Result<EthBlockHandle, IndexerError> { + let provider = match provider_index { + Some(provider_index) => { + RaceClient::new(vec![self.provider.clients[provider_index].clone()]) + } + None => self.provider.clone(), + }; + + match provider_index { + None => debug!("{}: fetching (race)", selection), + Some(provider_index) => debug!( + "{}: fetching (provider index: {})", + selection, provider_index + ), + } + + let block = provider + .get_block( + match selection { + BlockSelection::LastFinalized => BlockId::finalized(), + BlockSelection::Height(height) => BlockId::number(height), + }, + BlockTransactionsKind::Full, + ) + .await; + + match block { + Ok(Some(block)) => { + let fastest_index = self.provider.fastest_index(); + + debug!("{}: fetched (provider index: {})", selection, fastest_index); + + Ok(EthBlockHandle { + reference: block.block_reference()?, + details: match mode { + FetchMode::Lazy => BlockDetails::Lazy(block), + FetchMode::Eager => { + BlockDetails::Eager(self.fetch_details(&block, fastest_index).await?) + } + }, + eth_client: self.clone(), + provider_index: fastest_index, + }) + } + Ok(None) => { + info!("{}: does not exist", selection); + + Err(IndexerError::NoBlock(selection).into()) + } + Err(report) => { + info!("{}: error: {}", selection, report); + + Err(report.into()) + } + } + } + + pub async fn fetch_details( + &self, + block: &Block, + provider_index: usize, + ) -> Result<Option<BlockInsert>, Report> { + let block_reference = block.block_reference()?; + + info!("{}: fetch", block_reference); + // We check for a potential log match, which potentially avoids querying + // eth_getLogs. + let bloom = block.header.logs_bloom; + if self + .contracts + .iter() + .all(|contract| !bloom.contains_input(BloomInput::Raw(&contract.into_array()))) + { + info!("{}: ignored (bloom)", block_reference); + return Ok(None); + } + + // We know now there is a potential match, we still apply a Filter to only + // get the logs we want. + let log_filter = Filter::new().select(block.header.hash.unwrap()); + let log_addresses: Vec<Address>
= self.contracts.iter().cloned().collect(); + let log_filter = log_filter.address(log_addresses); + + let logs = self.provider.clients[provider_index] + .get_logs(&log_filter) + .await + .map_err(FromProviderError::DataNotFound)?; + + // The bloom filter returned a false positive, and we don't actually have matching logs. + if logs.is_empty() { + info!("{}: fetch => ignored (strict)", block_reference); + return Ok(None); + } + + let events_by_transaction = { + let mut map: HashMap<(_, _), Vec<Log>> = HashMap::with_capacity(logs.len()); + for log in logs { + if log.removed { + continue; + } + + map.entry(( + log.transaction_hash.unwrap(), + log.transaction_index.unwrap(), + )) + .and_modify(|logs| logs.push(log.clone())) + .or_insert(vec![log]); + } + map + }; + + let transactions: Vec<TransactionInsert> = events_by_transaction + .into_iter() + .map(|((transaction_hash, transaction_index), logs)| { + let transaction_hash = transaction_hash.to_lower_hex(); + let transaction_index = transaction_index as i32; + + let events: Vec<EventInsert> = logs + .into_iter() + .enumerate() + .map(|(transaction_log_index, log)| { + let data = serde_json::to_value(&log).unwrap(); + EventInsert { + data, + log_index: log.log_index.unwrap() as usize, + transaction_log_index: transaction_log_index as i32, + } + }) + .collect(); + + trace!( + "{}: fetch => events: {} (transaction {}/{})", + block_reference, + events.len(), + transaction_index, + transaction_hash + ); + + TransactionInsert { + hash: transaction_hash, + index: transaction_index, + events, + } + }) + .collect(); + + debug!( + "{}: fetch => done (transactions: {})", + block_reference, + transactions.len() + ); + + Ok(Some(BlockInsert { + chain_id: self.chain_id, + hash: block_reference.hash, + header: block.clone(), + height: block_reference.height as i32, + time: block_reference.timestamp, + transactions, + })) + } +} + +#[async_trait] +impl FetcherClient for EthFetcherClient { + type BlockHandle = EthBlockHandle; + type Context = EthContext; + + async fn create( + pg_pool: sqlx::PgPool, + join_set: &mut JoinSet<Result<(), IndexerError>>, + context: EthContext, + ) -> Result<Self, IndexerError> { + let provider = RaceClient::new( + context + .urls + .into_iter() + .map(|url| ProviderBuilder::new().on_http(url)) + .collect(), + ); + + info!("fetching chain-id from node"); + let chain_id = provider.get_chain_id().await?; + info!("fetched chain-id from node: {}", chain_id); + + let indexing_span = info_span!("indexer", chain_id = chain_id); + async move { + let mut tx = pg_pool.begin().await?; + + let chain_id = fetch_or_insert_chain_id_tx(&mut tx, chain_id.to_string()) + .await?
+ .get_inner_logged(); + + // TODO: remove once all data is based on new hubble tables + let rows = loop { + let rows = sqlx::query!( + r#" + SELECT c.address, COALESCE(cs.height, c.height - 1) as indexed_height + FROM v0.contracts c + LEFT JOIN hubble.contract_status cs + ON c.chain_id = cs.internal_chain_id and c.address = cs.address + WHERE c.chain_id = $1 + "#, + chain_id.db + ) + .fetch_all(tx.as_mut()) + .await?; + + if rows.is_empty() { + warn!("no contracts found to track, retrying in 20 seconds"); + tokio::time::sleep(Duration::from_secs(20)).await; + continue; + } + break rows; + }; + + let lowest: u64 = rows + .iter() + .map(|row| row.indexed_height.expect("query to return indexed_height")) + .min() + .expect("contracts should exist in the db") + .try_into() + .expect("indexed_height should be positive"); + + let highest: u64 = rows + .iter() + .map(|row| row.indexed_height.expect("query to return indexed_height")) + .max() + .expect("contracts should exist in the db") + .try_into() + .expect("indexed_height should be positive"); + + if lowest != highest { + info!("detected new contract. reload blocks that might be affected."); + warn!("NOT YET IMPLEMENTED"); + // TODO: initiate repair + }; + + let contracts = rows + .into_iter() + .map(|row| { + row.address + .parse() + .expect("database should contain valid addresses") + }) + .collect(); + + tx.commit().await?; + + schedule_create_client_checker(pg_pool, join_set, provider.clone(), chain_id.db); + + Ok(EthFetcherClient { + chain_id, + provider, + contracts, + }) + } + .instrument(indexing_span) + .await + } + + async fn fetch_single( + &self, + selection: BlockSelection, + mode: FetchMode, + ) -> Result<EthBlockHandle, IndexerError> { + self.fetch_single_with_provider(selection, mode, None).await + } +} diff --git a/hubble/src/indexer/eth/mod.rs new file mode 100644 index 0000000000..abaac21603 --- /dev/null +++ b/hubble/src/indexer/eth/mod.rs @@ -0,0 +1,17 @@ +use alloy::transports::{RpcError, TransportErrorKind}; +use color_eyre::eyre::Report; + +use super::api::IndexerError; + +mod block_handle; +pub mod config; +mod context; +mod create_client_tracker; +mod fetcher_client; +mod postgres; + +impl From<RpcError<TransportErrorKind>> for IndexerError { + fn from(error: RpcError<TransportErrorKind>) -> Self { + Self::ProviderError(Report::from(error)) + } +} diff --git a/hubble/src/indexer/eth/postgres.rs new file mode 100644 index 0000000000..099b8b1c5f --- /dev/null +++ b/hubble/src/indexer/eth/postgres.rs @@ -0,0 +1,23 @@ +use sqlx::{Postgres, Transaction}; + +use crate::{indexer::api::BlockHeight, postgres::schedule_replication_reset}; + +pub async fn delete_eth_log( + tx: &mut Transaction<'_, Postgres>, + chain_id: i32, + height: BlockHeight, +) -> sqlx::Result<()> { + sqlx::query!( + " + DELETE FROM v0.logs WHERE chain_id = $1 AND height = $2 + ", + chain_id, + height as i32 + ) + .execute(tx.as_mut()) + .await?; + + schedule_replication_reset(tx, chain_id, height as i64, "block reorg (delete)").await?; + + Ok(()) +}
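Reviewer sketch (not part of the patch): `fetcher.rs` below stores the last indexed height and derives the next one from it, and catches up to the finalized head in fixed-size chunks via `BlockRange::range_chunks`. A standalone illustration of both rules, assuming only the types from `api.rs`:

    use crate::indexer::api::{BlockHeight, BlockRange};

    // Hubble stores the last *indexed* height, so the next block to fetch is
    // one higher; a fresh indexer falls back to the configured start height.
    fn next_height(last_indexed: Option<BlockHeight>, start_height: BlockHeight) -> BlockHeight {
        last_indexed.map(|h| h + 1).unwrap_or(start_height)
    }

    fn demo() {
        assert_eq!(next_height(None, 100), 100);
        assert_eq!(next_height(Some(41), 100), 42);

        // Half-open catch-up range split into chunks; the last one is truncated.
        let missing: BlockRange = (100u64..550u64).into();
        let chunks: Vec<String> = missing.range_chunks(200).map(|c| c.to_string()).collect();
        assert_eq!(chunks, ["[100,300)", "[300,500)", "[500,550)"]);
    }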
diff --git a/hubble/src/indexer/fetcher.rs new file mode 100644 index 0000000000..832e175b6a --- /dev/null +++ b/hubble/src/indexer/fetcher.rs @@ -0,0 +1,165 @@ +use std::time::Duration; + +use color_eyre::eyre::Report; +use tokio::time::sleep; +use tracing::{debug, error, info, info_span, warn, Instrument}; + +use super::{ + api::{BlockHeight, FetcherClient}, + Indexer, +}; +use crate::indexer::{ + api::{BlockHandle, BlockRange, BlockSelection, FetchMode, IndexerError}, + postgres::{get_current_height, update_block_status, update_current_height}, + HappyRangeFetcher, +}; + +impl<T: FetcherClient> Indexer<T> { + pub async fn run_fetcher(&self, fetcher_client: T) -> Result<(), IndexerError> { + self.run_to_finalized(&fetcher_client) + .instrument(info_span!("run-to-finalized")) + .await?; + self.run_to_tip(&fetcher_client) + .instrument(info_span!("run-to-tip")) + .await + } + + async fn run_to_finalized(&self, fetcher_client: &T) -> Result<(), IndexerError> { + loop { + debug!("fetching last finalized block"); + match fetcher_client + .fetch_single(BlockSelection::LastFinalized, FetchMode::Lazy) + .await + { + Ok(last_finalized) => { + let next_height = self.next_height().await?; + if next_height + self.chunk_size as u64 > last_finalized.reference().height { + info!("near finalized height (current: {} finalized: {}) => start 'run to tip'", next_height, last_finalized.reference()); + return Ok(()); + } + + let catch_up_range: BlockRange = + (next_height..last_finalized.reference().height).into(); + info!("missing blocks: {}", catch_up_range); + + for slice in catch_up_range.range_chunks(self.chunk_size).into_iter() { + info!("{}: handling chunk", slice); + + last_finalized + .fetch_range_expect_all(slice.clone(), FetchMode::Eager, |block| { + self.store_block(block) + }) + .instrument(info_span!("store")) + .await?; + + info!("{}: handled chunk", &slice); + } + } + Err(IndexerError::NoBlock(_)) => { + info!("no finalized block => start 'run to tip'"); + return Ok(()); + } + Err(error) => { + info!("error reading: {}", error); + return Err(error); + } + }; + } + } + + async fn store_block(&self, block_handle: T::BlockHandle) -> Result<(), Report> { + let reference = block_handle.reference(); + debug!("store: {}", reference); + + let mut tx = self.pg_pool.begin().await?; + + block_handle.insert(&mut tx).await?; + + update_current_height( + &mut tx, + self.indexer_id.clone(), + reference.height, + reference.timestamp, + ) + .await?; + + tx.commit().await?; + + Ok(()) + } + + async fn run_to_tip(&self, fetcher_client: &T) -> Result<(), IndexerError> { + loop { + let next_height = self.next_height().await?; + info!("{}: fetching", next_height); + + match fetcher_client + .fetch_single(BlockSelection::Height(next_height), FetchMode::Eager) + .await + { + Ok(block_handle) => { + let reference = &block_handle.reference(); + debug!("{}: handling", reference); + + if next_height != reference.height { + error!( + "{}: unexpected height (actual {}, expecting {})", + reference.height, reference.height, next_height + ); + return Err(IndexerError::UnexpectedHeightSingle( + reference.height, + next_height, + )); + } + + let mut tx = self.pg_pool.begin().await?; + block_handle + .insert(&mut tx) + .instrument(info_span!("insert")) + .await?; + + debug!("{}: update height", reference); + update_current_height( + &mut tx, + self.indexer_id.clone(), + reference.height, + reference.timestamp, + ) + .await?; + + debug!("{}: update status", reference); + update_block_status( + &mut tx, + self.indexer_id.clone(), + reference.height, + reference.hash.clone(), + reference.timestamp, + ) + .await?; + + tx.commit().await?; + debug!("{}: handled", reference); + } + Err(IndexerError::NoBlock(_)) => { + debug!("{}: no block yet => sleep", next_height); + sleep(Duration::from_millis(1000)).await; + } + Err(_) => { + warn!("{}: error reading block => sleep", next_height); + sleep(Duration::from_millis(1000)).await; + } + } + } + } + + async fn next_height(&self) -> Result<BlockHeight, Report> { + let mut tx =
self.pg_pool.begin().await?; + let result = get_current_height(&mut tx, self.indexer_id.clone()) + .await? + .map(|current_height| current_height + 1) // we store the last indexed height, so next is one higher + .unwrap_or(self.start_height); + tx.commit().await?; + + Ok(result) + } +} diff --git a/hubble/src/indexer/finalizer.rs new file mode 100644 index 0000000000..df176e84f3 --- /dev/null +++ b/hubble/src/indexer/finalizer.rs @@ -0,0 +1,199 @@ +use std::{cmp::min, time::Duration}; + +use color_eyre::eyre::Report; +use tokio::time::sleep; +use tracing::{debug, info, info_span, trace, warn, Instrument}; + +use super::{ + api::{BlockRange, FetcherClient, IndexerError}, + postgres::get_next_block_to_refresh, + Indexer, +}; +use crate::indexer::{ + api::{BlockHandle, BlockHeight, BlockSelection, FetchMode}, + postgres::{ + delete_block_status, get_block_range_to_finalize, get_block_status_hash, + update_block_status, + }, + HappyRangeFetcher, +}; + +impl<T: FetcherClient> Indexer<T> { + pub async fn run_finalizer(&self, fetcher_client: T) -> Result<(), IndexerError> { + loop { + if let Some(block_range_to_finalize) = self.block_range_to_finalize().await? { + info!("{}: begin", block_range_to_finalize); + + match fetcher_client + .fetch_single(BlockSelection::LastFinalized, FetchMode::Lazy) + .await + { + Ok(last_finalized) => { + let reference = last_finalized.reference(); + trace!( + "{}: current finalized: {}", + block_range_to_finalize, + reference + ); + + if block_range_to_finalize.start_inclusive <= reference.height { + // find the end of the range to finalize + let end_of_chunk = block_range_to_finalize.start_inclusive + + self.chunk_size as BlockHeight; + let end_until_finalized = reference.height + 1; + let end_until_last_tracked_block = + block_range_to_finalize.end_exclusive; + + let range_to_finalize_end_exclusive = min( + end_of_chunk, + min(end_until_finalized, end_until_last_tracked_block), + ); + + let range_to_finalize = (block_range_to_finalize.start_inclusive + ..range_to_finalize_end_exclusive) + .into(); + debug!( + "{}: finalizing: {}", + block_range_to_finalize, range_to_finalize + ); + + self.finalize_blocks(&last_finalized, range_to_finalize) + .instrument(info_span!("finalize")) + .await?; + } else { + trace!( + "{}: nothing to finalize (before finalized {})", + block_range_to_finalize, + reference + ); + } + + if let Some(height) = self.next_block_to_refresh(reference.height).await?
{ + let range_to_refresh = (height + ..(min( + height + self.chunk_size as BlockHeight, + block_range_to_finalize.end_exclusive, + ))) + .into(); + info!("updating: {}", range_to_refresh); + + self.finalize_blocks(&last_finalized, range_to_refresh) + .instrument(info_span!("monitor")) + .await?; + } else { + trace!("{}: nothing to update", block_range_to_finalize); + } + } + Err(IndexerError::NoBlock(_)) => { + info!("no finalized height => retry later"); + sleep(Duration::from_millis(1000)).await; + } + Err(error) => { + warn!("error fetching finalized height ({}) => retry later", error); + sleep(Duration::from_millis(1000)).await; + } + } + + if block_range_to_finalize.len() < self.chunk_size as u64 { + info!("not much to finalize => retry later"); + sleep(Duration::from_millis(1000)).await; + } + } else { + info!("nothing to finalize => retry later"); + sleep(Duration::from_millis(1000)).await; + } + } + } + + async fn finalize_blocks( + &self, + last_finalized: &T::BlockHandle, + block_range: BlockRange, + ) -> Result<(), Report> { + let last_finalized_reference = last_finalized.reference(); + + last_finalized + .fetch_range_expect_all(block_range.clone(), FetchMode::Lazy, |block| { + self.finalize_block(block, last_finalized_reference.height) + }) + .instrument(info_span!("finalize")) + .await?; + + info!("{}: done", block_range); + + Ok(()) + } + + async fn finalize_block( + &self, + block: T::BlockHandle, + last_finalized_height: BlockHeight, + ) -> Result<(), Report> { + let reference = block.reference(); + debug!("{}: finalizing", reference); + + let mut tx = self.pg_pool.begin().await?; + let is_finalized = last_finalized_height >= reference.height; + + if let Some(old_hash) = match is_finalized { + true => delete_block_status(&mut tx, self.indexer_id.clone(), reference.height).await?, + false => { + get_block_status_hash(&mut tx, self.indexer_id.clone(), reference.height).await? 
+ } + } { + if old_hash != reference.hash { + debug!( + "{}: changed ({} > {} => updating)", + reference.height, old_hash, reference.hash, + ); + block + .update(&mut tx) + .instrument(info_span!("update")) + .await?; + } + + if !is_finalized { + debug!("{}: update status", reference); + update_block_status( + &mut tx, + self.indexer_id.clone(), + reference.height, + reference.hash.clone(), + reference.timestamp, + ) + .await?; + } + } else { + warn!( + "{}: expecting block-status, but there was none at height", + reference + ); + } + + tx.commit().await?; + + debug!("{}: finalized", reference); + + Ok(()) + } + + async fn block_range_to_finalize(&self) -> Result<Option<BlockRange>, Report> { + let mut tx = self.pg_pool.begin().await?; + let result = get_block_range_to_finalize(&mut tx, self.indexer_id.clone()).await?; + tx.commit().await?; + + Ok(result) + } + + async fn next_block_to_refresh( + &self, + consensus_height: BlockHeight, + ) -> Result<Option<BlockHeight>, Report> { + let mut tx = self.pg_pool.begin().await?; + let result = + get_next_block_to_refresh(&mut tx, self.indexer_id.clone(), consensus_height).await?; + tx.commit().await?; + + Ok(result) + } +} diff --git a/hubble/src/indexer/fixer.rs new file mode 100644 index 0000000000..a50efafc8f --- /dev/null +++ b/hubble/src/indexer/fixer.rs @@ -0,0 +1,128 @@ +use std::{cmp::min, time::Duration}; + +use color_eyre::eyre::Report; +use tokio::time::sleep; +use tracing::{debug, info, info_span, trace, warn, Instrument}; + +use super::{ + api::{BlockRange, FetcherClient, IndexerError}, + postgres::{get_block_range_to_fix, update_block_range_to_fix}, + Indexer, +}; +use crate::indexer::{ + api::{BlockHandle, BlockHeight, BlockSelection, FetchMode}, + HappyRangeFetcher, +}; + +impl<T: FetcherClient> Indexer<T> { + pub async fn run_fixer(&self, fetcher_client: T) -> Result<(), IndexerError> { + loop { + if let Some(block_range_to_fix) = self.block_range_to_fix().await? { + info!("{}: begin", block_range_to_fix); + + match fetcher_client + .fetch_single(BlockSelection::LastFinalized, FetchMode::Lazy) + .await + { + Ok(last_finalized) => { + let last_finalized_reference = last_finalized.reference(); + + trace!( + "{}: current finalized: {}", + block_range_to_fix, + last_finalized_reference, + ); + + if block_range_to_fix.start_inclusive <= last_finalized_reference.height { + // find the end of the range to fix + let end_of_chunk_exclusive = + block_range_to_fix.start_inclusive + self.chunk_size as BlockHeight; + let end_until_finalized = last_finalized_reference.height + 1; + let end_until_last_block_to_fix = block_range_to_fix.end_exclusive; + + let range_to_fix_end = min( + end_of_chunk_exclusive, + min(end_until_finalized, end_until_last_block_to_fix), + ); + + let range_to_fix: BlockRange = + (block_range_to_fix.start_inclusive..range_to_fix_end).into(); + debug!("{}: fixing: {}", block_range_to_fix, range_to_fix); + + self.fix_blocks(&last_finalized, range_to_fix.clone()) + .instrument(info_span!("fix")) + .await?; + + self.remove_blocks_to_fix(range_to_fix).await?
+ } + } + Err(IndexerError::NoBlock(_)) => { + info!("{}: no finalized block => retry later", block_range_to_fix); + sleep(Duration::from_millis(1000)).await; + } + Err(error) => { + warn!( + "{}: error finding finalized block ({}) => retry later", + block_range_to_fix, error, + ); + sleep(Duration::from_millis(1000)).await; + } + } + } else { + info!("nothing scheduled to fix => retry later"); + sleep(Duration::from_millis(1000)).await; + } + } + } + + async fn fix_blocks( + &self, + last_finalized: &T::BlockHandle, + block_range: BlockRange, + ) -> Result<(), Report> { + last_finalized + .fetch_range_expect_all(block_range.clone(), FetchMode::Eager, |block| { + self.fix_block(block) + }) + .instrument(info_span!("fix")) + .await?; + + info!("{}: done", &block_range); + + Ok(()) + } + + async fn fix_block(&self, block: T::BlockHandle) -> Result<(), Report> { + let reference = block.reference(); + debug!("{}: fixing", reference); + + let mut tx = self.pg_pool.begin().await?; + + block + .update(&mut tx) + .instrument(info_span!("rewrite")) + .await?; + + tx.commit().await?; + + debug!("{}: fixed", reference); + + Ok(()) + } + + async fn block_range_to_fix(&self) -> Result<Option<BlockRange>, Report> { + let mut tx = self.pg_pool.begin().await?; + let result = get_block_range_to_fix(&mut tx, self.indexer_id.clone()).await?; + tx.commit().await?; + + Ok(result) + } + + async fn remove_blocks_to_fix(&self, range: BlockRange) -> Result<(), Report> { + let mut tx = self.pg_pool.begin().await?; + update_block_range_to_fix(&mut tx, self.indexer_id.clone(), range).await?; + tx.commit().await?; + + Ok(()) + } +}
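Reviewer sketch (not part of the patch): `mod.rs` below supervises the fetcher, finalizer, and fixer on one `JoinSet` and restarts them all when any task fails. A toy version of that pattern, assuming only tokio; the worker bodies are stand-ins.

    use tokio::task::JoinSet;

    // Spawn the workers; if one errors, abort the siblings and let the caller
    // rebuild the client and spawn again (mirrors handle_end_of_run below).
    async fn supervise_once() -> bool {
        let mut join_set: JoinSet<Result<(), &'static str>> = JoinSet::new();
        join_set.spawn(async { Ok(()) }); // fetcher stand-in
        join_set.spawn(async { Err("reorg detected") }); // finalizer stand-in

        while let Some(result) = join_set.join_next().await {
            if !matches!(result, Ok(Ok(()))) {
                join_set.abort_all();
                return false; // restart
            }
        }
        true // graceful exit
    }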
diff --git a/hubble/src/indexer/mod.rs b/hubble/src/indexer/mod.rs
new file mode 100644
index 0000000000..0837506f33
--- /dev/null
+++ b/hubble/src/indexer/mod.rs
@@ -0,0 +1,216 @@
+pub mod api;
+pub mod dummy;
+pub mod eth;
+mod fetcher;
+mod finalizer;
+mod fixer;
+mod postgres;
+
+use std::{future::Future, time::Duration};
+
+use api::{
+    BlockHandle, BlockHeight, BlockRange, FetchMode, FetcherClient, IndexerError, IndexerId,
+};
+use color_eyre::eyre::Report;
+use futures::StreamExt;
+use tokio::{task::JoinSet, time::sleep};
+use tracing::{error, info};
+
+enum EndOfRunResult {
+    Exit,
+    Restart,
+}
+
+#[derive(Clone)]
+pub struct Indexer<T: FetcherClient> {
+    pub pg_pool: sqlx::PgPool,
+    pub indexer_id: IndexerId,
+    pub start_height: BlockHeight,
+    pub chunk_size: usize,
+    pub context: T::Context,
+}
+
+impl<T> Indexer<T>
+where
+    T: FetcherClient,
+{
+    pub fn new(
+        pg_pool: sqlx::PgPool,
+        indexer_id: IndexerId,
+        start_height: BlockHeight,
+        chunk_size: usize,
+        context: T::Context,
+    ) -> Self {
+        Indexer {
+            pg_pool,
+            indexer_id,
+            start_height,
+            chunk_size,
+            context,
+        }
+    }
+
+    pub async fn index(&self) -> Result<(), Report> {
+        loop {
+            let mut join_set = JoinSet::new();
+
+            match self
+                .create_fetcher_client(&mut join_set, self.context.clone())
+                .await
+            {
+                Some(fetcher_client) => {
+                    let self_clone = self.clone();
+                    let fetcher_client_clone = fetcher_client.clone();
+                    join_set
+                        .spawn(async move { self_clone.run_fetcher(fetcher_client_clone).await });
+
+                    let self_clone = self.clone();
+                    let fetcher_client_clone = fetcher_client.clone();
+                    join_set
+                        .spawn(async move { self_clone.run_finalizer(fetcher_client_clone).await });
+
+                    let self_clone = self.clone();
+                    let fetcher_client_clone = fetcher_client.clone();
+                    join_set.spawn(async move { self_clone.run_fixer(fetcher_client_clone).await });
+
+                    if let EndOfRunResult::Exit = self
+                        .handle_end_of_run(&mut join_set, fetcher_client)
+                        .await?
+                    {
+                        return Ok(());
+                    }
+                }
+                None => {
+                    // can't create client => try again later
+                    sleep(Duration::from_millis(1000)).await;
+                }
+            }
+        }
+    }
+
+    async fn create_fetcher_client(
+        &self,
+        join_set: &mut JoinSet<Result<(), IndexerError>>,
+        context: T::Context,
+    ) -> Option<T> {
+        info!("creating client (context: {})", self.context);
+        match T::create(self.pg_pool.clone(), join_set, context).await {
+            Ok(client) => {
+                info!("created client: {}", client);
+                Some(client)
+            }
+            Err(error) => {
+                error!(
+                    "error creating client: {} (context: {})",
+                    error, self.context
+                );
+                None
+            }
+        }
+    }
+
+    async fn handle_end_of_run(
+        &self,
+        join_set: &mut JoinSet<Result<(), IndexerError>>,
+        fetcher_client: T,
+    ) -> Result<EndOfRunResult, Report> {
+        while let Some(res) = join_set.join_next().await {
+            match res {
+                Ok(Err(err)) => {
+                    error!(
+                        "{}: error: {:?}. re-initialize (client: {}, context: {})",
+                        self.indexer_id, err, fetcher_client, self.context
+                    );
+                    join_set.abort_all();
+                    sleep(Duration::from_secs(1)).await;
+                    info!("{}: restarting", self.indexer_id);
+                    return Ok(EndOfRunResult::Restart);
+                }
+                Err(err) => return Err(err.into()),
+                Ok(Ok(_)) => {
+                    info!("{}: indexer exited gracefully", self.indexer_id);
+                }
+            }
+        }
+        Ok(EndOfRunResult::Exit)
+    }
+}
+
+// Utility that verifies that received blocks have the expected height.
+// 'f' can assume that the height is verified, so it only needs to
+// implement the 'happy path'.
+pub trait HappyRangeFetcher<T: BlockHandle> {
+    async fn fetch_range_expect_all<F, Fut>(
+        &self,
+        range: BlockRange,
+        mode: FetchMode,
+        f: F,
+    ) -> Result<(), IndexerError>
+    where
+        F: Fn(T) -> Fut,
+        Fut: Future<Output = Result<(), Report>>;
+}
+
+impl<T: BlockHandle> HappyRangeFetcher<T> for T {
+    async fn fetch_range_expect_all<F, Fut>(
+        &self,
+        range: BlockRange,
+        mode: FetchMode,
+        f: F,
+    ) -> Result<(), IndexerError>
+    where
+        F: Fn(T) -> Fut,
+        Fut: Future<Output = Result<(), Report>>,
+    {
+        let mut stream = self.fetch_range(range.clone(), mode)?;
+        let mut expected_block_heights = range.clone().into_iter();
+
+        while let Some(expected_block_height) = expected_block_heights.next() {
+            match stream.next().await {
+                Some(Ok(block)) => {
+                    let actual_block_height = block.reference().height;
+
+                    match expected_block_height == actual_block_height {
+                        true => f(block).await?,
+                        false => {
+                            error!(
+                                "{}: unexpected height (actual {}, expecting {})",
+                                actual_block_height, actual_block_height, expected_block_height
+                            );
+                            return Err(IndexerError::UnexpectedHeightRange(
+                                expected_block_height,
+                                range,
+                                actual_block_height,
+                            ));
+                        }
+                    }
+                }
+                Some(Err(error)) => {
+                    error!("{}: error reading block: {}", expected_block_height, error);
+                    return Err(IndexerError::ErrorReadingBlock(
+                        expected_block_height,
+                        range,
+                        error.into(),
+                    ));
+                }
+                None => {
+                    error!("{}: missing block", expected_block_height);
+                    return Err(IndexerError::MissingBlock(expected_block_height, range));
+                }
+            }
+        }
+
+        if let Some(result) = stream.next().await {
+            error!("{}: too many blocks", range);
+            return Err(match result {
+                Ok(block) => IndexerError::TooManyBlocks(range, block.reference()),
+                Err(error) => IndexerError::TooManyBlocksError(range, Report::from(error)),
+            });
+        }
+
+        Ok(())
+    }
+}
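The supervision strategy in `index()` is: spawn the fetcher, finalizer, and fixer into one `JoinSet`; on the first worker failure, abort all siblings and let the outer loop recreate the client and workers. A standalone reduction of that shape (worker bodies and the error type are stand-ins, not the diff's code):

```rust
// Minimal model of the restart behavior in index()/handle_end_of_run above.
use std::time::Duration;
use tokio::{task::JoinSet, time::sleep};

#[tokio::main]
async fn main() {
    let mut join_set: JoinSet<Result<(), String>> = JoinSet::new();

    // A long-running worker (stands in for fetcher/finalizer/fixer).
    join_set.spawn(async {
        loop {
            sleep(Duration::from_secs(3600)).await;
        }
    });
    // A worker that fails, triggering the teardown path.
    join_set.spawn(async { Err("simulated worker error".to_string()) });

    while let Some(res) = join_set.join_next().await {
        match res {
            Ok(Err(err)) => {
                eprintln!("worker failed: {err}; aborting siblings");
                join_set.abort_all(); // the real indexer sleeps, then restarts its loop
                break;
            }
            Err(join_err) if join_err.is_cancelled() => {} // an aborted sibling
            Err(join_err) => panic!("worker panicked: {join_err}"),
            Ok(Ok(())) => {} // graceful exit
        }
    }
}
```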
diff --git a/hubble/src/indexer/postgres.rs b/hubble/src/indexer/postgres.rs
new file mode 100644
index 0000000000..d255da45b4
--- /dev/null
+++ b/hubble/src/indexer/postgres.rs
@@ -0,0 +1,223 @@
+use core::fmt::Debug;
+
+use sqlx::Postgres;
+use time::OffsetDateTime;
+
+use crate::indexer::api::{BlockHash, BlockHeight, BlockRange, IndexerId};
+
+pub async fn get_current_height(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+) -> sqlx::Result<Option<BlockHeight>> {
+    let record = sqlx::query!(
+        "
+        SELECT height
+        FROM hubble.indexer_status
+        WHERE indexer_id = $1
+        LIMIT 1
+        ",
+        indexer_id,
+    )
+    .fetch_optional(tx.as_mut())
+    .await?;
+
+    Ok(record.map(|h| h.height as BlockHeight))
+}
+
+pub async fn update_current_height(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    height: BlockHeight,
+    timestamp: OffsetDateTime,
+) -> sqlx::Result<()> {
+    sqlx::query!(
+        "
+        INSERT INTO hubble.indexer_status (indexer_id, height, timestamp)
+        VALUES ($1, $2, $3)
+        ON CONFLICT (indexer_id) DO
+        UPDATE SET
+            height = excluded.height,
+            timestamp = excluded.timestamp
+        ",
+        indexer_id,
+        height as i64,
+        timestamp,
+    )
+    .execute(tx.as_mut())
+    .await?;
+
+    Ok(())
+}
+
+pub async fn get_block_range_to_finalize(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+) -> sqlx::Result<Option<BlockRange>> {
+    let record = sqlx::query!(
+        "
+        SELECT MIN(height) min_height, MAX(height) max_height
+        FROM hubble.block_status
+        WHERE indexer_id = $1
+        ",
+        indexer_id,
+    )
+    .fetch_one(tx.as_mut())
+    .await?;
+
+    Ok(match (record.min_height, record.max_height) {
+        (Some(min), Some(max)) => Some((min as BlockHeight..max as BlockHeight + 1).into()),
+        (None, None) => None,
+        _ => unreachable!("expecting min_height and max_height to be either both null or both available"),
+    })
+}
+
+pub async fn get_next_block_to_refresh(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    consensus_height: BlockHeight,
+) -> sqlx::Result<Option<BlockHeight>> {
+    let record = sqlx::query!(
+        "
+        SELECT height height
+        FROM hubble.block_status
+        WHERE indexer_id = $1 AND height > $2
+        ORDER BY updated_at
+        ",
+        indexer_id,
+        consensus_height as i64,
+    )
+    .fetch_optional(tx.as_mut())
+    .await?;
+
+    Ok(record.map(|r| r.height as BlockHeight))
+}
+
+pub async fn get_block_range_to_fix(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+) -> sqlx::Result<Option<BlockRange>> {
+    let record = sqlx::query!(
+        "
+        SELECT start_height, max(end_height) as end_height
+        FROM hubble.block_fix
+        WHERE start_height = (
+            SELECT min(start_height)
+            FROM hubble.block_fix
+            WHERE indexer_id = $1
+        )
+        GROUP BY start_height
+        ",
+        indexer_id,
+    )
+    .fetch_optional(tx.as_mut())
+    .await?;
+
+    Ok(record.map(|r| {
+        (r.start_height as BlockHeight
+            ..r.end_height.expect("end_height column value") as BlockHeight)
+            .into()
+    }))
+}
+
+pub async fn update_block_range_to_fix(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    range: BlockRange,
+) -> sqlx::Result<()> {
+    // update start of ranges
+    sqlx::query!(
+        "
+        UPDATE hubble.block_fix
+        SET start_height = $3
+        WHERE indexer_id = $1
+        AND start_height = $2
+        ",
+        indexer_id,
+        range.start_inclusive as i64,
+        range.end_exclusive as i64,
+    )
+    .execute(tx.as_mut())
+    .await?;
+
+    // remove empty ranges
+    sqlx::query!(
+        "
+        DELETE FROM hubble.block_fix
+        WHERE indexer_id = $1
+        AND start_height = $2
+        AND end_height <= $2
+        ",
+        indexer_id,
+        range.end_exclusive as i64,
+    )
+    .execute(tx.as_mut())
+    .await?;
+
+    Ok(())
+}
+
+pub async fn delete_block_status(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    height: BlockHeight,
+) -> sqlx::Result<Option<BlockHash>> {
+    let record = sqlx::query!(
+        "
+        DELETE FROM hubble.block_status
+        WHERE indexer_id = $1 AND height = $2
+        RETURNING hash
+        ",
+        indexer_id,
+        height as i64,
+    )
+    .fetch_optional(tx.as_mut())
+    .await?;
+
+    Ok(record.map(|r| r.hash as BlockHash))
+}
+
+pub async fn get_block_status_hash(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    height: BlockHeight,
+) -> sqlx::Result<Option<BlockHash>> {
+    let record = sqlx::query!(
+        "
+        SELECT hash FROM hubble.block_status
+        WHERE indexer_id = $1 AND height = $2
+        ",
+        indexer_id,
+        height as i64,
+    )
+    .fetch_optional(tx.as_mut())
+    .await?;
+
+    Ok(record.map(|r| r.hash as BlockHash))
+}
+
+pub async fn update_block_status(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    indexer_id: IndexerId,
+    height: BlockHeight,
+    hash: BlockHash,
+    timestamp: OffsetDateTime,
+) -> sqlx::Result<()> {
+    sqlx::query!(
+        "
+        INSERT INTO hubble.block_status (indexer_id, height, hash, timestamp)
+        VALUES ($1, $2, $3, $4)
+        ON CONFLICT (indexer_id, height) DO
+        UPDATE SET
+            hash = excluded.hash,
+            timestamp = excluded.timestamp
+        ",
+        indexer_id,
+        height as i64,
+        hash,
+        timestamp,
+    )
+    .execute(tx.as_mut())
+    .await?;
+
+    Ok(())
+}
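The `update_block_range_to_fix` pair above implements "consume the front of the scheduled range": the UPDATE moves the row's `start_height` forward to the fixed chunk's end, and the DELETE removes the row once it is empty. A pure-Rust model of that behavior (struct and function names are ours, for illustration):

```rust
// Pure model of update_block_range_to_fix above (illustrative only).
#[derive(Debug, PartialEq)]
struct BlockFix {
    start_height: u64,
    end_height: u64, // exclusive
}

fn consume_front(row: BlockFix, fixed_end_exclusive: u64) -> Option<BlockFix> {
    // UPDATE ... SET start_height = $3: the fixed chunk always starts at the
    // row's current start, so the new start is the chunk's exclusive end.
    let start_height = fixed_end_exclusive;
    // DELETE ... WHERE end_height <= $2: drop the row once nothing is left.
    (start_height < row.end_height).then_some(BlockFix {
        start_height,
        end_height: row.end_height,
    })
}

fn main() {
    // Fixed [100, 150): the scheduled row shrinks to [150, 200).
    let row = BlockFix { start_height: 100, end_height: 200 };
    assert_eq!(
        consume_front(row, 150),
        Some(BlockFix { start_height: 150, end_height: 200 })
    );
    // Fixed [150, 200): the row is now empty and removed.
    let row = BlockFix { start_height: 150, end_height: 200 };
    assert_eq!(consume_front(row, 200), None);
}
```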
diff --git a/hubble/src/main.rs b/hubble/src/main.rs
index 3c014e9e72..a61b211d40 100644
--- a/hubble/src/main.rs
+++ b/hubble/src/main.rs
@@ -20,6 +20,7 @@ mod cli;
 mod consensus;
 mod eth;
 mod healthz;
+mod indexer;
 mod logging;
 mod metrics;
 mod postgres;
diff --git a/hubble/src/postgres.rs b/hubble/src/postgres.rs
index d2ea3c07af..14ab0c9539 100644
--- a/hubble/src/postgres.rs
+++ b/hubble/src/postgres.rs
@@ -235,7 +235,8 @@ where
         );
         let min_height = height.iter().min().expect("at least one height");
 
-        schedule_replication_reset(tx, *chain_id, (*min_height).into(), "block reorg").await?;
+        schedule_replication_reset(tx, *chain_id, (*min_height).into(), "block reorg (upsert)")
+            .await?;
         }
     }
     Ok(())
@@ -414,6 +415,32 @@ impl<T> FetchOrCreated<T> {
     }
 }
 
+pub async fn fetch_or_insert_chain_id_tx(
+    tx: &mut sqlx::Transaction<'_, Postgres>,
+    canonical: String,
+) -> sqlx::Result<FetchOrCreated<ChainId>> {
+    use FetchOrCreated::*;
+    let db_chain_id = if let Some(chain_id) = sqlx::query!(
+        "SELECT id FROM \"v0\".chains WHERE chain_id = $1 LIMIT 1",
+        canonical.to_string()
+    )
+    .fetch_optional(tx.as_mut())
+    .await?
+    {
+        Fetched(ChainId::new(chain_id.id, canonical.leak()))
+    } else {
+        let id = sqlx::query!(
+            "INSERT INTO \"v0\".chains (chain_id) VALUES ($1) RETURNING id",
+            canonical.to_string()
+        )
+        .fetch_one(tx.as_mut())
+        .await?
+        .id;
+        Created(ChainId::new(id, canonical.leak()))
+    };
+    Ok(db_chain_id)
+}
+
 pub async fn fetch_or_insert_chain_id<'a, A: Acquire<'a, Database = Postgres>>(
     db: A,
     canonical: String,
diff --git a/hubble/src/race_client.rs b/hubble/src/race_client.rs
index b1bef957f1..75eb936d94 100644
--- a/hubble/src/race_client.rs
+++ b/hubble/src/race_client.rs
@@ -30,8 +30,11 @@ impl<C> RaceClient<C> {
     }
 
     pub fn fastest(&self) -> &C {
-        let fastest = self.fastest.load(Ordering::Relaxed);
-        &self.clients[fastest]
+        &self.clients[self.fastest_index()]
+    }
+
+    pub fn fastest_index(&self) -> usize {
+        self.fastest.load(Ordering::Relaxed)
     }
 
     /// Run the provided closure over the clients, returning the first encountered Ok, or if all error, the first
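One observation on `fetch_or_insert_chain_id_tx`: the SELECT-then-INSERT pair is fine under a single writer, but two concurrent transactions inserting the same chain can race, and one will fail on the unique constraint. A hedged sketch of a single-statement alternative, assuming a unique constraint on `"v0".chains(chain_id)` (the helper name and the unchecked `query_scalar` call are ours, not this diff's code):

```rust
use sqlx::{Postgres, Transaction};

// Race-free get-or-insert via Postgres upsert semantics (illustrative only).
// The no-op DO UPDATE makes RETURNING yield the id on the conflict path too.
async fn get_or_insert_chain_id(
    tx: &mut Transaction<'_, Postgres>,
    canonical: &str,
) -> sqlx::Result<i32> {
    sqlx::query_scalar(
        r#"
        INSERT INTO "v0".chains (chain_id) VALUES ($1)
        ON CONFLICT (chain_id) DO UPDATE SET chain_id = excluded.chain_id
        RETURNING id
        "#,
    )
    .bind(canonical)
    .fetch_one(tx.as_mut())
    .await
}
```

The trade-off is that the upsert takes a row lock even when the chain already exists, so the diff's read-first variant is the cheaper choice on the hot path where the chain is almost always present.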