Skip to content

Commit

Permalink
Merge pull request #5943 from oasisprotocol/peternose/trivial/refacto…
Browse files Browse the repository at this point in the history
…r-cleanup

go: Refactor and cleanup code
  • Loading branch information
peternose authored Nov 25, 2024
2 parents 993982b + 7a86da6 commit d35e24b
Show file tree
Hide file tree
Showing 30 changed files with 290 additions and 365 deletions.
Empty file added .changelog/5943.trivial.md
Empty file.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 14 additions & 20 deletions go/common/badger/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
package badger

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/dgraph-io/badger/v4"

"github.com/oasisprotocol/oasis-core/go/common/logging"
cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
)

const (
Expand Down Expand Up @@ -50,22 +51,20 @@ type GCWorker struct {

db *badger.DB

closeOnce sync.Once
closeCh chan struct{}
closedCh chan struct{}
startOne cmSync.One
}

// Close halts the GC worker.
func (gc *GCWorker) Close() {
gc.closeOnce.Do(func() {
close(gc.closeCh)
<-gc.closedCh
})
// Start starts the GC worker.
func (gc *GCWorker) Start() {
gc.startOne.TryStart(gc.run)
}

func (gc *GCWorker) worker() {
defer close(gc.closedCh)
// Stop halts the GC worker.
func (gc *GCWorker) Stop() {
gc.startOne.TryStop()
}

func (gc *GCWorker) run(ctx context.Context) {
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()

Expand All @@ -79,7 +78,7 @@ func (gc *GCWorker) worker() {

for {
select {
case <-gc.closeCh:
case <-ctx.Done():
return
case <-ticker.C:
}
Expand All @@ -99,14 +98,9 @@ func (gc *GCWorker) worker() {
// NewGCWorker creates a new BadgerDB value log GC worker for the provided
// db, logging to the specified logger.
func NewGCWorker(logger *logging.Logger, db *badger.DB) *GCWorker {
gc := &GCWorker{
return &GCWorker{
logger: logger,
db: db,
closeCh: make(chan struct{}),
closedCh: make(chan struct{}),
startOne: cmSync.NewOne(),
}

go gc.worker()

return gc
}
7 changes: 5 additions & 2 deletions go/common/persistent/persistent.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type CommonStore struct {

// Close closes the database handle.
func (cs *CommonStore) Close() {
cs.gc.Close()
cs.gc.Stop()
cs.db.Close()
}

Expand All @@ -61,9 +61,12 @@ func NewCommonStore(dataDir string) (*CommonStore, error) {
return nil, fmt.Errorf("failed to open persistence database: %w", err)
}

gc := cmnBadger.NewGCWorker(logger, db)
gc.Start()

cs := &CommonStore{
db: db,
gc: cmnBadger.NewGCWorker(logger, db),
gc: gc,
}

return cs, nil
Expand Down
5 changes: 5 additions & 0 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (m NodeMode) IsClientOnly() bool {
return false
}

// IsArchive returns true iff the mode is set to archive node mode.
func (m NodeMode) IsArchive() bool {
return m == ModeArchive
}

// HasLocalStorage returns true iff the mode is one that has local storage.
func (m NodeMode) HasLocalStorage() bool {
switch m {
Expand Down
7 changes: 5 additions & 2 deletions go/consensus/cometbft/db/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ func New(fn string, noSuffix bool) (dbm.DB, error) {
return nil, fmt.Errorf("cometbft/db/badger: failed to open database: %w", err)
}

gc := cmnBadger.NewGCWorker(logger, db)
gc.Start()

impl := &badgerDBImpl{
logger: logger,
db: db,
gc: cmnBadger.NewGCWorker(logger, db),
gc: gc,
}

return impl, nil
Expand Down Expand Up @@ -211,7 +214,7 @@ func (d *badgerDBImpl) ReverseIterator(start, end []byte) (dbm.Iterator, error)
func (d *badgerDBImpl) Close() error {
err := os.ErrClosed
d.closeOnce.Do(func() {
d.gc.Close()
d.gc.Stop()

if err = d.db.Close(); err != nil {
d.logger.Error("Close failed",
Expand Down
6 changes: 1 addition & 5 deletions go/consensus/cometbft/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,11 +526,7 @@ func (t *fullService) lazyInit() error { // nolint: gocyclo
return err
}
pruneCfg.NumKept = config.GlobalConfig.Consensus.Prune.NumKept
pruneCfg.PruneInterval = config.GlobalConfig.Consensus.Prune.Interval
const minPruneInterval = 1 * time.Second
if pruneCfg.PruneInterval < minPruneInterval {
pruneCfg.PruneInterval = minPruneInterval
}
pruneCfg.PruneInterval = max(config.GlobalConfig.Consensus.Prune.Interval, time.Second)

appConfig := &abci.ApplicationConfig{
DataDir: filepath.Join(t.dataDir, tmcommon.StateDir),
Expand Down
8 changes: 4 additions & 4 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common"
"github.com/oasisprotocol/oasis-core/go/worker/compute/executor"
workerKeymanager "github.com/oasisprotocol/oasis-core/go/worker/keymanager"
"github.com/oasisprotocol/oasis-core/go/worker/registration"
workerRegistration "github.com/oasisprotocol/oasis-core/go/worker/registration"
workerSentry "github.com/oasisprotocol/oasis-core/go/worker/sentry"
workerStorage "github.com/oasisprotocol/oasis-core/go/worker/storage"
)
Expand Down Expand Up @@ -86,7 +86,7 @@ type Node struct {
ClientWorker *workerClient.Worker
SentryWorker *workerSentry.Worker
P2P p2pAPI.Service
RegistrationWorker *registration.Worker
RegistrationWorker *workerRegistration.Worker
KeymanagerWorker *workerKeymanager.Worker
BeaconWorker *workerBeacon.Worker
readyCh chan struct{}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (n *Node) initRuntimeWorkers() error {
workerCommonCfg := n.CommonWorker.GetConfig()

// Initialize the registration worker.
n.RegistrationWorker, err = registration.New(
n.RegistrationWorker, err = workerRegistration.New(
n.Consensus.Beacon(),
n.Consensus.Registry(),
n.Identity,
Expand All @@ -269,7 +269,7 @@ func (n *Node) initRuntimeWorkers() error {
n.RuntimeRegistry,
)
if genesisDoc.Registry.Parameters.DebugAllowUnroutableAddresses {
registration.DebugForceAllowUnroutableAddresses()
workerRegistration.DebugForceAllowUnroutableAddresses()
}
if err != nil {
n.logger.Error("failed to initialize worker registration",
Expand Down
3 changes: 2 additions & 1 deletion go/oasis-node/cmd/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func doMigrate(_ *cobra.Command, args []string) error {
err := func() error {
runtimeDir := registry.GetRuntimeStateDir(dataDir, rt)

history, err := history.New(runtimeDir, rt, nil, false)
prunerFactory := history.NewNonePrunerFactory()
history, err := history.New(rt, runtimeDir, prunerFactory, false)
if err != nil {
return fmt.Errorf("error creating history provider: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/oasis-test-runner/oasis/oasis.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (n *Node) Start() error {

if hosted.localConfig != nil {
if n.Config.Runtime.RuntimeConfig == nil {
n.Config.Runtime.RuntimeConfig = make(map[string]interface{})
n.Config.Runtime.RuntimeConfig = make(map[string]map[string]interface{})
}
n.Config.Runtime.RuntimeConfig[hosted.runtime.ID().String()] = hosted.localConfig
}
Expand Down
25 changes: 15 additions & 10 deletions go/p2p/peermgmt/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ func (r *peerRegistry) watch(ctx context.Context) {
for {
select {
case nodes := <-nodeListCh:
r.handleNodes(nodes.Nodes, true)
r.clearNodes()
r.handleNodes(nodes.Nodes)

case nodeEv := <-nodeCh:
if nodeEv.IsRegistration {
r.handleNodes([]*node.Node{nodeEv.Node}, false)
r.handleNodes([]*node.Node{nodeEv.Node})
}

case <-ctx.Done():
Expand All @@ -171,8 +172,19 @@ func (r *peerRegistry) watch(ctx context.Context) {
}
}

// clearNodes clears the protocols and topics supported by the observed nodes.
func (r *peerRegistry) clearNodes() {
r.mu.Lock()
defer r.mu.Unlock()

// Clear current state.
r.peers = make(map[core.PeerID]peer.AddrInfo)
r.protocolPeers = make(map[core.ProtocolID]map[core.PeerID]struct{})
r.topicPeers = make(map[string]map[core.PeerID]struct{})
}

// handleNodes updates protocols and topics supported by the given nodes and resets them if needed.
func (r *peerRegistry) handleNodes(nodes []*node.Node, reset bool) {
func (r *peerRegistry) handleNodes(nodes []*node.Node) {
defer r.initOnce.Do(func() {
close(r.initCh)
})
Expand Down Expand Up @@ -202,13 +214,6 @@ func (r *peerRegistry) handleNodes(nodes []*node.Node, reset bool) {
r.mu.Lock()
defer r.mu.Unlock()

// Remove previous state.
if reset {
r.peers = make(map[core.PeerID]peer.AddrInfo)
r.protocolPeers = make(map[core.ProtocolID]map[core.PeerID]struct{})
r.topicPeers = make(map[string]map[core.PeerID]struct{})
}

// Add/update new peers.
for p, data := range peers {
// Remove old protocols/topics.
Expand Down
2 changes: 1 addition & 1 deletion go/registry/api/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ type VersionInfo struct {
// format if any.
TEE []byte `json:"tee,omitempty"`

// BundleChecksum is the SHA256 hash of the runtime bundle (optional).
// BundleChecksum is the SHA256 hash of the runtime bundle manifest (optional).
BundleChecksum []byte `json:"bundle_checksum,omitempty"`
}

Expand Down
24 changes: 12 additions & 12 deletions go/runtime/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ func (bnd *Bundle) ExplodedPath(dataDir, fn string) string {
return filepath.Join(subDir, fn)
}

// WriteExploded writes the extracted runtime bundle to the appropriate
// location under the specified data directory.
func (bnd *Bundle) WriteExploded(dataDir string) error {
// WriteExploded extracts the runtime bundle, writes it to the appropriate
// data directory, and returns the path to the written location.
func (bnd *Bundle) WriteExploded(dataDir string) (string, error) {
if err := bnd.Validate(); err != nil {
return fmt.Errorf("runtime/bundle: refusing to explode malformed bundle: %w", err)
return "", fmt.Errorf("runtime/bundle: refusing to explode malformed bundle: %w", err)
}

subDir := bnd.ExplodedPath(dataDir, "")
Expand All @@ -432,30 +432,30 @@ func (bnd *Bundle) WriteExploded(dataDir string) error {
fn = bnd.ExplodedPath(dataDir, fn)
h, rdErr := HashAllData(NewFileData(fn))
if rdErr != nil {
return fmt.Errorf("runtime/bundle: failed to re-load asset '%s': %w", fn, rdErr)
return "", fmt.Errorf("runtime/bundle: failed to re-load asset '%s': %w", fn, rdErr)
}

he, rdErr := HashAllData(expected)
if rdErr != nil {
return fmt.Errorf("runtime/bundle: failed to re-load asset '%s': %w", fn, rdErr)
return "", fmt.Errorf("runtime/bundle: failed to re-load asset '%s': %w", fn, rdErr)
}

if !h.Equal(&he) {
return fmt.Errorf("runtime/bundle: corrupt asset: '%s'", fn)
return "", fmt.Errorf("runtime/bundle: corrupt asset: '%s'", fn)
}
}
default:
// Extract the bundle to disk.
if !os.IsNotExist(err) {
return fmt.Errorf("runtime/bundle: failed to stat asset directory '%s': %w", subDir, err)
return "", fmt.Errorf("runtime/bundle: failed to stat asset directory '%s': %w", subDir, err)
}

for _, v := range []string{
subDir,
bnd.ExplodedPath(dataDir, manifestPath),
} {
if err = os.MkdirAll(v, 0o700); err != nil {
return fmt.Errorf("runtime/bundle: failed to create asset sub-dir '%s': %w", v, err)
return "", fmt.Errorf("runtime/bundle: failed to create asset sub-dir '%s': %w", v, err)
}
}
for fn, data := range bnd.Data {
Expand All @@ -481,20 +481,20 @@ func (bnd *Bundle) WriteExploded(dataDir string) error {
return nil
}()
if err != nil {
return err
return "", err
}
}

for id, comp := range bnd.Manifest.GetAvailableComponents() {
if comp.Executable != "" {
if err := os.Chmod(bnd.ExplodedPath(dataDir, comp.Executable), 0o700); err != nil {
return fmt.Errorf("runtime/bundle: failed to fixup executable permissions for '%s': %w", id, err)
return "", fmt.Errorf("runtime/bundle: failed to fixup executable permissions for '%s': %w", id, err)
}
}
}
}

return nil
return subDir, nil
}

// Close closes the bundle, releasing resources.
Expand Down
10 changes: 5 additions & 5 deletions go/runtime/bundle/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,17 @@ func TestBundle(t *testing.T) {
})

t.Run("Explode", func(t *testing.T) {
err := bundle.WriteExploded(tmpDir)
_, err := bundle.WriteExploded(tmpDir)
require.NoError(t, err, "WriteExploded")

// Abuse the fact that we do an integrity check if the bundle
// is already exploded.
err = bundle.WriteExploded(tmpDir)
_, err = bundle.WriteExploded(tmpDir)
require.NoError(t, err, "WriteExploded(again)")
})
}

func TestDeatchedBundle(t *testing.T) {
func TestDetachedBundle(t *testing.T) {
execFile := os.Args[0]
execBuf, err := os.ReadFile(execFile)
if err != nil {
Expand Down Expand Up @@ -189,12 +189,12 @@ func TestDeatchedBundle(t *testing.T) {
})

t.Run("Explode", func(t *testing.T) {
err := bundle.WriteExploded(tmpDir)
_, err := bundle.WriteExploded(tmpDir)
require.NoError(t, err, "WriteExploded")

// Abuse the fact that we do an integrity check if the bundle
// is already exploded.
err = bundle.WriteExploded(tmpDir)
_, err = bundle.WriteExploded(tmpDir)
require.NoError(t, err, "WriteExploded(again)")
})
}
Expand Down
Loading

0 comments on commit d35e24b

Please sign in to comment.