-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.go
177 lines (162 loc) · 4.11 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package drift
import (
"fmt"
"log"
"math/rand"
"os"
"strings"
"time"
jsoniter "github.com/json-iterator/go"
"github.com/mayur-tolexo/aqua"
"github.com/mayur-tolexo/drift/lib"
nsq "github.com/nsqio/go-nsq"
)
const (
driftApp = "-DRIFT-"
allKey = "ALL"
defaultAdminAddrs = "127.0.0.1:4171"
createAction = "create"
emptyAction = "empty"
deleteAction = "delete"
pauseAction = "pause"
unpauseAction = "unpause"
)
//NewConsumer will create new consumer
func NewConsumer(jobHandler JobHandler) *Drift {
return &Drift{
jobHandler: jobHandler,
chanelHandler: make(map[string]JobHandler),
consumers: make(map[string][]*nsq.Consumer),
}
}
//StartAdmin will start admin
func StartAdmin(lookupHTTPAddr []string, httpAddrs string) {
if httpAddrs == "" {
httpAddrs = defaultAdminAddrs
}
d := Drift{}
d.admin = dAdmin{
httpAddrs: strings.TrimPrefix(httpAddrs, "http://"),
lookupHTTPAddr: lookupHTTPAddr,
}
fmt.Println("Starting started at " + httpAddrs)
go d.sysInterrupt()
d.admin.startAdmin()
}
//NewPub will create new publisher
func NewPub(nsqDTCPAddrs []string) *Drift {
for i := range nsqDTCPAddrs {
nsqDTCPAddrs[i] = strings.TrimPrefix(nsqDTCPAddrs[i], "http://")
}
return &Drift{
jobHandler: nil,
chanelHandler: make(map[string]JobHandler),
consumers: make(map[string][]*nsq.Consumer),
pubAddrs: nsqDTCPAddrs,
}
}
//HandleMessage will define the nsq handler method
func (th *tailHandler) HandleMessage(m *nsq.Message) error {
return th.jobHandler(string(m.Body))
}
//vAddConsumer will validate add consumer request
func vAddConsumer(req aqua.Aide) (payload AddConstumer, err error) {
req.LoadVars()
err = lib.Unmarshal(req.Body, &payload)
return
}
//vAdmin will validate admin action request
func vAdmin(req aqua.Aide) (payload Admin, err error) {
req.LoadVars()
if err = lib.Unmarshal(req.Body, &payload); err == nil {
if payload.Topic == "" {
err = lib.VError("Empty Topic")
} else {
switch payload.Action {
case createAction, emptyAction, deleteAction, pauseAction, unpauseAction:
default:
err = lib.VError("Invalid action")
}
}
}
return
}
//vProduceReq will validate produce request
func vPublishReq(req aqua.Aide) (payload Publish, err error) {
req.LoadVars()
if err = lib.Unmarshal(req.Body, &payload); err == nil {
if len(payload.NsqDTCPAddrs) == 0 {
err = lib.VError("nsqd tcp address required")
}
}
return
}
//vKillConsumer will validate kill consumer request
func vKillConsumer(req aqua.Aide) (payload KillConsumer, err error) {
req.LoadVars()
if err = lib.Unmarshal(req.Body, &payload); err == nil {
if payload.Count <= 0 {
err = lib.BadReqError(err, "Invalid count")
}
}
return
}
//pPublishReq will process the publish request
func pPublishReq(d *Drift, payload Publish) (data interface{}, err error) {
config := nsq.NewConfig()
config.UserAgent = fmt.Sprintf("drift/%s", nsq.VERSION)
producers := make(map[string]*nsq.Producer)
for _, addr := range payload.NsqDTCPAddrs {
var producer *nsq.Producer
if producer, err = nsq.NewProducer(addr, config); err != nil {
break
}
logger := log.New(os.Stderr, "", log.Flags())
if d.logger != nil {
logger = d.logger
}
producer.SetLogger(logger, d.getNsqLogLevel())
producers[addr] = producer
}
if err == nil {
for _, producer := range producers {
var b []byte
if b, err = jsoniter.Marshal(payload.Data); err == nil {
if err = producer.Publish(payload.Topic, b); err != nil {
break
}
} else {
break
}
}
}
for _, producer := range producers {
producer.Stop()
}
data = "DONE"
return
}
func (d *Drift) getNsqLogLevel() (lvl nsq.LogLevel) {
switch d.logLevel {
case LogLevelInfo:
lvl = nsq.LogLevelInfo
case LogLevelWarning:
lvl = nsq.LogLevelWarning
case LogLevelError:
lvl = nsq.LogLevelError
default:
lvl = nsq.LogLevelDebug
}
return
}
func getChannel(c string) (channel string) {
channel = c
if channel == "" {
rand.Seed(time.Now().UnixNano())
channel = fmt.Sprintf("drift%06d#ephemeral", rand.Int()%999999)
}
return
}
func hash(a, b string) string {
return a + driftApp + b
}