Skip to content

Commit

Permalink
fix: restrucher file and code
Browse files Browse the repository at this point in the history
  • Loading branch information
chintansakhiya committed Jan 24, 2024
1 parent ddcc13a commit 71feca2
Show file tree
Hide file tree
Showing 15 changed files with 315 additions and 283 deletions.
17 changes: 16 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,20 @@ SQLITE_FILEPATH=database/golang-api.db
JWT_SECRET=ThisIsKey

# Message queue config
MQ_TRACK=true
MQ_DEBUG=true
DEAD_LETTER_QUEUE=dead_queue

# Rabbitmq
MQ_DIALECT=amqp
AMQB_URI=amqp://guest:guest@localhost:5672/
AMQB_URI=amqp://guest:guest@localhost:5672/

# Redis
# MQ_DIALECT=redis
# REDIS_HOST=
# CONSUMER_GROUP=

# kafka
# MQ_DIALECT=kafka
# KAFKA_BROKER=
# CONSUMER_GROUP=
2 changes: 1 addition & 1 deletion cli/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func GetAPICommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {
return err
}

pub, err := watermill.InitSender(cfg)
pub, err := watermill.InitPubliser(cfg)
if err != nil {
return err
}
Expand Down
45 changes: 45 additions & 0 deletions cli/dead_letter_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cli

import (
"fmt"

"go.uber.org/zap"

"github.com/Improwised/golang-api/config"
"github.com/Improwised/golang-api/pkg/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/spf13/cobra"
)

// GetAPICommandDef runs app
// ? need rename in file name or command name
func GetDeadQueueCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command {

workerCommand := cobra.Command{
Use: "dead-letter-queue",
Short: "To start dead-letter queue",
Long: `This queue is used to store failed job from all worker`,
RunE: func(cmd *cobra.Command, args []string) error {

// Init worker
subscriber, err := watermill.InitSubscriber(cfg)
if err != nil {
return err
}

// run worker with topic(queue name) and process function
if err := subscriber.Run(cfg.MQ.DeadQueue, HandleFailJob); err != nil {
return err
}

return nil
},
}
return workerCommand
}

func HandleFailJob(msg *message.Message) error {
fmt.Println("failed job", string(msg.Payload))
// process here
return nil
}
11 changes: 6 additions & 5 deletions cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
func Init(cfg config.AppConfig, logger *zap.Logger) error {
migrationCmd := GetMigrationCommandDef(cfg)
apiCmd := GetAPICommandDef(cfg, logger)

workerCmd:=GetWorkerCommandDef(cfg, logger)
workerCmd.PersistentFlags().String("topic", "demo", "Topic to subscribe")
workerCmd.PersistentFlags().Int("delay", 100, "time intertval to retry")
workerCmd.PersistentFlags().Int("retry-count", 3, "Number of retry")
workerCmd.PersistentFlags().Int("retry-delay", 100, "time intertval for two retry in ms")
workerCmd.PersistentFlags().Int("retry-count", 3, "number of retry")
workerCmd.PersistentFlags().String("topic", "", "topic name(queue name)")

d:=GetDeadQueueCommandDef(cfg, logger)
rootCmd := &cobra.Command{Use: "golang-api"}
rootCmd.AddCommand(&migrationCmd, &apiCmd, &workerCmd)
rootCmd.AddCommand(&migrationCmd, &apiCmd, &workerCmd,&d)
return rootCmd.Execute()
}
23 changes: 10 additions & 13 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package cli
import (
"go.uber.org/zap"

"github.com/Improwised/golang-api/cli/workers"
"github.com/Improwised/golang-api/config"
"github.com/Improwised/golang-api/pkg/watermill"
"github.com/Improwised/golang-api/cli/worker"
"github.com/spf13/cobra"
)

Expand All @@ -18,42 +18,39 @@ func GetWorkerCommandDef(cfg config.AppConfig, logger *zap.Logger) cobra.Command
Short: "To start worker",
Long: `To start worker`,
RunE: func(cmd *cobra.Command, args []string) error {

// Init worker
subscriber, err := watermill.InitWorker(cfg)
// Get details name from flag
topic, err := cmd.Flags().GetString("topic")
if err != nil {
return err
}

// get topic from flag
topic, err := cmd.Flags().GetString("topic")
retryCount, err := cmd.Flags().GetInt("retry-count")
if err != nil {
return err
}

// get retry count from flag
retryCount, err := cmd.Flags().GetInt("retry-count")
delay, err := cmd.Flags().GetInt("retry-delay")
if err != nil {
return err
}

// get delay from flag
delay, err := cmd.Flags().GetInt("delay")
// Init worker
subscriber, err := watermill.InitSubscriber(cfg)
if err != nil {
return err
}

// Init router for add middleware,retry count,etc
// init router for add middleware,retry count,etc
router, err := subscriber.InitRouter(cfg, delay, retryCount)
if err != nil {
return err
}

err = router.Run(topic, worker.Process)
// run worker with topic(queue name) and process function
err = router.Run(topic, workers.Process)
return err

},
}

return workerCommand
}
11 changes: 0 additions & 11 deletions cli/worker/gob_register.go

This file was deleted.

33 changes: 0 additions & 33 deletions cli/worker/struct.go

This file was deleted.

112 changes: 0 additions & 112 deletions cli/worker/worker.go

This file was deleted.

32 changes: 32 additions & 0 deletions cli/workers/user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package workers

import (
"log"

helpers "github.com/Improwised/golang-api/helpers/smtp"
)

type WelcomeMail struct {
FirstName string `json:"first_name" `
LastName string `json:"last_name"`
Email string `json:"email"`
Roles string `json:"roles"`
}

func (w WelcomeMail) Handle() error {
log.Println("sending mail")

smtp := helpers.NewSMTPHelper("sandbox.smtp.mailtrap.io", "2525", "7628fa366c0257c", "2afb7200812272")
smtp.SetSubject("welocme")
smtp.SetPlainBody([]byte("welcome to our org"))

smtp.SetSender("chintansakhiya00001@gmail.com")
smtp.SetReceivers([]string{"chintansakhiya00001@gmail.com"})

if err := smtp.SendMail(); err != nil {
return err
}

log.Printf("mail send to %v", w.Email)
return nil
}
44 changes: 44 additions & 0 deletions cli/workers/worker_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package workers

import (
"bytes"
"encoding/gob"

"github.com/ThreeDotsLabs/watermill/message"
)

func init() {
for _, v := range ListStruct() {
gob.Register(v)
}
}

// Register all worker struct here befour run worker for proper unmarshalling
func ListStruct() []interface{} {
return []interface{}{
WelcomeMail{},
// ...
}
}

// Handler interface for all worker struct
type Handler interface {
Handle() error
}

// process all worker struct and call Handle function according to struct
func Process(msg *message.Message) error {
buf := bytes.NewBuffer(msg.Payload)
dec := gob.NewDecoder(buf)

var result Handler
err := dec.Decode(&result)
if err != nil {
return err
}
if err := result.Handle(); err != nil {
return err
}
msg.Ack()
return nil
}
Loading

0 comments on commit 71feca2

Please sign in to comment.