Skip to content

Commit

Permalink
add documentations
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyxo committed Dec 12, 2024
1 parent 593f4ba commit e1a05ba
Show file tree
Hide file tree
Showing 62 changed files with 800 additions and 279 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ lint:
.PHONY: test
test:
@go test -count=1 -v ./... -race

.PHONY: proto
proto:
@rm -rf sources/shared/proto
Expand Down
147 changes: 144 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,144 @@
[![Go Lint](https://github.com/pipelane/pipelaner/actions/workflows/lint.yml/badge.svg)](https://github.com/pipelane/pipelaner/actions/workflows/lint.yml)
[![Go Test](https://github.com/pipelane/pipelaner/actions/workflows/test.yml/badge.svg)](https://github.com/pipelane/pipelaner/actions/workflows/test.yml)
# pipelaner

# **Pipelaner**

**Pipelaner** is a high-performance and efficient **Framework and Agent** for creating data pipelines. The core of pipeline descriptions is based on the **_Configuration As Code_** concept and the [**Pkl**](https://github.com/apple/pkl) configuration language by **Apple**.

Pipelaner manages data streams through three key entities: **Generator**, **Transform**, and **Sink**.

---

## 📖 **Contents**
- [Core Entities](#core-entities)
- [Generator](#generator)
- [Transform](#transform)
- [Sink](#sink)
- [Basic Parameters](#basic-parameters)
- [Built-in Pipeline Elements](#built-in-pipeline-elements)
- [Generators](#generators)
- [Transforms](#transforms)
- [Sinks](#sinks)
- [Scalability](#scalability)
- [Single-Node Deployment](#single-node-deployment)
- [Multi-Node Deployment](#multi-node-deployment)
- [Support](#support)
- [License](#license)

---

## 📌 **Core Entities**

### **Generator**
The component responsible for creating or retrieving source data for the pipeline. Generators can produce messages, events, or retrieve data from various sources such as files, databases, or APIs.

- **Example use case:**
Reading data from a file or receiving events via webhooks.

---

### **Transform**
The component that processes data within the pipeline. Transforms perform operations such as filtering, aggregation, data transformation, or cleaning to prepare it for further processing.

- **Example use case:**
Filtering records based on specific conditions or converting data format from JSON to CSV.

---

### **Sink**
The final destination for the data stream. Sinks send processed data to a target system, such as a database, API, or message queue.

- **Example use case:**
Saving data to PostgreSQL or sending it to a Kafka topic.

---

### **Basic Parameters**
| **Parameter** | **Type** | **Description** |
|-----------------------|---------|---------------------------------------------------------------------------------------------------|
| `name` | String | Unique name of the pipeline element. |
| `threads` | Int | Number of threads for processing messages. Defaults to the value of `GOMAXPROC`. |
| `outputBufferSize` | Int | Size of the output buffer. **Not applicable to Sink components.** |

---

## 📦 **Built-in Pipeline Elements**

### **Generators**
| **Name** | **Description** |
|------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
| [**cmd**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/cmd) | Reads the output of a command, e.g., `"/usr/bin/log" "stream --style ndjson"`. |
| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/kafka) | Apache Kafka consumer that streams `Value` into the pipeline. |
| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/pipelaner) | GRPC server that streams values via [gRPC](https://github.com/pipelane/pipelaner/tree/main/proto/service.proto). |

---

### **Transforms**
| **Name** | **Description** |
|--------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
| [**batch**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/batch) | Forms batches of data with a specified size. |
| [**chunks**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/chunks) | Splits incoming data into chunks. |
| [**debounce**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/debounce) | Eliminates "bounce" (frequent repeats) in data. |
| [**filter**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/filter) | Filters data based on specified conditions. |
| [**remap**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/remap) | Reassigns fields or transforms the data structure. |
| [**throttling**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/throttling) | Limits data processing rate. |

---

### **Sinks**
| **Name** | **Description** |
|--------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
| [**clickhouse**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/clickhouse) | Sends data to a ClickHouse database. |
| [**console**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/console) | Outputs data to the console. |
| [**http**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/http) | Sends data to a specified HTTP endpoint. |
| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/kafka) | Publishes data to Apache Kafka. |
| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/pipelaner) | Streams data via gRPC to other Pipelaner nodes. |

---

## 🌐 **Scalability**

### **Single-Node Deployment**
For operation on a single host:
![Single Node](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-singlehost.png/?raw=true "Single Node Deployment")

---

### **Multi-Node Deployment**
For distributed data processing across multiple hosts:
![Multi-Node](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-multihost.png/?raw=true "Multi-Node Deployment")

For distributed interaction between nodes, you can use:
1. **gRPC** — via generators and sinks with the parameter `sourceName: "pipelaner"`.
2. **Apache Kafka** — for reading/writing data via topics.

Example configuration using Kafka:
```yaml
new Inputs.Kafka {
common {
topics {
"kafka-topic"
}
}
}

new Sinks.Kafka {
common {
topics {
"kafka-topic"
}
}
}
```

---

## 🤝 **Support**

If you have questions, suggestions, or encounter any issues, please [create an Issue](https://github.com/pipelane/pipelaner/issues/new) in the repository.
You can also participate in discussions in the [Discussions](https://github.com/pipelane/pipelaner/discussions) section.

---

## 📜 **License**

This project is licensed under the [Apache 2.0](https://github.com/pipelane/pipelaner/blob/main/LICENSE) license.
You are free to use, modify, and distribute the code under the terms of the license.
144 changes: 144 additions & 0 deletions README_RU.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@

# **Pipelaner**

**Pipelaner** — это высокопроизводительный и эффективный **Framework и Агент** для создания data pipelines. Основой описания пайплайнов служит концепция **_Configuration As Code_** и язык конфигураций [**Pkl**](https://github.com/apple/pkl) от компании **Apple**.

Pipelaner управляет потоками данных с помощью трех ключевых сущностей: **Generator**, **Transform**, и **Sink**.

---

## 📖 **Содержание**
- [Основные сущности](#основные-сущности)
- [Generator](#generator)
- [Transform](#transform)
- [Sink](#sink)
- [Базовые параметры](#базовые-параметры)
- [Элементы пайплайнов "из коробки"](#элементы-пайплайнов-из-коробки)
- [Generators](#generators)
- [Transforms](#transforms)
- [Sinks](#sinks)
- [Масштабируемость](#масштабируемость)
- [Одноузловое развертывание](#одноузловое-развертывание)
- [Многоузловое развертывание](#многоузловое-развертывание)
- [Поддержка](#поддержка)
- [Лицензия](#лицензия)

---

## 📌 **Основные сущности**

### **Generator**
Компонент, отвечающий за создание или получение исходных данных для потока. Generator может генерировать сообщения, события или извлекать данные из различных источников, таких как файлы, базы данных или API.

- **Пример использования:**
Чтение данных из файла или получение событий через вебхуки.

---

### **Transform**
Компонент, который обрабатывает данные в потоке. Transform выполняет операции, такие как фильтрация, агрегация, преобразование структуры или очистка данных, чтобы подготовить их для дальнейшей обработки.

- **Пример использования:**
Фильтрация записей с заданными условиями или преобразование формата данных из JSON в CSV.

---

### **Sink**
Конечная точка потока данных. Sink отправляет обработанные данные в целевую систему, например, в базу данных, API или систему очередей сообщений.

- **Пример использования:**
Сохранение данных в PostgreSQL или отправка их в Kafka-топик.

---

### **Базовые параметры**
| **Параметр** | **Тип** | **Описание** |
|-----------------------|---------|---------------------------------------------------------------------------------------------------|
| `name` | String | Уникальное название элемента пайплайна. |
| `threads` | Int | Количество потоков для обработки сообщений. По умолчанию равно значению переменной `GOMAXPROC`. |
| `outputBufferSize` | Int | Размер выходного буфера. **Не используется в Sink-компонентах.** |

---

## 📦 **Элементы пайплайнов "из коробки"**

### **Generators**
| **Название** | **Описание** |
|----------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|
| [**cmd**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/cmd) | Считывает вывод команды, например `"/usr/bin/log" "stream --style ndjson"`. |
| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/kafka) | Консьюмер для Apache Kafka, передает по пайплайну значения `Value`. |
| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/generator/pipelaner) | GRPC-сервер, передает значения через [gRPC](https://github.com/pipelane/pipelaner/tree/main/proto/service.proto). |

---

### **Transforms**
| **Название** | **Описание** |
|------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
| [**batch**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/batch) | Формирует пакеты данных заданного размера. |
| [**chunks**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/chunks) | Разбивает входящие данные на чанки. |
| [**debounce**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/debounce) | Устраняет "дребезг" (частые повторы) в данных. |
| [**filter**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/filter) | Фильтрует данные по заданным условиям. |
| [**remap**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/remap) | Переназначает поля или преобразует структуру данных. |
| [**throttling**](https://github.com/pipelane/pipelaner/tree/main/sources/transform/throttling) | Ограничивает скорость обработки данных. |

---

### **Sinks**
| **Название** | **Описание** |
|------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------|
| [**clickhouse**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/clickhouse) | Отправляет данные в базу данных ClickHouse. |
| [**console**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/console) | Выводит данные в консоль. |
| [**http**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/http) | Отправляет данные на указанный HTTP-эндпоинт. |
| [**kafka**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/kafka) | Публикует данные в Apache Kafka. |
| [**pipelaner**](https://github.com/pipelane/pipelaner/tree/main/sources/sink/pipelaner) | Передает данные через gRPC к другим узлам Pipelaner. |

---

## 🌐 **Масштабируемость**

### **Одноузловое развертывание**
Для работы на одном хосте:
![Одноузловая схема](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-singlehost.png/?raw=true "Одноузловая схема")

---

### **Многоузловое развертывание**
Для распределенной обработки данных на нескольких хостах:
![Многоузловая схема](https://github.com/pipelane/pipelaner/blob/c8e232106e9acf8a1d8682d225e369f282f6523a/images/pipelaner-multihost.png/?raw=true "Многоузловая схема")

Для распределенного взаимодействия между узлами можно использовать:
1. **gRPC** — через генераторы и синки с параметром `sourceName: "pipelaner"`.
2. **Apache Kafka** — для чтения/записи данных через топики.

Пример конфигурации с использованием Kafka:
```pkl
new Inputs.Kafka {
common {
topics {
"kafka-topic"
}
}
}
new Sinks.Kafka {
common {
topics {
"kafka-topic"
}
}
}
```

---

## 🤝 **Поддержка**

Если у вас есть вопросы, предложения или вы нашли ошибку, пожалуйста, [создайте Issue](https://github.com/pipelane/pipelaner/issues/new) в репозитории.
Вы также можете участвовать в обсуждениях проекта в разделе [Discussions](https://github.com/pipelane/pipelaner/discussions).

---

## 📜 **Лицензия**

Этот проект распространяется под лицензией [Apache 2.0](https://github.com/pipelane/pipelaner/blob/main/LICENSE).
Вы можете свободно использовать, изменять и распространять код при соблюдении условий лицензии.
13 changes: 9 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewAgent(file string) (*Agent, error) {

func (a *Agent) initHealthCheck(cfg *config.Pipelaner) error {
hcCfg := cfg.Settings.HealthCheck
if hcCfg.Enable {
if hcCfg != nil {
hc, err := health.NewHealthCheck(cfg)
if err != nil {
return fmt.Errorf("init health check: %w", err)
Expand All @@ -64,7 +64,7 @@ func (a *Agent) initHealthCheck(cfg *config.Pipelaner) error {

func (a *Agent) initMetricsServer(cfg *config.Pipelaner) error {
metricsCfg := cfg.Settings.Metrics
if metricsCfg.Enable {
if metricsCfg != nil {
m, err := metrics.NewMetricsServer(metricsCfg)
if err != nil {
return fmt.Errorf("init metrics server: %w", err)
Expand All @@ -77,11 +77,15 @@ func (a *Agent) initMetricsServer(cfg *config.Pipelaner) error {
func (a *Agent) initPipelaner(cfg *config.Pipelaner) error {
pipelanerCfg := cfg.Pipelines
logCfg := cfg.Settings.Logger
// todo: use another solution for specific parameters
metricsCfg := cfg.Settings.Metrics
mEnable := false
if metricsCfg != nil {
mEnable = true
}
p, err := NewPipelaner(
pipelanerCfg,
logCfg,
cfg.Settings.Metrics.Enable,
mEnable,
cfg.Settings.StartGCAfterMessageProcess,
)
if err != nil {
Expand All @@ -105,6 +109,7 @@ func (a *Agent) Serve(ctx context.Context) error {
return a.metrics.Serve(ctx)
})
}

g.Go(func() error {
return a.pipelaner.Run(inputsCtx)
})
Expand Down
Loading

0 comments on commit e1a05ba

Please sign in to comment.