Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add transactional retry feature #64

Merged
merged 10 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,55 @@ After running `docker-compose up` command, you can run any application you want.
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}

</details>

<details>
<summary>With Disabling Transactional Retry</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
LogLevel: kafka.LogLevelDebug,
RetryEnabled: true,
TransactionalRetry: kafka.NewBoolPtr(false),
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
BatchConfiguration: kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
// you can add custom error handling here & flag messages
for i := range messages {
if i%2 == 0 {
messages[i].IsFailed = true
}
}

	// you must return an error here so that the failed messages are retried
return errors.New("err")
}

</details>

#### With Distributed Tracing Support

Expand Down Expand Up @@ -147,6 +194,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Retry/Exception consumer is working or not | false |
| `transactionalRetry` | Set to false to apply the exception/retry strategy only to failed messages | true |
| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
Expand Down Expand Up @@ -192,8 +240,4 @@ Kafka Konsumer offers an API that handles exposing several metrics.
| Metric Name | Description | Value Type |
|---------------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter |

**NOTE:** `kafka_konsumer_processed_batch_messages_total_current` and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in the next releases. Please use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
14 changes: 11 additions & 3 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,24 @@ func (b *batchConsumer) process(messages []*Message) {

if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
}

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
if b.transactionalRetry {
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
} else {
for i := range messages {
if messages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
}
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
Expand Down
46 changes: 41 additions & 5 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true},
consumeFn: func([]*Message) error {
return nil
},
Expand All @@ -84,7 +84,7 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
Expand All @@ -108,7 +108,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -129,7 +129,10 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -150,8 +153,41 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{},
logger: NewZapLogger(LogLevelDebug),
retryEnabled: true,
transactionalRetry: false,
cronsumer: mc,
},
consumeFn: func(messages []*Message) error {
messages[0].IsFailed = true
messages[1].IsFailed = true

return errors.New("error case")
},
}
Expand Down
29 changes: 0 additions & 29 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ type metricCollector struct {

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalUnprocessedBatchMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalProcessedBatchMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
Expand All @@ -37,19 +33,6 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
ch <- prometheus.MustNewConstMetric(
s.totalProcessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
}

func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
Expand All @@ -68,18 +51,6 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
[]string{},
nil,
),
totalProcessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"),
"Total number of processed batch messages.",
[]string{},
nil,
),
totalUnprocessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"),
"Total number of unprocessed batch messages.",
[]string{},
nil,
),
}
}

Expand Down
2 changes: 2 additions & 0 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
concurrency int
once sync.Once
retryEnabled bool
transactionalRetry bool
distributedTracingEnabled bool
propagator propagation.TextMapPropagator
}
Expand Down Expand Up @@ -73,6 +74,7 @@
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: *cfg.TransactionalRetry,

Check warning on line 77 in consumer_base.go

View check run for this annotation

Codecov / codecov/patch

consumer_base.go#L77

Added line #L77 was not covered by tests
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
Expand Down
8 changes: 8 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ConsumerConfig struct {
Concurrency int
RetryEnabled bool
APIEnabled bool
TransactionalRetry *bool
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
Expand Down Expand Up @@ -193,4 +194,11 @@ func (cfg *ConsumerConfig) setDefaults() {
cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider()
}
}
if cfg.TransactionalRetry == nil {
cfg.TransactionalRetry = NewBoolPtr(true)
}
}

// NewBoolPtr returns a pointer to a copy of the given bool value.
// It is a convenience helper for optional *bool configuration fields
// such as ConsumerConfig.TransactionalRetry.
func NewBoolPtr(value bool) *bool {
	v := value
	return &v
}
3 changes: 3 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func TestConsumerConfig_validate(t *testing.T) {
if cfg.Reader.CommitInterval != time.Second {
t.Fatalf("Reader Commit Interval default value must equal to 1s")
}
if *cfg.TransactionalRetry != true {
t.Fatal("Default Transactional Retry is true")
}
})
t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
// Given
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion examples/with-kafka-batch-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
Expand Down
58 changes: 58 additions & 0 deletions examples/with-kafka-transactional-retry-disabled/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package main

import (
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer"
"os"
"os/signal"
"time"
)

// main demonstrates batch consuming with transactional retry disabled:
// when batchConsumeFn returns an error, only the messages flagged with
// IsFailed are produced to the retry topic instead of the whole batch.
func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		BatchConfiguration: &kafka.BatchConfiguration{
			MessageGroupLimit:    1000,
			MessageGroupDuration: time.Second,
			BatchConsumeFn:       batchConsumeFn,
		},
		RetryEnabled: true,
		// Disable transactional retry so only flagged (IsFailed) messages
		// are sent to the exception/retry topic.
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/5 * * * *",
			WorkDuration:  4 * time.Minute,
			MaxRetry:      3,
		},
	}

	consumer, err := kafka.NewConsumer(consumerCfg)
	if err != nil {
		// Previously the error was discarded with `consumer, _ := ...`;
		// a nil consumer would then panic on Stop()/Consume().
		fmt.Println("consumer could not be created:", err)
		os.Exit(1)
	}
	defer consumer.Stop()

	consumer.Consume()

	fmt.Println("Consumer started...!")

	// Block until an interrupt signal arrives so the example keeps consuming.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	<-c
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt

// batchConsumeFn flags every message at an even index as failed and then
// returns an error. Because transactional retry is disabled in this example,
// only the flagged messages are sent to the retry topic.
func batchConsumeFn(messages []*kafka.Message) error {
	// Mark every second message (index 0, 2, 4, ...) as failed.
	for idx := 0; idx < len(messages); idx += 2 {
		messages[idx].IsFailed = true
	}

	// A non-nil error triggers the retry flow for the failed messages.
	return errors.New("err")
}
15 changes: 10 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ type Message struct {
Partition int
Offset int64
HighWaterMark int64
Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// IsFailed Is only used on transactional retry disabled
IsFailed bool

Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// Context To enable distributed tracing support
Context context.Context
}
Expand Down
4 changes: 0 additions & 4 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,4 @@ package kafka
type ConsumerMetric struct {
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
// Deprecated
TotalUnprocessedBatchMessagesCounter int64
// Deprecated
TotalProcessedBatchMessagesCounter int64
}
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.3.4 // indirect
github.com/Trendyol/kafka-cronsumer v1.4.4 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.5 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/Trendyol/kafka-cronsumer v1.3.4 h1:H1PmXfNtzCQm6pYsERUHlSTaib/WaICES+GJvl2RX8U=
github.com/Trendyol/kafka-cronsumer v1.3.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.4.4 h1:RfTpVyvxf+FjLxOJIHQXr6zrMjtba6PGUAYXLoGnVuE=
github.com/Trendyol/kafka-cronsumer v1.4.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.5 h1:i5Q6vR4ZRTtlb+uLimGJNBOQUiAtcbjn7Xc2FmPap/4=
github.com/Trendyol/otel-kafka-konsumer v0.0.5/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
Loading
Loading