Skip to content

Commit

Permalink
add producer wamp_rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
efremropelato committed Nov 11, 2024
2 parents 659dfd6 + 119c678 commit 4bb2383
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/cmd/producerList.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var producerListCmd = &cobra.Command{
fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset)
fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset)
fmt.Printf("%sWAMP Topic%s (--output = wamp)\n", Green, Reset)
fmt.Printf("%sWAMP RPC%s (--output = wamprpc)\n", Green, Reset)
fmt.Println()

},
Expand Down
4 changes: 3 additions & 1 deletion pkg/cmd/templateRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ jr template run --template "{{name}}"
configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name)
case "wampConfig":
configuration.GlobalCfg.WAMPConfig, _ = cmd.Flags().GetString(f.Name)
case "wampRpcConfig":
configuration.GlobalCfg.WAMPRPCConfig, _ = cmd.Flags().GetString(f.Name)
}
}
})
Expand Down Expand Up @@ -200,6 +202,6 @@ func init() {
templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration")
templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration")
templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration")
templateRunCmd.Flags().String("wampConfig", "", "WAMP configuration")
templateRunCmd.Flags().String("wampRpcConfig", "", "WAMP-RPC configuration")

}
1 change: 1 addition & 0 deletions pkg/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type GlobalConfiguration struct {
LUAScriptConfig string
WASMConfig string
WAMPConfig string
WAMPRPCConfig string
Url string
EmbeddedTemplate bool
FileNameTemplate bool
Expand Down
12 changes: 12 additions & 0 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/jrnd-io/jr/pkg/producers/s3"
"github.com/jrnd-io/jr/pkg/producers/server"
"github.com/jrnd-io/jr/pkg/producers/wamp"
"github.com/jrnd-io/jr/pkg/producers/wamprpc"
"github.com/jrnd-io/jr/pkg/tpl"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -182,6 +183,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi
e.Producer = createWAMPProducer(ctx, conf.WAMPConfig)
return
}
if e.Output == "wamprpc" {
e.Producer = createWAMPRPCProducer(ctx, conf.WAMPRPCConfig)
return
}

}

Expand Down Expand Up @@ -297,6 +302,13 @@ func createWAMPProducer(ctx context.Context, config string) Producer {
return producer
}

func createWAMPRPCProducer(ctx context.Context, config string) Producer {
producer := &wamprpc.Producer{}
producer.Initialize(ctx, config)

return producer
}

func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager {

kManager := &kafka.Manager{
Expand Down
10 changes: 10 additions & 0 deletions pkg/producers/wamprpc/config.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"wamp_uri": "ws://localhost:9009/ws",
"username": "admin",
"password": "password",
"realm": "realm1",
"procedure": "example",
"serType": "json",
"compress": true,
"authid": "clientJR"
}
100 changes: 100 additions & 0 deletions pkg/producers/wamprpc/wampRPCProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package wamprpc

import (
"context"
"encoding/json"
"os"

"github.com/gammazero/nexus/v3/client"
"github.com/gammazero/nexus/v3/wamp"
"github.com/rs/zerolog/log"
)

type Config struct {
WampURI string `json:"wamp_uri"`
Username string `json:"username"`
Password string `json:"password"`
Realm string `json:"realm"`
Procedure string `json:"procedure"`
SerType string `json:"serType"`
Compress bool `json:"compress"`
Authid string `json:"authid"`
}

type Producer struct {
client client.Client
realm string
procedure string
authid string
}

func (p *Producer) Initialize(ctx context.Context, configFile string) {
var config Config
file, err := os.ReadFile(configFile)
if err != nil {
log.Fatal().Err(err).Msg("Failed to read configuration file")
}
err = json.Unmarshal(file, &config)
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse configuration parameters")
}
var wampclient *client.Client

// Get requested serialization.
serialization := client.JSON
switch config.SerType {
case "json":
case "msgpack":
serialization = client.MSGPACK
case "cbor":
serialization = client.CBOR
default:
log.Fatal().Err(err).Msg("Invalid serialization, muse be one of: json, msgpack, cbor")
}

cfg := client.Config{
Realm: config.Realm,
Serialization: serialization,
HelloDetails: wamp.Dict{
"authid": config.Authid,
},
}

if config.Compress {
cfg.WsCfg.EnableCompression = true
}

addr := config.WampURI

wampclient, err = client.ConnectNet(context.Background(), addr, cfg)
if err != nil {
log.Fatal().Err(err).Msg("Can't connect to WAMP Router")
}
// defer wampclient.Close()

p.realm = config.Realm
p.procedure = config.Procedure
p.authid = config.Authid

p.client = *wampclient
}

func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) {
data := string(v)
args := wamp.List{data}
opts := wamp.Dict{
"authid": p.authid,
}
_, err := p.client.Call(ctx, p.procedure, opts, args, nil, nil)
if err != nil {
log.Fatal().Err(err).Msgf("call error: %s", err)
}
}

func (p *Producer) Close(ctx context.Context) error {
err := p.client.Close()
if err != nil {
log.Warn().Err(err).Msg("Failed to close WAMP connection")
}
return err
}
58 changes: 58 additions & 0 deletions pkg/producers/wamprpc/wampRPCProducer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//go:build exclude

package wamprpc

import (
"context"
"testing"

"github.com/jrnd-io/jr/pkg/producers/wamprpc"
)

func TestProducer_Initialize(t *testing.T) {
configFile := "config.json.example"

producer, err := wamprpc.ProducerFactory("wamprpc")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
}

func TestProducer_Close(t *testing.T) {
configFile := "config.json.example"

producer, err := wamprpc.ProducerFactory("wamprpc")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}

producer.Close()
}

func TestProducer_Produce(t *testing.T) {
configFile := "config.json.example"

producer, err := wamprpc.ProducerFactory("wamprpc")
if err != nil {
t.Fatalf("Error reading configuration file: %v", err)
}
err = producer.Initialize(configFile)
if err != nil {
t.Fatalf("Error initializing producer: %v", err)
}

ctx := context.Background()
key := "loo"
val := "foo"
exp := 0
producer.Produce(ctx, key, val, exp)

}

0 comments on commit 4bb2383

Please sign in to comment.