Skip to content

Commit

Permalink
fixed modcache
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 1, 2024
1 parent ad6d96c commit d99df51
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 50 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bin
ffaas.toml
.db
config.toml
.db
.modcache
34 changes: 8 additions & 26 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func main() {
seed bool
)
flagSet := flag.NewFlagSet("ffaas", flag.ExitOnError)
flagSet.StringVar(&configFile, "config", "ffaas.toml", "")
flagSet.StringVar(&configFile, "config", "config.toml", "")
flagSet.BoolVar(&seed, "seed", false, "")
flagSet.Parse(os.Args[1:])

Expand All @@ -35,7 +35,7 @@ func main() {
}

cfg := storage.NewBoltConfig().
WithPath(config.Get().StoragePath).
WithPath(config.Get().BoltStoragePath).
WithReadOnly(false)
store, err := storage.NewBoltStore(cfg)
if err != nil {
Expand All @@ -52,26 +52,6 @@ func main() {
server := api.NewServer(store, metricStore, modCache)
fmt.Printf("api server running\t%s\n", config.GetApiUrl())
log.Fatal(server.Listen(config.Get().APIServerAddr))

// engine, err := actor.NewEngine(nil)
// if err != nil {
// log.Fatal(err)
// }

// eventPID := engine.SpawnFunc(func(c *actor.Context) {
// switch msg := c.Message().(type) {
// case actor.ActorInitializedEvent:
// if strings.Contains(msg.PID.String(), "runtime") {
// fmt.Println("got this", msg.PID)
// engine.Stop(msg.PID)
// }
// }
// }, "event")
// engine.Subscribe(eventPID)

// wasmServer := wasmhttp.NewServer(config.Get().WASMServerAddr, engine, memstore, metricStore, modCache)
// fmt.Printf("wasm server running\t%s\n", config.GetWasmUrl())
// log.Fatal(wasmServer.Listen())
}

func seedEndpoint(store storage.Store, cache storage.ModCacher) {
Expand All @@ -94,10 +74,12 @@ func seedEndpoint(store storage.Store, cache storage.ModCacher) {
store.CreateDeploy(deploy)
fmt.Printf("endpoint: %s\n", endpoint.URL)

// compCache := wazero.NewCompilationCache()
// compile(context.TODO(), compCache, deploy.Blob)
// fmt.Printf("%+v\n", compCache)
// cache.Put(endpoint.ID, compCache)
modCache, err := wazero.NewCompilationCacheWithDir(".modcache")
if err != nil {
log.Fatal(err)
}
compile(context.TODO(), modCache, deploy.Blob)
cache.Put(endpoint.ID, modCache)
}

func compile(ctx context.Context, cache wazero.CompilationCache, blob []byte) {
Expand Down
9 changes: 5 additions & 4 deletions cmd/wasmserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
)

func main() {
err := config.Parse("ffaas.toml")
err := config.Parse("config.toml")
if err != nil {
log.Fatal(err)
}

cfg := storage.NewBoltConfig().
WithPath(config.Get().StoragePath).
WithPath(config.Get().BoltStoragePath).
WithReadOnly(true)
store, err := storage.NewBoltStore(cfg)
if err != nil {
Expand All @@ -33,20 +33,21 @@ func main() {
metricStore = storage.NewMemoryMetricStore()
)

remote := remote.New("localhost:6666", nil)
remote := remote.New(config.Get().WASMClusterAddr, nil)
engine, err := actor.NewEngine(&actor.EngineConfig{
Remote: remote,
})
if err != nil {
log.Fatal(err)
}
// TODO: Get these values from the config.
c, err := cluster.New(cluster.Config{
Region: "f",
Engine: engine,
ID: "member1",
ClusterProvider: cluster.NewSelfManagedProvider(),
})
c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store), &cluster.KindConfig{})
c.RegisterKind(actrs.KindRuntime, actrs.NewRuntime(store, modCache), &cluster.KindConfig{})
c.Start()

server := actrs.NewWasmServer(config.Get().WASMServerAddr, c, store, metricStore, modCache)
Expand Down
13 changes: 10 additions & 3 deletions pkg/actrs/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ const KindRuntime = "runtime"
// Runtime is an actor that can execute compiled WASM blobs in a distributed cluster.
type Runtime struct {
store storage.Store
cache storage.ModCacher
}

func NewRuntime(store storage.Store) actor.Producer {
func NewRuntime(store storage.Store, cache storage.ModCacher) actor.Producer {
return func() actor.Receiver {
return &Runtime{
store: store,
cache: cache,
}
}
}
Expand All @@ -48,16 +50,21 @@ func (r *Runtime) Receive(c *actor.Context) {
slog.Warn("runtime could not find the endpoint's active deploy from store", "err", err)
return
}
cache := wazero.NewCompilationCache()
httpmod, _ := NewRequestModule(msg)
r.exec(context.TODO(), deploy.Blob, cache, endpoint.Environment, httpmod)
modcache, ok := r.cache.Get(endpoint.ID)
if !ok {
modcache = wazero.NewCompilationCache()
slog.Warn("no cache hit", "endpoint", endpoint.ID)
}
r.exec(context.TODO(), deploy.Blob, modcache, endpoint.Environment, httpmod)
resp := &proto.HTTPResponse{
Response: httpmod.responseBytes,
RequestID: msg.ID,
StatusCode: http.StatusOK,
}
c.Respond(resp)
c.Engine().Poison(c.PID())
r.cache.Put(endpoint.ID, modcache)
}
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,28 @@ import (
)

const defaultConfig = `
wasmServerAddr = ":5000"
apiServerAddr = ":3000"
storageDriver = "bolt"
storagePath = ".db"
wasmClusterAddr = "localhost:6666"
wasmServerAddr = "localhost:5000"
apiServerAddr = "localhost:3000"
storageDriver = "bolt"
bolStoragePath = ".db"
`

// Config holds the global configuration which is READONLY ofcourse.
// Config holds the global configuration which is READONLY.
var config Config

type Config struct {
APIServerAddr string
WASMServerAddr string
StorageDriver string
StoragePath string
APIServerAddr string
WASMServerAddr string
StorageDriver string
BoltStoragePath string
WASMClusterAddr string
}

func Parse(path string) error {
_, err := os.Stat(path)
if errors.Is(err, os.ErrNotExist) {
if err := os.WriteFile("ffaas.toml", []byte(defaultConfig), os.ModePerm); err != nil {
if err := os.WriteFile("config.toml", []byte(defaultConfig), os.ModePerm); err != nil {
return err
}
}
Expand All @@ -51,14 +53,12 @@ func makeURL(address string) string {
host = address
port = ""
}

if host == "" {
host = "0.0.0.0"
}
if port == "" || port == "http" {
port = "80"
}

return "http://" + net.JoinHostPort(host, port)
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/storage/bbolt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storage

import (
"errors"
"fmt"
"os"

"github.com/anthdm/ffaas/pkg/types"
Expand All @@ -27,7 +29,7 @@ func (config BoltConfig) WithPath(path string) BoltConfig {
}

func (config BoltConfig) WithReadOnly(b bool) BoltConfig {
config.readonly = true
config.readonly = b
return config
}

Expand All @@ -38,7 +40,8 @@ type BoltStore struct {

func NewBoltStore(config BoltConfig) (*BoltStore, error) {
var init bool
if _, err := os.Stat(config.path); err != nil {
_, err := os.Stat(config.path)
if errors.Is(err, os.ErrNotExist) {
init = true
}
db, err := bbolt.Open(config.path, 0600, &bbolt.Options{
Expand All @@ -48,7 +51,7 @@ func NewBoltStore(config BoltConfig) (*BoltStore, error) {
return nil, err
}

if init {
if init && !config.readonly {
tx, err := db.Begin(true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -77,6 +80,9 @@ func NewBoltStore(config BoltConfig) (*BoltStore, error) {
func (s *BoltStore) CreateEndpoint(e *types.Endpoint) error {
return s.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket([]byte("endpoint"))
if bucket == nil {
return fmt.Errorf("could not find bucket: %s", "endpoint")
}
b, err := msgpack.Marshal(e)
if err != nil {
return err
Expand Down

0 comments on commit d99df51

Please sign in to comment.