Skip to content

Commit

Permalink
feat: support pluggable state store (#173)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored May 3, 2024
1 parent 4618660 commit d7fb8f1
Show file tree
Hide file tree
Showing 44 changed files with 219 additions and 132 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ build_example:
lint:
golangci-lint run

lint-fix:
golangci-lint run --fix

build_all: build build_example

test:
Expand Down
3 changes: 2 additions & 1 deletion admin/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package utils

import (
"fmt"
"github.com/functionstream/function-stream/admin/client"

adminclient "github.com/functionstream/function-stream/admin/client"
)

func MakeQueueSourceTubeConfig(subName string, topics ...string) []adminclient.ModelTubeConfig {
Expand Down
15 changes: 8 additions & 7 deletions benchmark/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ package benchmark

import (
"context"
"math/rand"
"os"
"runtime/pprof"
"strconv"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/functionstream/function-stream/admin/client"
adminclient "github.com/functionstream/function-stream/admin/client"
adminutils "github.com/functionstream/function-stream/admin/utils"
"github.com/functionstream/function-stream/common"
"github.com/functionstream/function-stream/fs"
"github.com/functionstream/function-stream/fs/contube"
"github.com/functionstream/function-stream/perf"
"github.com/functionstream/function-stream/server"
"math/rand"
"os"
"runtime/pprof"
"strconv"
"testing"
"time"
)

func BenchmarkStressForBasicFunc(b *testing.B) {
Expand Down
3 changes: 2 additions & 1 deletion cmd/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (
)

func init() {
Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s", "http://localhost:7300", "Service address")
Cmd.PersistentFlags().StringVarP(&c.Config.ServiceAddr, "service-address", "s",
"http://localhost:7300", "Service address")

Cmd.AddCommand(create.Cmd)
Cmd.AddCommand(list.Cmd)
Expand Down
5 changes: 3 additions & 2 deletions cmd/client/consume/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package consume

import (
"fmt"
"github.com/functionstream/function-stream/admin/client"
"os"

adminclient "github.com/functionstream/function-stream/admin/client"
"github.com/functionstream/function-stream/cmd/client/common"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"os"
)

var (
Expand Down
7 changes: 4 additions & 3 deletions cmd/client/create/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package create
import (
"context"
"fmt"
"github.com/functionstream/function-stream/admin/client"
"io"
"os"

adminclient "github.com/functionstream/function-stream/admin/client"
"github.com/functionstream/function-stream/admin/utils"
"github.com/functionstream/function-stream/cmd/client/common"
fs_cmmon "github.com/functionstream/function-stream/common"
"github.com/spf13/cobra"
"io"
"os"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/delete/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package del

import (
"github.com/functionstream/function-stream/admin/client"
adminclient "github.com/functionstream/function-stream/admin/client"
"github.com/functionstream/function-stream/cmd/client/common"
"github.com/spf13/cobra"
)
Expand Down
5 changes: 3 additions & 2 deletions cmd/client/list/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package list
import (
"context"
"fmt"
"github.com/functionstream/function-stream/admin/client"
"os"

adminclient "github.com/functionstream/function-stream/admin/client"
"github.com/functionstream/function-stream/cmd/client/common"
"github.com/spf13/cobra"
"os"
)

var Cmd = &cobra.Command{
Expand Down
5 changes: 3 additions & 2 deletions cmd/client/produce/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package produce

import (
"fmt"
"github.com/functionstream/function-stream/admin/client"
"os"

adminclient "github.com/functionstream/function-stream/admin/client"
"github.com/functionstream/function-stream/cmd/client/common"
"github.com/spf13/cobra"
"golang.org/x/net/context"
"os"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ package main

import (
"fmt"
"os"

"github.com/functionstream/function-stream/cmd/client"
"github.com/functionstream/function-stream/cmd/perf"
"github.com/functionstream/function-stream/cmd/server"
"github.com/functionstream/function-stream/cmd/standalone"
"github.com/spf13/cobra"
"os"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion cmd/perf/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package perf

import (
"context"
"io"

"github.com/functionstream/function-stream/common"
"github.com/functionstream/function-stream/perf"
"github.com/spf13/cobra"
"io"
)

var (
Expand Down
6 changes: 4 additions & 2 deletions cmd/server/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package server

import (
"context"
"io"

"github.com/functionstream/function-stream/common"
"github.com/functionstream/function-stream/server"
"github.com/spf13/cobra"
"io"
)

var (
Expand All @@ -45,7 +46,8 @@ var (
func init() {
Cmd.Flags().StringVarP(&config.configFile, "config-file", "c", "conf/function-stream.yaml",
"path to the config file (default is conf/function-stream.yaml)")
Cmd.Flags().BoolVarP(&config.loadConfigFromEnv, "load-config-from-env", "e", false, "load config from env (default is false)")
Cmd.Flags().BoolVarP(&config.loadConfigFromEnv, "load-config-from-env", "e", false,
"load config from env (default is false)")
}

func exec(*cobra.Command, []string) {
Expand Down
3 changes: 2 additions & 1 deletion cmd/standalone/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package standalone

import (
"context"
"io"

"github.com/functionstream/function-stream/common"
"github.com/functionstream/function-stream/server"
"github.com/spf13/cobra"
"io"
)

var (
Expand Down
6 changes: 3 additions & 3 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ const (
MemoryTubeType = "memory"
HttpTubeType = "http"

DefaultAddr = "localhost:7300"
DefaultPulsarURL = "pulsar://localhost:6650"
DefaultTubeType = PulsarTubeType
WASMRuntime = "wasm"

RuntimeArchiveConfigKey = "archive"

StateStorePebble = "pebble"
)
3 changes: 2 additions & 1 deletion common/model/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package model

import (
"strings"

"github.com/functionstream/function-stream/fs/contube"
"github.com/pkg/errors"
"strings"
)

type TubeConfig struct {
Expand Down
8 changes: 5 additions & 3 deletions common/model/function_serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package model
import (
"encoding/json"
"fmt"
"github.com/functionstream/function-stream/common"
"gopkg.in/yaml.v3"
"reflect"
"testing"

"github.com/functionstream/function-stream/common"
"gopkg.in/yaml.v3"
)

func TestFunctionSerde(t *testing.T) {
Expand Down Expand Up @@ -110,7 +111,8 @@ func TestFunctionSerdeWithNil(t *testing.T) {

fmt.Println(string(data))

f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML serialization
f.Sources = []*TubeConfig{} // The nil would be expected to be converted to a zero-length array for the YAML
// serialization

// YAML Deserialization
err = yaml.Unmarshal(data, &f2)
Expand Down
3 changes: 2 additions & 1 deletion fs/api/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package api

import (
"log/slog"

"github.com/functionstream/function-stream/common/model"
"github.com/functionstream/function-stream/fs/contube"
"golang.org/x/net/context"
"log/slog"
)

type FunctionInstance interface {
Expand Down
1 change: 1 addition & 0 deletions fs/contube/contube.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package contube
import (
"context"
"fmt"

"github.com/pkg/errors"
)

Expand Down
8 changes: 5 additions & 3 deletions fs/contube/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package contube

import (
"github.com/pkg/errors"
"golang.org/x/net/context"
"io"
"log/slog"
"net/http"
"sync"
"sync/atomic"

"github.com/pkg/errors"
"golang.org/x/net/context"
)

type state int
Expand Down Expand Up @@ -133,7 +134,8 @@ func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Re
return nil, ErrSinkTubeNotImplemented
}

func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request) {
func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error),
logger *slog.Logger) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
endpoint, err := getEndpoint(r)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion fs/contube/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package contube

import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"testing"
)

func TestHttpTubeHandleRecord(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions fs/contube/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestMemoryTube(t *testing.T) {
var events []Record

topics := []string{"topic1", "topic2", "topic3"}
source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics, SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
source, err := memoryQueueFactory.NewSourceTube(ctx, (&SourceQueueConfig{Topics: topics,
SubName: "consume-" + strconv.Itoa(rand.Int())}).ToConfigMap())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -72,7 +73,8 @@ func TestMemoryTube(t *testing.T) {
wg.Wait()
cancel()

// Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and the released queue is successful.
// Give enough time to ensure that the goroutine execution within NewSource Tube and NewSinkTube is complete and
// the released queue is successful.
time.Sleep(100 * time.Millisecond)

// assert the memoryQueueFactory.queues is empty.
Expand Down
5 changes: 3 additions & 2 deletions fs/contube/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package contube

import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/pkg/errors"
"log/slog"
"sync/atomic"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pkg/errors"
)

const (
Expand Down
3 changes: 2 additions & 1 deletion fs/func_ctx_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package fs

import (
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFuncCtx_NilStore(t *testing.T) {
Expand Down
11 changes: 7 additions & 4 deletions fs/instance_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package fs

import (
"context"
"log/slog"
"reflect"

"github.com/functionstream/function-stream/common"
"github.com/functionstream/function-stream/common/model"
"github.com/functionstream/function-stream/fs/api"
"github.com/functionstream/function-stream/fs/contube"
"github.com/pkg/errors"
"log/slog"
"reflect"
)

type FunctionInstanceImpl struct {
Expand Down Expand Up @@ -53,7 +54,8 @@ func NewDefaultInstanceFactory() api.FunctionInstanceFactory {
return &DefaultInstanceFactory{}
}

func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext, index int32, logger *slog.Logger) api.FunctionInstance {
func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function, funcCtx api.FunctionContext,
index int32, logger *slog.Logger) api.FunctionInstance {
ctx, cancelFunc := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, CtxKeyFunctionName, definition.Name)
ctx = context.WithValue(ctx, CtxKeyInstanceIndex, index)
Expand All @@ -69,7 +71,8 @@ func (f *DefaultInstanceFactory) NewFunctionInstance(definition *model.Function,
}
}

func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory, sources []<-chan contube.Record, sink chan<- contube.Record) {
func (instance *FunctionInstanceImpl) Run(runtimeFactory api.FunctionRuntimeFactory, sources []<-chan contube.Record,
sink chan<- contube.Record) {
runtime, err := runtimeFactory.NewFunctionRuntime(instance)
if err != nil {
instance.readyCh <- errors.Wrap(err, "Error creating runtime")
Expand Down
Loading

0 comments on commit d7fb8f1

Please sign in to comment.