diff --git a/go.mod b/go.mod index 6ab561e7..83635991 100644 --- a/go.mod +++ b/go.mod @@ -91,6 +91,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.5 // indirect github.com/gorilla/securecookie v1.1.2 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/hamba/avro/v2 v2.20.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -133,6 +134,7 @@ require ( github.com/tink-crypto/tink-go-gcpkms/v2 v2.1.0 // indirect github.com/tink-crypto/tink-go-hcvault/v2 v2.1.0 // indirect github.com/tink-crypto/tink-go/v2 v2.1.0 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -168,6 +170,7 @@ require ( github.com/cnkei/gospline v0.0.0-20191204052713-d67fac29a294 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/gammazero/nexus/v3 v3.2.2 github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/jsonschema v0.12.0 // indirect diff --git a/go.sum b/go.sum index 194c0a0e..7833bf79 100644 --- a/go.sum +++ b/go.sum @@ -229,6 +229,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM45eo= github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0= +github.com/gammazero/nexus/v3 v3.2.2 h1:uEBe4rKIcbBcbdP6XuyKUhnWBXxT0BnJrecG9+yZSTs= +github.com/gammazero/nexus/v3 v3.2.2/go.mod h1:55oZwPZFgRFCEjpMj1kdzffiPORKKmRsipSY8BeKRvY= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= @@ -661,6 +663,8 @@ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk= github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531 h1:Y/M5lygoNPKwVNLMPXgVfsRT40CSFKXCxuU8LoHySjs= github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531/go.mod h1:ulncasL3N9uLrVann0m+CDlJKWsIAP34MPcOJF6VRvc= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/vadv/gopher-lua-libs v0.5.0 h1:m0hhWia1A1U3PIRmtdHWBj88ogzuIjm6HUBmtUa0Tz4= github.com/vadv/gopher-lua-libs v0.5.0/go.mod h1:mlSOxmrjug7DwisiH7xBFnBellHobPbvAIhVeI/4SYY= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index d8d6d74a..8b055dec 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -60,6 +60,7 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset) fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset) fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset) + fmt.Printf("%sWAMP Topic%s (--output = wamp)\n", Green, Reset) fmt.Println() }, diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index 500c6c7e..ce043fbd 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -126,6 +126,8 @@ jr template run --template "{{name}}" configuration.GlobalCfg.LUAScriptConfig, _ = cmd.Flags().GetString(f.Name) case "wasmConfig": configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name) + case "wampConfig": + configuration.GlobalCfg.WAMPConfig, _ = cmd.Flags().GetString(f.Name) } } }) @@ -198,5 +200,6 @@ func init() { templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration") templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration") templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration") + templateRunCmd.Flags().String("wampConfig", "", "WAMP configuration") } diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index ebf6ebdb..ae3b7f2e 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -44,6 +44,7 @@ type GlobalConfiguration struct { AWSDynamoDBConfig string LUAScriptConfig string WASMConfig string + WAMPConfig string Url string EmbeddedTemplate bool FileNameTemplate bool diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 3d87a527..4db63da4 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -46,6 +46,7 @@ import ( "github.com/jrnd-io/jr/pkg/producers/redis" "github.com/jrnd-io/jr/pkg/producers/s3" "github.com/jrnd-io/jr/pkg/producers/server" + "github.com/jrnd-io/jr/pkg/producers/wamp" "github.com/jrnd-io/jr/pkg/tpl" "github.com/rs/zerolog/log" ) @@ -177,6 +178,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi e.Producer = createWASMProducer(ctx, conf.LUAScriptConfig) return } + if e.Output == "wamp" { + e.Producer = createWAMPProducer(ctx, conf.WAMPConfig) + return + } } @@ -285,6 +290,13 @@ func createWASMProducer(ctx context.Context, config string) Producer { return producer } +func createWAMPProducer(ctx context.Context, config string) Producer { + producer := &wamp.Producer{} + producer.Initialize(ctx, config) + + return producer +} + func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager { kManager := &kafka.Manager{ diff --git a/pkg/producers/wamp/config.json.example b/pkg/producers/wamp/config.json.example new file mode 100644 index 00000000..215fe6fe --- /dev/null +++ b/pkg/producers/wamp/config.json.example @@ -0,0 +1,10 @@ +{ + "wamp_uri": "ws://localhost:9009/ws", + "username": "admin", + "password": "password", + "realm": "realm1", + "topic": "example.hello", + "serType": "json", + "compress": true, + "authid": "clientJR" +} \ No newline at end of file diff --git a/pkg/producers/wamp/wampProducer.go b/pkg/producers/wamp/wampProducer.go new file mode 100644 index 00000000..6932a7ab --- /dev/null +++ b/pkg/producers/wamp/wampProducer.go @@ -0,0 +1,100 @@ +package wamp + +import ( + "context" + "encoding/json" + "os" + + "github.com/gammazero/nexus/v3/client" + "github.com/gammazero/nexus/v3/wamp" + "github.com/rs/zerolog/log" +) + +type Config struct { + WampURI string `json:"wamp_uri"` + Username string `json:"username"` + Password string `json:"password"` + Realm string `json:"realm"` + Topic string `json:"topic"` + SerType string `json:"serType"` + Compress bool `json:"compress"` + Authid string `json:"authid"` +} + +type Producer struct { + client client.Client + realm string + topic string + authid string +} + +func (p *Producer) Initialize(ctx context.Context, configFile string) { + var config Config + file, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read configuration file") + } + err = json.Unmarshal(file, &config) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse configuration parameters") + } + var wampclient *client.Client + + // Get requested serialization. + serialization := client.JSON + switch config.SerType { + case "json": + case "msgpack": + serialization = client.MSGPACK + case "cbor": + serialization = client.CBOR + default: + log.Fatal().Err(err).Msg("Invalid serialization, muse be one of: json, msgpack, cbor") + } + + cfg := client.Config{ + Realm: config.Realm, + Serialization: serialization, + HelloDetails: wamp.Dict{ + "authid": config.Authid, + }, + } + + if config.Compress { + cfg.WsCfg.EnableCompression = true + } + + addr := config.WampURI + + wampclient, err = client.ConnectNet(context.Background(), addr, cfg) + if err != nil { + log.Fatal().Err(err).Msg("Can't connect to WAMP Router") + } + // defer wampclient.Close() + + p.realm = config.Realm + p.topic = config.Topic + p.authid = config.Authid + + p.client = *wampclient +} + +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + data := string(v) + args := wamp.List{data} + opts := wamp.Dict{ + "authid": p.authid, + } + err := p.client.Publish(p.topic, opts, args, nil) + if err != nil { + log.Fatal().Err(err).Msgf("publish error: %s", err) + } +} + +func (p *Producer) Close(ctx context.Context) error { + err := p.client.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close WAMP connection") + } + return err +} diff --git a/pkg/producers/wamp/wampProducer_test.go b/pkg/producers/wamp/wampProducer_test.go new file mode 100644 index 00000000..79ee8cff --- /dev/null +++ b/pkg/producers/wamp/wampProducer_test.go @@ -0,0 +1,58 @@ +//go:build exclude + +package wamp + +import ( + "context" + "testing" + + "github.com/jrnd-io/jr/pkg/producers/wamp" +) + +func TestProducer_Initialize(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } +} + +func TestProducer_Close(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + + producer.Close() +} + +func TestProducer_Produce(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error initializing producer: %v", err) + } + + ctx := context.Background() + key := "loo" + val := "foo" + exp := 0 + producer.Produce(ctx, key, val, exp) + +}