diff --git a/actors/actor_system.go b/actors/actor_system.go index 1bbf27d8..191a2199 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -31,11 +31,13 @@ import ( "net" nethttp "net/http" "os" + "os/signal" "regexp" "runtime" "strconv" "strings" "sync" + "syscall" "time" "connectrpc.com/connect" @@ -158,6 +160,13 @@ type ActorSystem interface { Uptime() int64 // Running returns true when the actor system is running Running() bool + // Run starts the actor system, blocks on the signals channel, and then + // gracefully shuts the application down. + // It's designed to make typical applications simple to run. + // All of Run's functionality is implemented in terms of the exported + // Start and Stop methods. Applications with more specialized needs + // can use those methods directly instead of relying on Run. + Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error) // handleRemoteAsk handles a synchronous message to another actor and expect a response. // This block until a response is received or timed out. handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error) @@ -347,6 +356,97 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) { return system, nil } +// Run starts the actor system, blocks on the signals channel, and then +// gracefully shuts the application down and terminate the running processing. +// It's designed to make typical applications simple to run. +// The minimal GoAkt application looks like this: +// +// NewActorSystem(name, opts).Run(ctx, startHook, stopHook) +// +// All of Run's functionality is implemented in terms of the exported +// Start and Stop methods. Applications with more specialized needs +// can use those methods directly instead of relying on Run. +func (x *actorSystem) Run(ctx context.Context, startHook func(ctx context.Context) error, stophook func(ctx context.Context) error) { + if err := errorschain. + New(errorschain.ReturnFirst()). + AddError(startHook(ctx)). + AddError(x.Start(ctx)). + Error(); err != nil { + x.logger.Fatal(err) + os.Exit(1) + return + } + + // wait for interruption/termination + notifier := make(chan os.Signal, 1) + done := make(chan types.Unit, 1) + signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM) + // wait for a shutdown signal, and then shutdown + go func() { + sig := <-notifier + x.logger.Infof("received an interrupt signal (%s) to shutdown", sig.String()) + + if err := errorschain. + New(errorschain.ReturnFirst()). + AddError(stophook(ctx)). + AddError(x.Stop(ctx)). + Error(); err != nil { + x.logger.Fatal(err) + } + + signal.Stop(notifier) + done <- types.Unit{} + }() + <-done + pid := os.Getpid() + // make sure if it is unix init process to exit + if pid == 1 { + os.Exit(0) + } + + process, _ := os.FindProcess(pid) + switch { + case runtime.GOOS == "windows": + _ = process.Kill() + default: + _ = process.Signal(syscall.SIGTERM) + } +} + +// Start initializes the actor system. +// To guarantee a clean shutdown during unexpected system terminations, +// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop. +func (x *actorSystem) Start(ctx context.Context) error { + x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH) + x.started.Store(true) + if err := errorschain. + New(errorschain.ReturnFirst()). + AddError(x.spawnRootGuardian(ctx)). + AddError(x.spawnSystemGuardian(ctx)). + AddError(x.spawnUserGuardian(ctx)). + AddError(x.spawnRebalancer(ctx)). + AddError(x.spawnJanitor(ctx)). + AddError(x.spawnDeadletters(ctx)). + AddError(x.enableRemoting(ctx)). + AddError(x.enableClustering(ctx)). + Error(); err != nil { + // reset the start + x.started.Store(false) + return err + } + + x.scheduler.Start(ctx) + x.startedAt.Store(time.Now().Unix()) + x.logger.Infof("%s actor system successfully started..:)", x.name) + return nil +} + +// Stop stops the actor system and does not terminate the program. +// One needs to explicitly call os.Exit to terminate the program. +func (x *actorSystem) Stop(ctx context.Context) error { + return x.shutdown(ctx) +} + // Running returns true when the actor system is running func (x *actorSystem) Running() bool { return x.started.Load() @@ -761,40 +861,6 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr * return address.From(actor.GetActorAddress()), nil } -// Start initializes the actor system. -// To guarantee a clean shutdown during unexpected system terminations, -// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop. -func (x *actorSystem) Start(ctx context.Context) error { - x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH) - x.started.Store(true) - if err := errorschain. - New(errorschain.ReturnFirst()). - AddError(x.spawnRootGuardian(ctx)). - AddError(x.spawnSystemGuardian(ctx)). - AddError(x.spawnUserGuardian(ctx)). - AddError(x.spawnRebalancer(ctx)). - AddError(x.spawnJanitor(ctx)). - AddError(x.spawnDeadletters(ctx)). - AddError(x.enableRemoting(ctx)). - AddError(x.enableClustering(ctx)). - Error(); err != nil { - // reset the start - x.started.Store(false) - return err - } - - x.scheduler.Start(ctx) - x.startedAt.Store(time.Now().Unix()) - x.logger.Infof("%s actor system successfully started..:)", x.name) - return nil -} - -// Stop stops the actor system and does not terminate the program. -// One needs to explicitly call os.Exit to terminate the program. -func (x *actorSystem) Stop(ctx context.Context) error { - return x.shutdown(ctx) -} - // RemoteLookup for an actor on a remote host. func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) { logger := x.logger diff --git a/actors/mock_test.go b/actors/mock_test.go index 3b50c9e4..67a93ceb 100644 --- a/actors/mock_test.go +++ b/actors/mock_test.go @@ -28,7 +28,6 @@ import ( "context" "encoding/json" "fmt" - "os" "strconv" "sync" "testing" @@ -538,12 +537,3 @@ func extractMessage(bytes []byte) (string, error) { return "", nil } - -func waitForSignals(t *testing.T, ch <-chan os.Signal, sig os.Signal) { - select { - case s := <-ch: - require.Equal(t, s, sig) - case <-time.After(2 * time.Second): - t.Fatalf("timeout waiting for %v", sig) - } -}