From 29a9f539cc2253da5d6a85babc0da3d593bf920e Mon Sep 17 00:00:00 2001 From: Bob Jr Date: Tue, 26 Dec 2023 12:01:19 +0700 Subject: [PATCH] [updateCandidateStatus] Reduce rpc calls in updateSignerPenAndStatus (resolve #801) --- crawl.js | 253 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 161 insertions(+), 92 deletions(-) diff --git a/crawl.js b/crawl.js index 0daafbf8..0c5e29dd 100644 --- a/crawl.js +++ b/crawl.js @@ -21,7 +21,7 @@ let cpValidator = 0 let tweetedMN = '' -async function watchValidator () { +async function watchValidator() { var blockNumber = cpValidator || await web3.eth.getBlockNumber() try { blockNumber = blockNumber || await web3.eth.getBlockNumber() @@ -160,7 +160,7 @@ async function watchValidator () { } } -async function updateCandidateInfo (candidate) { +async function updateCandidateInfo(candidate) { try { let capacity = await validator.methods.getCandidateCap(candidate).call() let owner = (await validator.methods.getCandidateOwner(candidate).call() || '').toLowerCase() @@ -206,7 +206,7 @@ async function updateCandidateInfo (candidate) { } } -async function updateVoterCap (candidate, voter) { +async function updateVoterCap(candidate, voter) { try { let capacity = await validator.methods.getVoterCap(candidate, voter).call() logger.debug('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity)) @@ -229,7 +229,7 @@ async function updateVoterCap (candidate, voter) { } // Get current candates -async function getCurrentCandidates () { +async function getCurrentCandidates() { try { let candidates = await validator.methods.getCandidates().call() let candidatesInDb = await db.Candidate.find({ @@ -257,7 +257,7 @@ async function getCurrentCandidates () { } } -async function updateSignerPenAndStatus () { +async function updateSignerPenAndStatus() { try { const latestBlockNumber = await web3.eth.getBlockNumber() const latestCheckpoint = latestBlockNumber - (latestBlockNumber % parseInt(config.get('blockchain.epoch'))) @@ -265,100 +265,169 @@ async function updateSignerPenAndStatus () { const blk = await web3.eth.getBlock(latestCheckpoint) const signers = [] const penalties = [] + let candidateBulkOps = [] + let statusBulkOps = [] + // get candidate list const candidates = await db.Candidate.find({ smartContractAddress: config.get('blockchain.validatorAddress'), candidate: { $ne: 'RESIGNED' } - }) - // loop and get status - await Promise.all(candidates.map(async (c) => { - const data = { - 'jsonrpc': '2.0', - 'method': 'eth_getCandidateStatus', - 'params': [c.candidate.toLowerCase(), 'latest'], - 'id': config.get('blockchain.networkId') + }, + { + candidate: 1, + status: 1 } - const response = await axios.post(config.get('blockchain.rpc'), data) - - if (response.data) { - const result = (response.data.result || {}).status - switch (result) { - case 'MASTERNODE': - signers.push(c.candidate) - await db.Candidate.findOneAndUpdate({ - smartContractAddress: config.get('blockchain.validatorAddress'), - candidate: c.candidate.toLowerCase() - }, { - $set: { - status: 'MASTERNODE' - } - }, { upsert: true }) - await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, { - epoch: currentEpoch, - candidate: c.candidate, - status: 'MASTERNODE', - epochCreatedAt: moment.unix(blk.timestamp).utc() - }, { upsert: true }) - break - case 'SLASHED': - logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number)) - // fireNotification - if (result.toLowerCase() !== c.status.toLowerCase()) { - // get all voters who have capacity > 0 - const voters = await db.Voter.find({ - candidate: c.candidate, - smartContractAddress: config.get('blockchain.validatorAddress'), - capacityNumber: { $gt: 0 } + ) + const onchainCandidateParams = { + 'jsonrpc': '2.0', + 'method': 'eth_getCandidates', + 'params': ['latest'], + 'id': config.get('blockchain.networkId') + } + const response = await axios.post(config.get('blockchain.rpc'), onchainCandidateParams) + if (response.data && response.data.result) { + const allCandidates = response.data.result.candidates + if (!allCandidates || allCandidates.length == 0) { + logger.error('no onchain candidate found') + return + } + for (const c of candidates) { + const thisCandidate = allCandidates[c] + switch (thisCandidate.status) { + case 'MASTERNODE': + signers.push(c.candidate) + candidateBulkOps.push({ + updateOne: { + filter: { + smartContractAddress: config.get('blockchain.validatorAddress'), + candidate: c.candidate.toLowerCase() + }, + update: { + $set: { + status: 'MASTERNODE' + } + }, + upsert: false + } }) - if (voters && voters.length > 0) { - await Promise.all(voters.map(async (v) => { - await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber) - })) + statusBulkOps.push({ + updateOne: { + filter: { + epoch: currentEpoch, candidate: c.candidate + }, + update: { + $set: { + epoch: currentEpoch, + candidate: c.candidate, + status: 'MASTERNODE', + epochCreatedAt: moment.unix(blk.timestamp).utc() + } + }, + upsert: false + } + }) + break + case 'SLASHED': + logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number)) + // fireNotification + if (thisCandidate.status.toLowerCase() !== c.status.toLowerCase()) { + // get all voters who have capacity > 0 + const voters = await db.Voter.find({ + candidate: c.candidate, + smartContractAddress: config.get('blockchain.validatorAddress'), + capacityNumber: { $gt: 0 } + }) + if (voters && voters.length > 0) { + await Promise.all(voters.map(async (v) => { + await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber) + })) + } } - } + penalties.push(c.candidate) + + candidateBulkOps.push({ + updateOne: { + filter: { + smartContractAddress: config.get('blockchain.validatorAddress'), + candidate: c.candidate.toLowerCase() + }, + update: { + $set: { + status: 'SLASHED' + } + }, + upsert: false + } + }) - db.Candidate.findOneAndUpdate({ - smartContractAddress: config.get('blockchain.validatorAddress'), - candidate: c.candidate.toLowerCase() - }, { - $set: { - status: 'SLASHED' - } - }, { upsert: true }).then(() => true) - .catch(error => console.log(error)) + statusBulkOps.push({ + updateOne: { + filter: { + epoch: currentEpoch, candidate: c.candidate + }, + update: { + $set: { + epoch: currentEpoch, + candidate: c.candidate, + status: 'SLASHED', + epochCreatedAt: moment.unix(blk.timestamp).utc() + } + }, + upsert: false + } + }) + break + case 'PROPOSED': + candidateBulkOps.push({ + updateOne: { + filter: { + smartContractAddress: config.get('blockchain.validatorAddress'), + candidate: c.candidate.toLowerCase() + }, + update: { + $set: { + status: 'PROPOSED' + } + }, + upsert: false + } + }) + + statusBulkOps.push({ + updateOne: { + filter: { + epoch: currentEpoch, candidate: c.candidate + }, + update: { + $set: { + epoch: currentEpoch, + candidate: c.candidate, + status: 'PROPOSED', + epochCreatedAt: moment.unix(blk.timestamp).utc() + } + }, + upsert: false + } + }) + default: + break - db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, { - epoch: currentEpoch, - candidate: c.candidate, - status: 'SLASHED', - epochCreatedAt: moment.unix(blk.timestamp).utc() - }, { upsert: true }).then(() => true) - .catch(error => console.log(error)) - penalties.push(c.candidate) - break - case 'PROPOSED': - await db.Candidate.findOneAndUpdate({ - smartContractAddress: config.get('blockchain.validatorAddress'), - candidate: c.candidate.toLowerCase() - }, { - $set: { - status: 'PROPOSED' - } - }, { upsert: true }) - await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, { - epoch: currentEpoch, - candidate: c.candidate, - status: 'PROPOSED', - epochCreatedAt: moment.unix(blk.timestamp).utc() - }, { upsert: true }) - break - default: - break } } - })) + + } + + if (candidateBulkOps.length > 0) { + const res = await db.Candidate.collection.bulkWrite(candidateBulkOps) + logger.debug(`Update candidates at block ${blk.number}, result ${res}`) + } + if (statusBulkOps.length > 0) { + const res = await db.Status.collection.bulkWrite(statusBulkOps) + logger.debug(`Update statuses at block ${blk.number}, result ${res}`) + } + await db.Signer.findOneAndUpdate({ blockNumber: blk.number }, { networkId: config.get('blockchain.networkId'), blockNumber: blk.number, @@ -381,7 +450,7 @@ async function updateSignerPenAndStatus () { } let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time)) -async function watchNewBlock (n) { +async function watchNewBlock(n) { try { let blockNumber = await web3.eth.getBlockNumber() n = n || blockNumber @@ -505,7 +574,7 @@ async function watchNewBlock (n) { return watchNewBlock(n) } -async function fireNotification (voter, candidate, name, event, blockNumber, amount = '') { +async function fireNotification(voter, candidate, name, event, blockNumber, amount = '') { try { const isRead = false await db.Notification.findOneAndUpdate({ @@ -526,7 +595,7 @@ async function fireNotification (voter, candidate, name, event, blockNumber, amo } } -function diff (a, b) { +function diff(a, b) { return a.filter((i) => { return b.indexOf(i) < 0 }) @@ -561,11 +630,11 @@ const getBlockSigners = async (number) => { return [] } -async function updateLatestSignedBlock (blk) { +async function updateLatestSignedBlock(blk) { try { if (!blk || blk.number % parseInt(config.get('blockchain.blockSignerGap')) !== - parseInt(config.get('blockchain.blockSignerDelay')) + parseInt(config.get('blockchain.blockSignerDelay')) ) { return } @@ -597,7 +666,7 @@ async function updateLatestSignedBlock (blk) { } } -async function getPastEvent () { +async function getPastEvent() { let blockNumber = await web3.eth.getBlockNumber() let lastBlockTx = await db.Transaction.findOne().sort({ blockNumber: -1 }) let lb = (lastBlockTx && lastBlockTx.blockNumber) ? lastBlockTx.blockNumber : 0