Skip to content

Применение Debezium для обработки потоковых данных: Основные концепции, примеры.

Notifications You must be signed in to change notification settings

zh-efimenko/demo-debezium

Repository files navigation

Debezium

Коннектор Debezium фиксирует изменения на уровне строк в схемах базы данных.

При первом подключении к серверу или кластеру БД коннектор делает последовательный snapshot всех схем. После завершения создания снимка коннектор непрерывно фиксирует изменения на уровне строк, которые вставляют, обновляют и удаляют содержимое базы данных и которые были зафиксированы в базе данных. Коннектор генерирует записи о событиях изменения данных и передает их в топики Kafka. По умолчанию для каждой таблицы коннектор передает все сгенерированные события в отдельный топик Kafka для этой таблицы. Приложения и сервисы потребляют записи событий изменения данных из этого топика.

Установка окружения

Для того чтобы начать получать CDC(Change Data Capture) из топиков в realtime, сперва необходимо установить и настроить Kafka Connect, который будет взаимодействовать с конкретной Kafka.

Запустим минимальное количество сервисов, для того чтобы наша система уже могла начать работать, с помощью docker compose:

See compose.yml

Настройка окружения

Подготовка Postgres

Накатим следующую миграцию:

See V1_1__init.sql

Подготовка Kafka Connect

Отправим следующий запрос:

See V2_1. create snapshot.http

Результат

После отправки запроса на создание коннектора, коннектор создает в базе данных snapshot: initial для таблицы sport, отправляет все текущее содержимое таблицы в топик и теперь слушает CDC со стороны таблицы.

Трансформации данных

Debezium предоставляет нам широкий спектр свойств, через которые мы можем фильтровать, менять, добавлять или удалять данные.

Вот некоторые из них:

Cast - Приведение полей или всего ключа или значения к определенному типу

DropHeaders - Удаление заголовков по имени

ExtractField - Извлечение определенного поля из Struct и Map и включение только этого поля в результаты

Filter - Удаляет сообщения из дальнейшей обработки. Используется с предикатом для селективной фильтрации определенных сообщений

Flatten - Преобразование вложенной структуры данных в плоскую

HeaderFrom - Копирование или перемещение полей в ключе или значении в заголовки записи

HoistField - Обертывание всего события как одного поля внутри Struct или Map

InsertField - Добавление поля с использованием статических данных или метаданных записи

InsertHeader - Добавление заголовка с использованием статических данных

MaskField - Замена поля на допустимое нулевое значение для типа (0, пустая строка и т.д.) или на пользовательскую замену (только непустая строка или числовое значение)

RegexRouter - Изменение темы записи на основе исходной темы, строки замены и регулярного выражения

ReplaceField - Фильтрация или переименование полей

SetSchemaMetadata - Изменение имени или версии схемы

TimestampConverter - Преобразование меток времени между различными форматами

TimestampRouter - Изменение темы записи на основе исходной темы и временной метки. Полезно, когда используется приемник, который записывает в разные таблицы или индексы на основе временных меток

ValueToKey - Замена ключа записи новым ключом, сформированным из подмножества полей значения записи

  "transforms": "extractKeyField,extractValueField",

  "transforms.extractKeyField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKeyField.field": "id",

  "transforms.extractValueField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
  "transforms.extractValueField.field": "after"

Incremental snapshots

Одним из основных улучшений в Debezium, начиная с версии 1.6, является поддержка incremental snapshots.

Зачем нужны incremental snapshots?

Одной из наиболее болезненных точек Debezium с момента его создания была неоптимальная поддержка изменений в списке захватываемых таблиц. Пользователь создает новый коннектор со списком перехватываемых таблиц (table.include.list и связанные с ним опции); впоследствии может возникнуть необходимость скорректировать эту конфигурацию, чтобы перехватить дополнительные таблицы, которые изначально не входили в CDC. Если достаточно передавать только изменения из этих таблиц, то проблема решается достаточно просто. Но что делать, если необходимо захватить и существующее содержимое таблиц?

Захват существующих данных в таблицах традиционно выполняется Debezium в фазе snapshot. Эта фаза выполняется один раз при первом запуске коннектора, и ее целью является захват последовательных данных в определенный момент времени (преобразование данных в состоянии покоя в данные в движении). Это может быть достаточно длительная операция, и по определению она должна быть выполнена полностью или не выполнена вовсе - это напоминает семантику транзакций. Это означает, что если снимок не будет завершен, например, из-за перезапуска коннектора, то он должен быть выполнен заново, и все уже сделанное будет отброшено. Кроме того, пока делается снимок, любые модификации данных, параллельно выполняемые в базе данных, не передаются до завершения снимка. Это может привести к проблемам с ресурсами базы данных для очень больших снимков, так как журналы транзакций должны быть доступны до начала потоковой передачи.

