Skip to content

Commit

Permalink
Merge pull request #67 from bobanetwork/ledgerwatch-devel
Browse files Browse the repository at this point in the history
Merge upstream 20231108
  • Loading branch information
boyuan-chen authored Nov 9, 2023
2 parents a894eb0 + 4759fc0 commit 40dfc8b
Show file tree
Hide file tree
Showing 97 changed files with 4,369 additions and 999 deletions.
32 changes: 32 additions & 0 deletions cl/persistence/beacon_indicies/indicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,38 @@ func MarkRootCanonical(ctx context.Context, tx kv.RwTx, slot uint64, blockRoot l
return tx.Put(kv.CanonicalBlockRoots, base_encoding.Encode64(slot), blockRoot[:])
}

func WriteExecutionBlockNumber(tx kv.RwTx, blockRoot libcommon.Hash, blockNumber uint64) error {
return tx.Put(kv.BlockRootToBlockNumber, blockRoot[:], base_encoding.Encode64(blockNumber))
}

func WriteExecutionBlockHash(tx kv.RwTx, blockRoot, blockHash libcommon.Hash) error {
return tx.Put(kv.BlockRootToBlockHash, blockRoot[:], blockHash[:])
}

func ReadExecutionBlockNumber(tx kv.Tx, blockRoot libcommon.Hash) (*uint64, error) {
val, err := tx.GetOne(kv.BlockRootToBlockNumber, blockRoot[:])
if err != nil {
return nil, err
}
if len(val) == 0 {
return nil, nil
}
ret := new(uint64)
*ret = base_encoding.Decode64(val)
return ret, nil
}

func ReadExecutionBlockHash(tx kv.Tx, blockRoot libcommon.Hash) (libcommon.Hash, error) {
val, err := tx.GetOne(kv.BlockRootToBlockHash, blockRoot[:])
if err != nil {
return libcommon.Hash{}, err
}
if len(val) == 0 {
return libcommon.Hash{}, nil
}
return libcommon.BytesToHash(val), nil
}

