Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove prefetchBlocksInMem flag along with functionality #474

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/codegen/src/templates/config-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@
eventsInBatch = 50
subgraphEventsOrder = true
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
Expand Down
4 changes: 0 additions & 4 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -688,10 +688,6 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}

async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgress>[]): Promise<{ blockProgress: BlockProgress, events: DeepPartial<Event>[] }[]> {
return this._baseIndexer.fetchEventsAndSaveBlocks(blocks, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
}

async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgress,
events: DeepPartial<Event>[],
Expand Down
6 changes: 0 additions & 6 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ export class Indexer implements IndexerInterface {
return '';
}

async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[]): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
assert(blocks);

return [];
}

async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgressInterface;
events: DeepPartial<EventInterface>[];
Expand Down
158 changes: 6 additions & 152 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,34 +68,13 @@ export const fetchBlocksAtHeight = async (
): Promise<DeepPartial<BlockProgressInterface>[]> => {
let blocks = [];

// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
// Get blocks prefetched in memory.
blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
log('size:common#fetchBlocksAtHeight-prefetch-_blockAndEventsMap-size:', blockAndEventsMap.size);
}

if (!blocks.length) {
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;

return block;
});
}

if (jobQueueConfig.prefetchBlocksInMem && !blocks.length) {
// If blocks not found in the db and cache, fetch next batch.
log(`common#cache-miss-${blockNumber}`);
// Try fetching blocks from the db.
const blockProgressEntities = await indexer.getBlocksAtHeight(blockNumber, false);
blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;

// Wait for blocks to be prefetched.
console.time('time:common#fetchBlocks-_prefetchBlocks');
await _prefetchBlocks(blockNumber, indexer, jobQueueConfig, blockAndEventsMap);
console.timeEnd('time:common#fetchBlocks-_prefetchBlocks');

blocks = getPrefetchedBlocksAtHeight(blockAndEventsMap, blockNumber);
}
return block;
});