Таким образом, мы получаем три проблемы, требующие решения:

  • Практически невозможность добавления дополнительных таблиц в список захваченных таблиц, если существующие данные должны быть переданы потоком
  • Длительный процесс последовательного создания моментальных снимков, который нельзя прервать или возобновить
  • Блокировка потоковой передачи данных об изменениях до завершения моментального снимка

Legacy Solutions

Проблема была хорошо известна, и со временем были разработаны обходные пути. В качестве обходного пути общей рекомендацией было использование подхода, основанного на использовании нескольких коннекторов. Пользователю предлагалось:

  • Остановить коннектор
  • Создать новый коннектор для создания снимков новых таблиц (с использованием режима initial_only snapshot)
  • После завершения работы остановите новый коннектор
  • Переконфигурировать и запустить старый коннектор с добавлением в список новых захваченных таблиц

Этот способ в некоторой степени справился с поставленной задачей, но он очень неуклюж, и все вопросы, связанные с согласованностью моментальных снимков, о которых говорилось выше, остаются актуальными.

Сигнальная таблица

Иногда бывает полезно управлять Debezium извне, чтобы заставить его выполнить то или иное запрошенное действие. Допустим, необходимо выполнить повторный снимок уже созданной таблицы - так называемый ad-hoc снимок. Пользователю необходимо послать Debezium команду на приостановку текущей операции и выполнение снапшота. Для этого в Debezium определены сигналы, выдаваемые через таблицу сигналов. Это специальная таблица, предназначенная для общения между пользователем и Debezium. Debezium перехватывает таблицу и, когда пользователю требуется выполнить определенную операцию, он просто записывает запись в сигнальную таблицу (посылает сигнал). Debezium получит захваченное изменение и затем выполнит требуемое действие.

Пример:

Вначале мы запустим развертывание, создадим таблицу сигнализации и запустим коннектор:

See compose.yml

See V1_1__init.sql

See _V1_3__add_dbz.sql

See V3_1. create 4.http

Как и в первом случае в кафке появиться топик с данными о sport.

Теперь для того чтобы настроить CDC также и для таблицы category нам нужно сделать update дебезиум конфига, добавив в table.include.list еще одну таблицу - custom.category:

See V3_2. update 1.http

Warning: После данного шага вы сможете увидеть в топике данные только с текущего момента, но не сначала создания snapshot.

Однако это легко исправить отправив команду в сигнальную таблицу:

INSERT INTO public.dbz_signal
VALUES ('signal-1', 'execute-snapshot', '{"data-collections": ["custom.category"]}');

Debezium запустит необходимые процессы по созданию incremental snapshot без прерывания стриминга данных в других таблицах.

Сигналы через Kafka топик

В Debezium 2.3 сигнальный канал Kafka доступен для всех коннекторов.

Это усовершенствование обеспечивает упрощенный подход к интеграции, а также единый и согласованный подход к сигналам для всех поддерживаемых Debezium баз данных. Вы можете отправлять сигналы в определенный топик Kafka, а Debezium будет потреблять и обрабатывать этот сигнал, как если бы он исходил из самой таблицы сигналов.

Использование топика Kafka для сигналов дает несколько преимуществ. Во-первых, оно соответствует событийно-ориентированному дизайну, что делает его естественным для сбора данных об изменениях и Debezium. Кроме того, это безопасный способ отправки сигналов в Debezium без необходимости предоставления пользователю прямого доступа к базе данных.

Warning: Даже при использовании сигнального подхода Kafka функция инкрементного снимка все равно требует наличия и использования сигнальной таблицы для управления некоторыми операциями, необходимыми для процесса инкрементного снимка.

Продолжим с того же места, где мы остановились в предыдущем примере.

Для того чтобы у нас заработал сигнальный канал через Kafka топик, нам необходимо добавить следующие проперти в конфиг дебезиума:

  • "signal.enabled.channels": "source,kafka" (без явного указания по умолчанию: source)
  • "signal.kafka.bootstrap.servers": "kafka:29092" (required)
  • "signal.kafka.topic": "pgsql.kafka.signal" (без явного указания по умолчанию: <topic.prefix>-signal)

Отправить сигнал можно следующим образом:

>> kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic pgsql.demo.kafka.signal --property "parse.key=true" --property "key.separator=:"
 
У вас откроется канал для отправки данных в топик
 
>> pgsql.demo:{"type":"execute-snapshot", "data": {"data-collections": ["custom.category"], "type": "incremental"}}

Warning: Сообщение обязательно должно иметь ключ, который должен обязательно совпадать со значением из проперти topic.prefix. В значения для data-collections перечисляем нужные нам таблицы для incremental shapshot.

После этого Debezium запустит процесс incremental shapshot , который создаст новые ивенты в топике pgsql.demo.custom.category начиная с самого начала существования таблицы.

Presentation

Presentation

About

Применение Debezium для обработки потоковых данных: Основные концепции, примеры.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published