Skip to content

Commit

Permalink
Update state updater and Lsm holder to use conditional put using etag (
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox authored Jan 4, 2025
1 parent d183c4e commit 8703a18
Show file tree
Hide file tree
Showing 16 changed files with 329 additions and 515 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Agent struct {
partitionLeaders map[string]map[int]map[int]int32
clusterMembershipFactory ClusterMembershipFactory
tableGetter sst.TableGetter
authCaches *auth.UserAuthCaches
authCaches *auth.UserAuthCaches
}

func NewAgent(cfg Conf, objStore objstore.Client) (*Agent, error) {
Expand Down
7 changes: 1 addition & 6 deletions agent/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,8 @@ func CreateConfFromCommandConf(commandConf CommandConf) (Conf, error) {
cfg.ClusterMembershipConfig.EvictionInterval = evictionInterval
// configure controller
cfg.ControllerConf.SSTableBucketName = dataBucketName
cfg.ControllerConf.ControllerMetaDataKeyPrefix = "meta-data"
cfg.ControllerConf.ControllerMetaDataKey = "meta-data"
cfg.ControllerConf.ControllerMetaDataBucketName = dataBucketName
// We put the controller state machine in a different bucket - as this will likely be configured with an expiry
// TODO move to Dynamo based state machine
controllerStateMachineBucketName := commandConf.ClusterName + "-controller-sm"
cfg.ControllerConf.ControllerStateUpdaterBucketName = controllerStateMachineBucketName
cfg.ControllerConf.ControllerStateUpdaterKeyPrefix = "meta-state"
cfg.ControllerConf.AzInfo = commandConf.Location
cfg.ControllerConf.LsmConf.SSTableBucketName = dataBucketName
// configure compaction workers
Expand Down
10 changes: 5 additions & 5 deletions cluster/clustered_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type ClusteredData struct {
dataKeyPrefix string
stateMachine *StateUpdater
objStoreClient objstore.Client
epoch uint64
opts ClusteredDataConf
readyState clusteredDataState
epoch uint64
opts ClusteredDataConf
readyState clusteredDataState
stopping atomic.Bool
logPrefix string
}
Expand All @@ -66,8 +66,8 @@ func NewClusteredData(stateMachineBucketName string, stateMachineKeyPrefix strin
objStoreClient: objStoreClient,
dataBucketName: dataBucketName,
dataKeyPrefix: dataKeyPrefix,
stateMachine: NewStateUpdator(stateMachineBucketName, stateMachineKeyPrefix, objStoreClient,
StateUpdatorOpts{}),
stateMachine: NewStateUpdater(stateMachineBucketName, stateMachineKeyPrefix, objStoreClient,
StateUpdaterOpts{}),
opts: opts,
readyState: clusteredDataStateReady,
logPrefix: fmt.Sprintf("clustered data(%s / %s) -", dataBucketName, stateMachineBucketName),
Expand Down
2 changes: 1 addition & 1 deletion cluster/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewMembership(cfg MembershipConf, data []byte, objStoreClient objstore.Clie
return &Membership{
id: -1,
data: data,
stateUpdater: NewStateUpdator(cfg.BucketName, cfg.KeyPrefix, objStoreClient, StateUpdatorOpts{}),
stateUpdater: NewStateUpdater(cfg.BucketName, cfg.KeyPrefix, objStoreClient, StateUpdaterOpts{}),
updateInterval: cfg.UpdateInterval,
evictionDuration: cfg.EvictionInterval,
stateChangedCallback: stateChangedCallback,
Expand Down
Loading

0 comments on commit 8703a18

Please sign in to comment.