Skip to content

Commit

Permalink
Support graceful termination (googleforgames#2205)
Browse files Browse the repository at this point in the history
* Support gracefulTerminationPeriod

* add sigint and sigterm
* add gsStateChannel and SDK context
* add feature gate
* add tests 
* add description

Co-authored-by: Robert Bailey <robertbailey@google.com>
Co-authored-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
3 people authored Sep 30, 2021
1 parent 466bc3b commit af3950f
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 2 deletions.
5 changes: 4 additions & 1 deletion cmd/sdk-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func main() {
}

ctx := signals.NewSigKillContext()

grpcServer := grpc.NewServer()
// don't graceful stop, because if we get a SIGKILL signal
// then the gameserver is being shut down, and we no longer
Expand Down Expand Up @@ -139,7 +140,9 @@ func main() {
if err != nil {
logger.WithError(err).Fatalf("Could not start sidecar")
}

if runtime.FeatureEnabled(runtime.FeatureSDKGracefulTermination) {
ctx = s.NewSDKServerContext(ctx)
}
go func() {
err := s.Run(ctx)
if err != nil {
Expand Down
24 changes: 23 additions & 1 deletion pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type SDKServer struct {
gsLabels map[string]string
gsAnnotations map[string]string
gsState agonesv1.GameServerState
gsStateChannel chan agonesv1.GameServerState
gsUpdateMutex sync.RWMutex
gsWaitForSync sync.WaitGroup
reserveTimer *time.Timer
Expand Down Expand Up @@ -134,6 +135,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
gsUpdateMutex: sync.RWMutex{},
gsWaitForSync: sync.WaitGroup{},
gsConnectedPlayers: []string{},
gsStateChannel: make(chan agonesv1.GameServerState, 2),
}

s.informerFactory = factory
Expand Down Expand Up @@ -435,11 +437,15 @@ func (s *SDKServer) Allocate(ctx context.Context, e *sdk.Empty) (*sdk.Empty, err
}

// Shutdown enters the Shutdown state change for this GameServer into
// the workqueue so it can be updated. If the SDKGracefulTermination feature is
// enabled, it additionally publishes the Shutdown state on gsStateChannel, which
// is what allows the context returned by NewSDKServerContext to be cancelled.
//
// The request message e is returned unchanged; the error is always nil.
func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) {
	s.logger.Debug("Received Shutdown request, adding to queue")
	s.stopReserveTimer()
	s.enqueueState(agonesv1.GameServerStateShutdown)
	if runtime.FeatureEnabled(runtime.FeatureSDKGracefulTermination) {
		// Non-blocking send: gsStateChannel has a bounded buffer and is only
		// drained once the parent context of NewSDKServerContext is done, so a
		// blocking send would hang this RPC forever if Shutdown is called
		// repeatedly before termination. If the buffer is already full the
		// notification is dropped (a Shutdown from an earlier call is expected
		// to be pending already).
		select {
		case s.gsStateChannel <- agonesv1.GameServerStateShutdown:
		default:
		}
	}
	return e, nil
}

Expand Down Expand Up @@ -824,3 +830,19 @@ func (s *SDKServer) updateConnectedPlayers(ctx context.Context) error {
s.recorder.Event(gs, corev1.EventTypeNormal, "PlayerCount", fmt.Sprintf("Set to %d", gs.Status.Players.Count))
return nil
}

// NewSDKServerContext returns a Context that is cancelled only after BOTH of
// the following have happened, in this order:
//
//  1. ctx is done (e.g. SIGTERM or os.Interrupt was received), and
//  2. a GameServerStateShutdown value is received on gsStateChannel.
//
// The returned context is derived from context.Background(), not from ctx, so
// cancelling ctx alone never cancels it.
func (s *SDKServer) NewSDKServerContext(ctx context.Context) context.Context {
	sdkCtx, cancel := context.WithCancel(context.Background())
	go func() {
		// Wait for the termination signal first; states are only consumed
		// from the channel after this point.
		<-ctx.Done()
		for {
			if gsState := <-s.gsStateChannel; gsState == agonesv1.GameServerStateShutdown {
				cancel()
				// Stop here: the context is cancelled and there is nothing
				// left to wait for. Looping on would leak this goroutine.
				return
			}
		}
	}()
	return sdkCtx
}
185 changes: 185 additions & 0 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,6 +1345,191 @@ func TestSDKServerPlayerConnectAndDisconnect(t *testing.T) {
assert.Equal(t, []string{"2"}, list.List)
}

// TestSDKServerGracefulTerminationInterrupt checks that, with the
// SDKGracefulTermination feature enabled, cancelling the parent context
// (standing in for an interrupt signal) does not cancel the SDK context on its
// own, and that the SDK context is cancelled once Shutdown is called afterwards.
func TestSDKServerGracefulTerminationInterrupt(t *testing.T) {
	t.Parallel()
	agruntime.FeatureTestMutex.Lock()
	defer agruntime.FeatureTestMutex.Unlock()

	err := agruntime.ParseFeatures(string(agruntime.FeatureSDKGracefulTermination) + "=true")
	require.NoError(t, err, "Can not parse FeatureSDKGracefulTermination feature")

	m := agtesting.NewMocks()

	m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
		gs := agonesv1.GameServer{
			ObjectMeta: metav1.ObjectMeta{
				Name: "test", Namespace: "default",
			},
			Spec: agonesv1.GameServerSpec{Health: agonesv1.Health{Disabled: true}},
		}
		gs.ApplyDefaults()
		return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{gs}}, nil
	})
	sc, err := defaultSidecar(m)
	// sc is dereferenced immediately below, so stop here on failure.
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	sdkCtx := sc.NewSDKServerContext(ctx)
	sc.informerFactory.Start(sdkCtx.Done())
	assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		defer wg.Done()
		// Keep the error local to this goroutine: the test goroutine keeps
		// assigning to the outer err, so sharing it would be a data race.
		runErr := sc.Run(sdkCtx)
		assert.Nil(t, runErr)
	}()

	assertContextCancelled := func(expected error, timeout time.Duration, ctx context.Context) {
		select {
		case <-ctx.Done():
			assert.Equal(t, expected, ctx.Err())
		case <-time.After(timeout):
			assert.Fail(t, "context should have been cancelled by now")
		}
	}

	_, err = sc.Ready(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateRequestReady, sc.gsState)
	// Mock interruption signal
	cancel()
	// Assert ctx is cancelled and sdkCtx is not cancelled
	assertContextCancelled(context.Canceled, 1*time.Second, ctx)
	assert.Nil(t, sdkCtx.Err())
	// Assert gs is still requestReady
	assert.Equal(t, agonesv1.GameServerStateRequestReady, sc.gsState)
	// gs Shutdown
	_, err = sc.Shutdown(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateShutdown, sc.gsState)
	// Assert sdkCtx is cancelled after shutdown
	assertContextCancelled(context.Canceled, 1*time.Second, sdkCtx)
	wg.Wait()
}

