Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Refactor captive core process manager #5360

Merged
merged 4 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,11 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error
)
}

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
err = c.stellarCoreRunner.catchup(from, to)
if err != nil {
stellarCoreRunner := c.stellarCoreRunnerFactory()
if err = stellarCoreRunner.catchup(from, to); err != nil {
return errors.Wrap(err, "error running stellar-core")
}
c.stellarCoreRunner = stellarCoreRunner

// The next ledger should be the first ledger of the checkpoint containing
// the requested ledger
Expand All @@ -375,11 +375,11 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(ctx context.Context, fro
return errors.Wrap(err, "error calculating ledger and hash for stellar-core run")
}

c.stellarCoreRunner = c.stellarCoreRunnerFactory()
err = c.stellarCoreRunner.runFrom(runFrom, ledgerHash)
if err != nil {
stellarCoreRunner := c.stellarCoreRunnerFactory()
if err = stellarCoreRunner.runFrom(runFrom, ledgerHash); err != nil {
return errors.Wrap(err, "error running stellar-core")
}
c.stellarCoreRunner = stellarCoreRunner

// In the online mode we update nextLedger after streaming the first ledger.
// This is to support versions before and after/including v17.1.0 that
Expand Down Expand Up @@ -556,7 +556,7 @@ func (c *CaptiveStellarCore) isPrepared(ledgerRange Range) bool {
return false
}

if exited, _ := c.stellarCoreRunner.getProcessExitError(); exited {
if _, exited := c.stellarCoreRunner.getProcessExitError(); exited {
return false
}

Expand Down Expand Up @@ -627,9 +627,6 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
if c.stellarCoreRunner == nil {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core has an error, call PrepareRange first")
}

if sequence < c.nextExpectedSequence() {
return xdr.LedgerCloseMeta{}, errors.Errorf(
Expand All @@ -647,12 +644,17 @@ func (c *CaptiveStellarCore) GetLedger(ctx context.Context, sequence uint32) (xd
)
}

ch, ok := c.stellarCoreRunner.getMetaPipe()
if !ok {
return xdr.LedgerCloseMeta{}, errors.New("stellar-core is not running, call PrepareRange first")
}

// Now loop along the range until we find the ledger we want.
for {
select {
case <-ctx.Done():
return xdr.LedgerCloseMeta{}, ctx.Err()
case result, ok := <-c.stellarCoreRunner.getMetaPipe():
case result, ok := <-ch:
found, ledger, err := c.handleMetaPipeResult(sequence, result, ok)
if found || err != nil {
return ledger, err
Expand Down Expand Up @@ -732,7 +734,7 @@ func (c *CaptiveStellarCore) checkMetaPipeResult(result metaResult, ok bool) err
return err
}
if !ok || result.err != nil {
exited, err := c.stellarCoreRunner.getProcessExitError()
err, exited := c.stellarCoreRunner.getProcessExitError()
if exited && err != nil {
// Case 2 - The stellar core process exited unexpectedly with an error message
return errors.Wrap(err, "stellar core exited unexpectedly")
Expand Down Expand Up @@ -775,12 +777,12 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
if c.stellarCoreRunner == nil {
return 0, errors.New("stellar-core cannot be nil, call PrepareRange first")
}
if c.closed {
return 0, errors.New("stellar-core is closed, call PrepareRange first")

ch, ok := c.stellarCoreRunner.getMetaPipe()
if !ok {
return 0, errors.New("stellar-core is not running, call PrepareRange first")
}
if c.lastLedger == nil {
return c.nextExpectedSequence() - 1 + uint32(len(c.stellarCoreRunner.getMetaPipe())), nil
return c.nextExpectedSequence() - 1 + uint32(len(ch)), nil
}
return *c.lastLedger, nil
}
Expand Down
64 changes: 31 additions & 33 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ func (m *stellarCoreRunnerMock) runFrom(from uint32, hash string) error {
return a.Error(0)
}

func (m *stellarCoreRunnerMock) getMetaPipe() <-chan metaResult {
func (m *stellarCoreRunnerMock) getMetaPipe() (<-chan metaResult, bool) {
a := m.Called()
return a.Get(0).(<-chan metaResult)
return a.Get(0).(<-chan metaResult), a.Bool(1)
}

func (m *stellarCoreRunnerMock) getProcessExitError() (bool, error) {
func (m *stellarCoreRunnerMock) getProcessExitError() (error, bool) {
a := m.Called()
return a.Bool(0), a.Error(1)
return a.Error(0), a.Bool(1)
}

func (m *stellarCoreRunnerMock) close() error {
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestCaptivePrepareRange(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -251,8 +251,8 @@ func TestCaptivePrepareRangeCrash(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getProcessExitError").Return(true, errors.New("exit code -1"))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getProcessExitError").Return(errors.New("exit code -1"), true)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("close").Return(nil).Once()
mockRunner.On("context").Return(ctx)

Expand Down Expand Up @@ -292,7 +292,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Twice()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -364,7 +364,7 @@ func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("close").Return(fmt.Errorf("transient error"))
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)
mockRunner.On("context").Return(ctx)

captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestCaptivePrepareRange_FromIsAheadOfRootHAS(t *testing.T) {
}

mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100)))
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestCaptivePrepareRangeWithDB_FromIsAheadOfRootHAS(t *testing.T) {
LedgerCloseMeta: &meta,
}
mockRunner.On("runFrom", uint32(99), "").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

assert.NoError(t, captiveBackend.PrepareRange(ctx, UnboundedRange(100)))
Expand Down Expand Up @@ -517,7 +517,6 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) {
func TestCaptivePrepareRange_ErrCatchup(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(100), uint32(192)).Return(errors.New("transient error")).Once()
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -552,7 +551,6 @@ func TestCaptivePrepareRange_ErrCatchup(t *testing.T) {
func TestCaptivePrepareRangeUnboundedRange_ErrRunFrom(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(126), "0000000000000000000000000000000000000000000000000000000000000000").Return(errors.New("transient error")).Once()
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -604,9 +602,9 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(64), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -653,7 +651,7 @@ func TestGetLatestLedgerSequence(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -699,7 +697,7 @@ func TestGetLatestLedgerSequenceRaceCondition(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("runFrom", mock.Anything, mock.Anything).Return(nil)

Expand Down Expand Up @@ -766,9 +764,9 @@ func TestCaptiveGetLedger(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

mockArchive := &historyarchive.MockArchive{}
mockArchive.
Expand Down Expand Up @@ -857,7 +855,7 @@ func TestCaptiveGetLedgerCacheLatestLedger(t *testing.T) {
defer cancel()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(65), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -919,7 +917,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T)
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -965,7 +963,7 @@ func TestCaptiveGetLedger_NextLedger0RangeFromIsSmallerThanLedgerFromBuffer(t *t
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(64), mock.Anything).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil)

Expand Down Expand Up @@ -1067,13 +1065,13 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
ctx, cancel := context.WithCancel(ctx)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Run(func(args mock.Arguments) {
cancel()
}).Once()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

// even if the request to fetch the latest checkpoint succeeds, we should fail at creating the subprocess
mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1125,7 +1123,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(fmt.Errorf("transient error")).Once()

Expand Down Expand Up @@ -1167,7 +1165,7 @@ func TestCaptiveAfterClose(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil)
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Once()

Expand Down Expand Up @@ -1222,7 +1220,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1346,9 +1344,9 @@ func TestCaptiveGetLedgerTerminatedUnexpectedly(t *testing.T) {
ctx := testCase.ctx
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("getProcessExitError").Return(testCase.processExited, testCase.processExitedError)
mockRunner.On("getProcessExitError").Return(testCase.processExitedError, testCase.processExited)
mockRunner.On("close").Return(nil).Once()

mockArchive := &historyarchive.MockArchive{}
Expand Down Expand Up @@ -1514,7 +1512,7 @@ func TestCaptiveRunFromParams(t *testing.T) {
func TestCaptiveIsPrepared(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("context").Return(context.Background()).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

// c.prepared == nil
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1578,7 +1576,7 @@ func TestCaptiveIsPreparedCoreContextCancelled(t *testing.T) {
mockRunner := &stellarCoreRunnerMock{}
ctx, cancel := context.WithCancel(context.Background())
mockRunner.On("context").Return(ctx).Maybe()
mockRunner.On("getProcessExitError").Return(false, nil)
mockRunner.On("getProcessExitError").Return(nil, false)

rang := UnboundedRange(100)
captiveBackend := CaptiveStellarCore{
Expand Down Expand Up @@ -1630,7 +1628,7 @@ func TestCaptivePreviousLedgerCheck(t *testing.T) {
ctx := context.Background()
mockRunner := &stellarCoreRunnerMock{}
mockRunner.On("runFrom", uint32(254), "0101010100000000000000000000000000000000000000000000000000000000").Return(nil).Once()
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan), true)
mockRunner.On("context").Return(ctx)
mockRunner.On("close").Return(nil).Once()

Expand Down
Loading
Loading