Коннектор Debezium фиксирует изменения на уровне строк в схемах базы данных.
При первом подключении к серверу или кластеру БД коннектор делает последовательный snapshot всех схем. После завершения создания снимка коннектор непрерывно фиксирует изменения на уровне строк, которые вставляют, обновляют и удаляют содержимое базы данных и которые были зафиксированы в базе данных. Коннектор генерирует записи о событиях изменения данных и передает их в топики Kafka. По умолчанию для каждой таблицы коннектор передает все сгенерированные события в отдельный топик Kafka для этой таблицы. Приложения и сервисы потребляют записи событий изменения данных из этого топика.
Для того чтобы начать получать CDC(Change Data Capture) из топиков в realtime, сперва необходимо установить и настроить Kafka Connect, который будет взаимодействовать с конкретной Kafka.
Запустим минимальное количество сервисов, для того чтобы наша система уже могла начать работать, с помощью docker compose:
See compose.yml
Накатим следующую миграцию:
See V1_1__init.sql
Отправим следующий запрос:
See V2_1. create snapshot.http
После отправки запроса на создание коннектора, коннектор создает в базе данных snapshot: initial для таблицы sport, отправляет все текущее содержимое таблицы в топик и теперь слушает CDC со стороны таблицы.
Debezium предоставляет нам широкий спектр свойств, через которые мы можем фильтровать, менять, добавлять или удалять данные.
Вот некоторые из них:
Cast - Приведение полей или всего ключа или значения к определенному типу
DropHeaders - Удаление заголовков по имени
ExtractField - Извлечение определенного поля из Struct и Map и включение только этого поля в результаты
Filter - Удаляет сообщения из дальнейшей обработки. Используется с предикатом для селективной фильтрации определенных сообщений
Flatten - Преобразование вложенной структуры данных в плоскую
HeaderFrom - Копирование или перемещение полей в ключе или значении в заголовки записи
HoistField - Обертывание всего события как одного поля внутри Struct или Map
InsertField - Добавление поля с использованием статических данных или метаданных записи
InsertHeader - Добавление заголовка с использованием статических данных
MaskField - Замена поля на допустимое нулевое значение для типа (0, пустая строка и т.д.) или на пользовательскую замену (только непустая строка или числовое значение)
RegexRouter - Изменение темы записи на основе исходной темы, строки замены и регулярного выражения
ReplaceField - Фильтрация или переименование полей
SetSchemaMetadata - Изменение имени или версии схемы
TimestampConverter - Преобразование меток времени между различными форматами
TimestampRouter - Изменение темы записи на основе исходной темы и временной метки. Полезно, когда используется приемник, который записывает в разные таблицы или индексы на основе временных меток
ValueToKey - Замена ключа записи новым ключом, сформированным из подмножества полей значения записи
"transforms": "extractKeyField,extractValueField",
"transforms.extractKeyField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyField.field": "id",
"transforms.extractValueField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractValueField.field": "after"
Одним из основных улучшений в Debezium, начиная с версии 1.6, является поддержка incremental snapshots.
Одной из наиболее болезненных точек Debezium с момента его создания была неоптимальная поддержка изменений в списке захватываемых таблиц. Пользователь создает новый коннектор со списком перехватываемых таблиц (table.include.list и связанные с ним опции); впоследствии может возникнуть необходимость скорректировать эту конфигурацию, чтобы перехватить дополнительные таблицы, которые изначально не входили в CDC. Если достаточно передавать только изменения из этих таблиц, то проблема решается достаточно просто. Но что делать, если необходимо захватить и существующее содержимое таблиц?
Захват существующих данных в таблицах традиционно выполняется Debezium в фазе snapshot. Эта фаза выполняется один раз при первом запуске коннектора, и ее целью является захват последовательных данных в определенный момент времени (преобразование данных в состоянии покоя в данные в движении). Это может быть достаточно длительная операция, и по определению она должна быть выполнена полностью или не выполнена вовсе - это напоминает семантику транзакций. Это означает, что если снимок не будет завершен, например, из-за перезапуска коннектора, то он должен быть выполнен заново, и все уже сделанное будет отброшено. Кроме того, пока делается снимок, любые модификации данных, параллельно выполняемые в базе данных, не передаются до завершения снимка. Это может привести к проблемам с ресурсами базы данных для очень больших снимков, так как журналы транзакций должны быть доступны до начала потоковой передачи.
Таким образом, мы получаем три проблемы, требующие решения:
- Практически невозможность добавления дополнительных таблиц в список захваченных таблиц, если существующие данные должны быть переданы потоком
- Длительный процесс последовательного создания моментальных снимков, который нельзя прервать или возобновить
- Блокировка потоковой передачи данных об изменениях до завершения моментального снимка
Проблема была хорошо известна, и со временем были разработаны обходные пути. В качестве обходного пути общей рекомендацией было использование подхода, основанного на использовании нескольких коннекторов. Пользователю предлагалось:
- Остановить коннектор
- Создать новый коннектор для создания снимков новых таблиц (с использованием режима initial_only snapshot)
- После завершения работы остановите новый коннектор
- Переконфигурировать и запустить старый коннектор с добавлением в список новых захваченных таблиц
Этот способ в некоторой степени справился с поставленной задачей, но он очень неуклюж, и все вопросы, связанные с согласованностью моментальных снимков, о которых говорилось выше, остаются актуальными.
Иногда бывает полезно управлять Debezium извне, чтобы заставить его выполнить то или иное запрошенное действие. Допустим, необходимо выполнить повторный снимок уже созданной таблицы - так называемый ad-hoc снимок. Пользователю необходимо послать Debezium команду на приостановку текущей операции и выполнение снапшота. Для этого в Debezium определены сигналы, выдаваемые через таблицу сигналов. Это специальная таблица, предназначенная для общения между пользователем и Debezium. Debezium перехватывает таблицу и, когда пользователю требуется выполнить определенную операцию, он просто записывает запись в сигнальную таблицу (посылает сигнал). Debezium получит захваченное изменение и затем выполнит требуемое действие.
Пример:
Вначале мы запустим развертывание, создадим таблицу сигнализации и запустим коннектор:
See compose.yml
See V1_1__init.sql
Как и в первом случае в кафке появиться топик с данными о sport.
Теперь для того чтобы настроить CDC также и для таблицы category нам нужно сделать update дебезиум конфига, добавив в table.include.list еще одну таблицу - custom.category:
Warning: После данного шага вы сможете увидеть в топике данные только с текущего момента, но не сначала создания snapshot.
Однако это легко исправить отправив команду в сигнальную таблицу:
INSERT INTO public.dbz_signal
VALUES ('signal-1', 'execute-snapshot', '{"data-collections": ["custom.category"]}');
Debezium запустит необходимые процессы по созданию incremental snapshot без прерывания стриминга данных в других таблицах.
В Debezium 2.3 сигнальный канал Kafka доступен для всех коннекторов.
Это усовершенствование обеспечивает упрощенный подход к интеграции, а также единый и согласованный подход к сигналам для всех поддерживаемых Debezium баз данных. Вы можете отправлять сигналы в определенный топик Kafka, а Debezium будет потреблять и обрабатывать этот сигнал, как если бы он исходил из самой таблицы сигналов.
Использование топика Kafka для сигналов дает несколько преимуществ. Во-первых, оно соответствует событийно-ориентированному дизайну, что делает его естественным для сбора данных об изменениях и Debezium. Кроме того, это безопасный способ отправки сигналов в Debezium без необходимости предоставления пользователю прямого доступа к базе данных.
Warning: Даже при использовании сигнального подхода Kafka функция инкрементного снимка все равно требует наличия и использования сигнальной таблицы для управления некоторыми операциями, необходимыми для процесса инкрементного снимка.
Продолжим с того же места, где мы остановились в предыдущем примере.
Для того чтобы у нас заработал сигнальный канал через Kafka топик, нам необходимо добавить следующие проперти в конфиг дебезиума:
- "signal.enabled.channels": "source,kafka" (без явного указания по умолчанию: source)
- "signal.kafka.bootstrap.servers": "kafka:29092" (required)
- "signal.kafka.topic": "pgsql.kafka.signal" (без явного указания по умолчанию: <topic.prefix>-signal)
Отправить сигнал можно следующим образом:
>> kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic pgsql.demo.kafka.signal --property "parse.key=true" --property "key.separator=:"
У вас откроется канал для отправки данных в топик
>> pgsql.demo:{"type":"execute-snapshot", "data": {"data-collections": ["custom.category"], "type": "incremental"}}
Warning: Сообщение обязательно должно иметь ключ, который должен обязательно совпадать со значением из проперти topic.prefix. В значения для data-collections перечисляем нужные нам таблицы для incremental shapshot.
После этого Debezium запустит процесс incremental shapshot , который создаст новые ивенты в топике pgsql.demo.custom.category начиная с самого начала существования таблицы.