Merge pull request #810 from oasisprotocol/ptrus/fix/dont-parallel-prework

fix: Run PreWork once, even for parallel analyzers
ptrus authored Dec 3, 2024
2 parents 1505498 + 95a2c97 commit ca4ef6d
Showing 8 changed files with 78 additions and 29 deletions.
1 change: 1 addition & 0 deletions .changelog/810.bugfix.md
@@ -0,0 +1 @@
+fix: Run PreWork once, even for parallel analyzers
4 changes: 4 additions & 0 deletions analyzer/aggregate_stats/aggregate_stats.go
@@ -75,6 +75,10 @@ func NewAggregateStatsAnalyzer(target storage.TargetStorage, logger *log.Logger)
 	}, nil
 }
 
+func (a *aggregateStatsAnalyzer) PreWork(ctx context.Context) error {
+	return nil
+}
+
 func (a *aggregateStatsAnalyzer) Start(ctx context.Context) {
 	a.aggregateStatsWorker(ctx)
 }
4 changes: 4 additions & 0 deletions analyzer/api.go
@@ -17,6 +17,10 @@ var (
 
 // Analyzer is a worker that analyzes a subset of the Oasis Network.
 type Analyzer interface {
+	// PreWork is called before the analyzer starts its work.
+	// When running parallel analyzers, this method will only be called once.
+	PreWork(ctx context.Context) error
+
 	// Start starts the analyzer. The method should return once the analyzer
 	// is confident it has (and will have) no more work to do; that's possibly never.
 	Start(ctx context.Context)
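To make the new interface contract concrete, here is a minimal, self-contained sketch of how a caller is expected to drive it: PreWork exactly once for a group, then Start on every parallel instance. This is illustration only, not code from the diff; the nopAnalyzer type and the two-method interface shape are assumptions made for the example.

	package main

	import (
		"context"
		"fmt"
		"sync"
	)

	// Analyzer mirrors the expanded interface above (other methods omitted).
	type Analyzer interface {
		PreWork(ctx context.Context) error
		Start(ctx context.Context)
	}

	// nopAnalyzer needs no setup, so PreWork is a no-op — the same shape as the
	// implementations this commit adds to the aggregate_stats and item analyzers.
	type nopAnalyzer struct{ name string }

	func (a nopAnalyzer) PreWork(ctx context.Context) error { return nil }
	func (a nopAnalyzer) Start(ctx context.Context)         { fmt.Println(a.name, "started") }

	func main() {
		ctx := context.Background()
		instances := []Analyzer{nopAnalyzer{"a1"}, nopAnalyzer{"a2"}, nopAnalyzer{"a3"}}

		// Per the new contract: run PreWork once for the whole group ...
		if err := instances[0].PreWork(ctx); err != nil {
			fmt.Println("pre-work failed:", err)
			return
		}
		// ... then start all parallel instances.
		var wg sync.WaitGroup
		for _, an := range instances {
			wg.Add(1)
			go func(an Analyzer) { defer wg.Done(); an.Start(ctx) }(an)
		}
		wg.Wait()
	}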
39 changes: 23 additions & 16 deletions analyzer/block/block.go
@@ -39,7 +39,9 @@ const (
 // block based analyzer.
 type BlockProcessor interface {
 	// PreWork performs tasks that need to be done before the main processing loop starts.
+	// In parallel mode, this method is called by a single instance.
 	PreWork(ctx context.Context) error
+
 	// ProcessBlock processes the provided block, retrieving all required information
 	// from source storage and committing an atomically-executed batch of queries
 	// to target storage.
@@ -299,14 +301,29 @@ func expiresWithin(ctx context.Context, margin time.Duration) bool {
 	return time.Until(deadline) < margin
 }
 
-// Start starts the block analyzer.
-func (b *blockBasedAnalyzer) Start(ctx context.Context) {
-	// Run prework.
-	if err := b.processor.PreWork(ctx); err != nil {
-		b.logger.Error("prework failed", "err", err)
-		return
+func (b *blockBasedAnalyzer) PreWork(ctx context.Context) error {
+	// Run processor-specific pre-work first.
+	err := b.processor.PreWork(ctx)
+	if err != nil {
+		return fmt.Errorf("processor pre-work failed: %w", err)
 	}
 
+	// Run general block-based analyzer pre-work.
+	if b.slowSync && !b.ensureSlowSyncPrerequisites(ctx) {
+		// We cannot continue or recover automatically. Logging happens inside the validate function.
+		return fmt.Errorf("failed to validate prerequisites for slow-sync mode")
+	}
+
+	if !b.slowSync && b.softEnqueueGapsInProcessedBlocks(ctx) != nil {
+		// We cannot continue or recover automatically. Logging happens inside the validate function.
+		return fmt.Errorf("failed to soft-enqueue gaps in already-processed blocks")
+	}
+
+	return nil
+}
+
+// Start starts the block analyzer.
+func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 	// The default max block height that the analyzer will process. This value is not
 	// indicative of the maximum height the Oasis blockchain can reach; rather it
 	// is set to golang's maximum int64 value for convenience.
@@ -316,16 +333,6 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 		to = b.blockRange.To
 	}
 
-	if b.slowSync && !b.ensureSlowSyncPrerequisites(ctx) {
-		// We cannot continue or recover automatically. Logging happens inside the validate function.
-		return
-	}
-
-	if !b.slowSync && b.softEnqueueGapsInProcessedBlocks(ctx) != nil {
-		// We cannot continue or recover automatically. Logging happens inside the validate function.
-		return
-	}
-
 	// Start processing blocks.
 	backoff, err := util.NewBackoff(
 		100*time.Millisecond,
32 changes: 19 additions & 13 deletions analyzer/block/block_test.go
@@ -107,6 +107,13 @@ func setupDB(t *testing.T) *postgres.Client {
 	return testDB
 }
 
+func setupAnalyzerWithPreWork(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg *config.BlockBasedAnalyzerConfig, mode analyzer.BlockAnalysisMode) analyzer.Analyzer {
+	a := setupAnalyzer(t, testDb, p, cfg, mode)
+	require.NoError(t, a.PreWork(context.Background()), "analyzer.PreWork")
+
+	return a
+}
+
 func setupAnalyzer(t *testing.T, testDb *postgres.Client, p *mockProcessor, cfg *config.BlockBasedAnalyzerConfig, mode analyzer.BlockAnalysisMode) analyzer.Analyzer {
 	// Modify the processor in-place (!): make sure the isFastSync field is in agreement with the analyzer's mode.
 	p.isFastSync = (mode == analyzer.FastSyncMode)
@@ -189,7 +196,7 @@ func TestMultipleFastSyncBlockAnalyzers(t *testing.T) {
 	as := []analyzer.Analyzer{}
 	for i := 0; i < numAnalyzers; i++ {
 		p := &mockProcessor{name: "test_analyzer", storage: db}
-		analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
+		analyzer := setupAnalyzerWithPreWork(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
 		ps = append(ps, p)
 		as = append(as, analyzer)
 	}
@@ -239,7 +246,7 @@ func TestFailingFastSyncBlockAnalyzers(t *testing.T) {
 			}
 		}
 		p := &mockProcessor{name: "test_analyzer", storage: db, fail: fail}
-		analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
+		analyzer := setupAnalyzerWithPreWork(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
 		ps = append(ps, p)
 		as = append(as, analyzer)
 	}
@@ -282,7 +289,7 @@ func TestDistinctFastSyncBlockAnalyzers(t *testing.T) {
 	as := []analyzer.Analyzer{}
 	for i := 0; i < numAnalyzers; i++ {
 		p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
-		analyzer := setupAnalyzer(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
+		analyzer := setupAnalyzerWithPreWork(t, db, p, testFastSyncBlockBasedConfig, analyzer.FastSyncMode)
 		ps = append(ps, p)
 		as = append(as, analyzer)
 	}
@@ -319,7 +326,7 @@ func TestSlowSyncBlockAnalyzer(t *testing.T) {
 
 	db := setupDB(t)
 	p := &mockProcessor{name: "test_analyzer", storage: db}
-	analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
+	analyzer := setupAnalyzerWithPreWork(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
 
 	// Run the analyzer and ensure all blocks are processed.
 	var wg sync.WaitGroup
@@ -358,7 +365,7 @@ func TestFailingSlowSyncBlockAnalyzer(t *testing.T) {
 		}
 		return nil
 	}}
-	analyzer := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100, BatchSize: 100}, analyzer.SlowSyncMode)
+	analyzer := setupAnalyzerWithPreWork(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 100, BatchSize: 100}, analyzer.SlowSyncMode)
 
 	// Run the analyzer and ensure all blocks are processed.
 	var wg sync.WaitGroup
@@ -395,7 +402,7 @@ func TestDistinctSlowSyncBlockAnalyzers(t *testing.T) {
 	as := []analyzer.Analyzer{}
 	for i := 0; i < numAnalyzers; i++ {
 		p := &mockProcessor{name: fmt.Sprintf("test_analyzer_%d", i), storage: db}
-		analyzer := setupAnalyzer(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
+		analyzer := setupAnalyzerWithPreWork(t, db, p, testBlockBasedConfig, analyzer.SlowSyncMode)
 		ps = append(ps, p)
 		as = append(as, analyzer)
 	}
@@ -435,19 +442,19 @@ func TestFinalizeFastSync(t *testing.T) {
 	// Run multiple analyzers, each on a separate block range, to simulate past Nexus invocations.
 	// Note: The .Start() call blocks until the analyzer finishes.
 	p := &mockProcessor{name: "consensus", storage: db}
-	setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 10, FastSync: &config.FastSyncConfig{To: 10}}, analyzer.FastSyncMode).Start(ctx)
+	setupAnalyzerWithPreWork(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 10, FastSync: &config.FastSyncConfig{To: 10}}, analyzer.FastSyncMode).Start(ctx)
 	require.Nil(t, p.fastSyncFinalizedAt,
 		fmt.Sprintf("fast-sync analyzer should never finalize fast-sync, but it did at %d", p.fastSyncFinalizedAt))
 
 	p = &mockProcessor{name: "consensus", storage: db}
-	setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 5, To: 20}, analyzer.SlowSyncMode).Start(ctx)
+	setupAnalyzerWithPreWork(t, db, p, &config.BlockBasedAnalyzerConfig{From: 5, To: 20}, analyzer.SlowSyncMode).Start(ctx)
 	require.NotNil(t, p.fastSyncFinalizedAt,
 		"slow-sync analyzer should have finalized fast sync because it's taking up work from a fast-sync analyzer")
 	require.Equal(t, int64(10), *p.fastSyncFinalizedAt,
 		"slow-sync analyzer should finalize fast-sync at the height of the last fast-processed block")
 
 	p = &mockProcessor{name: "consensus", storage: db}
-	setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 21, To: 30}, analyzer.SlowSyncMode).Start(ctx)
+	setupAnalyzerWithPreWork(t, db, p, &config.BlockBasedAnalyzerConfig{From: 21, To: 30}, analyzer.SlowSyncMode).Start(ctx)
 	require.Nil(t, p.fastSyncFinalizedAt,
 		"second slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range")
 }
@@ -512,17 +519,16 @@ func TestRefuseSlowSyncOnDirtyRange(t *testing.T) {
 
 	p = &mockProcessor{name: "consensus", storage: db}
 	a := setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 1, To: 15}, analyzer.SlowSyncMode)
-	a.Start(ctx)
-	require.Zero(t, len(p.processedBlocks),
-		"slow-sync analyzer should refuse to process anything because the already-analyzed range is non-contiguous")
+	require.Error(t, a.PreWork(ctx), "slow-sync analyzer should refuse to process anything because the already-analyzed range is non-contiguous")
 
 	// Patch up the holes with a fast-sync analyzer.
 	fp := &mockProcessor{name: "consensus", storage: db}
-	setupAnalyzer(t, db, fp, &config.BlockBasedAnalyzerConfig{From: 1, To: 10, FastSync: &config.FastSyncConfig{To: 10}}, analyzer.FastSyncMode).Start(ctx)
+	setupAnalyzerWithPreWork(t, db, fp, &config.BlockBasedAnalyzerConfig{From: 1, To: 10, FastSync: &config.FastSyncConfig{To: 10}}, analyzer.FastSyncMode).Start(ctx)
 	require.Equal(t, fp.processedBlocks, map[uint64]struct{}{1: {}, 2: {}},
 		"fast-sync analyzer should have processed the missing blocks")
 
 	// Try a slow-sync analyzer again
+	require.NoError(t, a.PreWork(ctx), "slow-sync analyzer should be able to process the missing blocks")
 	a.Start(ctx)
 	require.Equal(t, p.processedBlocks, map[uint64]struct{}{11: {}, 12: {}, 13: {}, 14: {}, 15: {}},
 		"slow-sync analyzer should have processed the missing blocks")
4 changes: 4 additions & 0 deletions analyzer/item/item.go
@@ -93,6 +93,10 @@ func NewAnalyzer[Item any](
 	return a, nil
 }
 
+func (a *itemBasedAnalyzer[Item]) PreWork(ctx context.Context) error {
+	return nil
+}
+
 // sendQueueLength reports the current number of items in the work queue to Prometheus.
 func (a *itemBasedAnalyzer[Item]) sendQueueLengthMetric(ctx context.Context) (int, error) {
 	queueLength, err := a.processor.QueueLength(ctx)
19 changes: 19 additions & 0 deletions cmd/analyzer/analyzer.go
@@ -694,14 +694,29 @@ func (a *Service) Start() {
 	// Start fast-sync analyzers.
 	fastSyncWg := map[string]*sync.WaitGroup{} // syncTag -> wg with all fast-sync analyzers with that tag
 	for _, an := range a.fastSyncAnalyzers {
+		var runPreWork bool
 		wg, ok := fastSyncWg[an.SyncTag]
 		if !ok {
 			wg = &sync.WaitGroup{}
 			fastSyncWg[an.SyncTag] = wg
+
+			// This is the first fast-sync analyzer with this SyncTag, so PreWork should be run.
+			// This relies on the assumption that each distinct 'type' of fast-sync analyzer uses a unique `SyncTag`.
+			// This assumption holds true at the moment.
+			runPreWork = true
 		}
 		wg.Add(1)
 		go func(an SyncedAnalyzer) {
 			defer wg.Done()
+
+			if runPreWork {
+				a.logger.Info("running pre-work for analyzer", "analyzer", an.Analyzer.Name())
+				if err := an.Analyzer.PreWork(ctx); err != nil {
+					a.logger.Error("fast-sync analyzer failed pre-work", "analyzer", an.Analyzer.Name(), "error", err.Error())
+					return
+				}
+			}
+
 			an.Analyzer.Start(ctx)
 		}(an)
 	}
@@ -727,6 +742,10 @@ func (a *Service) Start() {
 			case <-ctx.Done():
 				return
 			case <-util.ClosingChannel(prereqWg):
+				if err := an.Analyzer.PreWork(ctx); err != nil {
+					a.logger.Error("slow-sync analyzer failed pre-work", "analyzer", an.Analyzer.Name(), "error", err.Error())
+					return
+				}
 				an.Analyzer.Start(ctx)
 			}
 		}(an)
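Distilled, the fast-sync dispatch above implements this pattern: PreWork fires only in the goroutine of the first analyzer registered under each SyncTag, while sibling goroutines skip it (and, as in the diff, do not wait for it to finish). The following standalone sketch is illustrative only — SyncedAnalyzer here is a simplified stand-in for the real struct, and the tag values are invented.

	package main

	import (
		"context"
		"fmt"
		"sync"
	)

	// SyncedAnalyzer is a simplified stand-in for cmd/analyzer's SyncedAnalyzer.
	type SyncedAnalyzer struct {
		SyncTag string
		Name    string
	}

	func (an SyncedAnalyzer) PreWork(ctx context.Context) error { return nil }
	func (an SyncedAnalyzer) Start(ctx context.Context)         { fmt.Println(an.Name, "running") }

	func main() {
		ctx := context.Background()
		analyzers := []SyncedAnalyzer{
			{SyncTag: "consensus", Name: "consensus-0"},
			{SyncTag: "consensus", Name: "consensus-1"},
			{SyncTag: "emerald", Name: "emerald-0"},
		}

		wgs := map[string]*sync.WaitGroup{} // syncTag -> WaitGroup, as in the diff
		for _, an := range analyzers {
			// runPreWork is true only for the first analyzer seen with a given tag,
			// so each group's PreWork executes exactly once.
			var runPreWork bool
			wg, ok := wgs[an.SyncTag]
			if !ok {
				wg = &sync.WaitGroup{}
				wgs[an.SyncTag] = wg
				runPreWork = true
			}
			wg.Add(1)
			go func(an SyncedAnalyzer, runPreWork bool) {
				defer wg.Done()
				if runPreWork {
					if err := an.PreWork(ctx); err != nil {
						fmt.Println("pre-work failed:", err)
						return
					}
				}
				an.Start(ctx)
			}(an, runPreWork)
		}
		for _, wg := range wgs {
			wg.Wait()
		}
	}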
4 changes: 4 additions & 0 deletions cmd/analyzer/analyzer_sequencing_test.go
@@ -23,6 +23,10 @@ var _ analyzer.Analyzer = (*DummyAnalyzer)(nil)
 
 var finishLogLock = &sync.Mutex{} // for use by all DummyAnalyzer instances
 
+func (a *DummyAnalyzer) PreWork(ctx context.Context) error {
+	return nil
+}
+
 func (a *DummyAnalyzer) Start(ctx context.Context) {
 	time.Sleep(a.duration)
 	finishLogLock.Lock()
