diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index 205aa6a9d..d5948cb2d 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -87,8 +87,6 @@ eventsInBatch = 50 subgraphEventsOrder = true blockDelayInMilliSecs = 2000 - prefetchBlocksInMem = true - prefetchBlockCount = 10 # Boolean to switch between modes of processing events when starting the server. # Setting to true will fetch filtered events and required blocks in a range of blocks and then process them. diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 2eae5ad58..d748d7b66 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -688,10 +688,6 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchEventsAndSaveBlocks (blocks: DeepPartial[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial[] }[]> { - return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this)); - } - async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgress, events: DeepPartial[], diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 0987eeb53..771408a1a 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -114,12 +114,6 @@ export class Indexer implements IndexerInterface { return ''; } - async fetchEventsAndSaveBlocks (blocks: DeepPartial[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { - assert(blocks); - - return []; - } - async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{ blockProgress: BlockProgressInterface; events: DeepPartial[]; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 67774f155..8222a4c97 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -68,34 +68,13 @@ export const fetchBlocksAtHeight = async ( ): Promise[]> => { let blocks = []; - // Check for blocks in cache if prefetchBlocksInMem flag set. - if (jobQueueConfig.prefetchBlocksInMem) { - // Get blocks prefetched in memory. - blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber); - log('size:common#fetchBlocksAtHeight-prefetch-_blockAndEventsMap-size:', blockAndEventsMap.size); - } - - if (!blocks.length) { - // Try fetching blocks from the db. - const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); - blocks = blockProgressEntities.map((block: any) => { - block.timestamp = block.blockTimestamp; - - return block; - }); - } - - if (jobQueueConfig.prefetchBlocksInMem && !blocks.length) { - // If blocks not found in the db and cache, fetch next batch. - log(`common#cache-miss-${blockNumber}`); + // Try fetching blocks from the db. + const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false); + blocks = blockProgressEntities.map((block: any) => { + block.timestamp = block.blockTimestamp; - // Wait for blocks to be prefetched. - console.time('time:common#fetchBlocks-_prefetchBlocks'); - await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, blockAndEventsMap); - console.timeEnd('time:common#fetchBlocks-_prefetchBlocks'); - - blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber); - } + return block; + }); // Try fetching blocks from eth-server until found. while (!blocks.length) { @@ -180,125 +159,6 @@ export const fetchAndSaveFilteredLogsAndBlocks = async ( return blocksData.map(({ blockProgress }) => blockProgress); }; -export const _prefetchBlocks = async ( - blockNumber: number, - indexer: IndexerInterface, - jobQueueConfig: JobQueueConfig, - blockAndEventsMap: Map -): Promise => { - // Clear cache of any remaining blocks. - blockAndEventsMap.clear(); - - const blocksWithEvents = await _fetchBatchBlocks( - indexer, - jobQueueConfig, - blockNumber, - blockNumber + jobQueueConfig.prefetchBlockCount - ); - - blocksWithEvents.forEach(({ blockProgress, events }) => { - blockAndEventsMap.set( - blockProgress.blockHash, - { - block: blockProgress, - events, - // TODO: Set ethFullBlock and ethFullTransactions - ethFullBlock: {} as EthFullBlock, - ethFullTransactions: [] - }); - }); -}; - -/** - * Method to fetch blocks (with events) in the given range. - * @param indexer - * @param jobQueueConfig - * @param startBlock - * @param endBlock - */ -export const _fetchBatchBlocks = async ( - indexer: IndexerInterface, - jobQueueConfig: JobQueueConfig, - startBlock: number, - endBlock: number -): Promise<{ - blockProgress: BlockProgressInterface, - events: DeepPartial[] -}[]> => { - let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); - let blocks = []; - - // Fetch blocks again if there are missing blocks. - while (true) { - console.time('time:common#fetchBatchBlocks-getBlocks'); - - const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber })); - const settledResults = await Promise.allSettled(blockPromises); - - const res: any[] = []; - for (let index = 0; index < settledResults.length; index++) { - const result = settledResults[index]; - // If fulfilled, return value - if (result.status === 'fulfilled') { - res.push(result.value); - continue; - } - - // If rejected, check error - // Handle null block error in case of Lotus EVM - // Otherwise, rethrow error - const err = result.reason; - if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) { - throw err; - } - - log(`Block ${blockNumbers[index]} requested was null (FEVM), skipping`); - - // Remove the corresponding block number from the blockNumbers to avoid retrying for the same - blockNumbers = blockNumbers.splice(index, 1); - - // Stop the iteration at the first null block found - // To avoid saving blocks after the null block - // so that they don't conflict with blocks fetched when processBlockByNumber gets called for the null block - // TODO: Optimize - break; - } - - console.timeEnd('time:common#fetchBatchBlocks-getBlocks'); - - const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0); - - if (firstMissingBlockIndex === -1) { - blocks = res; - break; - } else if (firstMissingBlockIndex > 0) { - blocks = res.slice(0, firstMissingBlockIndex); - break; - } - - log(`No blocks fetched for block number ${blockNumbers[0]}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); - await wait(jobQueueConfig.blockDelayInMilliSecs); - } - - // Flatten array as there can be multiple blocks at the same height - blocks = blocks.flat(); - - if (jobQueueConfig.jobDelayInMilliSecs) { - await wait(jobQueueConfig.jobDelayInMilliSecs); - } - - blocks.forEach(block => { - block.blockTimestamp = Number(block.timestamp); - block.blockNumber = Number(block.blockNumber); - }); - - console.time('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks'); - const blockAndEventsList = await indexer.fetchEventsAndSaveBlocks(blocks); - console.timeEnd('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks'); - - return blockAndEventsList; -}; - /** * Process events in batches for a block. * @param indexer @@ -602,9 +462,3 @@ export const createCheckpointJob = async (jobQueue: JobQueue, blockHash: string, } ); }; - -const getPrefetchedBlocksAtHeight = (blockAndEventsMap: Map, blockNumber: number):any[] => { - return Array.from(blockAndEventsMap.values()) - .filter(({ block }) => Number(block.blockNumber) === blockNumber) - .map(prefetchedBlock => prefetchedBlock.block); -}; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 718d0c1fe..1b797897e 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -22,8 +22,6 @@ export interface JobQueueConfig { lazyUpdateBlockProgress?: boolean; subgraphEventsOrder: boolean; blockDelayInMilliSecs: number; - prefetchBlocksInMem: boolean; - prefetchBlockCount: number; // Block range in which logs are fetched during historical blocks processing historicalLogsBlockRange?: number; // Max block range of historical processing after which it waits for completion of events processing diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 3dcd152fb..e227d9e1b 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -359,106 +359,6 @@ export class Indexer { return this._db.getEvent(id); } - // For each of the given blocks, fetches events and saves them along with the block to db - // Returns an array with [block, events] for all the given blocks - async fetchEventsAndSaveBlocks (blocks: DeepPartial[], eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> { - if (!blocks.length) { - return []; - } - - const fromBlock = blocks[0].blockNumber; - const toBlock = blocks[blocks.length - 1].blockNumber; - log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`); - - const dbEventsMap = await this.fetchEventsForBlocks(blocks, eventSignaturesMap, parseEventNameAndArgs); - - const blocksWithEventsPromises = blocks.map(async block => { - const blockHash = block.blockHash; - assert(blockHash); - - const dbEvents = dbEventsMap.get(blockHash) || []; - const [blockProgress] = await this.saveBlockWithEvents(block, dbEvents); - log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetched for block: ${blockHash} num events: ${blockProgress.numEvents}`); - - return { blockProgress, events: [] }; - }); - - return Promise.all(blocksWithEventsPromises); - } - - // Fetch events (to be saved to db) for a block range - async fetchEventsForBlocks (blocks: DeepPartial[], eventSignaturesMap: Map, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise[]>> { - if (!blocks.length) { - return new Map(); - } - - // Fetch logs for block range of given blocks - const fromBlock = blocks[0].blockNumber; - const toBlock = blocks[blocks.length - 1].blockNumber; - - assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient'); - - const { addresses, topics } = this._createLogsFilters(eventSignaturesMap); - - const { logs } = await this._ethClient.getLogsForBlockRange({ - fromBlock, - toBlock, - addresses, - topics - }); - - // Skip further processing if no relevant logs found in the entire block range - if (!logs.length) { - return new Map(); - } - - // Sort logs according to blockhash - const blockLogsMap = this._reduceLogsToBlockLogsMap(logs); - - // Fetch transactions for given blocks - const transactionsMap: Map = new Map(); - const transactionPromises = blocks.map(async (block) => { - assert(block.blockHash); - - // Skip fetching txs if no relevant logs found in this block - if (!blockLogsMap.has(block.blockHash)) { - return; - } - - const blockWithTransactions = await this._ethClient.getBlockWithTransactions({ blockHash: block.blockHash, blockNumber: block.blockNumber }); - const { - allEthHeaderCids: { - nodes: [ - { - ethTransactionCidsByHeaderId: { - nodes: transactions - } - } - ] - } - } = blockWithTransactions; - - transactionsMap.set(block.blockHash, transactions); - }); - - await Promise.all(transactionPromises); - - // Map db ready events according to blockhash - const dbEventsMap: Map[]> = new Map(); - blocks.forEach(block => { - const blockHash = block.blockHash; - assert(blockHash); - - const logs = blockLogsMap.get(blockHash) || []; - const transactions = transactionsMap.get(blockHash) || []; - - const dbEvents = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs); - dbEventsMap.set(blockHash, dbEvents); - }); - - return dbEventsMap; - } - async fetchAndSaveFilteredEventsAndBlocks ( fromBlock: number, toBlock: number, diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 25036251c..d8656727f 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -423,7 +423,11 @@ export class JobRunner { log(`Processing chain pruning at ${pruneBlockHeight}`); // Assert we're at a depth where pruning is safe. - assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH)); + if (!(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH))) { + const message = `Pruning is not safe at height ${pruneBlockHeight}, latest indexed block height ${syncStatus.latestIndexedBlockNumber}`; + log(message); + throw new Error(message); + } // Check that we haven't already pruned at this depth. if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) { @@ -585,34 +589,26 @@ export class JobRunner { } if (!blockProgress) { - const prefetchedBlock = this._blockAndEventsMap.get(blockHash); - - // Check if prefetched block is set properly - // prefetchedBlock.block is an empty object when running in realtime processing - if (prefetchedBlock && prefetchedBlock.block.blockHash) { - ({ block: blockProgress } = prefetchedBlock); - } else { - // Delay required to process block. - const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; - await wait(jobDelayInMilliSecs); - - console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); - log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`); - let ethFullTransactions; - [blockProgress,, ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); - log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); - console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); - const data = this._blockAndEventsMap.get(blockHash); - assert(data); - - this._blockAndEventsMap.set( - blockHash, - { - ...data, - block: blockProgress, - ethFullTransactions - }); - } + // Delay required to process block. + const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; + await wait(jobDelayInMilliSecs); + + console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); + log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`); + let ethFullTransactions; + [blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp }); + log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); + console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents'); + const data = this._blockAndEventsMap.get(blockHash); + assert(data); + + this._blockAndEventsMap.set( + blockHash, + { + ...data, + block: blockProgress, + ethFullTransactions + }); } if (!blockProgress.isComplete) { diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 1e899cc47..6dbd3c31a 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -173,7 +173,6 @@ export interface IndexerInterface { getLatestStateIndexedBlock (): Promise getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise - fetchEventsAndSaveBlocks (blocks: DeepPartial[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial[] }[]> saveBlockAndFetchEvents (block: DeepPartial): Promise<[ BlockProgressInterface, DeepPartial[],