// TestSDKServerGracefulTerminationShutdown checks the opposite ordering to the
// interrupt test: Shutdown is called first, which must not cancel either
// context, and the SDK context is cancelled once the parent context is
// cancelled afterwards (standing in for an interrupt signal).
func TestSDKServerGracefulTerminationShutdown(t *testing.T) {
	t.Parallel()
	agruntime.FeatureTestMutex.Lock()
	defer agruntime.FeatureTestMutex.Unlock()

	err := agruntime.ParseFeatures(string(agruntime.FeatureSDKGracefulTermination) + "=true")
	require.NoError(t, err, "Can not parse FeatureSDKGracefulTermination feature")

	m := agtesting.NewMocks()

	m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
		gs := agonesv1.GameServer{
			ObjectMeta: metav1.ObjectMeta{
				Name: "test", Namespace: "default",
			},
			Spec: agonesv1.GameServerSpec{Health: agonesv1.Health{Disabled: true}},
		}
		gs.ApplyDefaults()
		return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{gs}}, nil
	})

	sc, err := defaultSidecar(m)
	// sc is dereferenced immediately below, so stop here on failure.
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	sdkCtx := sc.NewSDKServerContext(ctx)
	sc.informerFactory.Start(sdkCtx.Done())
	assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		defer wg.Done()
		// Keep the error local to this goroutine: the test goroutine keeps
		// assigning to the outer err, so sharing it would be a data race.
		runErr := sc.Run(sdkCtx)
		assert.Nil(t, runErr)
	}()

	assertContextCancelled := func(expected error, timeout time.Duration, ctx context.Context) {
		select {
		case <-ctx.Done():
			assert.Equal(t, expected, ctx.Err())
		case <-time.After(timeout):
			assert.Fail(t, "context should have been cancelled by now")
		}
	}

	_, err = sc.Ready(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateRequestReady, sc.gsState)
	// gs Shutdown
	_, err = sc.Shutdown(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateShutdown, sc.gsState)
	// assert none of the context have been cancelled
	assert.Nil(t, sdkCtx.Err())
	assert.Nil(t, ctx.Err())
	// Mock interruption signal
	cancel()
	// Assert both ctx and sdkCtx are cancelled, since Shutdown was already requested
	assertContextCancelled(context.Canceled, 2*time.Second, ctx)
	assertContextCancelled(context.Canceled, 2*time.Second, sdkCtx)
	wg.Wait()
}

