feat: little changes
Abdulsametileri committed Nov 15, 2023
1 parent df3b5ea commit 9e54aa1
Showing 10 changed files with 52 additions and 79 deletions.
14 changes: 4 additions & 10 deletions README.md
@@ -121,7 +121,7 @@ After running `docker-compose up` command, you can run any application you want.
</details>

<details>
<summary>With Non-Transactional Retry</summary>
<summary>With Disabling Transactional Retry</summary>

func main() {
consumerCfg := &kafka.ConsumerConfig{
@@ -161,7 +161,7 @@ After running `docker-compose up` command, you can run any application you want.
}
}

// you should return error here to retry failed messages
// you must return err here to retry failed messages
return errors.New("err")
}
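
The snippet above is cut short by the diff view. A fuller, hedged sketch of running with transactional retry disabled might look like this — broker addresses, topic names, and the cron/duration values mirror the example file further down in this commit, while the import path and consumer wiring are assumptions:

```go
package main

import (
	"errors"
	"time"

	kafka "github.com/Trendyol/kafka-konsumer" // import path assumed
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic", // assumed topic/group names
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		// Disable transactional retry: only messages explicitly marked as
		// failed are sent to the exception/retry topic.
		TransactionalRetry: kafka.NewBoolPtr(false),
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/5 * * * *",
			WorkDuration:  4 * time.Minute,
			MaxRetry:      3,
		},
		// The batch-consume wiring that passes batchConsumeFn (shown in the
		// example file further down in this commit) is omitted here because
		// its exact field layout is not visible in this diff.
	}

	consumer, err := kafka.NewConsumer(consumerCfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Stop()

	consumer.Consume()
	select {}
}

func batchConsumeFn(messages []*kafka.Message) error {
	// Mark only the messages that actually failed.
	messages[0].IsFailed = true

	// Returning an error triggers the retry flow; with transactional retry
	// disabled, only the flagged messages are retried.
	return errors.New("err")
}
```

With `TransactionalRetry` disabled, returning an error from the consume function sends only the messages flagged with `IsFailed` to the retry topic rather than the whole batch.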

@@ -194,7 +194,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used at listeners | 1 |
| `retryEnabled` | Retry/Exception consumer is working or not | false |
| `transactionalRetry` | Set false if you want to handle failed messages manually | true |
| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true |
| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s |
| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | |
| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.42#Dialer) | |
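
As a hedged sketch of how these options are set in code — `Concurrency`, `RetryEnabled`, and `TransactionalRetry` appear elsewhere in this commit, while the `LogLevel` and `CommitInterval` field names are assumed from the option names and may differ from the actual struct:

```go
package config // illustrative package name

import (
	"time"

	kafka "github.com/Trendyol/kafka-konsumer" // import path assumed
)

// NewConsumerConfig maps the table's options onto ConsumerConfig fields.
func NewConsumerConfig() *kafka.ConsumerConfig {
	return &kafka.ConsumerConfig{
		LogLevel:           kafka.LogLevelDebug,     // `logLevel`; default is info (field name assumed)
		Concurrency:        4,                       // `concurrency`; default is 1
		RetryEnabled:       true,                    // `retryEnabled`; default is false
		TransactionalRetry: kafka.NewBoolPtr(false), // `transactionalRetry`; default is true
		CommitInterval:     5 * time.Second,         // `commitInterval`; default is 1s (field name assumed)
	}
}
```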
@@ -240,10 +240,4 @@ Kafka Konsumer offers an API that handles exposing several metrics.
| Metric Name | Description | Value Type |
|---------------------------------------------------------|---------------------------------------------|------------|
| kafka_konsumer_processed_messages_total_current | Total number of processed messages. | Counter |
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
| kafka_konsumer_processed_batch_messages_total_current | Total number of processed batch messages. | Counter |
| kafka_konsumer_unprocessed_batch_messages_total_current | Total number of unprocessed batch messages. | Counter |

**NOTE:** `kafka_konsumer_processed_batch_messages_total_current`
and `kafka_konsumer_unprocessed_batch_messages_total_current` will be deprecated in the next releases. Please
use `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current` instead.
| kafka_konsumer_unprocessed_messages_total_current | Total number of unprocessed messages. | Counter |
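
For a quick look at these counters, scraping the metrics endpoint works; the port and path below are assumptions for illustration only (they come from the API/metric configuration, which this commit does not touch):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Port and path are illustrative assumptions only.
	resp, err := http.Get("http://localhost:8090/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// The output should contain lines such as:
	//   kafka_konsumer_processed_messages_total_current 42
	//   kafka_konsumer_unprocessed_messages_total_current 3
	fmt.Println(string(body))
}
```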
2 changes: 1 addition & 1 deletion batch_consumer.go
@@ -93,7 +93,7 @@ func (b *batchConsumer) process(messages []*Message) {

if consumeErr != nil {
b.logger.Warnf("Consume Function Err %s, Messages will be retried", consumeErr.Error())
// Try to process same messages again
// Try to process same messages again for resolving transient network errors etc.
if consumeErr = b.consumeFn(messages); consumeErr != nil {
b.logger.Warnf("Consume Function Again Err %s, messages are sending to exception/retry topic %s", consumeErr.Error(), b.retryTopic)
b.metric.TotalUnprocessedMessagesCounter += int64(len(messages))
51 changes: 27 additions & 24 deletions batch_consumer_test.go
@@ -167,33 +167,36 @@ func Test_batchConsumer_process(t *testing.T) {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
}
t.Run("When_Transactional_Retry_Disabled", func(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{},
logger: NewZapLogger(LogLevelDebug),
retryEnabled: true,
transactionalRetry: false,
cronsumer: mc,
},
consumeFn: func(messages []*Message) error {
messages[0].IsFailed = true
messages[1].IsFailed = true

func Test_batchConsumer_process_ReprocessingFailedWithRetryEnabled(t *testing.T) {
// Given
mc := &mockCronsumer{wantErr: true}
bc := batchConsumer{
base: &base{
metric: &ConsumerMetric{}, logger: NewZapLogger(LogLevelDebug),
retryEnabled: true,
transactionalRetry: true,
cronsumer: mc,
},
consumeFn: func(messages []*Message) error {
return errors.New("error case")
},
}
return errors.New("error case")
},
}

// When
bc.process([]*Message{{IsFailed: true}, {IsFailed: true}, {}})
// When
bc.process([]*Message{{}, {}, {}})

// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
// Then
if bc.metric.TotalProcessedMessagesCounter != 0 {
t.Fatalf("Total Processed Message Counter must equal to 0")
}
if bc.metric.TotalUnprocessedMessagesCounter != 3 {
t.Fatalf("Total Unprocessed Message Counter must equal to 3")
}
})
}

type mockCronsumer struct {
29 changes: 0 additions & 29 deletions collector.go
@@ -13,10 +13,6 @@ type metricCollector struct {

totalUnprocessedMessagesCounter *prometheus.Desc
totalProcessedMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalUnprocessedBatchMessagesCounter *prometheus.Desc
// Deprecated: it will be removed next releases
totalProcessedBatchMessagesCounter *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
@@ -37,19 +37,6 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
ch <- prometheus.MustNewConstMetric(
s.totalProcessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalProcessedMessagesCounter),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.totalUnprocessedBatchMessagesCounter,
prometheus.CounterValue,
float64(s.consumerMetric.TotalUnprocessedMessagesCounter),
[]string{}...,
)
}

func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
@@ -68,18 +68,6 @@ func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector {
[]string{},
nil,
),
totalProcessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "processed_batch_messages_total", "current"),
"Total number of processed batch messages.",
[]string{},
nil,
),
totalUnprocessedBatchMessagesCounter: prometheus.NewDesc(
prometheus.BuildFQName(Name, "unprocessed_batch_messages_total", "current"),
"Total number of unprocessed batch messages.",
[]string{},
nil,
),
}
}

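For context on how a collector like the one above is consumed, here is a minimal hedged sketch of the standard Prometheus wiring; the registry setup, HTTP handler, and the mirrored counter struct are illustrative and not part of this commit:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// counters mirrors the library's ConsumerMetric for this sketch.
type counters struct {
	processed, unprocessed int64
}

// sketchCollector follows the same pattern as metricCollector above:
// descriptors are built once, and Collect emits const metrics from the
// current counter values on every scrape.
type sketchCollector struct {
	c               *counters
	processedDesc   *prometheus.Desc
	unprocessedDesc *prometheus.Desc
}

func (s *sketchCollector) Describe(ch chan<- *prometheus.Desc) {
	prometheus.DescribeByCollect(s, ch)
}

func (s *sketchCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(s.processedDesc, prometheus.CounterValue, float64(s.c.processed))
	ch <- prometheus.MustNewConstMetric(s.unprocessedDesc, prometheus.CounterValue, float64(s.c.unprocessed))
}

func main() {
	collector := &sketchCollector{
		c: &counters{},
		processedDesc: prometheus.NewDesc(
			prometheus.BuildFQName("kafka_konsumer", "processed_messages_total", "current"),
			"Total number of processed messages.", nil, nil),
		unprocessedDesc: prometheus.NewDesc(
			prometheus.BuildFQName("kafka_konsumer", "unprocessed_messages_total", "current"),
			"Total number of unprocessed messages.", nil, nil),
	}

	registry := prometheus.NewRegistry()
	registry.MustRegister(collector)

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8080", nil)
}
```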
2 changes: 1 addition & 1 deletion consumer_base.go
@@ -74,7 +74,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
quit: make(chan struct{}),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
transactionalRetry: cfg.TransactionalRetry,
transactionalRetry: *cfg.TransactionalRetry,

Codecov (codecov/patch) warning on consumer_base.go#L77: added line was not covered by tests.
distributedTracingEnabled: cfg.DistributedTracingEnabled,
logger: log,
subprocesses: newSubProcesses(),
9 changes: 8 additions & 1 deletion consumer_config.go
@@ -44,7 +44,7 @@ type ConsumerConfig struct {
Concurrency int
RetryEnabled bool
APIEnabled bool
TransactionalRetry bool `default:"true"`
TransactionalRetry *bool
}

func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
@@ -194,4 +194,11 @@ func (cfg *ConsumerConfig) setDefaults() {
cfg.DistributedTracingConfiguration.TracerProvider = otel.GetTracerProvider()
}
}
if cfg.TransactionalRetry == nil {
cfg.TransactionalRetry = NewBoolPtr(true)
}
}

func NewBoolPtr(value bool) *bool {
return &value
}
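The switch from `bool` to `*bool` is what lets `setDefaults` tell "not set" apart from an explicit `false` — something a plain bool with a `default:"true"` tag cannot express once a caller sets it to false. A minimal hedged illustration of that three-state behaviour:

```go
package main

import "fmt"

// newBoolPtr mirrors kafka.NewBoolPtr above.
func newBoolPtr(value bool) *bool {
	return &value
}

// resolveTransactionalRetry mirrors the setDefaults logic: nil means the
// option was never set, so the default of true applies; an explicit
// pointer always wins.
func resolveTransactionalRetry(v *bool) bool {
	if v == nil {
		return true
	}
	return *v
}

func main() {
	fmt.Println(resolveTransactionalRetry(nil))               // true: default applied
	fmt.Println(resolveTransactionalRetry(newBoolPtr(true)))  // true: explicitly enabled
	fmt.Println(resolveTransactionalRetry(newBoolPtr(false))) // false: explicitly disabled, now distinguishable
}
```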
3 changes: 3 additions & 0 deletions consumer_config_test.go
@@ -23,6 +23,9 @@ func TestConsumerConfig_validate(t *testing.T) {
if cfg.Reader.CommitInterval != time.Second {
t.Fatalf("Reader Commit Interval default value must equal to 1s")
}
if *cfg.TransactionalRetry != true {
t.Fatal("Default Transactional Retry is true")
}
})
t.Run("Set_Defaults_When_Distributed_Tracing_Enabled", func(t *testing.T) {
// Given
@@ -22,12 +22,12 @@ func main() {
BatchConsumeFn: batchConsumeFn,
},
RetryEnabled: true,
TransactionalRetry: false,
TransactionalRetry: kafka.NewBoolPtr(false),
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
Topic: "retry-topic",
StartTimeCron: "*/1 * * * *",
WorkDuration: 50 * time.Second,
StartTimeCron: "*/5 * * * *",
WorkDuration: 4 * time.Minute,
MaxRetry: 3,
},
}
@@ -53,6 +53,6 @@ func batchConsumeFn(messages []*kafka.Message) error {
}
}

// you should return error here to retry failed messages
// you must return error here to retry only failed messages
return errors.New("err")
}
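
A slightly more realistic shape for this function, as a hedged sketch: only the messages whose processing actually fails get flagged, so with transactional retry disabled just those go to the retry topic. `processPayload` is a hypothetical stand-in for business logic, and the `Value` field access assumes the library's message shape:

```go
package example

import (
	"errors"

	kafka "github.com/Trendyol/kafka-konsumer" // import path assumed
)

// processPayload is a hypothetical stand-in for real business logic.
func processPayload(value []byte) error {
	if len(value) == 0 {
		return errors.New("empty payload")
	}
	return nil
}

func batchConsumeFn(messages []*kafka.Message) error {
	var anyFailed bool

	for i := range messages {
		if err := processPayload(messages[i].Value); err != nil {
			messages[i].IsFailed = true // flag only the messages that failed
			anyFailed = true
		}
	}

	if anyFailed {
		// Returning an error triggers the retry flow; with transactional
		// retry disabled, only the flagged messages go to the retry topic.
		return errors.New("some messages in the batch failed")
	}
	return nil
}
```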
4 changes: 0 additions & 4 deletions metric.go
@@ -3,8 +3,4 @@ package kafka
type ConsumerMetric struct {
TotalUnprocessedMessagesCounter int64
TotalProcessedMessagesCounter int64
// Deprecated
TotalUnprocessedBatchMessagesCounter int64
// Deprecated
TotalProcessedBatchMessagesCounter int64
}
9 changes: 4 additions & 5 deletions test/integration/integration_test.go
@@ -154,10 +154,10 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
}
}

func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Disabled(t *testing.T) {
// Given
topic := "cronsumer-topic"
consumerGroup := "cronsumer-cg"
topic := "nontransactional-cronsumer-topic"
consumerGroup := "nontransactional-cronsumer-cg"
brokerAddress := "localhost:9092"

retryTopic := "retry-topic"
@@ -175,7 +175,7 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
defer cleanUpThisToo()

consumerCfg := &kafka.ConsumerConfig{
TransactionalRetry: false,
TransactionalRetry: kafka.NewBoolPtr(false),
Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
@@ -206,7 +206,6 @@ func Test_Should_Batch_Retry_Only_Failed_Messages(t *testing.T) {
var expectedOffset int64 = 1
conditionFunc := func() bool {
lastOffset, _ := retryConn.ReadLastOffset()
fmt.Println(lastOffset)
return lastOffset == expectedOffset
}
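
The assertion that drives `conditionFunc` is cut off by the diff view; tests like this typically poll the condition until a deadline. A hedged, generic sketch of that kind of helper — the name and timings are assumptions, not the test's actual code:

```go
package integration

import "time"

// waitUntil polls condition every interval until it returns true or the
// timeout elapses, reporting whether the condition was eventually met.
func waitUntil(condition func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if condition() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}
```

Used as, say, `if !waitUntil(conditionFunc, 30*time.Second, time.Second) { t.Fatal("offset not reached") }`, it would confirm that only the single failed message reached the retry topic, which is presumably why `expectedOffset` is 1.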
