Skip to content

Commit

Permalink
Merge pull request #150 from EigenExplorer/146-optimize-validator-see…
Browse files Browse the repository at this point in the history
…ding

146 optimize validator seeding
  • Loading branch information
uditdc authored Jul 22, 2024
2 parents afc0f91 + 27b8f67 commit 97e0325
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- AlterTable
ALTER TABLE "Validator" ADD COLUMN "activationEpoch" BIGINT NOT NULL,
ADD COLUMN "exitEpoch" BIGINT NOT NULL,
ADD COLUMN "pubkey" TEXT NOT NULL,
ADD COLUMN "updatedAt" TIMESTAMP(3) NOT NULL;
4 changes: 4 additions & 0 deletions packages/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,16 @@ model ValidatorRestake {

model Validator {
validatorIndex BigInt @id
pubkey String
status String
balance BigInt
effectiveBalance BigInt
slashed Boolean
withdrawalCredentials String
activationEpoch BigInt
exitEpoch BigInt
updatedAt DateTime
}

model Withdrawal {
Expand Down
4 changes: 3 additions & 1 deletion packages/seeder/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ NETWORK = "holesky"
NETWORK_CHAIN_RPC_URL = ""
NETWORK_CHAIN_WSS_URL = ""
DATABASE_URL = ""
DIRECT_URL = ""
DIRECT_URL = ""
VALIDATOR_SERVER_URL = ""
VALIDATOR_BEARER_TOKEN = ""
19 changes: 1 addition & 18 deletions packages/seeder/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async function seedEigenData() {
await seedCompletedWithdrawals()
await seedDeposits()
await seedPods()
await seedValidators()
} catch (error) {
console.log('Failed to seed data at:', Date.now())
console.log(error)
Expand All @@ -85,23 +86,6 @@ async function seedMetadata() {
}
}

async function seedEigenPodValidators() {
await delay(360)

while (true) {
try {
console.log('\nSeeding Validators data ...')

await seedValidators(true)
} catch (error) {
console.log(error)
console.log('Failed to seed Validators at:', Date.now())
}

await delay(3600)
}
}

async function seedEigenStrategiesData() {
await delay(120)

Expand Down Expand Up @@ -138,6 +122,5 @@ async function seedRestakedData() {

seedEigenData()
seedMetadata()
seedEigenPodValidators()
seedEigenStrategiesData()
seedRestakedData()
234 changes: 135 additions & 99 deletions packages/seeder/src/seedValidators.ts
Original file line number Diff line number Diff line change
@@ -1,119 +1,155 @@
import 'dotenv/config'

import type prisma from '@prisma/client'
import { getPrismaClient } from './utils/prismaClient'
import { bulkUpdateDbTransactions } from './utils/seeder'

import { chunkArray } from './utils/array'

export async function seedValidators(shouldClearPrev?: boolean) {
export async function seedValidators() {
const prismaClient = getPrismaClient()

let validators = []
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
const podValidators: any[] = []
const podAddresses = await prismaClient.pod.findMany({
select: { address: true }
})
const podAddressList = podAddresses.map((p) => p.address.toLowerCase())
const startAt = await getLastUpdate()
const endAt = new Date()

// Bail early if there is no time diff to sync
if (startAt) {
if (endAt.getTime() - startAt.getTime() <= 0) {
console.log(
`[In Sync] [Data] Validators from: ${startAt.getTime()} to: ${endAt.getTime()}`
)
return
}
}

const lastValidatorIndex = await prismaClient.validator.findFirst({
select: { validatorIndex: true },
orderBy: { validatorIndex: 'desc' }
})
try {
// Get all pod addresses and add it to the request
const maxPodsPerPage = 10000
const totalPods = await prismaClient.pod.count()
const totalPodsPages = Math.ceil(totalPods / maxPodsPerPage)

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
const dbTransactions: any[] = []
let validatorList: prisma.Validator[] = []

for (let i = 0; i < totalPodsPages; i++) {
const podAddresses = await prismaClient.pod.findMany({
select: { address: true },
take: maxPodsPerPage,
skip: i * maxPodsPerPage
})

const rpcUrl = process.env.NETWORK_CHAIN_RPC_URL
const status = 'finalized'

console.time('Done in')
console.log('Seeding validators for eigenpods:', podAddressList.length)

let isAtEnd = false
let batchIndex = 0
let currentIndex =
!shouldClearPrev && lastValidatorIndex
? Number(lastValidatorIndex.validatorIndex) + 1
: 0
const chunkSize = 6000
const batchSize = 24_000
const clearPerv = shouldClearPrev
? shouldClearPrev
: lastValidatorIndex?.validatorIndex
? false
: true

while (!isAtEnd) {
const validatorRestakeIds = Array.from(
{ length: batchSize },
(_, i) => currentIndex + i
)
const chunks = chunkArray(validatorRestakeIds, chunkSize)

console.log(
`[Batch] Validator chunk ${batchIndex} from ${currentIndex} - ${
currentIndex + batchSize
}`
)

await Promise.allSettled(
chunks.map(async (chunk, i) => {
const req = await fetch(
`${rpcUrl}/eth/v1/beacon/states/${status}/validators?id=${chunk.join(
','
)}`
const podAddressList = podAddresses.map((p) => p.address.toLowerCase())
const validators = await fetchValidators(podAddressList, startAt)
validatorList = validatorList.concat(validators)

console.log(
`[Batch] Validators loaded count: ${validators.length} from: ${
i * maxPodsPerPage
} to ${Math.min(totalPods, (i + 1) * maxPodsPerPage)}`
)
}

if (validatorList.length > 0) {
if (!startAt) {
dbTransactions.push(prismaClient.validator.deleteMany())
dbTransactions.push(
prismaClient.validator.createMany({
data: validatorList
})
)
} else {
const validatorIndexs = validatorList.map((v) => v.validatorIndex)
dbTransactions.push(
prismaClient.validator.deleteMany({
where: { validatorIndex: { in: validatorIndexs } }
})
)

const validatorsData = await req.json()
validators = validators.concat(validatorsData.data)

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
validatorsData.data.map((v: any) => {
const withdrawalCredentials = `0x${v.validator.withdrawal_credentials
.slice(-40)
.toLowerCase()}`

if (podAddressList.indexOf(withdrawalCredentials) !== -1) {
podValidators.push({
validatorIndex: v.index as bigint,
status: v.status as string,

balance: v.balance as bigint,
effectiveBalance: v.validator.effective_balance as bigint,
slashed: v.validator.slashed as boolean,
withdrawalCredentials: v.validator
.withdrawal_credentials as string
})
}
})

if (validatorsData.data.length < chunkSize) {
isAtEnd = true
}
})
)
dbTransactions.push(
prismaClient.validator.createMany({
data: validatorList
})
)
}

await bulkUpdateDbTransactions(
dbTransactions,
`[Data] Validators updated: ${validatorList.length}`
)
} else {
console.log(
`[In Sync] [Data] Validators from: ${startAt?.getTime()} to: ${endAt.getTime()}`
)
}
} catch (error) {
console.log('Error seeding Validators: ', error)
}
}

async function fetchValidators(podAddressList: string[], startAt: Date | null) {
const url = process.env.VALIDATOR_SERVER_URL
const token = process.env.VALIDATOR_BEARER_TOKEN

const requestBody = {
podAddresses: podAddressList,
startAt: startAt ? startAt.toISOString() : undefined
}

const response = await fetch(`${url}/validators`, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`,
'Content-Type': 'application/json',
'Accept-Encoding': 'gzip'
},
body: JSON.stringify(requestBody)
})

if (!response.ok) {
throw new Error(`HTTP error: ${response.status}`)
}

if (!response.body) {
throw new Error('Empty response')
}

const reader = response.body.getReader()
const decoder = new TextDecoder()

const chunks: Uint8Array[] = []
let totalSize = 0

currentIndex += batchSize
batchIndex++
while (true) {
const { done, value } = await reader.read()

if (done) {
break
}

chunks.push(value)
totalSize += value.length
}

// biome-ignore lint/suspicious/noExplicitAny: <explanation>
const dbTransactions: any[] = []
const fullBuffer = new Uint8Array(totalSize)
let offset = 0

// Clear all validator data
if (clearPerv) {
dbTransactions.push(prismaClient.validator.deleteMany())
for (const chunk of chunks) {
fullBuffer.set(chunk, offset)
offset += chunk.length
}

dbTransactions.push(
prismaClient.validator.createMany({
data: podValidators
})
)
const jsonString = decoder.decode(fullBuffer)
const validatorsData = JSON.parse(jsonString)

return validatorsData
}

await bulkUpdateDbTransactions(
dbTransactions,
`[Data] Validator updated size: ${podValidators.length}`
)
async function getLastUpdate() {
const prismaClient = getPrismaClient()

const latestRecord = await prismaClient.validator.findFirst({
select: { updatedAt: true },
orderBy: { updatedAt: 'desc' }
})

console.log('Seeded Validators', podValidators.length)
console.timeEnd('Done in')
return latestRecord ? latestRecord.updatedAt : null
}

0 comments on commit 97e0325

Please sign in to comment.