Skip to content

Commit

Permalink
add gen+sink configs initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Logutko committed Jul 31, 2024
1 parent 46caafe commit 0f378dd
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
7 changes: 5 additions & 2 deletions source/generator/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package kafka

import (
"errors"
kcfg "github.com/pipelane/pipelaner/source/shared/kafka"
"time"

kcfg "github.com/pipelane/pipelaner/source/shared/kafka"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pipelane/pipelaner"
"github.com/rs/zerolog"

"github.com/pipelane/pipelaner"
)

type Kafka struct {
Expand Down Expand Up @@ -45,6 +47,7 @@ func NewConsumer(cfg *kcfg.KafkaConfig) (*kafka.Consumer, error) {
}

func (c *Kafka) Init(ctx *pipelaner.Context) error {
c.cfg = new(kcfg.KafkaConfig)
err := ctx.LaneItem().Config().ParseExtended(c.cfg)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion source/sink/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/json"
"errors"

"github.com/huandu/go-sqlbuilder"
"github.com/pipelane/pipelaner"
"github.com/rs/zerolog"

"github.com/pipelane/pipelaner"
)

type Clickhouse struct {
Expand All @@ -17,6 +19,7 @@ type Clickhouse struct {

func (c *Clickhouse) Init(ctx *pipelaner.Context) error {
c.logger = pipelaner.NewLogger()
c.clickConfig = new(ClickhouseConfig)
err := ctx.LaneItem().Config().ParseExtended(c.clickConfig)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion source/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package kafka

import (
"encoding/json"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/rs/zerolog"

"github.com/pipelane/pipelaner"
kCfg "github.com/pipelane/pipelaner/source/shared/kafka"
"github.com/rs/zerolog"
)

const timeout = 15 * 1000
Expand All @@ -22,6 +24,7 @@ type Kafka struct {

func (k *Kafka) Init(ctx *pipelaner.Context) error {
k.logger = pipelaner.NewLogger()
k.cfg = new(kCfg.KafkaConfig)
err := ctx.LaneItem().Config().ParseExtended(k.cfg)
if err != nil {
return err
Expand Down

0 comments on commit 0f378dd

Please sign in to comment.