// TestSDKServerGracefulTerminationGameServerStateChannel checks that, with the
// SDKGracefulTermination feature enabled, calling Shutdown publishes
// GameServerStateShutdown on gsStateChannel without cancelling either context.
func TestSDKServerGracefulTerminationGameServerStateChannel(t *testing.T) {
	t.Parallel()
	agruntime.FeatureTestMutex.Lock()
	defer agruntime.FeatureTestMutex.Unlock()

	err := agruntime.ParseFeatures(string(agruntime.FeatureSDKGracefulTermination) + "=true")
	require.NoError(t, err, "Can not parse FeatureSDKGracefulTermination feature")

	m := agtesting.NewMocks()

	m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
		gs := agonesv1.GameServer{
			ObjectMeta: metav1.ObjectMeta{
				Name: "test", Namespace: "default",
			},
			Spec: agonesv1.GameServerSpec{Health: agonesv1.Health{Disabled: true}},
		}
		gs.ApplyDefaults()
		return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{gs}}, nil
	})

	sc, err := defaultSidecar(m)
	// sc is dereferenced immediately below, so stop here on failure.
	require.NoError(t, err)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sdkCtx := sc.NewSDKServerContext(ctx)
	sc.informerFactory.Start(sdkCtx.Done())
	assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

	go func() {
		// Keep the error local to this goroutine: the test goroutine keeps
		// assigning to the outer err, so sharing it would be a data race.
		runErr := sc.Run(sdkCtx)
		assert.Nil(t, runErr)
	}()
	assertGameServerStateChannel := func(expected agonesv1.GameServerState, timeout time.Duration, gsStateChannel chan agonesv1.GameServerState) {
		select {
		case current := <-gsStateChannel:
			assert.Equal(t, expected, current)
		case <-time.After(timeout):
			assert.Fail(t, "should have received a state on gsStateChannel by now")
		}
	}
	_, err = sc.Ready(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateRequestReady, sc.gsState)
	// gs Shutdown
	_, err = sc.Shutdown(sdkCtx, &sdk.Empty{})
	require.NoError(t, err)
	assert.Equal(t, agonesv1.GameServerStateShutdown, sc.gsState)
	// assert none of the context have been cancelled
	assert.Nil(t, sdkCtx.Err())
	assert.Nil(t, ctx.Err())
	// ctx has not been cancelled, so the state sent by Shutdown is still
	// buffered in the channel for this test to read.
	assertGameServerStateChannel(agonesv1.GameServerStateShutdown, 1*time.Second, sc.gsStateChannel)
}

