Skip to content

Commit

Permalink
support/storage: Add GetLatestLedgerSequence method to Archive interf…
Browse files Browse the repository at this point in the history
…ace (#5362)
  • Loading branch information
urvisavla authored Jun 28, 2024
1 parent 6fbc6e0 commit 27a3c2a
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 69 deletions.
6 changes: 3 additions & 3 deletions exp/services/ledgerexporter/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (config *Config) ValidateAndSetLedgerRange(ctx context.Context, archive his
return errors.New("invalid end value, must be greater than start")
}

latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(archive)
latestNetworkLedger = latestNetworkLedger + (datastore.GetHistoryArchivesCheckPointFrequency() * 2)
latestNetworkLedger, err := archive.GetLatestLedgerSequence()
latestNetworkLedger = latestNetworkLedger + (archive.GetCheckpointManager().GetCheckpointFrequency() * 2)

if err != nil {
return errors.Wrap(err, "Failed to retrieve the latest ledger sequence from history archives.")
Expand Down Expand Up @@ -189,7 +189,7 @@ func (config *Config) GenerateCaptiveCoreConfig(coreBinFromPath string) (ledgerb
BinaryPath: config.StellarCoreConfig.StellarCoreBinaryPath,
NetworkPassphrase: params.NetworkPassphrase,
HistoryArchiveURLs: params.HistoryArchiveURLs,
CheckpointFrequency: datastore.GetHistoryArchivesCheckPointFrequency(),
CheckpointFrequency: historyarchive.DefaultCheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "ledger-exporter",
Expand Down
28 changes: 19 additions & 9 deletions exp/services/ledgerexporter/internal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import (
"fmt"
"testing"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"

"github.com/stretchr/testify/require"

"github.com/stellar/go/historyarchive"
)

func TestNewConfig(t *testing.T) {
ctx := context.Background()

mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 5}, nil).Once()
mockArchive.On("GetLatestLedgerSequence").Return(uint32(5), nil).Once()
mockArchive.On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency)).Once()

config, err := NewConfig(
RuntimeSettings{StartLedger: 2, EndLedger: 3, ConfigFilePath: "test/test.toml", Mode: Append}, nil)
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestInvalidCaptiveCoreTomlPath(t *testing.T) {

func TestValidateStartAndEndLedger(t *testing.T) {
latestNetworkLedger := uint32(20000)
latestNetworkLedgerPadding := datastore.GetHistoryArchivesCheckPointFrequency() * 2
latestNetworkLedgerPadding := historyarchive.DefaultCheckpointFrequency * 2

tests := []struct {
name string
Expand Down Expand Up @@ -282,7 +283,10 @@ func TestValidateStartAndEndLedger(t *testing.T) {

ctx := context.Background()
mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: latestNetworkLedger}, nil)
mockArchive.On("GetLatestLedgerSequence").Return(latestNetworkLedger, nil)
mockArchive.On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency))

mockedHasCtr := 0
for _, tt := range tests {
Expand All @@ -302,7 +306,7 @@ func TestValidateStartAndEndLedger(t *testing.T) {
}
})
}
mockArchive.AssertNumberOfCalls(t, "GetRootHAS", mockedHasCtr)
mockArchive.AssertExpectations(t)
}

