From 8d869e33721b51c90e57846fe75d8eb80c25b8bd Mon Sep 17 00:00:00 2001 From: Mert Oz Date: Wed, 11 Dec 2024 15:41:25 +0300 Subject: [PATCH] feat: remove spaces from broker lists implementation (#151) --- consumer_config.go | 9 +++++++++ consumer_config_test.go | 43 +++++++++++++++++++++++++++++++++++++++++ producer.go | 1 + producer_config.go | 6 ++++++ producer_config_test.go | 43 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 102 insertions(+) diff --git a/consumer_config.go b/consumer_config.go index c4ac5ea..8c70133 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -87,6 +87,12 @@ func (cfg ReaderConfig) JSON() string { cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset)) } +func (cfg *ReaderConfig) removeSpaceBrokerList() { + for i := range cfg.Brokers { + cfg.Brokers[i] = strings.TrimSpace(cfg.Brokers[i]) + } +} + func (cfg *ConsumerConfig) JSON() string { if cfg == nil { return "{}" @@ -271,7 +277,10 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) { return nil, err } + cfg.Reader.removeSpaceBrokerList() + readerCfg := kafka.ReaderConfig(cfg.Reader) + readerCfg.Dialer = dialer if cfg.Rack != "" { readerCfg.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: cfg.Rack}} diff --git a/consumer_config_test.go b/consumer_config_test.go index f434e71..559065e 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -1,6 +1,7 @@ package kafka import ( + "reflect" "testing" "time" @@ -356,6 +357,48 @@ func TestConsumerConfig_JSONPretty(t *testing.T) { }) } +func TestConsumerConfig_removeSpaceBrokerList(t *testing.T) { + type fields struct { + Reader ReaderConfig + } + tests := []struct { + name string + fields fields + expected []string + }{ + { + name: "Should_Remove_Spaces_In_Broker_Lists", + fields: fields{ + Reader: ReaderConfig{Brokers: []string{" address", "address ", " address "}}, + }, + expected: []string{"address", "address", "address"}, + }, + { + name: "Should_Do_Nothing_When_Broker_Lists_Have_Not_Any_Space", + fields: fields{ + Reader: ReaderConfig{Brokers: []string{"address1", "address2", "address3"}}, + }, + expected: []string{"address1", "address2", "address3"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Given + cfg := &ConsumerConfig{ + Reader: tt.fields.Reader, + } + + // When + cfg.Reader.removeSpaceBrokerList() + + // Then + if !reflect.DeepEqual(cfg.Reader.Brokers, tt.expected) { + t.Errorf("For broker list %v, expected %v", cfg.Reader.Brokers, tt.expected) + } + }) + } +} + func getConsumerConfigExample() *ConsumerConfig { return &ConsumerConfig{ Rack: "stage", diff --git a/producer.go b/producer.go index 640b919..892f4a0 100644 --- a/producer.go +++ b/producer.go @@ -23,6 +23,7 @@ type producer struct { } func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) { + cfg.Writer.removeSpaceBrokerList() kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, diff --git a/producer_config.go b/producer_config.go index 7af1195..4fcf118 100644 --- a/producer_config.go +++ b/producer_config.go @@ -37,6 +37,12 @@ func (cfg WriterConfig) JSON() string { strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String()) } +func (cfg *WriterConfig) removeSpaceBrokerList() { + for i := range cfg.Brokers { + cfg.Brokers[i] = strings.TrimSpace(cfg.Brokers[i]) + } +} + type TransportConfig struct { MetadataTopics []string DialTimeout time.Duration diff --git a/producer_config_test.go b/producer_config_test.go index 0cf6388..c365b5f 100644 --- a/producer_config_test.go +++ b/producer_config_test.go @@ -1,6 +1,7 @@ package kafka import ( + "reflect" "testing" "github.com/segmentio/kafka-go" @@ -90,6 +91,48 @@ func TestProducerConfig_JsonPretty(t *testing.T) { }) } +func TestProducerConfig_removeSpaceBrokerList(t *testing.T) { + type fields struct { + Brokers []string + } + tests := []struct { + name string + fields fields + expected []string + }{ + { + name: "Should_Remove_Spaces_In_Broker_Lists", + fields: fields{ + Brokers: []string{" address", "address ", " address "}, + }, + expected: []string{"address", "address", "address"}, + }, + { + name: "Should_Do_Nothing_When_Broker_Lists_Have_Not_Any_Space", + fields: fields{ + Brokers: []string{"address1", "address2", "address3"}, + }, + expected: []string{"address1", "address2", "address3"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Given + cfg := &WriterConfig{ + Brokers: tt.fields.Brokers, + } + + // When + cfg.removeSpaceBrokerList() + + // Then + if !reflect.DeepEqual(cfg.Brokers, tt.expected) { + t.Errorf("For broker list %v, expected %v", cfg.Brokers, tt.expected) + } + }) + } +} + func TestProducerConfig_String(t *testing.T) { t.Run("Should_Convert_To_String", func(t *testing.T) { // Given