Skip to content

Commit

Permalink
graph, store: Support multiple subgraph datasources
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Dec 16, 2024
1 parent 56f041a commit 14853a5
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 52 deletions.
129 changes: 77 additions & 52 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,30 +339,82 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
pub async fn blocks_with_subgraph_triggers(
&self,
logger: &Logger,
subgraph_filter: &SubgraphFilter,
filters: &[SubgraphFilter],
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let store = self
.source_subgraph_stores
.get(&subgraph_filter.subgraph)
.ok_or_else(|| anyhow!("Store not found for subgraph: {}", subgraph_filter.subgraph))?;
if filters.is_empty() {
return Err(anyhow!("No subgraph filters provided"));
}

let schema = <dyn crate::components::store::SourceableStore>::input_schema(store);
let (blocks, hash_to_entities) = match range {
SubgraphTriggerScanRange::Single(block) => {
let hash_to_entities = self
.fetch_entities_for_filters(filters, block.number(), block.number())
.await?;

let adapter = self.adapter.clone();
(vec![block], hash_to_entities)
}
SubgraphTriggerScanRange::Range(from, to) => {
let hash_to_entities = self.fetch_entities_for_filters(filters, from, to).await?;

let block_numbers: HashSet<BlockNumber> = hash_to_entities
.iter()
.flat_map(|(_, entities)| entities.keys().copied())
.chain(std::iter::once(to))
.collect();

let blocks = self
.adapter
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
.await?;

scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
(blocks, hash_to_entities)
}
};

create_subgraph_triggers::<C>(logger.clone(), blocks, hash_to_entities).await
}

async fn fetch_entities_for_filters(
&self,
filters: &[SubgraphFilter],
from: BlockNumber,
to: BlockNumber,
) -> Result<Vec<(DeploymentHash, BTreeMap<BlockNumber, Vec<EntityWithType>>)>, Error> {
let futures = filters
.iter()
.filter_map(|filter| {
self.source_subgraph_stores
.get(&filter.subgraph)
.map(|store| {
let store = store.clone();
let schema = store.input_schema();

async move {
let entities =
get_entities_for_range(&store, filter, &schema, from, to).await?;
Ok::<_, Error>((filter.subgraph.clone(), entities))
}
})
})
.collect::<Vec<_>>();

if futures.is_empty() {
return Ok(Vec::new());
}

futures03::future::try_join_all(futures).await
}
}

