Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc fixes #5352

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion exp/services/ledgerexporter/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (a *App) init(ctx context.Context, runtimeSettings RuntimeSettings) error {
if a.config, err = NewConfig(runtimeSettings); err != nil {
return errors.Wrap(err, "Could not load configuration")
}
if archive, err = a.config.GenerateHistoryArchive(ctx); err != nil {
if archive, err = a.config.GenerateHistoryArchive(ctx, logger); err != nil {
return err
}
if err = a.config.ValidateAndSetLedgerRange(ctx, archive); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"

"github.com/pelletier/go-toml"

Expand Down Expand Up @@ -142,12 +143,13 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his
return nil
}

func (config *Config) GenerateHistoryArchive(ctx context.Context) (historyarchive.ArchiveInterface, error) {
func (config *Config) GenerateHistoryArchive(ctx context.Context, entry *log.Entry) (historyarchive.ArchiveInterface, error) {
return historyarchive.NewArchivePool(config.StellarCoreConfig.HistoryArchiveUrls, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: config.UserAgent,
Context: ctx,
},
Logger: logger,
})
}

Expand Down
4 changes: 2 additions & 2 deletions exp/services/ledgerexporter/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGenerateHistoryArchiveFromPreconfiguredNetwork(t *testing.T) {
RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_preconfigured.toml", Mode: Append})
require.NoError(t, err)

_, err = config.GenerateHistoryArchive(ctx)
_, err = config.GenerateHistoryArchive(ctx, nil)
require.NoError(t, err)
}

