Dynamic PDB to Game Room Namespaces (#607)
* refactor(watcher): move wait group to struct
* refactor(watcher): spawn goroutines in separate func
* feat(runtime): create and delete PDB in scheduler operations
* feat(runtime): add disruption mitigation
* feat(watcher): add goroutine to check for disruptions
* docs(runtime): add disruption worker and config
hspedro authored Mar 14, 2024
1 parent 6b6a0b7 commit 1f58dbd
Showing 15 changed files with 639 additions and 52 deletions.
13 changes: 11 additions & 2 deletions cmd/runtimewatcher/wire.go
@@ -31,6 +31,7 @@ import (
"github.com/topfreegames/maestro/internal/core/services/events"
"github.com/topfreegames/maestro/internal/core/services/workers"
"github.com/topfreegames/maestro/internal/core/worker"
workerconfigs "github.com/topfreegames/maestro/internal/core/worker/config"
"github.com/topfreegames/maestro/internal/core/worker/runtimewatcher"
"github.com/topfreegames/maestro/internal/service"
)
@@ -42,16 +43,24 @@ func provideRuntimeWatcherBuilder() *worker.WorkerBuilder {
}
}

func provideRuntimeWatcherConfig(c config.Config) *workerconfigs.RuntimeWatcherConfig {
return &workerconfigs.RuntimeWatcherConfig{
DisruptionWorkerIntervalSeconds: c.GetDuration("runtimeWatcher.disruptionWorker.intervalSeconds"),
DisruptionSafetyPercentage: c.GetFloat64("runtimeWatcher.disruptionWorker.safetyPercentage"),
}
}

var WorkerOptionsSet = wire.NewSet(
service.NewRuntimeKubernetes,
service.NewRoomStorageRedis,
RoomManagerSet,
wire.Struct(new(worker.WorkerOptions), "RoomManager", "Runtime"))
provideRuntimeWatcherConfig,
wire.Struct(new(worker.WorkerOptions), "Runtime", "RoomStorage", "RoomManager", "RuntimeWatcherConfig"))

var RoomManagerSet = wire.NewSet(
service.NewSchedulerStoragePg,
service.NewClockTime,
service.NewPortAllocatorRandom,
service.NewRoomStorageRedis,
service.NewGameRoomInstanceStorageRedis,
service.NewSchedulerCacheRedis,
service.NewRoomManagerConfig,
28 changes: 20 additions & 8 deletions cmd/runtimewatcher/wire_gen.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions config/config.yaml
@@ -59,6 +59,10 @@ workers:
reporter:
metrics:
intervalMillis: 10000
runtimeWatcher:
disruptionWorker:
intervalSeconds: 5
safetyPercentage: 0.05

services:
roomManager:
3 changes: 2 additions & 1 deletion docs/reference/Architecture.md
@@ -46,7 +46,8 @@ You could find all operations at [Operations section](Operations.md#available-op

> Note: In Maestro a worker is a collection of routines that executes a flow related to one and only one **Scheduler** each.
Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it listens for Game Rooms creation, deletion, and update.
Runtime Watcher Worker listens to runtime events related to the **Scheduler** and reflects the changes in **Maestro**. Currently, it mitigates disruptions by looking at the current
number of occupied rooms, and it listens for Game Rooms creation, deletion, and update.

![Runtime Watcher Worker IMAGE](../images/Architecture-Runtime-Watcher-Worker.jpg)

27 changes: 25 additions & 2 deletions docs/reference/Kubernetes.md
@@ -39,8 +39,31 @@ flowchart BT
```

### Runtime watcher
The runtime watcher component maintains a worker process for each scheduler that keeps watching and processing _change
events_ in pods resources. For doing that, it uses a [pods informer](https://pkg.go.dev/k8s.io/client-go/informers),
The runtime watcher component spawns two types of workers: one responsible for mitigating disruptions and another for processing _change events_ in pods resources.

#### Disruption Worker

This worker consists of a single goroutine with a ticker. Each time it runs, it checks the
current number of occupied rooms and tries to mitigate disruptions. For Kubernetes, this
mitigation consists of applying a PDB (PodDisruptionBudget) to the scheduler's namespace with
`minAvailable` equal to the number of occupied rooms plus a safety percentage.

One can configure the interval at which this worker runs, as well as the safety percentage,
in `config/config.yaml`:

```yaml
runtimeWatcher:
  disruptionWorker:
    intervalSeconds: 5
    safetyPercentage: 0.05
```

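To make the relation between occupied rooms, the safety percentage, and the resulting
`minAvailable` concrete, here is a minimal, illustrative sketch of the computation; the
helper name `computeMinAvailable` and the constant are hypothetical, not part of Maestro's code.

```go
// Illustrative sketch only: computeMinAvailable and defaultSafetyPercentage
// are hypothetical names, not part of Maestro's code base.
package main

import "fmt"

const defaultSafetyPercentage = 0.05

// computeMinAvailable pads the number of occupied rooms by the safety
// percentage, falling back to the default when the configured value is lower.
func computeMinAvailable(occupiedRooms int, safetyPercentage float64) int32 {
	if safetyPercentage < defaultSafetyPercentage {
		safetyPercentage = defaultSafetyPercentage
	}
	return int32(float64(occupiedRooms) * (1.0 + safetyPercentage))
}

func main() {
	// 100 occupied rooms with the default 5% margin -> minAvailable = 105.
	fmt.Println(computeMinAvailable(100, 0.05))
}
```
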
#### Pod Change Events Worker
For this type of worker, runtime watcher spawns multiple goroutines, maintaining a worker
process for each scheduler that keeps watching and processing _change events_ in pods
resources. To do that, it uses a
[pods informer](https://pkg.go.dev/k8s.io/client-go/informers),
binding handlers for **add**, **update** and **delete** events for all pods managed by it.
This component is not responsible for updating/creating/deleting
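
As a rough illustration of the informer wiring described above, here is a self-contained sketch
under assumptions: the function name `watchPods`, the resync period, and the handler bodies are
placeholders, not Maestro's actual worker implementation.

```go
// Sketch only: watchPods, the 30s resync period, and the handler bodies are
// placeholders; they are not Maestro's actual runtime watcher implementation.
package podwatch

import (
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// watchPods wires a pods informer scoped to one scheduler's namespace and
// binds add/update/delete handlers for the pods it manages.
func watchPods(clientSet kubernetes.Interface, schedulerNamespace string, stopCh <-chan struct{}) {
	factory := informers.NewSharedInformerFactoryWithOptions(
		clientSet,
		30*time.Second, // resync period (placeholder value)
		informers.WithNamespace(schedulerNamespace),
	)
	podInformer := factory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod, _ := obj.(*v1.Pod)
			_ = pod // reflect game room creation in storage
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod, _ := newObj.(*v1.Pod)
			_ = pod // reflect game room state change in storage
		},
		DeleteFunc: func(obj interface{}) {
			// reflect game room deletion in storage (obj may be a tombstone)
		},
	})

	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}
```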
8 changes: 7 additions & 1 deletion internal/adapters/runtime/kubernetes/kubernetes.go
@@ -23,16 +23,22 @@
package kubernetes

import (
"github.com/topfreegames/maestro/internal/core/logs"
"github.com/topfreegames/maestro/internal/core/ports"
"go.uber.org/zap"
kube "k8s.io/client-go/kubernetes"
)

var _ ports.Runtime = (*kubernetes)(nil)

type kubernetes struct {
clientSet kube.Interface
logger *zap.Logger
}

func New(clientSet kube.Interface) *kubernetes {
return &kubernetes{clientSet}
return &kubernetes{
clientSet: clientSet,
logger: zap.L().With(zap.String(logs.LogFieldRuntime, "kubernetes")),
}
}
164 changes: 163 additions & 1 deletion internal/adapters/runtime/kubernetes/scheduler.go
@@ -24,14 +24,113 @@ package kubernetes

import (
"context"
"strconv"

"github.com/topfreegames/maestro/internal/core/entities"
"github.com/topfreegames/maestro/internal/core/ports/errors"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
v1Policy "k8s.io/api/policy/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const (
DefaultDisruptionSafetyPercentage float64 = 0.05
MajorKubeVersionPDB int = 1
MinorKubeVersionPDB int = 21
)

func (k *kubernetes) isPDBSupported() bool {
// Check based on the kube version of the clientSet if PDBs are supported (1.21+)
version, err := k.clientSet.Discovery().ServerVersion()
if err != nil {
k.logger.Warn("Could not get kube API version, can not check for PDB support", zap.Error(err))
return false
}
major, err := strconv.Atoi(version.Major)
if err != nil {
k.logger.Warn(
"Could not convert major kube API version to int, can not check for PDB support",
zap.String("majorKubeAPIVersion", version.Major),
)
return false
}
if major < MajorKubeVersionPDB {
k.logger.Warn(
"Can not create PDB for this kube API version",
zap.Int("majorKubeAPIVersion", major),
zap.Int("majorPDBVersionRequired", MajorKubeVersionPDB),
)
return false
}
minor, err := strconv.Atoi(version.Minor)
if err != nil {
k.logger.Warn(
"Could not convert minor kube API version to int, can not check for PDB support",
zap.String("minorKubeAPIVersion", version.Minor),
)
return false
}
if minor < MinorKubeVersionPDB {
k.logger.Warn(
"Can not create PDB for this kube API version",
zap.Int("minorKubeAPIVersion", minor),
zap.Int("minorPDBVersionRequired", MinorKubeVersionPDB),
)
return false
}
return true
}

func (k *kubernetes) createPDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) (*v1Policy.PodDisruptionBudget, error) {
if scheduler == nil {
return nil, errors.NewErrInvalidArgument("scheduler pointer can not be nil")
}
pdbSpec := &v1Policy.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: scheduler.Name,
},
Spec: v1Policy.PodDisruptionBudgetSpec{
MinAvailable: &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(0),
},
},
}

if scheduler.Autoscaling != nil && scheduler.Autoscaling.Enabled {
pdbSpec.Spec.MinAvailable = &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(scheduler.Autoscaling.Min),
}
}

pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Create(ctx, pdbSpec, metav1.CreateOptions{})
if err != nil && !kerrors.IsAlreadyExists(err) {
k.logger.Warn("error creating pdb", zap.String("scheduler", scheduler.Name), zap.Error(err))
return nil, err
}

return pdb, nil
}

func (k *kubernetes) deletePDBFromScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
if scheduler == nil {
return errors.NewErrInvalidArgument("scheduler pointer can not be nil")
}
if !k.isPDBSupported() {
return errors.NewErrUnexpected("PDBs are not supported for this kube API version")
}
err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
if err != nil && !kerrors.IsNotFound(err) {
k.logger.Warn("error deleting pdb", zap.String("scheduler", scheduler.Name), zap.Error(err))
return err
}
return nil
}

func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -48,11 +147,20 @@ func (k *kubernetes) CreateScheduler(ctx context.Context, scheduler *entities.Sc
return errors.NewErrUnexpected("error creating scheduler: %s", err)
}

_, err = k.createPDBFromScheduler(ctx, scheduler)
if err != nil {
k.logger.Warn("PDB Creation during scheduler creation failed", zap.String("scheduler", scheduler.Name), zap.Error(err))
}

return nil
}

func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Scheduler) error {
err := k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
err := k.deletePDBFromScheduler(ctx, scheduler)
if err != nil {
k.logger.Warn("PDB Deletion during scheduler deletion failed", zap.String("scheduler", scheduler.Name), zap.Error(err))
}
err = k.clientSet.CoreV1().Namespaces().Delete(ctx, scheduler.Name, metav1.DeleteOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return errors.NewErrNotFound("scheduler '%s' not found", scheduler.Name)
@@ -63,3 +171,57 @@ func (k *kubernetes) DeleteScheduler(ctx context.Context, scheduler *entities.Sc

return nil
}

func (k *kubernetes) MitigateDisruption(
ctx context.Context,
scheduler *entities.Scheduler,
roomAmount int,
safetyPercentage float64,
) error {
if scheduler == nil {
return errors.NewErrInvalidArgument("empty pointer received for scheduler, can not mitigate disruptions")
}

incSafetyPercentage := 1.0
if safetyPercentage < DefaultDisruptionSafetyPercentage {
k.logger.Warn(
"invalid safety percentage, using default percentage",
zap.Float64("safetyPercentage", safetyPercentage),
zap.Float64("DefaultDisruptionSafetyPercentage", DefaultDisruptionSafetyPercentage),
)
safetyPercentage = DefaultDisruptionSafetyPercentage
}
incSafetyPercentage += safetyPercentage

// For kubernetes mitigating disruptions means updating the current PDB
// minAvailable to the number of occupied rooms if above a threshold
pdb, err := k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Get(ctx, scheduler.Name, metav1.GetOptions{})
if err != nil && !kerrors.IsNotFound(err) {
// Non-recoverable errors
return errors.NewErrUnexpected("non recoverable error when getting PDB for scheduler '%s': %s", scheduler.Name, err)
}

if pdb == nil || kerrors.IsNotFound(err) {
pdb, err = k.createPDBFromScheduler(ctx, scheduler)
if err != nil {
return errors.NewErrUnexpected("error creating PDB for scheduler '%s': %s", scheduler.Name, err)
}
}

currentPdbMinAvailable := pdb.Spec.MinAvailable.IntVal
if currentPdbMinAvailable == int32(float64(roomAmount)*incSafetyPercentage) {
return nil
}

pdb.Spec.MinAvailable = &intstr.IntOrString{
Type: intstr.Int,
IntVal: int32(float64(roomAmount) * incSafetyPercentage),
}

_, err = k.clientSet.PolicyV1().PodDisruptionBudgets(scheduler.Name).Update(ctx, pdb, metav1.UpdateOptions{})
if err != nil {
return errors.NewErrUnexpected("error updating PDB to mitigate disruptions for scheduler '%s': %s", scheduler.Name, err)
}

return nil
}