Make Allocator batchWaitTime configurable (googleforgames#2589)
* Make Allocator batchWaitTime configurable

Resolves googleforgames#2586

* Removed `allocation` prefix on var declaration
* Updated documentation
* Included `allocationBatchWaitTime` documentation entries

Co-authored-by: Mark Mandel <markmandel@google.com>
valentintorikian and markmandel authored May 26, 2022
1 parent aebe6fc commit f781316
Showing 12 changed files with 94 additions and 67 deletions.
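In short: the previously hard-coded 500ms pause between allocation batches is now configurable via a new `allocation-batch-wait-time` flag (surfaced as the `allocationBatchWaitTime` Helm value and the `ALLOCATION_BATCH_WAIT_TIME` environment variable) on both the controller and the allocator service, with the default preserving the old 500ms behaviour.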
7 changes: 4 additions & 3 deletions cmd/allocator/main.go
@@ -114,7 +114,7 @@ func main() {
return err
})

-h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout)
+h := newServiceHandler(kubeClient, agonesClient, health, conf.MTLSDisabled, conf.TLSDisabled, conf.remoteAllocationTimeout, conf.totalRemoteAllocationTimeout, conf.allocationBatchWaitTime)

if !h.tlsDisabled {
watcherTLS, err := fsnotify.NewWatcher()
@@ -280,7 +280,7 @@ func runGRPC(h *serviceHandler, grpcPort int) {
}()
}

-func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *serviceHandler {
+func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.Interface, health healthcheck.Handler, mTLSDisabled bool, tlsDisabled bool, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, allocationBatchWaitTime time.Duration) *serviceHandler {
defaultResync := 30 * time.Second
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)
@@ -293,7 +293,8 @@ func newServiceHandler(kubeClient kubernetes.Interface, agonesClient versioned.I
kubeClient,
gameserverallocations.NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), gsCounter, health),
remoteAllocationTimeout,
-totalRemoteAllocationTimeout)
+totalRemoteAllocationTimeout,
+allocationBatchWaitTime)

ctx := signals.NewSigKillContext()
h := serviceHandler{
5 changes: 5 additions & 0 deletions cmd/allocator/metrics.go
@@ -42,6 +42,7 @@ const (
apiServerSustainedQPSFlag = "api-server-qps"
apiServerBurstQPSFlag = "api-server-qps-burst"
logLevelFlag = "log-level"
+allocationBatchWaitTime = "allocation-batch-wait-time"
)

func init() {
@@ -62,6 +63,7 @@ type config struct {
LogLevel string
totalRemoteAllocationTimeout time.Duration
remoteAllocationTimeout time.Duration
+allocationBatchWaitTime time.Duration
}

func parseEnvFlags() config {
@@ -79,6 +81,7 @@ func parseEnvFlags() config {
viper.SetDefault(remoteAllocationTimeoutFlag, 10*time.Second)
viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second)
viper.SetDefault(logLevelFlag, "Info")
+viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)

pflag.Int32(httpPortFlag, viper.GetInt32(httpPortFlag), "Port to listen on for REST requests")
pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests")
@@ -93,6 +96,7 @@ func parseEnvFlags() config {
pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.")
pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.")
pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
+pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
runtime.FeaturesBindFlags()
pflag.Parse()

@@ -127,6 +131,7 @@ func parseEnvFlags() config {
LogLevel: viper.GetString(logLevelFlag),
remoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag),
totalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag),
+allocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
}
}

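For readers unfamiliar with the viper/pflag pattern used above, here is a minimal, self-contained sketch of how a default, a flag definition, and a read-back combine into a configurable duration. The `BindPFlags` call and the env-var remark are assumptions based on common viper usage, not lines from this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

const allocationBatchWaitTime = "allocation-batch-wait-time"

func main() {
	// Default mirrors the one this commit sets: 500ms.
	viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)

	// Register the flag, seeded from the viper default.
	pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime),
		"Flag to configure the waiting period between allocations batches")
	pflag.Parse()

	// Assumed glue: bind parsed flags back into viper before reading. In
	// Agones, env vars such as ALLOCATION_BATCH_WAIT_TIME map onto the same keys.
	_ = viper.BindPFlags(pflag.CommandLine)

	fmt.Println("batch wait time:", viper.GetDuration(allocationBatchWaitTime))
}
```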
96 changes: 51 additions & 45 deletions cmd/controller/main.go
@@ -77,6 +77,7 @@ const (
logLevelFlag = "log-level"
logSizeLimitMBFlag = "log-size-limit-mb"
kubeconfigFlag = "kubeconfig"
+allocationBatchWaitTime = "allocation-batch-wait-time"
defaultResync = 30 * time.Second
)

@@ -210,7 +211,8 @@ func main() {
gsSetController := gameserversets.NewController(wh, health, gsCounter,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
-gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory, agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second)
+gasController := gameserverallocations.NewController(api, health, gsCounter, kubeClient, kubeInformerFactory,
+agonesClient, agonesInformerFactory, 10*time.Second, 30*time.Second, ctlConf.AllocationBatchWaitTime)
fasController := fleetautoscalers.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)

@@ -253,6 +255,7 @@ func parseEnvFlags() config {
viper.SetDefault(enablePrometheusMetricsFlag, true)
viper.SetDefault(enableStackdriverMetricsFlag, false)
viper.SetDefault(stackdriverLabels, "")
+viper.SetDefault(allocationBatchWaitTime, 500*time.Millisecond)

viper.SetDefault(projectIDFlag, "")
viper.SetDefault(numWorkersFlag, 64)
@@ -284,6 +287,7 @@ func parseEnvFlags() config {
pflag.String(logDirFlag, viper.GetString(logDirFlag), "If set, store logs in a given directory.")
pflag.Int32(logSizeLimitMBFlag, 1000, "Log file size limit in MB")
pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Agones Log level")
+pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches")
runtime.FeaturesBindFlags()
pflag.Parse()

@@ -336,55 +340,57 @@ func parseEnvFlags() config {
}

return config{
-MinPort: int32(viper.GetInt64(minPortFlag)),
-MaxPort: int32(viper.GetInt64(maxPortFlag)),
-SidecarImage: viper.GetString(sidecarImageFlag),
-SidecarCPURequest: requestCPU,
-SidecarCPULimit: limitCPU,
-SidecarMemoryRequest: requestMemory,
-SidecarMemoryLimit: limitMemory,
-SdkServiceAccount: viper.GetString(sdkServerAccountFlag),
-AlwaysPullSidecar: viper.GetBool(pullSidecarFlag),
-KeyFile: viper.GetString(keyFileFlag),
-CertFile: viper.GetString(certFileFlag),
-KubeConfig: viper.GetString(kubeconfigFlag),
-PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
-Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
-GCPProjectID: viper.GetString(projectIDFlag),
-NumWorkers: int(viper.GetInt32(numWorkersFlag)),
-APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)),
-APIServerBurstQPS: int(viper.GetInt32(apiServerBurstQPSFlag)),
-LogDir: viper.GetString(logDirFlag),
-LogLevel: viper.GetString(logLevelFlag),
-LogSizeLimitMB: int(viper.GetInt32(logSizeLimitMBFlag)),
-StackdriverLabels: viper.GetString(stackdriverLabels),
+MinPort: int32(viper.GetInt64(minPortFlag)),
+MaxPort: int32(viper.GetInt64(maxPortFlag)),
+SidecarImage: viper.GetString(sidecarImageFlag),
+SidecarCPURequest: requestCPU,
+SidecarCPULimit: limitCPU,
+SidecarMemoryRequest: requestMemory,
+SidecarMemoryLimit: limitMemory,
+SdkServiceAccount: viper.GetString(sdkServerAccountFlag),
+AlwaysPullSidecar: viper.GetBool(pullSidecarFlag),
+KeyFile: viper.GetString(keyFileFlag),
+CertFile: viper.GetString(certFileFlag),
+KubeConfig: viper.GetString(kubeconfigFlag),
+PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag),
+Stackdriver: viper.GetBool(enableStackdriverMetricsFlag),
+GCPProjectID: viper.GetString(projectIDFlag),
+NumWorkers: int(viper.GetInt32(numWorkersFlag)),
+APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)),
+APIServerBurstQPS: int(viper.GetInt32(apiServerBurstQPSFlag)),
+LogDir: viper.GetString(logDirFlag),
+LogLevel: viper.GetString(logLevelFlag),
+LogSizeLimitMB: int(viper.GetInt32(logSizeLimitMBFlag)),
+StackdriverLabels: viper.GetString(stackdriverLabels),
+AllocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime),
}
}

// config stores all required configuration to create a game server controller.
type config struct {
-MinPort int32
-MaxPort int32
-SidecarImage string
-SidecarCPURequest resource.Quantity
-SidecarCPULimit resource.Quantity
-SidecarMemoryRequest resource.Quantity
-SidecarMemoryLimit resource.Quantity
-SdkServiceAccount string
-AlwaysPullSidecar bool
-PrometheusMetrics bool
-Stackdriver bool
-StackdriverLabels string
-KeyFile string
-CertFile string
-KubeConfig string
-GCPProjectID string
-NumWorkers int
-APIServerSustainedQPS int
-APIServerBurstQPS int
-LogDir string
-LogLevel string
-LogSizeLimitMB int
+MinPort int32
+MaxPort int32
+SidecarImage string
+SidecarCPURequest resource.Quantity
+SidecarCPULimit resource.Quantity
+SidecarMemoryRequest resource.Quantity
+SidecarMemoryLimit resource.Quantity
+SdkServiceAccount string
+AlwaysPullSidecar bool
+PrometheusMetrics bool
+Stackdriver bool
+StackdriverLabels string
+KeyFile string
+CertFile string
+KubeConfig string
+GCPProjectID string
+NumWorkers int
+APIServerSustainedQPS int
+APIServerBurstQPS int
+LogDir string
+LogLevel string
+LogSizeLimitMB int
+AllocationBatchWaitTime time.Duration
}

// validate ensures the ctlConfig data is valid.
2 changes: 2 additions & 0 deletions install/helm/agones/templates/controller.yaml
@@ -113,6 +113,8 @@ spec:
value: {{ .Values.agones.controller.logLevel | quote }}
- name: FEATURE_GATES
value: {{ .Values.agones.featureGates | quote }}
+- name: ALLOCATION_BATCH_WAIT_TIME
+  value: {{ .Values.agones.controller.allocationBatchWaitTime | quote }}
{{- if .Values.agones.controller.persistentLogs }}
- name: LOG_DIR
value: "/home/agones/logs"
2 changes: 2 additions & 0 deletions install/helm/agones/templates/service/allocation.yaml
@@ -241,6 +241,8 @@ spec:
value: {{ .Values.agones.allocator.logLevel | quote }}
- name: FEATURE_GATES
value: {{ .Values.agones.featureGates | quote }}
+- name: ALLOCATION_BATCH_WAIT_TIME
+  value: {{ .Values.agones.allocator.allocationBatchWaitTime | quote }}
ports:
{{- if .Values.agones.allocator.service.http.enabled }}
- name: {{ .Values.agones.allocator.service.http.portName }}
2 changes: 2 additions & 0 deletions install/helm/agones/values.yaml
@@ -92,6 +92,7 @@ agones:
periodSeconds: 3
failureThreshold: 3
timeoutSeconds: 1
+allocationBatchWaitTime: 500ms
ping:
install: true
resources: {}
@@ -199,6 +200,7 @@ agones:
disableTLS: false
remoteAllocationTimeout: 10s
totalRemoteAllocationTimeout: 30s
+allocationBatchWaitTime: 500ms
image:
registry: gcr.io/agones-images
tag: 1.24.0-dev
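Either default can be overridden at install time in the usual Helm fashion, e.g. `--set agones.allocator.allocationBatchWaitTime=100ms` (or `agones.controller.allocationBatchWaitTime` for the controller); the value is parsed as a Go duration string such as `500ms` or `1s`.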
4 changes: 4 additions & 0 deletions install/yaml/install.yaml
@@ -14040,6 +14040,8 @@ spec:
value: "info"
- name: FEATURE_GATES
value: ""
+- name: ALLOCATION_BATCH_WAIT_TIME
+  value: "500ms"
- name: LOG_DIR
value: "/home/agones/logs"
- name: LOG_SIZE_LIMIT_MB
@@ -14264,6 +14266,8 @@ spec:
value: "info"
- name: FEATURE_GATES
value: ""
+- name: ALLOCATION_BATCH_WAIT_TIME
+  value: "500ms"
ports:
- name: https
containerPort: 8443
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/allocator.go
@@ -78,7 +78,6 @@ const (
allocatorPort = "443"
maxBatchQueue = 100
maxBatchBeforeRefresh = 100
-batchWaitTime = 500 * time.Millisecond
)

var allocationRetry = wait.Backoff{
@@ -108,6 +107,7 @@ type Allocator struct {
remoteAllocationCallback func(context.Context, string, grpc.DialOption, *pb.AllocationRequest) (*pb.AllocationResponse, error)
remoteAllocationTimeout time.Duration
totalRemoteAllocationTimeout time.Duration
+batchWaitTime time.Duration
}

// request is an async request for allocation
@@ -125,7 +125,7 @@ type response struct {

// NewAllocator creates an instance of Allocator
func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPolicyInformer, secretInformer informercorev1.SecretInformer, gameServerGetter getterv1.GameServersGetter,
-kubeClient kubernetes.Interface, allocationCache *AllocationCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration) *Allocator {
+kubeClient kubernetes.Interface, allocationCache *AllocationCache, remoteAllocationTimeout time.Duration, totalRemoteAllocationTimeout time.Duration, batchWaitTime time.Duration) *Allocator {
ah := &Allocator{
pendingRequests: make(chan request, maxBatchQueue),
allocationPolicyLister: policyInformer.Lister(),
@@ -134,6 +134,7 @@ func NewAllocator(policyInformer multiclusterinformerv1.GameServerAllocationPoli
secretSynced: secretInformer.Informer().HasSynced,
gameServerGetter: gameServerGetter,
allocationCache: allocationCache,
+batchWaitTime: batchWaitTime,
remoteAllocationTimeout: remoteAllocationTimeout,
totalRemoteAllocationTimeout: totalRemoteAllocationTimeout,
remoteAllocationCallback: func(ctx context.Context, endpoint string, dialOpts grpc.DialOption, request *pb.AllocationRequest) (*pb.AllocationResponse, error) {
@@ -531,7 +532,7 @@ func (c *Allocator) ListenAndAllocate(ctx context.Context, updateWorkerCount int
list = nil
requestCount = 0
// slow down cpu churn, and allow items to batch
-time.Sleep(batchWaitTime)
+time.Sleep(c.batchWaitTime)
}
}
}
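The `time.Sleep(c.batchWaitTime)` change above is the heart of what this commit makes tunable. A stripped-down sketch of the batching pattern, using illustrative names rather than the real Agones types, shows the trade-off the new knob controls: a longer wait yields larger batches and less CPU churn, a shorter wait lowers per-request allocation latency.

```go
package main

import (
	"fmt"
	"time"
)

type request struct{ name string }

// listenAndAllocate drains whatever requests are queued, handles them as one
// batch, then sleeps for batchWaitTime so further requests can accumulate.
func listenAndAllocate(pending <-chan request, batchWaitTime time.Duration, process func([]request)) {
	for {
		var batch []request
	drain:
		for {
			select {
			case r := <-pending:
				batch = append(batch, r)
			default:
				break drain
			}
		}
		if len(batch) > 0 {
			process(batch)
		}
		// Larger values lower CPU churn and produce bigger batches;
		// smaller values reduce allocation latency.
		time.Sleep(batchWaitTime)
	}
}

func main() {
	pending := make(chan request, 100)
	go listenAndAllocate(pending, 500*time.Millisecond, func(b []request) {
		fmt.Printf("allocated batch of %d\n", len(b))
	})
	pending <- request{name: "gs-1"}
	pending <- request{name: "gs-2"}
	time.Sleep(time.Second)
}
```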
7 changes: 4 additions & 3 deletions pkg/gameserverallocations/allocator_test.go
@@ -242,7 +242,7 @@ func TestAllocatorApplyAllocationToGameServer(t *testing.T) {
m.KubeInformerFactory.Core().V1().Secrets(),
m.AgonesClient.AgonesV1(), m.KubeClient,
NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory), healthcheck.NewHandler()),
-time.Second, 5*time.Second,
+time.Second, 5*time.Second, 500*time.Millisecond,
)

gs, err := allocator.applyAllocationToGameServer(ctx, allocationv1.MetaPatch{}, &agonesv1.GameServer{})
@@ -281,7 +281,7 @@ func TestAllocationApplyAllocationError(t *testing.T) {
m.KubeInformerFactory.Core().V1().Secrets(),
m.AgonesClient.AgonesV1(), m.KubeClient,
NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory), healthcheck.NewHandler()),
-time.Second, 5*time.Second,
+time.Second, 5*time.Second, 500*time.Millisecond,
)

gsa, err := allocator.applyAllocationToGameServer(ctx, allocationv1.MetaPatch{}, &agonesv1.GameServer{})
@@ -686,7 +686,8 @@ func newFakeAllocator() (*Allocator, agtesting.Mocks) {
m.KubeClient,
NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), counter, healthcheck.NewHandler()),
time.Second,
-5*time.Second)
+5*time.Second,
+500*time.Millisecond)
a.recorder = m.FakeRecorder

return a, m
4 changes: 3 additions & 1 deletion pkg/gameserverallocations/controller.go
@@ -61,6 +61,7 @@ func NewController(apiServer *apiserver.APIServer,
agonesInformerFactory externalversions.SharedInformerFactory,
remoteAllocationTimeout time.Duration,
totalAllocationTimeout time.Duration,
+allocationBatchWaitTime time.Duration,
) *Controller {
c := &Controller{
api: apiServer,
@@ -71,7 +72,8 @@ func NewController(apiServer *apiserver.APIServer,
kubeClient,
NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), counter, health),
remoteAllocationTimeout,
-totalAllocationTimeout),
+totalAllocationTimeout,
+allocationBatchWaitTime),
}
c.baseLogger = runtime.NewLoggerWithType(c)

2 changes: 1 addition & 1 deletion pkg/gameserverallocations/controller_test.go
@@ -845,7 +845,7 @@ func newFakeControllerWithTimeout(remoteAllocationTimeout time.Duration, totalRe
m.Mux = http.NewServeMux()
counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory)
api := apiserver.NewAPIServer(m.Mux)
-c := NewController(api, healthcheck.NewHandler(), counter, m.KubeClient, m.KubeInformerFactory, m.AgonesClient, m.AgonesInformerFactory, remoteAllocationTimeout, totalRemoteAllocationTimeout)
+c := NewController(api, healthcheck.NewHandler(), counter, m.KubeClient, m.KubeInformerFactory, m.AgonesClient, m.AgonesInformerFactory, remoteAllocationTimeout, totalRemoteAllocationTimeout, 500*time.Millisecond)
c.recorder = m.FakeRecorder
c.allocator.recorder = m.FakeRecorder
return c, m
