-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.go
96 lines (86 loc) · 2.23 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package kafkaesque
import (
"github.com/reactivex/rxgo/v2"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// Header mapping of kafka header
type Header kafka.Header
func mapHeader(xs []Header) []kafka.Header {
var f []kafka.Header
for _, x := range xs {
f = append(f, kafka.Header(x))
}
return f
}
// ErrorEvent will be exported when producer crashes
type ErrorEvent struct {
Error error
}
func (ee ErrorEvent) String() string {
return ee.Error.Error()
}
//Producer holds the observables and channels in one struct.
type Producer struct {
Config *Config
Topic string
Key []byte
closeChannel chan bool
infoChannel chan kafka.Event
producer *kafka.Producer
Events rxgo.Observable
headers []kafka.Header
}
//Open starts the Producer
func (p *Producer) open() (*Producer, error) {
var err error
p.producer, err = kafka.NewProducer(p.Config.Map())
if err != nil {
return nil, err
}
go func(producer *Producer) {
defer p.producer.Close()
for ev := range producer.Events.Observe() {
err := producer.producer.Produce(
&kafka.Message{
Value: ev.V.([]byte),
Key: p.Key,
TopicPartition: kafka.TopicPartition{Topic: &producer.Topic, Partition: kafka.PartitionAny},
Headers: p.headers,
},
p.infoChannel,
)
if err != nil {
p.infoChannel <- kafka.Event(ErrorEvent{Error: err})
return
}
}
}(p)
return p, nil
}
// GetInfos return infos from producer as Observable
func (p *Producer) GetInfos() rxgo.Observable {
items := make(chan rxgo.Item)
go func(items chan rxgo.Item, infos chan kafka.Event) {
for info := range infos {
items <- rxgo.Of(info)
}
}(items, p.infoChannel)
return rxgo.FromChannel(items)
}
// Close the producer
func (p *Producer) Close() {
p.producer.Close()
}
// NewProducer creates a new producer instance from a observable
func NewProducer(config *Config, topic string, key []byte, events rxgo.Observable, header ...Header) (*Producer, error) {
producer := &Producer{
Config: config,
Topic: topic,
Key: key,
Events: events,
closeChannel: make(chan bool),
infoChannel: make(chan kafka.Event),
headers: mapHeader(header),
}
return producer.open()
}