Skip to content

Commit

Permalink
tests: add catchpoint downloading/parsing to e2e catchup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cce committed Jan 9, 2025
1 parent c60db8d commit 3b7a939
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 88 deletions.
2 changes: 1 addition & 1 deletion ledger/acctdeltas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
last64KSize = chunkSize
last64KAccountCreationTime = time.Duration(0)
}
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
chunk.Balances = make([]encoded.BalanceRecordV6, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encoded.BalanceRecordV6
Expand Down
25 changes: 14 additions & 11 deletions ledger/catchpointfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type catchpointFileWriter struct {
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunk CatchpointSnapshotChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
Expand All @@ -80,12 +80,15 @@ type catchpointFileWriter struct {
onlineRoundParamsDone bool
}

type catchpointFileBalancesChunkV5 struct {
// CatchpointSnapshotChunkV5 defines the encoding of "balances.X.msgpack" files in the catchpoint snapshot
// used before database schema v6, which split accounts from asset/app resource data.
type CatchpointSnapshotChunkV5 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`
Balances []encoded.BalanceRecordV5 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
}

type catchpointFileChunkV6 struct {
// CatchpointSnapshotChunkV6 defines the current encoding of "balances.X.msgpack" files in the catchpoint snapshot.
type CatchpointSnapshotChunkV6 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
Expand All @@ -95,7 +98,7 @@ type catchpointFileChunkV6 struct {
OnlineRoundParams []encoded.OnlineRoundParamsRecordV6 `codec:"orp,allocbound=BalancesPerCatchpointFileChunk"`
}

func (chunk catchpointFileChunkV6) empty() bool {
func (chunk CatchpointSnapshotChunkV6) empty() bool {
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0 && len(chunk.OnlineAccounts) == 0 && len(chunk.OnlineRoundParams) == 0
}

Expand Down Expand Up @@ -216,7 +219,7 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
return
}

writerRequest := make(chan catchpointFileChunkV6, 1)
writerRequest := make(chan CatchpointSnapshotChunkV6, 1)
writerResponse := make(chan error, 2)
go cw.asyncWriter(writerRequest, writerResponse, cw.chunkNum)
defer func() {
Expand Down Expand Up @@ -298,11 +301,11 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
cw.chunkNum++
writerRequest <- cw.chunk
// indicate that we need a readDatabaseStep
cw.chunk = catchpointFileChunkV6{}
cw.chunk = CatchpointSnapshotChunkV6{}
}
}

func (cw *catchpointFileWriter) asyncWriter(chunks chan catchpointFileChunkV6, response chan error, chunkNum uint64) {
func (cw *catchpointFileWriter) asyncWriter(chunks chan CatchpointSnapshotChunkV6, response chan error, chunkNum uint64) {
defer close(response)
for chk := range chunks {
chunkNum++
Expand Down Expand Up @@ -341,7 +344,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
return err
}
if len(balances) > 0 {
cw.chunk = catchpointFileChunkV6{Balances: balances, numAccounts: numAccounts}
cw.chunk = CatchpointSnapshotChunkV6{Balances: balances, numAccounts: numAccounts}
return nil
}
// It might seem reasonable, but do not close accountsIterator here,
Expand Down Expand Up @@ -372,7 +375,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(kvrs) > 0 {
cw.chunk = catchpointFileChunkV6{KVs: kvrs}
cw.chunk = CatchpointSnapshotChunkV6{KVs: kvrs}
return nil
}
// Do not close kvRows here, or it will start over on the next iteration
Expand Down Expand Up @@ -401,7 +404,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(onlineAccts) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineAccounts: onlineAccts}
cw.chunk = CatchpointSnapshotChunkV6{OnlineAccounts: onlineAccts}
return nil
}
// Do not close onlineAccountRows here, or it will start over on the next iteration
Expand Down Expand Up @@ -430,7 +433,7 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
}
}
if len(onlineRndParams) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineRoundParams: onlineRndParams}
cw.chunk = CatchpointSnapshotChunkV6{OnlineRoundParams: onlineRndParams}
return nil
}
// Do not close onlineRndParamsRows here, or it will start over on the next iteration
Expand Down
32 changes: 21 additions & 11 deletions ledger/catchpointfilewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,12 @@ func TestCatchpointFileBalancesChunkEncoding(t *testing.T) {
kvs[i] = kv
}

chunk1 := catchpointFileChunkV6{}
chunk1 := CatchpointSnapshotChunkV6{}
chunk1.Balances = balances
chunk1.KVs = kvs
encodedChunk := chunk1.MarshalMsg(nil)

var chunk2 catchpointFileChunkV6
var chunk2 CatchpointSnapshotChunkV6
_, err := chunk2.UnmarshalMsg(encodedChunk)
require.NoError(t, err)

Expand Down Expand Up @@ -291,7 +291,7 @@ func TestBasicCatchpointWriter(t *testing.T) {
balanceFileName := fmt.Sprintf(catchpointBalancesFileNameTemplate, 1)
require.Equal(t, balanceFileName, catchpointContent[1].headerName)

var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err = protocol.Decode(catchpointContent[1].data, &chunk)
require.NoError(t, err)
require.Equal(t, uint64(len(accts)), uint64(len(chunk.Balances)))
Expand Down Expand Up @@ -834,14 +834,16 @@ func TestExactAccountChunk(t *testing.T) {
partitiontest.PartitionTest(t)
// t.Parallel() // probably not good to parallelize catchpoint file save/load

t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40) })
t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 100) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40) })
t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 100) })
t.Run("v39", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV39, 40, false) })
t.Run("v40", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 40, false) })
t.Run("v40_noSPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 63, false) })
t.Run("v40_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusV40, 64, true) })
t.Run("future", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 40, false) })
t.Run("future_SPstall", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 64, true) })
t.Run("future_SPstall300", func(t *testing.T) { testExactAccountChunk(t, protocol.ConsensusFuture, 300, true) })
}

func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int) {
func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraBlocks int, longHistory bool) {
genBalances, addrs, _ := ledgertesting.NewTestGenesis(func(c *ledgertesting.GenesisCfg) {
c.OnlineCount = 1 // addrs[0] is online
}, ledgertesting.TurnOffRewards)
Expand Down Expand Up @@ -901,27 +903,35 @@ func testExactAccountChunk(t *testing.T, proto protocol.ConsensusVersion, extraB
valLowestRound := dl.validator.trackers.acctsOnline.voters.lowestRound(valDBRound)
require.Equal(t, genLowestRound, valLowestRound)
require.Equal(t, genDBRound, valDBRound)
// genDBRound is MaxAcctLookback (4) rounds behind genR
require.Equal(t, genR, genDBRound+basics.Round(dl.generator.cfg.MaxAcctLookback))
// This assert, plus previous assert on genR guarantees that genDBRound is:
// BalancesPerCatchpointFileChunk-12+extraBlocks-MaxAcctLookback (560 for 64 extraBlocks, 536 for 40 extraBlocks)

var onlineExcludeBefore basics.Round
// we added so many blocks that lowestRound is stuck at first state proof, round 240?
if normalHorizon := (genDBRound + 1).SubSaturate(basics.Round(params.MaxBalLookback)); normalHorizon == genLowestRound {
t.Logf("subtest is exercising case where 320 rounds of history are already in DB")
require.EqualValues(t, genLowestRound, params.StateProofInterval-params.StateProofVotersLookback)
require.False(t, longHistory)
} else if normalHorizon > genLowestRound {
t.Logf("subtest is exercising case where votersTracker causes onlineaccounts & onlineroundparams to extend history to round %d (DBRound %d)", genLowestRound, genDBRound)
onlineExcludeBefore = normalHorizon // fails without this adjustment
require.True(t, longHistory)
} else {
require.FailNow(t, "unexpected lowest round %d, normal horizon %d", genLowestRound, normalHorizon)
}

cph := testWriteCatchpoint(t, config.Consensus[proto], dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0, onlineExcludeBefore)

decodedData := readCatchpointFile(t, catchpointFilePath)

// decode and verify some stats about balances chunk contents
var chunks []catchpointFileChunkV6
var chunks []CatchpointSnapshotChunkV6
for i, d := range decodedData {
t.Logf("section %d: %s", i, d.headerName)
if strings.HasPrefix(d.headerName, "balances.") {
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err := protocol.Decode(d.data, &chunk)
require.NoError(t, err)
t.Logf("chunk %d balances: %d, kvs: %d, onlineaccounts: %d, onlineroundparams: %d", i, len(chunk.Balances), len(chunk.KVs), len(chunk.OnlineAccounts), len(chunk.OnlineRoundParams))
Expand Down
4 changes: 2 additions & 2 deletions ledger/catchupaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
// we won't get to this point, since we've already verified the version in processStagingContent
return errors.New("unsupported version")
case CatchpointFileVersionV5:
var balances catchpointFileBalancesChunkV5
var balances CatchpointSnapshotChunkV5
err = protocol.Decode(bytes, &balances)
if err != nil {
return err
Expand All @@ -542,7 +542,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
fallthrough
case CatchpointFileVersionV8:
// V8 added online accounts and online round params data + hashes, and added them to the v6 chunk format
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
err = protocol.Decode(bytes, &chunk)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions ledger/catchupaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func createTestingEncodedChunks(accountsCount uint64) (encodedAccountChunks [][]
if accounts >= accountsCount-64*1024 && last64KIndex == -1 {
last64KIndex = len(encodedAccountChunks)
}
var chunk catchpointFileChunkV6
var chunk CatchpointSnapshotChunkV6
chunk.Balances = make([]encoded.BalanceRecordV6, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encoded.BalanceRecordV6
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestCatchupAccessorResourceCountMismatch(t *testing.T) {
err = catchpointAccessor.ProcessStagingBalances(ctx, CatchpointContentFileName, encodedFileHeader, &progress)
require.NoError(t, err)

var balances catchpointFileChunkV6
var balances CatchpointSnapshotChunkV6
balances.Balances = make([]encoded.BalanceRecordV6, 1)
var randomAccount encoded.BalanceRecordV6
accountData := trackerdb.BaseAccountData{}
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestCatchupAccessorProcessStagingBalances(t *testing.T) {
}

// make chunks
chunks := []catchpointFileChunkV6{
chunks := []CatchpointSnapshotChunkV6{
{
Balances: []encoded.BalanceRecordV6{
encodedBalanceRecordFromBase(ledgertesting.RandomAddress(), acctA, nil, false),
Expand Down
Loading

0 comments on commit 3b7a939

Please sign in to comment.