diff --git a/packages/cli/src/reset/state.ts b/packages/cli/src/reset/state.ts index 9750c149f..8f484adaf 100644 --- a/packages/cli/src/reset/state.ts +++ b/packages/cli/src/reset/state.ts @@ -61,7 +61,6 @@ export class ResetStateCmd { const { blockNumber } = this._argv; try { // Delete all State entries after the given block - assert(this._database.removeStatesAfterBlock); await this._database.removeStatesAfterBlock(dbTx, blockNumber); // Reset the stateSyncStatus. diff --git a/packages/codegen/README.md b/packages/codegen/README.md index cdaafece0..5c8168239 100644 --- a/packages/codegen/README.md +++ b/packages/codegen/README.md @@ -18,6 +18,14 @@ ## Run +Follow the steps below or follow the demos: + +* [Subgraph watcher](./subgraph-demo.md) + +* [Non subgraph watcher](./non-subgraph-demo.md) + +Steps: + * Create a `.yaml` config file in the following format for generating a watcher: ```yaml diff --git a/packages/codegen/non-subgraph-demo.md b/packages/codegen/non-subgraph-demo.md new file mode 100644 index 000000000..0e5055da3 --- /dev/null +++ b/packages/codegen/non-subgraph-demo.md @@ -0,0 +1,348 @@ +# Subgraph watcher demo + +* Clone the [stack-orchestrator](https://github.com/vulcanize/stack-orchestrator) repo. + + ```bash + git clone https://github.com/vulcanize/stack-orchestrator + ``` + +* Create a `config.sh` file. + + ```bash + cd stack-orchestrator/helper-scripts + ./create-config.sh + ``` + +* Setup the required repositories. + + ```bash + ./setup-repositories.sh -p ssh + ``` + +* Checkout [v4 release](https://github.com/cerc-io/go-ethereum/releases/tag/v1.10.26-statediff-4.2.2-alpha) in go-ethereum repo. The path for go-ethereum is specified by `vulcanize_go_ethereum` variable in `config.sh` file created in stack-orchestrator repo. + + ```bash + # In go-ethereum repo. + git checkout v1.10.26-statediff-4.2.2-alpha + ``` + +* Update to use latest images for ipld-eth-db and ipld-eth-server + + * In [docker/latest/docker-compose-db-sharding.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-db-sharding.yml) update image version + + ```yml + services: + migrations: + image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v4.2.3-alpha + ``` + + * In [docker/latest/docker-compose-ipld-eth-server.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-ipld-eth-server.yml) update image version + + ```yml + services: + ipld-eth-server: + image: git.vdb.to/cerc-io/ipld-eth-server/ipld-eth-server:v4.2.3-alpha + ``` + +* To run the stack-orchestrator, the docker-compose version used is: + + ```bash + docker-compose version + + # docker-compose version 1.29.2, build 5becea4c + ``` + +* Run the stack-orchestrator + + ```bash + cd stack-orchestrator/helper-scripts + ``` + + ```bash + ./wrapper.sh -f true \ + -m true \ + -s v4 \ + -l latest \ + -v remove \ + -p ../config.sh + ``` + +* In [packages/codegen](./), create a `config.yaml` file: + + ```yaml + # Config to generate demo-erc721-watcher using codegen. + # Contracts to watch (required). + contracts: + # Contract name. + - name: ERC721 + # Contract file path or an url. + path: ../../node_modules/@openzeppelin/contracts/token/ERC721/ERC721.sol + # Contract kind + kind: ERC721 + + # Output folder path (logs output using `stdout` if not provided). + outputFolder: ../demo-erc721-watcher + + # Code generation mode [eth_call | storage | all | none] (default: none). + mode: all + + # Kind of watcher [lazy | active] (default: active). + kind: active + + # Watcher server port (default: 3008). + port: 3009 + + # Flatten the input contract file(s) [true | false] (default: true). + flatten: true + ``` + +* Run codegen to generate watcher: + + ```bash + yarn codegen --config-file ./config.yaml + ``` + + The watcher should be generated in `packages/demo-erc721-watcher` + +* Create a postgres12 database for the watcher: + + ```bash + sudo su - postgres + + # If database already exists + # dropdb demo-erc721-watcher + + createdb demo-erc721-watcher + ``` + +* Create database for the job queue and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro): + + ```bash + # If database already exists + # dropdb demo-erc721-watcher-job-queue + + createdb demo-erc721-watcher-job-queue + ``` + + ``` + postgres@tesla:~$ psql -U postgres -h localhost demo-erc721-watcher-job-queue + Password for user postgres: + psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) + SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) + Type "help" for help. + + demo-erc721-watcher-job-queue=# CREATE EXTENSION pgcrypto; + CREATE EXTENSION + demo-erc721-watcher-job-queue=# exit + ``` + +## Custom hooks: + +For generating default state for `ERC721` from the indexer methods, replace the `handleEvent` hook in `demo-erc721-watcher/src/hooks.ts` file with: + +```ts +export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise { + assert(indexer); + assert(eventData); + + // Perform indexing based on the type of event. + switch (eventData.event.__typename) { + case 'TransferEvent': { + // Get event fields from eventData. + const { from, to, tokenId } = eventData.event; + + // Update balance entry for the sender in database. + if (from !== '0x0000000000000000000000000000000000000000') { + await indexer._balances(eventData.block.hash, eventData.contract, from, true); + } + + // Update balance entry for the receiver in database. + if (to !== '0x0000000000000000000000000000000000000000') { + await indexer._balances(eventData.block.hash, eventData.contract, to, true); + } + + // Update owner for the tokenId in database. + await indexer._owners(eventData.block.hash, eventData.contract, tokenId, true); + + break; + } + case 'ApprovalEvent': { + // Get event fields from eventData. + const { tokenId } = eventData.event; + + // Update tokenApprovals for the tokenId in database. + await indexer._tokenApprovals(eventData.block.hash, eventData.contract, tokenId, true); + + break; + } + case 'ApprovalForAllEvent': { + // Get event fields from eventData. + const { owner, operator } = eventData.event; + + // Update operatorApprovals for the tokenId in database. + await indexer._operatorApprovals(eventData.block.hash, eventData.contract, owner, operator, true); + + break; + } + } +} +``` + + Here, the `diff` is passed as true to indexer methods to store default state. + +* In `watcher-ts` repo, follow the instructions in [Setup](../../README.md#setup) for installing and building packages. + + ```bash + # After setup + yarn && yarn build + ``` + +* In `packages/demo-erc721-watcher`, run the job-runner: + + ```bash + yarn job-runner + ``` + +* Run the watcher: + + ```bash + yarn server + ``` + +## Operations + +Run the following in [packages/erc721-watcher](../erc721-watcher/): + +* Get the signer account address and export to a shell variable: + + ```bash + yarn account + ``` + + ```bash + export SIGNER_ADDRESS="" + ``` + +* Connect MetaMask to `http://localhost:8545` (with chain ID `99`) + +* Add a second account to Metamask and export the account address to a shell variable for later use: + + ```bash + export RECIPIENT_ADDRESS="" + ``` + +* Deploy token: + + ```bash + yarn nft:deploy + ``` + +* Set the returned address to the variable `$NFT_ADDRESS`: + + ```bash + NFT_ADDRESS= + ``` + +* Run the following GQL mutation in generated watcher graphql endpoint http://127.0.0.1:3009/graphql + + ```graphql + mutation { + watchContract( + address: "NFT_ADDRESS" + kind: "ERC721" + checkpoint: true + ) + } + ``` + +* Run the following GQL subscription in generated watcher graphql endpoint: + + ```graphql + subscription { + onEvent { + event { + __typename + ... on TransferEvent { + from + to + tokenId + }, + ... on ApprovalEvent { + owner + approved + tokenId + } + }, + block { + number + hash + } + } + } + ``` + +* Mint token: + + ```bash + yarn nft:mint --nft $NFT_ADDRESS --to $SIGNER_ADDRESS --token-id 1 + ``` + + * A `Transfer` event to `$SIGNER_ADDRESS` shall be visible in the subscription at endpoint. + + * An auto-generated `diff` entry `State` should be added with `parent` cid pointing to the initial checkpoint `State`. + +* Run the `getState` query at the endpoint to get the latest `State` for `NFT_ADDRESS`: + + ```graphql + query { + getState ( + blockHash: "EVENT_BLOCK_HASH" + contractAddress: "NFT_ADDRESS" + # kind: "checkpoint" + kind: "diff" + ) { + cid + block { + cid + hash + number + timestamp + parentHash + } + contractAddress + data + } + } + ``` + +* Transfer token: + + ```bash + yarn nft:transfer --nft $NFT_ADDRESS --from $SIGNER_ADDRESS --to $RECIPIENT_ADDRESS --token-id 1 + ``` + + * An `Approval` event for `ZERO_ADDRESS` shall be visible in the subscription at endpoint. + + * A `Transfer` event to `$RECIPIENT_ADDRESS` shall be visible in the subscription at endpoint. + + * An auto-generated `diff` entry `State` should be added with `parent` cid pointing to the previous `State`. + +* Run the `getState` query again at the endpoint with event blockHash. + +* Get the latest `blockHash`: + + ```bash + yarn block:latest + ``` + +* In `packages/demo-erc721-watcher`, create a checkpoint using CLI: + + ```bash + yarn checkpoint create --address $NFT_ADDRESS + ``` + + * Run the `getState` query again with the output blockHash and kind `checkpoint` at the endpoint. + + * The latest checkpoint should have the aggregate of state diffs since the last checkpoint. + + * The `State` entries can be seen in `pg-admin` in table `state`. diff --git a/packages/codegen/src/entity.ts b/packages/codegen/src/entity.ts index 18bcd331f..5180f6481 100644 --- a/packages/codegen/src/entity.ts +++ b/packages/codegen/src/entity.ts @@ -253,6 +253,20 @@ export class Entity { // Add subgraph entity specific columns. entityObject = this._addSubgraphColumns(subgraphTypeDefs, entityObject, def); + // Add is_pruned column. + entityObject.columns.push({ + name: 'isPruned', + pgType: 'boolean', + tsType: 'boolean', + columnType: 'Column', + columnOptions: [ + { + option: 'default', + value: false + } + ] + }); + // Add decimalTransformer column option if required. this._addDecimalTransformerOption(entityObject); diff --git a/packages/codegen/src/templates/checkpoint-create-template.handlebars b/packages/codegen/src/templates/checkpoint-create-template.handlebars index bc16d8a29..5f69004f9 100644 --- a/packages/codegen/src/templates/checkpoint-create-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-create-template.handlebars @@ -40,13 +40,12 @@ export const handler = async (argv: any): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); - {{/if}} + {{/if}} const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/codegen/src/templates/checkpoint-verify-template.handlebars b/packages/codegen/src/templates/checkpoint-verify-template.handlebars index c782d30db..e2829dbfd 100644 --- a/packages/codegen/src/templates/checkpoint-verify-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-verify-template.handlebars @@ -9,7 +9,7 @@ import assert from 'assert'; import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; -import { Database } from '../../database'; +import { Database, ENTITY_TO_LATEST_ENTITY_MAP, ENTITY_QUERY_TYPE_MAP } from '../../database'; import { Indexer } from '../../indexer'; const log = debug('vulcanize:checkpoint-verify'); @@ -34,7 +34,7 @@ export const handler = async (argv: any): Promise => { const db = new Database(config.database); await db.init(); - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index df410b59d..304f0c902 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -7,6 +7,9 @@ import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner import path from 'path'; import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +{{#if (subgraphPath)}} +import { ENTITY_QUERY_TYPE } from '@cerc-io/graph-node'; +{{/if}} import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -21,11 +24,23 @@ import { {{query.entityName}} } from './entity/{{query.entityName}}'; import { {{subgraphEntity.className}} } from './entity/{{subgraphEntity.className}}'; {{/each}} -export const ENTITIES = [ - {{~#each queries as | query |}}{{query.entityName}}, {{/each}} +{{#if (subgraphPath)}} +export const SUBGRAPH_ENTITIES = new Set([ {{~#each subgraphEntities as | subgraphEntity |}}{{subgraphEntity.className}} {{~#unless @last}}, {{/unless}} + {{~/each}}]); +{{/if}} +export const ENTITIES = [ + {{~#if (subgraphPath)}}...SUBGRAPH_ENTITIES, {{/if}} + {{~#each queries as | query |}}{{query.entityName}} + {{~#unless @last}}, {{/unless}} {{~/each}}]; +{{#if (subgraphPath)}} +// Map: Entity to suitable query type. +export const ENTITY_QUERY_TYPE_MAP = new Map any, ENTITY_QUERY_TYPE>([]); + +export const ENTITY_TO_LATEST_ENTITY_MAP: Map = new Map(); +{{/if}} export class Database implements DatabaseInterface { _config: ConnectionOptions; @@ -38,6 +53,9 @@ export class Database implements DatabaseInterface { this._config = { ...config, + {{#if (subgraphPath)}} + subscribers: [path.join(__dirname, 'entity/Subscriber.*')], + {{/if}} entities: [path.join(__dirname, 'entity/*')] }; diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars index 405efeea7..edc8d3e5b 100644 --- a/packages/codegen/src/templates/export-state-template.handlebars +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -24,7 +24,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:export-state'); @@ -59,7 +59,7 @@ const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index 4a4c6241f..9804a6da1 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -7,14 +7,14 @@ import 'reflect-metadata'; import yargs from 'yargs'; import { hideBin } from 'yargs/helpers'; import debug from 'debug'; - import { PubSub } from 'graphql-subscriptions'; +import { PubSub } from 'graphql-subscriptions'; import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} -import { Database } from './database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; {{#if (subgraphPath)}} @@ -72,7 +72,7 @@ export const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index dd3c81ef7..632bb34b2 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -17,7 +17,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} import * as codec from '@ipld/dag-cbor'; -import { Database } from '../database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database'; import { Indexer } from '../indexer'; import { EventWatcher } from '../events'; import { State } from '../entity/State'; @@ -50,7 +50,7 @@ export const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/index-block-template.handlebars b/packages/codegen/src/templates/index-block-template.handlebars index c86d03e81..31aa6e1a6 100644 --- a/packages/codegen/src/templates/index-block-template.handlebars +++ b/packages/codegen/src/templates/index-block-template.handlebars @@ -12,7 +12,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlo import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} -import { Database } from '../database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database'; import { Indexer } from '../indexer'; const log = debug('vulcanize:index-block'); @@ -44,7 +44,7 @@ const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 0a5e021f8..bc73984d4 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -41,7 +41,7 @@ import { GraphWatcher } from '@cerc-io/graph-node'; {{#each contracts as | contract |}} import {{contract.contractName}}Artifacts from './artifacts/{{contract.contractName}}.json'; {{/each}} -import { Database, ENTITIES } from './database'; +import { Database, ENTITIES{{#if (subgraphPath)}}, SUBGRAPH_ENTITIES{{/if}} } from './database'; import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint } from './hooks'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -104,17 +104,17 @@ export class Indexer implements IndexerInterface { this._abiMap = new Map(); this._storageLayoutMap = new Map(); this._contractMap = new Map(); - {{#each contracts as | contract |}} + const { abi: {{contract.contractName}}ABI, {{#if contract.contractStorageLayout}} storageLayout: {{contract.contractName}}StorageLayout {{/if}} } = {{contract.contractName}}Artifacts; - {{/each}} {{#each contracts as | contract |}} + assert({{contract.contractName}}ABI); this._abiMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}ABI); {{#if contract.contractStorageLayout}} @@ -122,9 +122,9 @@ export class Indexer implements IndexerInterface { this._storageLayoutMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}StorageLayout); {{/if}} this._contractMap.set(KIND_{{capitalize contract.contractName}}, new ethers.utils.Interface({{contract.contractName}}ABI)); - {{/each}} {{#if (subgraphPath)}} + this._entityTypesMap = new Map(); this._populateEntityTypesMap(); @@ -259,8 +259,10 @@ export class Indexer implements IndexerInterface { } async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { + console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs'); // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); + console.timeEnd('time:indexer#processCanonicalBlock-finalize_auto_diffs'); // Call custom stateDiff hook. await createStateDiff(this, blockHash); @@ -275,7 +277,9 @@ export class Indexer implements IndexerInterface { const checkpointInterval = this._serverConfig.checkpointInterval; if (checkpointInterval <= 0) return; + console.time('time:indexer#processCheckpoint-checkpoint'); await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval); + console.timeEnd('time:indexer#processCheckpoint-checkpoint'); } async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise { @@ -308,7 +312,9 @@ export class Indexer implements IndexerInterface { // Method used to create auto diffs (diff_staged). async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { + console.time('time:indexer#createDiffStaged-auto_diff'); await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data); + console.timeEnd('time:indexer#createDiffStaged-auto_diff'); } // Method to be used by createStateDiff hook. @@ -361,13 +367,25 @@ export class Indexer implements IndexerInterface { return data; } + async getSubgraphEntities ( + entity: new () => Entity, + block: BlockHeight, + where: { [key: string]: any } = {}, + queryOptions: QueryOptions = {}, + selections: ReadonlyArray = [] + ): Promise { + return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions, selections); + } + {{/if}} async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); {{#if (subgraphPath)}} + console.time('time:indexer#processEvent-mapping_code'); // Call subgraph handler for event. await this._graphWatcher.handleEvent(resultEvent); + console.timeEnd('time:indexer#processEvent-mapping_code'); {{/if}} // Call custom hook function for indexing on event. @@ -380,8 +398,10 @@ export class Indexer implements IndexerInterface { } async processBlock (blockProgress: BlockProgress): Promise { + console.time('time:indexer#processBlock-init_state'); // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); + console.timeEnd('time:indexer#processBlock-init_state'); {{#if (subgraphPath)}} this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); @@ -390,11 +410,15 @@ export class Indexer implements IndexerInterface { {{#if (subgraphPath)}} async processBlockAfterEvents (blockHash: string, blockNumber: number): Promise { + console.time('time:indexer#processBlockAfterEvents-mapping_code'); // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash, blockNumber); + console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code'); + console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state'); // Persist subgraph state to the DB. await this.dumpSubgraphState(blockHash); + console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state'); } {{/if}} @@ -557,15 +581,23 @@ export class Indexer implements IndexerInterface { } async markBlocksAsPruned (blocks: BlockProgress[]): Promise { - return this._baseIndexer.markBlocksAsPruned(blocks); + await this._baseIndexer.markBlocksAsPruned(blocks); + {{#if (subgraphPath)}} + + await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES); + {{/if}} } {{#if (subgraphPath)}} async pruneFrothyEntities (blockNumber: number): Promise { await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber); } - {{/if}} + async resetLatestEntities (blockNumber: number): Promise { + await this._graphWatcher.resetLatestEntities(blockNumber); + } + + {{/if}} async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } @@ -581,9 +613,13 @@ export class Indexer implements IndexerInterface { const entities = [...ENTITIES]; {{/if}} await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); - } + {{#if (subgraphPath)}} + await this.resetLatestEntities(blockNumber); + {{/if}} + } {{#if (subgraphPath)}} + getEntityTypesMap (): Map { return this._entityTypesMap; } diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 3e37e1973..2c1183907 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -28,7 +28,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; {{/if}} import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database'; const log = debug('vulcanize:job-runner'); @@ -97,7 +97,7 @@ export const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index b9496fbd5..ef097dbc8 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -12,10 +12,12 @@ import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigInt import { Indexer } from './indexer'; import { EventWatcher } from './events'; +{{#if (subgraphPath)}} {{#each subgraphQueries as | query |}} import { {{query.entityName}} } from './entity/{{query.entityName}}'; {{/each}} +{{/if}} const log = debug('vulcanize:resolver'); diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index b64aa1093..807b4df66 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -20,7 +20,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { createResolvers } from './resolvers'; import { Indexer } from './indexer'; -import { Database } from './database'; +import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database'; import { EventWatcher } from './events'; const log = debug('vulcanize:server'); @@ -45,7 +45,7 @@ export const main = async (): Promise => { await db.init(); {{#if (subgraphPath)}} - const graphDb = new GraphDatabase(config.server, db.baseDatabase); + const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP); await graphDb.init(); const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server); diff --git a/packages/codegen/src/templates/subscriber-template.handlebars b/packages/codegen/src/templates/subscriber-template.handlebars index fc1127d5a..e18e41738 100644 --- a/packages/codegen/src/templates/subscriber-template.handlebars +++ b/packages/codegen/src/templates/subscriber-template.handlebars @@ -5,16 +5,16 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm'; import { FrothyEntity } from './FrothyEntity'; -import { ENTITIES } from '../database'; +import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database'; import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node'; @EventSubscriber() export class EntitySubscriber implements EntitySubscriberInterface { async afterInsert (event: InsertEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } async afterUpdate (event: UpdateEvent): Promise { - await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event); + await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP); } } diff --git a/packages/codegen/src/templates/types-template.handlebars b/packages/codegen/src/templates/types-template.handlebars index c67c3814a..0f9ed5e50 100644 --- a/packages/codegen/src/templates/types-template.handlebars +++ b/packages/codegen/src/templates/types-template.handlebars @@ -1,8 +1,8 @@ // // Copyright 2021 Vulcanize, Inc. // - {{#each types as | type |}} + export enum {{type.name}} { {{#each type.values as | value |}} {{value}} = '{{value}}', diff --git a/packages/codegen/subgraph-demo.md b/packages/codegen/subgraph-demo.md new file mode 100644 index 000000000..0e956bf3e --- /dev/null +++ b/packages/codegen/subgraph-demo.md @@ -0,0 +1,270 @@ +# Subgraph watcher demo + +* Clone the [stack-orchestrator](https://github.com/vulcanize/stack-orchestrator) repo. + + ```bash + git clone https://github.com/vulcanize/stack-orchestrator + ``` + +* Create a `config.sh` file. + + ```bash + cd stack-orchestrator/helper-scripts + ./create-config.sh + ``` + +* Setup the required repositories. + + ```bash + ./setup-repositories.sh -p ssh + ``` + +* Checkout [v4 release](https://github.com/cerc-io/go-ethereum/releases/tag/v1.10.26-statediff-4.2.2-alpha) in go-ethereum repo. The path for go-ethereum is specified by `vulcanize_go_ethereum` variable in `config.sh` file created in stack-orchestrator repo. + + ```bash + # In go-ethereum repo. + git checkout v1.10.26-statediff-4.2.2-alpha + ``` + +* Update to use latest images for ipld-eth-db and ipld-eth-server + + * In [docker/latest/docker-compose-db-sharding.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-db-sharding.yml) update image version + + ```yml + services: + migrations: + image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v4.2.3-alpha + ``` + + * In [docker/latest/docker-compose-ipld-eth-server.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-ipld-eth-server.yml) update image version + + ```yml + services: + ipld-eth-server: + image: git.vdb.to/cerc-io/ipld-eth-server/ipld-eth-server:v4.2.3-alpha + ``` + +* To run the stack-orchestrator, the docker-compose version used is: + + ```bash + docker-compose version + + # docker-compose version 1.29.2, build 5becea4c + ``` + +* Run the stack-orchestrator + + ```bash + cd stack-orchestrator/helper-scripts + ``` + + ```bash + ./wrapper.sh -f true \ + -m true \ + -s v4 \ + -l latest \ + -v remove \ + -p ../config.sh + ``` + +* In watcher-ts [packages/graph-node](../graph-node/), deploy an `Example` contract: + + ```bash + yarn example:deploy + ``` + +* Set the returned address to the variable `$EXAMPLE_ADDRESS`: + + ```bash + export EXAMPLE_ADDRESS= + ``` + +* In [packages/graph-node/test/subgraph/example1/subgraph.yaml](../graph-node/test/subgraph/example1/subgraph.yaml), set the source address for `Example1` datasource to the `EXAMPLE_ADDRESS`. Then in [packages/graph-node](../graph-node/) run: + + ```bash + yarn build:example + ``` + +* In [packages/codegen](./), create a `config.yaml` file: + + ```yaml + # Example config.yaml + # Contracts to watch (required). + # Can pass empty array ([]) when using subgraphPath. + contracts: + # Contract name. + - name: Example + # Contract file path or an url. + path: ../graph-node/test/contracts/Example.sol + # Contract kind (should match that in {subgraphPath}/subgraph.yaml if subgraphPath provided) + kind: Example1 + + # Output folder path (logs output using `stdout` if not provided). + outputFolder: ../test-watcher + + # Code generation mode [eth_call | storage | all | none] (default: none). + mode: none + + # Kind of watcher [lazy | active] (default: active). + kind: active + + # Watcher server port (default: 3008). + port: 3008 + + # Flatten the input contract file(s) [true | false] (default: true). + flatten: true + + # Path to the subgraph build (optional). + # Can set empty contracts array when using subgraphPath. + subgraphPath: ../graph-node/test/subgraph/example1/build + ``` + +* Run codegen to generate watcher: + + ```bash + yarn codegen --config-file ./config.yaml + ``` + + The watcher should be generated in `packages/test-watcher` + +* Create a postgres12 database for the watcher: + + ```bash + sudo su - postgres + + # If database already exists + # dropdb test-watcher + + createdb test-watcher + ``` + +* Create database for the job queue and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro): + + ```bash + # If database already exists + # dropdb test-watcher-job-queue + + createdb test-watcher-job-queue + ``` + + ``` + postgres@tesla:~$ psql -U postgres -h localhost test-watcher-job-queue + Password for user postgres: + psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1)) + SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off) + Type "help" for help. + + test-watcher-job-queue=# CREATE EXTENSION pgcrypto; + CREATE EXTENSION + test-watcher-job-queue=# exit + ``` + +* In `watcher-ts` repo, follow the instructions in [Setup](../../README.md#setup) for installing and building packages. + + ```bash + # After setup + yarn && yarn build + ``` + +* In `packages/test-watcher`, run the job-runner: + + ```bash + yarn job-runner + ``` + +* Run the watcher: + + ```bash + yarn server + ``` + +## Operations + +* Run the following GQL subscription at the [graphql endpoint](http://localhost:3008/graphql): + + ```graphql + subscription { + onEvent { + event { + __typename + ... on TestEvent { + param1 + param2 + }, + }, + block { + number + hash + } + } + } + ``` + +* In [packages/graph-node](../graph-node/), trigger the `Test` event by calling a example contract method: + + ```bash + yarn example:test --address $EXAMPLE_ADDRESS + ``` + + * A `Test` event shall be visible in the subscription at endpoint. + + * The subgraph entity `Category` should be updated in the database. + + * An auto-generated `diff-staged` entry `State` should be added. + +* Run the query for entity in at the endpoint: + + ```graphql + query { + category( + block: { hash: "EVENT_BLOCK_HASH" }, + id: "1" + ) { + __typename + id + count + name + } + } + ``` + +* Run the `getState` query at the endpoint to get the latest `State` for `EXAMPLE_ADDRESS`: + + ```graphql + query { + getState ( + blockHash: "EVENT_BLOCK_HASH" + contractAddress: "EXAMPLE_ADDRESS" + # kind: "checkpoint" + # kind: "diff" + kind: "diff_staged" + ) { + cid + block { + cid + hash + number + timestamp + parentHash + } + contractAddress + data + } + } + ``` + +* `diff` states get created corresponding to the `diff_staged` states when their respective blocks reach the pruned region. + +* In `packages/test-watcher`: + + * After the `diff` state has been created, create a `checkpoint`: + + ```bash + yarn checkpoint create --address $EXAMPLE_ADDRESS + ``` + + * A `checkpoint` state should be created at the latest canonical block hash. + + * Run the `getState` query again at the endpoint with the output `blockHash` and kind `checkpoint`. + +* All the `State` entries can be seen in `pg-admin` in table `state`. diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 5cc44f0dc..6c8cd2827 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -271,6 +271,11 @@ export class Indexer implements IndexerInterface { return undefined; } + async processStateCheckpoint (contractAddress: string, blockHash: string): Promise { + // TODO: Call checkpoint hook. + return false; + } + async processCheckpoint (blockHash: string): Promise { // TODO Implement } diff --git a/packages/erc721-watcher/README.md b/packages/erc721-watcher/README.md index 2df4ef94e..682e72c39 100644 --- a/packages/erc721-watcher/README.md +++ b/packages/erc721-watcher/README.md @@ -142,6 +142,12 @@ GQL console: http://localhost:3006/graphql yarn reset job-queue --block-number ``` + * Reset state: + + ```bash + yarn reset state --block-number + ``` + * `block-number`: Block number to which to reset the watcher. * To export and import the watcher state: diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 4dfabca30..53a5fdb62 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -244,4 +244,16 @@ export class Indexer implements IndexerInterface { async resetWatcherToBlock (blockNumber: number): Promise { return undefined; } + + cacheContract (contract: ContractInterface): void { + return undefined; + } + + async processInitialState (contractAddress: string, blockHash: string): Promise { + return undefined; + } + + async processStateCheckpoint (contractAddress: string, blockHash: string): Promise { + return false; + } } diff --git a/packages/mobymask-watcher/README.md b/packages/mobymask-watcher/README.md index c998fb51a..2b63d4347 100644 --- a/packages/mobymask-watcher/README.md +++ b/packages/mobymask-watcher/README.md @@ -136,6 +136,12 @@ GQL console: http://localhost:3010/graphql yarn reset job-queue --block-number ``` + * Reset state: + + ```bash + yarn reset state --block-number + ``` + * `block-number`: Block number to which to reset the watcher. * To export and import the watcher state: diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 417874d80..631cba291 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -482,22 +482,6 @@ export class Database { async getLatestPrunedEntity (repo: Repository, id: string, canonicalBlockNumber: number): Promise { // Filter out latest entity from pruned blocks. - - const entityInPrunedRegion = await repo.createQueryBuilder('entity') - .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') - .where('block.is_pruned = false') - .andWhere('entity.id = :id', { id }) - .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) - .orderBy('entity.block_number', 'DESC') - .limit(1) - .getOne(); - - return entityInPrunedRegion; - } - - async getLatestPrunedEntityWithoutJoin (repo: Repository, id: string, canonicalBlockNumber: number): Promise { - // Filter out latest entity from pruned blocks. - const entityInPrunedRegion = await repo.createQueryBuilder('entity') .where('entity.id = :id', { id }) .andWhere('entity.is_pruned = false') diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index a5e8c3c39..2bcac72ca 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -687,7 +687,6 @@ export class Indexer { } // Call initial state hook. - assert(indexer.processInitialState); const stateData = await indexer.processInitialState(contract.address, blockHash); const block = await this.getBlockProgress(blockHash); @@ -800,7 +799,6 @@ export class Indexer { assert(currentBlock.blockNumber <= stateSyncStatus.latestIndexedBlockNumber, 'State should be indexed for checkpoint at a block'); // Call state checkpoint hook and check if default checkpoint is disabled. - assert(indexer.processStateCheckpoint); const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash); if (disableDefaultCheckpoint) { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 420d47a5f..913576b1a 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -423,10 +423,7 @@ export class JobRunner { _updateWatchedContracts (job: any): void { const { data: { contract } } = job; - - assert(this._indexer.cacheContract); this._indexer.cacheContract(contract); - this._indexer.updateStateStatusMap(contract.address, {}); } } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index d07879607..6acc521a1 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -109,13 +109,12 @@ export interface IndexerInterface { parseEventNameAndArgs?: (kind: string, logObj: any) => any isWatchedContract: (address: string) => ContractInterface | undefined; getContractsByKind?: (kind: string) => ContractInterface[] - cacheContract?: (contract: ContractInterface) => void; + cacheContract: (contract: ContractInterface) => void; watchContract: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise getEntityTypesMap?: () => Map getRelationsMap?: () => Map - createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise - processInitialState?: (contractAddress: string, blockHash: string) => Promise - processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise + processInitialState: (contractAddress: string, blockHash: string) => Promise + processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise processBlock: (blockProgres: BlockProgressInterface) => Promise processBlockAfterEvents?: (blockHash: string, blockNumber: number) => Promise processCanonicalBlock (blockHash: string, blockNumber: number): Promise @@ -175,7 +174,7 @@ export interface DatabaseInterface { getDiffStatesInRange (contractAddress: string, startBlock: number, endBlock: number): Promise getNewState (): StateInterface removeStates(queryRunner: QueryRunner, blockNumber: number, kind: StateKind): Promise - removeStatesAfterBlock?: (queryRunner: QueryRunner, blockNumber: number) => Promise + removeStatesAfterBlock: (queryRunner: QueryRunner, blockNumber: number) => Promise saveOrUpdateState (queryRunner: QueryRunner, state: StateInterface): Promise getStateSyncStatus (): Promise updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise