// File: producer.go

package kafka
import (
"context"
"github.com/segmentio/kafka-go"
)
// Producer is the public API for publishing messages and releasing the
// underlying writer.
type Producer interface {
	// Produce publishes a single message.
	Produce(ctx context.Context, message Message) error

	// ProduceBatch publishes every message in the slice with one write call.
	ProduceBatch(ctx context.Context, messages []Message) error

	// Close closes the underlying writer and reports its error, if any.
	Close() error
}
// Writer is the minimal write/close surface the producer depends on.
// NewProducer stores a *kafka.Writer (or the tracing wrapper built around it)
// behind this interface.
type Writer interface {
	// WriteMessages writes the given kafka-go messages.
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error

	// Close closes the writer.
	Close() error
}
// producer is the Producer implementation returned by NewProducer.
type producer struct {
	// w is the destination for converted messages and is what Close closes.
	w Writer

	// interceptors are invoked, in order, on each message before it is
	// converted and written.
	interceptors []ProducerInterceptor
}
// NewProducer builds a Producer from cfg.
//
// A kafka.Writer is populated field-by-field from cfg.Writer. When either
// cfg.SASL or cfg.TLS is set, a transport obtained from cfg.newKafkaTransport
// is attached to the writer. When cfg.DistributedTracingEnabled is true, the
// writer is wrapped via NewOtelProducer and the wrapper is used for all writes.
//
// The optional interceptors are stored and run on every produced message.
//
// cfg must be non-nil: it is dereferenced unconditionally. An error is
// returned when transport creation or the tracing wrapper fails.
func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
	// NOTE(review): presumably strips whitespace from the broker addresses;
	// confirm against removeSpaceBrokerList.
	cfg.Writer.removeSpaceBrokerList()

	kw := &kafka.Writer{
		Addr:     kafka.TCP(cfg.Writer.Brokers...),
		Topic:    cfg.Writer.Topic,
		Balancer: cfg.Writer.Balancer,

		MaxAttempts:     cfg.Writer.MaxAttempts,
		WriteBackoffMin: cfg.Writer.WriteBackoffMin,
		WriteBackoffMax: cfg.Writer.WriteBackoffMax,

		BatchSize:    cfg.Writer.BatchSize,
		BatchBytes:   cfg.Writer.BatchBytes,
		BatchTimeout: cfg.Writer.BatchTimeout,

		ReadTimeout:  cfg.Writer.ReadTimeout,
		WriteTimeout: cfg.Writer.WriteTimeout,
		RequiredAcks: cfg.Writer.RequiredAcks,
		Async:        cfg.Writer.Async,
		Completion:   cfg.Writer.Completion,
		Compression:  cfg.Writer.Compression,

		Logger:                 cfg.Writer.Logger,
		ErrorLogger:            cfg.Writer.ErrorLogger,
		AllowAutoTopicCreation: cfg.Writer.AllowAutoTopicCreation,
	}

	// A custom transport is only needed when authentication or TLS is configured.
	if secured := cfg.SASL != nil || cfg.TLS != nil; secured {
		transport, err := cfg.newKafkaTransport()
		if err != nil {
			return nil, err
		}
		kw.Transport = transport
	}

	// Without tracing, the plain kafka-go writer is used directly.
	if !cfg.DistributedTracingEnabled {
		return &producer{w: kw, interceptors: interceptors}, nil
	}

	traced, err := NewOtelProducer(cfg, kw)
	if err != nil {
		return nil, err
	}
	return &producer{w: traced, interceptors: interceptors}, nil
}
// Produce runs the configured interceptors on message, converts it to a
// kafka-go message and writes it.
//
// message is received by value, so interceptors operate on this call's copy
// rather than on the caller's variable. The writer's error is returned as-is.
func (p *producer) Produce(ctx context.Context, message Message) error {
	if len(p.interceptors) != 0 {
		p.executeInterceptors(ctx, &message)
	}

	// Convert only after the interceptors have had their chance to modify it.
	out := message.toKafkaMessage()
	return p.w.WriteMessages(ctx, out)
}
// ProduceBatch runs the configured interceptors on each message, converts the
// messages to kafka-go messages and hands them to the writer in a single
// WriteMessages call.
//
// Interceptors receive a pointer to the element inside messages, so any
// modification they make is visible in the caller's slice. The writer's error
// is returned as-is.
func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
	intercept := len(p.interceptors) > 0

	batch := make([]kafka.Message, len(messages))
	for idx := range messages {
		msg := &messages[idx]
		if intercept {
			p.executeInterceptors(ctx, msg)
		}
		batch[idx] = msg.toKafkaMessage()
	}

	return p.w.WriteMessages(ctx, batch...)
}
// executeInterceptors calls OnProduce on every registered interceptor, in
// registration order, passing ctx and the message pointer wrapped in a
// ProducerInterceptorContext.
func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
	// Every interceptor gets the same context value and the same message pointer.
	ic := ProducerInterceptorContext{Context: ctx, Message: message}
	for idx := range p.interceptors {
		p.interceptors[idx].OnProduce(ic)
	}
}
// Close closes the underlying writer and returns whatever error it reports.
func (p *producer) Close() error {
	return p.w.Close()
}