Skip to content

Commit

Permalink
feat: little changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Abdulsametileri committed Nov 15, 2023
1 parent 9e54aa1 commit 13c26bb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ After running `docker-compose up` command, you can run any application you want.
},
LogLevel: kafka.LogLevelDebug,
RetryEnabled: true,
TransactionalRetry: false,
TransactionalRetry: kafka.NewBoolPtr(false),
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
Expand Down
16 changes: 11 additions & 5 deletions batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Processing_Is_Successful", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true},
consumeFn: func([]*Message) error {
return nil
},
Expand All @@ -84,7 +84,7 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
gotOnlyOneTimeException := true
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
if gotOnlyOneTimeException {
gotOnlyOneTimeException = false
Expand All @@ -108,7 +108,7 @@ func Test_batchConsumer_process(t *testing.T) {
t.Run("When_Re-processing_Is_Failed_And_Retry_Disabled", func(t *testing.T) {
// Given
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug)},
base: &base{metric: &ConsumerMetric{}, transactionalRetry: true, logger: NewZapLogger(LogLevelDebug)},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -129,7 +129,10 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand All @@ -150,7 +153,10 @@ func Test_batchConsumer_process(t *testing.T) {
// Given
mc := mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc},
base: &base{
metric: &ConsumerMetric{}, transactionalRetry: true,
logger: NewZapLogger(LogLevelDebug), retryEnabled: true, cronsumer: &mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
Expand Down
16 changes: 10 additions & 6 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ type Message struct {
Partition int
Offset int64
HighWaterMark int64
IsFailed bool
Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// IsFailed Is only used on transactional retry disabled
IsFailed bool

Key []byte
Value []byte
Headers []Header
WriterData interface{}
Time time.Time

// Context To enable distributed tracing support
Context context.Context
}
Expand Down

0 comments on commit 13c26bb

Please sign in to comment.