diff --git a/fs/contube/contube.go b/fs/contube/contube.go index 3205fff7..9d9d285c 100644 --- a/fs/contube/contube.go +++ b/fs/contube/contube.go @@ -139,5 +139,7 @@ func (e *RecordImpl) GetPayload() []byte { } func (e *RecordImpl) Commit() { - e.commitFunc() + if e.commitFunc != nil { + e.commitFunc() + } } diff --git a/go.mod b/go.mod index a2b1273b..258e5a7d 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,9 @@ require ( github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267 github.com/emicklei/go-restful/v3 v3.12.0 github.com/go-openapi/spec v0.21.0 - github.com/gorilla/mux v1.8.1 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.8.0 + github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/tetratelabs/wazero v1.6.0 golang.org/x/net v0.22.0 @@ -19,6 +19,7 @@ require ( google.golang.org/grpc v1.61.1 google.golang.org/protobuf v1.33.0 gopkg.in/yaml.v3 v3.0.1 + k8s.io/utils v0.0.0-20240102154912-e7106e64919e ) require ( @@ -36,9 +37,7 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/danieljoos/wincred v1.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect - github.com/frankban/quicktest v1.14.6 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -85,7 +84,6 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.18.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.9.0 // indirect @@ -105,7 +103,6 @@ require ( k8s.io/apimachinery v0.29.1 // indirect k8s.io/client-go v0.29.1 // indirect k8s.io/klog/v2 v2.120.1 // indirect - k8s.io/utils v0.0.0-20240102154912-e7106e64919e // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/go.sum b/go.sum index 7d1faa37..fb7ef7e6 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/AthenZ/athenz v1.11.50 h1:mCyQhI32GHPpPde9NVChI46hpRjw+vX1Z4RN8GCDILE github.com/AthenZ/athenz v1.11.50/go.mod h1:HfKWur/iDpTKNb2TVaKKy4mt+Qa0PnZpIOqcmR9/i+Q= github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= -github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= -github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= @@ -37,28 +35,20 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8GQs= github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= -github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA2EjTY= github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/emicklei/go-restful-openapi/v2 v2.9.1 h1:Of8B1rXdG81il5TTiSY+9Qrh7pYOr8aLdynHIpvo7fM= -github.com/emicklei/go-restful-openapi/v2 v2.9.1/go.mod h1:VKNgZyYviM1hnyrjD9RDzP2RuE94xTXxV+u6MGN4v4k= github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267 h1:9hKp1vLTq4I9hA/hhZHOUTNX8DGFdLsLMl9pHl9VJAA= github.com/emicklei/go-restful-openapi/v2 v2.9.2-0.20231020145053-a5b7d60bb267/go.mod h1:4CTuOXHFg3jkvCpnXN+Wkw5prVUnP8hIACssJTYorWo= -github.com/emicklei/go-restful/v3 v3.7.3/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/emicklei/go-restful/v3 v3.12.0 h1:y2DdzBAURM29NFF94q6RaY4vjIH1rtwDapwQtU84iWk= github.com/emicklei/go-restful/v3 v3.12.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= @@ -71,11 +61,9 @@ github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34 github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= -github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns= github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXymS4zJbYVCZmcgkasdo= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= -github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I= github.com/go-openapi/spec v0.20.9/go.mod h1:2OpW+JddWPrpXSCIX8eOx7lZ5iyuWj3RYR6VaaBKcWA= github.com/go-openapi/spec v0.21.0 h1:LTVzPc3p/RzRnkQqLRndbAzjY0d0BCL72A6j3CdL9ZY= github.com/go-openapi/spec v0.21.0/go.mod h1:78u6VdPw81XU44qEWGhtr982gJ5BWg2c0I5XwVMotYk= @@ -107,8 +95,6 @@ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaU github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= -github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= @@ -168,7 +154,6 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -247,7 +232,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= @@ -263,7 +247,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -276,7 +259,6 @@ golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= diff --git a/server/config.go b/server/config.go index 7cc1bb75..18ef7e8f 100644 --- a/server/config.go +++ b/server/config.go @@ -20,7 +20,6 @@ import ( "github.com/functionstream/function-stream/common" "github.com/pkg/errors" "github.com/spf13/viper" - "k8s.io/utils/set" "log/slog" "os" "strings" @@ -52,7 +51,7 @@ func init() { viper.SetDefault("ListenAddr", "7300") } -func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig, supportedTypes set.Set[string]) error { +func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig) error { for name, factory := range m { if ref := factory.Ref; ref != nil && *ref != "" { referred, ok := m[strings.ToLower(*ref)] @@ -70,9 +69,6 @@ func preprocessFactoriesConfig(n string, m map[string]*FactoryConfig, supportedT if factory.Type == nil { return errors.Errorf("%s factory %s has no type", n, name) } - if !supportedTypes.Has(strings.ToLower(*factory.Type)) { - return errors.Errorf("%s factory %s has unsupported type %s", n, name, *factory.Type) - } } return nil } @@ -81,11 +77,11 @@ func (c *Config) preprocessConfig() error { if c.ListenAddr == "" { return errors.New("ListenAddr shouldn't be empty") } - err := preprocessFactoriesConfig("Tube", c.TubeFactory, set.New[string](common.PulsarTubeType, common.MemoryTubeType)) + err := preprocessFactoriesConfig("Tube", c.TubeFactory) if err != nil { return err } - return preprocessFactoriesConfig("Runtime", c.RuntimeFactory, set.New[string](WASMRuntime, GRPCRuntime)) + return preprocessFactoriesConfig("Runtime", c.RuntimeFactory) } func loadConfig() (*Config, error) { diff --git a/server/server.go b/server/server.go index 8249f77a..f86866c7 100644 --- a/server/server.go +++ b/server/server.go @@ -37,6 +37,11 @@ import ( "time" ) +var ( + ErrUnsupportedTRuntimeType = errors.New("unsupported runtime type") + ErrUnsupportedTubeType = errors.New("unsupported tube type") +) + type Server struct { options *serverOptions httpSvr atomic.Pointer[http.Server] @@ -44,10 +49,15 @@ type Server struct { Manager *fs.FunctionManager } +type TubeLoaderType func(c *FactoryConfig) (contube.TubeFactory, error) +type RuntimeLoaderType func(c *FactoryConfig) (api.FunctionRuntimeFactory, error) + type serverOptions struct { - httpListener net.Listener - managerOpts []fs.ManagerOption - httpTubeFact *contube.HttpTubeFactory + httpListener net.Listener + managerOpts []fs.ManagerOption + httpTubeFact *contube.HttpTubeFactory + tubeLoader TubeLoaderType + runtimeLoader RuntimeLoaderType } type ServerOption interface { @@ -86,6 +96,24 @@ func WithHttpTubeFactory(factory *contube.HttpTubeFactory) ServerOption { }) } +// WithTubeLoader sets the loader for the tube factory. +// This must be called before WithConfig. +func WithTubeLoader(loader TubeLoaderType) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.tubeLoader = loader + return o, nil + }) +} + +// WithRuntimeLoader sets the loader for the runtime factory. +// This must be called before WithConfig. +func WithRuntimeLoader(loader RuntimeLoaderType) ServerOption { + return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { + o.runtimeLoader = loader + return o, nil + }) +} + func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[string]) (string, error) { if visited.Has(name) { return "", errors.Errorf("circular reference of factory %s", name) @@ -101,7 +129,7 @@ func getRefFactory(m map[string]*FactoryConfig, name string, visited set.Set[str return name, nil } -func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string, c *FactoryConfig) (T, error), setup func(n string, f T)) error { +func initFactories[T any](m map[string]*FactoryConfig, newFactory func(c *FactoryConfig) (T, error), setup func(n string, f T)) error { factoryMap := make(map[string]T) for name := range m { @@ -114,7 +142,10 @@ func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string, if !exist { return errors.Errorf("factory %s not found, which the factory %s is pointed to", refName, name) } - f, err := newFactory(refName, fc) + if fc.Type == nil { + return errors.Errorf("factory %s type is not set", refName) + } + f, err := newFactory(fc) if err != nil { return err } @@ -126,6 +157,24 @@ func initFactories[T any](m map[string]*FactoryConfig, newFactory func(n string, return nil } +func DefaultTubeLoader(c *FactoryConfig) (contube.TubeFactory, error) { + switch strings.ToLower(*c.Type) { + case common.PulsarTubeType: + return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config)) + case common.MemoryTubeType: + return contube.NewMemoryQueueFactory(context.Background()), nil + } + return nil, errors.WithMessagef(ErrUnsupportedTubeType, "unsupported tube type :%s", *c.Type) +} + +func DefaultRuntimeLoader(c *FactoryConfig) (api.FunctionRuntimeFactory, error) { + switch strings.ToLower(*c.Type) { + case WASMRuntime: + return wazero.NewWazeroFunctionRuntimeFactory(), nil + } + return nil, errors.WithMessagef(ErrUnsupportedTRuntimeType, "unsupported runtime type: %s", *c.Type) +} + func WithConfig(config *Config) ServerOption { return serverOptionFunc(func(o *serverOptions) (*serverOptions, error) { ln, err := net.Listen("tcp", config.ListenAddr) @@ -133,36 +182,15 @@ func WithConfig(config *Config) ServerOption { return nil, err } o.httpListener = ln - err = initFactories[contube.TubeFactory](config.TubeFactory, func(n string, c *FactoryConfig) (contube.TubeFactory, error) { - if c.Type == nil { - return nil, errors.Errorf("tube factory %s type is not set", n) - } - switch strings.ToLower(*c.Type) { - case common.PulsarTubeType: - return contube.NewPulsarEventQueueFactory(context.Background(), contube.ConfigMap(*c.Config)) - case common.MemoryTubeType: - return contube.NewMemoryQueueFactory(context.Background()), nil - } - return nil, errors.Errorf("unsupported tube type %s", *c.Type) - }, func(n string, f contube.TubeFactory) { + err = initFactories[contube.TubeFactory](config.TubeFactory, o.tubeLoader, func(n string, f contube.TubeFactory) { o.managerOpts = append(o.managerOpts, fs.WithTubeFactory(n, f)) }) if err != nil { return nil, err } - err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, - func(n string, c *FactoryConfig) (api.FunctionRuntimeFactory, error) { - if c.Type == nil { - return nil, errors.Errorf("runtime factory %s type is not set", n) - } - switch strings.ToLower(*c.Type) { - case WASMRuntime: - return wazero.NewWazeroFunctionRuntimeFactory(), nil - } - return nil, errors.Errorf("unsupported runtime type %s", *c.Type) - }, func(n string, f api.FunctionRuntimeFactory) { - o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) - }) + err = initFactories[api.FunctionRuntimeFactory](config.RuntimeFactory, o.runtimeLoader, func(n string, f api.FunctionRuntimeFactory) { + o.managerOpts = append(o.managerOpts, fs.WithRuntimeFactory(n, f)) + }) if err != nil { return nil, err } @@ -178,6 +206,8 @@ func NewServer(opts ...ServerOption) (*Server, error) { fs.WithTubeFactory("http", httpTubeFact), } options.httpTubeFact = httpTubeFact + options.tubeLoader = DefaultTubeLoader + options.runtimeLoader = DefaultRuntimeLoader for _, o := range opts { if o == nil { continue