Skip to content

Commit

Permalink
runtime keep alive
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 9, 2024
1 parent e9669b1 commit e9b810e
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 52 deletions.
1 change: 1 addition & 0 deletions cmd/wasmserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func main() {
}
c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store, modCache), &cluster.KindConfig{})
c.Engine().Spawn(actrs.NewMetric, actrs.KindMetric, actor.WithID("1"))
c.Engine().Spawn(actrs.NewRuntimeManager(c), actrs.KindRuntimeManager, actor.WithID("1"))
c.Start()

server := actrs.NewWasmServer(
Expand Down
121 changes: 84 additions & 37 deletions internal/actrs/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,42 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/raptor/internal/runtime"
"github.com/anthdm/raptor/internal/shared"
"github.com/anthdm/raptor/internal/spidermonkey"
"github.com/anthdm/raptor/internal/storage"
"github.com/anthdm/raptor/internal/types"
"github.com/anthdm/raptor/proto"
"github.com/google/uuid"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"

prot "google.golang.org/protobuf/proto"
)

const KindRuntime = "runtime"

// runtimeKeepAlive is how long an idle runtime actor stays alive before
// it poisons itself. Every incoming request refreshes this window.
var runtimeKeepAlive = time.Second

// shutdown is the self-addressed message a runtime actor sends itself
// when the keep-alive window elapses.
type shutdown struct{}

// Runtime is an actor that can execute compiled WASM blobs in a distributed cluster.
type Runtime struct {
	store        storage.Store
	cache        storage.ModCacher
	started      time.Time
	deploymentID uuid.UUID

	managerPID *actor.PID            // PID of the RuntimeManager ("1") for (de)registration
	runtime    wazero.Runtime        // created lazily on the first request
	mod        wazero.CompiledModule // compiled module for this deployment
	blob       []byte                // raw deployment blob (JS source for "js" runtime)
	repeat     actor.SendRepeater    // keep-alive timer; refreshed per request
}

func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer {
Expand All @@ -44,29 +57,70 @@ func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer {
// Receive implements actor.Receiver. The runtime lazily initializes its
// WASM engine on the first request, refreshes a keep-alive timer on
// every request, and poisons itself after runtimeKeepAlive of idleness.
func (r *Runtime) Receive(c *actor.Context) {
	switch msg := c.Message().(type) {
	case actor.Started:
		r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
		r.started = time.Now()
		r.managerPID = c.Engine().Registry.GetPID(KindRuntimeManager, "1")
	case actor.Stopped:
		// Deregister so the manager stops routing requests to this actor.
		c.Send(r.managerPID, removeRuntime{key: r.deploymentID.String()})
		// The runtime is only created on the first HTTPRequest; guard
		// against being stopped before any request ever arrived.
		if r.runtime != nil {
			r.runtime.Close(context.Background())
		}
		// Releasing this mod will invalidate the cache for some reason.
		// r.mod.Close(context.TODO())
	case *proto.HTTPRequest:
		// Refresh the keepAlive timer. Stop the previous repeater first,
		// otherwise every request stacks another repeater that keeps
		// firing shutdown messages at this actor.
		r.repeat.Stop()
		r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive)
		if r.runtime == nil {
			// Initialization failure previously went unchecked and led to
			// instantiating a nil module; fail the request instead.
			if err := r.initialize(msg); err != nil {
				slog.Warn("runtime initialization failed", "err", err)
				respondError(c, http.StatusInternalServerError, "internal server error", msg.ID)
				return
			}
		}
		// Handle the HTTP request that is forwarded from the WASM server actor.
		r.handleHTTPRequest(c, msg)

	case shutdown:
		c.Engine().Poison(c.PID())
	}
}

func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) {
r.deployID = uuid.MustParse(msg.DeploymentID)
deploy, err := r.store.GetDeployment(r.deployID)
// initialize parses the deployment ID from the request, loads the
// deployment from the store, and sets up the wazero runtime with a
// (possibly cached) compilation cache before compiling the module.
// It returns an error when the deployment cannot be found, the runtime
// kind is unsupported, or compilation fails.
func (r *Runtime) initialize(msg *proto.HTTPRequest) error {
	ctx := context.Background()
	r.deploymentID = uuid.MustParse(msg.DeploymentID)

	// TODO: this could be coming from a Redis cache instead of Postgres.
	// Maybe only the blob. Not sure...
	deploy, err := r.store.GetDeployment(r.deploymentID)
	if err != nil {
		slog.Warn("runtime could not find deploy from store", "err", err, "id", r.deploymentID)
		return fmt.Errorf("runtime: could not find deployment (%s)", r.deploymentID)
	}
	r.blob = deploy.Blob // can be optimized

	modCache, ok := r.cache.Get(r.deploymentID)
	if !ok {
		slog.Warn("no cache hit", "endpoint", r.deploymentID)
		modCache = wazero.NewCompilationCache()
	}

	config := wazero.NewRuntimeConfigCompiler().WithCompilationCache(modCache)
	r.runtime = wazero.NewRuntimeWithConfig(ctx, config)
	wasi_snapshot_preview1.MustInstantiate(ctx, r.runtime)

	// Pick the module to compile: JS deployments run inside the
	// spidermonkey interpreter blob; Go deployments are the blob itself.
	var blob []byte
	switch msg.Runtime {
	case "js":
		blob = spidermonkey.WasmBlob
	case "go":
		blob = deploy.Blob
	default:
		// A nil blob would previously fall through to CompileModule and
		// fail with an opaque error; reject unsupported runtimes here.
		return fmt.Errorf("invalid runtime: %s", msg.Runtime)
	}

	mod, err := r.runtime.CompileModule(ctx, blob)
	if err != nil {
		return fmt.Errorf("failed to compile module: %s", err)
	}

	// NOTE(review): cache lookup above keys on r.deploymentID while this
	// Put keys on deploy.ID — presumably the same value; confirm.
	r.cache.Put(deploy.ID, modCache)

	r.mod = mod
	return nil
}

func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) {
b, err := prot.Marshal(msg)
if err != nil {
slog.Warn("failed to marshal incoming HTTP request", "err", err)
Expand All @@ -75,25 +129,22 @@ func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest)
}