func WriteBeaconBlockHeader(ctx context.Context, tx kv.RwTx, signedHeader *cltypes.SignedBeaconBlockHeader) error {
headersBytes, err := signedHeader.EncodeSSZ(nil)
if err != nil {
Expand Down
32 changes: 32 additions & 0 deletions cl/persistence/beacon_indicies/indicies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,35 @@ func TestReadBeaconBlockHeader(t *testing.T) {
require.Equal(t, headerRoot, blockRoot)

}

func TestWriteExecutionBlockNumber(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
tx, _ := db.BeginRw(context.Background())
defer tx.Rollback()

tHash := libcommon.HexToHash("0x2")
require.NoError(t, WriteExecutionBlockNumber(tx, tHash, 1))
require.NoError(t, WriteExecutionBlockNumber(tx, tHash, 2))
require.NoError(t, WriteExecutionBlockNumber(tx, tHash, 3))

// Try to retrieve the block's slot by its blockRoot and verify
blockNumber, err := ReadExecutionBlockNumber(tx, tHash)
require.NoError(t, err)
require.Equal(t, uint64(3), *blockNumber)
}

func TestWriteExecutionBlockHash(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
tx, _ := db.BeginRw(context.Background())
defer tx.Rollback()

tHash := libcommon.HexToHash("0x2")
tHash2 := libcommon.HexToHash("0x3")
require.NoError(t, WriteExecutionBlockHash(tx, tHash, tHash2))
// Try to retrieve the block's slot by its blockRoot and verify
tHash3, err := ReadExecutionBlockHash(tx, tHash)
require.NoError(t, err)
require.Equal(t, tHash2, tHash3)
}
40 changes: 18 additions & 22 deletions cl/persistence/block_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"errors"
"fmt"
"io"
"path"
Expand All @@ -18,6 +19,8 @@ import (
"github.com/spf13/afero"
)

const subDivisionFolderSize = 10_000

type beaconChainDatabaseFilesystem struct {
rawDB RawBeaconBlockChain
cfg *clparams.BeaconChainConfig
Expand Down Expand Up @@ -73,6 +76,9 @@ func (b beaconChainDatabaseFilesystem) GetRange(ctx context.Context, tx kv.Tx, f
slot := slots[idx]

r, err := b.rawDB.BlockReader(ctx, slot, blockRoot)
if errors.Is(err, afero.ErrFileNotFound) {
continue
}
if err != nil {
return nil, err
}
Expand All @@ -98,7 +104,7 @@ func (b beaconChainDatabaseFilesystem) PurgeRange(ctx context.Context, tx kv.RwT
return err
}

return beacon_indicies.PruneBlockRoots(ctx, tx, from, from+count)
return nil
}

func (b beaconChainDatabaseFilesystem) WriteBlock(ctx context.Context, tx kv.RwTx, block *cltypes.SignedBeaconBlock, canonical bool) error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (b beaconChainDatabaseFilesystem) WriteBlock(ctx context.Context, tx kv.RwT
if err != nil {
return err
}
if block.Version() >= clparams.BellatrixVersion {
if err := beacon_indicies.WriteExecutionBlockNumber(tx, blockRoot, block.Block.Body.ExecutionPayload.BlockNumber); err != nil {
return err
}
if err := beacon_indicies.WriteExecutionBlockHash(tx, blockRoot, block.Block.Body.ExecutionPayload.BlockHash); err != nil {
return err
}
}

if err := beacon_indicies.WriteBeaconBlockHeaderAndIndicies(ctx, tx, &cltypes.SignedBeaconBlockHeader{
Signature: block.Signature,
Header: &cltypes.BeaconBlockHeader{
Expand All @@ -156,28 +171,9 @@ func (b beaconChainDatabaseFilesystem) WriteBlock(ctx context.Context, tx kv.RwT

// SlotToPaths define the file structure to store a block
//
// superEpoch = floor(slot / (epochSize ^ 2))
// epoch = floot(slot / epochSize)
// file is to be stored at
// "/signedBeaconBlock/{superEpoch}/{epoch}/{root}.ssz_snappy"
// "/signedBeaconBlock/{slot/10_000}/{root}.ssz_snappy"
func RootToPaths(slot uint64, root libcommon.Hash, config *clparams.BeaconChainConfig) (folderPath string, filePath string) {
folderPath = path.Clean(fmt.Sprintf("%d/%d", slot/(config.SlotsPerEpoch*config.SlotsPerEpoch), slot/config.SlotsPerEpoch))
folderPath = path.Clean(fmt.Sprintf("%d", slot/subDivisionFolderSize))
filePath = path.Clean(fmt.Sprintf("%s/%x.sz", folderPath, root))
return
}

func ValidateEpoch(fs afero.Fs, epoch uint64, config *clparams.BeaconChainConfig) error {
superEpoch := epoch / (config.SlotsPerEpoch)

// the folder path is superEpoch/epoch
folderPath := path.Clean(fmt.Sprintf("%d/%d", superEpoch, epoch))

fi, err := afero.ReadDir(fs, folderPath)
if err != nil {
return err
}
for _, fn := range fi {
fn.Name()
}
return nil
}
64 changes: 28 additions & 36 deletions cl/persistence/format/snapshot_format/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"sync"

libcommon "github.com/ledgerwatch/erigon-lib/common"

"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/persistence/format/chunk_encoding"
Expand All @@ -17,7 +19,8 @@ var buffersPool = sync.Pool{
}

type ExecutionBlockReaderByNumber interface {
BlockByNumber(number uint64) (*cltypes.Eth1Block, error)
TransactionsSSZ(w io.Writer, number uint64, hash libcommon.Hash) error
WithdrawalsSZZ(w io.Writer, number uint64, hash libcommon.Hash) error
}

const (
Expand All @@ -33,21 +36,22 @@ const (
)

func writeExecutionBlockPtr(w io.Writer, p *cltypes.Eth1Block) error {
temp := make([]byte, 8)
temp := make([]byte, 40)
binary.BigEndian.PutUint64(temp, p.BlockNumber)
copy(temp[8:], p.BlockHash[:])

return chunk_encoding.WriteChunk(w, temp, chunk_encoding.PointerDataType)
}

func readExecutionBlockPtr(r io.Reader) (uint64, error) {
func readExecutionBlockPtr(r io.Reader) (uint64, libcommon.Hash, error) {
b, dT, err := chunk_encoding.ReadChunkToBytes(r)
if err != nil {
return 0, err
return 0, libcommon.Hash{}, err
}
if dT != chunk_encoding.PointerDataType {
return 0, fmt.Errorf("malformed beacon block, invalid block pointer type %d, expected: %d", dT, chunk_encoding.ChunkDataType)
return 0, libcommon.Hash{}, fmt.Errorf("malformed beacon block, invalid block pointer type %d, expected: %d", dT, chunk_encoding.ChunkDataType)
}
return binary.BigEndian.Uint64(b), nil
return binary.BigEndian.Uint64(b[:8]), libcommon.BytesToHash(b[8:]), nil
}

func computeInitialOffset(version clparams.StateVersion) uint64 {
Expand All @@ -68,22 +72,25 @@ func computeInitialOffset(version clparams.StateVersion) uint64 {
}

// WriteBlockForSnapshot writes a block to the given writer in the format expected by the snapshot.
func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error {
// buf is just a reusable buffer. if it had to grow it will be returned back as grown.
func WriteBlockForSnapshot(w io.Writer, block *cltypes.SignedBeaconBlock, reusable []byte) ([]byte, error) {
bodyRoot, err := block.Block.Body.HashSSZ()
if err != nil {
return err
return reusable, err
}
reusable = reusable[:0]
// Maybe reuse the buffer?
encoded, err := block.EncodeSSZ(nil)
encoded, err := block.EncodeSSZ(reusable)
if err != nil {
return err
return reusable, err
}
reusable = encoded
version := block.Version()
if _, err := w.Write([]byte{byte(version)}); err != nil {
return err
return reusable, err
}
if _, err := w.Write(bodyRoot[:]); err != nil {
return err
return reusable, err
}
currentChunkLength := computeInitialOffset(version)

Expand All @@ -96,20 +103,21 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error
currentChunkLength += uint64(body.VoluntaryExits.EncodingSizeSSZ())
// Write the chunk and chunk attestations
if err := chunk_encoding.WriteChunk(w, encoded[:currentChunkLength], chunk_encoding.ChunkDataType); err != nil {
return err
return reusable, err
}
// we are done if we are before altair
if version <= clparams.AltairVersion {
return nil
return reusable, nil
}
encoded = encoded[currentChunkLength+uint64(body.ExecutionPayload.EncodingSizeSSZ()):]
if err := writeExecutionBlockPtr(w, body.ExecutionPayload); err != nil {
return err
encoded = encoded[currentChunkLength:]
if err := writeEth1BlockForSnapshot(w, encoded[:body.ExecutionPayload.EncodingSizeSSZ()], body.ExecutionPayload); err != nil {
return reusable, err
}
encoded = encoded[body.ExecutionPayload.EncodingSizeSSZ():]
if version <= clparams.BellatrixVersion {
return nil
return reusable, nil
}
return chunk_encoding.WriteChunk(w, encoded, chunk_encoding.ChunkDataType)
return reusable, chunk_encoding.WriteChunk(w, encoded, chunk_encoding.ChunkDataType)
}

func readMetadataForBlock(r io.Reader, b []byte) (clparams.StateVersion, error) {
Expand Down Expand Up @@ -153,23 +161,7 @@ func ReadRawBlockFromSnapshot(r io.Reader, out io.Writer, executionReader Execut
return v, nil
}
// Read the block pointer and retrieve chunk4 from the execution reader
blockPointer, err := readExecutionBlockPtr(r)
if err != nil {
return v, err
}
executionBlock, err := executionReader.BlockByNumber(blockPointer)
if err != nil {
return v, err
}
if executionBlock == nil {
return v, fmt.Errorf("execution block %d not found", blockPointer)
}
// TODO(Giulio2002): optimize GC
eth1Bytes, err := executionBlock.EncodeSSZ(nil)
if err != nil {
return v, err
}
if _, err := out.Write(eth1Bytes); err != nil {
if _, err := readEth1BlockFromSnapshot(r, out, executionReader, cfg); err != nil {
return v, err
}
if v <= clparams.BellatrixVersion {
Expand Down
3 changes: 2 additions & 1 deletion cl/persistence/format/snapshot_format/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestBlockSnapshotEncoding(t *testing.T) {
br = snapshot_format.MockBlockReader{Block: blk.Block.Body.ExecutionPayload}
}
var b bytes.Buffer
require.NoError(t, snapshot_format.WriteBlockForSnapshot(blk, &b))
_, err := snapshot_format.WriteBlockForSnapshot(&b, blk, nil)
require.NoError(t, err)
blk2, err := snapshot_format.ReadBlockFromSnapshot(&b, &br, &clparams.MainnetBeaconConfig)
require.NoError(t, err)
_ = blk2
Expand Down
92 changes: 92 additions & 0 deletions cl/persistence/format/snapshot_format/eth1_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package snapshot_format

import (
"fmt"
"io"

"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cl/persistence/format/chunk_encoding"
"github.com/ledgerwatch/erigon/core/types"
)

// WriteEth1BlockForSnapshot writes an execution block to the given writer in the format expected by the snapshot.
func writeEth1BlockForSnapshot(w io.Writer, encoded []byte, block *cltypes.Eth1Block) error {
pos := (length.Hash /*ParentHash*/ + length.Addr /*Miner*/ + length.Hash /*StateRoot*/ + length.Hash /*ReceiptsRoot*/ + types.BloomByteLength /*Bloom*/ +
length.Hash /*PrevRandao*/ + 32 /*BlockNumber + Timestamp + GasLimit + GasUsed */ + 4 /*ExtraDataOffset*/ + length.Hash /*BaseFee*/ +
length.Hash /*BlockHash*/ + 4 /*TransactionOffset*/)

if block.Version() >= clparams.CapellaVersion {
pos += 4 /*WithdrawalsOffset*/
}
if block.Version() >= clparams.DenebVersion {
pos += 16 /*BlobGasUsed + ExcessBlobGas*/
}
// Add metadata first for Eth1Block, aka. version
if _, err := w.Write([]byte{byte(block.Version())}); err != nil {
return err
}

// Maybe reuse the buffer?
pos += block.Extra.EncodingSizeSSZ()
if err := chunk_encoding.WriteChunk(w, encoded[:pos], chunk_encoding.ChunkDataType); err != nil {
return err
}
pos += block.Withdrawals.EncodingSizeSSZ()
pos += block.Transactions.EncodingSizeSSZ()
encoded = encoded[pos:]
//pos = 0
// write the block pointer
if err := writeExecutionBlockPtr(w, block); err != nil {
return err
}
// From now on here, just finish up
return chunk_encoding.WriteChunk(w, encoded, chunk_encoding.ChunkDataType)
}

func readEth1BlockFromSnapshot(r io.Reader, out io.Writer, executionReader ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) (clparams.StateVersion, error) {
// Metadata section is just the current hardfork of the block.
vArr := make([]byte, 1)
if _, err := r.Read(vArr); err != nil {
return 0, err
}
v := clparams.StateVersion(vArr[0])

// Read the first chunk
dT1, err := chunk_encoding.ReadChunk(r, out)
if err != nil {
return v, err
}
if dT1 != chunk_encoding.ChunkDataType {
return v, fmt.Errorf("malformed beacon block, invalid chunk 1 type %d, expected: %d", dT1, chunk_encoding.ChunkDataType)
}
// Read the block pointer and retrieve chunk4 from the execution reader
blockNumber, blockHash, err := readExecutionBlockPtr(r)
if err != nil {
return v, err
}
err = executionReader.TransactionsSSZ(out, blockNumber, blockHash)
if err != nil {
return v, err
}

if v < clparams.CapellaVersion {
return v, nil
}
err = executionReader.WithdrawalsSZZ(out, blockNumber, blockHash)
if err != nil {
return v, err
}

// Read the 5h chunk
dT2, err := chunk_encoding.ReadChunk(r, out)
if err != nil {
return v, err
}
if dT2 != chunk_encoding.ChunkDataType {
return v, fmt.Errorf("malformed beacon block, invalid chunk 5 type %d, expected: %d", dT2, chunk_encoding.ChunkDataType)
}

return v, nil
}
Loading

0 comments on commit 40dfc8b

Please sign in to comment.