diff --git a/producer.go b/producer.go index 3a51574..640b919 100644 --- a/producer.go +++ b/producer.go @@ -18,11 +18,11 @@ type Writer interface { } type producer struct { - w Writer - interceptor *ProducerInterceptor + w Writer + interceptors []ProducerInterceptor } -func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) { +func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, @@ -52,7 +52,7 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Produce kafkaWriter.Transport = transport } - p := &producer{w: kafkaWriter, interceptor: interceptor} + p := &producer{w: kafkaWriter, interceptors: interceptors} if cfg.DistributedTracingEnabled { otelWriter, err := NewOtelProducer(cfg, kafkaWriter) @@ -66,8 +66,8 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Produce } func (p *producer) Produce(ctx context.Context, message Message) error { - if p.interceptor != nil { - (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message}) + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &message) } return p.w.WriteMessages(ctx, message.toKafkaMessage()) @@ -76,8 +76,8 @@ func (p *producer) Produce(ctx context.Context, message Message) error { func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { - if p.interceptor != nil { - (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]}) + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &messages[i]) } kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) @@ -86,6 +86,12 @@ func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { return p.w.WriteMessages(ctx, kafkaMessages...) } +func (p *producer) executeInterceptors(ctx context.Context, message *Message) { + for _, interceptor := range p.interceptors { + interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message}) + } +} + func (p *producer) Close() error { return p.w.Close() } diff --git a/producer_test.go b/producer_test.go index e02c34d..452b6df 100644 --- a/producer_test.go +++ b/producer_test.go @@ -32,7 +32,7 @@ func Test_producer_Produce_interceptor_Successfully(t *testing.T) { }) interceptor := newMockProducerInterceptor() - p := producer{w: mw, interceptor: &interceptor} + p := producer{w: mw, interceptors: interceptor} // When err := p.Produce(context.Background(), msg) @@ -59,7 +59,7 @@ func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { // Given mw := &mockWriter{} interceptor := newMockProducerInterceptor() - p := producer{w: mw, interceptor: &interceptor} + p := producer{w: mw, interceptors: interceptor} // When err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) @@ -101,6 +101,6 @@ func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) { }) } -func newMockProducerInterceptor() ProducerInterceptor { - return &mockProducerInterceptor{} +func newMockProducerInterceptor() []ProducerInterceptor { + return []ProducerInterceptor{&mockProducerInterceptor{}} } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 2d4306f..a11a4d0 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" - stubdata "github.com/Trendyol/kafka-konsumer/v2/test/stubdata" segmentio "github.com/segmentio/kafka-go" "testing" "time" @@ -28,7 +27,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { topic, }, }, - }, nil) + }) // When err := producer.Produce(context.Background(), kafka.Message{ @@ -46,7 +45,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { // Given topic := "produce-interceptor-topic" consumerGroup := "produce-topic-cg" - interceptor := stubdata.NewMockProducerInterceptor() + interceptor := newMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, @@ -55,7 +54,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { topic, }, }, - }, &interceptor) + }, interceptor...) // When err := producer.Produce(context.Background(), kafka.Message{ @@ -94,10 +93,10 @@ func Test_Should_Produce_Successfully(t *testing.T) { if len(actual.Headers) != 1 { t.Fatalf("Header size does not equal %d", len(actual.Headers)) } - if string(actual.Headers[0].Key) != stubdata.XSourceAppKey { + if string(actual.Headers[0].Key) != xSourceAppKey { t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) } - if string(actual.Headers[0].Value) != stubdata.XSourceAppValue { + if string(actual.Headers[0].Value) != xSourceAppValue { t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) } }) @@ -121,7 +120,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { t.Run("without interceptor", func(t *testing.T) { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil) + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) // When err := producer.ProduceBatch(context.Background(), msgs) @@ -133,10 +132,10 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }) t.Run("with interceptor", func(t *testing.T) { - interceptor := stubdata.NewMockProducerInterceptor() + interceptors := newMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor) + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, interceptors...) // When err := producer.ProduceBatch(context.Background(), msgs) @@ -643,3 +642,21 @@ func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration } } } + +type mockProducerInterceptor struct{} + +const ( + xSourceAppKey = "x-source-app" + xSourceAppValue = "kafka-konsumer" +) + +func (i *mockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: xSourceAppKey, + Value: []byte(xSourceAppValue), + }) +} + +func newMockProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&mockProducerInterceptor{}} +} diff --git a/test/stubdata/producer_interceptor_stub.go b/test/stubdata/producer_interceptor_stub.go deleted file mode 100644 index a82ae5e..0000000 --- a/test/stubdata/producer_interceptor_stub.go +++ /dev/null @@ -1,21 +0,0 @@ -package test - -import "github.com/Trendyol/kafka-konsumer/v2" - -type MockProducerInterceptor struct{} - -const ( - XSourceAppKey = "x-source-app" - XSourceAppValue = "kafka-konsumer" -) - -func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { - ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ - Key: XSourceAppKey, - Value: []byte(XSourceAppValue), - }) -} - -func NewMockProducerInterceptor() kafka.ProducerInterceptor { - return &MockProducerInterceptor{} -}