
feat: add transactional retry feature #64

Merged · 10 commits · Nov 15, 2023
Changes from 4 commits

40 changes: 40 additions & 0 deletions README.md
@@ -119,6 +119,45 @@ After running `docker-compose up` command, you can run any application you want.
}
</details>

<details>
<summary>With Non-Transactional Retry</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
LogLevel: kafka.LogLevelDebug,
RetryEnabled: true,
NonTransactionalBatchRetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()
}

func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
}
</details>
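The snippet above enables `NonTransactionalBatchRetryEnabled` but does not show how individual messages are flagged. As an illustrative sketch only (not part of this diff; `processMessage` is a hypothetical per-message handler), a `BatchConsumeFn` might mark failing messages through the new `IsFailed` field so that only those are produced to the retry topic:

```go
func batchConsumeFn(messages []*kafka.Message) error {
	anyFailed := false
	for i := range messages {
		// processMessage is a placeholder for application-specific handling.
		if err := processMessage(messages[i]); err != nil {
			messages[i].IsFailed = true
			anyFailed = true
		}
	}
	if anyFailed {
		// Returning an error triggers the retry flow; with
		// NonTransactionalBatchRetryEnabled, only the IsFailed messages are retried.
		return errors.New("some messages in the batch failed")
	}
	return nil
}
```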


#### With Distributed Tracing Support

@@ -147,6 +186,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Whether the retry/exception consumer is enabled | false |
| `nonTransactionalBatchRetryEnabled` | Manual (non-transactional) error handling for batch consumers; only messages flagged `IsFailed` are retried | false |
| `commitInterval` | Indicates the interval at which offsets are committed to the broker | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
12 changes: 10 additions & 2 deletions batch_consumer.go
@@ -101,8 +101,16 @@ func (b *batchConsumer) process(messages []*Message) {

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(messages))
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
if b.nonTransactionalBatchRetryEnabled {
for i := range messages {
if messages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
}
} else {
for i := range messages {
cronsumerMessages = append(cronsumerMessages, messages[i].toRetryableMessage(b.retryTopic))
}
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
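To restate the new branch in isolation: when `nonTransactionalBatchRetryEnabled` is set, only messages flagged `IsFailed` are converted to retryable messages and handed to `ProduceBatch`; otherwise the whole batch is retried as before. A standalone sketch of that selection, assuming only the fields and helpers already shown in this diff:

```go
func retryableMessages(messages []*Message, nonTransactional bool, retryTopic string) []kcronsumer.Message {
	out := make([]kcronsumer.Message, 0, len(messages))
	for i := range messages {
		// In non-transactional mode, skip messages the consumer did not flag as failed.
		if nonTransactional && !messages[i].IsFailed {
			continue
		}
		out = append(out, messages[i].toRetryableMessage(retryTopic))
	}
	return out
}
```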
27 changes: 27 additions & 0 deletions batch_consumer_test.go
@@ -169,6 +169,33 @@ func Test_batchConsumer_process(t *testing.T) {
})
}

func Test_batchConsumer_process_ReprocessingFailedWithRetryEnabled(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug),
retryEnabled: true,
nonTransactionalBatchRetryEnabled: true,
cronsumer: mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

// When
bc.process([]*Message{{IsFailed: true}, {IsFailed: true}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
}

type mockCronsumer struct {
wantErr bool
}
54 changes: 28 additions & 26 deletions consumer_base.go
@@ -31,23 +31,24 @@
}

type base struct {
cronsumer kcronsumer.Cronsumer
api API
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan *Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
concurrency int
once sync.Once
retryEnabled bool
distributedTracingEnabled bool
propagator propagation.TextMapPropagator
cronsumer kcronsumer.Cronsumer
api API
logger LoggerInterface
metric *ConsumerMetric
context context.Context
messageCh chan *Message
quit chan struct{}
cancelFn context.CancelFunc
r Reader
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
concurrency int
once sync.Once
retryEnabled bool
nonTransactionalBatchRetryEnabled bool
distributedTracingEnabled bool
propagator propagation.TextMapPropagator
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
@@ -68,15 +68,16 @@
}

c := base{
metric: &ConsumerMetric{},
messageCh: make(chan *Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
r: reader,
metric: &ConsumerMetric{},
messageCh: make(chan *Message, cfg.Concurrency),
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
nonTransactionalBatchRetryEnabled: cfg.NonTransactionalBatchRetryEnabled,
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
r: reader,

}

if cfg.DistributedTracingEnabled {
39 changes: 20 additions & 19 deletions consumer_config.go
@@ -25,25 +25,26 @@ type DialConfig struct {
}

type ConsumerConfig struct {
APIConfiguration APIConfiguration
Logger LoggerInterface
MetricConfiguration MetricConfiguration
SASL *SASLConfig
TLS *TLSConfig
Dial *DialConfig
BatchConfiguration *BatchConfiguration
ConsumeFn ConsumeFn
ClientID string
Rack string
LogLevel LogLevel
Reader ReaderConfig
RetryConfiguration RetryConfiguration
CommitInterval time.Duration
DistributedTracingEnabled bool
DistributedTracingConfiguration DistributedTracingConfiguration
Concurrency int
RetryEnabled bool
APIEnabled bool
APIConfiguration APIConfiguration
Logger LoggerInterface
MetricConfiguration MetricConfiguration
SASL *SASLConfig
TLS *TLSConfig
Dial *DialConfig
BatchConfiguration *BatchConfiguration
ConsumeFn ConsumeFn
ClientID string
Rack string
LogLevel LogLevel
Reader ReaderConfig
RetryConfiguration RetryConfiguration
CommitInterval time.Duration
DistributedTracingEnabled bool
DistributedTracingConfiguration DistributedTracingConfiguration
Concurrency int
RetryEnabled bool
APIEnabled bool
NonTransactionalBatchRetryEnabled bool
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
File renamed without changes.
2 changes: 1 addition & 1 deletion examples/with-kafka-batch-consumer/main.go
@@ -42,7 +42,7 @@ func main() {
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/with-kafka-batch-consumer/load.txt
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
return nil
61 changes: 61 additions & 0 deletions examples/with-kafka-non-transactional-retry/main.go
@@ -0,0 +1,61 @@
package main

import (
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer"
"os"
"os/signal"
"time"
)

func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
BatchConfiguration: &kafka.BatchConfiguration{
MessageGroupLimit: 1000,
MessageGroupDuration: time.Second,
BatchConsumeFn: batchConsumeFn,
},
RetryEnabled: true,
NonTransactionalBatchRetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
MaxRetry: 3,
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

fmt.Println("Consumer started...!")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}

// In order to load topic with data, use:
// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt
func batchConsumeFn(messages []*kafka.Message) error {
fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
// you can add custom error handling here & flag messages
for i := range messages {
if i%2 == 0 {
messages[i].IsFailed = true
} else {
messages[i].IsFailed = false
}
}

// you should return an error here so that the failed messages are retried
return errors.New("err")
}
1 change: 1 addition & 0 deletions message.go
@@ -16,6 +16,7 @@ type Message struct {
Partition int
Offset int64
HighWaterMark int64
IsFailed bool
Key []byte
Value []byte
Headers []Header