diff --git a/.env.example b/.env.example index c7b3608..af8f392 100644 --- a/.env.example +++ b/.env.example @@ -24,5 +24,20 @@ SQLITE_FILEPATH=database/golang-api.db JWT_SECRET=ThisIsKey # Message queue config +MQ_TRACK=true +MQ_DEBUG=true +DEAD_LETTER_QUEUE=dead_queue + +# Rabbitmq MQ_DIALECT=amqp -AMQB_URI=amqp://guest:guest@localhost:5672/ \ No newline at end of file +AMQB_URI=amqp://guest:guest@localhost:5672/ + +# Redis +# MQ_DIALECT=redis +# REDIS_HOST= +# CONSUMER_GROUP= + +# kafka +# MQ_DIALECT=kafka +# KAFKA_BROKER= +# CONSUMER_GROUP= diff --git a/cli/api.go b/cli/api.go index c782f1b..2216b3c 100644 --- a/cli/api.go +++ b/cli/api.go @@ -44,7 +44,7 @@ func GetAPICommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command { return err } - pub, err := watermill.InitSender(cfg) + pub, err := watermill.InitPubliser(cfg) if err != nil { return err } diff --git a/cli/dead_letter_queue.go b/cli/dead_letter_queue.go new file mode 100644 index 0000000..b238ebd --- /dev/null +++ b/cli/dead_letter_queue.go @@ -0,0 +1,45 @@ +package cli + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/Improwised/golang-api/config" + "github.com/Improwised/golang-api/pkg/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/spf13/cobra" +) + +// GetAPICommandDef runs app +// ? need rename in file name or command name +func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command { + + workerCommand := cobra.Command{ + Use: "dead-letter-queue", + Short: "To start dead-letter queue", + Long: `This queue is used to store failed job from all worker`, + RunE: func(cmd *cobra.Command, args []string) error { + + // Init worker + subscriber, err := watermill.InitSubscriber(cfg) + if err != nil { + return err + } + + // run worker with topic(queue name) and process function + if err := subscriber.Run(cfg.MQ.DeadQueue, HandleFailJob); err != nil { + return err + } + + return nil + }, + } + return workerCommand +} + +func HandleFailJob(msg *message.Message) error { + fmt.Println("failed job", string(msg.Payload)) + // process here + return nil +} diff --git a/cli/main.go b/cli/main.go index f55407b..f871a25 100644 --- a/cli/main.go +++ b/cli/main.go @@ -10,12 +10,13 @@ import ( func Init(cfg config.AppConfig, logger *zap.Logger) error { migrationCmd := GetMigrationCommandDef(cfg) apiCmd := GetAPICommandDef(cfg, logger) - workerCmd:=GetWorkerCommandDef(cfg, logger) - workerCmd.PersistentFlags().String("topic", "demo", "Topic to subscribe") - workerCmd.PersistentFlags().Int("delay", 100, "time intertval to retry") - workerCmd.PersistentFlags().Int("retry-count", 3, "Number of retry") + workerCmd.PersistentFlags().Int("retry-delay", 100, "time intertval for two retry in ms") + workerCmd.PersistentFlags().Int("retry-count", 3, "number of retry") + workerCmd.PersistentFlags().String("topic", "", "topic name(queue name)") + + d:=GetDeadQueueCommandDef(cfg, logger) rootCmd := &cobra.Command{Use: "golang-api"} - rootCmd.AddCommand(&migrationCmd, &apiCmd, &workerCmd) + rootCmd.AddCommand(&migrationCmd, &apiCmd, &workerCmd,&d) return rootCmd.Execute() } diff --git a/cli/worker.go b/cli/worker.go index 2ce21a7..55196b0 100644 --- a/cli/worker.go +++ b/cli/worker.go @@ -3,9 +3,9 @@ package cli import ( "go.uber.org/zap" + "github.com/Improwised/golang-api/cli/workers" "github.com/Improwised/golang-api/config" "github.com/Improwised/golang-api/pkg/watermill" - "github.com/Improwised/golang-api/cli/worker" "github.com/spf13/cobra" ) @@ -18,42 +18,39 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command Short: "To start worker", Long: `To start worker`, RunE: func(cmd *cobra.Command, args []string) error { - - // Init worker - subscriber, err := watermill.InitWorker(cfg) + // Get details name from flag + topic, err := cmd.Flags().GetString("topic") if err != nil { return err } - // get topic from flag - topic, err := cmd.Flags().GetString("topic") + retryCount, err := cmd.Flags().GetInt("retry-count") if err != nil { return err } - // get retry count from flag - retryCount, err := cmd.Flags().GetInt("retry-count") + delay, err := cmd.Flags().GetInt("retry-delay") if err != nil { return err } - // get delay from flag - delay, err := cmd.Flags().GetInt("delay") + // Init worker + subscriber, err := watermill.InitSubscriber(cfg) if err != nil { return err } - // Init router for add middleware,retry count,etc + // init router for add middleware,retry count,etc router, err := subscriber.InitRouter(cfg, delay, retryCount) if err != nil { return err } - err = router.Run(topic, worker.Process) + // run worker with topic(queue name) and process function + err = router.Run(topic, workers.Process) return err }, } - return workerCommand } diff --git a/cli/worker/gob_register.go b/cli/worker/gob_register.go deleted file mode 100644 index 2cdcca5..0000000 --- a/cli/worker/gob_register.go +++ /dev/null @@ -1,11 +0,0 @@ -package worker - -import ( - "encoding/gob" -) - -func GobRegister() { - gob.Register(UpdateUser{}) - gob.Register(DeleteUser{}) - gob.Register(AddUser{}) -} diff --git a/cli/worker/struct.go b/cli/worker/struct.go deleted file mode 100644 index cbb89f1..0000000 --- a/cli/worker/struct.go +++ /dev/null @@ -1,33 +0,0 @@ -package worker - -type UpdateUser struct { - ID string `json:"id"` - FirstName string `json:"first_name" ` - LastName string `json:"last_name"` - Email string `json:"email"` - Password string `json:"-"` - Roles string `json:"roles"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` -} -type DeleteUser struct { - ID string `json:"id"` - FirstName string `json:"first_name" ` - LastName string `json:"last_name"` - Email string `json:"email"` - Password string `json:"-"` - Roles string `json:"roles"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` -} -type AddUser struct { - ID string `json:"id"` - FirstName string `json:"first_name" ` - LastName string `json:"last_name"` - Email string `json:"email"` - Password string `json:"-"` - Roles string `json:"roles"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` -} - diff --git a/cli/worker/worker.go b/cli/worker/worker.go deleted file mode 100644 index b7de382..0000000 --- a/cli/worker/worker.go +++ /dev/null @@ -1,112 +0,0 @@ -package worker - -import ( - "bytes" - "encoding/gob" - "fmt" - "go/ast" - "go/parser" - "go/token" - "os" - "path/filepath" - "runtime" - "text/template" - - "github.com/ThreeDotsLabs/watermill/message" -) - -type Handler interface { - Handle(data []byte) -} - -func (w DeleteUser) Handle(data []byte) { - fmt.Println("Delete User") - fmt.Println(w) -} -func (w UpdateUser) Handle(data []byte) { - fmt.Println("Update User") - fmt.Println(w) -} - -func (w AddUser) Handle(data []byte) { - fmt.Println("add user") - fmt.Println(w) -} - -func Process(msg *message.Message) error { - GobRegister() - buf := bytes.NewBuffer(msg.Payload) - dec := gob.NewDecoder(buf) - - var result Handler - err := dec.Decode(&result) - if err != nil { - fmt.Println("Error in decoding") - return err - } - result.Handle(msg.Payload) - return nil -} - -func init() { - GenerateGobCode() -} - -// ------------------------------------------------------------------------------------- -// generate gob_register.go file - -const outputTemplate = `package {{.PackageName}} -import ( - "encoding/gob" - "fmt" -) -func GobRegister() { - {{range .StructNames}}gob.Register({{.}}{}) - fmt.Println("Gob Register") - {{end}} -} -` - -func GenerateGobCode() { - fset := token.NewFileSet() - _, filename, _, ok := runtime.Caller(0) - if !ok { - fmt.Println("cannot get current file") - } - dir := filepath.Dir(filename) - node, err := parser.ParseFile(fset, dir+"/struct.go", nil, parser.ParseComments) - if err != nil { - fmt.Println(err) - } - - structNames := []string{} - ast.Inspect(node, func(n ast.Node) bool { - typeSpec, ok := n.(*ast.TypeSpec) - if !ok { - return true - } - - _, ok = typeSpec.Type.(*ast.StructType) - if !ok { - return true - } - - structNames = append(structNames, typeSpec.Name.Name) - return false - }) - - outputFile, err := os.Create(dir + "/gob_register.go") - if err != nil { - fmt.Println(err) - - } - - defer outputFile.Close() - - t := template.Must(template.New("").Parse(outputTemplate)) - t.Execute(outputFile, map[string]interface{}{ - "PackageName": node.Name.Name, - "StructNames": structNames, - }) - -} diff --git a/cli/workers/user.go b/cli/workers/user.go new file mode 100644 index 0000000..8006d3b --- /dev/null +++ b/cli/workers/user.go @@ -0,0 +1,32 @@ +package workers + +import ( + "log" + + helpers "github.com/Improwised/golang-api/helpers/smtp" +) + +type WelcomeMail struct { + FirstName string `json:"first_name" ` + LastName string `json:"last_name"` + Email string `json:"email"` + Roles string `json:"roles"` +} + +func (w WelcomeMail) Handle() error { + log.Println("sending mail") + + smtp := helpers.NewSMTPHelper("sandbox.smtp.mailtrap.io", "2525", "7628fa366c0257c", "2afb7200812272") + smtp.SetSubject("welocme") + smtp.SetPlainBody([]byte("welcome to our org")) + + smtp.SetSender("chintansakhiya00001@gmail.com") + smtp.SetReceivers([]string{"chintansakhiya00001@gmail.com"}) + + if err := smtp.SendMail(); err != nil { + return err + } + + log.Printf("mail send to %v", w.Email) + return nil +} diff --git a/cli/workers/worker_handler.go b/cli/workers/worker_handler.go new file mode 100644 index 0000000..67ae2a5 --- /dev/null +++ b/cli/workers/worker_handler.go @@ -0,0 +1,44 @@ +package workers + +import ( + "bytes" + "encoding/gob" + + "github.com/ThreeDotsLabs/watermill/message" +) + +func init() { + for _, v := range ListStruct() { + gob.Register(v) + } +} + +// Register all worker struct here befour run worker for proper unmarshalling +func ListStruct() []interface{} { + return []interface{}{ + WelcomeMail{}, + // ... + } +} + +// Handler interface for all worker struct +type Handler interface { + Handle() error +} + +// process all worker struct and call Handle function according to struct +func Process(msg *message.Message) error { + buf := bytes.NewBuffer(msg.Payload) + dec := gob.NewDecoder(buf) + + var result Handler + err := dec.Decode(&result) + if err != nil { + return err + } + if err := result.Handle(); err != nil { + return err + } + msg.Ack() + return nil +} diff --git a/config/main.go b/config/main.go index 6376c59..80701aa 100644 --- a/config/main.go +++ b/config/main.go @@ -19,11 +19,8 @@ type AppConfig struct { Env string `envconfig:"APP_ENV"` Port string `envconfig:"APP_PORT"` Secret string `envconfig:"JWT_SECRET"` - MQDialect string `envconfig:"MQ_DIALECT"` DB DBConfig - AMQB AmqpConfig - Redis RedisConfig - Kafka KafkaConfig + MQ MQConfig } // GetConfig Collects all configs diff --git a/config/mq.go b/config/mq.go index f5b81f6..fb852f7 100644 --- a/config/mq.go +++ b/config/mq.go @@ -1,7 +1,17 @@ package config +// TODO: add env to .env.example +type MQConfig struct { + Dialect string `envconfig:"MQ_DIALECT"` + Debug bool `envconfig:"MQ_DEBUG"` + Track bool `envconfig:"MQ_TRACK"` + DeadQueue string `envconfig:"DEAD_LETTER_QUEUE"` + Redis RedisConfig + Amqp AmqpConfig + Kafka KafkaConfig +} type RedisConfig struct { - RedisUrl string `envconfig:"REDIS_URI"` + RedisUrl string `envconfig:"REDIS_URI"` ConsumerGroup string `envconfig:"CONSUMER_GROUP"` } @@ -9,6 +19,6 @@ type AmqpConfig struct { AmqbUrl string `envconfig:"AMQB_URI"` } type KafkaConfig struct { - KafkaBroker []string `envconfig:"KAFKA_BROKER"` - ConsumerGroup string `envconfig:"CONSUMER_GROUP"` + KafkaBroker []string `envconfig:"KAFKA_BROKER"` + ConsumerGroup string `envconfig:"CONSUMER_GROUP"` } diff --git a/controllers/api/v1/user_controller.go b/controllers/api/v1/user_controller.go index 7af4ab0..512cdd4 100644 --- a/controllers/api/v1/user_controller.go +++ b/controllers/api/v1/user_controller.go @@ -1,14 +1,11 @@ package v1 import ( - "bytes" "database/sql" - "encoding/gob" "encoding/json" - "log" "net/http" - "github.com/Improwised/golang-api/cli/worker" + "github.com/Improwised/golang-api/cli/workers" "github.com/Improwised/golang-api/constants" "github.com/Improwised/golang-api/models" "github.com/Improwised/golang-api/pkg/events" @@ -103,37 +100,19 @@ func (ctrl *UserController) CreateUser(c *fiber.Ctx) error { if err != nil { return utils.JSONFail(c, http.StatusBadRequest, utils.ValidatorErrorString(err)) } - // ----------------------------------------------------------------------------------------- - // send message to worker - var network bytes.Buffer - enc := gob.NewEncoder(&network) - var message worker.Handler - - myEvent := worker.DeleteUser{FirstName: userReq.FirstName, LastName: userReq.LastName, Email: userReq.Email, Password: userReq.Password, Roles: userReq.Roles} - registerStructDynamically(myEvent) - - message = myEvent - err = enc.Encode(&message) - if err != nil { - log.Fatal("encode: niche", err) - } - - err = ctrl.pub.PublishMessages("worker.User", "user", network.Bytes()) - if err != nil { - return utils.JSONFail(c, http.StatusBadRequest, err.Error()) - } - // ----------------------------------------------------------------------------------------- + user, err := ctrl.userService.RegisterUser(models.User{FirstName: userReq.FirstName, LastName: userReq.LastName, Email: userReq.Email, Password: userReq.Password, Roles: userReq.Roles}, ctrl.event) if err != nil { ctrl.logger.Error("error while insert user", zap.Error(err)) return utils.JSONError(c, http.StatusInternalServerError, constants.ErrInsertUser) } - + + // publish job to queue + myEvent := workers.WelcomeMail{FirstName: userReq.FirstName, LastName: userReq.LastName, Email: userReq.Email, Roles: userReq.Roles} + err = ctrl.pub.Publish("user", myEvent) + if err != nil { + return utils.JSONError(c, http.StatusInternalServerError, constants.ErrInsertUser) + } + return utils.JSONSuccess(c, http.StatusCreated, user) } - -func registerStructDynamically(structValue interface{}) { - // t := reflect.TypeOf(structValue) - // gob.Register(t) - gob.Register(structValue) -} diff --git a/pkg/watermill/publisher.go b/pkg/watermill/publisher.go index 6f172e5..de29ea0 100644 --- a/pkg/watermill/publisher.go +++ b/pkg/watermill/publisher.go @@ -1,6 +1,12 @@ package watermill import ( + "bytes" + "encoding/gob" + "fmt" + "log" + + "github.com/Improwised/golang-api/cli/workers" "github.com/Improwised/golang-api/config" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill-amqp/v2/pkg/amqp" @@ -11,53 +17,83 @@ import ( "github.com/redis/go-redis/v9" ) -// GetAPICommandDef runs app type WatermillPubliser struct { publisher message.Publisher } -func InitSender(cfg config.AppConfig) (*WatermillPubliser, error) { - switch cfg.MQDialect { +func InitPubliser(cfg config.AppConfig) (*WatermillPubliser, error) { + logger = watermill.NewStdLogger(cfg.MQ.Debug, cfg.MQ.Track) + switch cfg.MQ.Dialect { case "amqp": - amqpConfig := amqp.NewDurableQueueConfig(cfg.AMQB.AmqbUrl) - publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false)) - return &WatermillPubliser{publisher: publisher}, err + return initAmqpPub(cfg) case "redis": - pubClient := redis.NewClient(&redis.Options{ - Addr: cfg.Redis.RedisUrl, - }) - publisher, err := redisstream.NewPublisher( - redisstream.PublisherConfig{ - Client: pubClient, - Marshaller: redisstream.DefaultMarshallerUnmarshaller{}, - }, - watermill.NewStdLogger(false, false), - ) - - return &WatermillPubliser{publisher: publisher}, err + return initRedisPub(cfg) case "kafka": - publisher, err := kafka.NewPublisher( - kafka.PublisherConfig{ - Brokers: cfg.Kafka.KafkaBroker, - Marshaler: kafka.DefaultMarshaler{}, - OverwriteSaramaConfig: kafka.DefaultSaramaSyncPublisherConfig(), - }, - watermill.NewStdLogger(false, false), - ) - return &WatermillPubliser{publisher: publisher}, err + return initKafkaPub(cfg) default: return &WatermillPubliser{}, nil } } -func (wp *WatermillPubliser) PublishMessages(key,topic string, data []byte) error { - msg := message.NewMessage(watermill.NewUUID(), data) - msg.Metadata.Set("Key", key) + +// send message to queue using topic name +// +// struct must from worker package(/cli/workers) +func (wp *WatermillPubliser) Publish(topic string, data interface{}) error { + var network bytes.Buffer + enc := gob.NewEncoder(&network) + var handle workers.Handler + + handle, ok := data.(workers.Handler) + if !ok { + return fmt.Errorf("data is not of type workers.Handler") + } + + err := enc.Encode(&handle) + if err != nil { + log.Fatal("encode: niche", err) + } + msg := message.NewMessage(watermill.NewUUID(), network.Bytes()) + if err := wp.publisher.Publish(topic, msg); err != nil { return err } return nil } + +func initAmqpPub(cfg config.AppConfig) (*WatermillPubliser, error) { + amqpConfig := amqp.NewDurableQueueConfig(cfg.MQ.Amqp.AmqbUrl) + publisher, err := amqp.NewPublisher(amqpConfig, logger) + return &WatermillPubliser{publisher: publisher}, err +} + +// TODO: username/pass +func initRedisPub(cfg config.AppConfig) (*WatermillPubliser, error) { + pubClient := redis.NewClient(&redis.Options{ + Addr: cfg.MQ.Redis.RedisUrl, + }) + publisher, err := redisstream.NewPublisher( + redisstream.PublisherConfig{ + Client: pubClient, + Marshaller: redisstream.DefaultMarshallerUnmarshaller{}, + }, + logger, + ) + return &WatermillPubliser{publisher: publisher}, err +} + +func initKafkaPub(cfg config.AppConfig) (*WatermillPubliser, error) { + publisher, err := kafka.NewPublisher( + kafka.PublisherConfig{ + Brokers: cfg.MQ.Kafka.KafkaBroker, + Marshaler: kafka.DefaultMarshaler{}, + OverwriteSaramaConfig: kafka.DefaultSaramaSyncPublisherConfig(), + }, + logger, + ) + return &WatermillPubliser{publisher: publisher}, err +} + diff --git a/pkg/watermill/subscriber.go b/pkg/watermill/subscriber.go index fe1d886..34f114e 100644 --- a/pkg/watermill/subscriber.go +++ b/pkg/watermill/subscriber.go @@ -2,6 +2,7 @@ package watermill import ( "context" + "fmt" "time" "github.com/Improwised/golang-api/config" @@ -18,70 +19,45 @@ import ( "github.com/redis/go-redis/v9" ) -var logger = watermill.NewStdLogger(true, false) +var logger watermill.LoggerAdapter type WatermillSubscriber struct { Subscriber message.Subscriber Router *message.Router } -func InitWorker(cfg config.AppConfig) (*WatermillSubscriber, error) { - switch cfg.MQDialect { +// TODO: for redis and kafka username/password +// TODO: add other dialect +func InitSubscriber(cfg config.AppConfig) (*WatermillSubscriber, error) { + logger = watermill.NewStdLogger(cfg.MQ.Debug, cfg.MQ.Track) + switch cfg.MQ.Dialect { case "amqp": - amqpConfig := amqp.NewDurableQueueConfig(cfg.AMQB.AmqbUrl) - subscriber, err := amqp.NewSubscriber( - amqpConfig, - watermill.NewStdLogger(false, false), - ) - return &WatermillSubscriber{Subscriber: subscriber}, err + return initAmqpSub(cfg) case "redis": - subClient := redis.NewClient(&redis.Options{ - Addr: cfg.Redis.RedisUrl, - }) - subscriber, err := redisstream.NewSubscriber( - redisstream.SubscriberConfig{ - Client: subClient, - Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{}, - ConsumerGroup: cfg.Redis.ConsumerGroup, - }, - watermill.NewStdLogger(false, false), - ) - return &WatermillSubscriber{Subscriber: subscriber}, err + return initRedisSub(cfg) case "kafka": - saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig() - saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest - subscriber, err := kafka.NewSubscriber( - kafka.SubscriberConfig{ - Brokers: cfg.Kafka.KafkaBroker, - Unmarshaler: kafka.DefaultMarshaler{}, - OverwriteSaramaConfig: saramaSubscriberConfig, - ConsumerGroup: cfg.Kafka.ConsumerGroup, - }, - watermill.NewStdLogger(false, false), - ) - if err != nil { - return nil, err - } - return &WatermillSubscriber{Subscriber: subscriber}, err + return initKafkaSub(cfg) + default: return nil, nil } } +// InitRouter init router for add middleware,retry count,delay etc func (ws *WatermillSubscriber) InitRouter(cfg config.AppConfig, delayTime, MaxRetry int) (*WatermillSubscriber, error) { router, err := message.NewRouter(message.RouterConfig{}, logger) if err != nil { return nil, err } - pub, err := InitSender(cfg) + pub, err := InitPubliser(cfg) if err != nil { return nil, err } - poq, err := middleware.PoisonQueue(pub.publisher, "poison_queue") + poq, err := middleware.PoisonQueue(pub.publisher, cfg.MQ.DeadQueue) if err != nil { return nil, err } @@ -105,8 +81,20 @@ func (ws *WatermillSubscriber) InitRouter(cfg config.AppConfig, delayTime, MaxRe } func (ws *WatermillSubscriber) Run(topic string, handlerFunc message.NoPublishHandlerFunc) error { + if ws.Subscriber == nil { + return fmt.Errorf("subscriber is nil") + } + + if ws.Router == nil { + router, err := message.NewRouter(message.RouterConfig{}, logger) + if err != nil { + return err + } + ws.Router = router + } + ws.Router.AddNoPublisherHandler( - "counter", + "handler", topic, ws.Subscriber, handlerFunc, @@ -115,3 +103,47 @@ func (ws *WatermillSubscriber) Run(topic string, handlerFunc message.NoPublishHa err := ws.Router.Run(context.Background()) return err } + +func initAmqpSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + amqpConfig := amqp.NewDurableQueueConfig(cfg.MQ.Amqp.AmqbUrl) + subscriber, err := amqp.NewSubscriber( + amqpConfig, + logger, + ) + return &WatermillSubscriber{Subscriber: subscriber}, err +} + +func initKafkaSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig() + saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + subscriber, err := kafka.NewSubscriber( + kafka.SubscriberConfig{ + Brokers: cfg.MQ.Kafka.KafkaBroker, + Unmarshaler: kafka.DefaultMarshaler{}, + OverwriteSaramaConfig: saramaSubscriberConfig, + ConsumerGroup: cfg.MQ.Kafka.ConsumerGroup, + }, + logger, + ) + if err != nil { + return nil, err + } + return &WatermillSubscriber{Subscriber: subscriber}, err +} + +func initRedisSub(cfg config.AppConfig) (*WatermillSubscriber, error) { + subClient := redis.NewClient(&redis.Options{ + Addr: cfg.MQ.Redis.RedisUrl, + // Username: , + // password: , + }) + subscriber, err := redisstream.NewSubscriber( + redisstream.SubscriberConfig{ + Client: subClient, + Unmarshaller: redisstream.DefaultMarshallerUnmarshaller{}, + ConsumerGroup: cfg.MQ.Redis.ConsumerGroup, + }, + logger, + ) + return &WatermillSubscriber{Subscriber: subscriber}, err +}