Skip to content

Commit

Permalink
Add filterLogs flag to fetch logs by contract (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi authored Jul 18, 2022
1 parent 9a35955 commit e266869
Show file tree
Hide file tree
Showing 33 changed files with 158 additions and 48 deletions.
4 changes: 4 additions & 0 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ export class Indexer implements IPLDIndexerInterface {
this._populateRelationsMap();
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchIPLDStatus();
Expand Down
4 changes: 4 additions & 0 deletions packages/eden-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ export class Indexer implements IPLDIndexerInterface {
this._populateRelationsMap();
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchIPLDStatus();
Expand Down
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export const handler = async (argv: any): Promise<void> => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
Expand Down
4 changes: 2 additions & 2 deletions packages/erc20-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import { CONTRACT_KIND } from '../utils/index';
}).argv;

const config: Config = await getConfig(argv.configFile);
const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config;
const { database: dbConfig, server, jobQueue: jobQueueConfig } = config;
const { ethClient, ethProvider } = await initClients(config);

assert(dbConfig);
Expand All @@ -59,7 +59,7 @@ import { CONTRACT_KIND } from '../utils/index';
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(server, db, ethClient, ethProvider, jobQueue);

await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock);

Expand Down
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);

const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);

Expand Down
12 changes: 9 additions & 3 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers';

import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig } from '@vulcanize/util';

import { Database } from './database';
import { Event } from './entity/Event';
Expand Down Expand Up @@ -46,20 +46,22 @@ export class Indexer implements IndexerInterface {
_ethClient: EthClient
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer
_serverConfig: ServerConfig

_abi: JsonFragment[]
_storageLayout: StorageLayout
_contract: ethers.utils.Interface
_serverMode: string

constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue) {
assert(db);
assert(ethClient);

this._db = db;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._serverConfig = serverConfig;
this._serverMode = serverConfig.mode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue);

