This repository has been archived by the owner on Dec 30, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathworker.go
110 lines (89 loc) · 2.71 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2018 Diego Bernardes. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package flare
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/diegobernardes/flare/infra/worker"
)
// Worker acts both as a queue and as a dispatcher: Enqueue wraps a payload with its task name and
// hands it to the Pusher, while Process decodes such a message and delivers the payload to the
// processor registered for that task.
type Worker struct {
	// Logger receives the informational entry emitted when a message has no registered processor.
	// It is required by Init.
	Logger log.Logger
	// Pusher is where Enqueue sends the encoded messages. It is required by Init.
	Pusher worker.Pusher
	// tasks maps a task name to the processor that handles it; it is allocated by Init and
	// filled through Register.
	tasks map[string]worker.Processor
}
// Init checks that the required dependencies were provided and allocates the processor registry.
// It returns an error when the Logger or the Pusher is missing.
func (w *Worker) Init() error {
	switch {
	case w.Logger == nil:
		return errors.New("missing logger")
	case w.Pusher == nil:
		return errors.New("missing pusher")
	}

	// Every call starts with an empty registry.
	w.tasks = map[string]worker.Processor{}
	return nil
}
// Process decodes a message in the format produced by Enqueue and hands its payload to the
// processor registered for the message task.
//
// A message whose task has no registered processor is logged at info level and dropped, in which
// case nil is returned. An error is returned when the content can't be decoded or when the
// processor fails.
func (w *Worker) Process(ctx context.Context, rawContent []byte) error {
	task, payload, err := w.unmarshal(rawContent)
	if err != nil {
		// This step decodes the message, so the error must say unmarshal and not marshal.
		return errors.Wrap(err, "error during unmarshal content from json")
	}

	processor, ok := w.tasks[task]
	if !ok {
		level.Info(w.Logger).Log("message", "ignoring message as processor is not found", "task", task)
		return nil
	}

	if err := processor.Process(ctx, payload); err != nil {
		return errors.Wrap(err, "error during process")
	}
	return nil
}
// Enqueue wraps rawContent and the task name into a single JSON message and pushes it through
// the Pusher so it can be processed later. It returns an error when the message can't be encoded
// or when the push fails.
func (w *Worker) Enqueue(ctx context.Context, rawContent []byte, task string) error {
	message, err := w.marshal(task, rawContent)
	if err != nil {
		return errors.Wrap(err, "error during marshal message to json")
	}

	err = w.Pusher.Push(ctx, message)
	if err != nil {
		return errors.Wrap(err, "error during push message")
	}

	return nil
}
// Register associates a processor with a task name so Process can dispatch messages to it.
//
// It returns an error when the worker registry has not been allocated yet (Init was not called)
// or when the task already has a processor; an existing registration is never replaced.
func (w *Worker) Register(task string, processor worker.Processor) error {
	// Writing to a nil map panics, so report the missing initialization as an error instead.
	if w.tasks == nil {
		return errors.New("worker is not initialized, call Init before Register")
	}

	if _, ok := w.tasks[task]; ok {
		return fmt.Errorf("already have processor associated with the task '%s'", task)
	}

	w.tasks[task] = processor
	return nil
}
// marshal builds the JSON message exchanged through the queue: an object with the task name under
// "task" and rawContent embedded as raw JSON under "payload". The encoder error is returned as is,
// which happens, for example, when rawContent is not valid JSON.
func (w *Worker) marshal(task string, rawContent []byte) ([]byte, error) {
	type envelope struct {
		Task    string          `json:"task"`
		Payload json.RawMessage `json:"payload"`
	}

	return json.Marshal(&envelope{
		Task:    task,
		Payload: rawContent,
	})
}
// unmarshal decodes a queue message, returning the value of its "task" field and the raw,
// still-encoded bytes of its "payload" field. When content can't be decoded, the empty string, a
// nil payload and the decoder error are returned.
func (w *Worker) unmarshal(content []byte) (string, []byte, error) {
	var envelope struct {
		Task    string          `json:"task"`
		Payload json.RawMessage `json:"payload"`
	}

	err := json.Unmarshal(content, &envelope)
	if err != nil {
		return "", nil, err
	}

	return envelope.Task, envelope.Payload, nil
}