Note: This article is a work in progress.
Apache Kafka is one of the most widely used solutions for messaging and pub/sub use cases. Other solutions are built on top of it, like Debezium for change data capture (CDC). Debezium detects changes in data stores like databases and publishes them to a Kafka topic.
The following article gives a short introduction and showcase using the following components:
- Strimzi (0.25.0): solution to run Apache Kafka on Kubernetes/OpenShift
- Debezium (1.7.0.Final): detects data changes in a MySQL or PostgreSQL database instance
Figure 1 gives a short overview of the relevant components for this scenario and solution.
The consumer-app is a custom application that consumes the topic and, in this case, retrieves the events with the data modifications. In this scenario the consuming application is a Quarkus application.
Install the relevant products
- Strimzi: install via Helm 3 charts
- install Strimzi in its own namespace (here: strimzi-operator)
- watch a separate namespace (here: single namespace strimzi-test)
$ cd work
$ export STRIMZI_VERSION=0.25.0
$ git clone -b $STRIMZI_VERSION https://github.com/strimzi/strimzi-kafka-operator
$ cd strimzi-kafka-operator/helm-charts/helm3/strimzi-kafka-operator
$ oc new-project strimzi-test
$ oc new-project strimzi-operator
$ helm3 upgrade --install strimzi . \
    --set "watchNamespaces={strimzi-test}" \
    --namespace strimzi-operator
$ oc get pods -l name=strimzi-cluster-operator
NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5f8c7fd7c4-m629h   1/1     Running   0          2m25s
- Install Kafka cluster and ZooKeeper
- Strimzi provides templates
$ oc create -f examples/templates/cluster-operator -n strimzi-operator
template.template.openshift.io/strimzi-connect created
template.template.openshift.io/strimzi-ephemeral created
template.template.openshift.io/strimzi-mirror-maker created
template.template.openshift.io/strimzi-persistent created
- Install a Kafka broker
$ oc process strimzi-ephemeral \
    -p CLUSTER_NAME=broker \
    -p ZOOKEEPER_NODE_COUNT=1 \
    -p KAFKA_NODE_COUNT=1 \
    -p KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    -p KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
    | oc apply -n strimzi-test -f -
kafka.kafka.strimzi.io/broker created

$ oc get pods -n strimzi-test
NAME                                      READY   STATUS    RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7   3/3     Running   0          66s
broker-kafka-0                            1/1     Running   0          116s
broker-zookeeper-0                        1/1     Running   0          2m39s

$ oc get kafka -n strimzi-test
NAME     DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
broker   1                        1                     True    True
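The processed template produces a Kafka custom resource named broker. The following is a minimal sketch of roughly what such an ephemeral cluster definition looks like (listener and config details are assumptions and may differ from the actual Strimzi template):

# sketch of an ephemeral Kafka cluster resource, parameters mapped to the template values above
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: broker                                       # CLUSTER_NAME
spec:
  kafka:
    replicas: 1                                      # KAFKA_NODE_COUNT
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1            # KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
      transaction.state.log.replication.factor: 1    # KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1                                      # ZOOKEEPER_NODE_COUNT
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}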
- Prepare and install a Kafka Connect image with Debezium connectors using KafkaConnect (spec)
Since Strimzi 0.25.0, the S2I solution to create a Kafka Connect image with Debezium connectors is deprecated and replaced by a KafkaConnect build that holds the information about which plugins are needed.
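A minimal sketch of such a KafkaConnect resource with a build section follows; the name matches the cluster created below, while the Kafka version, output type and exact artifact list are assumptions, so the referenced scripts/kafka-connect-extended.yaml may differ:

# sketch: KafkaConnect with a build section pulling the Debezium connector plugins
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-db-cluster
spec:
  version: 2.8.0
  replicas: 1
  bootstrapServers: broker-kafka-bootstrap.strimzi-test:9092
  # further Connect settings (group.id, storage topics, converters) omitted for brevity
  build:
    output:
      type: imagestream                              # on OpenShift; 'docker' with a registry image is also possible
      image: kafka-connect-db-cluster:latest
    plugins:
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz                                # sha512sum checksums omitted in this sketch
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.7.0.Final/debezium-connector-postgres-1.7.0.Final-plugin.tar.gz
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.0.Final/debezium-connector-mysql-1.7.0.Final-plugin.tar.gz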
$ oc apply -f scripts/kafka-connect-extended.yaml -n strimzi-test
kafkaconnect.kafka.strimzi.io/kafka-connect-db-cluster created

$ oc get pods
NAME                                                READY   STATUS      RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7             3/3     Running     0          5h8m
broker-kafka-0                                      1/1     Running     0          5h9m
broker-zookeeper-0                                  1/1     Running     0          5h10m
kafka-connect-db-cluster-connect-6cbd4c7f56-5k5l8   1/1     Running     0          87s
kafka-connect-db-cluster-connect-build-1-build      0/1     Completed   0          2m8s
After a while Kafka Connect is built and connected to the Kafka broker.
- Install Debezium UI
$ oc apply -f scripts/debezium-ui.yaml -n strimzi-test
service/debezium-service created
configmap/debezium-service-config created
deployment.apps/debezium-service created
route.route.openshift.io/debezium-route created

$ oc get pods
NAME                                                READY   STATUS      RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7             3/3     Running     0          5h8m
broker-kafka-0                                      1/1     Running     0          5h9m
broker-zookeeper-0                                  1/1     Running     0          5h10m
debezium-service-7d988bdd4-r9vz4                    1/1     Running     0          4h10m
kafka-connect-db-cluster-connect-6cbd4c7f56-5k5l8   1/1     Running     0          87s
kafka-connect-db-cluster-connect-build-1-build      0/1     Completed   0          2m8s

$ oc get routes
NAME             HOST/PORT                                        PATH   SERVICES           PORT    TERMINATION   WILDCARD
debezium-route   debezium-route-strimzi-test....appdomain.cloud          debezium-service   <all>   passthrough   None
Afterwards the Debezium UI is available, but no connectors are installed yet. The wizard provides the option to register a connector.
Now the following main components are successfully installed:
- Strimzi
- Kafka broker
- ZooKeeper
- Kafka Connect with Debezium connectors for PostgreSQL and MySQL
- Debezium UI
With the Kafka and Debezium components running, let's configure the data change capture for a MySQL or PostgreSQL database using the Debezium UI or directly via the Kafka Connect REST API. A prerequisite is a running database instance.
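As a quick way to get such an instance, the Debezium example MySQL image can be deployed; this is a sketch, where image name and credentials follow the Debezium tutorial and match the connector configuration below:

# hypothetical example: run the Debezium example MySQL database (contains the 'inventory'
# schema and a preconfigured 'debezium'/'dbz' user) in the strimzi-test namespace
oc new-app --name=mysql --docker-image=debezium/example-mysql:1.7 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw \
  -n strimzi-test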
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://kafka-connect-postgresql-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'
{
  "name": "mysql-connector-test-01",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "broker-kafka-bootstrap.strimzi-test:9092",
    "database.history.kafka.topic": "schema-changes.mysql-test"
  }
}
EOF
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://kafka-connect-postgresql-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'
{
  "name": "pg-test-conntector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "test.postgresql.com",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "ibmclouddb",
    "database.server.name": "pg-test",
    "table.include.list": "public.importantevents",
    "plugin.name": "wal2json",
    "slot.name": "repl_log_postgresql_test"
  }
}
EOF
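To verify the registration, the Kafka Connect REST API can be queried; a sketch using the same service and connector names as in the commands above:

# list registered connectors
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -s \
  http://kafka-connect-postgresql-cluster-connect-api.strimzi-test:8083/connectors

# show the status of a single connector and its tasks
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -s \
  http://kafka-connect-postgresql-cluster-connect-api.strimzi-test:8083/connectors/pg-test-conntector/status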
This creates a connector and also configures a new topic for the relevant database tables, with the topic name following the schema <namespace>.<schema>.<table-name>, like pg-test.public.importantevents. The <namespace> fragment is called database.server.name in the configuration API, but namespace in the UI.
Now modify (add, update) data rows in the table and verify the published messages on the Kafka topic. The payload contains the previous and the new data object in the payload.before and payload.after fields.
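For example, an update like the following triggers such a change event (hypothetical statement; table and column names are taken from the payload example below):

-- change the state of an existing row in the captured table
UPDATE public.importantevents SET event_state = '2' WHERE id = 15;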
oc exec -it broker-kafka-0 -n strimzi-test -- /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list

oc exec -it broker-kafka-0 -n strimzi-test -- /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --property print.key=true \
  --topic pg-test.public.importantevents
// ...
"payload": {
"before": {
"id": 15,
"title": "event1",
"event_state": "1",
"created_at": "2021-10-16T18:34:32Z"
},
"after": {
"id": 15,
"title": "event1",
"event_state": "2",
"created_at": "2021-10-16T19:11:14Z"
}
// ...
}
Debezium supports formatting the messages according to the CloudEvents specification. To enable the CloudEvents format, additional parameters are needed in the connector configuration:
"value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.serializer.type" : "json", "value.converter.data.serializer.type" : "json"
Supported types are JSON and Avro.
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://kafka-connect-postgresql-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'
{
  "name": "pg-test-conntector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "test.postgresql.com",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "ibmclouddb",
    "database.server.name": "pg-test-ce",
    "table.include.list": "public.importantevents",
    "plugin.name": "wal2json",
    "slot.name": "repl_log_postgresql_test_ce",
    "value.converter": "io.debezium.converters.CloudEventsConverter",
    "value.converter.serializer.type" : "json",
    "value.converter.data.serializer.type" : "json"
  }
}
EOF
An example event looks like the following:
{
"id": "name:pg-test-ce;lsn:654316360;txId:540",
"source": "/debezium/postgresql/pg-test-ce",
"specversion": "1.0",
"type": "io.debezium.postgresql.datachangeevent",
"time": "2021-10-24T18:50:04.046Z",
"datacontenttype": "application/json",
"iodebeziumop": "c",
"iodebeziumversion": "1.7.0.Final",
"iodebeziumconnector": "postgresql",
"iodebeziumname": "pg-test-ce",
"iodebeziumtsms": "1635101404046",
"iodebeziumsnapshot": "false",
"iodebeziumdb": "ibmclouddb",
"iodebeziumsequence": "[\"654314576\",\"654316360\"]",
"iodebeziumschema": "public",
"iodebeziumtable": "importantevents",
"iodebeziumtxId": "540",
"iodebeziumlsn": "654316360",
"iodebeziumxmin": null,
"iodebeziumtxid": null,
"iodebeziumtxtotalorder": null,
"iodebeziumtxdatacollectionorder": null,
"data": {
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "title"
},
{
"type": "string",
"optional": false,
"field": "event_state"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "created_at"
}
],
"optional": true,
"name": "pg_test_ce.public.importantevents.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "title"
},
{
"type": "string",
"optional": false,
"field": "event_state"
},
{
"type": "int64",
"optional": true,
"name": "io.debezium.time.MicroTimestamp",
"version": 1,
"field": "created_at"
}
],
"optional": true,
"name": "pg_test_ce.public.importantevents.Value",
"field": "after"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Data"
},
"payload": {
"before": null,
"after": {
"id": 34,
"title": "event1",
"event_state": "1",
"created_at": 1635108604046093
}
}
}
}
Kafka Connect provides a solution to transform messages using Single Message Transforms (SMT). This can also be used with Debezium, e.g. to filter which messages should be exposed to a topic.
The SMT scripting feature is not enabled by default in Debezium. To enable the feature, additional artifacts are necessary in the KafkaConnect resource, holding the dependencies for SMT scripting:
- the SMT scripting artifact
- a JSR 223 implementation like Groovy (see the sketch after the snippet below)
plugins:
- name: debezium-smt-scripting
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.7.0.Final/debezium-scripting-1.7.0.Final.tar.gz
sha512sum: 360b48349ab7da6b68f91fb77eac7020989abeb756f9893a1c83d3ccd8872ef7d8b2a284cbfdcc8374bfc8cd5366ee1c74e7930cf4de331ed922fe78cf59df82
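The Groovy JSR 223 jars are not included in that tarball. The following is a sketch of additional jar artifacts that could be appended to the artifacts list above; the Groovy version and exact jar list are assumptions, and depending on the plug-in layout (see the note below) the jars may need to end up next to the DB connector:

      # assumed additional artifacts for a JSR 223 implementation (Groovy), checksums omitted
      - type: jar
        url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.9/groovy-3.0.9.jar
      - type: jar
        url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.9/groovy-jsr223-3.0.9.jar
      - type: jar
        url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json/3.0.9/groovy-json-3.0.9.jar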
With this enabled, a Debezium connector configuration can hold the transform rules:
{
"name": "pg-test-conntector-cd",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "test.postgresql.com",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "ibmclouddb",
"database.server.name": "pg-test-cd",
"plugin.name": "wal2json",
"slot.name": "repl_log_postgresql_test_cd",
"schema.include.list": "public",
"table.include.list": "public.importantevents",
"transforms": "state2",
"transforms.state2.type": "io.debezium.transforms.Filter",
"transforms.state2.topic.regex": "",
"transforms.state2.language": "jsr223.groovy",
"transforms.state2.condition": "value.after.event_state == \"2\"",
"transforms.state2.null.handling.mode": "drop"
}
}
Note: This works well if a custom plug-in jar/tgz is provided that contains the DB connector (like PostgreSQL) and the scripting-related artifacts. Otherwise a mechanism or manual interaction is needed to copy the scripting libraries into the DB-related plug-in directory.
For PostgreSQL, Debezium needs some configuration adjustments as suggested in the documentation. One of the parameters is wal_level = logical.
In case the PostgreSQL instance is a DBaaS in IBM Cloud, use the following commands, which are also explained on the IBM Cloud: Change PostgreSQL Configuration page, or in more detail in how to enable the wal2json plugin.
- Set wal_level to logical
- Increase the default values for max_replication_slots and max_wal_senders
- Set the password for the user with REPLICATION permissions (here: user repl)
- Create a logical replication slot for the given PostgreSQL instance via API
$ ibmcloud cdb ls
Retrieving instances for all database types in all resource groups in all locations under ... as ...
OK
Name              Location   State
postgresql-demo   eu-de      active

$ ibmcloud cdb deployment-configuration-schema postgresql-demo
Retrieving database configuration schema for postgresql-demo...
OK
Setting              Default       Kind     Choices                  Requires Restart
log_connections      off           choice   [off, on]                false
log_disconnections   off           choice   [off, on]                false
synchronous_commit   local         choice   [local, off]             false
wal_level            hot_standby   choice   [hot_standby, logical]   true

Setting                      Default   Kind      Min/Max             Requires Restart
archive_timeout              1800      integer   [300, 1073741823]   false
deadlock_timeout             10000     integer   [100, 2147483647]   false
effective_io_concurrency     12        integer   [1, 1000]           false
log_min_duration_statement   100       integer   [100, 2147483647]   false
max_connections              115       integer   [115, null]         true
max_prepared_transactions    0         integer   [0, null]           true
max_replication_slots        10        integer   [10, 262143]        true
max_wal_senders              12        integer   [12, 262143]        true
shared_buffers               32000     integer   [16, null]          true
tcp_keepalives_count         6         integer   [0, 2147483647]     false
tcp_keepalives_idle          300       integer   [0, 2147483647]     false
tcp_keepalives_interval      10        integer   [0, 2147483647]     false

$ ibmcloud cdb deployment-configuration postgresql-demo '{"configuration": {"wal_level": "logical", "max_replication_slots": 21, "max_wal_senders": 21 }}'
Applying new configuration to postgresql-demo...
The deployment's configuration is being changed with this task: ...

$ ibmcloud cdb deployment-user-password postgresql-demo repl

$ export BT=`ibmcloud iam oauth-tokens --output JSON | jq -r .iam_token`
$ export DBID=`ibmcloud cdb about postgresql-demo -a -j | jq -r .resource.id | sed 's/\//%2F/g'`
$ curl -X POST https://api.eu-de.databases.cloud.ibm.com/v5/ibm/deployments/${DBID}/postgresql/logical_replication_slots \
  -H 'Authorization: '"${BT}" \
  -H 'Content-Type: application/json' \
  -d '{"logical_replication_slot": {"name": "repl_log_postgresql-demo", "database_name": "ibmclouddb", "plugin_type": "wal2json" } }'
Verify the state with the SQL statement SHOW wal_level; the result should be logical.
Also pay attention to setting plugin.name=wal2json in the Debezium connector. The DB user/role also needs the permissions to interact with the relevant tables.
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO repl;
- In case multiple connectors use the same database, use different replication slot names (slot.name)
- Replication has a huge impact on storage and performance. Monitor and increase the resources accordingly
tbd
This article and project are licensed under the Apache License, Version 2. Separate third-party code objects invoked within this code pattern are licensed by their respective providers pursuant to their own separate licenses. Contributions are subject to the Developer Certificate of Origin, Version 1.1 and the Apache License, Version 2.
See also the Apache License FAQ.