diff --git a/batch_producer_interceptor.go b/batch_producer_interceptor.go new file mode 100644 index 0000000..e00b6dc --- /dev/null +++ b/batch_producer_interceptor.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "context" +) + +type BatchProducerInterceptorContext struct { + Context context.Context + Message *Message +} + +type BatchProducerInterceptor interface { + OnProduce(ctx context.Context, msg Message) +} diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go index 7c7f1dc..29ed729 100644 --- a/examples/with-deadletter/main.go +++ b/examples/with-deadletter/main.go @@ -21,7 +21,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }) + }, nil) _ = producer.Produce(context.Background(), kafka.Message{ Topic: topicName, diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index f4522f5..67f66b9 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -10,7 +10,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }) + }, nil) const topicName = "standart-topic" diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index 09f0429..d7ba9fa 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -13,7 +13,7 @@ import ( func main() { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, - }) + }, nil) producer.ProduceBatch(context.Background(), []kafka.Message{ {Key: []byte("key1"), Value: []byte("message1")}, diff --git a/producer.go b/producer.go index ee2910d..3a51574 100644 --- a/producer.go +++ b/producer.go @@ -18,10 +18,11 @@ type Writer interface { } type producer struct { - w Writer + w Writer + interceptor *ProducerInterceptor } -func NewProducer(cfg *ProducerConfig) (Producer, error) { +func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { kafkaWriter.Transport = transport } - p := &producer{w: kafkaWriter} + p := &producer{w: kafkaWriter, interceptor: interceptor} if cfg.DistributedTracingEnabled { otelWriter, err := NewOtelProducer(cfg, kafkaWriter) @@ -64,18 +65,27 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { return p, nil } -func (c *producer) Produce(ctx context.Context, message Message) error { - return c.w.WriteMessages(ctx, message.toKafkaMessage()) +func (p *producer) Produce(ctx context.Context, message Message) error { + if p.interceptor != nil { + (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message}) + } + + return p.w.WriteMessages(ctx, message.toKafkaMessage()) } -func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { +func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { + if p.interceptor != nil { + (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]}) + } + kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) } - return c.w.WriteMessages(ctx, kafkaMessages...) + + return p.w.WriteMessages(ctx, kafkaMessages...) } -func (c *producer) Close() error { - return c.w.Close() +func (p *producer) Close() error { + return p.w.Close() } diff --git a/producer_interceptor.go b/producer_interceptor.go new file mode 100644 index 0000000..b5bc6b9 --- /dev/null +++ b/producer_interceptor.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "context" +) + +type ProducerInterceptorContext struct { + Context context.Context + Message *Message +} + +type ProducerInterceptor interface { + OnProduce(ctx ProducerInterceptorContext) +} diff --git a/producer_test.go b/producer_test.go index d4fd8ac..02cfb87 100644 --- a/producer_test.go +++ b/producer_test.go @@ -2,6 +2,8 @@ package kafka import ( "context" + stubData "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" + "github.com/gofiber/fiber/v2/utils" "testing" "github.com/segmentio/kafka-go" @@ -20,6 +22,27 @@ func Test_producer_Produce_Successfully(t *testing.T) { } } +func Test_producer_Produce_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + msg := Message{Headers: make([]Header, 0)} + msg.Headers = append(msg.Headers, kafka.Header{ + Key: "x-correlation-id", + Value: []byte(utils.UUIDv4()), + }) + interceptor := stubData.NewMockProducerInterceptor() + + p := producer{w: mw, interceptor: &interceptor} + + // When + err := p.Produce(context.Background(), msg) + + // Then + if err != nil { + t.Fatalf("Producing err %s", err.Error()) + } +} + func Test_producer_ProduceBatch_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -33,6 +56,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) { } } +func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + interceptor := stubData.NewMockProducerInterceptor() + p := producer{w: mw, interceptor: &interceptor} + + // When + err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) + // Then + if err != nil { + t.Fatalf("Batch Producing err %s", err.Error()) + } +} + func Test_producer_Close_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -48,7 +85,7 @@ func Test_producer_Close_Successfully(t *testing.T) { type mockWriter struct{} -func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error { +func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error { return nil } diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index b62f977..351658a 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index af93446..3adea00 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" + stub_data "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" segmentio "github.com/segmentio/kafka-go" "testing" "time" @@ -14,28 +15,92 @@ import ( func Test_Should_Produce_Successfully(t *testing.T) { // Given t.Parallel() - topic := "produce-topic" brokerAddress := "localhost:9092" - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, - Transport: &kafka.TransportConfig{ - MetadataTopics: []string{ - topic, + t.Run("without interceptor", func(t *testing.T) { + //Given + + topic := "produce-topic" + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, }, - }, - }) + }, nil) - // When - err := producer.Produce(context.Background(), kafka.Message{ - Key: []byte("1"), - Value: []byte(`foo`), + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } }) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + t.Run("with interceptor", func(t *testing.T) { + // Given + topic := "produce-interceptor-topic" + consumerGroup := "produce-topic-cg" + interceptor := stub_data.NewMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, + }, + }, &interceptor) + + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + messageCh := make(chan *kafka.Message) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + ConsumeFn: func(message *kafka.Message) error { + messageCh <- message + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + + actual := <-messageCh + if string(actual.Value) != "foo" { + t.Fatalf("Value does not equal %s", actual.Value) + } + if string(actual.Key) != "1" { + t.Fatalf("Key does not equal %s", actual.Key) + } + if len(actual.Headers) != 1 { + t.Fatalf("Header size does not equal %d", len(actual.Headers)) + } + if string(actual.Headers[0].Key) != stub_data.XSourceAppKey { + t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) + } + if string(actual.Headers[0].Value) != stub_data.XSourceAppValue { + t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) + } + }) } func Test_Should_Batch_Produce_Successfully(t *testing.T) { @@ -43,11 +108,6 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { t.Parallel() topic := "batch-produce-topic" brokerAddress := "localhost:9092" - - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) - - // When msgs := []kafka.Message{ { Key: []byte("1"), @@ -59,13 +119,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }, } - // When - err := producer.ProduceBatch(context.Background(), msgs) + t.Run("without interceptor", func(t *testing.T) { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) + + t.Run("with interceptor", func(t *testing.T) { + interceptor := stub_data.NewMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor) + + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) } func Test_Should_Consume_Message_Successfully(t *testing.T) { diff --git a/test/stub-data/producer_interceptor_stub.go b/test/stub-data/producer_interceptor_stub.go new file mode 100644 index 0000000..a82ae5e --- /dev/null +++ b/test/stub-data/producer_interceptor_stub.go @@ -0,0 +1,21 @@ +package test + +import "github.com/Trendyol/kafka-konsumer/v2" + +type MockProducerInterceptor struct{} + +const ( + XSourceAppKey = "x-source-app" + XSourceAppValue = "kafka-konsumer" +) + +func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: XSourceAppKey, + Value: []byte(XSourceAppValue), + }) +} + +func NewMockProducerInterceptor() kafka.ProducerInterceptor { + return &MockProducerInterceptor{} +}