// Try fetching blocks from eth-server until found.
while (!blocks.length) {
Expand Down Expand Up @@ -180,125 +159,6 @@ export const fetchAndSaveFilteredLogsAndBlocks = async (
return blocksData.map(({ blockProgress }) => blockProgress);
};

export const _prefetchBlocks = async (
blockNumber: number,
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
blockAndEventsMap: Map<string, PrefetchedBlock>
): Promise<void> => {
// Clear cache of any remaining blocks.
blockAndEventsMap.clear();

const blocksWithEvents = await _fetchBatchBlocks(
indexer,
jobQueueConfig,
blockNumber,
blockNumber + jobQueueConfig.prefetchBlockCount
);

blocksWithEvents.forEach(({ blockProgress, events }) => {
blockAndEventsMap.set(
blockProgress.blockHash,
{
block: blockProgress,
events,
// TODO: Set ethFullBlock and ethFullTransactions
ethFullBlock: {} as EthFullBlock,
ethFullTransactions: []
});
});
};

/**
* Method to fetch blocks (with events) in the given range.
* @param indexer
* @param jobQueueConfig
* @param startBlock
* @param endBlock
*/
export const _fetchBatchBlocks = async (
indexer: IndexerInterface,
jobQueueConfig: JobQueueConfig,
startBlock: number,
endBlock: number
): Promise<{
blockProgress: BlockProgressInterface,
events: DeepPartial<EventInterface>[]
}[]> => {
let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
let blocks = [];

// Fetch blocks again if there are missing blocks.
while (true) {
console.time('time:common#fetchBatchBlocks-getBlocks');

const blockPromises = blockNumbers.map(async blockNumber => indexer.getBlocks({ blockNumber }));
const settledResults = await Promise.allSettled(blockPromises);

const res: any[] = [];
for (let index = 0; index < settledResults.length; index++) {
const result = settledResults[index];
// If fulfilled, return value
if (result.status === 'fulfilled') {
res.push(result.value);
continue;
}

// If rejected, check error
// Handle null block error in case of Lotus EVM
// Otherwise, rethrow error
const err = result.reason;
if (!(err.code === errors.SERVER_ERROR && err.error && err.error.message === NULL_BLOCK_ERROR)) {
throw err;
}

log(`Block ${blockNumbers[index]} requested was null (FEVM), skipping`);

// Remove the corresponding block number from the blockNumbers to avoid retrying for the same
blockNumbers = blockNumbers.splice(index, 1);

// Stop the iteration at the first null block found
// To avoid saving blocks after the null block
// so that they don't conflict with blocks fetched when processBlockByNumber gets called for the null block
// TODO: Optimize
break;
}

console.timeEnd('time:common#fetchBatchBlocks-getBlocks');

const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0);

if (firstMissingBlockIndex === -1) {
blocks = res;
break;
} else if (firstMissingBlockIndex > 0) {
blocks = res.slice(0, firstMissingBlockIndex);
break;
}

log(`No blocks fetched for block number ${blockNumbers[0]}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
await wait(jobQueueConfig.blockDelayInMilliSecs);
}

// Flatten array as there can be multiple blocks at the same height
blocks = blocks.flat();

if (jobQueueConfig.jobDelayInMilliSecs) {
await wait(jobQueueConfig.jobDelayInMilliSecs);
}

blocks.forEach(block => {
block.blockTimestamp = Number(block.timestamp);
block.blockNumber = Number(block.blockNumber);
});

console.time('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');
const blockAndEventsList = await indexer.fetchEventsAndSaveBlocks(blocks);
console.timeEnd('time:common#fetchBatchBlocks-fetchEventsAndSaveBlocks');

return blockAndEventsList;
};

/**
* Process events in batches for a block.
* @param indexer
Expand Down Expand Up @@ -602,9 +462,3 @@ export const createCheckpointJob = async (jobQueue: JobQueue, blockHash: string,
}
);
};

const getPrefetchedBlocksAtHeight = (blockAndEventsMap: Map<string, PrefetchedBlock>, blockNumber: number):any[] => {
return Array.from(blockAndEventsMap.values())
.filter(({ block }) => Number(block.blockNumber) === blockNumber)
.map(prefetchedBlock => prefetchedBlock.block);
};
2 changes: 0 additions & 2 deletions packages/util/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ export interface JobQueueConfig {
lazyUpdateBlockProgress?: boolean;
subgraphEventsOrder: boolean;
blockDelayInMilliSecs: number;
prefetchBlocksInMem: boolean;
prefetchBlockCount: number;
// Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange?: number;
// Max block range of historical processing after which it waits for completion of events processing
Expand Down
100 changes: 0 additions & 100 deletions packages/util/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,106 +359,6 @@ export class Indexer {
return this._db.getEvent(id);
}

// For each of the given blocks, fetches events and saves them along with the block to db
// Returns an array with [block, events] for all the given blocks
async fetchEventsAndSaveBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<{ blockProgress: BlockProgressInterface, events: DeepPartial<EventInterface>[] }[]> {
if (!blocks.length) {
return [];
}

const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetching from upstream server for range [${fromBlock}, ${toBlock}]`);

const dbEventsMap = await this.fetchEventsForBlocks(blocks, eventSignaturesMap, parseEventNameAndArgs);

const blocksWithEventsPromises = blocks.map(async block => {
const blockHash = block.blockHash;
assert(blockHash);

const dbEvents = dbEventsMap.get(blockHash) || [];
const [blockProgress] = await this.saveBlockWithEvents(block, dbEvents);
log(`fetchEventsAndSaveBlocks#fetchEventsForBlocks: fetched for block: ${blockHash} num events: ${blockProgress.numEvents}`);

return { blockProgress, events: [] };
});

return Promise.all(blocksWithEventsPromises);
}

// Fetch events (to be saved to db) for a block range
async fetchEventsForBlocks (blocks: DeepPartial<BlockProgressInterface>[], eventSignaturesMap: Map<string, string[]>, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<Map<string, DeepPartial<EventInterface>[]>> {
if (!blocks.length) {
return new Map();
}

// Fetch logs for block range of given blocks
const fromBlock = blocks[0].blockNumber;
const toBlock = blocks[blocks.length - 1].blockNumber;

assert(this._ethClient.getLogsForBlockRange, 'getLogsForBlockRange() not implemented in ethClient');

const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);

const { logs } = await this._ethClient.getLogsForBlockRange({
fromBlock,
toBlock,
addresses,
topics
});

// Skip further processing if no relevant logs found in the entire block range
if (!logs.length) {
return new Map();
}

// Sort logs according to blockhash
const blockLogsMap = this._reduceLogsToBlockLogsMap(logs);

// Fetch transactions for given blocks
const transactionsMap: Map<string, any> = new Map();
const transactionPromises = blocks.map(async (block) => {
assert(block.blockHash);

// Skip fetching txs if no relevant logs found in this block
if (!blockLogsMap.has(block.blockHash)) {
return;
}

const blockWithTransactions = await this._ethClient.getBlockWithTransactions({ blockHash: block.blockHash, blockNumber: block.blockNumber });
const {
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
} = blockWithTransactions;

transactionsMap.set(block.blockHash, transactions);
});

await Promise.all(transactionPromises);

// Map db ready events according to blockhash
const dbEventsMap: Map<string, DeepPartial<EventInterface>[]> = new Map();
blocks.forEach(block => {
const blockHash = block.blockHash;
assert(blockHash);

const logs = blockLogsMap.get(blockHash) || [];
const transactions = transactionsMap.get(blockHash) || [];

const dbEvents = this.createDbEventsFromLogsAndTxs(blockHash, logs, transactions, parseEventNameAndArgs);
dbEventsMap.set(blockHash, dbEvents);
});

return dbEventsMap;
}

async fetchAndSaveFilteredEventsAndBlocks (
fromBlock: number,
toBlock: number,
Expand Down
54 changes: 25 additions & 29 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,11 @@ export class JobRunner {
log(`Processing chain pruning at ${pruneBlockHeight}`);

// Assert we're at a depth where pruning is safe.
assert(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH));
if (!(syncStatus.latestIndexedBlockNumber >= (pruneBlockHeight + MAX_REORG_DEPTH))) {
const message = `Pruning is not safe at height ${pruneBlockHeight}, latest indexed block height ${syncStatus.latestIndexedBlockNumber}`;
log(message);
throw new Error(message);
}

// Check that we haven't already pruned at this depth.
if (syncStatus.latestCanonicalBlockNumber >= pruneBlockHeight) {
Expand Down Expand Up @@ -585,34 +589,26 @@ export class JobRunner {
}

if (!blockProgress) {
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);

// Check if prefetched block is set properly
// prefetchedBlock.block is an empty object when running in realtime processing
if (prefetchedBlock && prefetchedBlock.block.blockHash) {
({ block: blockProgress } = prefetchedBlock);
} else {
// Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);

console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`);
let ethFullTransactions;
[blockProgress,, ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);

this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
}
// Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);

console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
log(`_indexBlock#saveBlockAndFetchEvents: fetching from upstream server ${blockHash}`);
let ethFullTransactions;
[blockProgress, , ethFullTransactions] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
log(`_indexBlock#saveBlockAndFetchEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
const data = this._blockAndEventsMap.get(blockHash);
assert(data);

this._blockAndEventsMap.set(
blockHash,
{
...data,
block: blockProgress,
ethFullTransactions
});
}

if (!blockProgress.isComplete) {
Expand Down
Loading
Loading