Expand All @@ -56,7 +56,7 @@ func TestGenerateHistoryArchiveFromManulConfiguredNetwork(t *testing.T) {
RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/valid_captive_core_manual.toml", Mode: Append})
require.NoError(t, err)

_, err = config.GenerateHistoryArchive(ctx)
_, err = config.GenerateHistoryArchive(ctx, nil)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"
)
Expand All @@ -43,6 +44,7 @@ type CommandOptions struct {
type ArchiveOptions struct {
storage.ConnectOptions

Logger *supportlog.Entry
// NetworkPassphrase defines the expected network of history archive. It is
// checked when getting HAS. If network passphrase does not match, error is
// returned.
Expand Down
7 changes: 5 additions & 2 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/pkg/errors"

log "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

Expand All @@ -19,6 +20,7 @@ import (
// An ArchivePool is just a collection of `ArchiveInterface`s so that we can
// distribute requests fairly throughout the pool.
type ArchivePool struct {
logger *log.Entry
backoff backoff.BackOff
pool []ArchiveInterface
curr int
Expand Down Expand Up @@ -46,6 +48,7 @@ func NewArchivePoolWithBackoff(archiveURLs []string, opts ArchiveOptions, strate
ap := ArchivePool{
pool: make([]ArchiveInterface, 0, len(archiveURLs)),
backoff: strategy,
logger: opts.Logger,
}
var lastErr error

Expand Down Expand Up @@ -107,8 +110,8 @@ func (pa *ArchivePool) runRoundRobin(runner func(ai ArchiveInterface) error) err
}

// Intentionally avoid logging context errors
if stats := ai.GetStats(); len(stats) > 0 {
log.WithField("error", err).Warnf(
if stats := ai.GetStats(); len(stats) > 0 && pa.logger != nil {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
pa.logger.WithField("error", err).Warnf(
"Encountered an error with archive '%s'",
stats[0].GetBackendName())
}
Expand Down
7 changes: 4 additions & 3 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func NewCaptive(config CaptiveCoreConfig) (*CaptiveStellarCore, error) {
archivePool, err := historyarchive.NewArchivePool(
config.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
Logger: config.Log,
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Expand Down Expand Up @@ -780,14 +781,14 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence(ctx context.Context) (uint3
// all subsequent calls to PrepareRange(), GetLedger(), etc will fail.
// Close is thread-safe and can be called from another go routine.
func (c *CaptiveStellarCore) Close() error {
// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

c.stellarCoreLock.RLock()
defer c.stellarCoreLock.RUnlock()

c.closed = true

// after the CaptiveStellarCore context is canceled all subsequent calls to PrepareRange() will fail
c.cancel()

// TODO: Sucks to ignore the error here, but no worse than it was before,
// so...
if c.ledgerHashStore != nil {
Expand Down
26 changes: 14 additions & 12 deletions ingest/ledgerbackend/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ledgerbackend

import (
"io"
"io/fs"
"io/ioutil"
"os"
Expand Down Expand Up @@ -40,7 +39,7 @@ func (realSystemCaller) stat(name string) (isDir, error) {

func (realSystemCaller) command(name string, arg ...string) cmdI {
cmd := exec.Command(name, arg...)
return &realCmd{cmd}
return &realCmd{Cmd: cmd}
}

type cmdI interface {
Expand All @@ -49,36 +48,39 @@ type cmdI interface {
Start() error
Run() error
setDir(dir string)
setStdout(stdout io.Writer)
getStdout() io.Writer
setStderr(stderr io.Writer)
getStderr() io.Writer
setStdout(stdout *logLineWriter)
getStdout() *logLineWriter
setStderr(stderr *logLineWriter)
getStderr() *logLineWriter
getProcess() *os.Process
setExtraFiles([]*os.File)
}

type realCmd struct {
*exec.Cmd
stdout, stderr *logLineWriter
2opremio marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *realCmd) setDir(dir string) {
r.Cmd.Dir = dir
}

func (r *realCmd) setStdout(stdout io.Writer) {
func (r *realCmd) setStdout(stdout *logLineWriter) {
r.stdout = stdout
r.Cmd.Stdout = stdout
}

func (r *realCmd) getStdout() io.Writer {
return r.Cmd.Stdout
func (r *realCmd) getStdout() *logLineWriter {
return r.stdout
}

func (r *realCmd) setStderr(stderr io.Writer) {
func (r *realCmd) setStderr(stderr *logLineWriter) {
r.stderr = stderr
r.Cmd.Stderr = stderr
}

func (r *realCmd) getStderr() io.Writer {
return r.Cmd.Stderr
func (r *realCmd) getStderr() *logLineWriter {
return r.stderr
}

func (r *realCmd) getProcess() *os.Process {
Expand Down
17 changes: 9 additions & 8 deletions ingest/ledgerbackend/mock_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ func (m *mockCmd) setDir(dir string) {
m.Called(dir)
}

func (m *mockCmd) setStdout(stdout io.Writer) {
func (m *mockCmd) setStdout(stdout *logLineWriter) {
m.Called(stdout)
}

func (m *mockCmd) getStdout() io.Writer {
func (m *mockCmd) getStdout() *logLineWriter {
args := m.Called()
return args.Get(0).(io.Writer)
return args.Get(0).(*logLineWriter)
}

func (m *mockCmd) setStderr(stderr io.Writer) {
func (m *mockCmd) setStderr(stderr *logLineWriter) {
m.Called(stderr)
}

func (m *mockCmd) getStderr() io.Writer {
func (m *mockCmd) getStderr() *logLineWriter {
args := m.Called()
return args.Get(0).(io.Writer)
return args.Get(0).(*logLineWriter)
}

func (m *mockCmd) getProcess() *os.Process {
Expand All @@ -64,12 +64,13 @@ func (m *mockCmd) setExtraFiles(files []*os.File) {

func simpleCommandMock() *mockCmd {
_, writer := io.Pipe()
llw := logLineWriter{pipeWriter: writer}
cmdMock := &mockCmd{}
cmdMock.On("setDir", mock.Anything)
cmdMock.On("setStdout", mock.Anything)
cmdMock.On("getStdout").Return(writer)
cmdMock.On("getStdout").Return(&llw)
cmdMock.On("setStderr", mock.Anything)
cmdMock.On("getStderr").Return(writer)
cmdMock.On("getStderr").Return(&llw)
cmdMock.On("getProcess").Return(&os.Process{}).Maybe()
cmdMock.On("setExtraFiles", mock.Anything)
cmdMock.On("Start").Return(nil)
Expand Down
29 changes: 24 additions & 5 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,32 @@ func (r *stellarCoreRunner) getConfFileName() string {
return path
}

func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
type logLineWriter struct {
pipeWriter *io.PipeWriter
wg sync.WaitGroup
}

func (l *logLineWriter) Write(p []byte) (n int, err error) {
return l.pipeWriter.Write(p)
}

func (l *logLineWriter) Close() error {
err := l.pipeWriter.Close()
l.wg.Wait()
return err
}

func (r *stellarCoreRunner) getLogLineWriter() *logLineWriter {
rd, wr := io.Pipe()
br := bufio.NewReader(rd)

result := &logLineWriter{
pipeWriter: wr,
}
// Strip timestamps from log lines from captive stellar-core. We emit our own.
dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `)
result.wg.Add(1)
go func() {
defer result.wg.Done()
levelRx := regexp.MustCompile(`\[(\w+) ([A-Z]+)\] (.*)`)
for {
line, err := br.ReadString('\n')
Expand Down Expand Up @@ -238,7 +257,7 @@ func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
}
}
}()
return wr
return result
}

func (r *stellarCoreRunner) offlineInfo() (stellarcore.InfoResponse, error) {
Expand Down Expand Up @@ -526,8 +545,8 @@ func (r *stellarCoreRunner) handleExit() {

// closeLogLineWriters closes the go routines created by getLogLineWriter()
func (r *stellarCoreRunner) closeLogLineWriters(cmd cmdI) {
cmd.getStdout().(*io.PipeWriter).Close()
cmd.getStderr().(*io.PipeWriter).Close()
cmd.getStdout().Close()
cmd.getStderr().Close()
}

// getMetaPipe returns a channel which contains ledgers streamed from the captive core subprocess
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func NewSystem(config Config) (System, error) {
archive, err := historyarchive.NewArchivePool(
config.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
Logger: log.WithField("subservice", "archive"),
NetworkPassphrase: config.NetworkPassphrase,
CheckpointFrequency: config.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Expand Down
7 changes: 5 additions & 2 deletions support/datastore/history_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package datastore
import (
"context"

log "github.com/sirupsen/logrus"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
)

Expand All @@ -15,7 +17,7 @@ const (
Testnet = "testnet"
)

func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string) (historyarchive.ArchiveInterface, error) {
func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string, userAgent string, logger *supportlog.Entry) (historyarchive.ArchiveInterface, error) {
var historyArchiveUrls []string
switch networkName {
case Pubnet:
Expand All @@ -27,6 +29,7 @@ func CreateHistoryArchiveFromNetworkName(ctx context.Context, networkName string
}

return historyarchive.NewArchivePool(historyArchiveUrls, historyarchive.ArchiveOptions{
Logger: logger,
ConnectOptions: storage.ConnectOptions{
UserAgent: userAgent,
Context: ctx,
Expand Down
24 changes: 1 addition & 23 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,7 @@ func (s *Session) context(requestCtx context.Context) (context.Context, context.

// Begin binds this session to a new transaction.
func (s *Session) Begin(ctx context.Context) error {
if s.tx != nil {
return errors.New("already in transaction")
}
ctx, cancel, err := s.context(ctx)
if err != nil {
return err
}

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
cancel()
return knownErr
}

cancel()
return errors.Wrap(err, "beginx failed")
}
log.Debug("sql: begin")
s.tx = tx
s.txOptions = nil
s.txCancel = cancel
return nil
return s.BeginTx(ctx, nil)
}

// BeginTx binds this session to a new transaction which is configured with the
Expand Down
Loading