From 2693d530215eddb0b97e5b44ee0a6b34246b7503 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 24 Oct 2024 23:10:58 +0300 Subject: [PATCH] feat: able to inject cronsumer producer batch size and timeout --- consumer_config.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/consumer_config.go b/consumer_config.go index e5d89e1..c4ac5ea 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -138,8 +138,10 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { RetentionTime: cfg.Reader.RetentionTime, }, Producer: kcronsumer.ProducerConfig{ - Balancer: cfg.RetryConfiguration.Balancer, - Brokers: cfg.RetryConfiguration.Brokers, + Balancer: cfg.RetryConfiguration.Balancer, + Brokers: cfg.RetryConfiguration.Brokers, + BatchSize: cfg.RetryConfiguration.ProducerBatchSize, + BatchTimeout: cfg.RetryConfiguration.ProducerBatchTimeout, }, LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel), } @@ -227,6 +229,8 @@ type RetryConfiguration struct { SkipMessageByHeaderFn SkipMessageByHeaderFn Concurrency int QueueCapacity int + ProducerBatchSize int + ProducerBatchTimeout time.Duration } type BatchConfiguration struct {