Skip to content

Commit

Permalink
Refactor job-runner CLI to cli package (#255)
Browse files Browse the repository at this point in the history
* Fix eden-watcher server initialization

* Add an indexer method to watch subgraph contracts

* Refactor job-runner CLI to cli package

* Move watcher reset commands to refactored code
  • Loading branch information
prathamesh0 authored Nov 23, 2022
1 parent cc28474 commit 4bfb007
Show file tree
Hide file tree
Showing 17 changed files with 226 additions and 509 deletions.
4 changes: 2 additions & 2 deletions packages/cli/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ export class BaseCmd {
_jobQueue?: JobQueue
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
_graphDb?: GraphDatabase
_eventWatcher?: EventWatcherInterface
_graphDb?: GraphDatabase;
_eventWatcher?: EventWatcherInterface;

get config (): Config | undefined {
return this._config;
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ export * from './inspect-cid';
export * from './import-state';
export * from './export-state';
export * from './server';
export * from './job-runner';
152 changes: 152 additions & 0 deletions packages/cli/src/job-runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// Copyright 2022 Vulcanize, Inc.
//

import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';

import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher } from '@cerc-io/graph-node';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
JobRunner as BaseJobRunner,
JobQueueConfig,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
startMetricsServer
} from '@cerc-io/util';

import { BaseCmd } from './base';

interface Arguments {
configFile: string;
}

export class JobRunnerCmd {
_argv?: Arguments
_baseCmd: BaseCmd;

constructor () {
this._baseCmd = new BaseCmd();
}

get jobQueue (): JobQueue | undefined {
return this._baseCmd.jobQueue;
}

get indexer (): IndexerInterface | undefined {
return this._baseCmd.indexer;
}

async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);

return this._baseCmd.initConfig(this._argv.configFile);
}

async init (
Database: new (config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (
serverConfig: ServerConfig,
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
jobQueue: JobQueue,
graphWatcher?: GraphWatcher
) => IndexerInterface,
clients: { [key: string]: any } = {},
entityQueryTypeMap?: Map<any, any>,
entityToLatestEntityMap?: Map<any, any>
): Promise<void> {
await this.initConfig();

await this._baseCmd.init(Database, Indexer, clients, entityQueryTypeMap, entityToLatestEntityMap);
}

async exec (startJobRunner: (jobRunner: JobRunner) => Promise<void>): Promise<void> {
const config = this._baseCmd.config;
const jobQueue = this._baseCmd.jobQueue;
const indexer = this._baseCmd.indexer;

assert(config);
assert(jobQueue);
assert(indexer);

if (indexer.addContracts) {
await indexer.addContracts();
}

const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);

await jobRunner.jobQueue.deleteAllJobs();
await jobRunner.baseJobRunner.resetToPrevIndexedBlock();

await startJobRunner(jobRunner);
jobRunner.baseJobRunner.handleShutdown();

await startMetricsServer(config, indexer);
}

_getArgv (): any {
return yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.argv;
}
}

export class JobRunner {
jobQueue: JobQueue
baseJobRunner: BaseJobRunner
_indexer: IndexerInterface
_jobQueueConfig: JobQueueConfig

constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this.jobQueue = jobQueue;
this.baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this.jobQueue);
}

async subscribeBlockProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this.baseJobRunner.processBlock(job);
});
}

async subscribeEventProcessingQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
await this.baseJobRunner.processEvent(job);
});
}

async subscribeHooksQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this.baseJobRunner.processHooks(job);
});
}

async subscribeBlockCheckpointQueue (): Promise<void> {
await this.jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this.baseJobRunner.processCheckpoint(job);
});
}
}
3 changes: 0 additions & 3 deletions packages/cli/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import fs from 'fs';
import path from 'path';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
Expand Down Expand Up @@ -34,7 +32,6 @@ import { BaseCmd } from './base';

interface Arguments {
configFile: string;
importFile: string;
}

export class ServerCmd {
Expand Down
7 changes: 7 additions & 0 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getLatestStateIndexedBlock();
}

{{#if (subgraphPath)}}
async addContracts (): Promise<void> {
// Watching all the contracts in the subgraph.
await this._graphWatcher.addContracts();
}

{{/if}}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();
await indexer.addContracts();
{{/if}}

const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
Expand Down
2 changes: 1 addition & 1 deletion packages/eden-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@
eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
prefetchBlockCount = 10
5 changes: 5 additions & 0 deletions packages/eden-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getLatestStateIndexedBlock();
}

async addContracts (): Promise<void> {
// Watching all the contracts in the subgraph.
await this._graphWatcher.addContracts();
}

async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
Expand Down
122 changes: 11 additions & 111 deletions packages/eden-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,25 @@
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';

import {
getConfig,
Config,
JobQueue,
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
initClients,
startMetricsServer
} from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { JobRunner, JobRunnerCmd } from '@cerc-io/cli';

import { Indexer } from './indexer';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Database, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP } from './database';

const log = debug('vulcanize:job-runner');

export class JobRunner {
_indexer: Indexer
_jobQueue: JobQueue
_baseJobRunner: BaseJobRunner
_jobQueueConfig: JobQueueConfig

constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
this._indexer = indexer;
this._jobQueue = jobQueue;
this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
}

async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
await this.subscribeBlockCheckpointQueue();
await this.subscribeHooksQueue();
this._baseJobRunner.handleShutdown();
}

async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);
});
}

async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseJobRunner.processEvent(job);
});
}

async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
await this._baseJobRunner.processHooks(job);
});
}

async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
await this._baseJobRunner.processCheckpoint(job);
});
}
}

export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.argv;

const config: Config = await getConfig(argv.f);
const { ethClient, ethProvider } = await initClients(config);

const db = new Database(config.database);
await db.init();

const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();

const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();

graphWatcher.setIndexer(indexer);
await graphWatcher.init();

// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();

const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();

startMetricsServer(config, indexer);
const jobRunnerCmd = new JobRunnerCmd();
await jobRunnerCmd.init(Database, Indexer, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);

await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise<void> => {
await jobRunner.subscribeBlockProcessingQueue();
await jobRunner.subscribeEventProcessingQueue();
await jobRunner.subscribeBlockCheckpointQueue();
await jobRunner.subscribeHooksQueue();
});
};

main().then(() => {
Expand Down
2 changes: 1 addition & 1 deletion packages/eden-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database, Indexer, EventWatcher, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await serverCmd.init(Database, Indexer, EventWatcher, {}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);

const typeDefs = fs.readFileSync(path.join(__dirname, 'schema.gql')).toString();

Expand Down
Loading

0 comments on commit 4bfb007

Please sign in to comment.