Skip to content

Commit

Permalink
Subgraph Composition: Support multiple subgraph datasources
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Feb 4, 2025
1 parent 63ea9d7 commit 14c2762
Show file tree
Hide file tree
Showing 35 changed files with 2,224 additions and 175 deletions.
4 changes: 2 additions & 2 deletions chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
/// Stub for loading Arweave blocks by block number.
///
/// Not implemented for this chain: both arguments are ignored (underscore
/// prefixed) and the body is `todo!()`, so calling it panics.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The two
// `_block_numbers` lines appear to be the before (`HashSet`) and after
// (`BTreeSet`) versions of a single parameter, not two parameters.
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<ArweaveBlock>, Error> {
todo!()
}
Expand Down
4 changes: 2 additions & 2 deletions chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use graph::components::network_provider::ChainName;
use graph::env::EnvVars;
use graph::prelude::MetricsRegistry;
use graph::substreams::Clock;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::sync::Arc;

Expand Down Expand Up @@ -200,7 +200,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
/// Stub for loading Cosmos blocks by block number.
///
/// Not implemented for this chain: both arguments are ignored (underscore
/// prefixed) and the body is `todo!()`, so calling it panics.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The two
// `_block_numbers` lines appear to be the before (`HashSet`) and after
// (`BTreeSet`) versions of a single parameter, not two parameters.
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
todo!()
}
Expand Down
19 changes: 9 additions & 10 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use graph::{
},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::future::Future;
use std::iter::FromIterator;
use std::sync::Arc;
Expand Down Expand Up @@ -747,7 +747,7 @@ pub struct TriggersAdapter {
async fn fetch_unique_blocks_from_cache(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
) -> (Vec<Arc<ExtendedBlockPtr>>, Vec<i32>) {
// Load blocks from the cache
let blocks_map = chain_store
Expand Down Expand Up @@ -795,7 +795,7 @@ async fn fetch_unique_blocks_from_cache(
async fn load_blocks<F, Fut>(
logger: &Logger,
chain_store: Arc<dyn ChainStore>,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
fetch_missing: F,
) -> Result<Vec<BlockFinality>>
where
Expand Down Expand Up @@ -843,7 +843,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<BlockFinality>> {
match &*self.chain_client {
ChainClient::Firehose(endpoints) => {
Expand Down Expand Up @@ -1200,7 +1200,6 @@ mod tests {
use graph::{slog, tokio};

use super::*;
use std::collections::HashSet;
use std::sync::Arc;

// Helper function to create test blocks
Expand All @@ -1224,7 +1223,7 @@ mod tests {
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1246,7 +1245,7 @@ mod tests {
.blocks
.insert(1, vec![block1.clone(), block2.clone()]);

let block_numbers: HashSet<_> = vec![1].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1266,7 +1265,7 @@ mod tests {
let block = create_test_block(1, "block1");
chain_store.blocks.insert(1, vec![block.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand All @@ -1287,7 +1286,7 @@ mod tests {
chain_store.blocks.insert(1, vec![block1.clone()]);
chain_store.blocks.insert(2, vec![block2.clone()]);

let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand Down Expand Up @@ -1316,7 +1315,7 @@ mod tests {
.blocks
.insert(2, vec![block2a.clone(), block2b.clone()]);

let block_numbers: HashSet<_> = vec![1, 2, 3].into_iter().collect();
let block_numbers: BTreeSet<_> = vec![1, 2, 3].into_iter().collect();

let (blocks, missing) =
fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await;
Expand Down
4 changes: 2 additions & 2 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use graph::{
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
};
use prost::Message;
use std::collections::HashSet;
use std::collections::BTreeSet;
use std::sync::Arc;

use crate::adapter::TriggerFilter;
Expand Down Expand Up @@ -328,7 +328,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
/// Stub for loading NEAR blocks by block number.
///
/// Not implemented for this chain: both arguments are ignored (underscore
/// prefixed) and the body is `unimplemented!()`, so calling it panics.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The two
// `_block_numbers` lines appear to be the before (`HashSet`) and after
// (`BTreeSet`) versions of a single parameter, not two parameters.
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>> {
unimplemented!()
}
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use graph::{
};
use graph_runtime_wasm::module::ToAscPtr;
use lazy_static::__Deref;
use std::{collections::HashSet, sync::Arc};
use std::{collections::BTreeSet, sync::Arc};

use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges};

Expand Down Expand Up @@ -139,7 +139,7 @@ impl blockchain::TriggersAdapter<Chain> for TriggersAdapter {
/// Stub for loading Substreams blocks by block number.
///
/// Not implemented for this chain: both arguments are ignored (underscore
/// prefixed) and the body is `unimplemented!()`, so calling it panics.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The two
// `_block_numbers` lines appear to be the before (`HashSet`) and after
// (`BTreeSet`) versions of a single parameter, not two parameters.
async fn load_block_ptrs_by_numbers(
&self,
_logger: Logger,
_block_numbers: HashSet<BlockNumber>,
_block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<Block>, Error> {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ where
.iter()
.map(|handler| handler.entity.clone())
.collect(),
manifest_idx: ds.manifest_idx,
})
.collect::<Vec<_>>();

Expand Down
152 changes: 98 additions & 54 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use anyhow::Error;
use async_stream::stream;
use futures03::Stream;
use prost_types::Any;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -339,53 +339,127 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
/// Builds the list of blocks with subgraph triggers for a set of subgraph
/// filters, for either a single block or a block-number range.
///
/// Returns an error ("No subgraph filters provided") when `filters` is empty.
/// For `Single(block)`, entities are fetched with that block's number as both
/// bounds and the given block is used as-is. For `Range(from, to)`, entities
/// are fetched for `from`/`to`, the block numbers that have entities are
/// collected across all source subgraphs, `to` is always added, and those
/// blocks are loaded through `self.adapter`. The blocks and the per-subgraph
/// entities are then passed to `create_subgraph_triggers`.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so lines
// from the pre-change single-filter implementation are interleaved with the
// new multi-filter one. The lines flagged below appear to belong to the
// removed version and are not part of the new body.
pub async fn blocks_with_subgraph_triggers(
&self,
logger: &Logger,
// NOTE(review): pre-change parameter; replaced by `filters` on the next line.
subgraph_filter: &SubgraphFilter,
filters: &[SubgraphFilter],
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
// NOTE(review): pre-change store lookup for the single `subgraph_filter`;
// it failed with "Store not found for subgraph" when no store was registered.
let store = self
.source_subgraph_stores
.get(&subgraph_filter.subgraph)
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;
if filters.is_empty() {
return Err(anyhow!("No subgraph filters provided"));
}

let (blocks, hash_to_entities) = match range {
SubgraphTriggerScanRange::Single(block) => {
let hash_to_entities = self
.fetch_entities_for_filters(filters, block.number(), block.number())
.await?;

(vec![block], hash_to_entities)
}
SubgraphTriggerScanRange::Range(from, to) => {
let hash_to_entities = self.fetch_entities_for_filters(filters, from, to).await?;

// Get block numbers that have entities
let mut block_numbers: BTreeSet<_> = hash_to_entities
.iter()
.flat_map(|(_, entities, _)| entities.keys().copied())
.collect();

// Always include the last block in the range
block_numbers.insert(to);

let blocks = self
.adapter
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
.await?;

// NOTE(review): pre-change line (schema of the single looked-up store).
let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
(blocks, hash_to_entities)
}
};

// NOTE(review): pre-change line (adapter clone passed to `scan_subgraph_triggers`).
let adapter = self.adapter.clone();
create_subgraph_triggers::<C>(logger.clone(), blocks, hash_to_entities).await
}

/// Fetches, for every filter, the entity operations of its source subgraph
/// between `from` and `to`.
///
/// For each filter whose `subgraph` has an entry in
/// `self.source_subgraph_stores`, a future is built that calls
/// `get_entities_for_range` with that store and its input schema. All futures
/// are awaited together with `try_join_all`, so one failing lookup fails the
/// whole call.
///
/// Returns one tuple per queried filter: the source deployment hash, the
/// entity operations keyed by block number, and the filter's `manifest_idx`.
/// Returns an empty vector when no filter has a matching store.
//
// NOTE(review): filters whose subgraph has no registered store are dropped by
// the `filter_map` without an error or log message. The pre-change code
// returned "Store not found for subgraph: ..." in that situation; confirm
// that silently skipping is intended.
async fn fetch_entities_for_filters(
&self,
filters: &[SubgraphFilter],
from: BlockNumber,
to: BlockNumber,
) -> Result<
Vec<(
DeploymentHash,
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
u32,
)>,
Error,
> {
let futures = filters
.iter()
.filter_map(|filter| {
self.source_subgraph_stores
.get(&filter.subgraph)
.map(|store| {
// Clone the store handle and take the schema up front so the
// `async move` block below owns everything it needs.
let store = store.clone();
let schema = store.input_schema();

async move {
let entities =
get_entities_for_range(&store, filter, &schema, from, to).await?;
Ok::<_, Error>((filter.subgraph.clone(), entities, filter.manifest_idx))
}
})
})
.collect::<Vec<_>>();

if futures.is_empty() {
return Ok(Vec::new());
}

// NOTE(review): the next line appears to be the removed tail expression of
// the pre-change `blocks_with_subgraph_triggers` (it uses `logger`, `store`,
// `adapter`, `schema`, `subgraph_filter` and `range`, none of which exist in
// this function); the rendered diff shows it here without a "-" marker.
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
futures03::future::try_join_all(futures).await
}
}

/// Wraps each entity operation in a `subgraph::TriggerData`.
///
/// Every produced trigger carries a clone of the source deployment hash, the
/// entity operation itself, and `manifest_idx` as its `source_idx`. The output
/// has one trigger per input entity, in input order.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The
// `filter` parameter and the `source: filter.subgraph.clone()` field appear to
// be the pre-change lines; `subgraph`, `manifest_idx`,
// `source: subgraph.clone()` and `source_idx` are the post-change ones.
fn create_subgraph_trigger_from_entities(
filter: &SubgraphFilter,
subgraph: &DeploymentHash,
entities: Vec<EntitySourceOperation>,
manifest_idx: u32,
) -> Vec<subgraph::TriggerData> {
entities
.into_iter()
.map(|entity| subgraph::TriggerData {
source: filter.subgraph.clone(),
source: subgraph.clone(),
entity,
source_idx: manifest_idx,
})
.collect()
}

async fn create_subgraph_triggers<C: Blockchain>(
logger: Logger,
blocks: Vec<C::Block>,
filter: &SubgraphFilter,
mut entities: BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
subgraph_data: Vec<(
DeploymentHash,
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
u32,
)>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let logger_clone = logger.cheap_clone();

let blocks: Vec<BlockWithTriggers<C>> = blocks
.into_iter()
.map(|block| {
let block_number = block.number();
let trigger_data = entities
.remove(&block_number)
.map(|e| create_subgraph_trigger_from_entities(filter, e))
.unwrap_or_else(Vec::new);
let mut all_trigger_data = Vec::new();

for (hash, entities, manifest_idx) in subgraph_data.iter() {
if let Some(block_entities) = entities.get(&block_number) {
let trigger_data = create_subgraph_trigger_from_entities(
hash,
block_entities.clone(),
*manifest_idx,
);
all_trigger_data.extend(trigger_data);
}
}

BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
BlockWithTriggers::new_with_subgraph_triggers(block, all_trigger_data, &logger_clone)
})
.collect();

Expand All @@ -397,36 +471,6 @@ pub enum SubgraphTriggerScanRange<C: Blockchain> {
Range(BlockNumber, BlockNumber),
}

/// Scans a single source subgraph for entity changes and turns them into
/// blocks with subgraph triggers.
///
/// For `Single(block)`, entities are fetched with that block's number as both
/// bounds and triggers are created for just that block. For `Range(from, to)`,
/// entities are fetched for `from`/`to`, the block numbers that have entities
/// are collected, `to` is always added, the blocks are loaded through
/// `adapter`, and triggers are created for them.
///
/// Errors from `get_entities_for_range`, `load_block_ptrs_by_numbers` and
/// `create_subgraph_triggers` are propagated to the caller.
//
// NOTE(review): judging by the hunk header ("-397,36 +471,6") this whole
// function is removed by the commit shown here; the same steps appear inline
// in the multi-filter `blocks_with_subgraph_triggers`.
async fn scan_subgraph_triggers<C: Blockchain>(
logger: &Logger,
store: &Arc<dyn SourceableStore>,
adapter: &Arc<dyn TriggersAdapter<C>>,
schema: &InputSchema,
filter: &SubgraphFilter,
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
match range {
SubgraphTriggerScanRange::Single(block) => {
let entities =
get_entities_for_range(store, filter, schema, block.number(), block.number())
.await?;
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
}
SubgraphTriggerScanRange::Range(from, to) => {
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
// Ensure the 'to' block is included in the block_numbers
block_numbers.insert(to);

let blocks = adapter
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
.await?;

create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntityOperationKind {
Create,
Expand Down Expand Up @@ -474,11 +518,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<C>>,
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
if !filter.subgraph_filter.is_empty() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
&filter.subgraph_filter,
SubgraphTriggerScanRange::Range(from, to),
)
.await?;
Expand All @@ -504,11 +548,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
"block_hash" => block.hash().hash_hex(),
);

if let Some(subgraph_filter) = filter.subgraph_filter.first() {
if !filter.subgraph_filter.is_empty() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
&filter.subgraph_filter,
SubgraphTriggerScanRange::Single(block),
)
.await?;
Expand Down Expand Up @@ -594,7 +638,7 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
async fn load_block_ptrs_by_numbers(
&self,
logger: Logger,
block_numbers: HashSet<BlockNumber>,
block_numbers: BTreeSet<BlockNumber>,
) -> Result<Vec<C::Block>>;
}

Expand Down
Loading

0 comments on commit 14c2762

Please sign in to comment.