From ed1919dd350dc408349794a3072d18934a1b471e Mon Sep 17 00:00:00 2001 From: David Boreham Date: Fri, 26 Jan 2024 10:16:15 -0700 Subject: [PATCH 1/3] Accommodate Filecoin EVM null blocks --- chain/arweave/src/chain.rs | 5 +- chain/cosmos/src/chain.rs | 5 +- chain/ethereum/src/adapter.rs | 25 +-- chain/ethereum/src/chain.rs | 13 +- chain/ethereum/src/ethereum_adapter.rs | 151 +++++++++++------- chain/near/src/chain.rs | 5 +- chain/substreams/src/trigger.rs | 5 +- graph/src/blockchain/block_stream.rs | 13 +- graph/src/blockchain/mock.rs | 7 +- graph/src/blockchain/polling_block_stream.rs | 33 +++- graph/src/components/store/traits.rs | 8 +- node/src/manager/commands/chain.rs | 4 +- store/postgres/src/chain_store.rs | 118 ++++++++++---- store/test-store/tests/postgres/chain_head.rs | 14 +- tests/src/fixture/mod.rs | 5 +- 15 files changed, 284 insertions(+), 127 deletions(-) diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index d371bbe7c9c..6a446244db8 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use graph::anyhow; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; @@ -188,7 +190,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -237,6 +239,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 6ebd291a269..5e0e052856f 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; use graph::blockchain::BlockIngestor; use graph::prelude::MetricsRegistry; @@ -183,6 +185,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -192,7 +195,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 18b3432b03b..3a23b93d24b 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; use futures::Future; @@ -883,14 +885,8 @@ pub trait EthereumAdapter: Send + Sync + 'static { block: LightEthereumBlock, ) -> Pin> + Send>>; - /// Load block pointer for the specified `block number`. - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send>; - - /// Find a block by its number, according to the Ethereum node. + /// Find a block by its number, according to the Ethereum node. If `retries` is passed, limits + /// the number of attempts. /// /// Careful: don't use this function without considering race conditions. 
/// Chain reorgs could happen at any time, and could affect the answer received. @@ -902,9 +898,20 @@ pub trait EthereumAdapter: Send + Sync + 'static { fn block_hash_by_block_number( &self, logger: &Logger, - block_number: BlockNumber, + block_number: BlockNumber ) -> Box, Error = Error> + Send>; + /// Finds the hash and number of the lowest non-null block with height greater than or equal to + /// the given number. + /// + /// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must + /// also be considered for the resolved block, in case it is higher than the requested number. + async fn nearest_block_hash_to_number( + &self, + logger: &Logger, + block_number: BlockNumber + ) -> Result; + /// Call the function of a smart contract. fn contract_call( &self, diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index c3ad5ca6861..40418260376 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::{anyhow, bail, Result}; use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; @@ -410,9 +412,9 @@ impl Blockchain for Chain { .clone(); adapter - .block_pointer_from_number(logger, number) - .compat() + .nearest_block_hash_to_number(logger, number) .await + .map_err(From::from) } } } @@ -594,7 +596,7 @@ impl TriggersAdapterTrait for TriggersAdapter { from: BlockNumber, to: BlockNumber, filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { blocks_with_triggers( self.chain_client.rpc()?.cheapest_with(&self.capabilities)?, self.logger.clone(), @@ -628,7 +630,7 @@ impl TriggersAdapterTrait for TriggersAdapter { BlockFinality::Final(_) => { let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?; let block_number = block.number() as BlockNumber; - let blocks = blocks_with_triggers( + let (blocks, _) = blocks_with_triggers( adapter, logger.clone(), self.chain_store.clone(), @@ -668,11 +670,12 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { let block: Option = self .chain_store .cheap_clone() - .ancestor_block(ptr, offset) + .ancestor_block(ptr, offset, root) .await? .map(json::from_value) .transpose()?; diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 8fca16df899..8b6dafe7850 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use futures::future; use futures::prelude::*; use futures03::{future::BoxFuture, stream::FuturesUnordered}; @@ -42,6 +44,7 @@ use std::iter::FromIterator; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; +// use back_to_the_future::futures_await; use crate::adapter::ProviderStatus; use crate::chain::BlockFinality; @@ -611,6 +614,7 @@ impl EthereumAdapter { stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| { let web3 = web3.clone(); retry(format!("load block ptr {}", block_num), &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -630,9 +634,17 @@ impl EthereumAdapter { .boxed() .compat() .from_err() + .then(|res| { + if detect_null_block(&res) { + Ok(None) + } else { + Some(res).transpose() + } + }) })) - .buffered(ENV_VARS.block_batch_size) - .map(|b| b.into()) + .buffered(ENV_VARS.block_batch_size) + .filter_map(|b| b) + .map(|b| b.into()) } /// Check if `block_ptr` refers to a block that is on the main chain, according to the Ethereum @@ -650,13 +662,12 @@ impl EthereumAdapter { logger: &Logger, block_ptr: BlockPtr, ) -> Result { - let block_hash = self - .block_hash_by_block_number(logger, block_ptr.number) - .compat() + // TODO: This considers null blocks, but we could instead bail if we encounter one as a + // small optimization. + let canonical_block = self + .nearest_block_hash_to_number(logger, block_ptr.number) .await?; - block_hash - .ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number)) - .map(|block_hash| block_hash == block_ptr.hash_as_h256()) + Ok(canonical_block == block_ptr) } pub(crate) fn logs_in_block_range( @@ -814,7 +825,17 @@ impl EthereumAdapter { }) .await?, ) - .map_err(Error::msg) + .map_err(Error::msg) + } +} + +// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific +// error returned when requesting such a null round. Ideally there should be a defined response or +// message for this case, or a check that is less dependent on the Filecoin implementation.
+fn detect_null_block(res: &Result) -> bool { + match res { + Ok(_) => false, + Err(e) => e.to_string().contains("requested epoch was a null round") } } @@ -1107,26 +1128,6 @@ impl EthereumAdapterTrait for EthereumAdapter { Box::pin(block_future) } - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send> { - Box::new( - self.block_hash_by_block_number(logger, block_number) - .and_then(move |block_hash_opt| { - block_hash_opt.ok_or_else(|| { - anyhow!( - "Ethereum node could not find start block hash by block number {}", - &block_number - ) - }) - }) - .from_err() - .map(move |block_hash| BlockPtr::from((block_hash, block_number))), - ) - } - fn block_hash_by_block_number( &self, logger: &Logger, @@ -1164,6 +1165,54 @@ impl EthereumAdapterTrait for EthereumAdapter { ) } + async fn nearest_block_hash_to_number( + &self, + logger: &Logger, + block_number: BlockNumber + ) -> Result { + let mut next_number = block_number; + loop { + let retry_log_message = format!( + "eth_getBlockByNumber RPC call for block number {}", + next_number + ); + let web3 = self.web3.clone(); + let logger = logger.clone(); + let res = retry(retry_log_message, &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block(BlockId::Number(next_number.into())) + .await + .map(|block_opt| block_opt.and_then(|block| block.hash)) + .map_err(Error::from) + } + }) + .await + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!( + "Ethereum node took too long to return data for block #{}", + next_number + ) + }) + }); + if detect_null_block(&res) { + next_number += 1; + continue; + } + return match res { + Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)), + Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)), + Err(e) => Err(e), + } + } + } + fn contract_call( &self, logger: &Logger, @@ -1304,9 +1353,10 @@ impl EthereumAdapterTrait for EthereumAdapter { } } -/// Returns blocks with triggers, corresponding to the specified range and filters. +/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved +/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block. /// If a block contains no triggers, there may be no corresponding item in the stream. -/// However the `to` block will always be present, even if triggers are empty. +/// However the (resolved) `to` block will always be present, even if triggers are empty. /// /// Careful: don't use this function without considering race conditions. /// Chain reorgs could happen at any time, and could affect the answer received. @@ -1326,7 +1376,7 @@ pub(crate) async fn blocks_with_triggers( to: BlockNumber, filter: &TriggerFilter, unified_api_version: UnifiedMappingApiVersion, -) -> Result>, Error> { +) -> Result<(Vec>, BlockNumber), Error> { // Each trigger filter needs to be queried for the same block range // and the blocks yielded need to be deduped. If any error occurs // while searching for a trigger type, the entire operation fails. 
@@ -1337,6 +1387,15 @@ pub(crate) async fn blocks_with_triggers( let trigger_futs: FuturesUnordered, anyhow::Error>>> = FuturesUnordered::new(); + // Resolve the nearest non-null "to" block + debug!(logger, "Finding nearest valid `to` block to {}", to); + + let to_ptr = eth + .nearest_block_hash_to_number(&logger, to) + .await?; + let to_hash = to_ptr.hash_as_h256(); + let to = to_ptr.block_number(); + // Scan for Logs if !filter.log.is_empty() { let logs_future = get_logs_and_transactions( @@ -1393,28 +1452,10 @@ pub(crate) async fn blocks_with_triggers( trigger_futs.push(block_future) } - // Get hash for "to" block - let to_hash_fut = eth - .block_hash_by_block_number(&logger, to) - .and_then(|hash| match hash { - Some(hash) => Ok(hash), - None => { - warn!(logger, - "Ethereum endpoint is behind"; - "url" => eth.provider() - ); - bail!("Block {} not found in the chain", to) - } - }) - .compat(); - - // Join on triggers and block hash resolution - let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut); - - // Unpack and handle possible errors in the previously joined futures - let triggers = - triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?; - let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?; + // Join on triggers, unpack and handle possible errors + let triggers = trigger_futs.try_concat() + .await + .with_context(|| format!("Failed to obtain triggers for block {}", to))?; let mut block_hashes: HashSet = triggers.iter().map(EthereumTrigger::block_hash).collect(); @@ -1478,7 +1519,7 @@ pub(crate) async fn blocks_with_triggers( )); } - Ok(blocks) + Ok((blocks, to)) } pub(crate) async fn get_calls( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index b689493bdd9..3cd2ad25f8b 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use graph::anyhow; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; @@ -232,7 +234,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -306,6 +308,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 01ee2e0a643..e49a22462fb 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use std::{collections::HashMap, str::FromStr, sync::Arc}; use anyhow::Error; @@ -114,6 +116,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { unimplemented!() } @@ -123,7 +126,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { unimplemented!() } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 301b85f610a..897cb0b4515 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::Error; use async_stream::stream; use futures03::Stream; @@ -232,14 +234,15 @@ impl BlockWithTriggers { #[async_trait] pub trait TriggersAdapter: Send + Sync { - // Return the block that is `offset` blocks before the block pointed to - // by `ptr` from the local cache. An offset of 0 means the block itself, - // an offset of 1 means the block's parent etc. If the block is not in - // the local cache, return `None` + // Return the block that is `offset` blocks before the block pointed to by `ptr` from the local + // cache. An offset of 0 means the block itself, an offset of 1 means the block's parent etc. If + // `root` is passed, short-circuit upon finding a child of `root`. If the block is not in the + // local cache, return `None`. async fn ancestor_block( &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; // Returns a sequence of blocks in increasing order of block number. @@ -253,7 +256,7 @@ pub trait TriggersAdapter: Send + Sync { from: BlockNumber, to: BlockNumber, filter: &C::TriggerFilter, - ) -> Result>, Error>; + ) -> Result<(Vec>, BlockNumber), Error>; // Used for reprocessing blocks when creating a data source. async fn triggers_in_block( diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 03cf5903544..2c7f5700e83 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -1,10 +1,12 @@ +// Portions copyright (2023) Vulcanize, Inc. + use crate::{ components::{ link_resolver::LinkResolver, store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator}, }, data::subgraph::UnifiedMappingApiVersion, - prelude::DataSourceTemplateInfo, + prelude::{BlockHash, DataSourceTemplateInfo}, }; use anyhow::Error; use async_trait::async_trait; @@ -190,6 +192,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { todo!() } @@ -199,7 +202,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: crate::components::store::BlockNumber, _to: crate::components::store::BlockNumber, _filter: &C::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index daebeef2bd4..231b3c38287 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use anyhow::Error; use futures03::{stream::Stream, Future, FutureExt}; use std::cmp; @@ -363,22 +365,36 @@ where // 1000 triggers found, 2 per block, range_size = 1000 / 2 = 500 let range_size_upper_limit = max_block_range_size.min(ctx.previous_block_range_size * 10); - let range_size = if ctx.previous_triggers_per_block == 0.0 { + let target_range_size = if ctx.previous_triggers_per_block == 0.0 { range_size_upper_limit } else { (self.target_triggers_per_block_range as f64 / ctx.previous_triggers_per_block) .max(1.0) .min(range_size_upper_limit as f64) as BlockNumber }; - let to = cmp::min(from + range_size - 1, to_limit); + let to = cmp::min(from + target_range_size - 1, to_limit); info!( ctx.logger, "Scanning blocks [{}, {}]", from, to; - "range_size" => range_size + "target_range_size" => target_range_size ); - let blocks = self.adapter.scan_triggers(from, to, &self.filter).await?; + // Update with actually scanned range, to account for any skipped null blocks. + let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?; + let range_size = to - from + 1; + + // There were no non-null finalized blocks greater than or equal to `to`. + // Retry until we find one. + if to > head_ptr.number - reorg_threshold { + return Ok(ReconciliationStep::Retry) + } + + info!( + ctx.logger, + "Scanned blocks [{}, {}]", from, to; + "range_size" => range_size + ); Ok(ReconciliationStep::ProcessDescendantBlocks( blocks, range_size, @@ -415,7 +431,9 @@ where // In principle this block should be in the store, but we have seen this error for deep // reorgs in ropsten. - let head_ancestor_opt = self.adapter.ancestor_block(head_ptr, offset).await?; + let head_ancestor_opt = self.adapter + .ancestor_block(head_ptr, offset, Some(subgraph_ptr.hash.clone())) + .await?; match head_ancestor_opt { None => { @@ -427,6 +445,11 @@ where Ok(ReconciliationStep::Retry) } Some(head_ancestor) => { + // Check if there was an interceding skipped (null) block. + if head_ancestor.number() != subgraph_ptr.number + 1 { + warn!(ctx.logger, "skipped block detected: {}", subgraph_ptr.number+1); + } + // We stopped one block short, so we'll compare the parent hash to the // subgraph ptr. if head_ancestor.parent_hash().as_ref() == Some(&subgraph_ptr.hash) { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index ad1e25bf63a..153a92b018a 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use web3::types::{Address, H256}; use super::*; @@ -450,14 +452,16 @@ pub trait ChainStore: Send + Sync + 'static { fn blocks(&self, hashes: &[BlockHash]) -> Result, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching - /// `block_hash` and offset=1 means its parent. Returns None if unable to complete due to - /// missing blocks in the chain store. + /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding + /// a child of `root`. Returns None if unable to complete due to missing blocks in the chain + /// store. /// /// Returns an error if the offset would reach past the genesis block. 
async fn ancestor_block( self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; /// Remove old blocks from the cache we maintain in the database and diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 31978ebb7d5..b62e0dcca87 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use std::sync::Arc; use graph::blockchain::BlockPtr; @@ -100,7 +102,7 @@ pub async fn info( let ancestor = match &head_block { None => None, Some(head_block) => chain_store - .ancestor_block(head_block.clone(), offset) + .ancestor_block(head_block.clone(), offset, None) .await? .map(json::from_value::) .transpose()? diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index bf15b964e4c..d2dd068ad72 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; @@ -348,10 +350,10 @@ mod data { create index blocks_number ON {nsp}.blocks using btree(number); create table {nsp}.call_cache ( - id bytea not null primary key, - return_value bytea not null, - contract_address bytea not null, - block_number int4 not null + id bytea not null primary key, + return_value bytea not null, + contract_address bytea not null, + block_number int4 not null ); create index call_cache_block_number_idx ON {nsp}.call_cache(block_number); @@ -844,10 +846,17 @@ mod data { conn: &PgConnection, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { - let data_and_hash = match self { + let short_circuit_predicate = if root.is_some() { + "and b.parent_hash <> $3" + } else { + "" + }; + + let data_and_ptr = match self { Storage::Shared => { - const ANCESTOR_SQL: &str = " + let query = format!(" with recursive ancestors(block_hash, block_offset) as ( values ($1, 0) union all @@ -855,27 +864,49 @@ mod data { from ancestors a, ethereum_blocks b where a.block_hash = b.hash and a.block_offset < $2 + {} ) - select a.block_hash as hash + select a.block_hash as hash, b.number as number from ancestors a - where a.block_offset = $2;"; + inner join ethereum_blocks b on a.block_hash = b.hash + order by a.block_offset desc limit 1", + short_circuit_predicate); + + // type Result = (Text, i32); + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Text"] + hash: String, + #[sql_type = "Integer"] + number: i32, + } - let hash = sql_query(ANCESTOR_SQL) - .bind::(block_ptr.hash_hex()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; + let block = match root { + Some(root) => { + sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .bind::(root.hash_hex()) + .get_result::(conn) + } + None => { + sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .get_result::(conn) + } + }.optional()?; use public::ethereum_blocks as b; - match hash { + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( b::table - .filter(b::hash.eq(&hash.hash)) + .filter(b::hash.eq(&block.hash)) .select(b::data) .first::(conn)?, - BlockHash::from_str(&hash.hash)?, + BlockPtr::new(BlockHash::from_str(&block.hash)?, block.number), )), } } @@ -890,27 +921,49 @@ mod data { from ancestors a, {} b where a.block_hash = b.hash and a.block_offset < $2 + {} ) - select a.block_hash 
as hash + select a.block_hash as hash, b.number as number from ancestors a - where a.block_offset = $2;", - blocks.qname + inner join ethereum_blocks b on a.block_hash = b.hash + order by a.block_offset desc limit 1", + blocks.qname, + short_circuit_predicate ); - let hash = sql_query(query) - .bind::(block_ptr.hash_slice()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; - match hash { + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Bytea"] + hash: Vec, + #[sql_type = "Integer"] + number: i32, + } + + let block = match root { + Some(root) => { + sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .bind::(root.as_slice()) + .get_result::(conn) + } + None => { + sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .get_result::(conn) + } + }.optional()?; + + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( blocks .table() - .filter(blocks.hash().eq(&hash.hash)) + .filter(blocks.hash().eq(&block.hash)) .select(blocks.data()) .first::(conn)?, - BlockHash::from(hash.hash), + BlockPtr::from((block.hash, block.number)), )), } } @@ -925,13 +978,13 @@ mod data { let data_and_ptr = { use graph::prelude::serde_json::json; - data_and_hash.map(|(data, hash)| { + data_and_ptr.map(|(data, ptr)| { ( match data.get("block") { Some(_) => data, None => json!({ "block": data, "transaction_receipts": [] }), }, - BlockPtr::new(hash, block_ptr.number - offset), + ptr, ) }) }; @@ -1821,6 +1874,7 @@ impl ChainStoreTrait for ChainStore { self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { ensure!( block_ptr.number >= offset, @@ -1841,7 +1895,7 @@ impl ChainStoreTrait for ChainStore { .with_conn(move |conn, _| { chain_store .storage - .ancestor_block(conn, block_ptr_clone, offset) + .ancestor_block(conn, block_ptr_clone, offset, root) .map_err(StoreError::from) .map_err(CancelableError::from) }) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index 612333bc411..2af03ea27dc 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + //! Test ChainStore implementation of Store, in particular, how //! the chain head pointer gets updated in various situations @@ -260,11 +262,11 @@ fn check_ancestor( let act = executor::block_on( store .cheap_clone() - .ancestor_block(child.block_ptr(), offset), + .ancestor_block(child.block_ptr(), offset, None), )? - .map(json::from_value::) - .transpose()? - .ok_or_else(|| anyhow!("block {} has no ancestor at offset {}", child.hash, offset))?; + .map(json::from_value::) + .transpose()? 
+ .ok_or_else(|| anyhow!("block {} has no ancestor at offset {}", child.hash, offset))?; let act_hash = format!("{:x}", act.block.hash.unwrap()); let exp_hash = &exp.hash; @@ -303,12 +305,12 @@ fn ancestor_block_simple() { let res = executor::block_on( store .cheap_clone() - .ancestor_block(BLOCK_FIVE.block_ptr(), offset), + .ancestor_block(BLOCK_FIVE.block_ptr(), offset, None), ); assert!(res.is_err()); } - let block = executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1))?; + let block = executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1, None))?; assert!(block.is_none()); Ok(()) }); diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index b249d7b79d0..691d8e71d29 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + pub mod ethereum; use std::marker::PhantomData; @@ -748,6 +750,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result::Block>, Error> { todo!() } @@ -757,7 +760,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() } From c1dde6ad8cf309c3ef26e526b9925243945a667f Mon Sep 17 00:00:00 2001 From: David Boreham Date: Fri, 26 Jan 2024 12:56:26 -0700 Subject: [PATCH 2/3] Commit to trigger CI --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 809a86b7a17..8890ce5aba7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Graph Node + [![Build Status](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml/badge.svg)](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml?query=branch%3Amaster) [![Getting Started Docs](https://img.shields.io/badge/docs-getting--started-brightgreen.svg)](docs/getting-started.md) From f0f1053b2fbebc1f81f8f49d49cf5ca307a94e0b Mon Sep 17 00:00:00 2001 From: David Boreham Date: Fri, 26 Jan 2024 12:58:46 -0700 Subject: [PATCH 3/3] Try to trigger CI on branch --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e8e9c9df317..6020db440eb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: Continuous Integration on: push: - branches: [master] + branches: [master, handle-null-blocks-fork] pull_request: types: [opened, synchronize, reopened]