From 9503ce8df4cacfae80d8557d488378c30e31ef5c Mon Sep 17 00:00:00 2001
From: joe
Date: Thu, 25 Jul 2024 13:32:29 +0300
Subject: [PATCH] ref configs

---
 config.go                            | 43 +++++++++++++++-------------
 source/generator/kafka/kafka.go      | 22 ++++++--------
 source/sink/clickhouse/clickhouse.go |  6 ++--
 tree.go                              |  2 +-
 4 files changed, 35 insertions(+), 38 deletions(-)
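Notes: the two knobs previously threaded through constructors move into the
TOML-backed config structs — the Kafka read delay becomes `delay_read_topic` on
KafkaConfig, and the ClickHouse target table becomes `table_name` on
ClickHouseConfig. `readToml` is exported as `ReadToml` along the way, presumably
for callers outside the package. `CastConfig` now walks the fields of the
destination type instead of the source, so source-only fields (such as the new
`DelayReadTopic`, which has no counterpart in the driver-level config.Kafka) are
simply skipped rather than tripping the "No such field" panic. A minimal,
self-contained sketch of that casting direction, using toy types rather than
the library's own:

    package main

    import (
        "fmt"
        "reflect"
    )

    // cast mirrors the reworked CastConfig loop: iterate the destination's
    // fields and pull each one from the source by name. Fields that exist
    // only on the source are never visited.
    func cast[K, V any](config K) *V {
        res := new(V)
        resVal := reflect.ValueOf(res).Elem()
        confVal := reflect.ValueOf(config).Elem()
        for i := 0; i < resVal.NumField(); i++ {
            name := resVal.Type().Field(i).Name
            src := confVal.FieldByName(name)
            if !src.IsValid() {
                panic(fmt.Sprintf("No such field: %s", name))
            }
            resVal.Field(i).Set(src)
        }
        return res
    }

    type full struct {
        Brokers string
        Delay   int // present only in the source; skipped by the cast
    }

    type subset struct {
        Brokers string
    }

    func main() {
        out := cast[*full, subset](&full{Brokers: "localhost:9092", Delay: 5})
        fmt.Println(out.Brokers) // localhost:9092
    }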
diff --git a/config.go b/config.go
index 1e420db..5d685c3 100644
--- a/config.go
+++ b/config.go
@@ -62,19 +62,20 @@ type BaseLaneConfig struct {
 }

 type KafkaConfig struct {
-	KafkaBrokers           string   `pipelane:"brokers"`
-	KafkaVersion           string   `pipelane:"version"`
-	KafkaOffsetNewest      bool     `pipelane:"offset_newest"`
-	KafkaSASLEnabled       bool     `pipelane:"sasl_enabled"`
-	KafkaSASLMechanism     string   `pipelane:"sasl_mechanism"`
-	KafkaSASLUsername      string   `pipelane:"sasl_username"`
-	KafkaSASLPassword      string   `pipelane:"sasl_password"`
-	KafkaAutoCommitEnabled bool     `pipelane:"auto_commit_enabled"`
-	KafkaConsumerGroupId   string   `pipelane:"consumer_group_id"`
-	KafkaTopics            []string `pipelane:"topics"`
-	KafkaAutoOffsetReset   string   `pipelane:"auto_offset_reset"`
-	KafkaBatchSize         int      `pipelane:"batch_size"`
-	KafkaSchemaRegistry    string   `pipelane:"schema_registry"`
+	KafkaBrokers           string        `pipelane:"brokers"`
+	KafkaVersion           string        `pipelane:"version"`
+	KafkaOffsetNewest      bool          `pipelane:"offset_newest"`
+	KafkaSASLEnabled       bool          `pipelane:"sasl_enabled"`
+	KafkaSASLMechanism     string        `pipelane:"sasl_mechanism"`
+	KafkaSASLUsername      string        `pipelane:"sasl_username"`
+	KafkaSASLPassword      string        `pipelane:"sasl_password"`
+	KafkaAutoCommitEnabled bool          `pipelane:"auto_commit_enabled"`
+	KafkaConsumerGroupId   string        `pipelane:"consumer_group_id"`
+	KafkaTopics            []string      `pipelane:"topics"`
+	KafkaAutoOffsetReset   string        `pipelane:"auto_offset_reset"`
+	KafkaBatchSize         int           `pipelane:"batch_size"`
+	KafkaSchemaRegistry    string        `pipelane:"schema_registry"`
+	DelayReadTopic         time.Duration `pipelane:"delay_read_topic"`
 	Internal
 }

@@ -93,6 +94,7 @@ type ClickHouseConfig struct {
 	BlockBufferSize      uint8  `pipelane:"block_buffer_size"`
 	MaxCompressionBuffer string `pipelane:"max_compression_buffer"`
 	EnableDebug          bool   `pipelane:"enable_debug"`
+	TableName            string `pipelane:"table_name"`
 	Internal
 }

@@ -141,7 +143,7 @@ func NewBaseConfigWithTypeAndExtended(
 	return &c, nil
 }

-func readToml(file string) (map[string]any, error) {
+func ReadToml(file string) (map[string]any, error) {
 	var c map[string]any
 	_, err := toml.DecodeFile(file, &c)
 	if err != nil {
@@ -218,19 +220,20 @@ func CastConfig[K, V any](config K) *V {
 	res := new(V)

 	if reflect.ValueOf(res).Kind() == reflect.Struct {
-		panic("V is not struct")
+		panic("Result is not struct")
 	}

-	val := reflect.ValueOf(config).Elem()
-	for i := 0; i < val.NumField(); i++ {
-		fieldName := val.Type().Field(i).Name
+	resVal := reflect.ValueOf(res).Elem()
+	confVal := reflect.ValueOf(config).Elem()
+	for i := 0; i < resVal.NumField(); i++ {
+		fieldName := resVal.Type().Field(i).Name

-		valueField := val.FieldByName(fieldName)
+		valueField := confVal.FieldByName(fieldName)
 		if !valueField.IsValid() {
 			panic(fmt.Sprintf("No such field: %s in valueField", fieldName))
 		}

-		setField := reflect.ValueOf(res).Elem().FieldByName(fieldName)
+		setField := resVal.FieldByName(fieldName)
 		if !setField.IsValid() {
 			panic(fmt.Sprintf("No such field: %s in setField", fieldName))
 		}
diff --git a/source/generator/kafka/kafka.go b/source/generator/kafka/kafka.go
index 8707039..8aa800b 100644
--- a/source/generator/kafka/kafka.go
+++ b/source/generator/kafka/kafka.go
@@ -13,17 +13,14 @@ import (
 )

 type Kafka struct {
-	cons           *kafka.Consumer
-	cfg            *pipelaner.KafkaConfig
-	logger         zerolog.Logger
-	ticker         *time.Ticker
-	delayReadTopic time.Duration
+	cons   *kafka.Consumer
+	cfg    *pipelaner.KafkaConfig
+	logger zerolog.Logger
 }

 func NewKafka(
 	cfg *pipelaner.KafkaConfig,
 	logger zerolog.Logger,
-	delayReadTopic time.Duration,
 ) (*Kafka, error) {
 	castCfg := pipelaner.CastConfig[*pipelaner.KafkaConfig, config.Kafka](cfg)

@@ -32,10 +29,9 @@ func NewKafka(
 		return nil, err
 	}
 	return &Kafka{
-		cons:           consumer,
-		cfg:            cfg,
-		logger:         logger,
-		delayReadTopic: delayReadTopic,
+		cons:   consumer,
+		cfg:    cfg,
+		logger: logger,
 	}, nil
 }

@@ -49,14 +45,14 @@ func (c *Kafka) Init(_ *pipelaner.Context) error {
 }

 func (c *Kafka) Generate(ctx *pipelaner.Context, input chan<- any) {
-	c.ticker = time.NewTicker(c.delayReadTopic)
-	defer c.ticker.Stop()
+	ticker := time.NewTicker(c.cfg.DelayReadTopic)
+	defer ticker.Stop()

 	for {
 		select {
 		case <-ctx.Context().Done():
 			return
-		case <-c.ticker.C:
+		case <-ticker.C:
 			msg, err := c.cons.ReadMessage(-1)
 			var kafkaErr *kafka.Error
 			if err != nil && errors.As(err, &kafkaErr) && kafkaErr.IsTimeout() {
diff --git a/source/sink/clickhouse/clickhouse.go b/source/sink/clickhouse/clickhouse.go
index a0e36d5..70bd283 100644
--- a/source/sink/clickhouse/clickhouse.go
+++ b/source/sink/clickhouse/clickhouse.go
@@ -15,14 +15,12 @@ type Clickhouse struct {
 	logger zerolog.Logger
 	cfg    *pipelaner.ClickHouseConfig
 	client *clickhouse.ClientClickhouse
-	table  string
 }

-func NewClickhouse(logger zerolog.Logger, cfg *pipelaner.ClickHouseConfig, table string) *Clickhouse {
+func NewClickhouse(logger zerolog.Logger, cfg *pipelaner.ClickHouseConfig) *Clickhouse {
 	return &Clickhouse{
 		logger: logger,
 		cfg:    cfg,
-		table:  table,
 	}
 }

@@ -51,7 +49,7 @@ func (c *Clickhouse) write(ctx context.Context, data map[string]any) {
 	}

 	sb := sqlbuilder.NewInsertBuilder()
-	sb.InsertInto(c.table).Cols(cols...).Values(values).SetFlavor(sqlbuilder.ClickHouse)
+	sb.InsertInto(c.cfg.TableName).Cols(cols...).Values(values).SetFlavor(sqlbuilder.ClickHouse)

 	sql, args := sb.Build()

diff --git a/tree.go b/tree.go
index 8bc0361..2e3d04a 100644
--- a/tree.go
+++ b/tree.go
@@ -102,7 +102,7 @@ func NewTreeFrom(
 	ctx context.Context,
 	file string,
 ) (*TreeLanes, error) {
-	c, err := readToml(file)
+	c, err := ReadToml(file)
 	if err != nil {
 		return nil, err
 	}
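
Appendix (not part of the commit): a sketch of what a call site looks like once
both options ride on the configs. The constructor signatures below match the
diff; the import paths, logger wiring, and field values are assumptions for
illustration only.

    package main

    import (
        "os"
        "time"

        "github.com/rs/zerolog"

        "github.com/pipelane/pipelaner"                                 // assumed module path
        kafkasrc "github.com/pipelane/pipelaner/source/generator/kafka" // assumed
        chsink "github.com/pipelane/pipelaner/source/sink/clickhouse"   // assumed
    )

    func main() {
        logger := zerolog.New(os.Stdout)

        // delay_read_topic now arrives via the decoded TOML config rather
        // than a dedicated constructor argument.
        kcfg := &pipelaner.KafkaConfig{DelayReadTopic: time.Second}
        if _, err := kafkasrc.NewKafka(kcfg, logger); err != nil {
            logger.Fatal().Err(err).Msg("create kafka consumer")
        }

        // Likewise, table_name replaces the old table argument.
        ccfg := &pipelaner.ClickHouseConfig{TableName: "events"}
        _ = chsink.NewClickhouse(logger, ccfg)
    }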