diff --git a/cmd/wasmserver/main.go b/cmd/wasmserver/main.go index da93362..1181ae1 100644 --- a/cmd/wasmserver/main.go +++ b/cmd/wasmserver/main.go @@ -53,6 +53,7 @@ func main() { } c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store, modCache), &cluster.KindConfig{}) c.Engine().Spawn(actrs.NewMetric, actrs.KindMetric, actor.WithID("1")) + c.Engine().Spawn(actrs.NewRuntimeManager(c), actrs.KindRuntimeManager, actor.WithID("1")) c.Start() server := actrs.NewWasmServer( diff --git a/internal/actrs/runtime.go b/internal/actrs/runtime.go index 923a772..0e51458 100644 --- a/internal/actrs/runtime.go +++ b/internal/actrs/runtime.go @@ -7,10 +7,10 @@ import ( "fmt" "log/slog" "net/http" + "os" "time" "github.com/anthdm/hollywood/actor" - "github.com/anthdm/raptor/internal/runtime" "github.com/anthdm/raptor/internal/shared" "github.com/anthdm/raptor/internal/spidermonkey" "github.com/anthdm/raptor/internal/storage" @@ -18,18 +18,31 @@ import ( "github.com/anthdm/raptor/proto" "github.com/google/uuid" "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" prot "google.golang.org/protobuf/proto" ) const KindRuntime = "runtime" +var ( + runtimeKeepAlive = time.Second +) + +type shutdown struct{} + // Runtime is an actor that can execute compiled WASM blobs in a distributed cluster. 
type Runtime struct { - store storage.Store - cache storage.ModCacher - started time.Time - deployID uuid.UUID + store storage.Store + cache storage.ModCacher + started time.Time + deploymentID uuid.UUID + + managerPID *actor.PID + runtime wazero.Runtime + mod wazero.CompiledModule + blob []byte + repeat actor.SendRepeater } func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer { @@ -44,29 +57,70 @@ func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer { func (r *Runtime) Receive(c *actor.Context) { switch msg := c.Message().(type) { case actor.Started: + r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive) r.started = time.Now() + r.managerPID = c.Engine().Registry.GetPID(KindRuntimeManager, "1") case actor.Stopped: + c.Send(r.managerPID, removeRuntime{key: r.deploymentID.String()}) + r.runtime.Close(context.Background()) + // Releasing this mod will invalidate the cache for some reason. + // r.mod.Close(context.TODO()) case *proto.HTTPRequest: + // Refresh the keepAlive timer + r.repeat = c.SendRepeat(c.PID(), shutdown{}, runtimeKeepAlive) + if r.runtime == nil { + r.initialize(msg) + } // Handle the HTTP request that is forwarded from the WASM server actor. r.handleHTTPRequest(c, msg) + + case shutdown: + c.Engine().Poison(c.PID()) } } -func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) { - r.deployID = uuid.MustParse(msg.DeploymentID) - deploy, err := r.store.GetDeployment(r.deployID) +func (r *Runtime) initialize(msg *proto.HTTPRequest) error { + ctx := context.Background() + r.deploymentID = uuid.MustParse(msg.DeploymentID) + + // TODO: this could be coming from a Redis cache instead of Postgres. + // Maybe only the blob. Not sure... 
+ deploy, err := r.store.GetDeployment(r.deploymentID) if err != nil { - slog.Warn("runtime could not find deploy from store", "err", err, "id", r.deployID) - respondError(ctx, http.StatusInternalServerError, "internal server error", msg.ID) - return + slog.Warn("runtime could not find deploy from store", "err", err, "id", r.deploymentID) + return fmt.Errorf("runtime: could not find deployment (%s)", r.deploymentID) } + r.blob = deploy.Blob // can be optimized - modCache, ok := r.cache.Get(deploy.EndpointID) + modCache, ok := r.cache.Get(r.deploymentID) if !ok { + slog.Warn("no cache hit", "endpoint", r.deploymentID) modCache = wazero.NewCompilationCache() - slog.Warn("no cache hit", "endpoint", deploy.EndpointID) } + config := wazero.NewRuntimeConfigCompiler().WithCompilationCache(modCache) + r.runtime = wazero.NewRuntimeWithConfig(ctx, config) + wasi_snapshot_preview1.MustInstantiate(ctx, r.runtime) + + var blob []byte + if msg.Runtime == "js" { + blob = spidermonkey.WasmBlob + } else if msg.Runtime == "go" { + blob = deploy.Blob + } + + mod, err := r.runtime.CompileModule(ctx, blob) + if err != nil { + return fmt.Errorf("failed to compile module: %s", err) + } + + r.cache.Put(deploy.ID, modCache) + + r.mod = mod + return nil +} + +func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) { b, err := prot.Marshal(msg) if err != nil { slog.Warn("failed to marshal incoming HTTP request", "err", err) @@ -75,25 +129,22 @@ func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) } in := bytes.NewReader(b) - out := &bytes.Buffer{} - args := runtime.InvokeArgs{ - Env: msg.Env, - In: in, - Out: out, - Cache: modCache, - } + out := &bytes.Buffer{} // TODO: pool this bad boy - switch msg.Runtime { - case "go": - args.Blob = deploy.Blob - case "js": - args.Blob = spidermonkey.WasmBlob - args.Args = []string{"", "-e", string(deploy.Blob)} - default: - err = fmt.Errorf("invalid runtime: %s", msg.Runtime) + args := []string{} + if 
msg.Runtime == "js" { + args = []string{"", "-e", string(r.blob)} } - err = runtime.Invoke(context.Background(), args) + modConf := wazero.NewModuleConfig(). + WithStdin(in). + WithStdout(out). + WithStderr(os.Stderr). + WithArgs(args...) + for k, v := range msg.Env { + modConf = modConf.WithEnv(k, v) + } + _, err = r.runtime.InstantiateModule(context.Background(), r.mod, modConf) if err != nil { slog.Error("runtime invoke error", "err", err) respondError(ctx, http.StatusInternalServerError, "internal server error", msg.ID) @@ -113,20 +164,16 @@ func (r *Runtime) handleHTTPRequest(ctx *actor.Context, msg *proto.HTTPRequest) ctx.Respond(resp) - r.cache.Put(deploy.EndpointID, modCache) - - ctx.Engine().Poison(ctx.PID()) - // only send metrics when its a request on LIVE if !msg.Preview { metric := types.RuntimeMetric{ ID: uuid.New(), StartTime: r.started, Duration: time.Since(r.started), - DeploymentID: deploy.ID, - EndpointID: deploy.EndpointID, - RequestURL: msg.URL, - StatusCode: status, + DeploymentID: r.deploymentID, + // EndpointID: deploy.EndpointID, + RequestURL: msg.URL, + StatusCode: status, } pid := ctx.Engine().Registry.GetPID(KindMetric, "1") ctx.Send(pid, metric) diff --git a/internal/actrs/runtime_manager.go b/internal/actrs/runtime_manager.go new file mode 100644 index 0000000..1a5c057 --- /dev/null +++ b/internal/actrs/runtime_manager.go @@ -0,0 +1,61 @@ +package actrs + +import ( + "fmt" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/cluster" +) + +const KindRuntimeManager = "runtime_manager" + +type requestRuntime struct { + key string +} + +type addRuntime struct { + key string + pid *actor.PID +} + +type removeRuntime struct { + key string +} + +type RuntimeManager struct { + runtimes map[string]*actor.PID + cluster *cluster.Cluster +} + +func NewRuntimeManager(c *cluster.Cluster) actor.Producer { + return func() actor.Receiver { + return &RuntimeManager{ + runtimes: make(map[string]*actor.PID), + cluster: c, + } + } +} + 
+func (rm *RuntimeManager) Receive(c *actor.Context) { + switch msg := c.Message().(type) { + case requestRuntime: + pid := rm.runtimes[msg.key] + if pid == nil { + fmt.Println("runtime cold") + pid = rm.cluster.Activate(KindRuntime, cluster.NewActivationConfig()) + rm.runtimes[msg.key] = pid + } else { + fmt.Println("runtime warm") + } + c.Respond(pid) + case addRuntime: + fmt.Println("adding runtime") + rm.runtimes[msg.key] = msg.pid + case removeRuntime: + delete(rm.runtimes, msg.key) + fmt.Println("removing runtime") + case actor.Started: + case actor.Stopped: + _ = msg + } +} diff --git a/internal/actrs/wasmserver.go b/internal/actrs/wasmserver.go index dd715e7..07ce930 100644 --- a/internal/actrs/wasmserver.go +++ b/internal/actrs/wasmserver.go @@ -2,8 +2,10 @@ package actrs import ( "log" + "log/slog" "net/http" "strings" + "time" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/cluster" @@ -29,24 +31,26 @@ func newRequestWithResponse(request *proto.HTTPRequest) requestWithResponse { // WasmServer is an HTTP server that will proxy and route the request to the corresponding function. type WasmServer struct { - server *http.Server - self *actor.PID - store storage.Store - metricStore storage.MetricStore - cache storage.ModCacher - cluster *cluster.Cluster - responses map[string]chan *proto.HTTPResponse + server *http.Server + self *actor.PID + store storage.Store + metricStore storage.MetricStore + cache storage.ModCacher + cluster *cluster.Cluster + responses map[string]chan *proto.HTTPResponse + runtimeManager *actor.PID } // NewWasmServer return a new wasm server given a storage and a mod cache. 
func NewWasmServer(addr string, cluster *cluster.Cluster, store storage.Store, metricStore storage.MetricStore, cache storage.ModCacher) actor.Producer { return func() actor.Receiver { s := &WasmServer{ - store: store, - metricStore: metricStore, - cache: cache, - cluster: cluster, - responses: make(map[string]chan *proto.HTTPResponse), + store: store, + metricStore: metricStore, + cache: cache, + cluster: cluster, + responses: make(map[string]chan *proto.HTTPResponse), + runtimeManager: cluster.Engine().Registry.GetPID(KindRuntimeManager, "1"), } server := &http.Server{ Handler: s, @@ -64,7 +68,8 @@ func (s *WasmServer) Receive(c *actor.Context) { case actor.Stopped: case requestWithResponse: s.responses[msg.request.ID] = msg.response - s.sendRequestToRuntime(msg.request) + pid := s.requestRuntime(c, msg.request.DeploymentID) + s.cluster.Engine().SendWithSender(pid, msg.request, s.self) case *proto.HTTPResponse: if resp, ok := s.responses[msg.RequestID]; ok { resp <- msg @@ -80,9 +85,21 @@ func (s *WasmServer) initialize(c *actor.Context) { }() } +func (s *WasmServer) requestRuntime(c *actor.Context, key string) *actor.PID { + res, err := c.Request(s.runtimeManager, requestRuntime{ + key: key, + }, time.Millisecond*5).Result() + if err != nil { + slog.Warn("runtime manager response failed", "err", err) + } + pid, ok := res.(*actor.PID) + if !ok { + slog.Warn("runtime manager responded with a non *actor.PID") + } + return pid +} + func (s *WasmServer) sendRequestToRuntime(req *proto.HTTPRequest) { - pid := s.cluster.Activate(KindRuntime, cluster.NewActivationConfig()) - s.cluster.Engine().SendWithSender(pid, req, s.self) } // TODO(anthdm): Handle the favicon.ico