feat: peerdas - ensure there are at least n peers per sampling column subnet #7274

Open
wants to merge 67 commits into base: peerDAS
Changes from 63 commits
Commits (67)
4805a2e
feat: placeholder PR for electra
g11tech Jan 24, 2024
499d93c
feat: implement peerDAS on electra
g11tech Jan 24, 2024
156ef53
fix: docker build issue for c-kzg
matthewkeil Jun 21, 2024
47eedae
feat: get various sync mechanisms working with/without sharded data
g11tech Jul 14, 2024
d423004
feat: add the modifications to work with devnet2
g11tech Jul 14, 2024
a0c5d27
fix: refactor to add and use nodeid computation and clear out nodeid …
g11tech Jul 16, 2024
c7f6341
fix the types/test
g11tech Aug 9, 2024
81aaeb5
feat: add and use metadatav3 for peer custody subnet
g11tech Aug 12, 2024
e6c613f
rename electra fork to peerdas for rebase and make csc in metadata uint8
g11tech Aug 27, 2024
a3533f8
add supernode flag to configure node custody requirement and make it …
g11tech Aug 27, 2024
54579b0
add more info for debugging
g11tech Aug 27, 2024
180f7d8
fix log
g11tech Aug 27, 2024
4b6f167
fix bug
g11tech Aug 27, 2024
ae7678e
fx
g11tech Aug 27, 2024
a33a72f
subnet count 128
g11tech Aug 27, 2024
585165e
remove banning unknown block, addmore log
g11tech Aug 28, 2024
2833ac0
make the csc encoding updates as per latest spec
g11tech Sep 5, 2024
bf08852
resolve availability when datacolumns are downloaded and matched
g11tech Sep 7, 2024
006e781
add debug log
g11tech Sep 7, 2024
aece0ab
fix add missing data availability resolutions
g11tech Sep 10, 2024
387da88
add more log
g11tech Sep 10, 2024
2bc1a0d
add cache tracking
g11tech Sep 10, 2024
5e1de6f
trying some fix
g11tech Sep 10, 2024
d7721f8
fix bug
g11tech Sep 10, 2024
bd84892
more log
g11tech Sep 10, 2024
de341b5
add send more log
g11tech Sep 10, 2024
8c21168
make pull a little less agressive
g11tech Sep 10, 2024
d35873e
further wait till cutoff for all data to be available
g11tech Sep 11, 2024
f7571f4
add some more loggig and availaibility tracking
g11tech Sep 11, 2024
74d8122
add some log for debugging inbound data columns request
g11tech Sep 11, 2024
af933fb
some fixes
g11tech Sep 11, 2024
56c8c6e
custodied column fetch debugging log
g11tech Sep 12, 2024
2b10e4d
datacolumns retrival fix
g11tech Sep 12, 2024
cdd9bae
update compute spec tests
g11tech Sep 12, 2024
c4d04ee
fix the column id compute
g11tech Sep 13, 2024
3470076
more debug log
g11tech Sep 13, 2024
4ec7aff
edge case optimization
g11tech Sep 13, 2024
a33303f
feat: refactor and unit test getDataColumnSidecars (#7072)
matthewkeil Sep 16, 2024
bd4f7f9
feat: update ckzg to final DAS version (#7050)
matthewkeil Sep 16, 2024
b1940ee
fix: remove ckzg build script (#7089)
matthewkeil Sep 17, 2024
20ef4c6
feat: validate data column sidecars (#7073)
matthewkeil Sep 17, 2024
fee7c08
validate inclusion proof
g11tech Sep 17, 2024
574837a
use sample subnets for data availability
g11tech Sep 21, 2024
6a77828
add debug console log
g11tech Sep 21, 2024
cec27d6
handle edge case
g11tech Sep 21, 2024
a3de70f
turn persisting network identity to default true
g11tech Sep 26, 2024
cce193b
improve logging for debugging
g11tech Oct 1, 2024
2736b8c
add enhance datacolumn serving logs
g11tech Oct 1, 2024
a0e0087
more log
g11tech Oct 1, 2024
b04aaef
migrate datacolumns to finalized
g11tech Oct 2, 2024
7c9a01c
add debug and fix datacolumns migration and improve log
g11tech Oct 2, 2024
1c08ab3
some fixing of beacon params
g11tech Oct 2, 2024
513bccc
fix
g11tech Oct 2, 2024
fccf9a2
add prevdownload tracker
g11tech Oct 8, 2024
8689c76
feat: check for no commitments on block or column in sidecar validati…
matthewkeil Oct 22, 2024
c8075d0
feat: log peer disconnect info (#7231)
matthewkeil Nov 25, 2024
76cdf61
feat: use subnet request mechanism for subnet samping strategy
twoeths Dec 3, 2024
bd79900
feat: enhance peersRequestedSubnetsToQuery metric for column subnet type
twoeths Dec 3, 2024
a7423f6
chore: add peerCountPerSamplingColumnSubnet metric
twoeths Dec 3, 2024
64aeadb
feat: add network.targetColumnSubnetPeers cli flag
twoeths Dec 3, 2024
fe3975e
chore: remove unused peerIdToCustodySubnetCount variable
twoeths Dec 3, 2024
2926c7c
fix: ensure we have peer's custodySubnets on metadata response
twoeths Jan 8, 2025
8c76989
fix: get custodySubnets on status response
twoeths Jan 8, 2025
323cbe0
fix: request status when metadata is old or non-existent in onMetadata()
twoeths Jan 13, 2025
23e8927
fix: add back onlyConnect* flags
twoeths Jan 13, 2025
6f82fe7
fix: handle onDiscoveredPeer() with no custodySubnetCount information
twoeths Jan 13, 2025
e1e9a64
chore: address PR comments
twoeths Jan 21, 2025
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
FROM --platform=${BUILDPLATFORM:-amd64} node:22.4-slim as build_src
ARG COMMIT
WORKDIR /usr/app
RUN apt-get update && apt-get install -y g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y git g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*

COPY . .

@@ -23,7 +23,7 @@ RUN cd packages/cli && GIT_COMMIT=${COMMIT} yarn write-git-data
# Note: This step is redundant for the host arch
FROM node:22.4-slim as build_deps
WORKDIR /usr/app
RUN apt-get update && apt-get install -y g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y git g++ make python3 python3-setuptools && apt-get clean && rm -rf /var/lib/apt/lists/*

COPY --from=build_src /usr/app .

2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
@@ -132,7 +132,7 @@
"@lodestar/utils": "^1.21.0",
"@lodestar/validator": "^1.21.0",
"@multiformats/multiaddr": "^12.1.3",
"c-kzg": "^2.1.2",
"c-kzg": "^4.0.1",
"datastore-core": "^9.1.1",
"datastore-level": "^10.1.1",
"deepmerge": "^4.3.1",
49 changes: 39 additions & 10 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
@@ -6,9 +6,10 @@ import {
reconstructFullBlockOrContents,
signedBeaconBlockToBlinded,
} from "@lodestar/state-transition";
import {ForkExecution, SLOTS_PER_HISTORICAL_ROOT, isForkExecution} from "@lodestar/params";
import {ForkExecution, SLOTS_PER_HISTORICAL_ROOT, isForkExecution, ForkName} from "@lodestar/params";
import {sleep, fromHex, toHex} from "@lodestar/utils";
import {
peerdas,
deneb,
isSignedBlockContents,
ProducedBlockSource,
@@ -23,10 +24,13 @@ import {
BlockInput,
BlobsSource,
BlockInputDataBlobs,
BlockInputDataDataColumns,
DataColumnsSource,
BlockInputData,
} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
import {computeBlobSidecars, computeDataColumnSidecars} from "../../../../util/blobs.js";
import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
@@ -65,17 +69,40 @@ export function getBeaconBlockApi({
opts: PublishBlockOpts = {}
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: SignedBeaconBlock, blobSidecars: deneb.BlobSidecars;
let blockForImport: BlockInput,
signedBlock: SignedBeaconBlock,
blobSidecars: deneb.BlobSidecars,
dataColumnSidecars: peerdas.DataColumnSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock} = signedBlockOrContents);
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
const blockData = {
fork: config.getForkName(signedBlock.message.slot),
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
const fork = config.getForkName(signedBlock.message.slot);
let blockData: BlockInputData;
if (fork === ForkName.peerdas) {
dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
dataColumnsLen: dataColumnSidecars.length,
// dataColumnsIndex is 1-based: the i-th column present is dataColumns[custodyColumns[i-1]]
dataColumnsIndex: new Uint8Array(Array.from({length: dataColumnSidecars.length}, (_, j) => 1 + j)),
dataColumns: dataColumnSidecars,
dataColumnsBytes: dataColumnSidecars.map(() => null),
dataColumnsSource: DataColumnsSource.api,
} as BlockInputDataDataColumns;
blobSidecars = [];
} else if (fork === ForkName.deneb) {
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
dataColumnSidecars = [];
} else {
throw Error(`Invalid data fork=${fork} for publish`);
}

blockForImport = getBlockInput.availableData(
config,
signedBlock,
@@ -87,6 +114,7 @@
} else {
signedBlock = signedBlockOrContents;
blobSidecars = [];
dataColumnSidecars = [];
blockForImport = getBlockInput.preData(config, signedBlock, BlockSource.api, context?.sszBytes ?? null);
}

@@ -221,6 +249,7 @@
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
...dataColumnSidecars.map((dataColumnSidecar) => () => network.publishDataColumnSidecar(dataColumnSidecar)),
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
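The 1-based `dataColumnsIndex` construction in the publish path above can be illustrated in isolation. A minimal sketch (the helper name is hypothetical, not part of the PR):

```typescript
// Build the 1-based index array used for BlockInputDataDataColumns:
// entry j holds j + 1, meaning "the (j+1)-th custody column is present".
function buildDataColumnsIndex(numColumns: number): Uint8Array {
  return new Uint8Array(Array.from({length: numColumns}, (_, j) => 1 + j));
}
```

For 4 sidecars this yields `[1, 2, 3, 4]`.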
72 changes: 66 additions & 6 deletions packages/beacon-node/src/chain/archiver/archiveBlocks.ts
@@ -45,6 +45,7 @@ export async function archiveBlocks(

// NOTE: The finalized block will be exactly the first block of `epoch` or previous
const finalizedPostDeneb = finalizedCheckpoint.epoch >= config.DENEB_FORK_EPOCH;
const finalizedPostPeerDAS = finalizedCheckpoint.epoch >= config.PEERDAS_FORK_EPOCH;

const finalizedCanonicalBlockRoots: BlockRootSlot[] = finalizedCanonicalBlocks.map((block) => ({
slot: block.slot,
@@ -60,8 +61,13 @@
});

if (finalizedPostDeneb) {
await migrateBlobSidecarsFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blobSidecars from hot DB to cold DB");
const migratedEntries = await migrateBlobSidecarsFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated blobSidecars from hot DB to cold DB", {migratedEntries});
}

if (finalizedPostPeerDAS) {
const migratedEntries = await migrateDataColumnSidecarsFromHotToColdDb(config, db, finalizedCanonicalBlockRoots);
logger.verbose("Migrated dataColumnSidecars from hot DB to cold DB", {migratedEntries});
}
}

@@ -77,7 +83,12 @@

if (finalizedPostDeneb) {
await db.blobSidecars.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical blobsSider from hot DB");
logger.verbose("Deleted non canonical blobSidecars from hot DB");
}

if (finalizedPostPeerDAS) {
await db.dataColumnSidecars.batchDelete(nonCanonicalBlockRoots);
logger.verbose("Deleted non canonical dataColumnSidecars from hot DB");
}
}

@@ -104,6 +115,11 @@
}
}

if (finalizedPostPeerDAS) {
// TODO: keep only `[current_epoch - max(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, archiveBlobEpochs)]`
}

// Pruning potential checkpoint data
const finalizedCanonicalNonCheckpointBlocks = getNonCheckpointBlocks(finalizedCanonicalBlockRoots);
const nonCheckpointBlockRoots: Uint8Array[] = [...nonCanonicalBlockRoots];
@@ -162,18 +178,22 @@ async function migrateBlobSidecarsFromHotToColdDb(
config: ChainForkConfig,
db: IBeaconDb,
blocks: BlockRootSlot[]
): Promise<void> {
): Promise<number> {
let migratedWrappedBlobSidecars = 0;
for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
if (canonicalBlocks.length === 0) return;
if (canonicalBlocks.length === 0) break;

// load Buffer instead of ssz deserialized to improve performance
const canonicalBlobSidecarsEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
canonicalBlocks
.filter((block) => config.getForkSeq(block.slot) >= ForkSeq.deneb)
.filter((block) => {
const blkSeq = config.getForkSeq(block.slot);
return blkSeq >= ForkSeq.deneb && blkSeq < ForkSeq.peerdas;
})
.map(async (block) => {
const bytes = await db.blobSidecars.getBinary(block.root);
if (!bytes) {
Expand All @@ -188,7 +208,47 @@ async function migrateBlobSidecarsFromHotToColdDb(
db.blobSidecarsArchive.batchPutBinary(canonicalBlobSidecarsEntries),
db.blobSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
]);
migratedWrappedBlobSidecars += canonicalBlobSidecarsEntries.length;
}

return migratedWrappedBlobSidecars;
}

async function migrateDataColumnSidecarsFromHotToColdDb(
config: ChainForkConfig,
db: IBeaconDb,
blocks: BlockRootSlot[]
): Promise<number> {
let migratedWrappedDataColumns = 0;
for (let i = 0; i < blocks.length; i += BLOB_SIDECAR_BATCH_SIZE) {
const toIdx = Math.min(i + BLOB_SIDECAR_BATCH_SIZE, blocks.length);
const canonicalBlocks = blocks.slice(i, toIdx);

// processCanonicalBlocks
if (canonicalBlocks.length === 0) break;

// load Buffer instead of ssz deserialized to improve performance
const canonicalDataColumnSidecarsEntries: KeyValue<Slot, Uint8Array>[] = await Promise.all(
canonicalBlocks
.filter((block) => config.getForkSeq(block.slot) >= ForkSeq.peerdas)
.map(async (block) => {
const bytes = await db.dataColumnSidecars.getBinary(block.root);
if (!bytes) {
throw Error(`No dataColumnSidecars found for slot ${block.slot} root ${toHex(block.root)}`);
}
return {key: block.slot, value: bytes};
})
);

// put to dataColumnSidecarsArchive db and delete from dataColumnSidecars db
await Promise.all([
db.dataColumnSidecarsArchive.batchPutBinary(canonicalDataColumnSidecarsEntries),
db.dataColumnSidecars.batchDelete(canonicalBlocks.map((block) => block.root)),
]);
migratedWrappedDataColumns += canonicalDataColumnSidecarsEntries.length;
}

return migratedWrappedDataColumns;
}

/**
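`migrateBlobSidecarsFromHotToColdDb` and `migrateDataColumnSidecarsFromHotToColdDb` above share the same batched hot-to-cold pattern. A generic sketch of that pattern, with hypothetical names and callbacks standing in for the real DB repositories (the PR throws on a missing entry; this sketch skips it instead):

```typescript
type KeyValue<K, V> = {key: K; value: V};

// Batched hot->cold migration: for each batch, load raw bytes from the hot
// store, write them to the archive, delete the batch from the hot store,
// and return how many entries were moved.
async function migrateBatched<K>(
  keys: K[],
  batchSize: number,
  load: (key: K) => Promise<Uint8Array | null>,
  archivePut: (entries: KeyValue<K, Uint8Array>[]) => Promise<void>,
  hotDelete: (keys: K[]) => Promise<void>
): Promise<number> {
  let migrated = 0;
  for (let i = 0; i < keys.length; i += batchSize) {
    const batch = keys.slice(i, i + batchSize);
    const entries: KeyValue<K, Uint8Array>[] = [];
    for (const key of batch) {
      const bytes = await load(key);
      // lenient: skip keys with nothing stored in the hot db
      if (bytes !== null) entries.push({key, value: bytes});
    }
    await Promise.all([archivePut(entries), hotDelete(batch)]);
    migrated += entries.length;
  }
  return migrated;
}
```

Returning the migrated count is what lets the callers log `{migratedEntries}` as in the archiver change above.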
74 changes: 61 additions & 13 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
@@ -1,6 +1,13 @@
import {toHexString} from "@chainsafe/ssz";
import {capella, ssz, altair, BeaconBlock} from "@lodestar/types";
import {ForkLightClient, ForkSeq, INTERVALS_PER_SLOT, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {
ForkName,
ForkLightClient,
ForkSeq,
INTERVALS_PER_SLOT,
MAX_SEED_LOOKAHEAD,
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {
CachedBeaconStateAltair,
computeEpochAtSlot,
@@ -101,6 +108,39 @@ export async function importBlock(
this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import the block asap, so call all event handlers in the next event loop
callInNextEventLoop(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

// dataPromise will not end up here, but preDeneb could. In future we might also allow syncing
// out-of-data-range blocks and import them in forkchoice, although one would not be able to
// attest and propose with such a head, similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blockData} = blockInput;
if (blockData.fork === ForkName.deneb) {
const {blobsSource, blobs} = blockData;

this.metrics?.importBlock.blobsBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
} else if (blockData.fork === ForkName.peerdas) {
// TODO peerDAS: build and emit the event for the data columns
}
}
});

// 3. Import attestations to fork choice
//
// - For each attestation
@@ -424,16 +464,20 @@
blockInput.type === BlockInputType.availableData &&
this.emitter.listenerCount(routes.events.EventType.blobSidecar)
) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
if (blockInput.blockData.fork === ForkName.deneb) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
} else {
// TODO add event for datacolumns
}
}
});
Expand All @@ -454,8 +498,12 @@ export async function importBlock(
// out-of-data-range blocks and import them in forkchoice, although one would not be able to
// attest and propose with such a head, similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blobsSource} = blockInput.blockData;
this.metrics?.importBlock.blobsBySource.inc({blobsSource});
if (blockInput.blockData.fork === ForkName.deneb) {
const {blobsSource} = blockInput.blockData;
this.metrics?.importBlock.blobsBySource.inc({blobsSource});
} else {
// TODO add data columns metrics
}
}

const advancedSlot = this.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);
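The blobSidecar events emitted above carry a `versionedHash` derived from the KZG commitment. Per EIP-4844 this is a version byte (`0x01`) followed by the last 31 bytes of the commitment's SHA-256. A sketch of that derivation (this is the spec rule, not necessarily Lodestar's exact implementation):

```typescript
import {createHash} from "node:crypto";

const VERSIONED_HASH_VERSION_KZG = 0x01;

// EIP-4844: versioned_hash = VERSIONED_HASH_VERSION_KZG ++ sha256(commitment)[1:]
function kzgCommitmentToVersionedHash(kzgCommitment: Uint8Array): Uint8Array {
  const digest = createHash("sha256").update(kzgCommitment).digest();
  const versionedHash = new Uint8Array(32);
  versionedHash[0] = VERSIONED_HASH_VERSION_KZG;
  versionedHash.set(digest.subarray(1), 1); // keep the last 31 bytes of the hash
  return versionedHash;
}
```

The version prefix lets execution-layer consumers distinguish future commitment schemes without changing the 32-byte hash format.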