Merge pull request #457 from oasisprotocol/mitjat/fast-sync-consensus
consensus: fast-sync mode: Skip dead reckoning. Also bugfixes.
mitjat authored Oct 3, 2023
2 parents 8369d4a + 82f9cfc commit c0b1933
Showing 25 changed files with 2,400 additions and 980 deletions.
32 changes: 19 additions & 13 deletions analyzer/block/block.go
@@ -26,7 +26,8 @@ const (
 	// Default number of blocks to be processed in a batch.
 	defaultBatchSize = 1_000
 	// Lock expire timeout for blocks (in minutes). Locked blocks not processed within
-	// this time can be picked again.
+	// this time can be picked again. Keep strictly > 1; the analyzer stops processing
+	// blocks before the lock expires, by a safety margin of 1 minute.
 	lockExpiryMinutes = 5
 )
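The constraint in the new comment is enforced further down in this diff, where Start derives the batch deadline from the same constant. A minimal standalone sketch of that relationship (everything except lockExpiryMinutes is illustrative, not taken from the diff):

package main

import (
	"context"
	"fmt"
	"time"
)

const lockExpiryMinutes = 5 // DB row locks lapse after this many minutes

func main() {
	// The batch context is cut one minute short of the lock expiry, so a
	// batch always finishes (or aborts) while its locks are still held.
	batchCtx, cancel := context.WithTimeout(context.Background(), (lockExpiryMinutes-1)*time.Minute)
	defer cancel()

	deadline, _ := batchCtx.Deadline()
	fmt.Println("batch deadline in:", time.Until(deadline).Round(time.Second)) // ~4m0s
}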

@@ -77,13 +78,13 @@ func (b *blockBasedAnalyzer) firstUnprocessedBlock(ctx context.Context) (first u
 	return
 }
 
-// unlockBlock unlocks a block.
-func (b *blockBasedAnalyzer) unlockBlock(ctx context.Context, height uint64) {
+// unlockBlocks unlocks the given blocks.
+func (b *blockBasedAnalyzer) unlockBlocks(ctx context.Context, heights []uint64) {
 	rows, err := b.target.Query(
 		ctx,
-		queries.UnlockBlockForProcessing,
+		queries.UnlockBlocksForProcessing,
 		b.analyzerName,
-		height,
+		heights,
 	)
 	if err == nil {
 		rows.Close()
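The unlock helper now takes a slice, so a single query releases a whole batch of rows. The PR defines queries.UnlockBlocksForProcessing elsewhere; a plausible shape for it, assuming Postgres and table/column names that are not shown in this diff, is:

// Hypothetical sketch only; the real statement lives in the queries package
// and its table/column names may differ.
const UnlockBlocksForProcessing = `
	UPDATE analysis.processed_blocks
	SET locked_time = '-infinity'
	WHERE analyzer = $1 AND height = ANY($2) AND processed_time IS NULL`

With a driver such as pgx, the []uint64 argument can bind as a single Postgres array parameter ($2), turning N unlock round trips into one, which is what makes the eager batch unlock added below cheap enough to run on every shutdown or lock refresh.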
@@ -218,7 +219,7 @@ func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Contex
 		b.logger.Error("failed to soft-enqueue gaps in already-processed blocks", "err", err, "from", b.blockRange.From, "to", b.blockRange.To)
 		return err
 	}
-	b.logger.Error("ensured that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To)
+	b.logger.Info("ensured that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To)
 	return nil
 }
 
@@ -246,12 +247,12 @@ func (b *blockBasedAnalyzer) ensureSlowSyncPrerequisites(ctx context.Context) (o
 		return false
 	}
 	if !precededBySlowSync {
-		b.logger.Error("finalizing the work of previous fast-sync analyzer(s)", "last_fast_sync_height", maxProcessedHeight)
+		b.logger.Info("finalizing the work of previous fast-sync analyzer(s)", "last_fast_sync_height", maxProcessedHeight)
 		if err := b.processor.FinalizeFastSync(ctx, maxProcessedHeight); err != nil {
 			b.logger.Error("failed to finalize the fast-sync phase (i.e. download genesis or similar)", "err", err)
 			return false
 		}
-		b.logger.Error("fast-sync finalization complete; proceeding with regular slow-sync analysis")
+		b.logger.Info("fast-sync finalization complete; proceeding with regular slow-sync analysis")
 	}
 
 	return true
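Besides demoting two misleveled Error logs to Info, this hunk leans on the processor's fast-sync finalization hook. A sketch of the contract implied by the call site (the interface name and the height type are assumptions; the diff shows only the call):

package analyzer

import "context"

// Assumed shape of the processor dependency, inferred from the call
// b.processor.FinalizeFastSync(ctx, maxProcessedHeight) above; int64 for
// the height is a guess, as the diff does not show its type.
type BlockProcessor interface {
	// FinalizeFastSync bridges fast-sync into slow-sync, e.g. by fetching
	// a genesis-like state snapshot at the last fast-synced height.
	FinalizeFastSync(ctx context.Context, lastFastSyncHeight int64) error
}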
@@ -309,7 +310,9 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 			b.logger.Warn("shutting down block analyzer", "reason", ctx.Err())
 			return
 		}
-		batchCtx, batchCtxCancel = context.WithTimeout(ctx, lockExpiryMinutes*time.Minute)
+		// The context for processing the batch of blocks is shorter than the lock expiry.
+		// This is to ensure that the batch is processed before the locks expire.
+		batchCtx, batchCtxCancel = context.WithTimeout(ctx, (lockExpiryMinutes-1)*time.Minute)
 
 		// Pick a batch of blocks to process.
 		b.logger.Info("picking a batch of blocks to process", "from", b.blockRange.From, "to", to, "is_fast_sync", !b.slowSync)
@@ -329,7 +332,7 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 		// blocks that are not yet available. In this case, wait before processing every block,
 		// so that the backoff mechanism can tweak the per-block wait time as needed.
 		//
-		// Note: If the batch size is greater than 50, the time required to process the blocks
+		// Note: If the batch size is too large, the time required to process the blocks
 		// in the batch will exceed the current lock expiry of 5min. The analyzer will terminate
 		// the batch early and attempt to refresh the locks for a new batch.
 		if b.slowSync {
@@ -338,10 +341,13 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 				// Process the next block
 				case <-batchCtx.Done():
 					b.logger.Info("batch locks expiring; refreshing batch")
+					b.unlockBlocks(ctx, heights) // Locks are _about_ to expire, but are not expired yet. Unlock explicitly so blocks can be grabbed sooner.
 					break
 				case <-ctx.Done():
 					batchCtxCancel()
 					b.logger.Warn("shutting down block analyzer", "reason", ctx.Err())
+					b.unlockBlocks(ctx, heights) // Give others a chance to process our blocks even before their locks implicitly expire
+					b.logger.Info("unlocked db rows", "heights", heights)
 					return
 				}
 			}
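Both exit paths now release the batch's row locks eagerly instead of letting them lapse after lockExpiryMinutes, so other analyzers can claim the blocks right away. A self-contained sketch of the pattern with stub types (unlockBlocks here just prints; the real method issues the batched UPDATE shown earlier):

package main

import (
	"context"
	"fmt"
	"time"
)

// unlockBlocks is a stand-in for the analyzer's DB-backed method.
func unlockBlocks(ctx context.Context, heights []uint64) {
	fmt.Println("unlocked:", heights)
}

func processBatch(ctx context.Context, heights []uint64) {
	// Batch deadline sits one minute inside the 5-minute lock expiry.
	batchCtx, cancel := context.WithTimeout(ctx, 4*time.Minute)
	defer cancel()

	for _, height := range heights {
		select {
		case <-batchCtx.Done():
			// Locks are about to (but do not yet) expire: unlock explicitly
			// so another worker can grab the blocks sooner.
			unlockBlocks(ctx, heights)
			return
		case <-ctx.Done():
			// Graceful shutdown: same eager release.
			unlockBlocks(ctx, heights)
			return
		default:
			fmt.Println("processing block", height)
		}
	}
}

func main() {
	processBatch(context.Background(), []uint64{100, 101, 102})
}

The sketch returns on both paths for simplicity; in the committed code the batchCtx case instead falls through so the outer loop can refresh the locks and pick a new batch. Already-processed heights in the slice are harmless to pass, assuming the unlock query skips rows whose processed_time is set.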
@@ -363,13 +369,13 @@ func (b *blockBasedAnalyzer) Start(ctx context.Context) {
 
 			// If running in slow-sync, stop processing the batch on error so that
 			// the blocks are always processed in order.
-			if b.slowSync {
+			// Also stop processing the batch if the batch context is done (= "expired").
+			if b.slowSync || batchCtx.Err() != nil {
 				break
 			}
 
 			// Unlock a failed block, so it can be retried sooner.
-			// TODO: Could add a hook to unlock all remaining blocks in the batch on graceful shutdown.
-			b.unlockBlock(ctx, height)
+			b.unlockBlocks(ctx, []uint64{height})
 			continue
 		}
 		cancel()
