Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: throw panic when produce message to retry topic is unsuccessful #150

Merged
merged 7 commits into from
Dec 29, 2024
23 changes: 21 additions & 2 deletions batch_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"errors"
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -239,8 +240,11 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
}
}

if produceErr := b.base.cronsumer.ProduceBatch(cronsumerMessages); produceErr != nil {
b.logger.Errorf("Error producing messages to exception/retry topic %s", produceErr.Error())
if err := b.retryBatchWithBackoff(cronsumerMessages); err != nil {
errorMsg := fmt.Sprintf(
"Error producing messages to exception/retry topic: %s. Error: %s", b.retryTopic, err.Error())
b.logger.Error(errorMsg)
panic(errorMsg)
}
}
}
Expand All @@ -249,3 +253,18 @@ func (b *batchConsumer) process(chunkMessages []*Message) {
b.metric.TotalProcessedMessagesCounter += int64(len(chunkMessages))
}
}

func (b *batchConsumer) retryBatchWithBackoff(retryableMessages []kcronsumer.Message) error {
var produceErr error

for attempt := 1; attempt <= 5; attempt++ {
produceErr = b.base.cronsumer.ProduceBatch(retryableMessages)
if produceErr == nil {
return nil
}
b.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
}

return produceErr
}
74 changes: 57 additions & 17 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,17 +256,42 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// When && Then
bc.process([]*Message{{}, {}, {}})
})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed_5_times", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true, retryBehaviorOpen: true, maxRetry: 5}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}

defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
if mc.times != mc.maxRetry {
t.Errorf("Expected produce to be called %d times, but got %d", mc.maxRetry, mc.times)
}
}()

// When && Then
bc.process([]*Message{{}, {}, {}})
})

t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
Expand All @@ -286,16 +311,14 @@ func Test_batchConsumer_process(t *testing.T) {
},
}

// When
bc.process([]*Message{{}, {}, {}})
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
// When && Then
bc.process([]*Message{{}, {}, {}})
})
}

Expand Down Expand Up @@ -468,7 +491,10 @@ func createMessages(partitionStart int, partitionEnd int) []*Message {
}

type mockCronsumer struct {
wantErr bool
wantErr bool
retryBehaviorOpen bool
times int
maxRetry int
}

func (m *mockCronsumer) Start() {
Expand All @@ -488,6 +514,13 @@ func (m *mockCronsumer) WithLogger(_ lcronsumer.Interface) {
}

func (m *mockCronsumer) Produce(_ kcronsumer.Message) error {
if m.retryBehaviorOpen {
if m.wantErr && m.times <= m.maxRetry {
m.times++
return errors.New("error")
}
return nil
}
if m.wantErr {
return errors.New("error")
}
Expand All @@ -499,6 +532,13 @@ func (m *mockCronsumer) GetMetricCollectors() []prometheus.Collector {
}

func (m *mockCronsumer) ProduceBatch([]kcronsumer.Message) error {
if m.retryBehaviorOpen {
if m.wantErr && m.times <= m.maxRetry {
m.times++
return errors.New("error")
}
return nil
}
if m.wantErr {
return errors.New("error")
}
Expand Down
25 changes: 22 additions & 3 deletions consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -151,13 +152,31 @@ func (c *consumer) process(message *Message) {

if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
if err := c.retryWithBackoff(retryableMsg); err != nil {
errorMessage := fmt.Sprintf(
"Error producing message %s to exception/retry topic %s. Error: %s",
string(message.Value), c.retryTopic, err.Error())
c.logger.Error(errorMessage)
panic(err.Error())
}
}

if consumeErr == nil {
c.metric.TotalProcessedMessagesCounter++
}
}

func (c *consumer) retryWithBackoff(retryableMsg kcronsumer.Message) error {
var produceErr error

for attempt := 1; attempt <= 5; attempt++ {
produceErr = c.cronsumer.Produce(retryableMsg)
if produceErr == nil {
return nil
}
c.logger.Warnf("Error producing message (attempt %d/%d): %v", attempt, 5, produceErr)
time.Sleep((50 * time.Millisecond) * time.Duration(1<<attempt))
}

return produceErr
}
37 changes: 30 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func Test_consumer_process(t *testing.T) {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
}
})

t.Run("When_Re-processing_Is_Failed_And_Retry_Failed", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
Expand All @@ -104,16 +105,38 @@ func Test_consumer_process(t *testing.T) {
},
}

// When
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()

// When && Then
c.process(&Message{})
})

// Then
if c.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if c.metric.TotalUnprocessedMessagesCounter != 1 {
t.Fatalf("Total Unprocessed Message Counter must equal to 1")
t.Run("When_Re-processing_Is_Failed_And_Retry_Failed_5_times", func(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true, retryBehaviorOpen: true, maxRetry: 5}

c := consumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
consumeFn: func(*Message) error {
return errors.New("error case")
},
}

defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
if mc.times != mc.maxRetry {
t.Errorf("Expected produce to be called %d times, but got %d", mc.maxRetry, mc.times)
}
}()

// When && Then
c.process(&Message{})
})
}

Expand Down
Loading