diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 02bc3d534..f36020978 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -154,6 +154,10 @@ export class Indexer implements IPLDIndexerInterface { this._populateRelationsMap(); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index c162e1405..5091feb62 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -191,6 +191,10 @@ export class Indexer implements IPLDIndexerInterface { this._populateRelationsMap(); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); diff --git a/packages/erc20-watcher/src/cli/reset-cmds/state.ts b/packages/erc20-watcher/src/cli/reset-cmds/state.ts index 54dc8ce40..6e648bd05 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/state.ts @@ -43,7 +43,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 649478d58..c218a790a 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -43,7 +43,7 @@ import { CONTRACT_KIND } from '../utils/index'; }).argv; const config: Config = await getConfig(argv.configFile); - const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config; + const { database: dbConfig, server, jobQueue: jobQueueConfig } = config; const { ethClient, ethProvider } = await initClients(config); assert(dbConfig); @@ -59,7 +59,7 @@ import { CONTRACT_KIND } from '../utils/index'; const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(server, db, ethClient, ethProvider, jobQueue); await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock); diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 747fe9875..78aa7af54 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -72,7 +72,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 9bc1ff346..cf878e69d 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig } from '@vulcanize/util'; import { Database } from './database'; import { Event } from './entity/Event'; @@ -46,20 +46,22 @@ export class Indexer implements IndexerInterface { _ethClient: EthClient _ethProvider: BaseProvider _baseIndexer: BaseIndexer + _serverConfig: ServerConfig _abi: JsonFragment[] _storageLayout: StorageLayout _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; this._ethProvider = ethProvider; - this._serverMode = serverMode; + this._serverConfig = serverConfig; + this._serverMode = serverConfig.mode; this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue); const { abi, storageLayout } = artifacts; @@ -73,6 +75,10 @@ export class Indexer implements IndexerInterface { this._contract = new ethers.utils.Interface(this._abi); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); } diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index f083bddf1..90f6d71a5 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -82,7 +82,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index 407779482..8ab3f335e 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -38,7 +38,7 @@ export const main = async (): Promise => { const config: Config = await getConfig(argv.f); const { ethClient, ethProvider } = await initClients(config); - const { host, port, mode, kind: watcherKind } = config.server; + const { host, port, kind: watcherKind } = config.server; const db = new Database(config.database); await db.init(); @@ -55,7 +55,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index 8aa6a3f6f..883b0071e 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -137,6 +137,10 @@ export class Indexer implements IPLDIndexerInterface { this._relationsMap = new Map(); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index ee71043b3..37240b190 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -5,10 +5,15 @@ import { IndexerInterface, BlockProgressInterface, EventInterface, - SyncStatusInterface + SyncStatusInterface, + ServerConfig as ServerConfigInterface } from '@vulcanize/util'; export class Indexer implements IndexerInterface { + get serverConfig () { + return new ServerConfig(); + } + async getBlockProgress (blockHash: string): Promise { assert(blockHash); @@ -140,3 +145,29 @@ class SyncStatus implements SyncStatusInterface { this.latestCanonicalBlockNumber = 0; } } + +class ServerConfig implements ServerConfigInterface { + host: string; + port: number; + mode: string; + kind: string; + checkpointing: boolean; + checkpointInterval: number; + ipfsApiAddr: string; + subgraphPath: string; + wasmRestartBlocksInterval: number; + filterLogs: boolean; + + constructor () { + this.host = ''; + this.port = 0; + this.mode = ''; + this.kind = ''; + this.checkpointing = false; + this.checkpointInterval = 0; + this.ipfsApiAddr = ''; + this.subgraphPath = ''; + this.wasmRestartBlocksInterval = 0; + this.filterLogs = false; + } +} diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 3457a79bc..9c8c38e70 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -160,6 +160,10 @@ export class Indexer implements IPLDIndexerInterface { this._populateRelationsMap(); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); diff --git a/packages/mobymask-watcher/environments/local.toml b/packages/mobymask-watcher/environments/local.toml index b1fddfccc..ec60fe7ba 100644 --- a/packages/mobymask-watcher/environments/local.toml +++ b/packages/mobymask-watcher/environments/local.toml @@ -12,6 +12,9 @@ # IPFS API address (can be taken from the output on running the IPFS daemon). # ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" + # Boolean to filter logs by contract. + filterLogs = true + [database] type = "postgres" host = "localhost" diff --git a/packages/mobymask-watcher/src/cli/index-block.ts b/packages/mobymask-watcher/src/cli/index-block.ts index be7032826..3c7dc4bd3 100644 --- a/packages/mobymask-watcher/src/cli/index-block.ts +++ b/packages/mobymask-watcher/src/cli/index-block.ts @@ -138,8 +138,14 @@ main().catch(err => { const processEvent = async (indexer: Indexer, block: BlockProgress, event: Event) => { const eventIndex = event.index; + // Check that events are processed in order. + if (eventIndex <= block.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + } + // Check if previous event in block has been processed exactly before this and abort if not. - if (eventIndex > 0) { // Skip the first event in the block. + // Skip check if logs fetched are filtered by contract address. + if (!indexer.serverConfig.filterLogs) { const prevIndex = eventIndex - 1; if (prevIndex !== block.lastProcessedEventIndex) { diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index f3f5e03dd..41ecb4e34 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -137,6 +137,10 @@ export class Indexer implements IPLDIndexerInterface { this._relationsMap = new Map(); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); @@ -799,24 +803,43 @@ export class Indexer implements IPLDIndexerInterface { async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); + let block: any, logs: any[]; - const logsPromise = this._ethClient.getLogs({ blockHash }); - const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); - - let [ - { block, logs }, - { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } + if (this._serverConfig.filterLogs) { + const watchedContracts = this._baseIndexer.getWatchedContracts(); + + // TODO: Query logs by multiple contracts. + const contractlogsWithBlockPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ + blockHash, + contract: watchedContract.address + })); + + const contractlogsWithBlock = await Promise.all(contractlogsWithBlockPromises); + + // Flatten logs by contract and sort by index. + logs = contractlogsWithBlock.map(data => { + return data.logs; + }).flat() + .sort((a, b) => { + return a.index - b.index; + }); + + ({ block } = await this._ethClient.getBlockByHash(blockHash)); + } else { + ({ block, logs } = await this._ethClient.getLogs({ blockHash })); + } + + const { + allEthHeaderCids: { + nodes: [ + { + ethTransactionCidsByHeaderId: { + nodes: transactions } - ] - } + } + ] } - ] = await Promise.all([logsPromise, transactionsPromise]); + } = await this._ethClient.getBlockWithTransactions({ blockHash }); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/mobymask-watcher/tsconfig.json b/packages/mobymask-watcher/tsconfig.json index 99712bdf7..b30e16293 100644 --- a/packages/mobymask-watcher/tsconfig.json +++ b/packages/mobymask-watcher/tsconfig.json @@ -6,7 +6,7 @@ // "incremental": true, /* Enable incremental compilation */ "target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */ "module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */ - // "lib": [], /* Specify library files to be included in the compilation. */ + "lib": ["es2019"], /* Specify library files to be included in the compilation. */ // "allowJs": true, /* Allow javascript files to be compiled. */ // "checkJs": true, /* Report errors in .js files. */ // "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */ diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 205d6be6c..ea6745195 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -61,7 +61,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index a2451e023..9ee540a77 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -79,7 +79,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 4314965ca..75e6c05c0 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -11,7 +11,7 @@ import { providers, utils, BigNumber } from 'ethers'; import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where, ServerConfig } from '@vulcanize/util'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; @@ -46,8 +46,9 @@ export class Indexer implements IndexerInterface { _ethClient: EthClient _baseIndexer: BaseIndexer _isDemo: boolean + _serverConfig: ServerConfig - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) { + constructor (serverConfig: ServerConfig, db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue) { assert(db); assert(uniClient); assert(erc20Client); @@ -57,8 +58,13 @@ export class Indexer implements IndexerInterface { this._uniClient = uniClient; this._erc20Client = erc20Client; this._ethClient = ethClient; + this._serverConfig = serverConfig; this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider, jobQueue); - this._isDemo = mode === 'demo'; + this._isDemo = serverConfig.mode === 'demo'; + } + + get serverConfig () { + return this._serverConfig; } getResultEvent (event: Event): ResultEvent { diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 39f9ce830..ef8a44c72 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -95,7 +95,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode); + const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 5b8994a0d..81676a80d 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -60,7 +60,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); const pubSub = new PubSub(); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubSub, jobQueue); diff --git a/packages/uni-watcher/src/chain-pruning.test.ts b/packages/uni-watcher/src/chain-pruning.test.ts index 2c600069d..2b1f96348 100644 --- a/packages/uni-watcher/src/chain-pruning.test.ts +++ b/packages/uni-watcher/src/chain-pruning.test.ts @@ -61,7 +61,7 @@ describe('chain pruning', () => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); assert(indexer, 'Could not create indexer object.'); jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/uni-watcher/src/cli/reset-cmds/state.ts b/packages/uni-watcher/src/cli/reset-cmds/state.ts index a0b10ed9a..8ac5cbe23 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/state.ts @@ -40,7 +40,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-watcher/src/cli/watch-contract.ts b/packages/uni-watcher/src/cli/watch-contract.ts index 47d48ec46..8886ed84b 100644 --- a/packages/uni-watcher/src/cli/watch-contract.ts +++ b/packages/uni-watcher/src/cli/watch-contract.ts @@ -64,7 +64,7 @@ import { Indexer } from '../indexer'; const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); await indexer.watchContract(argv.address, argv.kind, argv.checkpoint, argv.startingBlock); diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 4e902d78d..f2cfa6878 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -72,7 +72,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 91fa28bd8..3fb57b86f 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -3,13 +3,13 @@ // import debug from 'debug'; -import { DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; +import { DeepPartial, FindConditions, FindManyOptions, QueryRunner, Server } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import assert from 'assert'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions, ServerConfig } from '@vulcanize/util'; import { Database } from './database'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; @@ -40,15 +40,17 @@ export class Indexer implements IndexerInterface { _ethClient: EthClient _baseIndexer: BaseIndexer _ethProvider: ethers.providers.BaseProvider + _serverConfig: ServerConfig _factoryContract: ethers.utils.Interface _poolContract: ethers.utils.Interface _nfpmContract: ethers.utils.Interface - constructor (db: Database, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { this._db = db; this._ethClient = ethClient; this._ethProvider = ethProvider; + this._serverConfig = serverConfig; this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue); this._factoryContract = new ethers.utils.Interface(factoryABI); @@ -56,6 +58,10 @@ export class Indexer implements IndexerInterface { this._nfpmContract = new ethers.utils.Interface(nfpmABI); } + get serverConfig () { + return this._serverConfig; + } + async init (): Promise { await this._baseIndexer.fetchContracts(); } diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index daf3db543..ee1abbae1 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -84,7 +84,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 0a383ba5c..82473be46 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -56,7 +56,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); diff --git a/packages/uni-watcher/src/smoke.test.ts b/packages/uni-watcher/src/smoke.test.ts index cc70a55f9..d6cad1b48 100644 --- a/packages/uni-watcher/src/smoke.test.ts +++ b/packages/uni-watcher/src/smoke.test.ts @@ -127,7 +127,7 @@ describe('uni-watcher', () => { factory = new Contract(factoryContract.address, FACTORY_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.'); }); @@ -263,7 +263,7 @@ describe('uni-watcher', () => { nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); await indexer.init(); assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.'); }); diff --git a/packages/uni-watcher/test/init.ts b/packages/uni-watcher/test/init.ts index 12bd76e07..7fdd12862 100644 --- a/packages/uni-watcher/test/init.ts +++ b/packages/uni-watcher/test/init.ts @@ -81,7 +81,7 @@ const main = async () => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); + const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); let factory: Contract; // Checking whether factory is deployed. diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index e11b6c807..85a09a5b9 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -34,6 +34,7 @@ export interface ServerConfig { ipfsApiAddr: string; subgraphPath: string; wasmRestartBlocksInterval: number; + filterLogs: boolean; } export interface UpstreamConfig { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 17bd1cad0..d1a07442f 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -315,6 +315,10 @@ export class Indexer { return watchedContracts; } + getWatchedContracts (): ContractInterface[] { + return Object.values(this._watchedContracts); + } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { assert(this._db.saveContract); const dbTx = await this._db.createTransactionRunner(); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 008e63c89..3c5356fc0 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -279,8 +279,14 @@ export class JobRunner { const eventIndex = event.index; // log(`Processing event ${event.id} index ${eventIndex}`); + // Check that events are processed in order. + if (eventIndex <= block.lastProcessedEventIndex) { + throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + } + // Check if previous event in block has been processed exactly before this and abort if not. - if (eventIndex > 0) { // Skip the first event in the block. + // Skip check if logs fetched are filtered by contract address. + if (!this._indexer.serverConfig.filterLogs) { const prevIndex = eventIndex - 1; if (prevIndex !== block.lastProcessedEventIndex) { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index ea82ecaa7..8005f90f0 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -4,6 +4,7 @@ import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; +import { ServerConfig } from './config'; import { Where, QueryOptions } from './database'; import { IpldStatus } from './ipld-indexer'; @@ -76,6 +77,7 @@ export interface IPLDBlockInterface { } export interface IndexerInterface { + readonly serverConfig: ServerConfig getBlockProgress (blockHash: string): Promise getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise getEvent (id: string): Promise