Skip to content

Commit

Permalink
Fix error transition in resume state (#5132)
Browse files Browse the repository at this point in the history
If there is an error rebuilding trade aggregation buckets
we should retry ingesting the ledger. The previous code
terminated ingestion entirely when encountering a
rebuild trade aggregations error.
  • Loading branch information
tamirms authored Nov 29, 2023
1 parent f65bf8a commit d5e0726
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
2 changes: 1 addition & 1 deletion services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (r resumeState) run(s *system) (transition, error) {
rebuildStart := time.Now()
err = s.historyQ.RebuildTradeAggregationBuckets(s.ctx, ingestLedger, ingestLedger, s.config.RoundingSlippageFilter)
if err != nil {
return stop(), errors.Wrap(err, "error rebuilding trade aggregations")
return retryResume(r), errors.Wrap(err, "error rebuilding trade aggregations")
}
rebuildDuration := time.Since(rebuildStart).Seconds()
s.Metrics().LedgerIngestionTradeAggregationDuration.Observe(float64(rebuildDuration))
Expand Down
30 changes: 30 additions & 0 deletions services/horizon/internal/ingest/resume_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,36 @@ func (s *ResumeTestTestSuite) TestErrorSettingCursorIgnored() {
)
}

func (s *ResumeTestTestSuite) TestRebuildTradeAggregationBucketsError() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once()
s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once()
s.historyQ.On("GetLatestHistoryLedger", s.ctx).Return(uint32(100), nil)

s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")).
Run(func(args mock.Arguments) {
meta := args.Get(0).(xdr.LedgerCloseMeta)
s.Assert().Equal(uint32(101), meta.LedgerSequence())
}).
Return(
ledgerStats{},
nil,
).Once()

s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).
Return(errors.New("transient error")).Once()

next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system)
s.Assert().EqualError(err, "error rebuilding trade aggregations: transient error")
s.Assert().Equal(
transition{
node: resumeState{latestSuccessfullyProcessedLedger: 100},
sleepDuration: defaultSleep,
},
next,
)
}

func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() {
s.historyQ.On("Begin", s.ctx).Return(nil).Once()
s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once()
Expand Down

0 comments on commit d5e0726

Please sign in to comment.