From 2217cd3ffb701e6beeaf5510632d8a6a10f8fca0 Mon Sep 17 00:00:00 2001 From: Nabarun Gogoi Date: Wed, 26 Jun 2024 17:56:37 +0530 Subject: [PATCH] Support events handlers in multiple data sources for a contract address (#526) * Support processing events in multiple subgraph datasources for a single contract address * Fix parsing event topic in graph-node watcher * Update codegen templates * Fix dummy indexer method in graph-node test * Upgrade package versions to 0.2.102 --- lerna.json | 2 +- packages/cache/package.json | 2 +- packages/cli/package.json | 12 +- packages/codegen/package.json | 4 +- .../codegen/src/data/entities/Contract.yaml | 1 + .../src/templates/indexer-template.handlebars | 36 +++--- .../src/templates/package-template.handlebars | 10 +- packages/graph-node/package.json | 10 +- packages/graph-node/src/loader.ts | 8 +- packages/graph-node/src/watcher.ts | 111 +++++++++-------- packages/graph-node/test/utils/indexer.ts | 2 +- packages/ipld-eth-client/package.json | 6 +- packages/peer/package.json | 2 +- packages/rpc-eth-client/package.json | 8 +- packages/solidity-mapper/package.json | 2 +- packages/test/package.json | 2 +- packages/tracing-client/package.json | 2 +- packages/util/package.json | 8 +- packages/util/src/common.ts | 20 +-- packages/util/src/database.ts | 2 +- packages/util/src/indexer.ts | 115 +++++++++++------- packages/util/src/types.ts | 4 +- 22 files changed, 212 insertions(+), 157 deletions(-) diff --git a/lerna.json b/lerna.json index fdeb12bf0..4e4984258 100644 --- a/lerna.json +++ b/lerna.json @@ -2,7 +2,7 @@ "packages": [ "packages/*" ], - "version": "0.2.101", + "version": "0.2.102", "npmClient": "yarn", "useWorkspaces": true, "command": { diff --git a/packages/cache/package.json b/packages/cache/package.json index 1757a6f51..96695147d 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cache", - "version": "0.2.101", + "version": "0.2.102", "description": "Generic object cache", "main": "dist/index.js", "scripts": { diff --git a/packages/cli/package.json b/packages/cli/package.json index e805dc578..d2fd8dc6e 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/cli", - "version": "0.2.101", + "version": "0.2.102", "main": "dist/index.js", "license": "AGPL-3.0", "scripts": { @@ -15,13 +15,13 @@ }, "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.101", - "@cerc-io/ipld-eth-client": "^0.2.101", + "@cerc-io/cache": "^0.2.102", + "@cerc-io/ipld-eth-client": "^0.2.102", "@cerc-io/libp2p": "^0.42.2-laconic-0.1.4", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.101", - "@cerc-io/rpc-eth-client": "^0.2.101", - "@cerc-io/util": "^0.2.101", + "@cerc-io/peer": "^0.2.102", + "@cerc-io/rpc-eth-client": "^0.2.102", + "@cerc-io/util": "^0.2.102", "@ethersproject/providers": "^5.4.4", "@graphql-tools/utils": "^9.1.1", "@ipld/dag-cbor": "^8.0.0", diff --git a/packages/codegen/package.json b/packages/codegen/package.json index a117cb378..edd7370dc 100644 --- a/packages/codegen/package.json +++ b/packages/codegen/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/codegen", - "version": "0.2.101", + "version": "0.2.102", "description": "Code generator", "private": true, "main": "index.js", @@ -20,7 +20,7 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/util": "^0.2.101", + "@cerc-io/util": "^0.2.102", "@graphql-tools/load-files": "^6.5.2", "@npmcli/package-json": "^5.0.0", "@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git", diff --git a/packages/codegen/src/data/entities/Contract.yaml b/packages/codegen/src/data/entities/Contract.yaml index 60a7ac4da..8eceed687 100644 --- a/packages/codegen/src/data/entities/Contract.yaml +++ b/packages/codegen/src/data/entities/Contract.yaml @@ -2,6 +2,7 @@ className: Contract indexOn: - columns: - address + - kind unique: true columns: - name: id diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 0036e331e..b8a308a57 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -525,23 +525,27 @@ export class Indexer implements IndexerInterface { } {{/if}} - parseEventNameAndArgs (kind: string, logObj: any): { eventParsed: boolean, eventDetails: any } { + parseEventNameAndArgs (watchedContracts: Contract[], logObj: any): { eventParsed: boolean, eventDetails: any } { const { topics, data } = logObj; - - const contract = this._contractMap.get(kind); - assert(contract); - - let logDescription: ethers.utils.LogDescription; - try { - logDescription = contract.parseLog({ data, topics }); - } catch (err) { - // Return if no matching event found - if ((err as Error).message.includes('no matching event')) { - log(`WARNING: Skipping event for contract ${kind} as no matching event found in the ABI`); - return { eventParsed: false, eventDetails: {} }; + let logDescription: ethers.utils.LogDescription | undefined; + + for (const watchedContract of watchedContracts) { + const contract = this._contractMap.get(watchedContract.kind); + assert(contract); + + try { + logDescription = contract.parseLog({ data, topics }); + break; + } catch (err) { + // Continue loop only if no matching event found + if (!((err as Error).message.includes('no matching event'))) { + throw err; + } } + } - throw err; + if (!logDescription) { + return { eventParsed: false, eventDetails: {} }; } const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription); @@ -647,8 +651,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - isWatchedContract (address : string): Contract | undefined { - return this._baseIndexer.isWatchedContract(address); + isContractAddressWatched (address : string): Contract[] | undefined { + return this._baseIndexer.isContractAddressWatched(address); } getWatchedContracts (): Contract[] { diff --git a/packages/codegen/src/templates/package-template.handlebars b/packages/codegen/src/templates/package-template.handlebars index 9913179ee..d0cef9739 100644 --- a/packages/codegen/src/templates/package-template.handlebars +++ b/packages/codegen/src/templates/package-template.handlebars @@ -41,12 +41,12 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.3.19", - "@cerc-io/cli": "^0.2.101", - "@cerc-io/ipld-eth-client": "^0.2.101", - "@cerc-io/solidity-mapper": "^0.2.101", - "@cerc-io/util": "^0.2.101", + "@cerc-io/cli": "^0.2.102", + "@cerc-io/ipld-eth-client": "^0.2.102", + "@cerc-io/solidity-mapper": "^0.2.102", + "@cerc-io/util": "^0.2.102", {{#if (subgraphPath)}} - "@cerc-io/graph-node": "^0.2.101", + "@cerc-io/graph-node": "^0.2.102", {{/if}} "@ethersproject/providers": "^5.4.4", "debug": "^4.3.1", diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index 0fd237a79..8e7dc8a7a 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -1,10 +1,10 @@ { "name": "@cerc-io/graph-node", - "version": "0.2.101", + "version": "0.2.102", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { - "@cerc-io/solidity-mapper": "^0.2.101", + "@cerc-io/solidity-mapper": "^0.2.102", "@ethersproject/providers": "^5.4.4", "@graphprotocol/graph-ts": "^0.22.0", "@nomiclabs/hardhat-ethers": "^2.0.2", @@ -51,9 +51,9 @@ "dependencies": { "@apollo/client": "^3.3.19", "@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2", - "@cerc-io/cache": "^0.2.101", - "@cerc-io/ipld-eth-client": "^0.2.101", - "@cerc-io/util": "^0.2.101", + "@cerc-io/cache": "^0.2.102", + "@cerc-io/ipld-eth-client": "^0.2.102", + "@cerc-io/util": "^0.2.102", "@types/json-diff": "^0.5.2", "@types/yargs": "^17.0.0", "bn.js": "^4.11.9", diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index cae400bc9..ccc52c75d 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -50,6 +50,7 @@ export interface Context { rpcSupportsBlockHashParam: boolean; block?: Block; contractAddress?: string; + dataSourceName?: string; } const log = debug('vulcanize:graph-node'); @@ -719,13 +720,14 @@ export const instantiate = async ( }, 'dataSource.context': async () => { assert(context.contractAddress); - const contract = indexer.isWatchedContract(context.contractAddress); + const watchedContracts = indexer.isContractAddressWatched(context.contractAddress); + const dataSourceContract = watchedContracts?.find(contract => contract.kind === context.dataSourceName); - if (!contract) { + if (!dataSourceContract) { return null; } - return database.toGraphContext(instanceExports, contract.context); + return database.toGraphContext(instanceExports, dataSourceContract.context); }, 'dataSource.network': async () => { assert(dataSource); diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 26e13a9c3..c2d3217f3 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -169,7 +169,6 @@ export class GraphWatcher { async addContracts () { assert(this._indexer); assert(this._indexer.watchContract); - assert(this._indexer.isWatchedContract); // Watching the contract(s) if not watched already. for (const dataSource of this._dataSources) { @@ -177,7 +176,7 @@ export class GraphWatcher { // Skip for templates as they are added dynamically. if (address) { - const watchedContract = await this._indexer.isWatchedContract(address); + const watchedContract = this._indexer.isContractAddressWatched(address); if (!watchedContract) { await this._indexer.watchContract(address, name, true, startBlock); @@ -197,64 +196,79 @@ export class GraphWatcher { const blockData = this._context.block; assert(blockData); - assert(this._indexer && this._indexer.isWatchedContract); - const watchedContract = this._indexer.isWatchedContract(contract); - assert(watchedContract); + assert(this._indexer); + const watchedContracts = this._indexer.isContractAddressWatched(contract); + assert(watchedContracts); - // Get dataSource in subgraph yaml based on contract address. - const dataSource = this._dataSources.find(dataSource => dataSource.name === watchedContract.kind); + // Get dataSources in subgraph yaml based on contract kind (same as dataSource.name) + const dataSources = this._dataSources + .filter(dataSource => watchedContracts.some(contract => contract.kind === dataSource.name)); - if (!dataSource) { + if (!dataSources.length) { log(`Subgraph doesn't have configuration for contract ${contract}`); return; } - this._context.contractAddress = contract; + for (const dataSource of dataSources) { + this._context.contractAddress = contract; + this._context.dataSourceName = dataSource.name; - const { instance, contractInterface } = this._dataSourceMap[watchedContract.kind]; - assert(instance); - const { exports: instanceExports } = instance; - - // Get event handler based on event topic (from event signature). - const eventTopic = contractInterface.getEventTopic(eventSignature); - const eventHandler = dataSource.mapping.eventHandlers.find((eventHandler: any) => { - // The event signature we get from logDescription is different than that given in the subgraph yaml file. - // For eg. event in subgraph.yaml: Stake(indexed address,uint256); from logDescription: Stake(address,uint256) - // ethers.js doesn't recognize the subgraph event signature with indexed keyword before param type. - // Match event topics from cleaned subgraph event signature (Stake(indexed address,uint256) -> Stake(address,uint256)). - const subgraphEventTopic = contractInterface.getEventTopic(eventHandler.event.replace(/indexed /g, '')); - - return subgraphEventTopic === eventTopic; - }); + const { instance, contractInterface } = this._dataSourceMap[dataSource.name]; + assert(instance); + const { exports: instanceExports } = instance; + let eventTopic: string; + + try { + eventTopic = contractInterface.getEventTopic(eventSignature); + } catch (err) { + // Continue loop only if no matching event found + if (!((err as Error).message.includes('no matching event'))) { + throw err; + } - if (!eventHandler) { - log(`No handler configured in subgraph for event ${eventSignature}`); - return; - } + continue; + } - const eventFragment = contractInterface.getEvent(eventSignature); + // Get event handler based on event topic (from event signature). + const eventHandler = dataSource.mapping.eventHandlers.find((eventHandler: any) => { + // The event signature we get from logDescription is different than that given in the subgraph yaml file. + // For eg. event in subgraph.yaml: Stake(indexed address,uint256); from logDescription: Stake(address,uint256) + // ethers.js doesn't recognize the subgraph event signature with indexed keyword before param type. + // Match event topics from cleaned subgraph event signature (Stake(indexed address,uint256) -> Stake(address,uint256)). + const subgraphEventTopic = contractInterface.getEventTopic(eventHandler.event.replace(/indexed /g, '')); - const tx = this._getTransactionData(txHash, extraData.ethFullTransactions); + return subgraphEventTopic === eventTopic; + }); - const data = { - block: blockData, - inputs: eventFragment.inputs, - event, - tx, - eventIndex - }; + if (!eventHandler) { + log(`No handler configured in subgraph for event ${eventSignature}`); + return; + } - // Create ethereum event to be passed to the wasm event handler. - console.time(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); - const ethereumEvent = await createEvent(instanceExports, contract, data); - console.timeEnd(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); - try { - console.time(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); - await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.name); - console.timeEnd(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); - } catch (error) { - this._clearCachedEntities(); - throw error; + const eventFragment = contractInterface.getEvent(eventSignature); + + const tx = this._getTransactionData(txHash, extraData.ethFullTransactions); + + const data = { + block: blockData, + inputs: eventFragment.inputs, + event, + tx, + eventIndex + }; + + // Create ethereum event to be passed to the wasm event handler. + console.time(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); + const ethereumEvent = await createEvent(instanceExports, contract, data); + console.timeEnd(`time:graph-watcher#handleEvent-createEvent-block-${block.number}-event-${eventSignature}`); + try { + console.time(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); + await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.name); + console.timeEnd(`time:graph-watcher#handleEvent-exec-${dataSource.name}-event-handler-${eventSignature}`); + } catch (error) { + this._clearCachedEntities(); + throw error; + } } } @@ -311,6 +325,7 @@ export class GraphWatcher { for (const contractAddress of contractAddressList) { this._context.contractAddress = contractAddress; + this._context.dataSourceName = dataSource.name; // Call all the block handlers one after another for a contract. const blockHandlerPromises = dataSource.mapping.blockHandlers.map(async (blockHandler: any): Promise => { diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index e3eda96bc..660aba1e5 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -248,7 +248,7 @@ export class Indexer implements IndexerInterface { return undefined; } - isWatchedContract (address : string): ContractInterface | undefined { + isContractAddressWatched (address : string): ContractInterface[] | undefined { return undefined; } diff --git a/packages/ipld-eth-client/package.json b/packages/ipld-eth-client/package.json index c2ac0afc6..759aef692 100644 --- a/packages/ipld-eth-client/package.json +++ b/packages/ipld-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/ipld-eth-client", - "version": "0.2.101", + "version": "0.2.102", "description": "IPLD ETH Client", "main": "dist/index.js", "scripts": { @@ -20,8 +20,8 @@ "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { "@apollo/client": "^3.7.1", - "@cerc-io/cache": "^0.2.101", - "@cerc-io/util": "^0.2.101", + "@cerc-io/cache": "^0.2.102", + "@cerc-io/util": "^0.2.102", "cross-fetch": "^3.1.4", "debug": "^4.3.1", "ethers": "^5.4.4", diff --git a/packages/peer/package.json b/packages/peer/package.json index a9028c4e7..30fda3c2f 100644 --- a/packages/peer/package.json +++ b/packages/peer/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/peer", - "version": "0.2.101", + "version": "0.2.102", "description": "libp2p module", "main": "dist/index.js", "exports": "./dist/index.js", diff --git a/packages/rpc-eth-client/package.json b/packages/rpc-eth-client/package.json index e478ba172..e4437ce24 100644 --- a/packages/rpc-eth-client/package.json +++ b/packages/rpc-eth-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/rpc-eth-client", - "version": "0.2.101", + "version": "0.2.102", "description": "RPC ETH Client", "main": "dist/index.js", "scripts": { @@ -19,9 +19,9 @@ }, "homepage": "https://github.com/cerc-io/watcher-ts#readme", "dependencies": { - "@cerc-io/cache": "^0.2.101", - "@cerc-io/ipld-eth-client": "^0.2.101", - "@cerc-io/util": "^0.2.101", + "@cerc-io/cache": "^0.2.102", + "@cerc-io/ipld-eth-client": "^0.2.102", + "@cerc-io/util": "^0.2.102", "chai": "^4.3.4", "ethers": "^5.4.4", "left-pad": "^1.3.0", diff --git a/packages/solidity-mapper/package.json b/packages/solidity-mapper/package.json index 492311277..4e54fe2c9 100644 --- a/packages/solidity-mapper/package.json +++ b/packages/solidity-mapper/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/solidity-mapper", - "version": "0.2.101", + "version": "0.2.102", "main": "dist/index.js", "license": "AGPL-3.0", "devDependencies": { diff --git a/packages/test/package.json b/packages/test/package.json index 9c2c98374..5fe1b9731 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/test", - "version": "0.2.101", + "version": "0.2.102", "main": "dist/index.js", "license": "AGPL-3.0", "private": true, diff --git a/packages/tracing-client/package.json b/packages/tracing-client/package.json index f7ad182f3..9df9cf149 100644 --- a/packages/tracing-client/package.json +++ b/packages/tracing-client/package.json @@ -1,6 +1,6 @@ { "name": "@cerc-io/tracing-client", - "version": "0.2.101", + "version": "0.2.102", "description": "ETH VM tracing client", "main": "dist/index.js", "scripts": { diff --git a/packages/util/package.json b/packages/util/package.json index 568a96a18..677032ed0 100644 --- a/packages/util/package.json +++ b/packages/util/package.json @@ -1,13 +1,13 @@ { "name": "@cerc-io/util", - "version": "0.2.101", + "version": "0.2.102", "main": "dist/index.js", "license": "AGPL-3.0", "dependencies": { "@apollo/utils.keyvaluecache": "^1.0.1", "@cerc-io/nitro-node": "^0.1.15", - "@cerc-io/peer": "^0.2.101", - "@cerc-io/solidity-mapper": "^0.2.101", + "@cerc-io/peer": "^0.2.102", + "@cerc-io/solidity-mapper": "^0.2.102", "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "@ethersproject/properties": "^5.7.0", "@ethersproject/providers": "^5.4.4", @@ -54,7 +54,7 @@ "yargs": "^17.0.1" }, "devDependencies": { - "@cerc-io/cache": "^0.2.101", + "@cerc-io/cache": "^0.2.102", "@nomiclabs/hardhat-waffle": "^2.0.1", "@types/bunyan": "^1.8.8", "@types/express": "^4.17.14", diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 9933a939b..049ff22a2 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -12,7 +12,7 @@ import { UNKNOWN_EVENT_NAME } from './constants'; import { JobQueue } from './job-queue'; -import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock } from './types'; +import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock, ContractInterface } from './types'; import { wait } from './misc'; import { OrderDirection } from './database'; import { JobQueueConfig } from './config'; @@ -242,14 +242,14 @@ const _processEvents = async ( // throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); // } - const watchedContract = indexer.isWatchedContract(event.contract); + const watchedContracts = indexer.isContractAddressWatched(event.contract); - if (watchedContract) { + if (watchedContracts) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { // Parse the unknown event and save updated event to the db - const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContract.kind); + const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContracts); if (eventParsed) { updatedDbEvents.push(parsedEvent); @@ -353,14 +353,14 @@ const _processEventsInSubgraphOrder = async ( // Parse events of initially unwatched contracts for (const event of unwatchedContractEvents) { - const watchedContract = indexer.isWatchedContract(event.contract); + const watchedContracts = indexer.isContractAddressWatched(event.contract); - if (watchedContract) { + if (watchedContracts) { // We might not have parsed this event yet. This can happen if the contract was added // as a result of a previous event in the same block. if (event.eventName === UNKNOWN_EVENT_NAME) { // Parse the unknown event and save updated event to the db - const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContract.kind); + const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContracts); if (eventParsed) { updatedDbEvents.push(parsedEvent); @@ -397,12 +397,14 @@ const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eve ); }; -const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): { eventParsed: boolean, event: EventInterface } => { +const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, watchedContracts: ContractInterface[]): { eventParsed: boolean, event: EventInterface } => { const logObj = JSONbigNative.parse(event.extraInfo); assert(indexer.parseEventNameAndArgs); - const { eventParsed, eventDetails: { eventName, eventInfo, eventSignature } } = indexer.parseEventNameAndArgs(contractKind, logObj); + const { eventParsed, eventDetails: { eventName, eventInfo, eventSignature } } = indexer.parseEventNameAndArgs(watchedContracts, logObj); if (!eventParsed) { + // Skip unparsable events + log(`WARNING: Skipping event for contract ${event.contract} as no matching event found in the ABI`); return { eventParsed: false, event }; } diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 753a5a3a3..2fcd642fd 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -665,7 +665,7 @@ export class Database { async saveContract (repo: Repository, address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise { const contract = await repo .createQueryBuilder() - .where('address = :address', { address }) + .where('address = :address AND kind = :kind', { address, kind }) .getOne(); const entity = repo.create({ address, kind, checkpoint, startingBlock, context }); diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index fdd1f4384..395ba187c 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -117,7 +117,7 @@ export class Indexer { _ethProvider: ethers.providers.JsonRpcProvider; _jobQueue: JobQueue; - _watchedContracts: { [key: string]: ContractInterface } = {}; + _watchedContractsByAddressMap: { [key: string]: ContractInterface[] } = {}; _stateStatusMap: { [key: string]: StateStatus } = {}; _currentEndpointIndex = { @@ -203,8 +203,12 @@ export class Indexer { const contracts = await this._db.getContracts(); - this._watchedContracts = contracts.reduce((acc: { [key: string]: ContractInterface }, contract) => { - acc[contract.address] = contract; + this._watchedContractsByAddressMap = contracts.reduce((acc: { [key: string]: ContractInterface[] }, contract) => { + if (!acc[contract.address]) { + acc[contract.address] = []; + } + + acc[contract.address].push(contract); return acc; }, {}); @@ -441,7 +445,7 @@ export class Indexer { toBlock: number, eventSignaturesMap: Map, parseEventNameAndArgs: ( - kind: string, + watchedContracts: ContractInterface[], logObj: { topics: string[]; data: string } ) => { eventParsed: boolean, eventDetails: any } ): Promise<{ @@ -553,7 +557,7 @@ export class Indexer { async fetchEvents ( blockHash: string, blockNumber: number, eventSignaturesMap: Map, - parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any } + parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any } ): Promise<{ events: DeepPartial[], transactions: EthFullTransaction[]}> { const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics); @@ -572,7 +576,7 @@ export class Indexer { blockHash: string, blockNumber: number, addresses: string[], eventSignaturesMap: Map, - parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any } + parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any } ): Promise[]> { const { topics } = this._createLogsFilters(eventSignaturesMap); const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics); @@ -618,7 +622,7 @@ export class Indexer { createDbEventsFromLogsAndTxs ( blockHash: string, logs: any, transactions: any, - parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any } + parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any } ): DeepPartial[] { const transactionMap: {[key: string]: any} = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; @@ -665,12 +669,13 @@ export class Indexer { const extraInfo: { [key: string]: any } = { topics, data, tx, logIndex }; const contract = ethers.utils.getAddress(address); - const watchedContract = this.isWatchedContract(contract); + const watchedContracts = this.isContractAddressWatched(contract); - if (watchedContract) { - const { eventParsed, eventDetails } = parseEventNameAndArgs(watchedContract.kind, logObj); + if (watchedContracts) { + const { eventParsed, eventDetails } = parseEventNameAndArgs(watchedContracts, logObj); if (!eventParsed) { // Skip unparsable events + log(`WARNING: Skipping event for contract ${contract} as no matching event found in ABI`); continue; } @@ -856,19 +861,22 @@ export class Indexer { return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); } - isWatchedContract (address : string): ContractInterface | undefined { - return this._watchedContracts[address]; + isContractAddressWatched (address : string): ContractInterface[] | undefined { + return this._watchedContractsByAddressMap[address]; } getContractsByKind (kind: string): ContractInterface[] { - const watchedContracts = Object.values(this._watchedContracts) - .filter(contract => contract.kind === kind); + const watchedContracts = Object.values(this._watchedContractsByAddressMap) + .reduce( + (acc, contracts) => acc.concat(contracts.filter(contract => contract.kind === kind)), + [] + ); return watchedContracts; } getWatchedContracts (): ContractInterface[] { - return Object.values(this._watchedContracts); + return Object.values(this._watchedContractsByAddressMap).flat(); } async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise { @@ -902,7 +910,19 @@ export class Indexer { } cacheContract (contract: ContractInterface): void { - this._watchedContracts[contract.address] = contract; + if (!this._watchedContractsByAddressMap[contract.address]) { + this._watchedContractsByAddressMap[contract.address] = []; + } + + // Check if contract with kind is already cached and skip + const isAlreadyCached = this._watchedContractsByAddressMap[contract.address] + .some(watchedContract => contract.id === watchedContract.id); + + if (isAlreadyCached) { + return; + } + + this._watchedContractsByAddressMap[contract.address].push(contract); } async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise { @@ -940,7 +960,7 @@ export class Indexer { } // Get all the contracts. - const contracts = Object.values(this._watchedContracts); + const [contracts] = Object.values(this._watchedContractsByAddressMap); // Getting the block for checkpoint. const block = await this.getBlockProgress(blockHash); @@ -994,10 +1014,11 @@ export class Indexer { } // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); + const watchedContracts = this._watchedContractsByAddressMap[contractAddress]; + assert(watchedContracts, `Contract ${contractAddress} not watched`); + const [firstWatchedContract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock); - if (block.blockNumber < contract.startingBlock) { + if (block.blockNumber < firstWatchedContract.startingBlock) { return; } @@ -1016,10 +1037,13 @@ export class Indexer { } // Get all the contracts. - const contracts = Object.values(this._watchedContracts); + const watchedContractsByAddress = Object.values(this._watchedContractsByAddressMap); // Create an initial state for each contract. - for (const contract of contracts) { + for (const watchedContracts of watchedContractsByAddress) { + // Get the first watched contract + const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock); + // Check if contract has checkpointing on. if (contract.checkpoint) { // Check if starting block not reached yet. @@ -1064,8 +1088,9 @@ export class Indexer { assert(block); // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); + const watchedContracts = this._watchedContractsByAddressMap[contractAddress]; + assert(watchedContracts, `Contract ${contractAddress} not watched`); + const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock); if (block.blockNumber < contract.startingBlock) { return; @@ -1100,8 +1125,9 @@ export class Indexer { } // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); + const watchedContracts = this._watchedContractsByAddressMap[contractAddress]; + assert(watchedContracts, `Contract ${contractAddress} not watched`); + const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock); if (block.blockNumber < contract.startingBlock) { return; @@ -1138,8 +1164,9 @@ export class Indexer { } // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); + const watchedContracts = this._watchedContractsByAddressMap[contractAddress]; + assert(watchedContracts, `Contract ${contractAddress} not watched`); + const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock); if (currentBlock.blockNumber < contract.startingBlock) { return; @@ -1341,16 +1368,16 @@ export class Indexer { return; } - const contracts = Object.values(this._watchedContracts); + const contractAddresses = Object.keys(this._watchedContractsByAddressMap); // TODO: Fire a single query for all contracts. - for (const contract of contracts) { - const initState = await this._db.getLatestState(contract.address, StateKind.Init); - const diffState = await this._db.getLatestState(contract.address, StateKind.Diff); - const diffStagedState = await this._db.getLatestState(contract.address, StateKind.DiffStaged); - const checkpointState = await this._db.getLatestState(contract.address, StateKind.Checkpoint); + for (const contractAddress of contractAddresses) { + const initState = await this._db.getLatestState(contractAddress, StateKind.Init); + const diffState = await this._db.getLatestState(contractAddress, StateKind.Diff); + const diffStagedState = await this._db.getLatestState(contractAddress, StateKind.DiffStaged); + const checkpointState = await this._db.getLatestState(contractAddress, StateKind.Checkpoint); - this._stateStatusMap[contract.address] = { + this._stateStatusMap[contractAddress] = { init: initState?.block.blockNumber, diff: diffState?.block.blockNumber, diff_staged: diffStagedState?.block.blockNumber, @@ -1372,7 +1399,7 @@ export class Indexer { } await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: MoreThan(blockNumber) }); - this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock > blockNumber); + this._clearWatchedContracts((watchedContract) => watchedContract.startingBlock > blockNumber); await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) }); @@ -1414,7 +1441,7 @@ export class Indexer { } } - async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockNumber: number }>[]): Promise { + async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockHash: string }>[]): Promise { const dbTx = await this._db.createTransactionRunner(); try { @@ -1434,11 +1461,15 @@ export class Indexer { } } - _clearWatchedContracts (removFilter: (watchedContract: ContractInterface) => boolean): void { - this._watchedContracts = Object.values(this._watchedContracts) - .filter(watchedContract => !removFilter(watchedContract)) - .reduce((acc: {[key: string]: ContractInterface}, watchedContract) => { - acc[watchedContract.address] = watchedContract; + _clearWatchedContracts (removeFilter: (watchedContract: ContractInterface) => boolean): void { + this._watchedContractsByAddressMap = Object.entries(this._watchedContractsByAddressMap) + .map(([address, watchedContracts]): [string, ContractInterface[]] => [ + address, + watchedContracts.filter(watchedContract => !removeFilter(watchedContract)) + ]) + .filter(([, watchedContracts]) => watchedContracts.length) + .reduce((acc: {[key: string]: ContractInterface[]}, [address, watchedContracts]) => { + acc[address] = watchedContracts; return acc; }, {}); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 479433669..2d677a093 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -202,8 +202,8 @@ export interface IndexerInterface { saveEventEntity (dbEvent: EventInterface): Promise saveEvents (dbEvents: DeepPartial[]): Promise processEvent (event: EventInterface, extraData: ExtraEventData): Promise - parseEventNameAndArgs?: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any } - isWatchedContract: (address: string) => ContractInterface | undefined; + parseEventNameAndArgs?: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any } + isContractAddressWatched: (address: string) => ContractInterface[] | undefined; getWatchedContracts: () => ContractInterface[] getContractsByKind?: (kind: string) => ContractInterface[] addContracts?: () => Promise