func TestAdjustedLedgerRangeBoundedMode(t *testing.T) {
Expand Down Expand Up @@ -358,7 +362,10 @@ func TestAdjustedLedgerRangeBoundedMode(t *testing.T) {

ctx := context.Background()
mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests))
mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil)
mockArchive.On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency))

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -421,7 +428,10 @@ func TestAdjustedLedgerRangeUnBoundedMode(t *testing.T) {
ctx := context.Background()

mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: 500}, nil).Times(len(tests))
mockArchive.On("GetLatestLedgerSequence").Return(uint32(500), nil)
mockArchive.On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency))

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ArchiveInterface interface {
GetLedgerHeader(chk uint32) (xdr.LedgerHeaderHistoryEntry, error)
GetRootHAS() (HistoryArchiveState, error)
GetLedgers(start, end uint32) (map[uint32]*Ledger, error)
GetLatestLedgerSequence() (uint32, error)
GetCheckpointHAS(chk uint32) (HistoryArchiveState, error)
PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error
PutRootHAS(has HistoryArchiveState, opts *CommandOptions) error
Expand Down Expand Up @@ -176,6 +177,16 @@ func (a *Archive) PutPathHAS(path string, has HistoryArchiveState, opts *Command
return a.backend.PutFile(path, io.NopCloser(bytes.NewReader(buf)))
}

func (a *Archive) GetLatestLedgerSequence() (uint32, error) {
has, err := a.GetRootHAS()
if err != nil {
log.Error("Error getting root HAS from archive", err)
return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive")
}

return has.CurrentLedger, nil
}

func (a *Archive) BucketExists(bucket Hash) (bool, error) {
return a.cachedExists(BucketPath(bucket))
}
Expand Down
10 changes: 10 additions & 0 deletions historyarchive/archive_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ func (pa *ArchivePool) GetCheckpointHAS(chk uint32) (HistoryArchiveState, error)
})
}

func (pa *ArchivePool) GetLatestLedgerSequence() (uint32, error) {
has, err := pa.GetRootHAS()
if err != nil {
log.Error("Error getting root HAS from archive", err)
return 0, errors.Wrap(err, "failed to retrieve the latest ledger sequence from history archive")
}

return has.CurrentLedger, nil
}

func (pa *ArchivePool) PutCheckpointHAS(chk uint32, has HistoryArchiveState, opts *CommandOptions) error {
return pa.runRoundRobin(func(ai ArchiveInterface) error {
return ai.PutCheckpointHAS(chk, has, opts)
Expand Down
5 changes: 5 additions & 0 deletions historyarchive/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ type MockArchive struct {
mock.Mock
}

func (m *MockArchive) GetLatestLedgerSequence() (uint32, error) {
a := m.Called()
return a.Get(0).(uint32), a.Error(1)
}

func (m *MockArchive) GetCheckpointManager() CheckpointManager {
a := m.Called()
return a.Get(0).(CheckpointManager)
Expand Down
53 changes: 0 additions & 53 deletions support/datastore/history_archive.go

This file was deleted.

8 changes: 6 additions & 2 deletions support/datastore/resumablemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,12 @@ func TestResumability(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockArchive := &historyarchive.MockArchive{}
mockArchive.On("GetRootHAS").Return(historyarchive.HistoryArchiveState{CurrentLedger: tt.latestLedger}, tt.archiveError).Once()

mockArchive.On("GetLatestLedgerSequence").Return(tt.latestLedger, tt.archiveError).Once()
if tt.archiveError == nil {
mockArchive.On("GetCheckpointManager").
Return(historyarchive.NewCheckpointManager(
historyarchive.DefaultCheckpointFrequency)).Once()
}
mockDataStore := &MockDataStore{}
tt.registerMockCalls(mockDataStore)

Expand Down
4 changes: 2 additions & 2 deletions support/datastore/resumeablemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func (rm resumableManagerService) FindStart(ctx context.Context, start, end uint
networkLatest := uint32(0)
if end < 1 {
var latestErr error
networkLatest, latestErr = GetLatestLedgerSequenceFromHistoryArchives(rm.archive)
networkLatest, latestErr = rm.archive.GetLatestLedgerSequence()
if latestErr != nil {
err := errors.Wrap(latestErr, "Resumability of requested export ledger range, was not able to get latest ledger from network")
return 0, false, err
}
networkLatest = networkLatest + (GetHistoryArchivesCheckPointFrequency() * 2)
networkLatest = networkLatest + (rm.archive.GetCheckpointManager().GetCheckpointFrequency() * 2)
log.Infof("Resumability computed effective latest network ledger including padding of checkpoint frequency to be %d", networkLatest)

if start > networkLatest {
Expand Down

0 comments on commit 27a3c2a

Please sign in to comment.