From 8400cb905eb74512fb012a4322f5175aad127b4a Mon Sep 17 00:00:00 2001 From: Tochemey Date: Mon, 23 Dec 2024 23:00:35 +0000 Subject: [PATCH 1/2] feat: add Run method for some specific use cases --- actors/actor_system.go | 134 ++++++++++++++++++++++++++++++----------- actors/mock_test.go | 10 --- 2 files changed, 100 insertions(+), 44 deletions(-) diff --git a/actors/actor_system.go b/actors/actor_system.go index 1bbf27d8..eb67b4c8 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -31,11 +31,13 @@ import ( "net" nethttp "net/http" "os" + "os/signal" "regexp" "runtime" "strconv" "strings" "sync" + "syscall" "time" "connectrpc.com/connect" @@ -158,6 +160,13 @@ type ActorSystem interface { Uptime() int64 // Running returns true when the actor system is running Running() bool + // Run starts the actor system, blocks on the signals channel, and then + // gracefully shuts the application down. + // It's designed to make typical applications simple to run. + // All of Run's functionality is implemented in terms of the exported + // Start and Stop methods. Applications with more specialized needs + // can use those methods directly instead of relying on Run. + Run(ctx context.Context, hooks ...func() error) // handleRemoteAsk handles a synchronous message to another actor and expect a response. // This block until a response is received or timed out. handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) @@ -347,6 +356,97 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { return system, nil } +// Run starts the actor system, blocks on the signals channel, and then +// gracefully shuts the application down and terminate the running processing. +// It's designed to make typical applications simple to run. +// The minimal GoAkt application looks like this: +// +// NewActorSystem(name).Run(ctx) +// +// All of Run's functionality is implemented in terms of the exported +// Start and Stop methods. Applications with more specialized needs +// can use those methods directly instead of relying on Run. +func (x *actorSystem) Run(ctx context.Context, hooks ...func() error) { + if err := x.Start(ctx); err != nil { + x.logger.Fatal(err) + os.Exit(1) + return + } + + // wait for interruption/termination + notifier := make(chan os.Signal, 1) + done := make(chan types.Unit, 1) + signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM) + // wait for a shutdown signal, and then shutdown + go func() { + sig := <-notifier + x.logger.Infof("received an interrupt signal (%s) to shutdown", sig.String()) + errs := make([]error, len(hooks)) + for i, hook := range hooks { + errs[i] = hook() + } + + if err := errorschain. + New(errorschain.ReturnFirst()). + AddErrors(errs...). + AddError(x.Stop(ctx)). + Error(); err != nil { + x.logger.Fatal(err) + } + + signal.Stop(notifier) + done <- types.Unit{} + }() + <-done + pid := os.Getpid() + // make sure if it is unix init process to exit + if pid == 1 { + os.Exit(0) + } + + process, _ := os.FindProcess(pid) + switch { + case runtime.GOOS == "windows": + _ = process.Kill() + default: + _ = process.Signal(syscall.SIGTERM) + } +} + +// Start initializes the actor system. +// To guarantee a clean shutdown during unexpected system terminations, +// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop. +func (x *actorSystem) Start(ctx context.Context) error { + x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH) + x.started.Store(true) + if err := errorschain. + New(errorschain.ReturnFirst()). + AddError(x.spawnRootGuardian(ctx)). + AddError(x.spawnSystemGuardian(ctx)). + AddError(x.spawnUserGuardian(ctx)). + AddError(x.spawnRebalancer(ctx)). + AddError(x.spawnJanitor(ctx)). + AddError(x.spawnDeadletters(ctx)). + AddError(x.enableRemoting(ctx)). + AddError(x.enableClustering(ctx)). + Error(); err != nil { + // reset the start + x.started.Store(false) + return err + } + + x.scheduler.Start(ctx) + x.startedAt.Store(time.Now().Unix()) + x.logger.Infof("%s actor system successfully started..:)", x.name) + return nil +} + +// Stop stops the actor system and does not terminate the program. +// One needs to explicitly call os.Exit to terminate the program. +func (x *actorSystem) Stop(ctx context.Context) error { + return x.shutdown(ctx) +} + // Running returns true when the actor system is running func (x *actorSystem) Running() bool { return x.started.Load() @@ -761,40 +861,6 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr * return address.From(actor.GetActorAddress()), nil } -// Start initializes the actor system. -// To guarantee a clean shutdown during unexpected system terminations, -// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop. -func (x *actorSystem) Start(ctx context.Context) error { - x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH) - x.started.Store(true) - if err := errorschain. - New(errorschain.ReturnFirst()). - AddError(x.spawnRootGuardian(ctx)). - AddError(x.spawnSystemGuardian(ctx)). - AddError(x.spawnUserGuardian(ctx)). - AddError(x.spawnRebalancer(ctx)). - AddError(x.spawnJanitor(ctx)). - AddError(x.spawnDeadletters(ctx)). - AddError(x.enableRemoting(ctx)). - AddError(x.enableClustering(ctx)). - Error(); err != nil { - // reset the start - x.started.Store(false) - return err - } - - x.scheduler.Start(ctx) - x.startedAt.Store(time.Now().Unix()) - x.logger.Infof("%s actor system successfully started..:)", x.name) - return nil -} - -// Stop stops the actor system and does not terminate the program. -// One needs to explicitly call os.Exit to terminate the program. -func (x *actorSystem) Stop(ctx context.Context) error { - return x.shutdown(ctx) -} - // RemoteLookup for an actor on a remote host. func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) { logger := x.logger diff --git a/actors/mock_test.go b/actors/mock_test.go index 3b50c9e4..67a93ceb 100644 --- a/actors/mock_test.go +++ b/actors/mock_test.go @@ -28,7 +28,6 @@ import ( "context" "encoding/json" "fmt" - "os" "strconv" "sync" "testing" @@ -538,12 +537,3 @@ func extractMessage(bytes []byte) (string, error) { return "", nil } - -func waitForSignals(t *testing.T, ch <-chan os.Signal, sig os.Signal) { - select { - case s := <-ch: - require.Equal(t, s, sig) - case <-time.After(2 * time.Second): - t.Fatalf("timeout waiting for %v", sig) - } -} From 6177689264b036aef97ab695d0f43da7acf8c7f0 Mon Sep 17 00:00:00 2001 From: Tochemey Date: Tue, 24 Dec 2024 10:19:32 +0000 Subject: [PATCH 2/2] feat: add Run method for some specific use cases --- actors/actor_system.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/actors/actor_system.go b/actors/actor_system.go index eb67b4c8..191a2199 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -166,7 +166,7 @@ type ActorSystem interface { // All of Run's functionality is implemented in terms of the exported // Start and Stop methods. Applications with more specialized needs // can use those methods directly instead of relying on Run. - Run(ctx context.Context, hooks ...func() error) + Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error) // handleRemoteAsk handles a synchronous message to another actor and expect a response. // This block until a response is received or timed out. handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) @@ -361,13 +361,17 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { // It's designed to make typical applications simple to run. // The minimal GoAkt application looks like this: // -// NewActorSystem(name).Run(ctx) +// NewActorSystem(name, opts).Run(ctx, startHook, stopHook) // // All of Run's functionality is implemented in terms of the exported // Start and Stop methods. Applications with more specialized needs // can use those methods directly instead of relying on Run. -func (x *actorSystem) Run(ctx context.Context, hooks ...func() error) { - if err := x.Start(ctx); err != nil { +func (x *actorSystem) Run(ctx context.Context, startHook func(ctx context.Context) error, stophook func(ctx context.Context) error) { + if err := errorschain. + New(errorschain.ReturnFirst()). + AddError(startHook(ctx)). + AddError(x.Start(ctx)). + Error(); err != nil { x.logger.Fatal(err) os.Exit(1) return @@ -381,14 +385,10 @@ func (x *actorSystem) Run(ctx context.Context, hooks ...func() error) { go func() { sig := <-notifier x.logger.Infof("received an interrupt signal (%s) to shutdown", sig.String()) - errs := make([]error, len(hooks)) - for i, hook := range hooks { - errs[i] = hook() - } if err := errorschain. New(errorschain.ReturnFirst()). - AddErrors(errs...). + AddError(stophook(ctx)). AddError(x.Stop(ctx)). Error(); err != nil { x.logger.Fatal(err)