const { abi, storageLayout } = artifacts;
Expand All @@ -73,6 +75,10 @@ export class Indexer implements IndexerInterface {
this._contract = new ethers.utils.Interface(this._abi);
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();

const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
Expand Down
4 changes: 2 additions & 2 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const main = async (): Promise<any> => {
const config: Config = await getConfig(argv.f);
const { ethClient, ethProvider } = await initClients(config);

const { host, port, mode, kind: watcherKind } = config.server;
const { host, port, kind: watcherKind } = config.server;

const db = new Database(config.database);
await db.init();
Expand All @@ -55,7 +55,7 @@ export const main = async (): Promise<any> => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
Expand Down
4 changes: 4 additions & 0 deletions packages/erc721-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ export class Indexer implements IPLDIndexerInterface {
this._relationsMap = new Map();
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchIPLDStatus();
Expand Down
33 changes: 32 additions & 1 deletion packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import {
IndexerInterface,
BlockProgressInterface,
EventInterface,
SyncStatusInterface
SyncStatusInterface,
ServerConfig as ServerConfigInterface
} from '@vulcanize/util';

export class Indexer implements IndexerInterface {
get serverConfig () {
return new ServerConfig();
}

async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {
assert(blockHash);

Expand Down Expand Up @@ -140,3 +145,29 @@ class SyncStatus implements SyncStatusInterface {
this.latestCanonicalBlockNumber = 0;
}
}

class ServerConfig implements ServerConfigInterface {
host: string;
port: number;
mode: string;
kind: string;
checkpointing: boolean;
checkpointInterval: number;
ipfsApiAddr: string;
subgraphPath: string;
wasmRestartBlocksInterval: number;
filterLogs: boolean;

constructor () {
this.host = '';
this.port = 0;
this.mode = '';
this.kind = '';
this.checkpointing = false;
this.checkpointInterval = 0;
this.ipfsApiAddr = '';
this.subgraphPath = '';
this.wasmRestartBlocksInterval = 0;
this.filterLogs = false;
}
}
4 changes: 4 additions & 0 deletions packages/graph-test-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ export class Indexer implements IPLDIndexerInterface {
this._populateRelationsMap();
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchIPLDStatus();
Expand Down
3 changes: 3 additions & 0 deletions packages/mobymask-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# IPFS API address (can be taken from the output on running the IPFS daemon).
# ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"

# Boolean to filter logs by contract.
filterLogs = true

[database]
type = "postgres"
host = "localhost"
Expand Down
8 changes: 7 additions & 1 deletion packages/mobymask-watcher/src/cli/index-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,14 @@ main().catch(err => {
const processEvent = async (indexer: Indexer, block: BlockProgress, event: Event) => {
const eventIndex = event.index;

// Check that events are processed in order.
if (eventIndex <= block.lastProcessedEventIndex) {
throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
}

// Check if previous event in block has been processed exactly before this and abort if not.
if (eventIndex > 0) { // Skip the first event in the block.
// Skip check if logs fetched are filtered by contract address.
if (!indexer.serverConfig.filterLogs) {
const prevIndex = eventIndex - 1;

if (prevIndex !== block.lastProcessedEventIndex) {
Expand Down
53 changes: 38 additions & 15 deletions packages/mobymask-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ export class Indexer implements IPLDIndexerInterface {
this._relationsMap = new Map();
}

get serverConfig () {
return this._serverConfig;
}

async init (): Promise<void> {
await this._baseIndexer.fetchContracts();
await this._baseIndexer.fetchIPLDStatus();
Expand Down Expand Up @@ -799,24 +803,43 @@ export class Indexer implements IPLDIndexerInterface {

async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
let block: any, logs: any[];

const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });

let [
{ block, logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();

// TODO: Query logs by multiple contracts.
const contractlogsWithBlockPromises = watchedContracts.map((watchedContract): Promise<any> => this._ethClient.getLogs({
blockHash,
contract: watchedContract.address
}));

const contractlogsWithBlock = await Promise.all(contractlogsWithBlockPromises);

// Flatten logs by contract and sort by index.
logs = contractlogsWithBlock.map(data => {
return data.logs;
}).flat()
.sort((a, b) => {
return a.index - b.index;
});

({ block } = await this._ethClient.getBlockByHash(blockHash));
} else {
({ block, logs } = await this._ethClient.getLogs({ blockHash }));
}

const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
]
}
}
]
}
] = await Promise.all([logsPromise, transactionsPromise]);
} = await this._ethClient.getBlockWithTransactions({ blockHash });

const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
Expand Down
2 changes: 1 addition & 1 deletion packages/mobymask-watcher/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// "incremental": true, /* Enable incremental compilation */
"target": "es5", /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', 'ES2021', or 'ESNEXT'. */
"module": "commonjs", /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */
// "lib": [], /* Specify library files to be included in the compilation. */
"lib": ["es2019"], /* Specify library files to be included in the compilation. */
// "allowJs": true, /* Allow javascript files to be compiled. */
// "checkJs": true, /* Report errors in .js files. */
// "jsx": "preserve", /* Specify JSX code generation: 'preserve', 'react-native', 'react', 'react-jsx' or 'react-jsxdev'. */
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const handler = async (argv: any): Promise<void> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);

Expand Down
12 changes: 9 additions & 3 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { providers, utils, BigNumber } from 'ethers';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where } from '@vulcanize/util';
import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where, ServerConfig } from '@vulcanize/util';

import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
Expand Down Expand Up @@ -46,8 +46,9 @@ export class Indexer implements IndexerInterface {
_ethClient: EthClient
_baseIndexer: BaseIndexer
_isDemo: boolean
_serverConfig: ServerConfig

constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) {
constructor (serverConfig: ServerConfig, db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue) {
assert(db);
assert(uniClient);
assert(erc20Client);
Expand All @@ -57,8 +58,13 @@ export class Indexer implements IndexerInterface {
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider, jobQueue);
this._isDemo = mode === 'demo';
this._isDemo = serverConfig.mode === 'demo';
}

get serverConfig () {
return this._serverConfig;
}

getResultEvent (event: Event): ResultEvent {
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, config.server.mode);
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const pubSub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubSub, jobQueue);
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-watcher/src/chain-pruning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ describe('chain pruning', () => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);
assert(indexer, 'Could not create indexer object.');

jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const handler = async (argv: any): Promise<void> => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
Expand Down
Loading

0 comments on commit e266869

Please sign in to comment.