diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go
index 50e933bb6a..25b22d01d5 100644
--- a/ingest/ledgerbackend/captive_core_backend.go
+++ b/ingest/ledgerbackend/captive_core_backend.go
@@ -452,12 +452,6 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
 		if err := c.stellarCoreRunner.close(); err != nil {
 			return false, errors.Wrap(err, "error closing existing session")
 		}
-
-		// Make sure Stellar-Core is terminated before starting a new instance.
-		processExited, _ := c.stellarCoreRunner.getProcessExitError()
-		if !processExited {
-			return false, errors.New("the previous Stellar-Core instance is still running")
-		}
 	}
 
 	var err error
diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go
index 5178fd97a1..a367f560f1 100644
--- a/ingest/ledgerbackend/captive_core_backend_test.go
+++ b/ingest/ledgerbackend/captive_core_backend_test.go
@@ -302,8 +302,6 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {
 	mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
 	mockRunner.On("context").Return(ctx)
 	mockRunner.On("close").Return(nil)
-	mockRunner.On("getProcessExitError").Return(true, nil)
-	mockRunner.On("getProcessExitError").Return(false, nil)
 
 	mockArchive := &historyarchive.MockArchive{}
 	mockArchive.
diff --git a/ingest/ledgerbackend/mock_cmd_test.go b/ingest/ledgerbackend/mock_cmd_test.go
index a1d280421a..a28b6c8d01 100644
--- a/ingest/ledgerbackend/mock_cmd_test.go
+++ b/ingest/ledgerbackend/mock_cmd_test.go
@@ -70,7 +70,7 @@ func simpleCommandMock() *mockCmd {
 	cmdMock.On("getStdout").Return(writer)
 	cmdMock.On("setStderr", mock.Anything)
 	cmdMock.On("getStderr").Return(writer)
-	cmdMock.On("getProcess").Return(&os.Process{})
+	cmdMock.On("getProcess").Return(&os.Process{}).Maybe()
 	cmdMock.On("setExtraFiles", mock.Anything)
 	cmdMock.On("Start").Return(nil)
 	return cmdMock
diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go
index 1c2c09c4a6..7f883b69c5 100644
--- a/ingest/ledgerbackend/stellar_core_runner.go
+++ b/ingest/ledgerbackend/stellar_core_runner.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+	"github.com/stellar/go/protocols/stellarcore"
 	"github.com/stellar/go/support/log"
 )
 
@@ -65,6 +66,7 @@ type stellarCoreRunner struct {
 	systemCaller systemCaller
 
 	lock             sync.Mutex
+	closeOnce        sync.Once
 	processExited    bool
 	processExitError error
 
@@ -285,9 +287,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	r.mode = stellarCoreRunnerModeOffline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -297,6 +296,9 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 		return errors.New("runner already started")
 	}
 
+	r.mode = stellarCoreRunnerModeOffline
+	r.storagePath = r.getFullStoragePath()
+
 	rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
 	params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}
 
@@ -350,9 +352,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	r.mode = stellarCoreRunnerModeOnline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -362,6 +361,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 		return errors.New("runner already started")
 	}
 
+	r.mode = stellarCoreRunnerModeOnline
+	r.storagePath = r.getFullStoragePath()
+
 	var err error
 
 	if r.useDB {
@@ -546,53 +548,45 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) {
 // the necessary cleanup on the resources associated with the captive core process
 // close is both thread safe and idempotent
 func (r *stellarCoreRunner) close() error {
-	r.lock.Lock()
-	started := r.started
-	storagePath := r.storagePath
-
-	r.storagePath = ""
-
-	// check if we have already closed
-	if storagePath == "" {
+	var closeError error
+	r.closeOnce.Do(func() {
+		r.lock.Lock()
+		// we cancel the context while holding the lock in order to guarantee that
+		// this captive core instance cannot start once the lock is released.
+		// catchup() and runFrom() can only execute while holding the lock and if
+		// the context is canceled both catchup() and runFrom() will abort early
+		// without performing any side effects (e.g. state mutations).
+		r.cancel()
 		r.lock.Unlock()
-		return nil
-	}
-	if !started {
-		// Update processExited if handleExit that updates it not even started
-		// (error before command run).
-		r.processExited = true
-	}
-
-	r.cancel()
-	r.lock.Unlock()
 
+		// only reap captive core sub process and related go routines if we've started
+		// otherwise, just cleanup the temp dir
+		if r.started {
+			// wait for the stellar core process to terminate
+			r.wg.Wait()
 
-	// only reap captive core sub process and related go routines if we've started
-	// otherwise, just cleanup the temp dir
-	if started {
-		// wait for the stellar core process to terminate
-		r.wg.Wait()
 
+			// drain meta pipe channel to make sure the ledger buffer goroutine exits
+			for range r.getMetaPipe() {
 
-		// drain meta pipe channel to make sure the ledger buffer goroutine exits
-		for range r.getMetaPipe() {
 
+			}
+			// now it's safe to close the pipe reader
+			// because the ledger buffer is no longer reading from it
+			r.pipe.Reader.Close()
 		}
-		// now it's safe to close the pipe reader
-		// because the ledger buffer is no longer reading from it
-		r.pipe.Reader.Close()
-	}
-
-	if r.mode != 0 && (runtime.GOOS == "windows" ||
-		(r.processExitError != nil && r.processExitError != context.Canceled) ||
-		r.mode == stellarCoreRunnerModeOffline) {
-		// It's impossible to send SIGINT on Windows so buckets can become
-		// corrupted. If we can't reuse it, then remove it.
-		// We also remove the storage path if there was an error terminating the
-		// process (files can be corrupted).
-		// We remove all files when reingesting to save disk space.
-		return r.systemCaller.removeAll(storagePath)
-	}
+		if r.mode != 0 && (runtime.GOOS == "windows" ||
+			(r.processExitError != nil && r.processExitError != context.Canceled) ||
+			r.mode == stellarCoreRunnerModeOffline) {
+			// It's impossible to send SIGINT on Windows so buckets can become
+			// corrupted. If we can't reuse it, then remove it.
+			// We also remove the storage path if there was an error terminating the
+			// process (files can be corrupted).
+			// We remove all files when reingesting to save disk space.
+			closeError = r.systemCaller.removeAll(r.storagePath)
+			return
+		}
+	})
 
-	return nil
+	return closeError
 }
diff --git a/ingest/ledgerbackend/stellar_core_runner_test.go b/ingest/ledgerbackend/stellar_core_runner_test.go
index 60871922b7..00cb29137b 100644
--- a/ingest/ledgerbackend/stellar_core_runner_test.go
+++ b/ingest/ledgerbackend/stellar_core_runner_test.go
@@ -3,6 +3,7 @@ package ledgerbackend
 import (
 	"context"
 	"encoding/json"
+	"sync"
 	"testing"
 	"time"
 
@@ -46,7 +47,7 @@ func TestCloseOffline(t *testing.T) {
 		"fd:3",
 		"--in-memory",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock
 
 	assert.NoError(t, runner.catchup(100, 200))
@@ -133,7 +134,7 @@ func TestCloseOnlineWithError(t *testing.T) {
 		"--metadata-output-stream",
 		"fd:3",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock
 
 	assert.NoError(t, runner.runFrom(100, "hash"))
@@ -149,6 +150,61 @@ func TestCloseOnlineWithError(t *testing.T) {
 	assert.NoError(t, runner.close())
 }
 
+func TestCloseConcurrency(t *testing.T) {
+	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
+	assert.NoError(t, err)
+
+	captiveCoreToml.AddExamplePubnetValidators()
+
+	runner := newStellarCoreRunner(CaptiveCoreConfig{
+		BinaryPath:         "/usr/bin/stellar-core",
+		HistoryArchiveURLs: []string{"http://localhost"},
+		Log:                log.New(),
+		Context:            context.Background(),
+		Toml:               captiveCoreToml,
+		StoragePath:        "/tmp/captive-core",
+	})
+
+	cmdMock := simpleCommandMock()
+	cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300))
+	defer cmdMock.AssertExpectations(t)
+
+	// Replace system calls with a mock
+	scMock := &mockSystemCaller{}
+	defer scMock.AssertExpectations(t)
+	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
+	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	scMock.On("command",
+		"/usr/bin/stellar-core",
+		"--conf",
+		mock.Anything,
+		"--console",
+		"catchup",
+		"200/101",
+		"--metadata-output-stream",
+		"fd:3",
+		"--in-memory",
+	).Return(cmdMock)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
+	runner.systemCaller = scMock
+
+	assert.NoError(t, runner.catchup(100, 200))
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, runner.close())
+			exited, err := runner.getProcessExitError()
+			assert.True(t, exited)
+			assert.Error(t, err)
+		}()
+	}
+
+	wg.Wait()
+}
+
 func TestRunFromUseDBLedgersMatch(t *testing.T) {
 	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
 	assert.NoError(t, err)
@@ -300,7 +356,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
 	scMock := &mockSystemCaller{}
 	defer scMock.AssertExpectations(t)
 	// Storage dir is removed because ledgers do not match
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
 	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	scMock.On("command",
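
Not part of the patch itself, but for reviewers: the sketch below distills the `close()` rewrite into a self-contained toy. The `runner` type, `newRunner()`, and `start()` are hypothetical stand-ins (`start()` plays the role of `catchup()`/`runFrom()`, and the waited-on goroutine stands in for the stellar-core subprocess); only the shape of the synchronization mirrors the diff: cancel the context while holding the lock, then let `sync.Once` make the cleanup idempotent and safe under concurrent callers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runner is a hypothetical, stripped-down stand-in for stellarCoreRunner.
type runner struct {
	lock      sync.Mutex
	closeOnce sync.Once
	started   bool
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

func newRunner() *runner {
	ctx, cancel := context.WithCancel(context.Background())
	return &runner{ctx: ctx, cancel: cancel}
}

// start mirrors catchup()/runFrom(): it runs entirely under the lock and
// checks the context first, so it can never succeed (or mutate state) after
// close() has canceled the context.
func (r *runner) start() error {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.ctx.Err() != nil {
		return r.ctx.Err()
	}
	if r.started {
		return errors.New("runner already started")
	}
	r.started = true
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		<-r.ctx.Done() // stands in for the captive core subprocess
	}()
	return nil
}

// close mirrors the patched stellarCoreRunner.close(): sync.Once runs the
// cleanup body exactly once, and every concurrent caller blocks inside Do
// until that single run finishes, so close() is idempotent and thread safe.
func (r *runner) close() error {
	var closeError error
	r.closeOnce.Do(func() {
		// cancel under the lock so no start() can slip in afterwards
		r.lock.Lock()
		r.cancel()
		r.lock.Unlock()

		if r.started {
			r.wg.Wait() // reap the subprocess stand-in
		}
		// a cleanup failure (e.g. removing the storage dir) would be
		// recorded in closeError here
	})
	return closeError
}

func main() {
	r := newRunner()
	if err := r.start(); err != nil {
		panic(err)
	}

	// ten racing closers, as in TestCloseConcurrency: only one runs the
	// cleanup body, the rest wait for it and then return
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("close:", r.close())
		}()
	}
	wg.Wait()

	fmt.Println("start after close:", r.start()) // context.Canceled
}
```

One consequence the toy makes visible, and which holds for the patched code as well: `closeError` is local to each `close()` invocation and only the caller whose closure actually runs inside `closeOnce.Do` can record a cleanup error; callers that lose the race return `nil`. `TestCloseConcurrency` is consistent with this, asserting that every one of the ten concurrent `close()` calls returns no error.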