Skip to content

Commit

Permalink
feat: add skip message by header filter function for cronsumer (#104)
Browse files Browse the repository at this point in the history
* feat: add skip message by header filter function for cronsumer

* feat: update cronsumer version

* fix: remove cronsumer import path

* feat: implement header to cronsumer header conversion for skip message by header function

* chore: add tests

* chore: fix lint

* chore: remove depguard and add gosec to the test in golangci-lint

---------

Co-authored-by: Abdulsametileri <sametileri07@gmail.com>
  • Loading branch information
emrekosen and Abdulsametileri authored Feb 15, 2024
1 parent 01d341a commit 9ec9054
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
dist/
unit_coverage.out
unit_coverage.html
unit_coverage.html
.DS_Store
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ linters:
disable-all: true
enable:
- bodyclose
- depguard
- errcheck
- dupl
- exhaustive
Expand Down Expand Up @@ -43,6 +42,7 @@ issues:
- path: _test\.go
linters:
- errcheck
- gosec
- funlen

service:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | |
| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
Expand Down
42 changes: 31 additions & 11 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel),
}

if cfg.RetryConfiguration.SkipMessageByHeaderFn != nil {
cronsumerCfg.Consumer.SkipMessageByHeaderFn = func(headers []kcronsumer.Header) bool {
return cfg.RetryConfiguration.SkipMessageByHeaderFn(toHeaders(headers))
}
}

if !cfg.RetryConfiguration.SASL.IsEmpty() {
cronsumerCfg.SASL.Enabled = true
cronsumerCfg.SASL.AuthType = string(cfg.RetryConfiguration.SASL.Type)
Expand Down Expand Up @@ -110,18 +116,32 @@ type DistributedTracingConfiguration struct {
Propagator propagation.TextMapPropagator
}

type SkipMessageByHeaderFn func(headers []Header) bool

func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header {
headers := make([]Header, 0, len(cronsumerHeaders))
for i := range cronsumerHeaders {
headers = append(headers, Header{
Key: cronsumerHeaders[i].Key,
Value: cronsumerHeaders[i].Value,
})
}
return headers
}

type RetryConfiguration struct {
SASL *SASLConfig
TLS *TLSConfig
ClientID string
StartTimeCron string
Topic string
DeadLetterTopic string
Rack string
LogLevel LogLevel
Brokers []string
MaxRetry int
WorkDuration time.Duration
SASL *SASLConfig
TLS *TLSConfig
ClientID string
StartTimeCron string
Topic string
DeadLetterTopic string
Rack string
LogLevel LogLevel
Brokers []string
MaxRetry int
WorkDuration time.Duration
SkipMessageByHeaderFn SkipMessageByHeaderFn
}

type BatchConfiguration struct {
Expand Down
77 changes: 77 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package kafka
import (
"testing"
"time"

kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka"
"github.com/google/go-cmp/cmp"
)

func TestConsumerConfig_validate(t *testing.T) {
Expand Down Expand Up @@ -61,3 +64,77 @@ func TestConsumerConfig_validate(t *testing.T) {
}
})
}

func TestConsumerConfig_newCronsumerConfig(t *testing.T) {
t.Run("Should_Return_Nil_When_Client_Don't_Use_SkipMessageByHeaderFn", func(t *testing.T) {
// Given
cfg := ConsumerConfig{}

// When
actual := cfg.newCronsumerConfig()

// Then
if actual.Consumer.SkipMessageByHeaderFn != nil {
t.Error("SkipMessageByHeaderFn must be nil")
}
})
t.Run("Should_Set_When_Client_Give_SkipMessageByHeaderFn", func(t *testing.T) {
// Given
cfg := ConsumerConfig{
RetryConfiguration: RetryConfiguration{
SkipMessageByHeaderFn: func(headers []Header) bool {
return false
},
},
}

// When
actual := cfg.newCronsumerConfig()

// Then
if actual.Consumer.SkipMessageByHeaderFn == nil {
t.Error("SkipMessageByHeaderFn mustn't be nil")
}
})
}

func Test_toHeader(t *testing.T) {
t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Nil", func(t *testing.T) {
// When
headers := toHeaders(nil)

// Then
if len(headers) != 0 {
t.Error("Header must be nil")
}
})
t.Run("Should_Return_Empty_List_When_Cronsumer_Header_Is_Empty", func(t *testing.T) {
// When
headers := toHeaders([]kcronsumer.Header{})

// Then
if len(headers) != 0 {
t.Error("Header must be nil")
}
})
t.Run("Should_Covert_List_When_Cronsumer_Header", func(t *testing.T) {
// Given
expected := []Header{
{Key: "key", Value: []byte("val")},
{Key: "key2", Value: []byte("val2")},
{Key: "key3", Value: nil},
}

// When
actual := toHeaders([]kcronsumer.Header{
{Key: "key", Value: []byte("val")},
{Key: "key2", Value: []byte("val2")},
{Key: "key3", Value: nil},
})

// Then
if diff := cmp.Diff(expected, actual); diff != "" {
t.Error(diff)
}
})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.4.6
github.com/Trendyol/kafka-cronsumer v1.4.7
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.50.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY=
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.4.6 // indirect
github.com/Trendyol/kafka-cronsumer v1.4.7 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY=
github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down

0 comments on commit 9ec9054

Please sign in to comment.