func defaultSidecar(m agtesting.Mocks) (*SDKServer, error) {
server, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/runtime/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (

// FeatureCustomFasSyncInterval is a feature flag that enables a custom resync interval for the FleetAutoscaler
FeatureCustomFasSyncInterval Feature = "CustomFasSyncInterval"

// FeatureSDKGracefulTermination is a feature flag that enables SDK to support gracefulTermination
FeatureSDKGracefulTermination Feature = "SDKGracefulTermination"
)

var (
Expand All @@ -68,6 +71,7 @@ var (
FeatureStateAllocationFilter: false,
FeaturePlayerAllocationFilter: false,
FeatureCustomFasSyncInterval: false,
FeatureSDKGracefulTermination: false,
}

// featureGates is the storage of what features are enabled
Expand Down
3 changes: 3 additions & 0 deletions site/content/en/docs/Guides/Client SDKs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ from Kubernetes when the backing Pod goes into Termination state.
Be aware that if you use a variation of `System.exit(0)` after calling SDK.Shutdown(), your game server container may
restart for a brief period, inline with our [Health Checking]({{% ref "/docs/Guides/health-checking.md#health-failure-strategy" %}}) policies.

If the `SDKGracefulTermination` alpha feature is enabled and the SDK server receives the TERM signal before `SDK.Shutdown()` has been called,
the SDK server stays alive until `SDK.Shutdown()` has been called, for at most the period of the Pod's `terminationGracePeriodSeconds`.

### Configuration Retrieval

#### GameServer()
Expand Down
15 changes: 15 additions & 0 deletions site/content/en/docs/Guides/feature-stages.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ that can be found in the [Helm configuration]({{< ref "/docs/Installation/Instal

The current set of `alpha` and `beta` feature gates are:

{{% feature expiryVersion="1.18.0" %}}
| Feature Name | Gate | Default | Stage | Since |
|--------------|---------|---------|-------|-------|
| Example Gate (not in use) | `Example` | Disabled | None | 0.13.0 |
Expand All @@ -34,6 +35,20 @@ The current set of `alpha` and `beta` feature gates are:
| [Custom resync period for FleetAutoscaler](https://github.com/googleforgames/agones/issues/1955) | `CustomFasSyncInterval` | Disabled | `Alpha` | 1.17.0 |
| [GameServer state filtering on GameServerAllocations](https://github.com/googleforgames/agones/issues/1239) | `StateAllocationFilter` | Disabled | `Alpha` | 1.14.0 |
| [GameServer player capacity filtering on GameServerAllocations](https://github.com/googleforgames/agones/issues/1239) | `PlayerAllocationFilter` | Disabled | `Alpha` | 1.14.0 |
{{% /feature %}}
{{% feature publishVersion="1.18.0" %}}
| Feature Name | Gate | Default | Stage | Since |
|--------------|---------|---------|-------|-------|
| Example Gate (not in use) | `Example` | Disabled | None | 0.13.0 |
| [Player Tracking]({{< ref "/docs/Guides/player-tracking.md" >}}) | `PlayerTracking` | Disabled | `Alpha` | 1.6.0 |
| [SDK Send GameServer on Watch execution]({{< ref "/docs/Guides/Client SDKs/_index.md#watchgameserverfunctiongameserver" >}}) | `SDKWatchSendOnExecute` | Enabled | `Beta` | 1.12.0 |
| Fix for RollingUpdate [Scale down](https://github.com/googleforgames/agones/issues/1625) | `RollingUpdateOnReady` | Enabled | `Beta` | 1.14.0 |
| [Utilize Node ExternalDNS](https://github.com/googleforgames/agones/issues/1921) and additional [details]({{< ref "/docs/FAQ/_index.md" >}}) | `NodeExternalDNS` | Disabled | `Alpha` | 1.12.0 |
| [Custom resync period for FleetAutoscaler](https://github.com/googleforgames/agones/issues/1955) | `CustomFasSyncInterval` | Disabled | `Alpha` | 1.17.0 |
| [GameServer state filtering on GameServerAllocations](https://github.com/googleforgames/agones/issues/1239) | `StateAllocationFilter` | Disabled | `Alpha` | 1.14.0 |
| [GameServer player capacity filtering on GameServerAllocations](https://github.com/googleforgames/agones/issues/1239) | `PlayerAllocationFilter` | Disabled | `Alpha` | 1.14.0 |
| [Graceful Termination for GameServer SDK](https://github.com/googleforgames/agones/pull/2205) | `SDKGracefulTermination` | Disabled | `Alpha` | 1.18.0 |
{{% /feature %}}

## Description of Stages

Expand Down

0 comments on commit af3950f

Please sign in to comment.