-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadapter.go
159 lines (128 loc) · 3.06 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package kafka
import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/babex-group/babex"
"github.com/bsm/sarama-cluster"
)
type Mode int
const (
_ Mode = iota + 1
ModeSingle
ModeMulti
)
type Adapter struct {
Consumer *cluster.Consumer
Producer sarama.SyncProducer
options Options
ch chan *babex.Message
err chan error
multi chan *babex.Channel
logger Logger
}
type Options struct {
Name string
Addrs []string
Topics []string
// If mode is Single then use service.GetMessages channel
// If mode is Multi then use service.GetChannels()
Mode Mode
// Function for message converting from sarama.Message to babex.Message
// Default kafka.NewMessage
ConvertMessage Converter
ConsumerConfig *cluster.Config
Logger Logger
}
func NewAdapter(options Options) (*Adapter, error) {
if options.ConvertMessage == nil {
options.ConvertMessage = NewMessage
}
if options.ConsumerConfig == nil {
options.ConsumerConfig = cluster.NewConfig()
}
if options.Mode == ModeMulti {
options.ConsumerConfig.Group.Mode = cluster.ConsumerModePartitions
}
if options.Logger == nil {
options.Logger = &StubLogger{}
}
consumer, err := cluster.NewConsumer(
options.Addrs,
options.Name,
options.Topics,
options.ConsumerConfig,
)
if err != nil {
return nil, err
}
producerConfig := sarama.NewConfig()
producerConfig.Producer.RequiredAcks = sarama.WaitForAll
producerConfig.Producer.Retry.Max = 5
producerConfig.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(options.Addrs, producerConfig)
if err != nil {
return nil, err
}
adapter := Adapter{
Consumer: consumer,
options: options,
Producer: producer,
ch: make(chan *babex.Message),
err: make(chan error),
multi: make(chan *babex.Channel),
logger: options.Logger,
}
if options.Mode == ModeMulti {
go multiListen(&adapter)
} else {
go singleListen(&adapter)
}
return &adapter, nil
}
func (a Adapter) GetMessages() (<-chan *babex.Message, error) {
return a.ch, nil
}
// Get channel for fatal errors
func (a *Adapter) GetErrors() chan error {
return a.err
}
func (a *Adapter) PublishMessage(exchange string, key string, chain []babex.ChainItem, data interface{}, meta map[string]string, config json.RawMessage) error {
bData, err := json.Marshal(data)
if err != nil {
return err
}
b, err := json.Marshal(babex.InitialMessage{
Data: bData,
Chain: chain,
Config: config,
Meta: meta,
})
if err != nil {
return err
}
_, _, err = a.Producer.SendMessage(&sarama.ProducerMessage{
Topic: exchange,
Value: sarama.ByteEncoder(b),
})
return err
}
func (a *Adapter) Publish(exchange string, key string, message babex.InitialMessage) error {
b, err := json.Marshal(message)
if err != nil {
return err
}
_, _, err = a.Producer.SendMessage(&sarama.ProducerMessage{
Topic: exchange,
Value: sarama.ByteEncoder(b),
})
return err
}
func (a *Adapter) Close() error {
return a.Consumer.Close()
}
func (a *Adapter) Channels() babex.Channels {
if a.options.Mode == ModeMulti {
return a.multi
}
return nil
}