in := bytes.NewReader(b)
out := &bytes.Buffer{}
args := runtime.InvokeArgs{
Env: msg.Env,
In: in,
Out: out,
Cache: modCache,
}
out := &bytes.Buffer{} // TODO: pool this bad boy

switch msg.Runtime {
case "go":
args.Blob = deploy.Blob
case "js":
args.Blob = spidermonkey.WasmBlob
args.Args = []string{"", "-e", string(deploy.Blob)}
default:
err = fmt.Errorf("invalid runtime: %s", msg.Runtime)
args := []string{}
if msg.Runtime == "js" {
args = []string{"", "-e", string(r.blob)}
}

err = runtime.Invoke(context.Background(), args)
modConf := wazero.NewModuleConfig().
WithStdin(in).
WithStdout(out).
WithStderr(os.Stderr).
WithArgs(args...)
for k, v := range msg.Env {
modConf = modConf.WithEnv(k, v)
}
_, err = r.runtime.InstantiateModule(context.Background(), r.mod, modConf)
if err != nil {
slog.Error("runtime invoke error", "err", err)
respondError(ctx, http.StatusInternalServerError, "internal server error", msg.ID)
Expand All @@ -113,20 +164,16 @@ func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest)

ctx.Respond(resp)

r.cache.Put(deploy.EndpointID, modCache)

ctx.Engine().Poison(ctx.PID())

// only send metrics when its a request on LIVE
if !msg.Preview {
metric := types.RuntimeMetric{
ID: uuid.New(),
StartTime: r.started,
Duration: time.Since(r.started),
DeploymentID: deploy.ID,
EndpointID: deploy.EndpointID,
RequestURL: msg.URL,
StatusCode: status,
DeploymentID: r.deploymentID,
// EndpointID: deploy.EndpointID,
RequestURL: msg.URL,
StatusCode: status,
}
pid := ctx.Engine().Registry.GetPID(KindMetric, "1")
ctx.Send(pid, metric)
Expand Down
61 changes: 61 additions & 0 deletions internal/actrs/runtime_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package actrs

import (
"fmt"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/cluster"
)

const KindRuntimeManager = "runtime_manager"

// requestRuntime asks the manager for the PID of the runtime actor
// serving the given key; on a miss the manager activates a new runtime.
type requestRuntime struct {
	key string
}

// addRuntime registers an already-running runtime actor under key.
type addRuntime struct {
	key string
	pid *actor.PID
}

// removeRuntime unregisters the runtime stored under key; sent by a
// runtime actor when it stops.
type removeRuntime struct {
	key string
}

// RuntimeManager tracks warm runtime actors in the cluster, keyed by
// deployment ID, so requests can be routed to an existing (warm)
// runtime instead of always activating a cold one.
type RuntimeManager struct {
	runtimes map[string]*actor.PID
	cluster  *cluster.Cluster
}

// NewRuntimeManager returns an actor.Producer that builds a
// RuntimeManager bound to the given cluster, with an empty runtime
// registry.
func NewRuntimeManager(c *cluster.Cluster) actor.Producer {
	return func() actor.Receiver {
		manager := &RuntimeManager{
			cluster:  c,
			runtimes: map[string]*actor.PID{},
		}
		return manager
	}
}

// Receive implements actor.Receiver. It resolves (and lazily activates)
// runtime actors for request keys and maintains the runtime registry.
func (rm *RuntimeManager) Receive(c *actor.Context) {
	switch msg := c.Message().(type) {
	case requestRuntime:
		pid, ok := rm.runtimes[msg.key]
		if !ok {
			// Cold start: activate a fresh runtime actor and remember it.
			fmt.Println("runtime cold")
			pid = rm.cluster.Activate(KindRuntime, cluster.NewActivationConfig())
			rm.runtimes[msg.key] = pid
		} else {
			fmt.Println("runtime warm")
		}
		c.Respond(pid)
	case addRuntime:
		fmt.Println("adding runtime")
		rm.runtimes[msg.key] = msg.pid
	case removeRuntime:
		fmt.Println("removing runtime")
		delete(rm.runtimes, msg.key)
	}
}
47 changes: 32 additions & 15 deletions internal/actrs/wasmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package actrs

import (
"log"
"log/slog"
"net/http"
"strings"
"time"

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/cluster"
Expand All @@ -29,24 +31,26 @@ func newRequestWithResponse(request *proto.HTTPRequest) requestWithResponse {

// WasmServer is an HTTP server that will proxy and route the request to the corresponding function.
type WasmServer struct {
	server         *http.Server
	self           *actor.PID
	store          storage.Store
	metricStore    storage.MetricStore
	cache          storage.ModCacher
	cluster        *cluster.Cluster
	responses      map[string]chan *proto.HTTPResponse // pending responses keyed by request ID
	runtimeManager *actor.PID                          // RuntimeManager ("1") used to resolve runtime PIDs
}

// NewWasmServer return a new wasm server given a storage and a mod cache.
func NewWasmServer(addr string, cluster *cluster.Cluster, store storage.Store, metricStore storage.MetricStore, cache storage.ModCacher) actor.Producer {
return func() actor.Receiver {
s := &WasmServer{
store: store,
metricStore: metricStore,
cache: cache,
cluster: cluster,
responses: make(map[string]chan *proto.HTTPResponse),
store: store,
metricStore: metricStore,
cache: cache,
cluster: cluster,
responses: make(map[string]chan *proto.HTTPResponse),
runtimeManager: cluster.Engine().Registry.GetPID(KindRuntimeManager, "1"),
}
server := &http.Server{
Handler: s,
Expand All @@ -64,7 +68,8 @@ func (s *WasmServer) Receive(c *actor.Context) {
case actor.Stopped:
case requestWithResponse:
s.responses[msg.request.ID] = msg.response
s.sendRequestToRuntime(msg.request)
pid := s.requestRuntime(c, msg.request.DeploymentID)
s.cluster.Engine().SendWithSender(pid, msg.request, s.self)
case *proto.HTTPResponse:
if resp, ok := s.responses[msg.RequestID]; ok {
resp <- msg
Expand All @@ -80,9 +85,21 @@ func (s *WasmServer) initialize(c *actor.Context) {
}()
}

// requestRuntime asks the RuntimeManager for the PID of the runtime
// actor serving the given deployment key (activating a cold one on a
// miss). It returns nil when the manager cannot be reached within the
// timeout or responds with an unexpected message; callers must be
// prepared for a nil PID.
func (s *WasmServer) requestRuntime(c *actor.Context, key string) *actor.PID {
	res, err := c.Request(s.runtimeManager, requestRuntime{
		key: key,
	}, time.Millisecond*5).Result()
	if err != nil {
		// Previously the nil result was still type-asserted below;
		// bail out early instead.
		slog.Warn("runtime manager response failed", "err", err)
		return nil
	}
	pid, ok := res.(*actor.PID)
	if !ok {
		slog.Warn("runtime manager responded with a non *actor.PID")
		return nil
	}
	return pid
}

// sendRequestToRuntime activates a brand-new runtime actor in the
// cluster and forwards the request to it with this server as sender.
// NOTE(review): Receive now resolves runtimes via requestRuntime and
// the RuntimeManager; this always-cold path looks unused — confirm
// before removing.
func (s *WasmServer) sendRequestToRuntime(req *proto.HTTPRequest) {
	pid := s.cluster.Activate(KindRuntime, cluster.NewActivationConfig())
	s.cluster.Engine().SendWithSender(pid, req, s.self)
}

// TODO(anthdm): Handle the favicon.ico
Expand Down

0 comments on commit e9b810e

Please sign in to comment.