ingest/ledgerbackend: Improve thread-safety of stellarCoreRunner.close() (#5307)
tamirms authored May 10, 2024
1 parent 38d28bb commit a4e5a3f
Showing 5 changed files with 101 additions and 59 deletions.
6 changes: 0 additions & 6 deletions ingest/ledgerbackend/captive_core_backend.go
@@ -452,12 +452,6 @@ func (c *CaptiveStellarCore) startPreparingRange(ctx context.Context, ledgerRang
 		if err := c.stellarCoreRunner.close(); err != nil {
 			return false, errors.Wrap(err, "error closing existing session")
 		}
-
-		// Make sure Stellar-Core is terminated before starting a new instance.
-		processExited, _ := c.stellarCoreRunner.getProcessExitError()
-		if !processExited {
-			return false, errors.New("the previous Stellar-Core instance is still running")
-		}
 	}
 
 	var err error
2 changes: 0 additions & 2 deletions ingest/ledgerbackend/captive_core_backend_test.go
@@ -302,8 +302,6 @@ func TestCaptivePrepareRangeCloseNotFullyTerminated(t *testing.T) {
 	mockRunner.On("getMetaPipe").Return((<-chan metaResult)(metaChan))
 	mockRunner.On("context").Return(ctx)
 	mockRunner.On("close").Return(nil)
-	mockRunner.On("getProcessExitError").Return(true, nil)
-	mockRunner.On("getProcessExitError").Return(false, nil)
 
 	mockArchive := &historyarchive.MockArchive{}
 	mockArchive.
2 changes: 1 addition & 1 deletion ingest/ledgerbackend/mock_cmd_test.go
@@ -70,7 +70,7 @@ func simpleCommandMock() *mockCmd {
 	cmdMock.On("getStdout").Return(writer)
 	cmdMock.On("setStderr", mock.Anything)
 	cmdMock.On("getStderr").Return(writer)
-	cmdMock.On("getProcess").Return(&os.Process{})
+	cmdMock.On("getProcess").Return(&os.Process{}).Maybe()
 	cmdMock.On("setExtraFiles", mock.Anything)
 	cmdMock.On("Start").Return(nil)
 	return cmdMock
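The .Maybe() added here, and the .Once() calls added in the test file further down, are testify call-count modifiers: .Maybe() marks an expectation as optional, so AssertExpectations does not fail if the mocked method is never invoked, while .Once() requires exactly one matching call. A small, self-contained illustration follows; the mockStore type and its methods are made up for the example and are not part of this repository:

package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// mockStore is a made-up mock used only to illustrate Maybe() and Once().
type mockStore struct {
	mock.Mock
}

func (m *mockStore) RemoveAll(path string) error {
	args := m.Called(path)
	return args.Error(0)
}

func (m *mockStore) Stat(path string) (bool, error) {
	args := m.Called(path)
	return args.Bool(0), args.Error(1)
}

func TestCallCountModifiers(t *testing.T) {
	m := &mockStore{}
	// Optional: the test still passes if Stat is never called.
	m.On("Stat", mock.Anything).Return(true, nil).Maybe()
	// Required exactly once: zero calls fail AssertExpectations, and a
	// second RemoveAll call would fail at call time.
	m.On("RemoveAll", mock.Anything).Return(nil).Once()

	_ = m.RemoveAll("/tmp/captive-core")

	m.AssertExpectations(t)
}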
88 changes: 41 additions & 47 deletions ingest/ledgerbackend/stellar_core_runner.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+
 	"github.com/stellar/go/protocols/stellarcore"
 	"github.com/stellar/go/support/log"
 )
@@ -65,6 +66,7 @@ type stellarCoreRunner struct {
 	systemCaller systemCaller
 
 	lock sync.Mutex
+	closeOnce sync.Once
 	processExited bool
 	processExitError error
 
@@ -285,9 +287,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	r.mode = stellarCoreRunnerModeOffline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -297,6 +296,9 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
 		return errors.New("runner already started")
 	}
 
+	r.mode = stellarCoreRunnerModeOffline
+	r.storagePath = r.getFullStoragePath()
+
 	rangeArg := fmt.Sprintf("%d/%d", to, to-from+1)
 	params := []string{"catchup", rangeArg, "--metadata-output-stream", r.getPipeName()}
 
@@ -350,9 +352,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 	r.lock.Lock()
 	defer r.lock.Unlock()
 
-	r.mode = stellarCoreRunnerModeOnline
-	r.storagePath = r.getFullStoragePath()
-
 	// check if we have already been closed
 	if r.ctx.Err() != nil {
 		return r.ctx.Err()
@@ -362,6 +361,9 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
 		return errors.New("runner already started")
 	}
 
+	r.mode = stellarCoreRunnerModeOnline
+	r.storagePath = r.getFullStoragePath()
+
 	var err error
 
 	if r.useDB {
@@ -546,53 +548,45 @@ func (r *stellarCoreRunner) getProcessExitError() (bool, error) {
 // the necessary cleanup on the resources associated with the captive core process
 // close is both thread safe and idempotent
 func (r *stellarCoreRunner) close() error {
-	r.lock.Lock()
-	started := r.started
-	storagePath := r.storagePath
-
-	r.storagePath = ""
-
-	// check if we have already closed
-	if storagePath == "" {
-		r.lock.Unlock()
-		return nil
-	}
-
-	if !started {
-		// Update processExited if handleExit that updates it not even started
-		// (error before command run).
-		r.processExited = true
-	}
-
-	r.cancel()
-	r.lock.Unlock()
-
-	// only reap captive core sub process and related go routines if we've started
-	// otherwise, just cleanup the temp dir
-	if started {
-		// wait for the stellar core process to terminate
-		r.wg.Wait()
-
-		// drain meta pipe channel to make sure the ledger buffer goroutine exits
-		for range r.getMetaPipe() {
-		}
-
-		// now it's safe to close the pipe reader
-		// because the ledger buffer is no longer reading from it
-		r.pipe.Reader.Close()
-	}
-
-	if r.mode != 0 && (runtime.GOOS == "windows" ||
-		(r.processExitError != nil && r.processExitError != context.Canceled) ||
-		r.mode == stellarCoreRunnerModeOffline) {
-		// It's impossible to send SIGINT on Windows so buckets can become
-		// corrupted. If we can't reuse it, then remove it.
-		// We also remove the storage path if there was an error terminating the
-		// process (files can be corrupted).
-		// We remove all files when reingesting to save disk space.
-		return r.systemCaller.removeAll(storagePath)
-	}
-
-	return nil
+	var closeError error
+	r.closeOnce.Do(func() {
+		r.lock.Lock()
+		// we cancel the context while holding the lock in order to guarantee that
+		// this captive core instance cannot start once the lock is released.
+		// catchup() and runFrom() can only execute while holding the lock and if
+		// the context is canceled both catchup() and runFrom() will abort early
+		// without performing any side effects (e.g. state mutations).
+		r.cancel()
+		r.lock.Unlock()

+		// only reap captive core sub process and related go routines if we've started
+		// otherwise, just cleanup the temp dir
+		if r.started {
+			// wait for the stellar core process to terminate
+			r.wg.Wait()
+
+			// drain meta pipe channel to make sure the ledger buffer goroutine exits
+			for range r.getMetaPipe() {
+			}
+
+			// now it's safe to close the pipe reader
+			// because the ledger buffer is no longer reading from it
+			r.pipe.Reader.Close()
+		}
+
+		if r.mode != 0 && (runtime.GOOS == "windows" ||
+			(r.processExitError != nil && r.processExitError != context.Canceled) ||
+			r.mode == stellarCoreRunnerModeOffline) {
+			// It's impossible to send SIGINT on Windows so buckets can become
+			// corrupted. If we can't reuse it, then remove it.
+			// We also remove the storage path if there was an error terminating the
+			// process (files can be corrupted).
+			// We remove all files when reingesting to save disk space.
+			closeError = r.systemCaller.removeAll(r.storagePath)
+			return
+		}
+	})
+
+	return closeError
 }
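Together with the catchup()/runFrom() hunks above, which now set mode and storagePath only after the closed and already-started checks, the rewritten close() rests on two ideas: a sync.Once makes shutdown idempotent and safe to call from many goroutines, and canceling the context while holding the same mutex that catchup()/runFrom() acquire guarantees no new instance can start once close() has begun. Below is a minimal, self-contained sketch of that pattern; the runner type, its fields, and its methods are illustrative stand-ins, not the actual stellarCoreRunner:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runner is an illustrative stand-in for a process manager like stellarCoreRunner.
type runner struct {
	lock      sync.Mutex
	closeOnce sync.Once
	started   bool
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
	closeErr  error
}

func newRunner() *runner {
	ctx, cancel := context.WithCancel(context.Background())
	return &runner{ctx: ctx, cancel: cancel}
}

// start mirrors catchup()/runFrom(): it checks for closure before mutating any state.
func (r *runner) start() error {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.ctx.Err() != nil {
		return r.ctx.Err() // already closed; abort with no side effects
	}
	if r.started {
		return errors.New("runner already started")
	}
	r.started = true
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		<-r.ctx.Done() // stand-in for waiting on the subprocess to exit
	}()
	return nil
}

// close is idempotent and safe for concurrent use: the cleanup body runs
// exactly once, and later callers block until it finishes.
func (r *runner) close() error {
	r.closeOnce.Do(func() {
		r.lock.Lock()
		r.cancel() // cancel under the lock so start() can never succeed afterwards
		started := r.started
		r.lock.Unlock()

		if started {
			r.wg.Wait() // reap background work before releasing resources
		}
		r.closeErr = nil // real code would remove storage, close pipes, etc.
	})
	return r.closeErr
}

func main() {
	r := newRunner()
	fmt.Println(r.start()) // <nil>
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); _ = r.close() }()
	}
	wg.Wait()
	fmt.Println(r.start()) // context canceled
}

Note that the commit captures the close error in a local variable inside the Do closure, so only the call that actually performs the cleanup can observe a non-nil result; the sketch stores it on the struct instead to make that difference explicit.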
62 changes: 59 additions & 3 deletions ingest/ledgerbackend/stellar_core_runner_test.go
@@ -3,6 +3,7 @@ package ledgerbackend
 import (
 	"context"
 	"encoding/json"
+	"sync"
 	"testing"
 	"time"
 
@@ -46,7 +47,7 @@ func TestCloseOffline(t *testing.T) {
 		"fd:3",
 		"--in-memory",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock
 
 	assert.NoError(t, runner.catchup(100, 200))
@@ -133,7 +134,7 @@ func TestCloseOnlineWithError(t *testing.T) {
 		"--metadata-output-stream",
 		"fd:3",
 	).Return(cmdMock)
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	runner.systemCaller = scMock
 
 	assert.NoError(t, runner.runFrom(100, "hash"))
@@ -149,6 +150,61 @@ func TestCloseOnlineWithError(t *testing.T) {
 	assert.NoError(t, runner.close())
 }
 
+func TestCloseConcurrency(t *testing.T) {
+	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
+	assert.NoError(t, err)
+
+	captiveCoreToml.AddExamplePubnetValidators()
+
+	runner := newStellarCoreRunner(CaptiveCoreConfig{
+		BinaryPath:         "/usr/bin/stellar-core",
+		HistoryArchiveURLs: []string{"http://localhost"},
+		Log:                log.New(),
+		Context:            context.Background(),
+		Toml:               captiveCoreToml,
+		StoragePath:        "/tmp/captive-core",
+	})
+
+	cmdMock := simpleCommandMock()
+	cmdMock.On("Wait").Return(errors.New("wait error")).WaitUntil(time.After(time.Millisecond * 300))
+	defer cmdMock.AssertExpectations(t)
+
+	// Replace system calls with a mock
+	scMock := &mockSystemCaller{}
+	defer scMock.AssertExpectations(t)
+	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
+	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	scMock.On("command",
+		"/usr/bin/stellar-core",
+		"--conf",
+		mock.Anything,
+		"--console",
+		"catchup",
+		"200/101",
+		"--metadata-output-stream",
+		"fd:3",
+		"--in-memory",
+	).Return(cmdMock)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
+	runner.systemCaller = scMock
+
+	assert.NoError(t, runner.catchup(100, 200))
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			assert.NoError(t, runner.close())
+			exited, err := runner.getProcessExitError()
+			assert.True(t, exited)
+			assert.Error(t, err)
+		}()
+	}
+
+	wg.Wait()
+}
+
 func TestRunFromUseDBLedgersMatch(t *testing.T) {
 	captiveCoreToml, err := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
 	assert.NoError(t, err)
@@ -300,7 +356,7 @@ func TestRunFromUseDBLedgersInFront(t *testing.T) {
 	scMock := &mockSystemCaller{}
 	defer scMock.AssertExpectations(t)
 	// Storage dir is removed because ledgers do not match
-	scMock.On("removeAll", mock.Anything).Return(nil)
+	scMock.On("removeAll", mock.Anything).Return(nil).Once()
 	scMock.On("stat", mock.Anything).Return(isDirImpl(true), nil)
 	scMock.On("writeFile", mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	scMock.On("command",
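The new TestCloseConcurrency calls close() and getProcessExitError() from ten goroutines at once, so data races on this path should surface under Go's race detector. Assuming a checkout of the repository root and a platform that supports the race detector, one way to run it is:

go test -race -run TestCloseConcurrency ./ingest/ledgerbackend/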
