Skip to content

Commit

Permalink
Support pluggable tube/runtime loader (#172)
Browse files Browse the repository at this point in the history
Signed-off-by: Zike Yang <zike@apache.org>
  • Loading branch information
RobertIndie authored May 3, 2024
1 parent d65f987 commit 4618660
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 61 deletions.
4 changes: 3 additions & 1 deletion fs/contube/contube.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,7 @@ func (e *RecordImpl) GetPayload() []byte {
}

func (e *RecordImpl) Commit() {
e.commitFunc()
if e.commitFunc != nil {
e.commitFunc()
}
}
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ require (
github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267
github.com/emicklei/go-restful/v3 v3.12.0
github.com/go-openapi/spec v0.21.0
github.com/gorilla/mux v1.8.1
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/tetratelabs/wazero v1.6.0
golang.org/x/net v0.22.0
golang.org/x/time v0.5.0
google.golang.org/grpc v1.61.1
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
)

require (
Expand All @@ -36,9 +37,7 @@ require (
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/danieljoos/wincred v1.2.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/frankban/quicktest v1.14.6 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down Expand Up @@ -85,7 +84,6 @@ require (
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
Expand All @@ -105,7 +103,6 @@ require (
k8s.io/apimachinery v0.29.1 // indirect
k8s.io/client-go v0.29.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
18 changes: 0 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ github.com/AthenZ/athenz v1.11.50 h1:mCyQhI32GHPpPde9NVChI46hpRjw+vX1Z4RN8GCDILE
github.com/AthenZ/athenz v1.11.50/go.mod h1:HfKWur/iDpTKNb2TVaKKy4mt+Qa0PnZpIOqcmR9/i+Q=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
Expand Down Expand Up @@ -37,28 +35,20 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8GQs=
github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY=
github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/emicklei/go-restful-openapi/v2 v2.9.1 h1:Of8B1rXdG81il5TTiSY+9Qrh7pYOr8aLdynHIpvo7fM=
github.com/emicklei/go-restful-openapi/v2 v2.9.1/go.mod h1:VKNgZyYviM1hnyrjD9RDzP2RuE94xTXxV+u6MGN4v4k=
github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267 h1:9hKp1vLTq4I9hA/hhZHOUTNX8DGFdLsLMl9pHl9VJAA=
github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267/go.mod h1:4CTuOXHFg3jkvCpnXN+Wkw5prVUnP8hIACssJTYorWo=
github.com/emicklei/go-restful/v3 v3.7.3/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk=
github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps=
Expand All @@ -71,11 +61,9 @@ github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ=
github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY=
github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns=
github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXymS4zJbYVCZmcgkasdo=
github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ=
github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4=
github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I=
github.com/go-openapi/spec v0.20.9/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA=
github.com/go-openapi/spec v0.21.0 h1:LTVzPc3p/RzRnkQqLRndbAzjY0d0BCL72A6j3CdL9ZY=
github.com/go-openapi/spec v0.21.0/go.mod h1:78u6VdPw81XU44qEWGhtr982gJ5BWg2c0I5XwVMotYk=
Expand Down Expand Up @@ -107,8 +95,6 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
Expand Down Expand Up @@ -168,7 +154,6 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -247,7 +232,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
Expand All @@ -263,7 +247,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -276,7 +259,6 @@ golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
Expand Down
10 changes: 3 additions & 7 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/functionstream/function-stream/common"
"github.com/pkg/errors"
"github.com/spf13/viper"
"k8s.io/utils/set"
"log/slog"
"os"
"strings"
Expand Down Expand Up @@ -52,7 +51,7 @@ func init() {
viper.SetDefault("ListenAddr", "7300")
}

func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig, supportedTypes set.Set[string]) error {
func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig) error {
for name, factory := range m {
if ref := factory.Ref; ref != nil && *ref != "" {
referred, ok := m[strings.ToLower(*ref)]
Expand All @@ -70,9 +69,6 @@ func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig, supportedT
if factory.Type == nil {
return errors.Errorf("%s factory %s has no type", n, name)
}
if !supportedTypes.Has(strings.ToLower(*factory.Type)) {
return errors.Errorf("%s factory %s has unsupported type %s", n, name, *factory.Type)
}
}
return nil
}
Expand All @@ -81,11 +77,11 @@ func (c *Config) preprocessConfig() error {
if c.ListenAddr == "" {
return errors.New("ListenAddr shouldn't be empty")
}
err := preprocessFactoriesConfig("Tube", c.TubeFactory, set.New[string](common.PulsarTubeType, common.MemoryTubeType))
err := preprocessFactoriesConfig("Tube", c.TubeFactory)
if err != nil {
return err
}
return preprocessFactoriesConfig("Runtime", c.RuntimeFactory, set.New[string](WASMRuntime, GRPCRuntime))
return preprocessFactoriesConfig("Runtime", c.RuntimeFactory)
}

func loadConfig() (*Config, error) {
Expand Down
90 changes: 60 additions & 30 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,27 @@ import (
"time"
)

var (
ErrUnsupportedTRuntimeType = errors.New("unsupported runtime type")
ErrUnsupportedTubeType = errors.New("unsupported tube type")
)

type Server struct {
options *serverOptions
httpSvr atomic.Pointer[http.Server]
log *slog.Logger
Manager *fs.FunctionManager
}

type TubeLoaderType func(c *FactoryConfig) (contube.TubeFactory, error)
type RuntimeLoaderType func(c *FactoryConfig) (api.FunctionRuntimeFactory, error)

type serverOptions struct {
httpListener net.Listener
managerOpts []fs.ManagerOption
httpTubeFact *contube.HttpTubeFactory
httpListener net.Listener
managerOpts []fs.ManagerOption
httpTubeFact *contube.HttpTubeFactory
tubeLoader TubeLoaderType
runtimeLoader RuntimeLoaderType
}

type ServerOption interface {
Expand Down Expand Up @@ -86,6 +96,24 @@ func WithHttpTubeFactory(factory *contube.HttpTubeFactory) ServerOption {
})
}

// WithTubeLoader sets the loader for the tube factory.
// This must be called before WithConfig.
func WithTubeLoader(loader TubeLoaderType) ServerOption {
return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) {
o.tubeLoader = loader
return o, nil
})
}

// WithRuntimeLoader sets the loader for the runtime factory.
// This must be called before WithConfig.
func WithRuntimeLoader(loader RuntimeLoaderType) ServerOption {
return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) {
o.runtimeLoader = loader
return o, nil
})
}

func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[string]) (string, error) {
if visited.Has(name) {
return "", errors.Errorf("circular reference of factory %s", name)
Expand All @@ -101,7 +129,7 @@ func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[str
return name, nil
}

func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string, c *FactoryConfig) (T, error), setup func(n string, f T)) error {
func initFactories[T any](m map[string]*FactoryConfig, newFactory func(c *FactoryConfig) (T, error), setup func(n string, f T)) error {
factoryMap := make(map[string]T)

for name := range m {
Expand All @@ -114,7 +142,10 @@ func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string,
if !exist {
return errors.Errorf("factory %s not found, which the factory %s is pointed to", refName, name)
}
f, err := newFactory(refName, fc)
if fc.Type == nil {
return errors.Errorf("factory %s type is not set", refName)
}
f, err := newFactory(fc)
if err != nil {
return err
}
Expand All @@ -126,43 +157,40 @@ func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string,
return nil
}

func DefaultTubeLoader(c *FactoryConfig) (contube.TubeFactory, error) {
switch strings.ToLower(*c.Type) {
case common.PulsarTubeType:
return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config))
case common.MemoryTubeType:
return contube.NewMemoryQueueFactory(context.Background()), nil
}
return nil, errors.WithMessagef(ErrUnsupportedTubeType, "unsupported tube type :%s", *c.Type)
}

