Skip to content

Commit

Permalink
feat: add Run method for some specific use cases (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Dec 24, 2024
1 parent 243f3fa commit 0822a6b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 44 deletions.
134 changes: 100 additions & 34 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
"net"
nethttp "net/http"
"os"
"os/signal"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"

"connectrpc.com/connect"
Expand Down Expand Up @@ -158,6 +160,13 @@ type ActorSystem interface {
Uptime() int64
// Running returns true when the actor system is running
Running() bool
// Run starts the actor system, blocks on the signals channel, and then
// gracefully shuts the application down.
// It's designed to make typical applications simple to run.
// All of Run's functionality is implemented in terms of the exported
// Start and Stop methods. Applications with more specialized needs
// can use those methods directly instead of relying on Run.
Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error)
// handleRemoteAsk handles a synchronous message to another actor and expect a response.
// This block until a response is received or timed out.
handleRemoteAsk(ctx context.Context, to *PID, message proto.Message, timeout time.Duration) (response proto.Message, err error)
Expand Down Expand Up @@ -347,6 +356,97 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
return system, nil
}

// Run starts the actor system, blocks on the signals channel, and then
// gracefully shuts the application down and terminate the running processing.
// It's designed to make typical applications simple to run.
// The minimal GoAkt application looks like this:
//
// NewActorSystem(name, opts).Run(ctx, startHook, stopHook)
//
// All of Run's functionality is implemented in terms of the exported
// Start and Stop methods. Applications with more specialized needs
// can use those methods directly instead of relying on Run.
func (x *actorSystem) Run(ctx context.Context, startHook func(ctx context.Context) error, stophook func(ctx context.Context) error) {
if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(startHook(ctx)).
AddError(x.Start(ctx)).
Error(); err != nil {
x.logger.Fatal(err)
os.Exit(1)
return
}

// wait for interruption/termination
notifier := make(chan os.Signal, 1)
done := make(chan types.Unit, 1)
signal.Notify(notifier, syscall.SIGINT, syscall.SIGTERM)
// wait for a shutdown signal, and then shutdown
go func() {
sig := <-notifier
x.logger.Infof("received an interrupt signal (%s) to shutdown", sig.String())

if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(stophook(ctx)).
AddError(x.Stop(ctx)).
Error(); err != nil {
x.logger.Fatal(err)
}

signal.Stop(notifier)
done <- types.Unit{}
}()
<-done
pid := os.Getpid()
// make sure if it is unix init process to exit
if pid == 1 {
os.Exit(0)
}

process, _ := os.FindProcess(pid)
switch {
case runtime.GOOS == "windows":
_ = process.Kill()
default:
_ = process.Signal(syscall.SIGTERM)
}
}

// Start initializes the actor system.
// To guarantee a clean shutdown during unexpected system terminations,
// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop.
func (x *actorSystem) Start(ctx context.Context) error {
x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH)
x.started.Store(true)
if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(x.spawnRootGuardian(ctx)).
AddError(x.spawnSystemGuardian(ctx)).
AddError(x.spawnUserGuardian(ctx)).
AddError(x.spawnRebalancer(ctx)).
AddError(x.spawnJanitor(ctx)).
AddError(x.spawnDeadletters(ctx)).
AddError(x.enableRemoting(ctx)).
AddError(x.enableClustering(ctx)).
Error(); err != nil {
// reset the start
x.started.Store(false)
return err
}

x.scheduler.Start(ctx)
x.startedAt.Store(time.Now().Unix())
x.logger.Infof("%s actor system successfully started..:)", x.name)
return nil
}

// Stop stops the actor system and does not terminate the program.
// One needs to explicitly call os.Exit to terminate the program.
func (x *actorSystem) Stop(ctx context.Context) error {
return x.shutdown(ctx)
}

// Running returns true when the actor system is running
func (x *actorSystem) Running() bool {
return x.started.Load()
Expand Down Expand Up @@ -761,40 +861,6 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *
return address.From(actor.GetActorAddress()), nil
}

// Start initializes the actor system.
// To guarantee a clean shutdown during unexpected system terminations,
// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop.
func (x *actorSystem) Start(ctx context.Context) error {
x.logger.Infof("%s actor system starting on %s/%s..", x.name, runtime.GOOS, runtime.GOARCH)
x.started.Store(true)
if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(x.spawnRootGuardian(ctx)).
AddError(x.spawnSystemGuardian(ctx)).
AddError(x.spawnUserGuardian(ctx)).
AddError(x.spawnRebalancer(ctx)).
AddError(x.spawnJanitor(ctx)).
AddError(x.spawnDeadletters(ctx)).
AddError(x.enableRemoting(ctx)).
AddError(x.enableClustering(ctx)).
Error(); err != nil {
// reset the start
x.started.Store(false)
return err
}

x.scheduler.Start(ctx)
x.startedAt.Store(time.Now().Unix())
x.logger.Infof("%s actor system successfully started..:)", x.name)
return nil
}

// Stop stops the actor system and does not terminate the program.
// One needs to explicitly call os.Exit to terminate the program.
func (x *actorSystem) Stop(ctx context.Context) error {
return x.shutdown(ctx)
}

// RemoteLookup for an actor on a remote host.
func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) {
logger := x.logger
Expand Down
10 changes: 0 additions & 10 deletions actors/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -538,12 +537,3 @@ func extractMessage(bytes []byte) (string, error) {

return "", nil
}

func waitForSignals(t *testing.T, ch <-chan os.Signal, sig os.Signal) {
select {
case s := <-ch:
require.Equal(t, s, sig)
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for %v", sig)
}
}

0 comments on commit 0822a6b

Please sign in to comment.