fn create_subgraph_trigger_from_entities(
filter: &SubgraphFilter,
subgraph: &DeploymentHash,
entities: Vec<EntityWithType>,
) -> Vec<subgraph::TriggerData> {
entities
.into_iter()
.map(|entity| subgraph::TriggerData {
source: filter.subgraph.clone(),
source: subgraph.clone(),
entity,
})
.collect()
Expand All @@ -371,21 +423,24 @@ fn create_subgraph_trigger_from_entities(
async fn create_subgraph_triggers<C: Blockchain>(
logger: Logger,
blocks: Vec<C::Block>,
filter: &SubgraphFilter,
mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
subgraph_data: Vec<(DeploymentHash, BTreeMap<BlockNumber, Vec<EntityWithType>>)>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
let logger_clone = logger.cheap_clone();

let blocks: Vec<BlockWithTriggers<C>> = blocks
.into_iter()
.map(|block| {
let block_number = block.number();
let trigger_data = entities
.remove(&block_number)
.map(|e| create_subgraph_trigger_from_entities(filter, e))
.unwrap_or_else(Vec::new);
let mut all_trigger_data = Vec::new();

for (hash, entities) in subgraph_data.iter() {
if let Some(block_entities) = entities.get(&block_number) {
let trigger_data =
create_subgraph_trigger_from_entities(hash, block_entities.clone());
all_trigger_data.extend(trigger_data);
}
}

BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
BlockWithTriggers::new_with_subgraph_triggers(block, all_trigger_data, &logger_clone)
})
.collect();

Expand All @@ -397,36 +452,6 @@ pub enum SubgraphTriggerScanRange<C: Blockchain> {
Range(BlockNumber, BlockNumber),
}

async fn scan_subgraph_triggers<C: Blockchain>(
logger: &Logger,
store: &Arc<dyn SourceableStore>,
adapter: &Arc<dyn TriggersAdapter<C>>,
schema: &InputSchema,
filter: &SubgraphFilter,
range: SubgraphTriggerScanRange<C>,
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
match range {
SubgraphTriggerScanRange::Single(block) => {
let entities =
get_entities_for_range(store, filter, schema, block.number(), block.number())
.await?;
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
}
SubgraphTriggerScanRange::Range(from, to) => {
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
// Ensure the 'to' block is included in the block_numbers
block_numbers.insert(to);

let blocks = adapter
.load_block_ptrs_by_numbers(logger.clone(), block_numbers)
.await?;

create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntitySubgraphOperation {
Create,
Expand Down Expand Up @@ -474,11 +499,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
to: BlockNumber,
filter: &Arc<TriggerFilterWrapper<C>>,
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
if !filter.subgraph_filter.is_empty() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
&filter.subgraph_filter,
SubgraphTriggerScanRange::Range(from, to),
)
.await?;
Expand All @@ -504,11 +529,11 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
"block_hash" => block.hash().hash_hex(),
);

if let Some(subgraph_filter) = filter.subgraph_filter.first() {
if !filter.subgraph_filter.is_empty() {
let blocks_with_triggers = self
.blocks_with_subgraph_triggers(
logger,
subgraph_filter,
&filter.subgraph_filter,
SubgraphTriggerScanRange::Single(block),
)
.await?;
Expand Down
72 changes: 72 additions & 0 deletions store/test-store/tests/chain/ethereum/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,78 @@ specVersion: 1.3.0
}
}

#[tokio::test]
async fn multiple_subgraph_ds_manifest() {
let yaml = "
schema:
file:
/: /ipfs/Qmschema
dataSources:
- name: SubgraphSource1
kind: subgraph
entities:
- Gravatar
network: mainnet
source:
address: 'QmSWWT2yrTFDZSL8tRyoHEVrcEKAUsY2hj2TMQDfdDZU8h'
startBlock: 9562480
mapping:
apiVersion: 0.0.6
language: wasm/assemblyscript
entities:
- TestEntity
file:
/: /ipfs/Qmmapping
handlers:
- handler: handleEntity
entity: User
- name: SubgraphSource2
kind: subgraph
entities:
- Profile
network: mainnet
source:
address: 'QmT8B2R7J9yzbZXkqRefmZPkXmE8pCsRKmMj3rGN1Qoe4k'
startBlock: 9562500
mapping:
apiVersion: 0.0.6
language: wasm/assemblyscript
entities:
- TestEntity2
file:
/: /ipfs/Qmmapping
handlers:
- handler: handleProfile
entity: Profile
specVersion: 1.3.0
";

let manifest = resolve_manifest(yaml, SPEC_VERSION_1_3_0).await;

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(manifest.data_sources.len(), 2);

// Validate first data source
match &manifest.data_sources[0] {
DataSourceEnum::Subgraph(ds) => {
assert_eq!(ds.name, "SubgraphSource1");
assert_eq!(ds.kind, "subgraph");
assert_eq!(ds.source.start_block, 9562480);
}
_ => panic!("Expected a subgraph data source"),
}

// Validate second data source
match &manifest.data_sources[1] {
DataSourceEnum::Subgraph(ds) => {
assert_eq!(ds.name, "SubgraphSource2");
assert_eq!(ds.kind, "subgraph");
assert_eq!(ds.source.start_block, 9562500);
}
_ => panic!("Expected a subgraph data source"),
}
}

#[tokio::test]
async fn graft_manifest() {
const YAML: &str = "
Expand Down

0 comments on commit 14853a5

Please sign in to comment.