DbSink is a Kafka Sink Connector that provides a sink implementation for streaming changes emitted by Debezium.
1.Fully (100%) compatible with Debezium: able to process data change events, schema change events and transaction events without requiring any additional transform such as 'ExtractNewRecordState'.
2.Supports both transaction-based and table-based parallel applying.
3.Supports applying across a variety of database dialects. Currently only Oracle to Postgres and MySQL to Postgres are supported; more database dialects are under development and will be released in the near future.
4.Supports executing insert, update and delete based on the operation in the Debezium event record, without requiring additional configuration such as 'insert mode'.
5.Supports automatically switching to upsert mode to apply insert events when duplicate key errors occur (see the sketch after this list).
6.Supports most data types and chooses how to process a source value based on the definition of the target column.
For example, the MySQL time data type can be inserted into either the time data type or the interval data type in the Postgres database.
7.Supports configuration of the target table and column names. By default, the connector uses the source table and column names as the target ones.
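As a rough illustration of item 5, the snippet below shows what falling back to upsert looks like on the Postgres side; the schema, table and column names are hypothetical, and the exact statements DbSink generates may differ.

```sql
CREATE SCHEMA IF NOT EXISTS shop;

-- Hypothetical target table.
CREATE TABLE shop.customer (id int PRIMARY KEY, name varchar(50));

-- Applying an insert event for an id that already exists raises a duplicate key error:
INSERT INTO shop.customer (id, name) VALUES (1, 'alice');

-- In upsert mode the same event is applied with ON CONFLICT so the row is updated instead:
INSERT INTO shop.customer (id, name) VALUES (1, 'alice')
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name;
```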
connector.class
To use this connector, specify the name of the connector class in the connector.class
configuration property.
"connector.class": "io.dbsink.connector.sink.DbSinkConnector"
jdbc.username
JDBC connection user.
- Type: String
- Default: null
- Importance: high
jdbc.password
JDBC connection password.
- Type: String
- Default: null
- Importance: high
jdbc.url
JDBC connection URL. For example: jdbc:postgresql://localhost:5432/migration, jdbc:mysql://localhost/db_name
- Type: String
- Default: null
- Importance: high
jdbc.driver.class
JDBC driver class. For example: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, com.mysql.jdbc.Driver
- Type: String
- Default: null
- Importance: low
jdbc.retries.max
The maximum number of retries to call the JDBC interface when an SQLException happens. The value must be a positive integer.
- Type: int
- Default: 5
- Importance: low
jdbc.backoff.ms
The backoff time in milliseconds between JDBC retries.
- Type: long
- Default: 3000
- Importance: low
applier.parallel.max
The maximum number of applier threads.
- Type: int
- Default: CPU cores * 1.4
- Importance: high
applier.transaction.enabled
Whether to apply transactionally. Requires the Debezium configuration property provide.transaction.metadata to be set to true and all incoming events to have the same topic name (use transforms).
- Type: boolean
- Default: false
- Importance: high
applier.transaction.buffer.size
Specifies how many transactions to cache in the buffer.
- Type: int
- Default: 50
- Importance: high
applier.worker.buffer.size
Specifies how many transactions an applier worker can cache in its buffer.
- Type: int
- Default: 100
- Importance: high
table.naming.strategy
Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming events. DefaultTableNamingStrategy, LowCaseTableNamingStrategy and UpperCaseTableNamingStrategy are available.
- Type: class
- Default: io.dbsink.connector.sink.naming.DefaultTableNamingStrategy
- Importance: high
column.naming.strategy
Specifies the fully-qualified class name of a ColumnNamingStrategy implementation that the connector uses to resolve column names from incoming events. DefaultColumNamingStrategy, LowCaseColumNamingStrategy and UpperCaseColumNamingStrategy are available.
- Type: class
- Default: io.dbsink.connector.sink.naming.DefaultColumNamingStrategy
- Importance: high
JDK version: >= 11
Maven version: >= 3.0
mvn clean package -Dmaven.test.skip=true
note: dependencies such as JDBC drivers are not packaged into the jar; you need to manually place them in the plugin path of Kafka Connect, as shown in the sketch below.
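A minimal deployment sketch, assuming Kafka Connect is installed under /opt/kafka and its plugin.path includes /opt/kafka/plugins; the jar names, paths and driver versions below are assumptions and should be adapted to your environment.

```bash
# Build the connector (tests skipped).
mvn clean package -Dmaven.test.skip=true

# Copy the connector jar and the JDBC drivers it needs into the Kafka Connect plugin path.
# The jar and driver file names are examples only.
mkdir -p /opt/kafka/plugins/dbsink
cp target/dbsink-connector-*.jar /opt/kafka/plugins/dbsink/
cp postgresql-42.6.0.jar mysql-connector-j-8.0.33.jar /opt/kafka/plugins/dbsink/

# Restart Kafka Connect so the new plugin is picked up.
```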
MySQL | Postgres | Description |
---|---|---|
float(n) | float(n) | float in Postgres is a standard type while MySQL's is not, so errors may occur |
float(n,p) | float(n) or decimal(n,p) | No fully equivalent data type, so errors may occur |
double(n,p) | double precision or decimal(n,p) | No fully equivalent data type, so errors may occur |
double | double precision | double precision in Postgres is a standard type while MySQL's double is not, so errors may occur |
decimal(n,p) | decimal(n,p) | |
bigint | bigint | |
mediumint | int | |
int | int | |
smallint | smallint | |
tinyint | smallint | |
timestamp(n) | timestamp(n) with time zone | Ensure that the MySQL and Postgres time zones are the same; in that case there is no error. |
datetime(n) | timestamp(n) without time zone | |
time(n) | time(n) or interval day to second | Values between 00:00:00 and 23:59:59.999999 map to time(n); values outside that range map to interval day to second |
year | smallint or interval year | |
bit(n) | bit(m) m>=n | |
bit(1) | bit(1) or bool | |
tinyint(1) | bool | |
binary(n) | bytea | |
blob | bytea | |
tinyblob | bytea | |
mediumblob | bytea | |
longblob | bytea | |
char(n) | char(n) | |
varchar(n) | varchar(n) | |
tinytext | text | |
text | text | |
mediumtext | text | |
json | json | |
Oracle | Postgres | Description |
---|---|---|
number(n,p) | numeric(n,p) | |
binary_float | float | |
binary_double | double precision | |
integer | int | |
char(n) | char(n) | |
nvarchar2(n) | varchar(n) | |
varchar2(n) | varchar(n) | |
clob | text | |
nclob | text | |
blob | bytea | |
raw | bytea | |
long raw | bytea | |
long | bytea | |
date | timestamp(0) without time zone | |
timestamp(n) | timestamp(n) without time zone | Fractional seconds beyond 6 digits are truncated |
timestamp(n) with time zone | timestamp(n) with time zone | Fractional seconds beyond 6 digits are truncated |
timestamp(n) with local time zone | timestamp(n) with time zone | Fractional seconds beyond 6 digits are truncated |
interval year to month | interval year to month | |
interval day to second | interval day to second | |
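For illustration, here is a hypothetical Oracle table together with a Postgres equivalent derived from the mapping above; the table, column names and precisions are assumptions.

```sql
-- Hypothetical Oracle source table
CREATE TABLE shop.orders (
  id         NUMBER(10,0) PRIMARY KEY,
  amount     NUMBER(12,2),
  note       VARCHAR2(200),
  created_at TIMESTAMP(6)
);

-- Postgres target derived from the mapping above
CREATE TABLE shop.orders (
  id         numeric(10,0) PRIMARY KEY,
  amount     numeric(12,2),
  note       varchar(200),
  created_at timestamp(6) without time zone
);
```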
Here are some examples that introduce how to use the connector and its typical application scenarios.
A trading system is preparing to migrate from a MySQL database to a Postgres database. In order to verify whether the Postgres database can meet business needs, real-time stream replication has to be established between MySQL and Postgres, and the read-only part of the business is switched to the Postgres database to verify that the business system still runs normally. Follow these steps to implement it.
You need to convert the table schema in MySQL to Postgres, either manually or with third-party tools. For the mapping of data types, refer to [MySQL to Postgres](MySQL to Postgres).
Example:
MySQL:
create database shop;
create table shop.user(id int auto_increment primary key, name varchar(50), create_time timestamp(6));
Postgres:
create schema shop;
create table shop."user"(id serial primary key, name varchar(50), create_time timestamp(6));
note: a database in MySQL is mapped to a schema in Postgres.
You need to download ZooKeeper, Kafka and a JRE (>= 11) to create a running environment for the Debezium connector and the DbSink connector. There are many documents about this online, so we won't go into detail here; a minimal sketch is shown below.
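A minimal sketch, assuming the standard Kafka distribution is installed under /opt/kafka and a local single-node setup; the paths and properties files are assumptions.

```bash
# Start ZooKeeper and Kafka (single node, default configs shipped with the Kafka distribution).
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

# Start Kafka Connect in distributed mode; plugin.path in the properties file must
# point at the directory containing the Debezium and DbSink plugins.
/opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties
```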
Write the configuration JSON for the Debezium connector. A template is provided here.
{
"name": "mysql_debezium_source_connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.allowPublicKeyRetrieval": true,
"database.user": "wangwei",
"database.server.id": "1000",
"tasks.max": 1,
"database.include.list": "shop",
"provide.transaction.metadata": true,
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"database.port": 3307,
"tombstones.on.delete": false,
"topic.prefix": "olap_migration",
"schema.history.internal.kafka.topic": "olap_migration_schema_history",
"database.hostname": "192.168.142.129",
"database.password": "19961028",
"snapshot.mode": "initial",
"snapshot.max.threads": 10,
"heartbeat.interval.ms": 10000,
"transforms":"Reroute",
"transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex":"(.*)",
"transforms.Reroute.topic.replacement":"all_topics"
}
}
Here are some configuration items that deserve special attention.
1.snapshot.mode
The value initial means that both the snapshot and incremental data are captured.
2.provide.transaction.metadata
In this case we need transaction metadata from the source side in order to apply data transactionally, so this configuration item must be set to true.
3.transforms
Here we configure the 'ByLogicalTableRouter' transform to route all source records to the same topic ('all_topics'). By default, the topic of a record produced by Debezium is derived from the table identifier, so events from different tables end up in different topics. Because ordering is only guaranteed within a single topic, the order of records consumed by the sink connector could otherwise differ from the order produced by the source connector.
After writing the JSON configuration, send a POST request to Kafka Connect to create the connector, for example:
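A sketch of the request, assuming the configuration above is saved as mysql_debezium_source_connector.json and the Kafka Connect REST interface listens on localhost:8083 (both are assumptions).

```bash
curl -X POST -H "Content-Type: application/json" \
  --data @mysql_debezium_source_connector.json \
  http://localhost:8083/connectors
```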
Write the configuration JSON for the DbSink connector. A template is provided here.
{
"name": "DbSinkConnector",
"config": {
"connector.class": "io.dbsink.connector.sink.DbSinkConnector",
"jdbc.password": "wangwei123",
"jdbc.username": "wangwei",
"tasks.max": 1,
"topics": "all_topics",
"jdbc.url": "jdbc:postgresql://localhost:5432/migration",
"database.dialect.name": "PostgreSqlDialect",
"jdbc.driver.class": "org.postgresql.Driver",
"jdbc.retries.max": 5,
"jdbc.backoff.ms": 6000,
"applier.parallel.max": 50,
"applier.transaction.enabled": "true",
"applier.transaction.buffer.size": 10000,
"applier.worker.buffer.size": 100,
"table.naming.strategy": "io.dbsink.connector.sink.naming.DefaultTableNamingStrategy",
"column.naming.strategy": "io.dbsink.connector.sink.naming.DefaultColumnNamingStrategy"
}
}
Here are some configuration items that deserve special attention.
1.applier.transaction.enabled
Specifies whether to apply in a transactional manner; in this case it is true.
2.applier.transaction.buffer.size
Specifies the maximum number of cached transactions; the larger this value, the more heap memory is consumed.
3.applier.worker.buffer.size
Specifies the size of the buffer in an applier worker, i.e. the maximum number of transactions an applier worker can hold.
4.applier.parallel.max
Specifies the number of threads that apply in parallel.
5.table.naming.strategy
Specifies how to resolve the table name from source records; three strategies are provided:
1.DefaultTableNamingStrategy
"Table1"--> "Table1"
2.LowCaseTableNamingStrategy
"Table1"--> "table1"
3.UpperCaseTableNamingStrategy
"table1"--> "TABLE1"
According to step 1, DefaultTableNamingStrategy is used.
6.column.naming.strategy
Specifies how to resolve the column name from source records; three strategies are provided:
1.DefaultColumnNamingStrategy
"Col1" ---> "Col1"
2.LowCaseColumnNamingStrategy
"COL1" ---> "col1"
3.UpperCaseColumnNamingStrategy
"col1" ---> "COL1"
According to step 1, DefaultColumnNamingStrategy is used.
After writing the JSON configuration, send a POST request to Kafka Connect to create the connector, for example:
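Similarly, assuming the DbSink configuration above is saved as dbsink_sink_connector.json and the same Kafka Connect REST endpoint as before (both are assumptions):

```bash
curl -X POST -H "Content-Type: application/json" \
  --data @dbsink_sink_connector.json \
  http://localhost:8083/connectors
```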