diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go
index 0d461cff07..fe3ca0266c 100644
--- a/ingest/ledgerbackend/buffered_storage_backend_test.go
+++ b/ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -160,6 +160,50 @@ func TestNewLedgerBuffer(t *testing.T) {
 	assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
 }
 
+func TestNewLedgerBufferSizeLessThanRangeSize(t *testing.T) {
+	startLedger := uint32(10)
+	endLedger := uint32(30)
+	bsb := createBufferedStorageBackendForTesting()
+	bsb.config.NumWorkers = 2
+	bsb.config.BufferSize = 10
+	ledgerRange := BoundedRange(startLedger, endLedger)
+	mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
+	bsb.dataStore = mockDataStore
+
+	ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
+	assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 10 }, time.Second*1, time.Millisecond*50)
+	assert.NoError(t, err)
+
+	for i := startLedger; i < endLedger; i++ {
+		lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
+		assert.NoError(t, err)
+		assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
+	}
+	assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
+}
+
+func TestNewLedgerBufferSizeLargerThanRangeSize(t *testing.T) {
+	startLedger := uint32(1)
+	endLedger := uint32(15)
+	bsb := createBufferedStorageBackendForTesting()
+	bsb.config.NumWorkers = 2
+	bsb.config.BufferSize = 100
+	ledgerRange := BoundedRange(startLedger, endLedger)
+	mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, ledgerPerFileCount)
+	bsb.dataStore = mockDataStore
+
+	ledgerBuffer, err := bsb.newLedgerBuffer(ledgerRange)
+	assert.Eventually(t, func() bool { return len(ledgerBuffer.ledgerQueue) == 15 }, time.Second*1, time.Millisecond*50)
+	assert.NoError(t, err)
+
+	for i := startLedger; i < endLedger; i++ {
+		lcm, err := ledgerBuffer.getFromLedgerQueue(context.Background())
+		assert.NoError(t, err)
+		assert.Equal(t, xdr.Uint32(i), lcm.StartSequence)
+	}
+	assert.Equal(t, ledgerRange, ledgerBuffer.ledgerRange)
+}
+
 func TestBSBGetLatestLedgerSequence(t *testing.T) {
 	startLedger := uint32(3)
 	endLedger := uint32(5)
diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go
index d23bf0bfbd..7ee9dda083 100644
--- a/ingest/ledgerbackend/ledger_buffer.go
+++ b/ingest/ledgerbackend/ledger_buffer.go
@@ -13,6 +13,8 @@ import (
 	"github.com/stellar/go/support/collections/heap"
 	"github.com/stellar/go/support/compressxdr"
 	"github.com/stellar/go/support/datastore"
+	"github.com/stellar/go/support/ordered"
+
 	"github.com/stellar/go/xdr"
 )
 
@@ -54,6 +56,10 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
 	less := func(a, b ledgerBatchObject) bool {
 		return a.startLedger < b.startLedger
 	}
+	// ensure BufferSize does not exceed the total range
+	if ledgerRange.bounded {
+		bsb.config.BufferSize = uint32(ordered.Min(int(bsb.config.BufferSize), int(ledgerRange.to-ledgerRange.from)+1))
+	}
 	pq := heap.New(less, int(bsb.config.BufferSize))
 
 	ledgerBuffer := &ledgerBuffer{
diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md
index 6a8b0a708d..e3af62d580 100644
--- a/services/horizon/CHANGELOG.md
+++ b/services/horizon/CHANGELOG.md
@@ -2,6 +2,14 @@
 
 All notable changes to this project will be documented in this
 file. This project adheres to [Semantic Versioning](http://semver.org/).
 
+## Pending
+
+### Breaking Changes
+
+- The `--parallel-job-size` configuration parameter for the `stellar-horizon db reingest` command has been removed.
+  Job size will be determined automatically based on the number of workers (configuration parameter `--parallel-workers`),
+  distributing the range equally among them. The minimum job size will remain 64 ledgers, and the start and end of the
+  ledger range will be rounded to the nearest checkpoint. ([5484](https://github.com/stellar/go/pull/5484))
 
 ## 2.32.0
 
diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go
index 92a732e002..58c096a782 100644
--- a/services/horizon/cmd/db.go
+++ b/services/horizon/cmd/db.go
@@ -42,7 +42,6 @@ var (
 	dbDetectGapsCmd     *cobra.Command
 	reingestForce       bool
 	parallelWorkers     uint
-	parallelJobSize     uint32
 	retries             uint
 	retryBackoffSeconds uint
 	ledgerBackendStr    string
@@ -118,14 +117,6 @@ func ingestRangeCmdOpts() support.ConfigOptions {
 			FlagDefault: uint(1),
 			Usage:       "[optional] if this flag is set to > 1, horizon will parallelize reingestion using the supplied number of workers",
 		},
-		{
-			Name:        "parallel-job-size",
-			ConfigKey:   &parallelJobSize,
-			OptType:     types.Uint32,
-			Required:    false,
-			FlagDefault: uint32(100000),
-			Usage:       "[optional] parallel workers will run jobs processing ledger batches of the supplied size",
-		},
 		{
 			Name:      "retries",
 			ConfigKey: &retries,
@@ -178,7 +169,7 @@ func ingestRangeCmdOpts() support.ConfigOptions {
 var dbReingestRangeCmdOpts = ingestRangeCmdOpts()
 var dbFillGapsCmdOpts = ingestRangeCmdOpts()
 
-func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
+func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, minBatchSize, maxBatchSize uint, config horizon.Config, storageBackendConfig ingest.StorageBackendConfig) error {
 	var err error
 
 	if reingestForce && parallelWorkers > 1 {
@@ -186,9 +177,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
 	}
 
 	maxLedgersPerFlush := ingest.MaxLedgersPerFlush
-	if parallelJobSize < maxLedgersPerFlush {
-		maxLedgersPerFlush = parallelJobSize
-	}
 
 	ingestConfig := ingest.Config{
 		NetworkPassphrase: config.NetworkPassphrase,
@@ -214,15 +202,12 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
 	}
 
 	if parallelWorkers > 1 {
-		system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers)
+		system, systemErr := ingest.NewParallelSystems(ingestConfig, parallelWorkers, minBatchSize, maxBatchSize)
 		if systemErr != nil {
 			return systemErr
 		}
 
-		return system.ReingestRange(
-			ledgerRanges,
-			parallelJobSize,
-		)
+		return system.ReingestRange(ledgerRanges)
 	}
 
 	system, systemErr := ingest.NewSystem(ingestConfig)
@@ -479,6 +464,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 		}
 	}
 
+	maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
 	var err error
 	var storageBackendConfig ingest.StorageBackendConfig
 	options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
@@ -486,12 +472,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 		if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
 			return err
 		}
-		// when using buffered storage, performance observations have noted optimal parallel batch size
-		// of 100, apply that as default if the flag was absent.
-		if !viper.IsSet("parallel-job-size") {
-			parallelJobSize = 100
-		}
 		options.NoCaptiveCore = true
+		maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
 	}
 
 	if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
@@ -501,6 +483,8 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 		[]history.LedgerRange{{StartSequence: argsUInt32[0], EndSequence: argsUInt32[1]}},
 		reingestForce,
 		parallelWorkers,
+		ingest.MinBatchSize,
+		maxBatchSize,
 		*horizonConfig,
 		storageBackendConfig,
 	)
@@ -541,6 +525,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 		withRange = true
 	}
 
+	maxBatchSize := ingest.MaxCaptiveCoreBackendBatchSize
 	var err error
 	var storageBackendConfig ingest.StorageBackendConfig
 	options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}
@@ -549,6 +534,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 			return err
 		}
 		options.NoCaptiveCore = true
+		maxBatchSize = ingest.MaxBufferedStorageBackendBatchSize
 	}
 
 	if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
@@ -569,7 +555,7 @@ func DefineDBCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, hor
 			hlog.Infof("found gaps %v", gaps)
 		}
 
-		return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, *horizonConfig, storageBackendConfig)
+		return runDBReingestRangeFn(gaps, reingestForce, parallelWorkers, ingest.MinBatchSize, maxBatchSize, *horizonConfig, storageBackendConfig)
 	},
 }
diff --git a/services/horizon/cmd/db_test.go b/services/horizon/cmd/db_test.go
index 6a00576bd3..a2dd5f014c 100644
--- a/services/horizon/cmd/db_test.go
+++ b/services/horizon/cmd/db_test.go
@@ -25,7 +25,7 @@ type DBCommandsTestSuite struct {
 }
 
 func (s *DBCommandsTestSuite) SetupSuite() {
-	runDBReingestRangeFn = func([]history.LedgerRange, bool, uint,
+	runDBReingestRangeFn = func([]history.LedgerRange, bool, uint, uint, uint,
 		horizon.Config, ingest.StorageBackendConfig) error {
 		return nil
 	}
@@ -45,66 +45,19 @@ func (s *DBCommandsTestSuite) BeforeTest(suiteName string, testName string) {
 	s.rootCmd = NewRootCmd()
 }
 
-func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForBufferedBackend() {
+func (s *DBCommandsTestSuite) TestInvalidParameterParallelJobSize() {
 	s.rootCmd.SetArgs([]string{
 		"db", "reingest", "range",
 		"--db-url", s.db.DSN,
 		"--network", "testnet",
 		"--parallel-workers", "2",
+		"--parallel-job-size", "10",
 		"--ledgerbackend", "datastore",
 		"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
 		"2",
 		"10"})
 
-	require.NoError(s.T(), s.rootCmd.Execute())
-	require.Equal(s.T(), parallelJobSize, uint32(100))
-}
-
-func (s *DBCommandsTestSuite) TestDefaultParallelJobSizeForCaptiveBackend() {
-	s.rootCmd.SetArgs([]string{
-		"db", "reingest", "range",
-		"--db-url", s.db.DSN,
-		"--network", "testnet",
-		"--stellar-core-binary-path", "/test/core/bin/path",
-		"--parallel-workers", "2",
-		"--ledgerbackend", "captive-core",
-		"2",
-		"10"})
-
-	require.NoError(s.T(), s.rootCmd.Execute())
-	require.Equal(s.T(), parallelJobSize, uint32(100_000))
-}
-
-func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForCaptive() {
-	s.rootCmd.SetArgs([]string{
-		"db", "reingest", "range",
-		"--db-url", s.db.DSN,
-		"--network", "testnet",
-		"--stellar-core-binary-path", "/test/core/bin/path",
-		"--parallel-workers", "2",
-		"--parallel-job-size", "5",
-		"--ledgerbackend", "captive-core",
-		"2",
-		"10"})
-
-	require.NoError(s.T(), s.rootCmd.Execute())
-	require.Equal(s.T(), parallelJobSize, uint32(5))
-}
-
-func (s *DBCommandsTestSuite) TestUsesParallelJobSizeWhenSetForBuffered() {
-	s.rootCmd.SetArgs([]string{
-		"db", "reingest", "range",
-		"--db-url", s.db.DSN,
-		"--network", "testnet",
-		"--parallel-workers", "2",
-		"--parallel-job-size", "5",
-		"--ledgerbackend", "datastore",
-		"--datastore-config", "../internal/ingest/testdata/config.storagebackend.toml",
-		"2",
-		"10"})
-
-	require.NoError(s.T(), s.rootCmd.Execute())
-	require.Equal(s.T(), parallelJobSize, uint32(5))
+	require.Equal(s.T(), "unknown flag: --parallel-job-size", s.rootCmd.Execute().Error())
 }
 
 func (s *DBCommandsTestSuite) TestDbReingestAndFillGapsCmds() {
diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go
index 63ee7ba457..38f9fe1d3a 100644
--- a/services/horizon/internal/ingest/main.go
+++ b/services/horizon/internal/ingest/main.go
@@ -100,6 +100,17 @@ func (s LedgerBackendType) String() string {
 	return ""
 }
 
+const (
+	// HistoryCheckpointLedgerInterval is the number of ledgers between history checkpoints
+	HistoryCheckpointLedgerInterval uint = 64
+	// MinBatchSize is the minimum batch size for reingestion
+	MinBatchSize uint = HistoryCheckpointLedgerInterval
+	// MaxBufferedStorageBackendBatchSize is the maximum batch size for Buffered Storage reingestion
+	MaxBufferedStorageBackendBatchSize uint = 200 * HistoryCheckpointLedgerInterval
+	// MaxCaptiveCoreBackendBatchSize is the maximum batch size for Captive Core reingestion
+	MaxCaptiveCoreBackendBatchSize uint = 20_000 * HistoryCheckpointLedgerInterval
+)
+
 type StorageBackendConfig struct {
 	DataStoreConfig              datastore.DataStoreConfig                  `toml:"datastore_config"`
 	BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
diff --git a/services/horizon/internal/ingest/parallel.go b/services/horizon/internal/ingest/parallel.go
index 4f07c21cc4..a2a641c5cf 100644
--- a/services/horizon/internal/ingest/parallel.go
+++ b/services/horizon/internal/ingest/parallel.go
@@ -8,11 +8,7 @@ import (
 	"github.com/stellar/go/services/horizon/internal/db2/history"
 	"github.com/stellar/go/support/errors"
 	logpkg "github.com/stellar/go/support/log"
-)
-
-const (
-	historyCheckpointLedgerInterval = 64
-	minBatchSize                    = historyCheckpointLedgerInterval
+	"github.com/stellar/go/support/ordered"
 )
 
 type rangeError struct {
@@ -27,23 +23,32 @@ func (e rangeError) Error() string {
 type ParallelSystems struct {
 	config        Config
 	workerCount   uint
+	minBatchSize  uint
+	maxBatchSize  uint
 	systemFactory func(Config) (System, error)
 }
 
-func NewParallelSystems(config Config, workerCount uint) (*ParallelSystems, error) {
+func NewParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint) (*ParallelSystems, error) {
+	// Keeping this wrapper because it is used in tests; it will be revisited after code review.
-	return newParallelSystems(config, workerCount, NewSystem)
+	return newParallelSystems(config, workerCount, minBatchSize, maxBatchSize, NewSystem)
 }
 
 // private version of NewParallelSystems, allowing injection of a mock system
-func newParallelSystems(config Config, workerCount uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
+func newParallelSystems(config Config, workerCount uint, minBatchSize, maxBatchSize uint, systemFactory func(Config) (System, error)) (*ParallelSystems, error) {
 	if workerCount < 1 {
 		return nil, errors.New("workerCount must be > 0")
 	}
-
+	if minBatchSize != 0 && minBatchSize < HistoryCheckpointLedgerInterval {
+		return nil, fmt.Errorf("minBatchSize must be at least %d", HistoryCheckpointLedgerInterval)
+	}
+	if minBatchSize != 0 && maxBatchSize != 0 && maxBatchSize < minBatchSize {
+		return nil, errors.New("maxBatchSize cannot be less than minBatchSize")
+	}
 	return &ParallelSystems{
 		config:        config,
 		workerCount:   workerCount,
+		maxBatchSize:  maxBatchSize,
+		minBatchSize:  minBatchSize,
 		systemFactory: systemFactory,
 	}, nil
 }
@@ -112,20 +117,27 @@ func enqueueReingestTasks(ledgerRanges []history.LedgerRange, batchSize uint32,
 	}
 	return lowestLedger
 }
+
+func (ps *ParallelSystems) calculateParallelLedgerBatchSize(rangeSize uint32) uint32 {
+	// calculate the initial batch size based on available workers
+	batchSize := rangeSize / uint32(ps.workerCount)
+
+	// ensure the batch size meets the min threshold
+	if ps.minBatchSize > 0 {
+		batchSize = ordered.Max(batchSize, uint32(ps.minBatchSize))
+	}
 
-func calculateParallelLedgerBatchSize(rangeSize uint32, batchSizeSuggestion uint32, workerCount uint) uint32 {
-	batchSize := batchSizeSuggestion
-	if batchSize == 0 || rangeSize/batchSize < uint32(workerCount) {
-		// let's try to make use of all the workers
-		batchSize = rangeSize / uint32(workerCount)
+	// ensure the batch size does not exceed the max threshold
+	if ps.maxBatchSize > 0 {
+		batchSize = ordered.Min(batchSize, uint32(ps.maxBatchSize))
 	}
-	// Use a minimum batch size to make it worth it in terms of overhead
-	if batchSize < minBatchSize {
-		batchSize = minBatchSize
+
+	// round down to the nearest multiple of HistoryCheckpointLedgerInterval
+	if batchSize > uint32(HistoryCheckpointLedgerInterval) {
+		return batchSize / uint32(HistoryCheckpointLedgerInterval) * uint32(HistoryCheckpointLedgerInterval)
 	}
-	// Also, round the batch size to the closest, lower or equal 64 multiple
-	return (batchSize / historyCheckpointLedgerInterval) * historyCheckpointLedgerInterval
+	// HistoryCheckpointLedgerInterval is the minimum batch size.
+	return uint32(HistoryCheckpointLedgerInterval)
 }
 
 func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
@@ -136,9 +148,9 @@ func totalRangeSize(ledgerRanges []history.LedgerRange) uint32 {
 	return sum
 }
 
-func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange, batchSizeSuggestion uint32) error {
+func (ps *ParallelSystems) ReingestRange(ledgerRanges []history.LedgerRange) error {
 	var (
-		batchSize        = calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges), batchSizeSuggestion, ps.workerCount)
+		batchSize        = ps.calculateParallelLedgerBatchSize(totalRangeSize(ledgerRanges))
 		reingestJobQueue = make(chan history.LedgerRange)
 		wg               sync.WaitGroup
 
diff --git a/services/horizon/internal/ingest/parallel_test.go b/services/horizon/internal/ingest/parallel_test.go
index 8004a4048c..f011f51021 100644
--- a/services/horizon/internal/ingest/parallel_test.go
+++ b/services/horizon/internal/ingest/parallel_test.go
@@ -1,6 +1,8 @@
 package ingest
 
 import (
+	"fmt"
+	"math"
 	"math/rand"
 	"sort"
 	"sync"
@@ -15,13 +17,88 @@ import (
 )
 
 func TestCalculateParallelLedgerBatchSize(t *testing.T) {
-	assert.Equal(t, uint32(6656), calculateParallelLedgerBatchSize(20096, 20096, 3))
-	assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 20096, 4))
-	assert.Equal(t, uint32(4992), calculateParallelLedgerBatchSize(20096, 0, 4))
-	assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 256, 4))
-	assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(64, 32, 4))
-	assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(2, 256, 4))
-	assert.Equal(t, uint32(64), calculateParallelLedgerBatchSize(20096, 64, 1))
+	config := Config{}
+	result := &mockSystem{}
+	factory := func(c Config) (System, error) {
+		return result, nil
+	}
+
+	// worker count 0
+	system, err := newParallelSystems(config, 0, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
+	assert.EqualError(t, err, "workerCount must be > 0")
+
+	// worker count 1, minBatchSize smaller than HistoryCheckpointLedgerInterval
+	system, err = newParallelSystems(config, 1, 50, 200, factory)
+	assert.EqualError(t, err, fmt.Sprintf("minBatchSize must be at least %d", HistoryCheckpointLedgerInterval))
+
+	// worker count 1, max batch size smaller than min batch size
+	system, err = newParallelSystems(config, 1, 5000, 200, factory)
+	assert.EqualError(t, err, "maxBatchSize cannot be less than minBatchSize")
+
+	// worker count 1, captive core batch size
+	system, _ = newParallelSystems(config, 1, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
+	assert.Equal(t, uint32(MaxCaptiveCoreBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxCaptiveCoreBackendBatchSize)+10))
+	assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0))
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down
+
+	// worker count 1, buffered storage batch size
+	system, _ = newParallelSystems(config, 1, MinBatchSize, MaxBufferedStorageBackendBatchSize, factory)
+	assert.Equal(t, uint32(MaxBufferedStorageBackendBatchSize), system.calculateParallelLedgerBatchSize(uint32(MaxBufferedStorageBackendBatchSize)+10))
+	assert.Equal(t, uint32(MinBatchSize), system.calculateParallelLedgerBatchSize(0))
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10048)) // exact multiple
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(10090)) // round down
+
+	// worker count 1, no min/max batch size
+	system, _ = newParallelSystems(config, 1, 0, 0, factory)
+	assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple
+	assert.Equal(t, uint32(20032), system.calculateParallelLedgerBatchSize(20090)) // round down
+
+	// worker count 1, min/max batch size
+	system, _ = newParallelSystems(config, 1, 64, 20000, factory)
+	assert.Equal(t, uint32(19968), system.calculateParallelLedgerBatchSize(20096)) // round down
+	system, _ = newParallelSystems(config, 1, 64, 30000, factory)
+	assert.Equal(t, uint32(20096), system.calculateParallelLedgerBatchSize(20096)) // exact multiple
+
+	// Tests for worker count 2
+
+	// no min/max batch size
+	system, _ = newParallelSystems(config, 2, 0, 0, factory)
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(60))  // range smaller than 64
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(128)) // exact multiple
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096))
+
+	// range larger than max batch size
+	system, _ = newParallelSystems(config, 2, 64, 10000, factory)
+	assert.Equal(t, uint32(9984), system.calculateParallelLedgerBatchSize(20096)) // round down
+
+	// range smaller than min batch size
+	system, _ = newParallelSystems(config, 2, 64, 0, factory)
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(50))       // min batch size
+	assert.Equal(t, uint32(10048), system.calculateParallelLedgerBatchSize(20096)) // exact multiple
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100))      // min batch size
+
+	// batch size equal to min
+	system, _ = newParallelSystems(config, 2, 100, 0, factory)
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(100)) // round down
+
+	// equal min/max batch size
+	system, _ = newParallelSystems(config, 2, 5000, 5000, factory)
+	assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) // round down
+
+	// worker count 3
+	system, _ = newParallelSystems(config, 3, 64, 7000, factory)
+	assert.Equal(t, uint32(6656), system.calculateParallelLedgerBatchSize(20096))
+
+	// worker count 4
+	system, _ = newParallelSystems(config, 4, 64, 20000, factory)
+	assert.Equal(t, uint32(4992), system.calculateParallelLedgerBatchSize(20096)) // round down
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(64))
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(2))
+
+	// max possible workers
+	system, _ = newParallelSystems(config, math.MaxUint32, 0, 0, factory)
+	assert.Equal(t, uint32(64), system.calculateParallelLedgerBatchSize(math.MaxUint32))
 }
 
 func TestParallelReingestRange(t *testing.T) {
@@ -43,31 +120,27 @@ func TestParallelReingestRange(t *testing.T) {
 	factory := func(c Config) (System, error) {
 		return result, nil
 	}
-	system, err := newParallelSystems(config, 3, factory)
+	system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
 	assert.NoError(t, err)
-	err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
+	err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
 	assert.NoError(t, err)
 
 	sort.Slice(rangesCalled, func(i, j int) bool {
 		return rangesCalled[i].StartSequence < rangesCalled[j].StartSequence
 	})
 	expected := []history.LedgerRange{
-		{StartSequence: 1, EndSequence: 256}, {StartSequence: 257, EndSequence: 512}, {StartSequence: 513, EndSequence: 768},
-		{StartSequence: 769, EndSequence: 1024}, {StartSequence: 1025, EndSequence: 1280},
-		{StartSequence: 1281, EndSequence: 1536}, {StartSequence: 1537, EndSequence: 1792},
-		{StartSequence: 1793, EndSequence: 2048}, {StartSequence: 2049, EndSequence: 2050},
+		{StartSequence: 1, EndSequence: 640}, {StartSequence: 641, EndSequence: 1280},
+		{StartSequence: 1281, EndSequence: 1920}, {StartSequence: 1921, EndSequence: 2050},
 	}
 	assert.Equal(t, expected, rangesCalled)
 
 	rangesCalled = nil
-	system, err = newParallelSystems(config, 1, factory)
+	system, err = newParallelSystems(config, 1, 0, 0, factory)
 	assert.NoError(t, err)
 	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1024)).Return(nil).Once()
-	err = system.ReingestRange([]history.LedgerRange{{1, 1024}}, 64)
+	err = system.ReingestRange([]history.LedgerRange{{1, 1024}})
 	result.AssertExpectations(t)
 	expected = []history.LedgerRange{
-		{StartSequence: 1, EndSequence: 64}, {StartSequence: 65, EndSequence: 128}, {StartSequence: 129, EndSequence: 192}, {StartSequence: 193, EndSequence: 256}, {StartSequence: 257, EndSequence: 320},
-		{StartSequence: 321, EndSequence: 384}, {StartSequence: 385, EndSequence: 448}, {StartSequence: 449, EndSequence: 512}, {StartSequence: 513, EndSequence: 576}, {StartSequence: 577, EndSequence: 640},
-		{StartSequence: 641, EndSequence: 704}, {StartSequence: 705, EndSequence: 768}, {StartSequence: 769, EndSequence: 832}, {StartSequence: 833, EndSequence: 896}, {StartSequence: 897, EndSequence: 960},
-		{StartSequence: 961, EndSequence: 1024},
+		{StartSequence: 1, EndSequence: 1024},
 	}
 	assert.NoError(t, err)
 	assert.Equal(t, expected, rangesCalled)
@@ -77,19 +150,19 @@ func TestParallelReingestRangeError(t *testing.T) {
 	config := Config{}
 	result := &mockSystem{}
 	// Fail on the second range
-	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Return(errors.New("failed because of foo")).Once()
+	result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Return(errors.New("failed because of foo")).Once()
 	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(nil)
-	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1537)).Return(nil).Once()
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once()
 	factory := func(c Config) (System, error) {
 		return result, nil
 	}
-	system, err := newParallelSystems(config, 3, factory)
+	system, err := newParallelSystems(config, 3, MinBatchSize, MaxCaptiveCoreBackendBatchSize, factory)
 	assert.NoError(t, err)
-	err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
+	err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
 	result.AssertExpectations(t)
 	assert.Error(t, err)
-	assert.Equal(t, "job failed, recommended restart range: [1537, 2050]: error when processing [1537, 1792] range: failed because of foo", err.Error())
+	assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error())
 }
 
 func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
@@ -98,27 +171,27 @@ func TestParallelReingestRangeErrorInEarlierJob(t *testing.T) {
 	wg.Add(1)
 	result := &mockSystem{}
 	// Fail on a lower subrange after the first error
-	result.On("ReingestRange", []history.LedgerRange{{1025, 1280}}, false, false).Run(func(mock.Arguments) {
+	result.On("ReingestRange", []history.LedgerRange{{641, 1280}}, false, false).Run(func(mock.Arguments) {
 		// Wait for a more recent range to error
 		wg.Wait()
 		// This sleep should help make sure the result of this range is processed later than the one below
 		// (there are no guarantees without instrumenting ReingestRange(), but that's too complicated)
 		time.Sleep(50 * time.Millisecond)
 	}).Return(errors.New("failed because of foo")).Once()
-	result.On("ReingestRange", []history.LedgerRange{{1537, 1792}}, false, false).Run(func(mock.Arguments) {
+	result.On("ReingestRange", []history.LedgerRange{{1281, 1920}}, false, false).Run(func(mock.Arguments) {
 		wg.Done()
 	}).Return(errors.New("failed because of bar")).Once()
 	result.On("ReingestRange", mock.AnythingOfType("[]history.LedgerRange"), false, false).Return(error(nil))
-	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(1025)).Return(nil).Once()
+	result.On("RebuildTradeAggregationBuckets", uint32(1), uint32(641)).Return(nil).Once()
 	factory := func(c Config) (System, error) {
 		return result, nil
 	}
-	system, err := newParallelSystems(config, 3, factory)
+	system, err := newParallelSystems(config, 3, 0, 0, factory)
 	assert.NoError(t, err)
-	err = system.ReingestRange([]history.LedgerRange{{1, 2050}}, 258)
+	err = system.ReingestRange([]history.LedgerRange{{1, 2050}})
 	result.AssertExpectations(t)
 	assert.Error(t, err)
-	assert.Equal(t, "job failed, recommended restart range: [1025, 2050]: error when processing [1025, 1280] range: failed because of foo", err.Error())
+	assert.Equal(t, "job failed, recommended restart range: [641, 2050]: error when processing [641, 1280] range: failed because of foo", err.Error())
 }
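The sizing rules introduced in `services/horizon/internal/ingest/parallel.go` are compact but easy to misread, so here is a minimal, self-contained sketch of the same arithmetic: split the range evenly across workers, clamp to the optional min/max bounds, then round down to a whole number of 64-ledger checkpoints. The constant values are copied from `services/horizon/internal/ingest/main.go` in this diff; the `batchSize` helper and the sample values in `main` are illustrative only, not part of the change.

```go
package main

import "fmt"

// Constants copied from services/horizon/internal/ingest/main.go.
const (
	historyCheckpointLedgerInterval uint32 = 64
	minBatchSize                    uint32 = historyCheckpointLedgerInterval
	maxBufferedStorageBatchSize     uint32 = 200 * historyCheckpointLedgerInterval
	maxCaptiveCoreBatchSize         uint32 = 20_000 * historyCheckpointLedgerInterval
)

// batchSize mirrors (*ParallelSystems).calculateParallelLedgerBatchSize:
// divide the range across workers, apply the optional min/max clamps, then
// round down to a multiple of the checkpoint interval (never below one).
func batchSize(rangeSize, workers, min, max uint32) uint32 {
	size := rangeSize / workers
	if min > 0 && size < min {
		size = min
	}
	if max > 0 && size > max {
		size = max
	}
	if size > historyCheckpointLedgerInterval {
		return size / historyCheckpointLedgerInterval * historyCheckpointLedgerInterval
	}
	// one checkpoint interval is the floor
	return historyCheckpointLedgerInterval
}

func main() {
	// 20096 ledgers over 3 workers: 20096/3 = 6698, rounded down to 6656 (104 * 64),
	// matching TestCalculateParallelLedgerBatchSize.
	fmt.Println(batchSize(20096, 3, minBatchSize, maxCaptiveCoreBatchSize)) // 6656
	// The buffered storage backend caps a job at 200 checkpoints (12800 ledgers).
	fmt.Println(batchSize(100_000, 2, minBatchSize, maxBufferedStorageBatchSize)) // 12800
	// Tiny ranges still produce one checkpoint-sized job.
	fmt.Println(batchSize(2, 4, minBatchSize, maxCaptiveCoreBatchSize)) // 64
}
```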
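The `ledger_buffer.go` change applies the same idea one level down: a bounded range never needs a buffer larger than the range itself. A small sketch of that clamp under the same assumptions — the `clampBufferSize` helper name is hypothetical; the diff performs this inline with `ordered.Min`:

```go
package main

import "fmt"

// clampBufferSize mirrors the guard added to newLedgerBuffer in
// ingest/ledgerbackend/ledger_buffer.go: for a bounded range, cap BufferSize
// at the total number of ledgers between from and to (inclusive).
func clampBufferSize(bufferSize, from, to uint32) uint32 {
	rangeSize := to - from + 1
	if bufferSize > rangeSize {
		return rangeSize
	}
	return bufferSize
}

func main() {
	// As in TestNewLedgerBufferSizeLessThanRangeSize: a 10-slot buffer on a
	// 21-ledger range (10..30) is left unchanged.
	fmt.Println(clampBufferSize(10, 10, 30)) // 10
	// As in TestNewLedgerBufferSizeLargerThanRangeSize: a 100-slot buffer on a
	// 15-ledger range (1..15) is capped at 15.
	fmt.Println(clampBufferSize(100, 1, 15)) // 15
}
```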