Debezium 101

Note
in progress

Apache Kafka is one of the most widely used solutions for messaging and pub/sub use cases. Other solutions are built on top of it, like Debezium for change data capture. Debezium detects changes in data stores like databases and publishes the changes to a topic using Kafka.

This article gives a short introduction and showcase using the following components

  • Strimzi (0.25.0): Solution to run Apache Kafka in Kubernetes/OpenShift

  • Debezium (1.7.0.Final): Determines data changes in a MySQL or PostgreSQL database instance

Figure 1 gives a short overview of the relevant components for this scenario and solution.

Figure 1. Overview of Kafka and Debezium components

The consumer-app is a custom application that consumes the topic and, in this case, retrieves the events with the data modifications. In this scenario, this consuming application is a Quarkus application.

Install the relevant products

  • Strimzi: install via Helm3 charts

    • install Strimzi in its own namespace (here: strimzi-operator)

    • watch separate namespaces (here: the single namespace strimzi-test)

Summary Strimzi installation
$ cd work
$ export STRIMZI_VERSION=0.25.0
$ git clone -b $STRIMZI_VERSION https://github.com/strimzi/strimzi-kafka-operator


$ cd strimzi-kafka-operator/helm-charts/helm3/strimzi-kafka-operator


$ oc new-project strimzi-test
$ oc new-project strimzi-operator

$ helm3 upgrade --install strimzi . \
 --set "watchNamespaces={strimzi-test}" \
 --namespace strimzi-operator

$ oc get pods -l name=strimzi-cluster-operator

NAME                                        READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-5f8c7fd7c4-m629h   1/1     Running   0          2m25s
  • Install the Strimzi templates for the Kafka cluster and Zookeeper

Summary Strimzi template installation
$ oc create -f examples/templates/cluster-operator -n strimzi-operator

template.template.openshift.io/strimzi-connect created
template.template.openshift.io/strimzi-ephemeral created
template.template.openshift.io/strimzi-mirror-maker created
template.template.openshift.io/strimzi-persistent created
  • Install a Kafka broker

Summary Kafka broker installation
$ oc process strimzi-ephemeral \
 -p CLUSTER_NAME=broker \
 -p ZOOKEEPER_NODE_COUNT=1 \
 -p KAFKA_NODE_COUNT=1 \
 -p KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
 -p KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
 | oc apply -n strimzi-test -f -

kafka.kafka.strimzi.io/broker created

$ oc get pods -n strimzi-test

NAME                                      READY   STATUS    RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7   3/3     Running   0          66s
broker-kafka-0                            1/1     Running   0          116s
broker-zookeeper-0                        1/1     Running   0          2m39s

$ oc get kafka -n strimzi-test

NAME     DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS
broker   1                        1                     True    True

Since Strimzi 0.25.0, the S2I solution to create a Kafka Connect image with Debezium connectors is deprecated and replaced by a KafkaConnect build, which holds the information about which plugins are needed.
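
A minimal sketch of such a KafkaConnect resource with a build section could look like the following; the output image target and plugin names are assumptions, the full version used in this showcase is in scripts/kafka-connect-extended.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-db-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: broker-kafka-bootstrap.strimzi-test:9092
  build:
    output:
      # assumption: push the built image into the internal OpenShift registry
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/strimzi-test/kafka-connect-debezium:latest
    plugins:
      # sha512sum checksums omitted for brevity, but recommended
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.7.0.Final/debezium-connector-postgres-1.7.0.Final-plugin.tar.gz
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.7.0.Final/debezium-connector-mysql-1.7.0.Final-plugin.tar.gz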

Summary Kafka Connect installation
$ oc apply -f scripts/kafka-connect-extended.yaml -n strimzi-test

kafkaconnect.kafka.strimzi.io/kafka-connect-db-cluster created

$ oc get pods

NAME                                                        READY   STATUS      RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7                     3/3     Running     0          5h8m
broker-kafka-0                                              1/1     Running     0          5h9m
broker-zookeeper-0                                          1/1     Running     0          5h10m
kafka-connect-db-cluster-connect-6cbd4c7f56-5k5l8           1/1     Running     0          87s
kafka-connect-db-cluster-connect-build-1-build              0/1     Completed   0          2m8s

After a while Kafka Connect is built and connected to the Kafka broker.
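
Whether the build finished and the Connect cluster is ready can be verified, for example, via the status of the KafkaConnect resource

$ oc get kafkaconnect kafka-connect-db-cluster -n strimzi-test \
  -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}'

Afterwards the Debezium UI can be deployed.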

$ oc apply -f scripts/debezium-ui.yaml -n strimzi-test

service/debezium-service created
configmap/debezium-service-config created
deployment.apps/debezium-service created
route.route.openshift.io/debezium-route created

$ oc get pods

NAME                                                        READY   STATUS      RESTARTS   AGE
broker-entity-operator-559b597ddf-xvmr7                     3/3     Running     0          5h8m
broker-kafka-0                                              1/1     Running     0          5h9m
broker-zookeeper-0                                          1/1     Running     0          5h10m
debezium-service-7d988bdd4-r9vz4                            1/1     Running     0          4h10m
kafka-connect-db-cluster-connect-6cbd4c7f56-5k5l8           1/1     Running     0          87s
kafka-connect-db-cluster-connect-build-1-build              0/1     Completed   0          2m8s


$ oc get routes

NAME             HOST/PORT                                        PATH   SERVICES           PORT    TERMINATION   WILDCARD
debezium-route   debezium-route-strimzi-test....appdomain.cloud          debezium-service   <all>   passthrough   None

Afterwards the Debezium UI is available, but no connectors are registered yet.

Figure: Debezium UI without registered connectors

But the wizard provides the possibility to register a connector

Figure: Debezium UI connector wizard

Now the following main components are successfully installed

  • Strimzi

    • Kafka Broker

    • Zookeeper

    • Kafka Connect with Debezium Connector for PostgreSQL and MySQL

  • Debezium UI

With the running Kafka and Debezium components, let's configure the data event capture for a MySQL or PostgreSQL database, using the Debezium UI or directly via REST calls against the Kafka Connect API. A prerequisite is a running DB instance.
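
For a quick test, the Debezium example MySQL image (which ships the inventory sample database and the debezium/dbz connector user referenced below) could be deployed; a sketch, assuming the Docker Hub image debezium/example-mysql

$ oc new-app --name=mysql debezium/example-mysql:1.7 \
  -e MYSQL_ROOT_PASSWORD=debezium \
  -e MYSQL_USER=mysqluser \
  -e MYSQL_PASSWORD=mysqlpw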

Command for MySQL instance
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://kafka-connect-db-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'

{
    "name": "mysql-connector-test-01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "broker-kafka-bootstrap.strimzi-test:9092",
        "database.history.kafka.topic": "schema-changes.mysql-test"
    }
}
EOF
Command for PostgreSQL instance
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://kafka-connect-db-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'

{
    "name": "pg-test-conntector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "test.postgresql.com",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "ibmclouddb",
        "database.server.name": "pg-test",
        "table.include.list": "public.importantevents",
        "plugin.name": "wal2json",
        "slot.name": "repl_log_postgresql_test"
    }
}
EOF

This creates a connector and also configures a new topic for the relevant database tables, with the topic name following the schema <namespace>.<schema>.<table-name>, like pg-test.public.importantevents. The <namespace> fragment is called database.server.name in the configuration API, but namespace in the UI.
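
Whether the connectors were registered successfully can be verified via the Kafka Connect REST API, e.g. with the standard /connectors and /connectors/<name>/status endpoints

oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -s \
    http://kafka-connect-db-cluster-connect-api.strimzi-test:8083/connectors

oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -s \
    http://kafka-connect-db-cluster-connect-api.strimzi-test:8083/connectors/pg-test-connector/status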

Now modify (add, update) data rows in the table and verify the published messages on the Kafka topic. The payload contains the previous and the new data object in the payload.before and payload.after objects.
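
For example, an update like the following (assuming the importantevents table from the configuration above) produces a change event like the one shown below

UPDATE public.importantevents
   SET event_state = '2', created_at = now()
 WHERE id = 15;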

List existing topics
oc exec -it broker-kafka-0 -n strimzi-test -- /opt/kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list
Listen to messages from a given topic
oc exec -it broker-kafka-0 -n strimzi-test -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --from-beginning \
    --property print.key=true \
    --topic pg-test.public.importantevents
// ...
"payload": {
    "before": {
        "id": 15,
        "title": "event1",
        "event_state": "1",
        "created_at": "2021-10-16T18:34:32Z"
    },
    "after": {
        "id": 15,
        "title": "event1",
        "event_state": "2",
        "created_at": "2021-10-16T19:11:14Z"
    }
    // ...
}

Debezium supports formatting the messages according to the CloudEvents specification.

To enable the CloudEvents format, additional parameters are needed in the connector configuration

    "value.converter": "io.debezium.converters.CloudEventsConverter",
    "value.converter.serializer.type" : "json",
    "value.converter.data.serializer.type" : "json"

Supported types are JSON and Avro.

Command for PostgreSQL instance with CloudEvents support
oc exec -i -c kafka broker-kafka-0 -n strimzi-test -- curl -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://kafka-connect-db-cluster-connect-api.strimzi-test:8083/connectors -d @- <<'EOF'

{
    "name": "pg-test-conntector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "test.postgresql.com",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "ibmclouddb",
        "database.server.name": "pg-test-ce",
        "table.include.list": "public.importantevents",
        "plugin.name": "wal2json",
        "slot.name": "repl_log_postgresql_test_ce",
        "value.converter": "io.debezium.converters.CloudEventsConverter",
        "value.converter.serializer.type" : "json",
        "value.converter.data.serializer.type" : "json"
    }
}
EOF

An example event looks like the following

{
    "id": "name:pg-test-ce;lsn:654316360;txId:540",
    "source": "/debezium/postgresql/pg-test-ce",
    "specversion": "1.0",
    "type": "io.debezium.postgresql.datachangeevent",
    "time": "2021-10-24T18:50:04.046Z",
    "datacontenttype": "application/json",
    "iodebeziumop": "c",
    "iodebeziumversion": "1.7.0.Final",
    "iodebeziumconnector": "postgresql",
    "iodebeziumname": "pg-test-ce",
    "iodebeziumtsms": "1635101404046",
    "iodebeziumsnapshot": "false",
    "iodebeziumdb": "ibmclouddb",
    "iodebeziumsequence": "[\"654314576\",\"654316360\"]",
    "iodebeziumschema": "public",
    "iodebeziumtable": "importantevents",
    "iodebeziumtxId": "540",
    "iodebeziumlsn": "654316360",
    "iodebeziumxmin": null,
    "iodebeziumtxid": null,
    "iodebeziumtxtotalorder": null,
    "iodebeziumtxdatacollectionorder": null,
    "data": {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "struct",
                    "fields": [
                        {
                            "type": "int32",
                            "optional": false,
                            "default": 0,
                            "field": "id"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "title"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "event_state"
                        },
                        {
                            "type": "int64",
                            "optional": true,
                            "name": "io.debezium.time.MicroTimestamp",
                            "version": 1,
                            "field": "created_at"
                        }
                    ],
                    "optional": true,
                    "name": "pg_test_ce.public.importantevents.Value",
                    "field": "before"
                },
                {
                    "type": "struct",
                    "fields": [
                        {
                            "type": "int32",
                            "optional": false,
                            "default": 0,
                            "field": "id"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "title"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "event_state"
                        },
                        {
                            "type": "int64",
                            "optional": true,
                            "name": "io.debezium.time.MicroTimestamp",
                            "version": 1,
                            "field": "created_at"
                        }
                    ],
                    "optional": true,
                    "name": "pg_test_ce.public.importantevents.Value",
                    "field": "after"
                }
            ],
            "optional": false,
            "name": "io.debezium.connector.mysql.Data"
        },
        "payload": {
            "before": null,
            "after": {
                "id": 34,
                "title": "event1",
                "event_state": "1",
                "created_at": 1635108604046093
            }
        }
    }
}

Kafka Connect provides a solution to transform messages using Single Message Transforms (SMT). This can also be used with Debezium, e.g. to filter which messages should be exposed to a topic.

The scripting SMT feature is not enabled in Debezium by default. To enable the feature, additional artifacts holding the dependencies for SMT scripting are necessary in the KafkaConnect resource

  • the SMT scripting artifact

  • a JSR 223 implementation like Groovy (see the sketch after the following snippet)

plugins:
      - name: debezium-smt-scripting
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.7.0.Final/debezium-scripting-1.7.0.Final.tar.gz
            sha512sum: 360b48349ab7da6b68f91fb77eac7020989abeb756f9893a1c83d3ccd8872ef7d8b2a284cbfdcc8374bfc8cd5366ee1c74e7930cf4de331ed922fe78cf59df82
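
The Groovy JSR 223 artifacts could be added to the same plugin analogously; a sketch, assuming Groovy 3.0.9 from Maven Central (checksums omitted for brevity, but recommended)

plugins:
      - name: debezium-smt-scripting
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.7.0.Final/debezium-scripting-1.7.0.Final.tar.gz
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.9/groovy-3.0.9.jar
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.9/groovy-jsr223-3.0.9.jar
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json/3.0.9/groovy-json-3.0.9.jar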

With this enabled, a Debezium connector configuration can hold the transform rules

{
    "name": "pg-test-conntector-cd",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "test.postgresql.com",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "ibmclouddb",
        "database.server.name": "pg-test-cd",
        "plugin.name": "wal2json",
        "slot.name": "repl_log_postgresql_test_cd",
        "schema.include.list": "public",
        "table.include.list": "public.importantevents",
        "transforms": "state2",
        "transforms.state2.type": "io.debezium.transforms.Filter",
        "transforms.state2.topic.regex": "",
        "transforms.state2.language": "jsr223.groovy",
        "transforms.state2.condition": "value.after.event_state == \"2\"",
        "transforms.state2.null.handling.mode": "drop"
    }
}
Note
Consider that this works well if a custom plug-in jar/tgz is provided that contains both the DB connector (like PostgreSQL) and the scripting related artifacts. Otherwise a mechanism or manual interaction is needed to copy the scripting libs into the DB related plug-in directory.

For PostgreSQL, Debezium needs some configuration adjustments, as suggested in the documentation. One of the parameters is wal_level = logical.

In case the PostgreSQL instance is a DBaaS in IBM Cloud, use the following commands, which are also explained on the IBM Cloud: Change PostgreSQL Configuration page, or in more detail in how to enable the wal2json plugin.

Summarized, the following steps are needed
  • Set wal_level to logical

  • Increase the default values for max_replication_slots and max_wal_senders

  • Set the password for the user with REPLICATION permissions (here: user repl)

  • Create a logical replication slot for the given PostgreSQL instance via API

Commands to change wal_level
$ ibmcloud cdb ls
Retrieving instances for all database types in all resource groups in all locations under ... as ...
OK
Name                             Location   State
postgresql-demo                  eu-de      active

$ ibmcloud cdb deployment-configuration-schema postgresql-demo
Retrieving database configuration schema for postgresql-demo...
OK

Setting              Default       Kind     Choices                  Requires Restart
log_connections      off           choice   [off, on]                false
log_disconnections   off           choice   [off, on]                false
synchronous_commit   local         choice   [local, off]             false
wal_level            hot_standby   choice   [hot_standby, logical]   true

Setting                      Default   Kind      Min/Max             Requires Restart
archive_timeout              1800      integer   [300, 1073741823]   false
deadlock_timeout             10000     integer   [100, 2147483647]   false
effective_io_concurrency     12        integer   [1, 1000]           false
log_min_duration_statement   100       integer   [100, 2147483647]   false
max_connections              115       integer   [115, null]         true
max_prepared_transactions    0         integer   [0, null]           true
max_replication_slots        10        integer   [10, 262143]        true
max_wal_senders              12        integer   [12, 262143]        true
shared_buffers               32000     integer   [16, null]          true
tcp_keepalives_count         6         integer   [0, 2147483647]     false
tcp_keepalives_idle          300       integer   [0, 2147483647]     false
tcp_keepalives_interval      10        integer   [0, 2147483647]     false



$ ibmcloud cdb deployment-configuration postgresql-demo '{"configuration": {"wal_level": "logical", "max_replication_slots": 21, "max_wal_senders": 21 }}'
Applying new configuration to postgresql-demo...
The deployment's configuration is being changed with this task:
...

$ ibmcloud cdb deployment-user-password postgresql-demo repl

$ export BT=`ibmcloud iam oauth-tokens --output JSON | jq -r .iam_token`
$ export DBID=`ibmcloud cdb about postgresql-demo -a -j | jq -r .resource.id | sed 's/\//%2F/g'`
$ curl -X POST https://api.eu-de.databases.cloud.ibm.com/v5/ibm/deployments/${DBID}/postgresql/logical_replication_slots \
  -H "Authorization: ${BT}" \
  -H 'Content-Type: application/json' \
  -d '{"logical_replication_slot": {"name": "repl_log_postgresql-demo", "database_name": "ibmclouddb", "plugin_type": "wal2json" } }'

Verify the state with the SQL statement SHOW wal_level; the result should be logical.

Pay also attention to set plugin.name=wal2json in the Debezium connector. Additionally, the DB user/role needs the permissions to interact with the relevant tables.

NOT for production - simplified workaround
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO repl;
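
A more restrictive alternative could be to grant only read access on the captured table (a sketch; the replication permissions themselves come from the REPLICATION role attribute of the user)

-- read access for the initial snapshot and schema discovery
GRANT SELECT ON public.importantevents TO repl;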
  • In case multiple connectors use the same database, use different replication slot names (slot.name)

  • Replication has a huge impact on storage and performance. Monitor and increase the resources accordingly

tbd

This article and project are licensed under the Apache License, Version 2. Separate third-party code objects invoked within this code pattern are licensed by their respective providers pursuant to their own separate licenses. Contributions are subject to the Developer Certificate of Origin, Version 1.1 and the Apache License, Version 2.

See also the Apache License FAQ.