Skip to content

Commit

Permalink
feat: cronsumer internal queue capacity, producer batch size and time…
Browse files Browse the repository at this point in the history
…out expose (#147)

* feat: able to use kafka cronsumer queue capacity field

* feat: able to inject cronsumer producer batch size and timeout

* docs: add new cronsumer exposed field to the readme

* feat: kafka cronsumer v1.1.5 dump
  • Loading branch information
Abdulsametileri authored Oct 30, 2024
1 parent b9c0b2b commit 541cca7
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 9 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,20 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is `kafka_konsumer`. Currently, there are two exposed prometheus metrics. `processed_messages_total` and `unprocessed_messages_total` So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current`. | kafka_konsumer |
| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout |
| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled |
| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s |
| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s |
| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s |
| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster |
| `transport.DialTimeout ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 5s |
| `transport.IdleTimeout ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 30s |
| `transport.MetadataTTL ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | 6s |
| `transport.MetadataTopics ` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | all topics in cluster |
| `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false |
| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() |
| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() |
| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | |
| `retryConfiguration.clientId` | [see doc]((https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#Transport) | |
| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | |
| `retryConfiguration.metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer. Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current. So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer |
| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | |
| `retryConfiguration.queueCapacity` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#ReaderConfig.QueueCapacity) | 100 |
| `retryConfiguration.producerBatchSize` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#WriterConfig.BatchSize) | 100 |
| `retryConfiguration.producerBatchTimeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@v0.4.47#WriterConfig.BatchTimeout) | 100 |
| `retryConfiguration.topic` | Retry/Exception topic names | |
| `retryConfiguration.brokers` | Retry topic brokers urls | |
| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 |
Expand Down
10 changes: 8 additions & 2 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
MaxRetry: cfg.RetryConfiguration.MaxRetry,
VerifyTopicOnStartup: cfg.RetryConfiguration.VerifyTopicOnStartup,
Concurrency: cfg.RetryConfiguration.Concurrency,
QueueCapacity: cfg.RetryConfiguration.QueueCapacity,
MinBytes: cfg.Reader.MinBytes,
MaxBytes: cfg.Reader.MaxBytes,
MaxWait: cfg.Reader.MaxWait,
Expand All @@ -137,8 +138,10 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config {
RetentionTime: cfg.Reader.RetentionTime,
},
Producer: kcronsumer.ProducerConfig{
Balancer: cfg.RetryConfiguration.Balancer,
Brokers: cfg.RetryConfiguration.Brokers,
Balancer: cfg.RetryConfiguration.Balancer,
Brokers: cfg.RetryConfiguration.Brokers,
BatchSize: cfg.RetryConfiguration.ProducerBatchSize,
BatchTimeout: cfg.RetryConfiguration.ProducerBatchTimeout,
},
LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel),
}
Expand Down Expand Up @@ -225,6 +228,9 @@ type RetryConfiguration struct {
WorkDuration time.Duration
SkipMessageByHeaderFn SkipMessageByHeaderFn
Concurrency int
QueueCapacity int
ProducerBatchSize int
ProducerBatchTimeout time.Duration
}

type BatchConfiguration struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2
go 1.19

require (
github.com/Trendyol/kafka-cronsumer v1.5.4
github.com/Trendyol/kafka-cronsumer v1.5.5
github.com/Trendyol/otel-kafka-konsumer v0.0.7
github.com/ansrivas/fiberprometheus/v2 v2.6.1
github.com/gofiber/fiber/v2 v2.52.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/Trendyol/kafka-cronsumer v1.5.4 h1:r2iyWJ8E+rd5IoRGv/XZ2bKMknxfLh2eApPhMiJodR4=
github.com/Trendyol/kafka-cronsumer v1.5.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec h1:82NtbRoJBnH7Qq/9VIMyRbRi6jTc9yytqrpR3KsH5Ks=
github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.5 h1:R+tZd/A//0NDQbmBwkuMolncOXIeKlqRt1bvnmU6Fn8=
github.com/Trendyol/kafka-cronsumer v1.5.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down
2 changes: 1 addition & 1 deletion test/integration/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
)

require (
github.com/Trendyol/kafka-cronsumer v1.5.4 // indirect
github.com/Trendyol/kafka-cronsumer v1.5.5 // indirect
github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions test/integration/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/Trendyol/kafka-cronsumer v1.5.3/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNu
github.com/Trendyol/kafka-cronsumer v1.5.4-0.20240808131305-10cf27589160/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.4-0.20240827135347-7ed5187ea81d/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.4/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.5-0.20241024185446-b44371b7a3ec/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/kafka-cronsumer v1.5.5/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU=
github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4=
github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
Expand Down

0 comments on commit 541cca7

Please sign in to comment.