func DefaultRuntimeLoader(c *FactoryConfig) (api.FunctionRuntimeFactory, error) {
switch strings.ToLower(*c.Type) {
case WASMRuntime:
return wazero.NewWazeroFunctionRuntimeFactory(), nil
}
return nil, errors.WithMessagef(ErrUnsupportedTRuntimeType, "unsupported runtime type: %s", *c.Type)
}

func WithConfig(config *Config) ServerOption {
return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) {
ln, err := net.Listen("tcp", config.ListenAddr)
if err != nil {
return nil, err
}
o.httpListener = ln
err = initFactories[contube.TubeFactory](config.TubeFactory, func(n string, c *FactoryConfig) (contube.TubeFactory, error) {
if c.Type == nil {
return nil, errors.Errorf("tube factory %s type is not set", n)
}
switch strings.ToLower(*c.Type) {
case common.PulsarTubeType:
return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config))
case common.MemoryTubeType:
return contube.NewMemoryQueueFactory(context.Background()), nil
}
return nil, errors.Errorf("unsupported tube type %s", *c.Type)
}, func(n string, f contube.TubeFactory) {
err = initFactories[contube.TubeFactory](config.TubeFactory, o.tubeLoader, func(n string, f contube.TubeFactory) {
o.managerOpts = append(o.managerOpts, fs.WithTubeFactory(n, f))
})
if err != nil {
return nil, err
}
err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory,
func(n string, c *FactoryConfig) (api.FunctionRuntimeFactory, error) {
if c.Type == nil {
return nil, errors.Errorf("runtime factory %s type is not set", n)
}
switch strings.ToLower(*c.Type) {
case WASMRuntime:
return wazero.NewWazeroFunctionRuntimeFactory(), nil
}
return nil, errors.Errorf("unsupported runtime type %s", *c.Type)
}, func(n string, f api.FunctionRuntimeFactory) {
o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f))
})
err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, o.runtimeLoader, func(n string, f api.FunctionRuntimeFactory) {
o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f))
})
if err != nil {
return nil, err
}
Expand All @@ -178,6 +206,8 @@ func NewServer(opts ...ServerOption) (*Server, error) {
fs.WithTubeFactory("http", httpTubeFact),
}
options.httpTubeFact = httpTubeFact
options.tubeLoader = DefaultTubeLoader
options.runtimeLoader = DefaultRuntimeLoader
for _, o := range opts {
if o == nil {
continue
Expand Down

0 comments on commit 4618660

Please sign in to comment.