Skip to content

Commit

Permalink
Merge pull request #5624 from oasisprotocol/ptrus/stable/22.2.x/backp…
Browse files Browse the repository at this point in the history
…ort-5622

[BACKPORT/22.2.x] go/archive: fix runtime queries on archive node
  • Loading branch information
ptrus authored Apr 9, 2024
2 parents b847b90 + 99761ad commit c472f9e
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 11 deletions.
4 changes: 4 additions & 0 deletions .changelog/5622.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/archive: fix runtime queries on archive nodes

Fixes storage worker initialization on archive nodes which was causing runtime
queries to fail.
62 changes: 61 additions & 1 deletion go/consensus/tendermint/full/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
tmcore "github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
tmtypes "github.com/tendermint/tendermint/types"
tdbm "github.com/tendermint/tm-db"

"github.com/oasisprotocol/oasis-core/go/common/identity"
Expand All @@ -36,6 +37,7 @@ type archiveService struct {
*commonNode

abciClient abcicli.Client
eb *tmtypes.EventBus

quitCh chan struct{}

Expand All @@ -48,6 +50,10 @@ func (srv *archiveService) Start() error {
return fmt.Errorf("tendermint: service already started")
}

if err := srv.eb.Start(); err != nil {
return err
}

if err := srv.commonNode.start(); err != nil {
return err
}
Expand All @@ -69,6 +75,16 @@ func (srv *archiveService) Start() error {
}
}()

// Start command dispatchers for all the service clients.
srv.serviceClientsWg.Add(len(srv.serviceClients))
for _, svc := range srv.serviceClients {
svc := svc
go func(svc api.ServiceClient) {
defer srv.serviceClientsWg.Done()
srv.serviceClientWorker(srv.ctx, svc)
}(svc)
}

srv.commonNode.finishStart()

return nil
Expand Down Expand Up @@ -212,6 +228,7 @@ func NewArchive(
return nil, err
}

