diff --git a/packages/prisma/migrations/20240719140156_include_validator_epoch_fields/migration.sql b/packages/prisma/migrations/20240719140156_include_validator_epoch_fields/migration.sql new file mode 100644 index 00000000..5b9ee7e1 --- /dev/null +++ b/packages/prisma/migrations/20240719140156_include_validator_epoch_fields/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE "Validator" ADD COLUMN "activationEpoch" BIGINT NOT NULL, +ADD COLUMN "exitEpoch" BIGINT NOT NULL, +ADD COLUMN "pubkey" TEXT NOT NULL, +ADD COLUMN "updatedAt" TIMESTAMP(3) NOT NULL; diff --git a/packages/prisma/schema.prisma b/packages/prisma/schema.prisma index 0ccdea60..6eeb3cba 100644 --- a/packages/prisma/schema.prisma +++ b/packages/prisma/schema.prisma @@ -170,12 +170,16 @@ model ValidatorRestake { model Validator { validatorIndex BigInt @id + pubkey String status String balance BigInt effectiveBalance BigInt slashed Boolean withdrawalCredentials String + activationEpoch BigInt + exitEpoch BigInt + updatedAt DateTime } model Withdrawal { diff --git a/packages/seeder/.env.example b/packages/seeder/.env.example index 2ae88a07..d2bb2f50 100644 --- a/packages/seeder/.env.example +++ b/packages/seeder/.env.example @@ -3,4 +3,6 @@ NETWORK = "holesky" NETWORK_CHAIN_RPC_URL = "" NETWORK_CHAIN_WSS_URL = "" DATABASE_URL = "" -DIRECT_URL = "" \ No newline at end of file +DIRECT_URL = "" +VALIDATOR_SERVER_URL = "" +VALIDATOR_BEARER_TOKEN = "" \ No newline at end of file diff --git a/packages/seeder/src/index.ts b/packages/seeder/src/index.ts index ce18a302..3ac9804e 100644 --- a/packages/seeder/src/index.ts +++ b/packages/seeder/src/index.ts @@ -59,6 +59,7 @@ async function seedEigenData() { await seedCompletedWithdrawals() await seedDeposits() await seedPods() + await seedValidators() } catch (error) { console.log('Failed to seed data at:', Date.now()) console.log(error) @@ -85,23 +86,6 @@ async function seedMetadata() { } } -async function seedEigenPodValidators() { - await delay(360) - - while (true) { - try { - console.log('\nSeeding Validators data ...') - - await seedValidators(true) - } catch (error) { - console.log(error) - console.log('Failed to seed Validators at:', Date.now()) - } - - await delay(3600) - } -} - async function seedEigenStrategiesData() { await delay(120) @@ -138,6 +122,5 @@ async function seedRestakedData() { seedEigenData() seedMetadata() -seedEigenPodValidators() seedEigenStrategiesData() seedRestakedData() diff --git a/packages/seeder/src/seedValidators.ts b/packages/seeder/src/seedValidators.ts index bbdd8011..e7005ffc 100644 --- a/packages/seeder/src/seedValidators.ts +++ b/packages/seeder/src/seedValidators.ts @@ -1,119 +1,155 @@ import 'dotenv/config' +import type prisma from '@prisma/client' import { getPrismaClient } from './utils/prismaClient' import { bulkUpdateDbTransactions } from './utils/seeder' -import { chunkArray } from './utils/array' - -export async function seedValidators(shouldClearPrev?: boolean) { +export async function seedValidators() { const prismaClient = getPrismaClient() - let validators = [] - // biome-ignore lint/suspicious/noExplicitAny: - const podValidators: any[] = [] - const podAddresses = await prismaClient.pod.findMany({ - select: { address: true } - }) - const podAddressList = podAddresses.map((p) => p.address.toLowerCase()) + const startAt = await getLastUpdate() + const endAt = new Date() + + // Bail early if there is no time diff to sync + if (startAt) { + if (endAt.getTime() - startAt.getTime() <= 0) { + console.log( + `[In Sync] [Data] Validators from: ${startAt.getTime()} to: ${endAt.getTime()}` + ) + return + } + } - const lastValidatorIndex = await prismaClient.validator.findFirst({ - select: { validatorIndex: true }, - orderBy: { validatorIndex: 'desc' } - }) + try { + // Get all pod addresses and add it to the request + const maxPodsPerPage = 10000 + const totalPods = await prismaClient.pod.count() + const totalPodsPages = Math.ceil(totalPods / maxPodsPerPage) + + // biome-ignore lint/suspicious/noExplicitAny: + const dbTransactions: any[] = [] + let validatorList: prisma.Validator[] = [] + + for (let i = 0; i < totalPodsPages; i++) { + const podAddresses = await prismaClient.pod.findMany({ + select: { address: true }, + take: maxPodsPerPage, + skip: i * maxPodsPerPage + }) - const rpcUrl = process.env.NETWORK_CHAIN_RPC_URL - const status = 'finalized' - - console.time('Done in') - console.log('Seeding validators for eigenpods:', podAddressList.length) - - let isAtEnd = false - let batchIndex = 0 - let currentIndex = - !shouldClearPrev && lastValidatorIndex - ? Number(lastValidatorIndex.validatorIndex) + 1 - : 0 - const chunkSize = 6000 - const batchSize = 24_000 - const clearPerv = shouldClearPrev - ? shouldClearPrev - : lastValidatorIndex?.validatorIndex - ? false - : true - - while (!isAtEnd) { - const validatorRestakeIds = Array.from( - { length: batchSize }, - (_, i) => currentIndex + i - ) - const chunks = chunkArray(validatorRestakeIds, chunkSize) - - console.log( - `[Batch] Validator chunk ${batchIndex} from ${currentIndex} - ${ - currentIndex + batchSize - }` - ) - - await Promise.allSettled( - chunks.map(async (chunk, i) => { - const req = await fetch( - `${rpcUrl}/eth/v1/beacon/states/${status}/validators?id=${chunk.join( - ',' - )}` + const podAddressList = podAddresses.map((p) => p.address.toLowerCase()) + const validators = await fetchValidators(podAddressList, startAt) + validatorList = validatorList.concat(validators) + + console.log( + `[Batch] Validators loaded count: ${validators.length} from: ${ + i * maxPodsPerPage + } to ${Math.min(totalPods, (i + 1) * maxPodsPerPage)}` + ) + } + + if (validatorList.length > 0) { + if (!startAt) { + dbTransactions.push(prismaClient.validator.deleteMany()) + dbTransactions.push( + prismaClient.validator.createMany({ + data: validatorList + }) + ) + } else { + const validatorIndexs = validatorList.map((v) => v.validatorIndex) + dbTransactions.push( + prismaClient.validator.deleteMany({ + where: { validatorIndex: { in: validatorIndexs } } + }) ) - const validatorsData = await req.json() - validators = validators.concat(validatorsData.data) - - // biome-ignore lint/suspicious/noExplicitAny: - validatorsData.data.map((v: any) => { - const withdrawalCredentials = `0x${v.validator.withdrawal_credentials - .slice(-40) - .toLowerCase()}` - - if (podAddressList.indexOf(withdrawalCredentials) !== -1) { - podValidators.push({ - validatorIndex: v.index as bigint, - status: v.status as string, - - balance: v.balance as bigint, - effectiveBalance: v.validator.effective_balance as bigint, - slashed: v.validator.slashed as boolean, - withdrawalCredentials: v.validator - .withdrawal_credentials as string - }) - } - }) - - if (validatorsData.data.length < chunkSize) { - isAtEnd = true - } - }) - ) + dbTransactions.push( + prismaClient.validator.createMany({ + data: validatorList + }) + ) + } + + await bulkUpdateDbTransactions( + dbTransactions, + `[Data] Validators updated: ${validatorList.length}` + ) + } else { + console.log( + `[In Sync] [Data] Validators from: ${startAt?.getTime()} to: ${endAt.getTime()}` + ) + } + } catch (error) { + console.log('Error seeding Validators: ', error) + } +} + +async function fetchValidators(podAddressList: string[], startAt: Date | null) { + const url = process.env.VALIDATOR_SERVER_URL + const token = process.env.VALIDATOR_BEARER_TOKEN + + const requestBody = { + podAddresses: podAddressList, + startAt: startAt ? startAt.toISOString() : undefined + } + + const response = await fetch(`${url}/validators`, { + method: 'POST', + headers: { + Authorization: `Bearer ${token}`, + 'Content-Type': 'application/json', + 'Accept-Encoding': 'gzip' + }, + body: JSON.stringify(requestBody) + }) + + if (!response.ok) { + throw new Error(`HTTP error: ${response.status}`) + } + + if (!response.body) { + throw new Error('Empty response') + } + + const reader = response.body.getReader() + const decoder = new TextDecoder() + + const chunks: Uint8Array[] = [] + let totalSize = 0 - currentIndex += batchSize - batchIndex++ + while (true) { + const { done, value } = await reader.read() + + if (done) { + break + } + + chunks.push(value) + totalSize += value.length } - // biome-ignore lint/suspicious/noExplicitAny: - const dbTransactions: any[] = [] + const fullBuffer = new Uint8Array(totalSize) + let offset = 0 - // Clear all validator data - if (clearPerv) { - dbTransactions.push(prismaClient.validator.deleteMany()) + for (const chunk of chunks) { + fullBuffer.set(chunk, offset) + offset += chunk.length } - dbTransactions.push( - prismaClient.validator.createMany({ - data: podValidators - }) - ) + const jsonString = decoder.decode(fullBuffer) + const validatorsData = JSON.parse(jsonString) + + return validatorsData +} - await bulkUpdateDbTransactions( - dbTransactions, - `[Data] Validator updated size: ${podValidators.length}` - ) +async function getLastUpdate() { + const prismaClient = getPrismaClient() + + const latestRecord = await prismaClient.validator.findFirst({ + select: { updatedAt: true }, + orderBy: { updatedAt: 'desc' } + }) - console.log('Seeded Validators', podValidators.length) - console.timeEnd('Done in') + return latestRecord ? latestRecord.updatedAt : null }