diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 35efb1b4..5b98eea6 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -296,6 +296,10 @@ export class Indexer { await this.triggerIndexingOnEvent(event); } + async processBlock (blockProgress: BlockProgress): Promise { + // Method for processing on indexing new block. + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index b996a756..1053d855 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -41,7 +41,9 @@ export class EthClient { async getStorageAt ({ blockHash, contract, slot }: { blockHash: string, contract: string, slot: string }): Promise<{ value: string, proof: { data: string } }> { slot = `0x${padKey(slot)}`; + console.time(`time:eth-client#getStorageAt-${JSON.stringify({ blockHash, contract, slot })}`); const result = await this._getCachedOrFetch('getStorageAt', { blockHash, contract, slot }); + console.timeEnd(`time:eth-client#getStorageAt-${JSON.stringify({ blockHash, contract, slot })}`); const { getStorageAt: { value, cid, ipldBlock } } = result; return { @@ -63,27 +65,37 @@ export class EthClient { } async getBlockWithTransactions ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise { - return this._graphqlClient.query( + console.time(`time:eth-client#getBlockWithTransactions-${JSON.stringify({ blockNumber, blockHash })}`); + const result = await this._graphqlClient.query( ethQueries.getBlockWithTransactions, { blockNumber: blockNumber?.toString(), blockHash } ); + console.timeEnd(`time:eth-client#getBlockWithTransactions-${JSON.stringify({ blockNumber, blockHash })}`); + + return result; } async getBlocks ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise { - return this._graphqlClient.query( + console.time(`time:eth-client#getBlocks-${JSON.stringify({ blockNumber, blockHash })}`); + const result = await this._graphqlClient.query( ethQueries.getBlocks, { blockNumber: blockNumber?.toString(), blockHash } ); + console.timeEnd(`time:eth-client#getBlocks-${JSON.stringify({ blockNumber, blockHash })}`); + + return result; } async getBlockByHash (blockHash?: string): Promise { + console.time(`time:eth-client#getBlockByHash-${blockHash}`); const result = await this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash }); + console.timeEnd(`time:eth-client#getBlockByHash-${blockHash}`); return { block: { @@ -95,7 +107,9 @@ export class EthClient { } async getLogs (vars: Vars): Promise { + console.time(`time:eth-client#getLogs-${JSON.stringify(vars)}`); const result = await this._getCachedOrFetch('getLogs', vars); + console.timeEnd(`time:eth-client#getLogs-${JSON.stringify(vars)}`); const { getLogs: resultLogs, block: { diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index bb7fb666..03fc6894 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -9,11 +9,11 @@ import { DeepPartial, FindConditions, FindManyOptions, - FindOneOptions, LessThanOrEqual, QueryRunner } from 'typeorm'; import path from 'path'; +import { SelectionNode } from 'graphql'; import { Database as BaseDatabase, @@ -21,7 +21,7 @@ import { BlockHeight, QueryOptions, Where, - Relation + ENTITY_QUERY_TYPE } from '@vulcanize/util'; import { Factory } from './entity/Factory'; @@ -47,10 +47,32 @@ import { SyncStatus } from './entity/SyncStatus'; import { TickDayData } from './entity/TickDayData'; import { Contract } from './entity/Contract'; +// Map: Entity to suitable query type. +const ENTITY_QUERY_TYPE_MAP = new Map any, number>([ + [Bundle, ENTITY_QUERY_TYPE.SINGULAR], + [Factory, ENTITY_QUERY_TYPE.SINGULAR], + [Pool, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Token, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Burn, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Mint, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Swap, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Transaction, ENTITY_QUERY_TYPE.DISTINCT_ON], + [TokenDayData, ENTITY_QUERY_TYPE.DISTINCT_ON], + [TokenHourData, ENTITY_QUERY_TYPE.DISTINCT_ON], + [PoolDayData, ENTITY_QUERY_TYPE.DISTINCT_ON], + [PoolHourData, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Position, ENTITY_QUERY_TYPE.DISTINCT_ON], + [PositionSnapshot, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Tick, ENTITY_QUERY_TYPE.DISTINCT_ON], + [TickDayData, ENTITY_QUERY_TYPE.DISTINCT_ON], + [UniswapDayData, ENTITY_QUERY_TYPE.GROUP_BY] +]); + export class Database implements DatabaseInterface { _config: ConnectionOptions _conn!: Connection _baseDatabase: BaseDatabase + _relationsMap: Map constructor (config: ConnectionOptions) { assert(config); @@ -61,6 +83,12 @@ export class Database implements DatabaseInterface { }; this._baseDatabase = new BaseDatabase(this._config); + this._relationsMap = new Map(); + this._populateRelationsMap(); + } + + get cachedEntities () { + return this._baseDatabase.cachedEntities; } async init (): Promise { @@ -79,20 +107,7 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } - - return entity; + return this._baseDatabase.getModelEntity(repo, whereOptions); } async getBundle (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial): Promise { @@ -107,23 +122,15 @@ export class Database implements DatabaseInterface { whereOptions.blockNumber = LessThanOrEqual(blockNumber); } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } - - return entity; + return this._baseDatabase.getModelEntity(repo, whereOptions); } - async getToken (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial, loadRelations = false): Promise { + async getToken ( + queryRunner: QueryRunner, + { id, blockHash, blockNumber }: DeepPartial, + loadRelations = false, + selections: ReadonlyArray = [] + ): Promise { const repo = queryRunner.manager.getRepository(Token); const whereOptions: FindConditions = { id }; @@ -135,31 +142,17 @@ export class Database implements DatabaseInterface { whereOptions.blockNumber = LessThanOrEqual(blockNumber); } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash, number: blockNumber }, - [ - { - entity: Pool, - type: 'many-to-many', - field: 'whitelistPools' - } - ], - [entity] + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + Token, + [entity], + selections ); } @@ -180,7 +173,12 @@ export class Database implements DatabaseInterface { return res; } - async getPool (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial, loadRelations = false): Promise { + async getPool ( + queryRunner: QueryRunner, + { id, blockHash, blockNumber }: DeepPartial, + loadRelations = false, + selections: ReadonlyArray = [] + ): Promise { const repo = queryRunner.manager.getRepository(Pool); const whereOptions: FindConditions = { id }; @@ -192,36 +190,17 @@ export class Database implements DatabaseInterface { whereOptions.blockNumber = LessThanOrEqual(blockNumber); } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash, number: blockNumber }, - [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ], - [entity] + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + Pool, + [entity], + selections ); } @@ -255,55 +234,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - }, - { - entity: Tick, - type: 'one-to-one', - field: 'tickLower' - }, - { - entity: Tick, - type: 'one-to-one', - field: 'tickUpper' - }, - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + Position, [entity] ); } @@ -322,30 +261,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + Tick, [entity] ); } @@ -375,30 +299,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + PoolDayData, [entity] ); } @@ -414,30 +323,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + PoolHourData, [entity] ); } @@ -453,18 +347,7 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + const entity = await this._baseDatabase.getModelEntity(repo, whereOptions); return entity; } @@ -477,30 +360,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Token, - type: 'one-to-one', - field: 'token' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + TokenDayData, [entity] ); } @@ -516,30 +384,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Token, - type: 'one-to-one', - field: 'token' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + TokenHourData, [entity] ); } @@ -555,35 +408,15 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + let entity = await this._baseDatabase.getModelEntity(repo, whereOptions); if (loadRelations && entity) { [entity] = await this._baseDatabase.loadRelations( queryRunner, { hash: blockHash }, - [ - { - entity: Tick, - type: 'one-to-one', - field: 'tick' - }, - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - } - ], + this._relationsMap, + ENTITY_QUERY_TYPE_MAP, + TickDayData, [entity] ); } @@ -599,33 +432,22 @@ export class Database implements DatabaseInterface { whereOptions.blockHash = blockHash; } - const findOptions = { - where: whereOptions, - order: { - blockNumber: 'DESC' - } - }; - - let entity = await repo.findOne(findOptions as FindOneOptions); - - if (!entity && findOptions.where.blockHash) { - entity = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); - } + const entity = await this._baseDatabase.getModelEntity(repo, whereOptions); return entity; } - async getModelEntities (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise { - return this._baseDatabase.getModelEntities(queryRunner, entity, block, where, queryOptions, relations); + async getModelEntities (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, selections: ReadonlyArray = []): Promise { + return this._baseDatabase.getModelEntities(queryRunner, this._relationsMap, ENTITY_QUERY_TYPE_MAP, entity, block, where, queryOptions, selections); } - async getModelEntitiesNoTx (entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise { + async getModelEntitiesNoTx (entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, selections: ReadonlyArray = []): Promise { const queryRunner = this._conn.createQueryRunner(); let res; try { await queryRunner.connect(); - res = await this.getModelEntities(queryRunner, entity, block, where, queryOptions, relations); + res = await this.getModelEntities(queryRunner, entity, block, where, queryOptions, selections); } finally { await queryRunner.release(); } @@ -637,119 +459,170 @@ export class Database implements DatabaseInterface { const repo = queryRunner.manager.getRepository(Factory); factory.blockNumber = block.number; factory.blockHash = block.hash; - return repo.save(factory); + const data = await repo.save(factory); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveBundle (queryRunner: QueryRunner, bundle: Bundle, block: Block): Promise { const repo = queryRunner.manager.getRepository(Bundle); bundle.blockNumber = block.number; bundle.blockHash = block.hash; - return repo.save(bundle); + const data = await repo.save(bundle); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async savePool (queryRunner: QueryRunner, pool: Pool, block: Block): Promise { const repo = queryRunner.manager.getRepository(Pool); pool.blockNumber = block.number; pool.blockHash = block.hash; - return repo.save(pool); + const data = await repo.save(pool); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async savePoolDayData (queryRunner: QueryRunner, poolDayData: PoolDayData, block: Block): Promise { const repo = queryRunner.manager.getRepository(PoolDayData); poolDayData.blockNumber = block.number; poolDayData.blockHash = block.hash; - return repo.save(poolDayData); + const data = await repo.save(poolDayData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async savePoolHourData (queryRunner: QueryRunner, poolHourData: PoolHourData, block: Block): Promise { const repo = queryRunner.manager.getRepository(PoolHourData); poolHourData.blockNumber = block.number; poolHourData.blockHash = block.hash; - return repo.save(poolHourData); + const data = await repo.save(poolHourData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveToken (queryRunner: QueryRunner, token: Token, block: Block): Promise { const repo = queryRunner.manager.getRepository(Token); token.blockNumber = block.number; token.blockHash = block.hash; - return repo.save(token); + const data = await repo.save(token); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveTransaction (queryRunner: QueryRunner, transaction: Transaction, block: Block): Promise { const repo = queryRunner.manager.getRepository(Transaction); transaction.blockNumber = block.number; transaction.blockHash = block.hash; - return repo.save(transaction); + const data = await repo.save(transaction); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveUniswapDayData (queryRunner: QueryRunner, uniswapDayData: UniswapDayData, block: Block): Promise { const repo = queryRunner.manager.getRepository(UniswapDayData); uniswapDayData.blockNumber = block.number; uniswapDayData.blockHash = block.hash; - return repo.save(uniswapDayData); + const data = await repo.save(uniswapDayData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveTokenDayData (queryRunner: QueryRunner, tokenDayData: TokenDayData, block: Block): Promise { const repo = queryRunner.manager.getRepository(TokenDayData); tokenDayData.blockNumber = block.number; tokenDayData.blockHash = block.hash; - return repo.save(tokenDayData); + const data = await repo.save(tokenDayData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveTokenHourData (queryRunner: QueryRunner, tokenHourData: TokenHourData, block: Block): Promise { const repo = queryRunner.manager.getRepository(TokenHourData); tokenHourData.blockNumber = block.number; tokenHourData.blockHash = block.hash; - return repo.save(tokenHourData); + const data = await repo.save(tokenHourData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveTick (queryRunner: QueryRunner, tick: Tick, block: Block): Promise { const repo = queryRunner.manager.getRepository(Tick); tick.blockNumber = block.number; tick.blockHash = block.hash; - return repo.save(tick); + const data = await repo.save(tick); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveTickDayData (queryRunner: QueryRunner, tickDayData: TickDayData, block: Block): Promise { const repo = queryRunner.manager.getRepository(TickDayData); tickDayData.blockNumber = block.number; tickDayData.blockHash = block.hash; - return repo.save(tickDayData); + const data = await repo.save(tickDayData); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async savePosition (queryRunner: QueryRunner, position: Position, block: Block): Promise { const repo = queryRunner.manager.getRepository(Position); position.blockNumber = block.number; position.blockHash = block.hash; - return repo.save(position); + const data = await repo.save(position); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async savePositionSnapshot (queryRunner: QueryRunner, positionSnapshot: PositionSnapshot, block: Block): Promise { const repo = queryRunner.manager.getRepository(PositionSnapshot); positionSnapshot.blockNumber = block.number; positionSnapshot.blockHash = block.hash; - return repo.save(positionSnapshot); + const data = await repo.save(positionSnapshot); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveMint (queryRunner: QueryRunner, mint: Mint, block: Block): Promise { const repo = queryRunner.manager.getRepository(Mint); mint.blockNumber = block.number; mint.blockHash = block.hash; - return repo.save(mint); + const data = await repo.save(mint); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveBurn (queryRunner: QueryRunner, burn: Burn, block: Block): Promise { const repo = queryRunner.manager.getRepository(Burn); burn.blockNumber = block.number; burn.blockHash = block.hash; - return repo.save(burn); + const data = await repo.save(burn); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async saveSwap (queryRunner: QueryRunner, swap: Swap, block: Block): Promise { const repo = queryRunner.manager.getRepository(Swap); swap.blockNumber = block.number; swap.blockHash = block.hash; - return repo.save(swap); + const data = await repo.save(swap); + this._baseDatabase.cacheUpdatedEntity(repo, data); + + return data; } async getContracts (): Promise { @@ -877,4 +750,103 @@ export class Database implements DatabaseInterface { async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._baseDatabase.getAncestorAtDepth(blockHash, depth); } + + _populateRelationsMap (): void { + // Needs to be generated by codegen. + this._relationsMap.set(Pool, { + token0: { + entity: Token, + type: 'one-to-one' + }, + token1: { + entity: Token, + type: 'one-to-one' + } + }); + + this._relationsMap.set(Burn, { + pool: { + entity: Pool, + type: 'one-to-one' + }, + transaction: { + entity: Transaction, + type: 'one-to-one' + } + }); + + this._relationsMap.set(Mint, { + pool: { + entity: Pool, + type: 'one-to-one' + }, + transaction: { + entity: Transaction, + type: 'one-to-one' + } + }); + + this._relationsMap.set(Swap, { + pool: { + entity: Pool, + type: 'one-to-one' + }, + transaction: { + entity: Transaction, + type: 'one-to-one' + } + }); + + this._relationsMap.set(Token, { + whitelistPools: { + entity: Pool, + type: 'many-to-many' + } + }); + + this._relationsMap.set(Transaction, { + mints: { + entity: Mint, + type: 'one-to-many', + foreignKey: 'transaction' + }, + burns: { + entity: Burn, + type: 'one-to-many', + foreignKey: 'transaction' + }, + swaps: { + entity: Swap, + type: 'one-to-many', + foreignKey: 'transaction' + } + }); + + this._relationsMap.set(Position, { + pool: { + entity: Pool, + type: 'one-to-one' + }, + token0: { + entity: Token, + type: 'one-to-one' + }, + token1: { + entity: Token, + type: 'one-to-one' + }, + tickLower: { + entity: Tick, + type: 'one-to-one' + }, + tickUpper: { + entity: Tick, + type: 'one-to-one' + }, + transaction: { + entity: Transaction, + type: 'one-to-one' + } + }); + } } diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index ef5186c2..a758fcf4 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -11,7 +11,8 @@ import { providers, utils, BigNumber } from 'ethers'; import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, Relation, GraphDecimal, JobQueue, Where, DEFAULT_LIMIT } from '@vulcanize/util'; +import { IndexerInterface, Indexer as BaseIndexer, QueryOptions, OrderDirection, BlockHeight, GraphDecimal, JobQueue, Where, DEFAULT_LIMIT, eventProcessingEthCallDuration } from '@vulcanize/util'; +import { SelectionNode } from 'graphql'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; @@ -155,6 +156,22 @@ export class Indexer implements IndexerInterface { console.timeEnd('time:indexer#processEvent-mapping_code'); } + async processBlock (blockProgress: BlockProgress): Promise { + // Set latest block in frothy region to cachedEntities.frothyBlocks map. + if (!this._db.cachedEntities.frothyBlocks.has(blockProgress.blockHash)) { + this._db.cachedEntities.frothyBlocks.set( + blockProgress.blockHash, + { + blockNumber: blockProgress.blockNumber, + parentHash: blockProgress.parentHash, + entities: new Map() + } + ); + + log(`Size of cachedEntities.frothyBlocks map: ${this._db.cachedEntities.frothyBlocks.size}`); + } + } + async getBlockEntities (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { if (where.timestamp_gt) { where.blockTimestamp = MoreThan(where.timestamp_gt); @@ -227,12 +244,12 @@ export class Indexer implements IndexerInterface { return res; } - async getPool (id: string, block: BlockHeight): Promise { + async getPool (id: string, block: BlockHeight, selections: ReadonlyArray = []): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.getPool(dbTx, { id, blockHash: block.hash, blockNumber: block.number }, true); + res = await this._db.getPool(dbTx, { id, blockHash: block.hash, blockNumber: block.number }, true, selections); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -244,12 +261,12 @@ export class Indexer implements IndexerInterface { return res; } - async getToken (id: string, block: BlockHeight): Promise { + async getToken (id: string, block: BlockHeight, selections: ReadonlyArray = []): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number }, true); + res = await this._db.getToken(dbTx, { id, blockHash: block.hash, blockNumber: block.number }, true, selections); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -261,7 +278,7 @@ export class Indexer implements IndexerInterface { return res; } - async getEntities (entity: new () => Entity, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions, relations?: Relation[]): Promise { + async getEntities (entity: new () => Entity, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions, selections: ReadonlyArray = []): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -299,7 +316,7 @@ export class Indexer implements IndexerInterface { queryOptions.limit = DEFAULT_LIMIT; } - res = await this._db.getModelEntities(dbTx, entity, block, where, queryOptions, relations); + res = await this._db.getModelEntities(dbTx, entity, block, where, queryOptions, selections); dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -379,7 +396,10 @@ export class Indexer implements IndexerInterface { } async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise { - return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); + const syncStatus = await this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); + this._updateCachedEntitiesFrothyBlocks(syncStatus.latestCanonicalBlockHash, syncStatus.latestCanonicalBlockNumber); + + return syncStatus; } async getSyncStatus (): Promise { @@ -410,10 +430,39 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } + _updateCachedEntitiesFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) { + const canonicalBlock = this._db.cachedEntities.frothyBlocks.get(canonicalBlockHash); + + if (canonicalBlock) { + // Update latestPrunedEntities map with entities from latest canonical block. + canonicalBlock.entities.forEach((entityIdMap, entityTableName) => { + entityIdMap.forEach((data, id) => { + let entityIdMap = this._db.cachedEntities.latestPrunedEntities.get(entityTableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(id, data); + this._db.cachedEntities.latestPrunedEntities.set(entityTableName, entityIdMap); + }); + }); + } + + // Remove pruned blocks from frothyBlocks. + const prunedBlockHashes = Array.from(this._db.cachedEntities.frothyBlocks.entries()) + .filter(([, value]) => value.blockNumber <= canonicalBlockNumber) + .map(([blockHash]) => blockHash); + + prunedBlockHashes.forEach(blockHash => this._db.cachedEntities.frothyBlocks.delete(blockHash)); + } + async _fetchEvents (block: DeepPartial): Promise[]> { assert(block.blockHash); + console.time('time:indexer#_fetchEvents-uni-get-events'); const events = await this._uniClient.getEvents(block.blockHash); + console.timeEnd('time:indexer#_fetchEvents-uni-get-events'); const dbEvents: Array> = []; @@ -550,6 +599,8 @@ export class Indexer implements IndexerInterface { token.id = tokenAddress; console.time('time:indexer#_initToken-eth_call_for_token'); + const endTimer = eventProcessingEthCallDuration.startTimer(); + const symbolPromise = this._erc20Client.getSymbol(block.hash, tokenAddress); const namePromise = this._erc20Client.getName(block.hash, tokenAddress); const totalSupplyPromise = this._erc20Client.getTotalSupply(block.hash, tokenAddress); @@ -562,6 +613,7 @@ export class Indexer implements IndexerInterface { { value: decimals } ] = await Promise.all([symbolPromise, namePromise, totalSupplyPromise, decimalsPromise]); + endTimer(); console.timeEnd('time:indexer#_initToken-eth_call_for_token'); token.symbol = symbol; @@ -1381,7 +1433,11 @@ export class Indexer implements IndexerInterface { if (!position) { try { console.time('time:indexer#_getPosition-eth_call_for_positions'); + let endTimer = eventProcessingEthCallDuration.startTimer(); + const { value: positionResult } = await this._uniClient.positions(blockHash, contractAddress, tokenId); + + endTimer(); console.timeEnd('time:indexer#_getPosition-eth_call_for_positions'); let factoryAddress = FACTORY_ADDRESS; @@ -1393,7 +1449,11 @@ export class Indexer implements IndexerInterface { } console.time('time:indexer#_getPosition-eth_call_for_getPool'); + endTimer = eventProcessingEthCallDuration.startTimer(); + let { value: poolAddress } = await this._uniClient.callGetPool(blockHash, factoryAddress, positionResult.token0, positionResult.token1, positionResult.fee); + + endTimer(); console.timeEnd('time:indexer#_getPosition-eth_call_for_getPool'); // Get the pool address in lowercase. @@ -1441,7 +1501,11 @@ export class Indexer implements IndexerInterface { async _updateFeeVars (position: Position, block: Block, contractAddress: string, tokenId: bigint): Promise { try { console.time('time:indexer#_updateFeeVars-eth_call_for_positions'); + const endTimer = eventProcessingEthCallDuration.startTimer(); + const { value: positionResult } = await this._uniClient.positions(block.hash, contractAddress, tokenId); + + endTimer(); console.timeEnd('time:indexer#_updateFeeVars-eth_call_for_positions'); if (positionResult) { diff --git a/packages/uni-info-watcher/src/resolvers.ts b/packages/uni-info-watcher/src/resolvers.ts index 5c67f2f3..1b4835ea 100644 --- a/packages/uni-info-watcher/src/resolvers.ts +++ b/packages/uni-info-watcher/src/resolvers.ts @@ -5,7 +5,7 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; -import { GraphQLScalarType } from 'graphql'; +import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; import { BlockHeight, gqlQueryCount, gqlTotalQueryCount, GraphDecimal, OrderDirection } from '@vulcanize/util'; @@ -25,6 +25,7 @@ import { UniswapDayData } from './entity/UniswapDayData'; import { Position } from './entity/Position'; import { EventWatcher } from './events'; import { Transaction } from './entity/Transaction'; +import { FACTORY_ADDRESS, BUNDLE_ID } from './utils/constants'; const log = debug('vulcanize:resolver'); @@ -62,7 +63,10 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Query: { - bundle: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + bundle: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight } + ) => { log('bundle', id, block); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('bundle').inc(1); @@ -75,97 +79,72 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch gqlTotalQueryCount.inc(1); gqlQueryCount.labels('bundles').inc(1); - return indexer.getEntities(Bundle, block, {}, { limit: first }); + return indexer.getEntities(Bundle, block, { id: BUNDLE_ID }, { limit: first }); }, - burns: async (_: any, { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + burns: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('burns', first, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('burns').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Burn, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - }, - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - factories: async (_: any, { block = {}, first }: { first: number, block: BlockHeight }) => { + factories: async ( + _: any, + { block = {}, first }: { first: number, block: BlockHeight } + ) => { log('factories', block, first); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('factories').inc(1); - return indexer.getEntities(Factory, block, {}, { limit: first }); + return indexer.getEntities(Factory, block, { id: FACTORY_ADDRESS }, { limit: first }); }, - mints: async (_: any, { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + mints: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('mints', first, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('mints').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Mint, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - }, - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - pool: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + pool: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('pool', id, block); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('pool').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getPool(id, block); + return indexer.getPool(id, block, info.fieldNodes[0].selectionSet.selections); }, poolDayDatas: async (_: any, { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { @@ -176,69 +155,50 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer.getEntities(PoolDayData, block, where, { limit: first, skip, orderBy, orderDirection }); }, - pools: async (_: any, { block = {}, first, orderBy, orderDirection, where = {} }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + pools: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where = {} }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('pools', block, first, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('pools').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Pool, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - swaps: async (_: any, { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + swaps: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('swaps', first, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('swaps').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Swap, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - }, - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - ticks: async (_: any, { block = {}, first, skip, where = {} }: { block: BlockHeight, first: number, skip: number, where: { [key: string]: any } }) => { + ticks: async ( + _: any, + { block = {}, first, skip, where = {} }: { block: BlockHeight, first: number, skip: number, where: { [key: string]: any } } + ) => { log('ticks', block, first, skip, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('ticks').inc(1); @@ -246,35 +206,44 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer.getEntities(Tick, block, where, { limit: first, skip }); }, - token: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + token: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('token', id, block); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('token').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getToken(id, block); + return indexer.getToken(id, block, info.fieldNodes[0].selectionSet.selections); }, - tokens: async (_: any, { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + tokens: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('tokens', orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('tokens').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Token, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Pool, - type: 'many-to-many', - field: 'whitelistPools' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - tokenDayDatas: async (_: any, { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + tokenDayDatas: async ( + _: any, + { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } } + ) => { log('tokenDayDatas', first, skip, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('tokenDayDatas').inc(1); @@ -282,7 +251,10 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer.getEntities(TokenDayData, block, where, { limit: first, skip, orderBy, orderDirection }); }, - tokenHourDatas: async (_: any, { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + tokenHourDatas: async ( + _: any, + { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } } + ) => { log('tokenHourDatas', first, skip, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('tokenHourDatas').inc(1); @@ -290,112 +262,30 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer.getEntities(TokenHourData, block, where, { limit: first, skip, orderBy, orderDirection }); }, - transactions: async (_: any, { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + transactions: async ( + _: any, + { block = {}, first, orderBy, orderDirection, where }: { block: BlockHeight, first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('transactions', first, orderBy, orderDirection); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('transactions').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Transaction, block, where, { limit: first, orderBy, orderDirection }, - [ - { - entity: Mint, - type: 'one-to-many', - field: 'mints', - foreignKey: 'transaction', - childRelations: [ - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - }, - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - } - ] - }, - { - entity: Burn, - type: 'one-to-many', - field: 'burns', - foreignKey: 'transaction', - childRelations: [ - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - }, - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - } - ] - }, - { - entity: Swap, - type: 'one-to-many', - field: 'swaps', - foreignKey: 'transaction', - childRelations: [ - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - }, - { - entity: Pool, - type: 'one-to-one', - field: 'pool', - childRelations: [ - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - } - ] - } - ] - } - ] + info.fieldNodes[0].selectionSet.selections ); }, - uniswapDayDatas: async (_: any, { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => { + uniswapDayDatas: async ( + _: any, + { block = {}, first, skip, orderBy, orderDirection, where }: { block: BlockHeight, first: number, skip: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } } + ) => { log('uniswapDayDatas', first, skip, orderBy, orderDirection, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('uniswapDayDatas').inc(1); @@ -403,48 +293,23 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer.getEntities(UniswapDayData, block, where, { limit: first, skip, orderBy, orderDirection }); }, - positions: async (_: any, { block = {}, first, where }: { block: BlockHeight, first: number, where: { [key: string]: any } }) => { + positions: async ( + _: any, + { block = {}, first, where }: { block: BlockHeight, first: number, where: { [key: string]: any } }, + __: any, + info: GraphQLResolveInfo + ) => { log('positions', first, where); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('positions').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getEntities( Position, block, where, { limit: first }, - [ - { - entity: Pool, - type: 'one-to-one', - field: 'pool' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token0' - }, - { - entity: Token, - type: 'one-to-one', - field: 'token1' - }, - { - entity: Tick, - type: 'one-to-one', - field: 'tickLower' - }, - { - entity: Tick, - type: 'one-to-one', - field: 'tickUpper' - }, - { - entity: Transaction, - type: 'one-to-one', - field: 'transaction' - } - ] + info.fieldNodes[0].selectionSet.selections ); }, diff --git a/packages/uni-info-watcher/src/utils/constants.ts b/packages/uni-info-watcher/src/utils/constants.ts index 44761a5d..e7b5729b 100644 --- a/packages/uni-info-watcher/src/utils/constants.ts +++ b/packages/uni-info-watcher/src/utils/constants.ts @@ -10,6 +10,7 @@ export const ADDRESS_ZERO = utils.getAddress('0x00000000000000000000000000000000 export const FACTORY_ADDRESS = '0x1F98431c8aD98523631AE4a59f267346ea31F984'; export const NFPM_ADDRESS = '0xC36442b4a4522E871399CD717aBDD847Ab11FE88'; +export const BUNDLE_ID = '1'; export const WATCHED_CONTRACTS = [ { diff --git a/packages/uni-info-watcher/src/utils/index.ts b/packages/uni-info-watcher/src/utils/index.ts index 75b56321..a2151758 100644 --- a/packages/uni-info-watcher/src/utils/index.ts +++ b/packages/uni-info-watcher/src/utils/index.ts @@ -2,7 +2,7 @@ // Copyright 2021 Vulcanize, Inc. // -import { BigNumber, utils } from 'ethers'; +import { utils } from 'ethers'; import { QueryRunner } from 'typeorm'; import assert from 'assert'; @@ -63,19 +63,7 @@ export const bigDecimalExponated = (value: GraphDecimal, power: bigint): GraphDe return new GraphDecimal(1); } - const negativePower = power < BigInt(0); - let result = (new GraphDecimal(0)).plus(value); - const powerAbs = BigNumber.from(power).abs(); - - for (let i = BigNumber.from(1); i.lt(powerAbs); i = i.add(1)) { - result = result.times(value); - } - - if (negativePower) { - result = safeDiv(new GraphDecimal(1), result); - } - - return result; + return value.pow(power.toString()); }; export const loadFactory = async (db: Database, dbTx: QueryRunner, block: Block, isDemo: boolean): Promise => { diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 9c7e0669..83e12a2b 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -119,6 +119,10 @@ export class Indexer implements IndexerInterface { } } + async processBlock (blockProgress: BlockProgress): Promise { + // Method for processing on indexing new block. + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 78b147a1..902118aa 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -63,9 +63,7 @@ export const processBlockByNumber = async ( }); if (!blocks.length) { - console.time('time:common#processBlockByNumber-ipld-eth-server'); blocks = await indexer.getBlocks({ blockNumber }); - console.timeEnd('time:common#processBlockByNumber-ipld-eth-server'); } if (blocks.length) { diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 9969eb15..0884b41b 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -3,6 +3,7 @@ // import assert from 'assert'; +import debug from 'debug'; import { Brackets, Connection, @@ -18,12 +19,13 @@ import { SelectQueryBuilder } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; +import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer'; import _ from 'lodash'; -import { RelationType } from 'typeorm/metadata/types/RelationTypes'; +import { SelectionNode } from 'graphql'; import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types'; import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants'; -import { blockProgressCount, eventCount } from './metrics'; +import { blockProgressCount, eventCount, eventProcessingLoadEntityDBQueryDuration, eventProcessingLoadEntityCacheHitCount, eventProcessingLoadEntityCount } from './metrics'; const OPERATOR_MAP = { equals: '=', @@ -40,6 +42,14 @@ const OPERATOR_MAP = { const INSERT_EVENTS_BATCH = 100; export const DEFAULT_LIMIT = 100; +const log = debug('vulcanize:database'); + +export enum ENTITY_QUERY_TYPE { + SINGULAR, + DISTINCT_ON, + GROUP_BY +} + export interface BlockHeight { number?: number; hash?: string; @@ -65,19 +75,38 @@ export interface Where { }] } -export type Relation = { entity: any, type: RelationType, field: string, foreignKey?: string, childRelations?: Relation[] } +// Cache for updated entities used in job-runner event processing. +export interface CachedEntities { + frothyBlocks: Map< + string, + { + blockNumber: number; + parentHash: string; + entities: Map>; + } + >; + latestPrunedEntities: Map>; +} export class Database { _config: ConnectionOptions _conn!: Connection _blockCount = 0 _eventCount = 0 + _cachedEntities: CachedEntities = { + frothyBlocks: new Map(), + latestPrunedEntities: new Map() + } constructor (config: ConnectionOptions) { assert(config); this._config = config; } + get cachedEntities () { + return this._cachedEntities; + } + async init (): Promise { assert(!this._conn); @@ -358,7 +387,54 @@ export class Database { return event; } - async getModelEntities (queryRunner: QueryRunner, entity: new () => Entity, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, relations: Relation[] = []): Promise { + async getModelEntities ( + queryRunner: QueryRunner, + relationsMap: Map, + entityQueryMap: Map Entity, ENTITY_QUERY_TYPE>, + entity: new () => Entity, + block: BlockHeight, + where: Where = {}, + queryOptions: QueryOptions = {}, + selections: ReadonlyArray = [] + ): Promise { + let entities: Entity[]; + + // Use different suitable query patterns based on entities. + switch (entityQueryMap.get(entity)) { + case ENTITY_QUERY_TYPE.SINGULAR: + entities = await this.getModelEntitiesSingular(queryRunner, entity, block, where); + break; + + case ENTITY_QUERY_TYPE.DISTINCT_ON: + entities = await this.getModelEntitiesDistinctOn(queryRunner, entity, block, where, queryOptions); + break; + + case ENTITY_QUERY_TYPE.GROUP_BY: + entities = await this.getModelEntitiesGroupBy(queryRunner, entity, block, where, queryOptions); + break; + + default: + log(`Invalid entity query type for entity ${entity}`); + entities = []; + break; + } + + if (!entities.length) { + return []; + } + + entities = await this.loadRelations(queryRunner, block, relationsMap, entityQueryMap, entity, entities, selections); + + return entities; + } + + async getModelEntitiesGroupBy ( + queryRunner: QueryRunner, + entity: new () => Entity, + block: BlockHeight, + where: Where = {}, + queryOptions: QueryOptions = {} + ): Promise { const repo = queryRunner.manager.getRepository(entity); const { tableName } = repo.metadata; @@ -408,152 +484,346 @@ export class Database { selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit); } - let entities = await selectQueryBuilder.getMany(); + const entities = await selectQueryBuilder.getMany(); - if (!entities.length) { - return []; + return entities; + } + + async getModelEntitiesDistinctOn ( + queryRunner: QueryRunner, + entity: new () => Entity, + block: BlockHeight, + where: Where = {}, + queryOptions: QueryOptions = {} + ): Promise { + const repo = queryRunner.manager.getRepository(entity); + + let subQuery = repo.createQueryBuilder('subTable') + .distinctOn(['subTable.id']) + .addFrom('block_progress', 'blockProgress') + .where('subTable.block_hash = blockProgress.block_hash') + .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }) + .addOrderBy('subTable.id', 'ASC') + .addOrderBy('subTable.block_number', 'DESC'); + + if (block.hash) { + const { canonicalBlockNumber, blockHashes } = await this.getFrothyRegion(queryRunner, block.hash); + + subQuery = subQuery + .andWhere(new Brackets(qb => { + qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes }) + .orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }); + })); } - entities = await this.loadRelations(queryRunner, block, relations, entities); + if (block.number) { + subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number }); + } - return entities; + subQuery = this._buildQuery(repo, subQuery, where); + + let selectQueryBuilder = queryRunner.manager.createQueryBuilder() + .from( + `(${subQuery.getQuery()})`, + 'latestEntities' + ) + .setParameters(subQuery.getParameters()); + + if (queryOptions.orderBy) { + selectQueryBuilder = this._orderQuery(repo, selectQueryBuilder, queryOptions, 'subTable_'); + if (queryOptions.orderBy !== 'id') { + selectQueryBuilder = this._orderQuery(repo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, 'subTable_'); + } + } + + if (queryOptions.skip) { + selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip); + } + + if (queryOptions.limit) { + selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit); + } + + let entities = await selectQueryBuilder.getRawMany(); + entities = await this._transformResults(queryRunner, repo.createQueryBuilder('subTable'), entities); + + return entities as Entity[]; } - async loadRelations (queryRunner: QueryRunner, block: BlockHeight, relations: Relation[], entities: Entity[]): Promise { - const relationPromises = relations.map(async relation => { - const { entity: relationEntity, type, field, foreignKey, childRelations = [] } = relation; - - switch (type) { - case 'one-to-many': { - assert(foreignKey); - - const where: Where = { - [foreignKey]: [{ - value: entities.map((entity: any) => entity.id), - not: false, - operator: 'in' - }] - }; - - const relatedEntities = await this.getModelEntities( - queryRunner, - relationEntity, - block, - where, - {}, - childRelations - ); - - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { - // Related entity might be loaded with data. - const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; - - if (!acc[parentEntityId]) { - acc[parentEntityId] = []; - } + async getModelEntitiesSingular ( + queryRunner: QueryRunner, + entity: new () => Entity, + block: BlockHeight, + where: Where = {} + ): Promise { + const repo = queryRunner.manager.getRepository(entity); + const { tableName } = repo.metadata; - if (acc[parentEntityId].length < DEFAULT_LIMIT) { - acc[parentEntityId].push(entity); - } + let selectQueryBuilder = repo.createQueryBuilder(tableName) + .addFrom('block_progress', 'blockProgress') + .where(`${tableName}.block_hash = blockProgress.block_hash`) + .andWhere('blockProgress.is_pruned = :isPruned', { isPruned: false }) + .addOrderBy(`${tableName}.block_number`, 'DESC') + .limit(1); - return acc; - }, {}); + if (block.hash) { + const { canonicalBlockNumber, blockHashes } = await this.getFrothyRegion(queryRunner, block.hash); - entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity.id]) { - entity[field] = relatedEntitiesMap[entity.id]; - } else { - entity[field] = []; - } - }); + selectQueryBuilder = selectQueryBuilder + .andWhere(new Brackets(qb => { + qb.where(`${tableName}.block_hash IN (:...blockHashes)`, { blockHashes }) + .orWhere(`${tableName}.block_number <= :canonicalBlockNumber`, { canonicalBlockNumber }); + })); + } + + if (block.number) { + selectQueryBuilder = selectQueryBuilder.andWhere(`${tableName}.block_number <= :blockNumber`, { blockNumber: block.number }); + } + + selectQueryBuilder = this._buildQuery(repo, selectQueryBuilder, where); + + const entities = await selectQueryBuilder.getMany(); + + return entities as Entity[]; + } + + async getModelEntity (repo: Repository, whereOptions: any): Promise { + eventProcessingLoadEntityCount.inc(); - break; + const findOptions = { + where: whereOptions, + order: { + blockNumber: 'DESC' + } + }; + + if (findOptions.where.blockHash) { + // Check cache only if latestPrunedEntities is updated. + // latestPrunedEntities is updated when frothyBlocks is filled till canonical block height. + if (this._cachedEntities.latestPrunedEntities.size > 0) { + let frothyBlock = this._cachedEntities.frothyBlocks.get(findOptions.where.blockHash); + let canonicalBlockNumber = -1; + + // Loop through frothy region until latest entity is found. + while (frothyBlock) { + const entity = frothyBlock.entities + .get(repo.metadata.tableName) + ?.get(findOptions.where.id); + + if (entity) { + eventProcessingLoadEntityCacheHitCount.inc(); + return _.cloneDeep(entity) as Entity; + } + + canonicalBlockNumber = frothyBlock.blockNumber + 1; + frothyBlock = this._cachedEntities.frothyBlocks.get(frothyBlock.parentHash); } - case 'many-to-many': { - const relatedIds = entities.reduce((acc, entity: any) => { - entity[field].forEach((relatedEntityId: any) => acc.add(relatedEntityId)); + // Canonical block number is not assigned if blockHash does not exist in frothy region. + // Get latest pruned entity from cache only if blockHash exists in frothy region. + // i.e. Latest entity in cache is the version before frothy region. + if (canonicalBlockNumber > -1) { + // If entity not found in frothy region get latest entity in the pruned region. + // Check if latest entity is cached in pruned region. + const entity = this._cachedEntities.latestPrunedEntities + .get(repo.metadata.tableName) + ?.get(findOptions.where.id); + + if (entity) { + eventProcessingLoadEntityCacheHitCount.inc(); + return _.cloneDeep(entity) as Entity; + } - return acc; - }, new Set()); + // Get latest pruned entity from DB if not found in cache. + const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer(); + const dbEntity = await this._getLatestPrunedEntity(repo, findOptions.where.id, canonicalBlockNumber); + endTimer(); - const where: Where = { - id: [{ - value: Array.from(relatedIds), - not: false, - operator: 'in' - }] - }; + if (dbEntity) { + // Update latest pruned entity in cache. + this.cacheUpdatedEntity(repo, dbEntity, true); + } - const relatedEntities = await this.getModelEntities( - queryRunner, - relationEntity, - block, - where, - {}, - childRelations - ); + return dbEntity; + } + } - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { - acc[entity.id] = entity; + const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer(); + const dbEntity = await this.getPrevEntityVersion(repo.queryRunner!, repo, findOptions); + endTimer(); - return acc; - }, {}); + return dbEntity; + } - entities.forEach((entity: any) => { - const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { - acc.add(id); + return repo.findOne(findOptions); + } + + async loadRelations ( + queryRunner: QueryRunner, + block: BlockHeight, + relationsMap: Map, + entityQueryMap: Map Entity, ENTITY_QUERY_TYPE>, + entity: new () => Entity, + entities: Entity[], + selections: ReadonlyArray = [] + ): Promise { + const relations = relationsMap.get(entity); + + if (!relations) { + return entities; + } + + // Filter selections from GQL query which are relations. + const relationPromises = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])) + .map(async (selection) => { + assert(selection.kind === 'Field'); + const field = selection.name.value; + const { entity: relationEntity, type, foreignKey } = relations[field]; + let childSelections = selection.selectionSet?.selections || []; + + // Filter out __typename field in GQL for loading relations. + childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename')); + + switch (type) { + case 'one-to-many': { + assert(foreignKey); + + const where: Where = { + [foreignKey]: [{ + value: entities.map((entity: any) => entity.id), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getModelEntities( + queryRunner, + relationsMap, + entityQueryMap, + relationEntity, + block, + where, + {}, + childSelections + ); + + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { + // Related entity might be loaded with data. + const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; + + if (!acc[parentEntityId]) { + acc[parentEntityId] = []; + } + + if (acc[parentEntityId].length < DEFAULT_LIMIT) { + acc[parentEntityId].push(entity); + } + + return acc; + }, {}); + + entities.forEach((entity: any) => { + if (relatedEntitiesMap[entity.id]) { + entity[field] = relatedEntitiesMap[entity.id]; + } else { + entity[field] = []; + } + }); + + break; + } + + case 'many-to-many': { + const relatedIds = entities.reduce((acc, entity: any) => { + entity[field].forEach((relatedEntityId: any) => acc.add(relatedEntityId)); return acc; }, new Set()); - entity[field] = []; + const where: Where = { + id: [{ + value: Array.from(relatedIds), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getModelEntities( + queryRunner, + relationsMap, + entityQueryMap, + relationEntity, + block, + where, + {}, + childSelections + ); + + entities.forEach((entity: any) => { + const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { + acc.add(id); + + return acc; + }, new Set()); - relatedEntities.forEach((relatedEntity: any) => { - if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { - entity[field].push(relatedEntity); - } + entity[field] = []; + + relatedEntities.forEach((relatedEntity: any) => { + if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { + entity[field].push(relatedEntity); + } + }); }); - }); - break; - } + break; + } + + default: { + // For one-to-one/many-to-one relations. + if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') { + // Avoid loading relation if selections only has id field. + entities.forEach((entity: any) => { + entity[field] = { id: entity[field] }; + }); - default: { - // For one-to-one/many-to-one relations. - const where: Where = { - id: [{ - value: entities.map((entity: any) => entity[field]), - not: false, - operator: 'in' - }] - }; - - const relatedEntities = await this.getModelEntities( - queryRunner, - relationEntity, - block, - where, - {}, - childRelations - ); - - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { - acc[entity.id] = entity; - - return acc; - }, {}); - - entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity[field]]) { - entity[field] = relatedEntitiesMap[entity[field]]; + break; } - }); - break; + const where: Where = { + id: [{ + value: entities.map((entity: any) => entity[field]), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getModelEntities( + queryRunner, + relationsMap, + entityQueryMap, + relationEntity, + block, + where, + {}, + childSelections + ); + + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { + acc[entity.id] = entity; + + return acc; + }, {}); + + entities.forEach((entity: any) => { + if (relatedEntitiesMap[entity[field]]) { + entity[field] = relatedEntitiesMap[entity[field]]; + } + }); + + break; + } } - } - }); + }); await Promise.all(relationPromises); @@ -611,23 +881,25 @@ export class Database { if (id) { // Entity found in frothy region. findOptions.where.blockHash = blockHash; - } else { - // If entity not found in frothy region get latest entity in the pruned region. - // Filter out entities from pruned blocks. - const canonicalBlockNumber = blockNumber + 1; - const entityInPrunedRegion:any = await repo.createQueryBuilder('entity') - .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') - .where('block.is_pruned = false') - .andWhere('entity.id = :id', { id: findOptions.where.id }) - .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) - .orderBy('entity.block_number', 'DESC') - .limit(1) - .getOne(); - - findOptions.where.blockHash = entityInPrunedRegion?.blockHash; + + return repo.findOne(findOptions); } - return repo.findOne(findOptions); + return this._getLatestPrunedEntity(repo, findOptions.where.id, blockNumber + 1); + } + + async _getLatestPrunedEntity (repo: Repository, id: string, canonicalBlockNumber: number): Promise { + // Filter out latest entity from pruned blocks. + const entityInPrunedRegion = await repo.createQueryBuilder('entity') + .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') + .where('block.is_pruned = false') + .andWhere('entity.id = :id', { id }) + .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('entity.block_number', 'DESC') + .limit(1) + .getOne(); + + return entityInPrunedRegion; } async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { @@ -693,6 +965,37 @@ export class Database { return repo.save(entity); } + cacheUpdatedEntity (repo: Repository, entity: any, pruned = false): void { + const tableName = repo.metadata.tableName; + + if (pruned) { + let entityIdMap = this._cachedEntities.latestPrunedEntities.get(tableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(entity.id, _.cloneDeep(entity)); + this._cachedEntities.latestPrunedEntities.set(tableName, entityIdMap); + return; + } + + const frothyBlock = this._cachedEntities.frothyBlocks.get(entity.blockHash); + + // Update frothyBlock only if already present in cache. + // Might not be present when event processing starts without block processing on job retry. + if (frothyBlock) { + let entityIdMap = frothyBlock.entities.get(tableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(entity.id, _.cloneDeep(entity)); + frothyBlock.entities.set(tableName, entityIdMap); + } + } + async _fetchBlockCount (): Promise { this._blockCount = await this._conn.getRepository('block_progress') .count(); @@ -775,7 +1078,8 @@ export class Database { _orderQuery ( repo: Repository, selectQueryBuilder: SelectQueryBuilder, - orderOptions: { orderBy?: string, orderDirection?: string } + orderOptions: { orderBy?: string, orderDirection?: string }, + columnPrefix = '' ): SelectQueryBuilder { const { orderBy, orderDirection } = orderOptions; assert(orderBy); @@ -784,8 +1088,21 @@ export class Database { assert(columnMetadata); return selectQueryBuilder.addOrderBy( - `${selectQueryBuilder.alias}.${columnMetadata.propertyAliasName}`, + `"${selectQueryBuilder.alias}"."${columnPrefix}${columnMetadata.databaseName}"`, orderDirection === 'desc' ? 'DESC' : 'ASC' ); } + + async _transformResults (queryRunner: QueryRunner, qb: SelectQueryBuilder, rawResults: any[]): Promise { + const transformer = new RawSqlResultsToEntityTransformer( + qb.expressionMap, + queryRunner.manager.connection.driver, + [], + [], + queryRunner + ); + + assert(qb.expressionMap.mainAlias); + return transformer.transform(rawResults, qb.expressionMap.mainAlias); + } } diff --git a/packages/util/src/graph-decimal.ts b/packages/util/src/graph-decimal.ts index 2be856f1..c3e87a79 100644 --- a/packages/util/src/graph-decimal.ts +++ b/packages/util/src/graph-decimal.ts @@ -91,6 +91,13 @@ export class GraphDecimal { return new GraphDecimal(this.value.div(param)); } + pow (n: Decimal.Value | GraphDecimal): GraphDecimal { + this._checkOutOfRange(this); + const param = this._checkOutOfRange(n); + + return new GraphDecimal(this.value.pow(param)); + } + isZero (): boolean { this._checkOutOfRange(this); diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 3de228e7..ac34c5c1 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -12,7 +12,7 @@ import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; import { wait } from './misc'; import { createPruningJob } from './common'; -import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; +import { eventProcessingLoadEntityCacheHitCount, eventProcessingLoadEntityCount, lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; const log = debug('vulcanize:job-runner'); @@ -76,6 +76,7 @@ export class JobRunner { } async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise { + console.time('time:job-runner#_pruneChain'); const { pruneBlockHeight } = job.data; log(`Processing chain pruning at ${pruneBlockHeight}`); @@ -116,6 +117,7 @@ export class JobRunner { // Update the canonical block in the SyncStatus. await this._indexer.updateSyncStatusCanonicalBlock(newCanonicalBlockHash, pruneBlockHeight); } + console.timeEnd('time:job-runner#_pruneChain'); } async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise { @@ -228,6 +230,7 @@ export class JobRunner { this._blockEventsMap.set(blockHash, events); } + console.time('time:job-runner#_indexBlock-saveBlockProgress'); blockProgress = await this._indexer.saveBlockProgress({ blockHash, blockNumber, @@ -236,8 +239,10 @@ export class JobRunner { numEvents: events.length, isComplete: events.length === 0 }); + console.timeEnd('time:job-runner#_indexBlock-saveBlockProgress'); } + await this._indexer.processBlock(blockProgress); this._blockNumEvents = blockProgress.numEvents; // Check if block has unprocessed events. @@ -336,6 +341,8 @@ export class JobRunner { dbEvent.eventInfo = JSON.stringify(eventInfo); } + eventProcessingLoadEntityCount.set(0); + eventProcessingLoadEntityCacheHitCount.set(0); await this._indexer.processEvent(dbEvent); } @@ -360,6 +367,9 @@ export class JobRunner { const watchedContract = this._indexer.isWatchedContract(dbEvent.contract); if (watchedContract) { + eventProcessingLoadEntityCount.set(0); + eventProcessingLoadEntityCacheHitCount.set(0); + // Events of contract added in same block might be processed multiple times. // This is because there is no check for lastProcessedEventIndex against these events. await this._indexer.processEvent(dbEvent); @@ -383,7 +393,9 @@ export class JobRunner { if (this._jobQueueConfig.lazyUpdateBlockProgress) { // Update in database at end of all events processing. + console.time('time:job-runner#_processEvents-updateBlockProgress'); await this._indexer.updateBlockProgress(block, block.lastProcessedEventIndex); + console.timeEnd('time:job-runner#_processEvents-updateBlockProgress'); } console.timeEnd('time:job-runner#_processEvents-events'); diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index db05aa21..1a139043 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -49,6 +49,26 @@ export const eventCount = new promClient.Gauge({ help: 'Total entries in event table' }); +export const eventProcessingLoadEntityCount = new promClient.Gauge({ + name: 'event_processing_load_entity_total', + help: 'Total load entities in a single event processing' +}); + +export const eventProcessingLoadEntityCacheHitCount = new promClient.Gauge({ + name: 'event_processing_load_entity_cache_hit_total', + help: 'Total load entities hitting cache in a single event processing' +}); + +export const eventProcessingLoadEntityDBQueryDuration = new promClient.Histogram({ + name: 'event_processing_load_entity_db_query_seconds', + help: 'Duration of DB query made in event processing' +}); + +export const eventProcessingEthCallDuration = new promClient.Histogram({ + name: 'event_processing_eth_call_duration_seconds', + help: 'Duration of eth_calls made in event processing' +}); + // Export metrics on a server const app: Application = express(); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index fbff7cac..deff0b41 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -4,7 +4,7 @@ import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; -import { Where, QueryOptions } from './database'; +import { Where, QueryOptions, CachedEntities } from './database'; export interface BlockProgressInterface { id: number; @@ -69,6 +69,7 @@ export interface IndexerInterface { saveEventEntity (dbEvent: EventInterface): Promise; saveEvents (dbEvents: EventInterface[]): Promise; processEvent (event: EventInterface): Promise; + processBlock (blockProgress: BlockProgressInterface): Promise; parseEventNameAndArgs?: (kind: string, logObj: any) => any; isWatchedContract: (address: string) => ContractInterface | undefined; cacheContract?: (contract: ContractInterface) => void;