srv.eb = tmtypes.NewEventBus()
// Setup minimal tendermint environment needed to support consensus queries.
tmcore.SetEnvironment(&tmcore.Environment{
ProxyAppQuery: tmproxy.NewAppConnQuery(srv.abciClient),
Expand All @@ -223,7 +240,7 @@ func NewArchive(
GenDoc: tmGenDoc,
Logger: logger,
Config: *tmConfig.RPC,
EventBus: nil,
EventBus: srv.eb,
P2PPeers: nil,
P2PTransport: nil,
PubKey: nil,
Expand All @@ -235,3 +252,46 @@ func NewArchive(

return srv, srv.initialize()
}

// serviceClientWorker handles command dispatching.
func (srv *archiveService) serviceClientWorker(ctx context.Context, svc api.ServiceClient) {
sd := svc.ServiceDescriptor()
if sd == nil {
// Some services don't actually need a worker.
return
}

// Archive only handles commands.
cmdCh := sd.Commands()
if cmdCh == nil {
// Services without commands do not need a worker.
return
}

logger := srv.Logger.With("service", sd.Name())
logger.Info("starting command dispatcher")

// Fetch and remember the latest block. This won't change on an archive node.
latestBlock, err := srv.commonNode.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
logger.Error("failed to fetch latest block",
"err", err,
)
return
}

// Service client event loop.
for {
select {
case <-ctx.Done():
return
case cmd := <-cmdCh:
if err := svc.DeliverCommand(ctx, latestBlock.Height, cmd); err != nil {
logger.Error("failed to deliver command to service client",
"err", err,
)
continue
}
}
}
}
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func doMigrate(cmd *cobra.Command, args []string) error {
err := func() error {
runtimeDir := registry.GetRuntimeStateDir(dataDir, rt)

history, err := history.New(runtimeDir, rt, nil, false)
history, err := history.New(runtimeDir, rt, nil, false, false)
if err != nil {
return fmt.Errorf("error creating history provider: %w", err)
}
Expand Down
24 changes: 24 additions & 0 deletions go/oasis-test-runner/scenario/e2e/runtime/archive_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
)

Expand Down Expand Up @@ -278,6 +279,29 @@ func (sc *archiveAPI) testArchiveAPI(ctx context.Context, archiveCtrl *oasis.Con
return fmt.Errorf("runtime WatchBlocks: %w", err)
}
defer sub.Close()

// Temporary configure the archive as the client controller.
clientCtrl := sc.Net.ClientController()
sc.Net.SetClientController(archiveCtrl)
defer func() {
sc.Net.SetClientController(clientCtrl)
}()

// Test runtime client query.
sc.Logger.Info("testing runtime client query")
rsp, err := sc.submitKeyValueRuntimeGetQuery(
ctx,
runtimeID,
"my_key",
roothash.RoundLatest,
)
if err != nil {
return fmt.Errorf("failed to query runtime: %w", err)
}
if rsp != "my_value" {
return fmt.Errorf("response does not have expected value (got: '%v', expected: '%v')", rsp, "my_value")
}

return nil
}

Expand Down
9 changes: 8 additions & 1 deletion go/runtime/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type runtimeHistory struct {
lastStorageSyncedRound uint64

haveLocalStorageWorker bool
isArchive bool

pruner Pruner
pruneInterval time.Duration
Expand Down Expand Up @@ -176,6 +177,11 @@ func (h *runtimeHistory) ConsensusCheckpoint(height int64) error {
}

func (h *runtimeHistory) StorageSyncCheckpoint(ctx context.Context, round uint64) error {
if h.isArchive {
// If we are in archive mode, ignore storage sync checkpoints.
return nil
}

if !h.haveLocalStorageWorker {
panic("received storage sync checkpoint when local storage worker is disabled")
}
Expand Down Expand Up @@ -376,7 +382,7 @@ func (h *runtimeHistory) pruneWorker() {
}

// New creates a new runtime history keeper.
func New(dataDir string, runtimeID common.Namespace, cfg *Config, haveLocalStorageWorker bool) (History, error) {
func New(dataDir string, runtimeID common.Namespace, cfg *Config, haveLocalStorageWorker bool, isArchive bool) (History, error) {
db, err := newDB(filepath.Join(dataDir, DbFilename), runtimeID)
if err != nil {
return nil, err
Expand All @@ -402,6 +408,7 @@ func New(dataDir string, runtimeID common.Namespace, cfg *Config, haveLocalStora
cancelCtx: cancelCtx,
db: db,
haveLocalStorageWorker: haveLocalStorageWorker,
isArchive: isArchive,
blocksNotifier: pubsub.NewBroker(true),
pruner: pruner,
pruneInterval: cfg.PruneInterval,
Expand Down
14 changes: 7 additions & 7 deletions go/runtime/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestHistory(t *testing.T) {
runtimeID := common.NewTestNamespaceFromSeed([]byte("history test ns 1"), 0)
runtimeID2 := common.NewTestNamespaceFromSeed([]byte("history test ns 2"), 0)

history, err := New(dataDir, runtimeID, NewDefaultConfig(), true)
history, err := New(dataDir, runtimeID, NewDefaultConfig(), true, false)
require.NoError(err, "New")

require.Equal(runtimeID, history.RuntimeID())
Expand Down Expand Up @@ -141,10 +141,10 @@ func TestHistory(t *testing.T) {

// Try to manually load the block index database with incorrect runtime ID.
// Use path from the first runtime.
_, err = New(dataDir, runtimeID2, NewDefaultConfig(), true)
_, err = New(dataDir, runtimeID2, NewDefaultConfig(), true, false)
require.Error(err, "New should return an error on runtime mismatch")

history, err = New(dataDir, runtimeID, NewDefaultConfig(), true)
history, err = New(dataDir, runtimeID, NewDefaultConfig(), true, false)
require.NoError(err, "New")

require.Equal(runtimeID, history.RuntimeID())
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestWatchBlocks(t *testing.T) {
runtimeID := common.NewTestNamespaceFromSeed([]byte("history test ns 1"), 0)

// Test history with local storage.
history, err := New(dataDir, runtimeID, NewDefaultConfig(), true)
history, err := New(dataDir, runtimeID, NewDefaultConfig(), true, false)
require.NoError(err, "New")
// No blocks should be received.
testWatchBlocks(t, history, 0)
Expand Down Expand Up @@ -246,7 +246,7 @@ func TestWatchBlocks(t *testing.T) {
dataDir2, err := os.MkdirTemp("", "oasis-runtime-history-test_")
require.NoError(err, "TempDir")
defer os.RemoveAll(dataDir2)
history, err = New(dataDir2, runtimeID, NewDefaultConfig(), false)
history, err = New(dataDir2, runtimeID, NewDefaultConfig(), false, false)
require.NoError(err, "New")
// No blocks should be received.
testWatchBlocks(t, history, 0)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestHistoryPrune(t *testing.T) {
history, err := New(dataDir, runtimeID, &Config{
Pruner: NewKeepLastPruner(10),
PruneInterval: 100 * time.Millisecond,
}, true)
}, true, false)
require.NoError(err, "New")
defer history.Close()

Expand Down Expand Up @@ -417,7 +417,7 @@ func TestHistoryPruneError(t *testing.T) {
history, err := New(dataDir, runtimeID, &Config{
Pruner: NewKeepLastPruner(10),
PruneInterval: 100 * time.Millisecond,
}, true)
}, true, false)
require.NoError(err, "New")
defer history.Close()

Expand Down
2 changes: 1 addition & 1 deletion go/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id common.Nam
// Create runtime history keeper.
// NOTE: Archive node won't commit any new blocks, so disable waiting for storage sync commits.
haveLocalStorageWorker := r.cfg.Mode.HasLocalStorage() && r.consensus.Mode() != consensus.ModeArchive
history, err := history.New(rt.dataDir, id, &r.cfg.History, haveLocalStorageWorker)
history, err := history.New(rt.dataDir, id, &r.cfg.History, haveLocalStorageWorker, r.consensus.Mode() == consensus.ModeArchive)
if err != nil {
return fmt.Errorf("runtime/registry: cannot create block history for runtime %s: %w", id, err)
}
Expand Down

0 comments on commit c472f9e

Please sign in to comment.