From e1a05ba7e29cc056a3d8e592eaf62c1e2809f180 Mon Sep 17 00:00:00 2001
From: Alexey Khokhlov
Date: Thu, 12 Dec 2024 19:45:01 +0300
Subject: [PATCH] add documentation

---
 Makefile                                      |   1 -
 README.md                                     | 147 +++++++++++++++++-
 README_RU.md                                  | 144 +++++++++++++++++
 agent.go                                      |  13 +-
 example/README.md                             | 147 ++++++++++++++++--
 example/main.go                               |  14 +-
 example/pkl/PklProject                        |   2 +-
 example/pkl/PklProject.deps.json              |   4 +-
 example/pkl/config.pkl                        |  83 +++++++---
 example/pkl/example.pkl                       |   6 +-
 gen/settings/healthcheck/Config.pkl.go        |   2 -
 gen/settings/logger/loglevel/LogLevel.pkl.go  |   3 +
 gen/settings/metrics/Config.pkl.go            |   4 +-
 gen/source/common/Kafka.pkl.go                |   6 +-
 .../common/saslmechanism/SASLMechanism.pkl.go |   3 -
 .../{KafkaConsumer.pkl.go => Kafka.pkl.go}    |  34 ++--
 gen/source/input/init.pkl.go                  |   2 +-
 gen/source/sink/Clickhouse.pkl.go             |  82 ----------
 gen/source/sink/Console.pkl.go                |  10 ++
 gen/source/sink/Http.pkl.go                   |  52 +++++++
 .../{KafkaProducer.pkl.go => Kafka.pkl.go}    |  28 ++--
 gen/source/sink/init.pkl.go                   |   3 +-
 gen/source/sink/method/Method.pkl.go          |  43 +++++
 go.mod                                        |   1 +
 go.sum                                        |   4 +
 internal/logger/logger.go                     |   4 +-
 internal/utils/channels.go                    |  38 +++--
 pipelaner.go                                  |  10 +-
 pipeline/node/input.go                        |   7 +-
 pipeline/node/sink.go                         |   9 +-
 pipeline/node/transform.go                    |  10 +-
 pkl/settings/Settings.pkl                     |   4 +-
 .../healthcheck/HealthcheckConfig.pkl         |   2 -
 pkl/settings/logger/LoggerConfig.pkl          |   4 +-
 pkl/settings/metrics/MetricsConfig.pkl        |   5 +-
 pkl/source/sink/Sinks.pkl                     |   2 +
 sources/embedded.go                           |   1 +
 sources/generator/README.md                   |   1 +
 sources/generator/cmd/README.md               |   1 +
 sources/generator/kafka/README.md             |   1 +
 sources/generator/kafka/consumer.go           |  14 +-
 sources/generator/kafka/kafka.go              |   6 +-
 sources/generator/pipelaner/README.md         |   1 +
 sources/shared/chunker/chunker.go             |  56 +++----
 sources/sink/README.md                        |   1 +
 sources/sink/clickhouse/README.md             |   1 +
 sources/sink/clickhouse/client.go             |   1 -
 sources/sink/console/README.md                |   1 +
 sources/sink/console/console.go               |  27 +++-
 sources/sink/http/README.md                   |   1 +
 sources/sink/kafka/README.md                  |   1 +
 sources/sink/kafka/kafka.go                   |   6 +-
 sources/sink/kafka/producer.go                |  14 +-
 sources/sink/pipelaner/README.md              |   1 +
 sources/transform/README.md                   |   2 +
 sources/transform/batch/README.md             |   1 +
 sources/transform/chunks/README.md            |   1 +
 sources/transform/chunks/chunks.go            |   3 +-
 sources/transform/debounce/README.md          |   1 +
 sources/transform/filter/README.md            |   1 +
 sources/transform/remap/README.md             |   1 +
 sources/transform/throttling/README.md        |   1 +
 62 files changed, 800 insertions(+), 279 deletions(-)
 create mode 100644 README_RU.md
 rename gen/source/input/{KafkaConsumer.pkl.go => Kafka.pkl.go} (60%)
 create mode 100644 gen/source/sink/Http.pkl.go
 rename gen/source/sink/{KafkaProducer.pkl.go => Kafka.pkl.go} (53%)
 create mode 100644 gen/source/sink/method/Method.pkl.go
 create mode 100644 sources/generator/README.md
 create mode 100644 sources/generator/cmd/README.md
 create mode 100644 sources/generator/kafka/README.md
 create mode 100644 sources/generator/pipelaner/README.md
 create mode 100644 sources/sink/README.md
 create mode 100644 sources/sink/clickhouse/README.md
 create mode 100644 sources/sink/console/README.md
 create mode 100644 sources/sink/http/README.md
 create mode 100644 sources/sink/kafka/README.md
 create mode 100644 sources/sink/pipelaner/README.md
 create mode 100644 sources/transform/README.md
 create mode 100644 sources/transform/batch/README.md
 create mode 100644 sources/transform/chunks/README.md
 create mode 100644 sources/transform/debounce/README.md
 create mode 100644 sources/transform/filter/README.md
 create mode 100644 sources/transform/remap/README.md
 create mode 100644 sources/transform/throttling/README.md
diff --git a/Makefile b/Makefile
index 46cf297..d589132 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,6 @@ lint:
 .PHONY: test
 test:
 	@go test -count=1 -v ./... -race
-
 .PHONY: proto
 proto:
 	@rm -rf sources/shared/proto
diff --git a/README.md b/README.md
index f5dd804..d4b015f 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,144 @@
-[![Go Lint](https://github.com/pipelane/pipelaner/actions/workflows/lint.yml/badge.svg)](https://github.com/pipelane/pipelaner/actions/workflows/lint.yml)
-[![Go Test](https://github.com/pipelane/pipelaner/actions/workflows/test.yml/badge.svg)](https://github.com/pipelane/pipelaner/actions/workflows/test.yml)
-# pipelaner
+
+# **Pipelaner**
+
+**Pipelaner** is a high-performance and efficient **Framework and Agent** for creating data pipelines. Pipeline descriptions follow the **_Configuration As Code_** concept and use the [**Pkl**](https://github.com/apple/pkl) configuration language by **Apple**.
+
+Pipelaner manages data streams through three key entities: **Generator**, **Transform**, and **Sink**.
+
+---
+
+## 📖 **Contents**
+- [Core Entities](#core-entities)
+  - [Generator](#generator)
+  - [Transform](#transform)
+  - [Sink](#sink)
+  - [Basic Parameters](#basic-parameters)
+- [Built-in Pipeline Elements](#built-in-pipeline-elements)
+  - [Generators](#generators)
+  - [Transforms](#transforms)
+  - [Sinks](#sinks)
+- [Scalability](#scalability)
+  - [Single-Node Deployment](#single-node-deployment)
+  - [Multi-Node Deployment](#multi-node-deployment)
+- [Support](#support)
+- [License](#license)
+
+---
+
+## 📌 **Core Entities**
+
+### **Generator**
+The component responsible for creating or retrieving source data for the pipeline. Generators can produce messages, events, or retrieve data from various sources such as files, databases, or APIs.
+
+- **Example use case:**
+  Reading data from a file or receiving events via webhooks.
+
+---
+
+### **Transform**
+The component that processes data within the pipeline. Transforms perform operations such as filtering, aggregation, data transformation, or cleaning to prepare the data for further processing.
+
+- **Example use case:**
+  Filtering records based on specific conditions or converting data format from JSON to CSV.
+
+---
+
+### **Sink**
+The final destination for the data stream. Sinks send processed data to a target system, such as a database, API, or message queue.
+
+- **Example use case:**
+  Saving data to PostgreSQL or sending it to a Kafka topic.
+
+---
+
+### **Basic Parameters**
+| **Parameter**      | **Type** | **Description**                                                                    |
+|--------------------|----------|------------------------------------------------------------------------------------|
+| `name`             | String   | Unique name of the pipeline element.                                               |
+| `threads`          | Int      | Number of threads for processing messages. Defaults to the value of `GOMAXPROCS`.  |
+| `outputBufferSize` | Int      | Size of the output buffer. **Not applicable to Sink components.**                  |
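+
+For example, here are the basic parameters set on the built-in `chunks` transform; this sketch is adapted from `example/pkl/config.pkl` in this repository:
+
+```pkl
+new Transforms.Chunk {
+  name = "log-buffering"      // unique element name
+  threads = 10                // number of processing threads
+  outputBufferSize = 10_000   // capacity of the output buffer
+  inputs {
+    "osx-logs"
+  }
+  maxChunkSize = 1_000
+  maxIdleTime = 20.s
+}
+```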
+
+---
+
+## 📦 **Built-in Pipeline Elements**
+
+### **Generators**
+| **Name**                                                                                      | **Description**                                                              |
+|-----------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|
+| [**cmd**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/cmd)              | Reads the output of a command, e.g., `"/usr/bin/log" "stream --style ndjson"`. |
+| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/kafka)          | Apache Kafka consumer that streams `Value` into the pipeline.                |
+| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/pipelaner)  | gRPC server that streams values via [gRPC](https://github.com/pipelane/pipelaner/tree/main/proto/service.proto). |
+
+---
+
+### **Transforms**
+| **Name**                                                                                        | **Description**                                                             |
+|--------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
+| [**batch**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/batch)            | Forms batches of data with a specified size.                                |
+| [**chunks**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/chunks)          | Splits incoming data into chunks.                                           |
+| [**debounce**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/debounce)      | Eliminates "bounce" (frequent repeats) in data.                             |
+| [**filter**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/filter)          | Filters data based on specified conditions.                                 |
+| [**remap**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/remap)            | Reassigns fields or transforms the data structure.                          |
+| [**throttling**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/throttling)  | Limits data processing rate.                                                |
+
+---
+
+### **Sinks**
+| **Name**                                                                                        | **Description**                                                             |
+|--------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
+| [**clickhouse**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/clickhouse)       | Sends data to a ClickHouse database.                                        |
+| [**console**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/console)             | Outputs data to the console.                                                |
+| [**http**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/http)                   | Sends data to a specified HTTP endpoint.                                    |
+| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/kafka)                 | Publishes data to Apache Kafka.                                             |
+| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/pipelaner)         | Streams data via gRPC to other Pipelaner nodes.                             |
+
+---
+
+## 🌐 **Scalability**
+
+### **Single-Node Deployment**
+For operation on a single host:
+![Single Node](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-singlehost.png/?raw=true "Single Node Deployment")
+
+---
+
+### **Multi-Node Deployment**
+For distributed data processing across multiple hosts:
+![Multi-Node](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-multihost.png/?raw=true "Multi-Node Deployment")
+
+For distributed interaction between nodes, you can use:
+1. **gRPC** — via generators and sinks with the parameter `sourceName: "pipelaner"`.
+2. **Apache Kafka** — for reading/writing data via topics.
+
+Example configuration using Kafka:
+```pkl
+new Inputs.Kafka {
+  common {
+    topics {
+      "kafka-topic"
+    }
+  }
+}
+
+new Sinks.Kafka {
+  common {
+    topics {
+      "kafka-topic"
+    }
+  }
+}
+```
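+
+As a rule of thumb, Kafka decouples the nodes and can buffer data between them at the cost of operating a broker, while the direct gRPC link requires no extra infrastructure but provides no intermediate buffering between hosts.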
+
+---
+
+## 🤝 **Support**
+
+If you have questions, suggestions, or encounter any issues, please [create an Issue](https://github.com/pipelane/pipelaner/issues/new) in the repository.
+You can also participate in discussions in the [Discussions](https://github.com/pipelane/pipelaner/discussions) section.
+
+---
+
+## 📜 **License**
+
+This project is licensed under the [Apache 2.0](https://github.com/pipelane/pipelaner/blob/main/LICENSE) license.
+You are free to use, modify, and distribute the code under the terms of the license.
diff --git a/README_RU.md b/README_RU.md
new file mode 100644
index 0000000..4052942
--- /dev/null
+++ b/README_RU.md
@@ -0,0 +1,144 @@
+
+# **Pipelaner**
+
+**Pipelaner** — это высокопроизводительный и эффективный **Framework и Агент** для создания data pipelines. Основой описания пайплайнов служит концепция **_Configuration As Code_** и язык конфигураций [**Pkl**](https://github.com/apple/pkl) от компании **Apple**.
+
+Pipelaner управляет потоками данных с помощью трех ключевых сущностей: **Generator**, **Transform** и **Sink**.
+
+---
+
+## 📖 **Содержание**
+- [Основные сущности](#основные-сущности)
+  - [Generator](#generator)
+  - [Transform](#transform)
+  - [Sink](#sink)
+  - [Базовые параметры](#базовые-параметры)
+- [Элементы пайплайнов "из коробки"](#элементы-пайплайнов-из-коробки)
+  - [Generators](#generators)
+  - [Transforms](#transforms)
+  - [Sinks](#sinks)
+- [Масштабируемость](#масштабируемость)
+  - [Одноузловое развертывание](#одноузловое-развертывание)
+  - [Многоузловое развертывание](#многоузловое-развертывание)
+- [Поддержка](#поддержка)
+- [Лицензия](#лицензия)
+
+---
+
+## 📌 **Основные сущности**
+
+### **Generator**
+Компонент, отвечающий за создание или получение исходных данных для потока. Generator может генерировать сообщения, события или извлекать данные из различных источников, таких как файлы, базы данных или API.
+
+- **Пример использования:**
+  Чтение данных из файла или получение событий через вебхуки.
+
+---
+
+### **Transform**
+Компонент, который обрабатывает данные в потоке. Transform выполняет операции, такие как фильтрация, агрегация, преобразование структуры или очистка данных, чтобы подготовить их для дальнейшей обработки.
+
+- **Пример использования:**
+  Фильтрация записей с заданными условиями или преобразование формата данных из JSON в CSV.
+
+---
+
+### **Sink**
+Конечная точка потока данных. Sink отправляет обработанные данные в целевую систему, например, в базу данных, API или систему очередей сообщений.
+
+- **Пример использования:**
+  Сохранение данных в PostgreSQL или отправка их в Kafka-топик.
+
+---
+
+### **Базовые параметры**
+| **Параметр**       | **Тип** | **Описание**                                                                                        |
+|--------------------|---------|------------------------------------------------------------------------------------------------------|
+| `name`             | String  | Уникальное название элемента пайплайна.                                                             |
+| `threads`          | Int     | Количество потоков для обработки сообщений. По умолчанию равно значению переменной `GOMAXPROCS`.    |
+| `outputBufferSize` | Int     | Размер выходного буфера. **Не используется в Sink-компонентах.**                                    |
+
+---
+
+## 📦 **Элементы пайплайнов "из коробки"**
+
+### **Generators**
+| **Название**                                                                                   | **Описание**                                                                 |
+|--------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------|
+| [**cmd**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/cmd)               | Считывает вывод команды, например `"/usr/bin/log" "stream --style ndjson"`.  |
+| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/kafka)           | Консьюмер для Apache Kafka, передает по пайплайну значения `Value`.
| +| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/pipelaner) | GRPC-сервер, передает значения через [gRPC](https://github.com/pipelane/pipelaner/tree/main/proto/service.proto). | + +--- + +### **Transforms** +| **Название** | **Описание** | +|------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------| +| [**batch**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/batch) | Формирует пакеты данных заданного размера. | +| [**chunks**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/chunks) | Разбивает входящие данные на чанки. | +| [**debounce**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/debounce) | Устраняет "дребезг" (частые повторы) в данных. | +| [**filter**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/filter) | Фильтрует данные по заданным условиям. | +| [**remap**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/remap) | Переназначает поля или преобразует структуру данных. | +| [**throttling**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/throttling) | Ограничивает скорость обработки данных. | + +--- + +### **Sinks** +| **Название** | **Описание** | +|------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------| +| [**clickhouse**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/clickhouse) | Отправляет данные в базу данных ClickHouse. | +| [**console**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/console) | Выводит данные в консоль. | +| [**http**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/http) | Отправляет данные на указанный HTTP-эндпоинт. | +| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/kafka) | Публикует данные в Apache Kafka. | +| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/pipelaner) | Передает данные через gRPC к другим узлам Pipelaner. | + +--- + +## 🌐 **Масштабируемость** + +### **Одноузловое развертывание** +Для работы на одном хосте: +![Одноузловая схема](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-singlehost.png/?raw=true "Одноузловая схема") + +--- + +### **Многоузловое развертывание** +Для распределенной обработки данных на нескольких хостах: +![Многоузловая схема](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-multihost.png/?raw=true "Многоузловая схема") + +Для распределенного взаимодействия между узлами можно использовать: +1. **gRPC** — через генераторы и синки с параметром `sourceName: "pipelaner"`. +2. **Apache Kafka** — для чтения/записи данных через топики. + +Пример конфигурации с использованием Kafka: +```pkl +new Inputs.Kafka { + common { + topics { + "kafka-topic" + } + } +} + +new Sinks.Kafka { + common { + topics { + "kafka-topic" + } + } +} +``` + +--- + +## 🤝 **Поддержка** + +Если у вас есть вопросы, предложения или вы нашли ошибку, пожалуйста, [создайте Issue](https://github.com/pipelane/pipelaner/issues/new) в репозитории. +Вы также можете участвовать в обсуждениях проекта в разделе [Discussions](https://github.com/pipelane/pipelaner/discussions). 
+
+---
+
+## 📜 **Лицензия**
+
+Этот проект распространяется под лицензией [Apache 2.0](https://github.com/pipelane/pipelaner/blob/main/LICENSE).
+Вы можете свободно использовать, изменять и распространять код при соблюдении условий лицензии.
diff --git a/example/README.md b/example/README.md
index 59f3104..d1935c1 100644
--- a/example/README.md
+++ b/example/README.md
@@ -1,22 +1,147 @@
-# Использование pkl во внешних проектах
-1) В проекте создать директорию pkl, в ней создать файлик PklProject, со следующим контентом:
-```
+
+# **Using Pipelaner in External Projects**
+
+This guide provides detailed instructions on how to integrate **Pkl** with **Pipelaner** in your external projects.
+
+---
+
+## 📂 **Step 1: Configure Dependencies**
+
+1. Create a directory named `pkl` in your project.
+2. Inside the `pkl` directory, create a file named `PklProject` with the following content:
+
+```pkl
+amends "pkl:Project"
+
 dependencies {
   ["pipelaner"] {
     uri = "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@x.x.x"
   }
 }
 ```
-2) Объявить корневой файл с конфигурацией в заголовке файла объявить:
-```
-amends "@pipelaner/Pipelaner.pkl" // нужно разобраться чтобы работало так
-amends "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.4#/Pipelaner.pkl" // сейчас получилось только без алиаса
+Replace `x.x.x` with the required version of **Pipelaner**.
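+
+Resolving the project dependencies produces a `PklProject.deps.json` lock file next to `PklProject`; `example/pkl/PklProject.deps.json` in this repository shows the expected shape.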
+
+---
+
+## ⚙️ **Step 2: Configure Pipelines**
+
+Create a pipeline configuration file (e.g., `pipeline_config.pkl`) with the following content:
+
+```pkl
+amends "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/Pipelaner.pkl"
+
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/Components.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/sink/Sinks.pkl"
+import "example.pkl"
+
+pipelines {
+  new Components.Pipeline {
+    name = "example-pipeline"
+    inputs {
+      new example.ExampleGenInt {
+        name = "example-gen-int"
+        count = 10
+      }
+    }
+    transforms {
+      new example.ExampleMul {
+        name = "example-mul"
+        inputs {
+          "example-gen-int"
+        }
+        mul = 2
+      }
+      new example.ExampleMul {
+        threads = 10
+        name = "example-mul2"
+        inputs {
+          "example-mul"
+        }
+        mul = 5
+      }
+    }
+    sinks {
+      new Sinks.Console {
+        threads = 10
+        name = "console"
+        inputs {
+          "example-mul2"
+        }
+      }
+    }
+  }
+}
+
+settings {
+  gracefulShutdownDelay = 15.s
+  logger {
+    logLevel = "info"
+  }
+  // healthCheck and metrics are optional: declare a block to enable the
+  // corresponding server, omit it to keep the feature disabled
+}
+```
-3) Конфигурируем сами пайплайны, пример в pkl/config.pkl
-4) При необходимости создаем свои имплементации компонент pkl/components.pkl
-5) В случае если были созданы свои компоненты необходимо сгенерировать код для их использования для файла pkl/components.pkl, используется следующая команда:
+
+---
+
+## 🛠 **Step 3: Implement Custom Components**
+
+If you need custom components, create an implementation file (e.g., `pkl/example.pkl`) with the following content:
+
+```pkl
+@go.Package {name = "gen/custom"}
+module pipelaner.source.example
+
+import "package://pkg.pkl-lang.org/pkl-go/pkl.golang@0.8.1#/go.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/input/Inputs.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/sink/Sinks.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/transform/Transforms.pkl"
+
+class ExampleGenInt extends Inputs.Input {
+  fixed sourceName = "example-generator"
+  count: Int
+}
+
+class ExampleMul extends Transforms.Transform {
+  fixed sourceName = "example-mul"
+  mul: Int
+}
+
+class ExampleConsole extends Sinks.Sink {
+  fixed sourceName = "example-console"
+}
+```
+
+---
+
+## 🔧 **Step 4: Generate Code**
+
+If custom components were created, generate the required code using the following command:
+
+```shell
 pkl-gen-go pkl/example.pkl
 ```
-6) Имплементируем созданные компоненты и регистрируем их в source pipelaner'a пример в main.go
\ No newline at end of file
+
+---
+
+## 🚀 **Step 5: Implement and Register Components**
+
+To use the custom components in your project, implement and register them in the source of **Pipelaner**. An example implementation can be found in [main.go](https://github.com/pipelane/pipelaner/tree/main/example/main.go):
+
+```go
+source.RegisterInput("example-generator", &GenInt{})
+source.RegisterTransform("example-mul", &TransMul{})
+```
+
+---
+
+## 📜 **License**
+
+This project is licensed under the [Apache 2.0](https://github.com/pipelane/pipelaner/blob/main/LICENSE) license.
+You are free to use, modify, and distribute the code under the terms of the license.
diff --git a/example/main.go b/example/main.go index 47edad5..02f0f0e 100644 --- a/example/main.go +++ b/example/main.go @@ -13,6 +13,7 @@ import ( "github.com/pipelane/pipelaner/example/pkl/gen/custom" "github.com/pipelane/pipelaner/gen/source/input" "github.com/pipelane/pipelaner/gen/source/transform" + "github.com/pipelane/pipelaner/pipeline/components" "github.com/pipelane/pipelaner/pipeline/source" _ "github.com/pipelane/pipelaner/sources" ) @@ -20,6 +21,7 @@ import ( // ============== Test generator =============== type GenInt struct { + components.Logger count int } @@ -33,16 +35,16 @@ func (g *GenInt) Init(cfg input.Input) error { } func (g *GenInt) Generate(ctx context.Context, input chan<- any) { + i := 0 for { select { case <-ctx.Done(): return default: - for i := 0; i < g.count; i++ { - input <- i - } + input <- i + i += 1 + time.Sleep(time.Second * 1) } - time.Sleep(3 * time.Second) } } @@ -55,7 +57,7 @@ type TransMul struct { func (t *TransMul) Init(cfg transform.Transform) error { tCfg, ok := cfg.(custom.ExampleMul) if !ok { - return errors.New("transform.Mul expects transform.TransMul") + return errors.New("invalid config") } t.mul = tCfg.GetMul() return nil @@ -64,7 +66,7 @@ func (t *TransMul) Init(cfg transform.Transform) error { func (t *TransMul) Transform(val any) any { v, ok := val.(int) if !ok { - return errors.New("transform.TransMul expects transform.TransMul") + return errors.New("invalid value") } return t.mul * v } diff --git a/example/pkl/PklProject b/example/pkl/PklProject index 114b7ea..7627ba4 100644 --- a/example/pkl/PklProject +++ b/example/pkl/PklProject @@ -2,6 +2,6 @@ amends "pkl:Project" dependencies { ["pipelaner"] { - uri = "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8" + uri = "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9" } } diff --git a/example/pkl/PklProject.deps.json b/example/pkl/PklProject.deps.json index 2cfbad1..4b9138e 100644 --- a/example/pkl/PklProject.deps.json +++ b/example/pkl/PklProject.deps.json @@ -3,9 +3,9 @@ "resolvedDependencies": { "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0": { "type": "remote", - "uri": "projectpackage://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8", + "uri": "projectpackage://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9", "checksums": { - "sha256": "394943b2d9f30047779a7401b3a8bed3d6973ed21ddd3bf88d1de44065ea7223" + "sha256": "1abd63fb617f2a19bb2be70feb7652f1b504e4f08fd3e678a2bc24462d0f908f" } } } diff --git a/example/pkl/config.pkl b/example/pkl/config.pkl index 497660f..599950e 100644 --- a/example/pkl/config.pkl +++ b/example/pkl/config.pkl @@ -1,40 +1,82 @@ -amends "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/Pipelaner.pkl" -import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/source/Components.pkl" -import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/source/sink/Sinks.pkl" +amends "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/Pipelaner.pkl" +import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/Components.pkl" +import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/sink/Sinks.pkl" +import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/input/Inputs.pkl" +import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/transform/Transforms.pkl" 
import "example.pkl"

 pipelines {
   new Components.Pipeline {
     name = "example-pipeline"
     inputs {
-      new example.ExampleGenInt {
-        name = "example-gen-int"
-        count = 10
+      new Inputs.Cmd {
+        name = "osx-logs"
+        exec {
+          "/usr/bin/log"
+          "stream --style ndjson"
+        }
       }
     }
     transforms {
-      new example.ExampleMul {
-        name = "example-mul"
-        inputs {
-          "example-gen-int"
-        }
-        mul = 2
-      }
-      new example.ExampleMul {
+      new Transforms.Chunk {
+        name = "log-buffering"
         threads = 10
-        name = "example-mul2"
+        outputBufferSize = 10_000
         inputs {
-          "example-mul"
+          "osx-logs"
         }
-        mul = 5
+        maxChunkSize = 1_000
+        maxIdleTime = 20.s
       }
     }
     sinks {
       new Sinks.Console {
         threads = 10
-        name = "console"
+        name = "console-log"
         inputs {
-          "example-mul2"
+          "log-buffering"
         }
       }
     }
@@ -45,14 +87,13 @@
 settings {
   gracefulShutdownDelay = 15.s
   logger {
     logLevel = "info"
+    logFormat = "json"
   }
   healthCheck {
-    enable = false
     host = "127.0.0.1"
     port = 8080
   }
   metrics {
-    enable = false
     host = "127.0.0.1"
     port = 8082
   }
diff --git a/example/pkl/example.pkl b/example/pkl/example.pkl
index 22171c9..b61c417 100644
--- a/example/pkl/example.pkl
+++ b/example/pkl/example.pkl
@@ -2,9 +2,9 @@ module pipelaner.source.example
 
 import "package://pkg.pkl-lang.org/pkl-go/pkl.golang@0.8.1#/go.pkl"
-import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/source/input/Inputs.pkl"
-import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/source/sink/Sinks.pkl"
-import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.8#/source/transform/Transforms.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/input/Inputs.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/sink/Sinks.pkl"
+import "package://pkg.pkl-lang.org/github.com/pipelane/pipelaner/pipelaner@0.0.9#/source/transform/Transforms.pkl"
 
 class ExampleGenInt extends Inputs.Input {
   fixed sourceName = "example-generator"
diff --git a/gen/settings/healthcheck/Config.pkl.go b/gen/settings/healthcheck/Config.pkl.go
index 02ba1db..7018032 100644
--- a/gen/settings/healthcheck/Config.pkl.go
+++ b/gen/settings/healthcheck/Config.pkl.go
@@ -11,8 +11,6 @@ type Config struct {
 	Host string `pkl:"host"`
 
 	Port int `pkl:"port"`
-
-	Enable bool `pkl:"enable"`
 }
 
 // LoadFromPath loads the pkl module at the given path and evaluates it into a Config
diff --git a/gen/settings/logger/loglevel/LogLevel.pkl.go b/gen/settings/logger/loglevel/LogLevel.pkl.go
index c622a85..9c1058b 100644
--- a/gen/settings/logger/loglevel/LogLevel.pkl.go
+++ b/gen/settings/logger/loglevel/LogLevel.pkl.go
@@ -13,6 +13,7 @@ const (
 	Warn  LogLevel = "warn"
 	Info  LogLevel = "info"
 	Debug LogLevel = "debug"
+	Trace LogLevel = "trace"
 )
 
 // String returns the string representation of LogLevel
@@ -33,6 +34,8 @@ func (rcv *LogLevel)
UnmarshalBinary(data []byte) error { *rcv = Info case "debug": *rcv = Debug + case "trace": + *rcv = Trace default: return fmt.Errorf(`illegal: "%s" is not a valid LogLevel`, str) } diff --git a/gen/settings/metrics/Config.pkl.go b/gen/settings/metrics/Config.pkl.go index 24ade6a..f819bd3 100644 --- a/gen/settings/metrics/Config.pkl.go +++ b/gen/settings/metrics/Config.pkl.go @@ -14,9 +14,7 @@ type Config struct { Path string `pkl:"path"` - ServiceName *string `pkl:"serviceName"` - - Enable bool `pkl:"enable"` + ServiceName string `pkl:"serviceName"` } // LoadFromPath loads the pkl module at the given path and evaluates it into a Config diff --git a/gen/source/common/Kafka.pkl.go b/gen/source/common/Kafka.pkl.go index e451224..87bd8fd 100644 --- a/gen/source/common/Kafka.pkl.go +++ b/gen/source/common/Kafka.pkl.go @@ -6,17 +6,15 @@ import "github.com/pipelane/pipelaner/gen/source/common/saslmechanism" type Kafka struct { SaslEnabled bool `pkl:"saslEnabled"` - SaslMechanism saslmechanism.SASLMechanism `pkl:"saslMechanism"` + SaslMechanism *saslmechanism.SASLMechanism `pkl:"saslMechanism"` SaslUsername *string `pkl:"saslUsername"` SaslPassword *string `pkl:"saslPassword"` - Brokers string `pkl:"brokers"` + Brokers []string `pkl:"brokers"` Version *string `pkl:"version"` Topics []string `pkl:"topics"` - - SchemaRegistry string `pkl:"schemaRegistry"` } diff --git a/gen/source/common/saslmechanism/SASLMechanism.pkl.go b/gen/source/common/saslmechanism/SASLMechanism.pkl.go index f05bb6a..f2a6023 100644 --- a/gen/source/common/saslmechanism/SASLMechanism.pkl.go +++ b/gen/source/common/saslmechanism/SASLMechanism.pkl.go @@ -11,7 +11,6 @@ type SASLMechanism string const ( SCRAMSHA512 SASLMechanism = "SCRAM-SHA-512" SCRAMSHA256 SASLMechanism = "SCRAM-SHA-256" - PLAIN SASLMechanism = "PLAIN" ) // String returns the string representation of SASLMechanism @@ -28,8 +27,6 @@ func (rcv *SASLMechanism) UnmarshalBinary(data []byte) error { *rcv = SCRAMSHA512 case "SCRAM-SHA-256": *rcv = SCRAMSHA256 - case "PLAIN": - *rcv = PLAIN default: return fmt.Errorf(`illegal: "%s" is not a valid SASLMechanism`, str) } diff --git a/gen/source/input/KafkaConsumer.pkl.go b/gen/source/input/Kafka.pkl.go similarity index 60% rename from gen/source/input/KafkaConsumer.pkl.go rename to gen/source/input/Kafka.pkl.go index 0aca18c..5f11c88 100644 --- a/gen/source/input/KafkaConsumer.pkl.go +++ b/gen/source/input/Kafka.pkl.go @@ -8,10 +8,10 @@ import ( "github.com/pipelane/pipelaner/gen/source/input/strategy" ) -type KafkaConsumer interface { +type Kafka interface { Input - GetKafka() *common.Kafka + GetCommon() *common.Kafka GetAutoCommitEnabled() bool @@ -26,12 +26,12 @@ type KafkaConsumer interface { GetFetchMaxBytes() *pkl.DataSize } -var _ KafkaConsumer = (*KafkaConsumerImpl)(nil) +var _ Kafka = (*KafkaImpl)(nil) -type KafkaConsumerImpl struct { +type KafkaImpl struct { SourceName string `pkl:"sourceName"` - Kafka *common.Kafka `pkl:"kafka"` + Common *common.Kafka `pkl:"common"` AutoCommitEnabled bool `pkl:"autoCommitEnabled"` @@ -52,46 +52,46 @@ type KafkaConsumerImpl struct { OutputBufferSize int `pkl:"outputBufferSize"` } -func (rcv *KafkaConsumerImpl) GetSourceName() string { +func (rcv *KafkaImpl) GetSourceName() string { return rcv.SourceName } -func (rcv *KafkaConsumerImpl) GetKafka() *common.Kafka { - return rcv.Kafka +func (rcv *KafkaImpl) GetCommon() *common.Kafka { + return rcv.Common } -func (rcv *KafkaConsumerImpl) GetAutoCommitEnabled() bool { +func (rcv *KafkaImpl) GetAutoCommitEnabled() bool { return 
rcv.AutoCommitEnabled } -func (rcv *KafkaConsumerImpl) GetConsumerGroupID() string { +func (rcv *KafkaImpl) GetConsumerGroupID() string { return rcv.ConsumerGroupID } -func (rcv *KafkaConsumerImpl) GetAutoOffsetReset() autooffsetreset.AutoOffsetReset { +func (rcv *KafkaImpl) GetAutoOffsetReset() autooffsetreset.AutoOffsetReset { return rcv.AutoOffsetReset } -func (rcv *KafkaConsumerImpl) GetBalancerStrategy() []strategy.Strategy { +func (rcv *KafkaImpl) GetBalancerStrategy() []strategy.Strategy { return rcv.BalancerStrategy } -func (rcv *KafkaConsumerImpl) GetMaxPartitionFetchBytes() *pkl.DataSize { +func (rcv *KafkaImpl) GetMaxPartitionFetchBytes() *pkl.DataSize { return rcv.MaxPartitionFetchBytes } -func (rcv *KafkaConsumerImpl) GetFetchMaxBytes() *pkl.DataSize { +func (rcv *KafkaImpl) GetFetchMaxBytes() *pkl.DataSize { return rcv.FetchMaxBytes } -func (rcv *KafkaConsumerImpl) GetName() string { +func (rcv *KafkaImpl) GetName() string { return rcv.Name } -func (rcv *KafkaConsumerImpl) GetThreads() int { +func (rcv *KafkaImpl) GetThreads() int { return rcv.Threads } -func (rcv *KafkaConsumerImpl) GetOutputBufferSize() int { +func (rcv *KafkaImpl) GetOutputBufferSize() int { return rcv.OutputBufferSize } diff --git a/gen/source/input/init.pkl.go b/gen/source/input/init.pkl.go index a823a9e..69e1d1e 100644 --- a/gen/source/input/init.pkl.go +++ b/gen/source/input/init.pkl.go @@ -6,6 +6,6 @@ import "github.com/apple/pkl-go/pkl" func init() { pkl.RegisterMapping("com.pipelaner.source.inputs", Inputs{}) pkl.RegisterMapping("com.pipelaner.source.inputs#Cmd", CmdImpl{}) - pkl.RegisterMapping("com.pipelaner.source.inputs#KafkaConsumer", KafkaConsumerImpl{}) + pkl.RegisterMapping("com.pipelaner.source.inputs#Kafka", KafkaImpl{}) pkl.RegisterMapping("com.pipelaner.source.inputs#Pipelaner", PipelanerImpl{}) } diff --git a/gen/source/sink/Clickhouse.pkl.go b/gen/source/sink/Clickhouse.pkl.go index d035ad5..0e6c274 100644 --- a/gen/source/sink/Clickhouse.pkl.go +++ b/gen/source/sink/Clickhouse.pkl.go @@ -1,8 +1,6 @@ // Code generated from Pkl module `com.pipelaner.source.sinks`. DO NOT EDIT. 
package sink -import "github.com/apple/pkl-go/pkl" - type Clickhouse interface { Sink @@ -14,26 +12,6 @@ type Clickhouse interface { GetDatabase() string - GetMigrationEngine() string - - GetMigrationsPathClickhouse() string - - GetMaxExecutionTime() *pkl.Duration - - GetCannMaxLifeTime() *pkl.Duration - - GetDialTimeout() *pkl.Duration - - GetMaxOpenConns() int - - GetMaxIdleConns() int - - GetBlockBufferSize() uint8 - - GetMaxCompressionBuffer() *pkl.DataSize - - GetEnableDebug() bool - GetTableName() string GetAsyncInsert() string @@ -54,26 +32,6 @@ type ClickhouseImpl struct { Database string `pkl:"database"` - MigrationEngine string `pkl:"migrationEngine"` - - MigrationsPathClickhouse string `pkl:"migrationsPathClickhouse"` - - MaxExecutionTime *pkl.Duration `pkl:"maxExecutionTime"` - - CannMaxLifeTime *pkl.Duration `pkl:"cannMaxLifeTime"` - - DialTimeout *pkl.Duration `pkl:"dialTimeout"` - - MaxOpenConns int `pkl:"maxOpenConns"` - - MaxIdleConns int `pkl:"maxIdleConns"` - - BlockBufferSize uint8 `pkl:"blockBufferSize"` - - MaxCompressionBuffer *pkl.DataSize `pkl:"maxCompressionBuffer"` - - EnableDebug bool `pkl:"enableDebug"` - TableName string `pkl:"tableName"` AsyncInsert string `pkl:"asyncInsert"` @@ -107,46 +65,6 @@ func (rcv *ClickhouseImpl) GetDatabase() string { return rcv.Database } -func (rcv *ClickhouseImpl) GetMigrationEngine() string { - return rcv.MigrationEngine -} - -func (rcv *ClickhouseImpl) GetMigrationsPathClickhouse() string { - return rcv.MigrationsPathClickhouse -} - -func (rcv *ClickhouseImpl) GetMaxExecutionTime() *pkl.Duration { - return rcv.MaxExecutionTime -} - -func (rcv *ClickhouseImpl) GetCannMaxLifeTime() *pkl.Duration { - return rcv.CannMaxLifeTime -} - -func (rcv *ClickhouseImpl) GetDialTimeout() *pkl.Duration { - return rcv.DialTimeout -} - -func (rcv *ClickhouseImpl) GetMaxOpenConns() int { - return rcv.MaxOpenConns -} - -func (rcv *ClickhouseImpl) GetMaxIdleConns() int { - return rcv.MaxIdleConns -} - -func (rcv *ClickhouseImpl) GetBlockBufferSize() uint8 { - return rcv.BlockBufferSize -} - -func (rcv *ClickhouseImpl) GetMaxCompressionBuffer() *pkl.DataSize { - return rcv.MaxCompressionBuffer -} - -func (rcv *ClickhouseImpl) GetEnableDebug() bool { - return rcv.EnableDebug -} - func (rcv *ClickhouseImpl) GetTableName() string { return rcv.TableName } diff --git a/gen/source/sink/Console.pkl.go b/gen/source/sink/Console.pkl.go index c50fbab..a787d0a 100644 --- a/gen/source/sink/Console.pkl.go +++ b/gen/source/sink/Console.pkl.go @@ -1,8 +1,12 @@ // Code generated from Pkl module `com.pipelaner.source.sinks`. DO NOT EDIT. package sink +import "github.com/pipelane/pipelaner/gen/settings/logger/logformat" + type Console interface { Sink + + GetLogFormat() logformat.LogFormat } var _ Console = (*ConsoleImpl)(nil) @@ -10,6 +14,8 @@ var _ Console = (*ConsoleImpl)(nil) type ConsoleImpl struct { SourceName string `pkl:"sourceName"` + LogFormat logformat.LogFormat `pkl:"logFormat"` + Name string `pkl:"name"` Inputs []string `pkl:"inputs"` @@ -21,6 +27,10 @@ func (rcv *ConsoleImpl) GetSourceName() string { return rcv.SourceName } +func (rcv *ConsoleImpl) GetLogFormat() logformat.LogFormat { + return rcv.LogFormat +} + func (rcv *ConsoleImpl) GetName() string { return rcv.Name } diff --git a/gen/source/sink/Http.pkl.go b/gen/source/sink/Http.pkl.go new file mode 100644 index 0000000..ed77fa7 --- /dev/null +++ b/gen/source/sink/Http.pkl.go @@ -0,0 +1,52 @@ +// Code generated from Pkl module `com.pipelaner.source.sinks`. DO NOT EDIT. 
+package sink + +import "github.com/pipelane/pipelaner/gen/source/sink/method" + +type Http interface { + Sink + + GetUrl() string + + GetMethod() method.Method +} + +var _ Http = (*HttpImpl)(nil) + +type HttpImpl struct { + SourceName string `pkl:"sourceName"` + + Url string `pkl:"url"` + + Method method.Method `pkl:"method"` + + Name string `pkl:"name"` + + Inputs []string `pkl:"inputs"` + + Threads int `pkl:"threads"` +} + +func (rcv *HttpImpl) GetSourceName() string { + return rcv.SourceName +} + +func (rcv *HttpImpl) GetUrl() string { + return rcv.Url +} + +func (rcv *HttpImpl) GetMethod() method.Method { + return rcv.Method +} + +func (rcv *HttpImpl) GetName() string { + return rcv.Name +} + +func (rcv *HttpImpl) GetInputs() []string { + return rcv.Inputs +} + +func (rcv *HttpImpl) GetThreads() int { + return rcv.Threads +} diff --git a/gen/source/sink/KafkaProducer.pkl.go b/gen/source/sink/Kafka.pkl.go similarity index 53% rename from gen/source/sink/KafkaProducer.pkl.go rename to gen/source/sink/Kafka.pkl.go index a64f63b..ca1fc16 100644 --- a/gen/source/sink/KafkaProducer.pkl.go +++ b/gen/source/sink/Kafka.pkl.go @@ -6,10 +6,10 @@ import ( "github.com/pipelane/pipelaner/gen/source/common" ) -type KafkaProducer interface { +type Kafka interface { Sink - GetKafka() *common.Kafka + GetCommon() *common.Kafka GetMaxRequestSize() *pkl.DataSize @@ -18,12 +18,12 @@ type KafkaProducer interface { GetBatchNumMessages() int } -var _ KafkaProducer = (*KafkaProducerImpl)(nil) +var _ Kafka = (*KafkaImpl)(nil) -type KafkaProducerImpl struct { +type KafkaImpl struct { SourceName string `pkl:"sourceName"` - Kafka *common.Kafka `pkl:"kafka"` + Common *common.Kafka `pkl:"common"` MaxRequestSize *pkl.DataSize `pkl:"maxRequestSize"` @@ -38,34 +38,34 @@ type KafkaProducerImpl struct { Threads int `pkl:"threads"` } -func (rcv *KafkaProducerImpl) GetSourceName() string { +func (rcv *KafkaImpl) GetSourceName() string { return rcv.SourceName } -func (rcv *KafkaProducerImpl) GetKafka() *common.Kafka { - return rcv.Kafka +func (rcv *KafkaImpl) GetCommon() *common.Kafka { + return rcv.Common } -func (rcv *KafkaProducerImpl) GetMaxRequestSize() *pkl.DataSize { +func (rcv *KafkaImpl) GetMaxRequestSize() *pkl.DataSize { return rcv.MaxRequestSize } -func (rcv *KafkaProducerImpl) GetLingerMs() *pkl.Duration { +func (rcv *KafkaImpl) GetLingerMs() *pkl.Duration { return rcv.LingerMs } -func (rcv *KafkaProducerImpl) GetBatchNumMessages() int { +func (rcv *KafkaImpl) GetBatchNumMessages() int { return rcv.BatchNumMessages } -func (rcv *KafkaProducerImpl) GetName() string { +func (rcv *KafkaImpl) GetName() string { return rcv.Name } -func (rcv *KafkaProducerImpl) GetInputs() []string { +func (rcv *KafkaImpl) GetInputs() []string { return rcv.Inputs } -func (rcv *KafkaProducerImpl) GetThreads() int { +func (rcv *KafkaImpl) GetThreads() int { return rcv.Threads } diff --git a/gen/source/sink/init.pkl.go b/gen/source/sink/init.pkl.go index d1bc3eb..e6c3010 100644 --- a/gen/source/sink/init.pkl.go +++ b/gen/source/sink/init.pkl.go @@ -7,6 +7,7 @@ func init() { pkl.RegisterMapping("com.pipelaner.source.sinks", Sinks{}) pkl.RegisterMapping("com.pipelaner.source.sinks#Console", ConsoleImpl{}) pkl.RegisterMapping("com.pipelaner.source.sinks#Pipelaner", PipelanerImpl{}) - pkl.RegisterMapping("com.pipelaner.source.sinks#KafkaProducer", KafkaProducerImpl{}) + pkl.RegisterMapping("com.pipelaner.source.sinks#Kafka", KafkaImpl{}) pkl.RegisterMapping("com.pipelaner.source.sinks#Clickhouse", ClickhouseImpl{}) + 
pkl.RegisterMapping("com.pipelaner.source.sinks#Http", HttpImpl{}) } diff --git a/gen/source/sink/method/Method.pkl.go b/gen/source/sink/method/Method.pkl.go new file mode 100644 index 0000000..0a656cd --- /dev/null +++ b/gen/source/sink/method/Method.pkl.go @@ -0,0 +1,43 @@ +// Code generated from Pkl module `com.pipelaner.source.sinks`. DO NOT EDIT. +package method + +import ( + "encoding" + "fmt" +) + +type Method string + +const ( + PATCH Method = "PATCH" + POST Method = "POST" + PUT Method = "PUT" + DELETE Method = "DELETE" + GET Method = "GET" +) + +// String returns the string representation of Method +func (rcv Method) String() string { + return string(rcv) +} + +var _ encoding.BinaryUnmarshaler = new(Method) + +// UnmarshalBinary implements encoding.BinaryUnmarshaler for Method. +func (rcv *Method) UnmarshalBinary(data []byte) error { + switch str := string(data); str { + case "PATCH": + *rcv = PATCH + case "POST": + *rcv = POST + case "PUT": + *rcv = PUT + case "DELETE": + *rcv = DELETE + case "GET": + *rcv = GET + default: + return fmt.Errorf(`illegal: "%s" is not a valid Method`, str) + } + return nil +} diff --git a/go.mod b/go.mod index 7d21a49..74ef640 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/apple/pkl-go v0.8.1 github.com/expr-lang/expr v1.16.9 github.com/go-faker/faker/v4 v4.5.0 + github.com/go-resty/resty/v2 v2.16.2 github.com/google/uuid v1.6.0 github.com/prometheus/client_golang v1.20.5 github.com/rs/zerolog v1.33.0 diff --git a/go.sum b/go.sum index 17d5e19..3fa1abe 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-resty/resty/v2 v2.16.2 h1:CpRqTjIzq/rweXUt9+GxzzQdlkqMdt8Lm/fuK/CAbAg= +github.com/go-resty/resty/v2 v2.16.2/go.mod h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWaC3iOktwmIU= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= @@ -165,6 +167,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= +golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 1cd9c9e..cc29376 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -40,12 +40,12 @@ func NewLoggerWithCfg(cfg *logger.Config) (*zerolog.Logger, error) { writers = append(writers, newRollingFile(cfg)) } mw := io.MultiWriter(writers...) - logger := zerolog. + l := zerolog. New(mw). Level(level). 
With().Timestamp(). Logger() - return &logger, nil + return &l, nil } func newRollingFile(cfg *logger.Config) io.Writer { diff --git a/internal/utils/channels.go b/internal/utils/channels.go index d8a6dd3..3412275 100644 --- a/internal/utils/channels.go +++ b/internal/utils/channels.go @@ -11,26 +11,30 @@ import ( "github.com/LastPossum/kamino" ) -func MergeChannels(channels []chan any) chan any { - var wg sync.WaitGroup - out := make(chan any, len(channels)) - - for _, ch := range channels { - wg.Add(1) - go func() { - for msg := range ch { - out <- msg - } - wg.Done() - }() +func MergeInputs[T any](chs ...chan T) chan T { + if len(chs) == 1 { + return chs[0] } - + lens := 0 + for i := range chs { + lens += cap(chs[i]) + } + res := make(chan T, lens) + gr := sync.WaitGroup{} + gr.Add(len(chs)) go func() { - wg.Wait() - close(out) + gr.Wait() + close(res) }() - - return out + for _, ch := range chs { + go func(c chan T) { + defer gr.Done() + for v := range c { + res <- v + } + }(ch) + } + return res } func BroadcastChannels(outputs []chan any, ch chan any) { diff --git a/pipelaner.go b/pipelaner.go index 88b958f..1392994 100644 --- a/pipelaner.go +++ b/pipelaner.go @@ -8,6 +8,7 @@ import ( logCfg "github.com/pipelane/pipelaner/gen/settings/logger" "github.com/pipelane/pipelaner/internal/logger" pipelines "github.com/pipelane/pipelaner/pipeline" + "golang.org/x/sync/errgroup" ) type pipeline interface { @@ -43,10 +44,11 @@ func NewPipelaner( } func (p *Pipelaner) Run(ctx context.Context) error { + gr := errgroup.Group{} for _, pipe := range p.Pipelines { - if err := pipe.Run(ctx); err != nil { - return err - } + gr.Go(func() error { + return pipe.Run(ctx) + }) } - return nil + return gr.Wait() } diff --git a/pipeline/node/input.go b/pipeline/node/input.go index 9862813..1a67365 100644 --- a/pipeline/node/input.go +++ b/pipeline/node/input.go @@ -94,6 +94,7 @@ func (i *Input) Run(ctx context.Context) error { } input := make(chan any, i.cfg.outBufferSize*len(i.outChannels)) + go func() { defer func() { for _, channel := range i.outChannels { @@ -105,7 +106,7 @@ func (i *Input) Run(ctx context.Context) error { for _, ch := range i.outChannels { m, err := i.prepareMessage(msg) if err != nil { - i.logger.Error().Err(err).Msg("prepare message failed") + i.logger.Debug().Err(err).Msg("prepare message failed") continue } i.preSendMessageAction(len(input), len(input)) @@ -119,6 +120,7 @@ func (i *Input) Run(ctx context.Context) error { defer close(input) i.impl.Generate(ctx, input) }() + return nil } @@ -136,6 +138,9 @@ func (i *Input) prepareMessage(msg any) (any, error) { kind := reflect.TypeOf(msg).Kind() switch kind { case reflect.Pointer, reflect.Slice, reflect.Map, reflect.Struct: + if len(i.outChannels) == 1 { + return msg, nil + } mes, err := kamino.Clone(msg) if err != nil { return nil, err diff --git a/pipeline/node/sink.go b/pipeline/node/sink.go index c4fed91..8b108f0 100644 --- a/pipeline/node/sink.go +++ b/pipeline/node/sink.go @@ -75,7 +75,7 @@ func NewSink( threadsCount: cfg.GetThreads(), nodeCfg: buildOptions(opts...), }, - logger: logger.With().Logger(), + logger: l.With().Logger(), }, nil } @@ -88,14 +88,16 @@ func (s *Sink) GetInputs() []string { } func (s *Sink) Run() error { + if len(s.inputChannels) == 0 { return errors.New("no input channels configured") } - inChannel := utils.MergeChannels(s.inputChannels) + inChannel := utils.MergeInputs(s.inputChannels...) 
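+	// inChannel fans in every configured input channel; it is closed only after all upstream channels close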
sema := utils.NewSemaphore(s.cfg.threadsCount) go func() { + s.logger.Debug().Msg("start sink") for msg := range inChannel { // process message in separated goroutine sema.Acquire() @@ -106,8 +108,9 @@ func (s *Sink) Run() error { s.postSinkAction() }() } - s.logger.Debug().Msg("input channels processed") + s.logger.Debug().Msg("stop sink") }() + return nil } diff --git a/pipeline/node/transform.go b/pipeline/node/transform.go index d943a32..52d54fa 100644 --- a/pipeline/node/transform.go +++ b/pipeline/node/transform.go @@ -113,6 +113,7 @@ func (t *Transform) GetOutputBufferSize() int { // Run non-blocking call that start Transform node action in separated goroutine. func (t *Transform) Run() error { + if len(t.inputChannels) == 0 { return fmt.Errorf("no input channels configured for '%s'", t.cfg.name) } @@ -121,9 +122,10 @@ func (t *Transform) Run() error { } sema := utils.NewSemaphore(t.cfg.threadsCount) - inChannel := utils.MergeChannels(t.inputChannels) + inChannel := utils.MergeInputs(t.inputChannels...) go func() { + t.logger.Debug().Msg("starting transform") var wg sync.WaitGroup for msg := range inChannel { @@ -138,13 +140,13 @@ func (t *Transform) Run() error { if t.cfg.enableMetrics { metrics.TotalTransformationError.WithLabelValues(transformNodeType, t.cfg.name).Inc() } - t.logger.Error().Err(e).Msg("received error") + t.logger.Debug().Err(e).Msg("received error") return } for _, ch := range t.outChannels { mes, err := t.prepareMessage(msg) if err != nil { - t.logger.Error().Err(err).Msg("prepare message to send") + t.logger.Debug().Err(err).Msg("skip nil message transform") continue } t.preSendMessageAction(len(ch), cap(ch)) @@ -152,11 +154,11 @@ func (t *Transform) Run() error { } }() } - t.logger.Debug().Msg("input channels processed") wg.Wait() for _, ch := range t.outChannels { close(ch) } + t.logger.Debug().Msg("stop transform") }() return nil } diff --git a/pkl/settings/Settings.pkl b/pkl/settings/Settings.pkl index 9cc64a4..31125fa 100644 --- a/pkl/settings/Settings.pkl +++ b/pkl/settings/Settings.pkl @@ -7,7 +7,7 @@ import "logger/LoggerConfig.pkl" import "metrics/MetricsConfig.pkl" logger: LoggerConfig -healthCheck: HealthcheckConfig -metrics: MetricsConfig +healthCheck: HealthcheckConfig? +metrics: MetricsConfig? 
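+// Declaring a healthCheck or metrics block enables the corresponding server;
+// omitting the block leaves the feature disabled.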
startGCAfterMessageProcess: Boolean = false gracefulShutdownDelay:Duration = 15.s \ No newline at end of file diff --git a/pkl/settings/healthcheck/HealthcheckConfig.pkl b/pkl/settings/healthcheck/HealthcheckConfig.pkl index b292d5b..96cfcaf 100644 --- a/pkl/settings/healthcheck/HealthcheckConfig.pkl +++ b/pkl/settings/healthcheck/HealthcheckConfig.pkl @@ -6,5 +6,3 @@ import "package://pkg.pkl-lang.org/pkl-go/pkl.golang@0.8.1#/go.pkl" host: String = "localhost" port: Int(this > 0) = 82 - -enable: Boolean = false diff --git a/pkl/settings/logger/LoggerConfig.pkl b/pkl/settings/logger/LoggerConfig.pkl index 8ad9a17..fc37805 100644 --- a/pkl/settings/logger/LoggerConfig.pkl +++ b/pkl/settings/logger/LoggerConfig.pkl @@ -1,9 +1,9 @@ -@go.Package {name = "github.com/pipelane/pipelaner/gen/settings/logger"} +@go.Package { name = "github.com/pipelane/pipelaner/gen/settings/logger" } module com.pipelaner.settings.logger.config import "package://pkg.pkl-lang.org/pkl-go/pkl.golang@0.8.1#/go.pkl" -typealias LogLevel = "error"|"warn"|"info"|"debug" +typealias LogLevel = "error"|"warn"|"info"|"debug"|"trace" logLevel: LogLevel = "info" diff --git a/pkl/settings/metrics/MetricsConfig.pkl b/pkl/settings/metrics/MetricsConfig.pkl index 79fcf64..58d83ae 100644 --- a/pkl/settings/metrics/MetricsConfig.pkl +++ b/pkl/settings/metrics/MetricsConfig.pkl @@ -9,7 +9,4 @@ port: Int = 8090 path: String = "/metrics" -// non empty only if metrics enable -serviceName: String? - -enable: Boolean = false \ No newline at end of file +serviceName: String \ No newline at end of file diff --git a/pkl/source/sink/Sinks.pkl b/pkl/source/sink/Sinks.pkl index 539a361..b0e9c61 100644 --- a/pkl/source/sink/Sinks.pkl +++ b/pkl/source/sink/Sinks.pkl @@ -3,6 +3,7 @@ module com.pipelaner.source.sinks import "package://pkg.pkl-lang.org/pkl-go/pkl.golang@0.8.1#/go.pkl" import ".../source/Common.pkl" +import ".../settings/logger/LoggerConfig.pkl" abstract class Sink { name: String @@ -13,6 +14,7 @@ abstract class Sink { class Console extends Sink { fixed sourceName = "console" + logFormat: LoggerConfig.LogFormat = "plain" } class Pipelaner extends Sink { diff --git a/sources/embedded.go b/sources/embedded.go index 1ee9fef..4298829 100644 --- a/sources/embedded.go +++ b/sources/embedded.go @@ -11,6 +11,7 @@ import ( _ "github.com/pipelane/pipelaner/sources/generator/pipelaner" _ "github.com/pipelane/pipelaner/sources/sink/clickhouse" _ "github.com/pipelane/pipelaner/sources/sink/console" + _ "github.com/pipelane/pipelaner/sources/sink/http" _ "github.com/pipelane/pipelaner/sources/sink/kafka" _ "github.com/pipelane/pipelaner/sources/sink/pipelaner" _ "github.com/pipelane/pipelaner/sources/transform/batch" diff --git a/sources/generator/README.md b/sources/generator/README.md new file mode 100644 index 0000000..7d5fe7a --- /dev/null +++ b/sources/generator/README.md @@ -0,0 +1 @@ +# Generators diff --git a/sources/generator/cmd/README.md b/sources/generator/cmd/README.md new file mode 100644 index 0000000..e4a380b --- /dev/null +++ b/sources/generator/cmd/README.md @@ -0,0 +1 @@ +# Cmd diff --git a/sources/generator/kafka/README.md b/sources/generator/kafka/README.md new file mode 100644 index 0000000..2eb9d10 --- /dev/null +++ b/sources/generator/kafka/README.md @@ -0,0 +1 @@ +# Kafka diff --git a/sources/generator/kafka/consumer.go b/sources/generator/kafka/consumer.go index da4243a..e87d85f 100644 --- a/sources/generator/kafka/consumer.go +++ b/sources/generator/kafka/consumer.go @@ -20,17 +20,17 @@ type Consumer struct { } func 
NewConsumer(
-	cfg input.KafkaConsumer,
+	cfg input.Kafka,
 	logger *zerolog.Logger,
 ) (*Consumer, error) {
 	v := cfg.GetMaxPartitionFetchBytes().ToUnit(pkl.Bytes).Value
 	maxByteFetch := cfg.GetFetchMaxBytes().ToUnit(pkl.Bytes).Value
 	opts := []kgo.Opt{
-		kgo.SeedBrokers(cfg.GetKafka().Brokers),
+		kgo.SeedBrokers(cfg.GetCommon().Brokers...),
 		kgo.WithLogger(kzerolog.New(logger)),
 		kgo.ConsumerGroup(cfg.GetConsumerGroupID()),
-		kgo.ConsumeTopics(cfg.GetKafka().Topics...),
+		kgo.ConsumeTopics(cfg.GetCommon().Topics...),
 		kgo.FetchMaxBytes(int32(maxByteFetch)),
 		kgo.FetchMaxPartitionBytes(int32(v)),
 		kgo.HeartbeatInterval(time.Second),
@@ -63,13 +63,13 @@ func NewConsumer(
 		opts = append(opts, kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
 	}
 
-	if cfg.GetKafka().SaslEnabled {
+	if cfg.GetCommon().SaslEnabled {
 		auth := scram.Auth{
-			User: *cfg.GetKafka().SaslUsername,
-			Pass: *cfg.GetKafka().SaslPassword,
+			User: *cfg.GetCommon().SaslUsername,
+			Pass: *cfg.GetCommon().SaslPassword,
 		}
 		var authOpt kgo.Opt
-		switch cfg.GetKafka().SaslMechanism {
+		switch *cfg.GetCommon().SaslMechanism {
 		case saslmechanism.SCRAMSHA512:
 			authOpt = kgo.SASL(auth.AsSha512Mechanism())
 		case saslmechanism.SCRAMSHA256:
diff --git a/sources/generator/kafka/kafka.go b/sources/generator/kafka/kafka.go
index 7526d9e..78b303c 100644
--- a/sources/generator/kafka/kafka.go
+++ b/sources/generator/kafka/kafka.go
@@ -11,17 +11,17 @@ import (
 )
 
 func init() {
-	source.RegisterInput("kafka-consumer", &Kafka{})
+	source.RegisterInput("kafka", &Kafka{})
 }
 
 type Kafka struct {
 	components.Logger
 	cons *Consumer
-	cfg  input.KafkaConsumer
+	cfg  input.Kafka
 }
 
 func (c *Kafka) Init(cfg input.Input) error {
-	consumerCfg, ok := cfg.(input.KafkaConsumer)
+	consumerCfg, ok := cfg.(input.Kafka)
 	if !ok {
 		return fmt.Errorf("invalid kafka config type: %T", cfg)
 	}
diff --git a/sources/generator/pipelaner/README.md b/sources/generator/pipelaner/README.md
new file mode 100644
index 0000000..6de160d
--- /dev/null
+++ b/sources/generator/pipelaner/README.md
@@ -0,0 +1 @@
+# Pipelaner
diff --git a/sources/shared/chunker/chunker.go b/sources/shared/chunker/chunker.go
index c1fad40..3a70553 100644
--- a/sources/shared/chunker/chunker.go
+++ b/sources/shared/chunker/chunker.go
@@ -1,6 +1,7 @@
 package chunker
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,9 +15,9 @@ type Config struct {
 type Chunks struct {
 	Cfg     Config
 	buffers chan chan any
-	stopped atomic.Bool
 	input   chan any
 	wg      sync.WaitGroup
+	cancel  context.CancelFunc
 }
 
 func NewChunks(cfg Config) *Chunks {
@@ -38,12 +39,9 @@ func (c *Chunks) resetChannels() {
 	c.buffers = make(chan chan any, c.Cfg.BufferSize)
 }
 
-func (c *Chunks) send(ch chan any) {
-	c.buffers <- ch
-}
 func (c *Chunks) NewChunk() chan any {
 	b := make(chan any, c.Cfg.MaxChunkSize)
-	c.send(b)
+	c.buffers <- b
 	return b
 }
 
@@ -54,39 +52,35 @@ func (c *Chunks) Chunk() chan any {
 	return <-c.buffers
 }
 func (c *Chunks) Stop() {
-	c.stopped.Store(true)
+	c.cancel()
 }
 
 func (c *Chunks) Generator() {
-	timer := time.NewTicker(c.Cfg.MaxIdleTime)
-	stop := make(chan struct{}, 1)
+	timer := time.NewTimer(c.Cfg.MaxIdleTime)
+	stop := make(chan struct{})
+	ctx, cancel := context.WithCancel(context.Background())
+	c.cancel = cancel
+	go c.startProcessing(stop, timer)
 	go func() {
-		for {
-			if c.stopped.Load() {
-				c.wg.Wait()
-				timer.Stop()
-				stop <- struct{}{}
-				close(c.buffers)
-				close(c.input)
-				close(stop)
-				break
-			}
-		}
+		<-ctx.Done()
+		c.wg.Wait()
+		close(c.input)
+		timer.Stop()
+		close(c.buffers)
+		stop <- struct{}{}
+		close(stop)
 	}()
-	go
c.startProcessing(stop, timer) } -func (c *Chunks) startProcessing(stop chan struct{}, timer *time.Ticker) { +func (c *Chunks) startProcessing(onClose chan struct{}, timer *time.Timer) { buffer := c.NewChunk() counter := atomic.Int64{} counter.Store(0) - breaks := false - for !breaks { +Loop: + for { select { - case <-stop: - close(buffer) - breaks = true - continue + case <-onClose: + break Loop case <-timer.C: if counter.Load() == 0 { timer.Reset(c.Cfg.MaxIdleTime) @@ -96,20 +90,20 @@ func (c *Chunks) startProcessing(stop chan struct{}, timer *time.Ticker) { counter.Store(0) buffer = c.NewChunk() timer.Reset(c.Cfg.MaxIdleTime) - default: - msg, ok := <-c.input + case msg, ok := <-c.input: if !ok && msg == nil { continue } timer.Reset(c.Cfg.MaxIdleTime) buffer <- msg counter.Add(1) - if counter.Load() >= int64(c.Cfg.MaxChunkSize) { - counter.Store(0) + if counter.Load() == int64(c.Cfg.MaxChunkSize) { close(buffer) + counter.Store(0) buffer = c.NewChunk() } c.wg.Done() } } + close(buffer) } diff --git a/sources/sink/README.md b/sources/sink/README.md new file mode 100644 index 0000000..851cd96 --- /dev/null +++ b/sources/sink/README.md @@ -0,0 +1 @@ +# Sinks diff --git a/sources/sink/clickhouse/README.md b/sources/sink/clickhouse/README.md new file mode 100644 index 0000000..f3acc50 --- /dev/null +++ b/sources/sink/clickhouse/README.md @@ -0,0 +1 @@ +# Clickhouse diff --git a/sources/sink/clickhouse/client.go b/sources/sink/clickhouse/client.go index 01dc14c..7da883b 100644 --- a/sources/sink/clickhouse/client.go +++ b/sources/sink/clickhouse/client.go @@ -15,7 +15,6 @@ type Client struct { func NewClickhouseClient(ctx context.Context, cfg sink.Clickhouse) (*Client, error) { conn, err := chpool.Dial(ctx, chpool.Options{ - ClientOptions: ch.Options{ Address: cfg.GetAddress(), Database: cfg.GetDatabase(), diff --git a/sources/sink/console/README.md b/sources/sink/console/README.md new file mode 100644 index 0000000..a495787 --- /dev/null +++ b/sources/sink/console/README.md @@ -0,0 +1 @@ +# Console output diff --git a/sources/sink/console/console.go b/sources/sink/console/console.go index 5cca54c..981a97f 100644 --- a/sources/sink/console/console.go +++ b/sources/sink/console/console.go @@ -7,9 +7,12 @@ package console import ( "fmt" + config "github.com/pipelane/pipelaner/gen/settings/logger" + "github.com/pipelane/pipelaner/gen/settings/logger/loglevel" "github.com/pipelane/pipelaner/gen/source/sink" - "github.com/pipelane/pipelaner/pipeline/components" + logger "github.com/pipelane/pipelaner/internal/logger" "github.com/pipelane/pipelaner/pipeline/source" + "github.com/rs/zerolog" ) func init() { @@ -17,15 +20,29 @@ func init() { } type Console struct { - components.Logger + l *zerolog.Logger } func (c *Console) Init(cfg sink.Sink) error { - _, ok := cfg.(sink.Console) + cCfg, ok := cfg.(sink.Console) if !ok { return fmt.Errorf("invalid console config type: %T", cfg) } - + lCfg := config.Config{ + LogLevel: loglevel.Info, + EnableConsole: true, + LogFormat: cCfg.GetLogFormat(), + } + l, err := logger.NewLoggerWithCfg(&lCfg) + if err != nil { + return err + } + logs := l.With(). + Str("source", cfg.GetSourceName()). + Str("type", "sink"). + Str("lane_name", cfg.GetName()). 
+ Logger() + c.l = &logs return nil } @@ -47,6 +64,6 @@ func (c *Console) Sink(val any) { } return default: - c.Log().Info().Msg(fmt.Sprintf("%v", val)) + c.l.Info().Msg(fmt.Sprintf("%v", val)) } } diff --git a/sources/sink/http/README.md b/sources/sink/http/README.md new file mode 100644 index 0000000..de54a45 --- /dev/null +++ b/sources/sink/http/README.md @@ -0,0 +1 @@ +# Http request diff --git a/sources/sink/kafka/README.md b/sources/sink/kafka/README.md new file mode 100644 index 0000000..5907da2 --- /dev/null +++ b/sources/sink/kafka/README.md @@ -0,0 +1 @@ +# Kafka sink diff --git a/sources/sink/kafka/kafka.go b/sources/sink/kafka/kafka.go index 45ebd56..69a66e2 100644 --- a/sources/sink/kafka/kafka.go +++ b/sources/sink/kafka/kafka.go @@ -21,12 +21,12 @@ func init() { type Kafka struct { components.Logger - cfg sink.KafkaProducer + cfg sink.Kafka prod *Producer } func (k *Kafka) Init(cfg sink.Sink) error { - kafkaCfg, ok := cfg.(sink.KafkaProducer) + kafkaCfg, ok := cfg.(sink.Kafka) if !ok { return fmt.Errorf("invalid kafka-producer config %T", cfg) } @@ -41,7 +41,7 @@ func (k *Kafka) Init(cfg sink.Sink) error { } func (k *Kafka) write(ctx context.Context, message []byte) { - for _, topic := range k.cfg.GetKafka().Topics { + for _, topic := range k.cfg.GetCommon().Topics { k.prod.Produce(ctx, &kgo.Record{ Value: message, Topic: topic, diff --git a/sources/sink/kafka/producer.go b/sources/sink/kafka/producer.go index f273af5..6f8c37e 100644 --- a/sources/sink/kafka/producer.go +++ b/sources/sink/kafka/producer.go @@ -20,30 +20,30 @@ type Producer struct { } func NewProducer( - cfg sink.KafkaProducer, + cfg sink.Kafka, logger *zerolog.Logger, ) (*Producer, error) { mSize := cfg.GetBatchNumMessages() opts := []kgo.Opt{ - kgo.SeedBrokers(cfg.GetKafka().Brokers), + kgo.SeedBrokers(cfg.GetCommon().Brokers...), kgo.WithLogger(kzerolog.New(logger)), kgo.ProducerLinger(cfg.GetLingerMs().GoDuration()), kgo.MaxBufferedRecords(mSize), } - if cfg.GetKafka().SaslEnabled { + if cfg.GetCommon().SaslEnabled { auth := scram.Auth{ - User: *cfg.GetKafka().SaslUsername, - Pass: *cfg.GetKafka().SaslPassword, + User: *cfg.GetCommon().SaslUsername, + Pass: *cfg.GetCommon().SaslPassword, } var authOpt kgo.Opt - switch cfg.GetKafka().SaslMechanism { + switch *cfg.GetCommon().SaslMechanism { case saslmechanism.SCRAMSHA512: authOpt = kgo.SASL(auth.AsSha512Mechanism()) case saslmechanism.SCRAMSHA256: authOpt = kgo.SASL(auth.AsSha512Mechanism()) default: - return nil, fmt.Errorf("unknown sasl mechanism: %s", cfg.GetKafka().SaslMechanism) + return nil, fmt.Errorf("unknown sasl mechanism: %s", cfg.GetCommon().SaslMechanism) } opts = append(opts, authOpt) } diff --git a/sources/sink/pipelaner/README.md b/sources/sink/pipelaner/README.md new file mode 100644 index 0000000..6de160d --- /dev/null +++ b/sources/sink/pipelaner/README.md @@ -0,0 +1 @@ +# Pipelaner diff --git a/sources/transform/README.md b/sources/transform/README.md new file mode 100644 index 0000000..8d482ae --- /dev/null +++ b/sources/transform/README.md @@ -0,0 +1,2 @@ +# Transforms + diff --git a/sources/transform/batch/README.md b/sources/transform/batch/README.md new file mode 100644 index 0000000..6bdde42 --- /dev/null +++ b/sources/transform/batch/README.md @@ -0,0 +1 @@ +# Batch diff --git a/sources/transform/chunks/README.md b/sources/transform/chunks/README.md new file mode 100644 index 0000000..ef94f67 --- /dev/null +++ b/sources/transform/chunks/README.md @@ -0,0 +1 @@ +# Chunks diff --git a/sources/transform/chunks/chunks.go 
b/sources/transform/chunks/chunks.go index 9bc25c8..e68ef72 100644 --- a/sources/transform/chunks/chunks.go +++ b/sources/transform/chunks/chunks.go @@ -44,6 +44,5 @@ func (c *Chunk) Transform(val any) any { } c.locked.Store(true) defer c.locked.Store(false) - v := <-c.buffer.Chunks() - return v + return c.buffer.Chunk() } diff --git a/sources/transform/debounce/README.md b/sources/transform/debounce/README.md new file mode 100644 index 0000000..ff443f5 --- /dev/null +++ b/sources/transform/debounce/README.md @@ -0,0 +1 @@ +# Debounce diff --git a/sources/transform/filter/README.md b/sources/transform/filter/README.md new file mode 100644 index 0000000..8cc4cd0 --- /dev/null +++ b/sources/transform/filter/README.md @@ -0,0 +1 @@ +# Filter diff --git a/sources/transform/remap/README.md b/sources/transform/remap/README.md new file mode 100644 index 0000000..35afcc4 --- /dev/null +++ b/sources/transform/remap/README.md @@ -0,0 +1 @@ +# Remap diff --git a/sources/transform/throttling/README.md b/sources/transform/throttling/README.md new file mode 100644 index 0000000..b6b8627 --- /dev/null +++ b/sources/transform/throttling/README.md @@ -0,0 +1 @@ +# Throttling
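
Note on `sources/sink/kafka/producer.go`: in the switch shown in the context lines above, both the `SCRAMSHA512` and `SCRAMSHA256` cases call `auth.AsSha512Mechanism()`, so a broker that expects SCRAM-SHA-256 will reject the handshake. Below is a minimal sketch of the intended mapping; it switches on plain mechanism names purely for illustration, rather than on the generated `saslmechanism` constants the real code uses:

```go
package kafka

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/scram"
)

// saslOpt returns the kgo option for a SCRAM mechanism name.
// Unlike the producer hunk above, SCRAM-SHA-256 maps to
// AsSha256Mechanism rather than AsSha512Mechanism.
func saslOpt(mechanism, user, pass string) (kgo.Opt, error) {
	auth := scram.Auth{User: user, Pass: pass}
	switch mechanism {
	case "SCRAM-SHA-512":
		return kgo.SASL(auth.AsSha512Mechanism()), nil
	case "SCRAM-SHA-256":
		return kgo.SASL(auth.AsSha256Mechanism()), nil
	default:
		return nil, fmt.Errorf("unknown sasl mechanism: %s", mechanism)
	}
}
```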
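
Note on `sources/shared/chunker/chunker.go`: the rewritten `startProcessing` loop closes a chunk either when it accumulates `MaxChunkSize` messages or when `MaxIdleTime` elapses while the chunk is non-empty. The sketch below shows the same size-or-idle pattern in a self-contained form, emitting slices instead of handing out channels; the function and channel names are illustrative, not the package's API:

```go
package chunker

import "time"

// chunkBySizeOrIdle emits a chunk when it reaches maxSize messages
// or when maxIdle elapses, mirroring the select loop in
// startProcessing (which hands out buffered channels instead of slices).
func chunkBySizeOrIdle(input <-chan any, out chan<- []any, maxSize int, maxIdle time.Duration) {
	timer := time.NewTimer(maxIdle)
	defer timer.Stop()
	buf := make([]any, 0, maxSize)
	flush := func() {
		if len(buf) > 0 {
			out <- buf
			buf = make([]any, 0, maxSize)
		}
	}
	for {
		select {
		case msg, ok := <-input:
			if !ok {
				flush() // input closed: emit the partial chunk and stop
				close(out)
				return
			}
			buf = append(buf, msg)
			if len(buf) == maxSize {
				flush()
			}
			// Reset without draining relies on Go 1.23 timer semantics.
			timer.Reset(maxIdle)
		case <-timer.C:
			flush()
			timer.Reset(maxIdle)
		}
	}
}
```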
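
Note on `sources/sink/console/console.go`: the sink now builds a dedicated zerolog logger from the shared logger config and stamps every record with `source`, `type`, and `lane_name`, so output stays attributable when several lanes write to the console. A minimal stand-in using bare zerolog follows; the call to `internal/logger.NewLoggerWithCfg` is replaced here with `zerolog.New`, and the field values are placeholders:

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	// Stand-in for internal/logger.NewLoggerWithCfg: a plain console writer.
	base := zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}).With().Timestamp().Logger()
	// The sink keeps a child logger tagged with its pipeline coordinates.
	sinkLog := base.With().
		Str("source", "console").
		Str("type", "sink").
		Str("lane_name", "example-lane").
		Logger()
	sinkLog.Info().Msg("processed message payload")
}
```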