From d7fb8f1d58d663a2cf5c76da013207381c7378db Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Fri, 3 May 2024 20:54:07 +0800 Subject: [PATCH] feat: support pluggable state store (#173) --- Makefile | 3 ++ admin/utils/utils.go | 3 +- benchmark/bench_test.go | 15 +++--- cmd/client/cmd.go | 3 +- cmd/client/consume/cmd.go | 5 +- cmd/client/create/cmd.go | 7 +-- cmd/client/delete/cmd.go | 2 +- cmd/client/list/cmd.go | 5 +- cmd/client/produce/cmd.go | 5 +- cmd/main.go | 3 +- cmd/perf/cmd.go | 3 +- cmd/server/cmd.go | 6 ++- cmd/standalone/cmd.go | 3 +- common/constants.go | 6 +-- common/model/function.go | 3 +- common/model/function_serde_test.go | 8 ++-- fs/api/instance.go | 3 +- fs/contube/contube.go | 1 + fs/contube/http.go | 8 ++-- fs/contube/http_test.go | 3 +- fs/contube/memory_test.go | 6 ++- fs/contube/pulsar.go | 5 +- fs/func_ctx_impl_test.go | 3 +- fs/instance_impl.go | 11 +++-- fs/instance_impl_test.go | 5 +- fs/manager.go | 21 ++++---- fs/runtime/grpc/grpc_func.go | 15 +++--- fs/runtime/grpc/grpc_func_test.go | 5 +- fs/runtime/grpc/mock_grpc_func_test.go | 9 ++-- fs/runtime/wazero/wazero_runtime.go | 13 +++-- fs/statestore/pebble.go | 7 +-- fs/statestore/pebble_test.go | 3 +- go.mod | 4 +- go.sum | 8 ++-- perf/perf.go | 13 ++--- server/config.go | 19 +++++--- server/config_test.go | 5 +- server/function_service.go | 3 +- server/http_tube_service.go | 3 +- server/server.go | 66 +++++++++++++++++++------- server/server_test.go | 11 +++-- server/state_service.go | 5 +- server/tube_service.go | 5 +- tests/integration_test.go | 11 +++-- 44 files changed, 219 insertions(+), 132 deletions(-) diff --git a/Makefile b/Makefile index 9ce6ec53..a5f86ad1 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,9 @@ build_example: lint: golangci-lint run +lint-fix: + golangci-lint run --fix + build_all: build build_example test: diff --git a/admin/utils/utils.go b/admin/utils/utils.go index 2358b5ac..b08815b4 100644 --- a/admin/utils/utils.go +++ b/admin/utils/utils.go @@ -18,7 +18,8 @@ package utils import ( "fmt" - "github.com/functionstream/function-stream/admin/client" + + adminclient "github.com/functionstream/function-stream/admin/client" ) func MakeQueueSourceTubeConfig(subName string, topics ...string) []adminclient.ModelTubeConfig { diff --git a/benchmark/bench_test.go b/benchmark/bench_test.go index 15679092..eadf0652 100644 --- a/benchmark/bench_test.go +++ b/benchmark/bench_test.go @@ -18,21 +18,22 @@ package benchmark import ( "context" + "math/rand" + "os" + "runtime/pprof" + "strconv" + "testing" + "time" + "github.com/apache/pulsar-client-go/pulsaradmin" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" - "github.com/functionstream/function-stream/admin/client" + adminclient "github.com/functionstream/function-stream/admin/client" adminutils "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/perf" "github.com/functionstream/function-stream/server" - "math/rand" - "os" - "runtime/pprof" - "strconv" - "testing" - "time" ) func BenchmarkStressForBasicFunc(b *testing.B) { diff --git a/cmd/client/cmd.go b/cmd/client/cmd.go index bce9a708..d7de0792 100644 --- a/cmd/client/cmd.go +++ b/cmd/client/cmd.go @@ -35,7 +35,8 @@ var ( ) func init() { - Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s", "http://localhost:7300", "Service address") + Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s", + "http://localhost:7300", "Service address") Cmd.AddCommand(create.Cmd) Cmd.AddCommand(list.Cmd) diff --git a/cmd/client/consume/cmd.go b/cmd/client/consume/cmd.go index 0df1a2bc..3bc4c430 100644 --- a/cmd/client/consume/cmd.go +++ b/cmd/client/consume/cmd.go @@ -18,11 +18,12 @@ package consume import ( "fmt" - "github.com/functionstream/function-stream/admin/client" + "os" + + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" "golang.org/x/net/context" - "os" ) var ( diff --git a/cmd/client/create/cmd.go b/cmd/client/create/cmd.go index 2396ed49..b0cb0751 100644 --- a/cmd/client/create/cmd.go +++ b/cmd/client/create/cmd.go @@ -19,13 +19,14 @@ package create import ( "context" "fmt" - "github.com/functionstream/function-stream/admin/client" + "io" + "os" + + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/admin/utils" "github.com/functionstream/function-stream/cmd/client/common" fs_cmmon "github.com/functionstream/function-stream/common" "github.com/spf13/cobra" - "io" - "os" ) var ( diff --git a/cmd/client/delete/cmd.go b/cmd/client/delete/cmd.go index 186001d5..095a7ba5 100644 --- a/cmd/client/delete/cmd.go +++ b/cmd/client/delete/cmd.go @@ -17,7 +17,7 @@ package del import ( - "github.com/functionstream/function-stream/admin/client" + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" ) diff --git a/cmd/client/list/cmd.go b/cmd/client/list/cmd.go index 9ba5a0d1..b6a617e9 100644 --- a/cmd/client/list/cmd.go +++ b/cmd/client/list/cmd.go @@ -19,10 +19,11 @@ package list import ( "context" "fmt" - "github.com/functionstream/function-stream/admin/client" + "os" + + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" - "os" ) var Cmd = &cobra.Command{ diff --git a/cmd/client/produce/cmd.go b/cmd/client/produce/cmd.go index ff47825c..59c0ff92 100644 --- a/cmd/client/produce/cmd.go +++ b/cmd/client/produce/cmd.go @@ -18,11 +18,12 @@ package produce import ( "fmt" - "github.com/functionstream/function-stream/admin/client" + "os" + + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/cmd/client/common" "github.com/spf13/cobra" "golang.org/x/net/context" - "os" ) var ( diff --git a/cmd/main.go b/cmd/main.go index 2485e6a6..ceb6c601 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,12 +18,13 @@ package main import ( "fmt" + "os" + "github.com/functionstream/function-stream/cmd/client" "github.com/functionstream/function-stream/cmd/perf" "github.com/functionstream/function-stream/cmd/server" "github.com/functionstream/function-stream/cmd/standalone" "github.com/spf13/cobra" - "os" ) var ( diff --git a/cmd/perf/cmd.go b/cmd/perf/cmd.go index 281dc1f2..84c055ea 100644 --- a/cmd/perf/cmd.go +++ b/cmd/perf/cmd.go @@ -18,10 +18,11 @@ package perf import ( "context" + "io" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/perf" "github.com/spf13/cobra" - "io" ) var ( diff --git a/cmd/server/cmd.go b/cmd/server/cmd.go index c3409e4a..decc5ce9 100644 --- a/cmd/server/cmd.go +++ b/cmd/server/cmd.go @@ -18,10 +18,11 @@ package server import ( "context" + "io" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/server" "github.com/spf13/cobra" - "io" ) var ( @@ -45,7 +46,8 @@ var ( func init() { Cmd.Flags().StringVarP(&config.configFile, "config-file", "c", "conf/function-stream.yaml", "path to the config file (default is conf/function-stream.yaml)") - Cmd.Flags().BoolVarP(&config.loadConfigFromEnv, "load-config-from-env", "e", false, "load config from env (default is false)") + Cmd.Flags().BoolVarP(&config.loadConfigFromEnv, "load-config-from-env", "e", false, + "load config from env (default is false)") } func exec(*cobra.Command, []string) { diff --git a/cmd/standalone/cmd.go b/cmd/standalone/cmd.go index d2c9b890..90c04590 100644 --- a/cmd/standalone/cmd.go +++ b/cmd/standalone/cmd.go @@ -18,10 +18,11 @@ package standalone import ( "context" + "io" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/server" "github.com/spf13/cobra" - "io" ) var ( diff --git a/common/constants.go b/common/constants.go index 97e12c52..e44d666b 100644 --- a/common/constants.go +++ b/common/constants.go @@ -21,9 +21,9 @@ const ( MemoryTubeType = "memory" HttpTubeType = "http" - DefaultAddr = "localhost:7300" - DefaultPulsarURL = "pulsar://localhost:6650" - DefaultTubeType = PulsarTubeType + WASMRuntime = "wasm" RuntimeArchiveConfigKey = "archive" + + StateStorePebble = "pebble" ) diff --git a/common/model/function.go b/common/model/function.go index 4b2cc9b1..883e9ce4 100644 --- a/common/model/function.go +++ b/common/model/function.go @@ -17,9 +17,10 @@ package model import ( + "strings" + "github.com/functionstream/function-stream/fs/contube" "github.com/pkg/errors" - "strings" ) type TubeConfig struct { diff --git a/common/model/function_serde_test.go b/common/model/function_serde_test.go index 6fac8088..3ea3d58f 100644 --- a/common/model/function_serde_test.go +++ b/common/model/function_serde_test.go @@ -19,10 +19,11 @@ package model import ( "encoding/json" "fmt" - "github.com/functionstream/function-stream/common" - "gopkg.in/yaml.v3" "reflect" "testing" + + "github.com/functionstream/function-stream/common" + "gopkg.in/yaml.v3" ) func TestFunctionSerde(t *testing.T) { @@ -110,7 +111,8 @@ func TestFunctionSerdeWithNil(t *testing.T) { fmt.Println(string(data)) - f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML serialization + f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML + // serialization // YAML Deserialization err = yaml.Unmarshal(data, &f2) diff --git a/fs/api/instance.go b/fs/api/instance.go index e5d923fc..059a6988 100644 --- a/fs/api/instance.go +++ b/fs/api/instance.go @@ -17,10 +17,11 @@ package api import ( + "log/slog" + "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/contube" "golang.org/x/net/context" - "log/slog" ) type FunctionInstance interface { diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 9d9d285c..587020f0 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -19,6 +19,7 @@ package contube import ( "context" "fmt" + "github.com/pkg/errors" ) diff --git a/fs/contube/http.go b/fs/contube/http.go index 125d5e2b..426af4e2 100644 --- a/fs/contube/http.go +++ b/fs/contube/http.go @@ -17,13 +17,14 @@ package contube import ( - "github.com/pkg/errors" - "golang.org/x/net/context" "io" "log/slog" "net/http" "sync" "sync/atomic" + + "github.com/pkg/errors" + "golang.org/x/net/context" ) type state int @@ -133,7 +134,8 @@ func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Re return nil, ErrSinkTubeNotImplemented } -func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request) { +func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), + logger *slog.Logger) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { endpoint, err := getEndpoint(r) if err != nil { diff --git a/fs/contube/http_test.go b/fs/contube/http_test.go index 3c4e2dbb..dc389415 100644 --- a/fs/contube/http_test.go +++ b/fs/contube/http_test.go @@ -17,9 +17,10 @@ package contube import ( + "testing" + "github.com/stretchr/testify/assert" "golang.org/x/net/context" - "testing" ) func TestHttpTubeHandleRecord(t *testing.T) { diff --git a/fs/contube/memory_test.go b/fs/contube/memory_test.go index 138e16f1..f3627bad 100644 --- a/fs/contube/memory_test.go +++ b/fs/contube/memory_test.go @@ -35,7 +35,8 @@ func TestMemoryTube(t *testing.T) { var events []Record topics := []string{"topic1", "topic2", "topic3"} - source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, + SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { t.Fatal(err) } @@ -72,7 +73,8 @@ func TestMemoryTube(t *testing.T) { wg.Wait() cancel() - // Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful. + // Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and + // the released queue is successful. time.Sleep(100 * time.Millisecond) // assert the memoryQueueFactory.queues is empty. diff --git a/fs/contube/pulsar.go b/fs/contube/pulsar.go index f67fa277..2e12700d 100644 --- a/fs/contube/pulsar.go +++ b/fs/contube/pulsar.go @@ -18,10 +18,11 @@ package contube import ( "context" - "github.com/apache/pulsar-client-go/pulsar" - "github.com/pkg/errors" "log/slog" "sync/atomic" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/pkg/errors" ) const ( diff --git a/fs/func_ctx_impl_test.go b/fs/func_ctx_impl_test.go index 127de42c..625ed521 100644 --- a/fs/func_ctx_impl_test.go +++ b/fs/func_ctx_impl_test.go @@ -17,8 +17,9 @@ package fs import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestFuncCtx_NilStore(t *testing.T) { diff --git a/fs/instance_impl.go b/fs/instance_impl.go index 70c04187..583e52f5 100644 --- a/fs/instance_impl.go +++ b/fs/instance_impl.go @@ -18,13 +18,14 @@ package fs import ( "context" + "log/slog" + "reflect" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/pkg/errors" - "log/slog" - "reflect" ) type FunctionInstanceImpl struct { @@ -53,7 +54,8 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory { return &DefaultInstanceFactory{} } -func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, index int32, logger *slog.Logger) api.FunctionInstance { +func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, + index int32, logger *slog.Logger) api.FunctionInstance { ctx, cancelFunc := context.WithCancel(context.Background()) ctx = context.WithValue(ctx, CtxKeyFunctionName, definition.Name) ctx = context.WithValue(ctx, CtxKeyInstanceIndex, index) @@ -69,7 +71,8 @@ func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, } } -func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory, sources []<-chan contube.Record, sink chan<- contube.Record) { +func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory, sources []<-chan contube.Record, + sink chan<- contube.Record) { runtime, err := runtimeFactory.NewFunctionRuntime(instance) if err != nil { instance.readyCh <- errors.Wrap(err, "Error creating runtime") diff --git a/fs/instance_impl_test.go b/fs/instance_impl_test.go index e4ee40a8..ebaedb35 100644 --- a/fs/instance_impl_test.go +++ b/fs/instance_impl_test.go @@ -17,9 +17,10 @@ package fs import ( - "github.com/functionstream/function-stream/common/model" "log/slog" "testing" + + "github.com/functionstream/function-stream/common/model" ) func TestFunctionInstanceContextSetting(t *testing.T) { @@ -38,7 +39,7 @@ func TestFunctionInstanceContextSetting(t *testing.T) { t.Errorf("Expected '%s' in ctx to be '%s'", CtxKeyFunctionName, definition.Name) } - if ctxValue, ok := instance.Context().Value(CtxKey(CtxKeyInstanceIndex)).(int32); !ok || ctxValue != index { + if ctxValue, ok := instance.Context().Value(CtxKeyInstanceIndex).(int32); !ok || ctxValue != index { t.Errorf("Expected '%s' in ctx to be '%d'", CtxKeyInstanceIndex, index) } diff --git a/fs/manager.go b/fs/manager.go index a915935a..b67d867f 100644 --- a/fs/manager.go +++ b/fs/manager.go @@ -18,6 +18,11 @@ package fs import ( "context" + "log/slog" + "math/rand" + "strconv" + "sync" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs/api" @@ -25,10 +30,6 @@ import ( "github.com/functionstream/function-stream/fs/runtime/wazero" "github.com/functionstream/function-stream/fs/statestore" "github.com/pkg/errors" - "log/slog" - "math/rand" - "strconv" - "sync" ) type namespacedName struct { @@ -142,7 +143,8 @@ func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) { for k := range options.tubeFactoryMap { loadedTubeFact = append(loadedTubeFact, k) } - log.Info("Function manager created", slog.Any("runtime-factories", loadedRuntimeFact), slog.Any("tube-factories", loadedTubeFact)) + log.Info("Function manager created", slog.Any("runtime-factories", loadedRuntimeFact), + slog.Any("tube-factories", loadedTubeFact)) return &FunctionManager{ options: options, functions: make(map[namespacedName][]api.FunctionInstance), @@ -150,7 +152,7 @@ func NewFunctionManager(opts ...ManagerOption) (*FunctionManager, error) { }, nil } -func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) { // TODO: Change input parameter to Type +func (fm *FunctionManager) getTubeFactory(tubeConfig *model.TubeConfig) (contube.TubeFactory, error) { get := func(t string) (contube.TubeFactory, error) { factory, exist := fm.options.tubeFactoryMap[t] if !exist { @@ -182,7 +184,7 @@ func (fm *FunctionManager) getRuntimeFactory(t string) (api.FunctionRuntimeFacto return factory, nil } -func (fm *FunctionManager) createFuncCtx(f *model.Function) api.FunctionContext { +func (fm *FunctionManager) createFuncCtx() api.FunctionContext { return NewFuncCtxImpl(fm.options.stateStore) } @@ -197,7 +199,7 @@ func (fm *FunctionManager) StartFunction(f *model.Function) error { } fm.functions[getName(f.Namespace, f.Name)] = make([]api.FunctionInstance, f.Replicas) fm.functionsLock.Unlock() - funcCtx := fm.createFuncCtx(f) + funcCtx := fm.createFuncCtx() for i := int32(0); i < f.Replicas; i++ { runtimeType := fm.getRuntimeType(f.Runtime) @@ -290,7 +292,8 @@ func (fm *FunctionManager) ProduceEvent(name string, event contube.Record) error func (fm *FunctionManager) ConsumeEvent(name string) (contube.Record, error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := fm.options.tubeFactoryMap["default"].NewSourceTube(ctx, (&contube.SourceQueueConfig{Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) + c, err := fm.options.tubeFactoryMap["default"].NewSourceTube(ctx, (&contube.SourceQueueConfig{ + Topics: []string{name}, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap()) if err != nil { return nil, err } diff --git a/fs/runtime/grpc/grpc_func.go b/fs/runtime/grpc/grpc_func.go index 25f81516..751946ac 100644 --- a/fs/runtime/grpc/grpc_func.go +++ b/fs/runtime/grpc/grpc_func.go @@ -18,6 +18,11 @@ package grpc import ( "fmt" + "log/slog" + "net" + "sync" + "sync/atomic" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" @@ -25,10 +30,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "log/slog" - "net" - "sync" - "sync/atomic" ) type GRPCFuncRuntime struct { @@ -211,7 +212,8 @@ func (f *GRPCFuncRuntime) Update(new *proto.FunctionStatus) { f.readyCh <- fmt.Errorf("function failed to start") } if f.status.Status != new.Status { - f.log.InfoContext(f.ctx, "Function status update", slog.Any("new_status", new.Status), slog.Any("old_status", f.status.Status)) + f.log.InfoContext(f.ctx, "Function status update", slog.Any("new_status", new.Status), + slog.Any("old_status", f.status.Status)) } f.status = new } @@ -323,7 +325,8 @@ func (f *FunctionServerImpl) PutState(ctx context.Context, req *proto.PutStateRe }, nil } -func (f *FunctionServerImpl) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, error) { +func (f *FunctionServerImpl) GetState(ctx context.Context, req *proto.GetStateRequest) (*proto.GetStateResponse, + error) { runtime, err := f.getFunctionRuntime(ctx) if err != nil { return nil, err diff --git a/fs/runtime/grpc/grpc_func_test.go b/fs/runtime/grpc/grpc_func_test.go index 376159f7..4c5636ee 100644 --- a/fs/runtime/grpc/grpc_func_test.go +++ b/fs/runtime/grpc/grpc_func_test.go @@ -18,13 +18,14 @@ package grpc import ( "context" + "testing" + "time" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs" "github.com/functionstream/function-stream/fs/contube" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestFMWithGRPCRuntime(t *testing.T) { diff --git a/fs/runtime/grpc/mock_grpc_func_test.go b/fs/runtime/grpc/mock_grpc_func_test.go index a4799e96..0c4ad8ef 100644 --- a/fs/runtime/grpc/mock_grpc_func_test.go +++ b/fs/runtime/grpc/mock_grpc_func_test.go @@ -18,6 +18,11 @@ package grpc import ( "errors" + "io" + "log/slog" + "strconv" + "testing" + "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/runtime/grpc/proto" "github.com/stretchr/testify/assert" @@ -25,10 +30,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" - "io" - "log/slog" - "strconv" - "testing" ) func StartMockGRPCFunc(t *testing.T, addr string) { diff --git a/fs/runtime/wazero/wazero_runtime.go b/fs/runtime/wazero/wazero_runtime.go index 8661b6c8..99ac4b67 100644 --- a/fs/runtime/wazero/wazero_runtime.go +++ b/fs/runtime/wazero/wazero_runtime.go @@ -17,6 +17,10 @@ package wazero import ( + "log/slog" + "os" + "strconv" + "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" @@ -26,9 +30,6 @@ import ( "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" "github.com/tetratelabs/wazero/sys" "golang.org/x/net/context" - "log/slog" - "os" - "strconv" ) type WazeroFunctionRuntimeFactory struct { @@ -41,7 +42,8 @@ func NewWazeroFunctionRuntimeFactory() api.FunctionRuntimeFactory { func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionInstance) (api.FunctionRuntime, error) { log := instance.Logger() r := wazero.NewRuntime(instance.Context()) - _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, m wazero_api.Module, a, b, c, d uint32) { + _, err := r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(func(ctx context.Context, + m wazero_api.Module, a, b, c, d uint32) { panic("abort") }).Export("abort").Instantiate(instance.Context()) if err != nil { @@ -74,7 +76,8 @@ func (f *WazeroFunctionRuntimeFactory) NewFunctionRuntime(instance api.FunctionI mod, err := r.InstantiateWithConfig(instance.Context(), wasmBytes, config) if err != nil { if exitErr, ok := err.(*sys.ExitError); ok && exitErr.ExitCode() != 0 { - return nil, errors.Wrap(err, "Error instantiating function, function exit with code"+strconv.Itoa(int(exitErr.ExitCode()))) + return nil, errors.Wrap(err, "Error instantiating function, function exit with code"+ + strconv.Itoa(int(exitErr.ExitCode()))) } else if !ok { return nil, errors.Wrap(err, "Error instantiating function") } diff --git a/fs/statestore/pebble.go b/fs/statestore/pebble.go index 41a26775..11f9fff9 100644 --- a/fs/statestore/pebble.go +++ b/fs/statestore/pebble.go @@ -17,11 +17,12 @@ package statestore import ( + "log/slog" + "os" + "github.com/cockroachdb/pebble" "github.com/functionstream/function-stream/fs/api" "github.com/pkg/errors" - "log/slog" - "os" ) type PebbleStateStore struct { @@ -34,7 +35,7 @@ type PebbleStateStoreConfig struct { DirName string } -func NewTmpPebbleStateStore() (*PebbleStateStore, error) { +func NewTmpPebbleStateStore() (api.StateStore, error) { dir, err := os.MkdirTemp("", "") if err != nil { return nil, err diff --git a/fs/statestore/pebble_test.go b/fs/statestore/pebble_test.go index f699d1d8..a9247ad3 100644 --- a/fs/statestore/pebble_test.go +++ b/fs/statestore/pebble_test.go @@ -17,10 +17,11 @@ package statestore_test import ( + "testing" + "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/statestore" "github.com/stretchr/testify/assert" - "testing" ) func TestPebbleStateStore(t *testing.T) { diff --git a/go.mod b/go.mod index 258e5a7d..1c87b639 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/tetratelabs/wazero v1.6.0 - golang.org/x/net v0.22.0 + golang.org/x/net v0.23.0 golang.org/x/time v0.5.0 google.golang.org/grpc v1.61.1 google.golang.org/protobuf v1.33.0 @@ -86,7 +86,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/atomic v1.11.0 // indirect - go.uber.org/multierr v1.9.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect golang.org/x/mod v0.15.0 // indirect diff --git a/go.sum b/go.sum index fb7ef7e6..e1ea8c87 100644 --- a/go.sum +++ b/go.sum @@ -212,8 +212,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= -go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -233,8 +233,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/perf/perf.go b/perf/perf.go index 7458852a..2cf288ff 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -20,18 +20,19 @@ import ( "context" "encoding/json" "fmt" - "github.com/bmizerany/perks/quantile" - "github.com/functionstream/function-stream/admin/client" - "github.com/functionstream/function-stream/admin/utils" - "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/fs/contube" - "golang.org/x/time/rate" "log/slog" "math/rand" "os" "strconv" "sync/atomic" "time" + + "github.com/bmizerany/perks/quantile" + adminclient "github.com/functionstream/function-stream/admin/client" + "github.com/functionstream/function-stream/admin/utils" + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/fs/contube" + "golang.org/x/time/rate" ) type TubeBuilder func(ctx context.Context) (contube.TubeFactory, error) diff --git a/server/config.go b/server/config.go index 18ef7e8f..860bf585 100644 --- a/server/config.go +++ b/server/config.go @@ -17,17 +17,13 @@ package server import ( - "github.com/functionstream/function-stream/common" - "github.com/pkg/errors" - "github.com/spf13/viper" "log/slog" "os" "strings" -) -const ( - WASMRuntime = "wasm" - GRPCRuntime = "grpc" + "github.com/functionstream/function-stream/common" + "github.com/pkg/errors" + "github.com/spf13/viper" ) type FactoryConfig struct { @@ -36,6 +32,11 @@ type FactoryConfig struct { Config *common.ConfigMap `mapstructure:"config"` } +type StateStoreConfig struct { + Type *string `mapstructure:"type"` + Config *common.ConfigMap `mapstructure:"config"` +} + type Config struct { // ListenAddr is the address that the function stream REST service will listen on. ListenAddr string `mapstructure:"listen_addr"` @@ -45,6 +46,10 @@ type Config struct { // RuntimeFactory is the list of runtime factories that the function stream server will use. RuntimeFactory map[string]*FactoryConfig `mapstructure:"runtime_factory"` + + // StateStore is the configuration for the state store that the function stream server will use. + // Optional + StateStore *StateStoreConfig `mapstructure:"state_store"` } func init() { diff --git a/server/config_test.go b/server/config_test.go index 291be0ed..f735cb56 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -17,11 +17,12 @@ package server import ( + "os" + "testing" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "os" - "testing" ) func TestLoadConfigFromYaml(t *testing.T) { diff --git a/server/function_service.go b/server/function_service.go index 29513116..4168103b 100644 --- a/server/function_service.go +++ b/server/function_service.go @@ -18,11 +18,12 @@ package server import ( "errors" + "net/http" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" - "net/http" ) func (s *Server) makeFunctionService() *restful.WebService { diff --git a/server/http_tube_service.go b/server/http_tube_service.go index 60cb7dc0..71fa75da 100644 --- a/server/http_tube_service.go +++ b/server/http_tube_service.go @@ -17,9 +17,10 @@ package server import ( + "net/http" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" - "net/http" ) func (s *Server) makeHttpTubeService() *restful.WebService { diff --git a/server/server.go b/server/server.go index f86866c7..3f97a9ca 100644 --- a/server/server.go +++ b/server/server.go @@ -18,6 +18,14 @@ package server import ( "context" + "log/slog" + "net" + "net/http" + "net/url" + "strings" + "sync/atomic" + "time" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/common" @@ -25,21 +33,16 @@ import ( "github.com/functionstream/function-stream/fs/api" "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/fs/runtime/wazero" + "github.com/functionstream/function-stream/fs/statestore" "github.com/go-openapi/spec" "github.com/pkg/errors" "k8s.io/utils/set" - "log/slog" - "net" - "net/http" - "net/url" - "strings" - "sync/atomic" - "time" ) var ( ErrUnsupportedTRuntimeType = errors.New("unsupported runtime type") ErrUnsupportedTubeType = errors.New("unsupported tube type") + ErrUnsupportedStateStore = errors.New("unsupported state store") ) type Server struct { @@ -51,13 +54,15 @@ type Server struct { type TubeLoaderType func(c *FactoryConfig) (contube.TubeFactory, error) type RuntimeLoaderType func(c *FactoryConfig) (api.FunctionRuntimeFactory, error) +type StateStoreLoaderType func(c *StateStoreConfig) (api.StateStore, error) type serverOptions struct { - httpListener net.Listener - managerOpts []fs.ManagerOption - httpTubeFact *contube.HttpTubeFactory - tubeLoader TubeLoaderType - runtimeLoader RuntimeLoaderType + httpListener net.Listener + managerOpts []fs.ManagerOption + httpTubeFact *contube.HttpTubeFactory + tubeLoader TubeLoaderType + runtimeLoader RuntimeLoaderType + stateStoreLoader StateStoreLoaderType } type ServerOption interface { @@ -114,6 +119,13 @@ func WithRuntimeLoader(loader RuntimeLoaderType) ServerOption { }) } +func WithStateStoreLoader(loader func(c *StateStoreConfig) (api.StateStore, error)) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.stateStoreLoader = loader + return o, nil + }) +} + func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[string]) (string, error) { if visited.Has(name) { return "", errors.Errorf("circular reference of factory %s", name) @@ -129,7 +141,8 @@ func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[str return name, nil } -func initFactories[T any](m map[string]*FactoryConfig, newFactory func(c *FactoryConfig) (T, error), setup func(n string, f T)) error { +func initFactories[T any](m map[string]*FactoryConfig, newFactory func(c *FactoryConfig) (T, error), + setup func(n string, f T)) error { factoryMap := make(map[string]T) for name := range m { @@ -169,12 +182,20 @@ func DefaultTubeLoader(c *FactoryConfig) (contube.TubeFactory, error) { func DefaultRuntimeLoader(c *FactoryConfig) (api.FunctionRuntimeFactory, error) { switch strings.ToLower(*c.Type) { - case WASMRuntime: + case common.WASMRuntime: return wazero.NewWazeroFunctionRuntimeFactory(), nil } return nil, errors.WithMessagef(ErrUnsupportedTRuntimeType, "unsupported runtime type: %s", *c.Type) } +func DefaultStateStoreLoader(c *StateStoreConfig) (api.StateStore, error) { + switch strings.ToLower(*c.Type) { + case common.StateStorePebble: + return statestore.NewTmpPebbleStateStore() + } + return nil, errors.WithMessagef(ErrUnsupportedStateStore, "unsupported state store type: %s", *c.Type) +} + func WithConfig(config *Config) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { ln, err := net.Listen("tcp", config.ListenAddr) @@ -188,12 +209,20 @@ func WithConfig(config *Config) ServerOption { if err != nil { return nil, err } - err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, o.runtimeLoader, func(n string, f api.FunctionRuntimeFactory) { - o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) - }) + err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, o.runtimeLoader, + func(n string, f api.FunctionRuntimeFactory) { + o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) + }) if err != nil { return nil, err } + if config.StateStore != nil { + stateStore, err := o.stateStoreLoader(config.StateStore) + if err != nil { + return nil, err + } + o.managerOpts = append(o.managerOpts, fs.WithStateStore(stateStore)) + } return o, nil }) } @@ -208,6 +237,7 @@ func NewServer(opts ...ServerOption) (*Server, error) { options.httpTubeFact = httpTubeFact options.tubeLoader = DefaultTubeLoader options.runtimeLoader = DefaultRuntimeLoader + options.stateStoreLoader = DefaultStateStoreLoader for _, o := range opts { if o == nil { continue @@ -250,7 +280,7 @@ func NewDefaultServer() (*Server, error) { }, RuntimeFactory: map[string]*FactoryConfig{ "wasm": { - Type: common.OptionalStr(WASMRuntime), + Type: common.OptionalStr(common.WASMRuntime), }, "default": { Ref: common.OptionalStr("wasm"), diff --git a/server/server_test.go b/server/server_test.go index 6b1c54cf..d62c1ba3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,7 +19,12 @@ package server import ( "context" "encoding/json" - "github.com/functionstream/function-stream/admin/client" + "math/rand" + "net" + "strconv" + "testing" + + adminclient "github.com/functionstream/function-stream/admin/client" "github.com/functionstream/function-stream/common" "github.com/functionstream/function-stream/common/model" "github.com/functionstream/function-stream/fs" @@ -27,10 +32,6 @@ import ( "github.com/functionstream/function-stream/fs/contube" "github.com/functionstream/function-stream/tests" "github.com/stretchr/testify/assert" - "math/rand" - "net" - "strconv" - "testing" ) func getListener(t *testing.T) net.Listener { diff --git a/server/state_service.go b/server/state_service.go index 806b2c6d..a5527795 100644 --- a/server/state_service.go +++ b/server/state_service.go @@ -17,11 +17,12 @@ package server import ( + "io" + "net/http" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/pkg/errors" - "io" - "net/http" ) func (s *Server) makeStateService() *restful.WebService { diff --git a/server/tube_service.go b/server/tube_service.go index 2dd4fc0a..adc8ea7f 100644 --- a/server/tube_service.go +++ b/server/tube_service.go @@ -17,11 +17,12 @@ package server import ( + "io" + "net/http" + restfulspec "github.com/emicklei/go-restful-openapi/v2" "github.com/emicklei/go-restful/v3" "github.com/functionstream/function-stream/fs/contube" - "io" - "net/http" ) // Due to this issue: https://github.com/emicklei/go-restful-openapi/issues/115, diff --git a/tests/integration_test.go b/tests/integration_test.go index b3de8cb9..b788e749 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -19,15 +19,16 @@ package tests import ( "context" "encoding/json" - "github.com/apache/pulsar-client-go/pulsar" - "github.com/functionstream/function-stream/admin/client" - "github.com/functionstream/function-stream/admin/utils" - "github.com/functionstream/function-stream/common" - "github.com/functionstream/function-stream/server" "io" "math/rand" "strconv" "testing" + + "github.com/apache/pulsar-client-go/pulsar" + adminclient "github.com/functionstream/function-stream/admin/client" + "github.com/functionstream/function-stream/admin/utils" + "github.com/functionstream/function-stream/common" + "github.com/functionstream/function-stream/server" ) func startServer() {