diff --git a/consumer_config.go b/consumer_config.go index e5d89e1..c4ac5ea 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -138,8 +138,10 @@ func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { RetentionTime: cfg.Reader.RetentionTime, }, Producer: kcronsumer.ProducerConfig{ - Balancer: cfg.RetryConfiguration.Balancer, - Brokers: cfg.RetryConfiguration.Brokers, + Balancer: cfg.RetryConfiguration.Balancer, + Brokers: cfg.RetryConfiguration.Brokers, + BatchSize: cfg.RetryConfiguration.ProducerBatchSize, + BatchTimeout: cfg.RetryConfiguration.ProducerBatchTimeout, }, LogLevel: lcronsumer.Level(cfg.RetryConfiguration.LogLevel), } @@ -227,6 +229,8 @@ type RetryConfiguration struct { SkipMessageByHeaderFn SkipMessageByHeaderFn Concurrency int QueueCapacity int + ProducerBatchSize int + ProducerBatchTimeout time.Duration } type BatchConfiguration struct {