-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path07-camel-sql-connector-sink.yaml
30 lines (30 loc) · 1.88 KB
/
07-camel-sql-connector-sink.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
---
# Strimzi KafkaConnector that runs the Camel SQL sink connector: records from
# the `inventory.inventory.orders` topic are applied to the SQL Server table
# `dbo.order1` through a single MERGE statement (update / insert / delete).
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: camelsqlconnector
  labels:
    # Name of the KafkaConnect cluster resource that hosts this connector.
    strimzi.io/cluster: kafka-connect-jdbc
spec:
  class: org.apache.camel.kafkaconnector.sql.CamelSqlSinkConnector
  tasksMax: 1
  config:
    topics: inventory.inventory.orders

    # Flatten the Debezium change-event envelope before it reaches the sink.
    # With `delete.handling.mode: rewrite` the SMT is expected to add a
    # `__deleted` field, which the MERGE query below reads as `:#__deleted`.
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: true
    transforms.unwrap.delete.handling.mode: "rewrite"
    transforms.unwrap.add.fields: "table,lsn,op"

    # NOTE(review): the keys below look like options of a JDBC sink connector
    # (and `database.history.kafka.topic` like a Debezium source option), not
    # of the Camel SQL sink. Presumably they are ignored here -- confirm
    # before removing them.
    behavior.on.null.values: "delete"
    delete.enabled: true
    auto.create: true
    insert.mode: "upsert"
    pk.fields: "id"
    pk.mode: "record_key"
    database.history.kafka.topic: inventory.inventory.ordershistory

    # SQL Server data source bound to the Camel `sql` component. The
    # `#class:` value names the data source class; the sibling
    # `dataSource.*` keys set its properties.
    camel.component.sql.dataSource: "#class:com.microsoft.sqlserver.jdbc.SQLServerDataSource"
    camel.component.sql.dataSource.serverName: "mssql-service.appdev-kafka.svc"
    camel.component.sql.dataSource.portNumber: 31433
    camel.component.sql.dataSource.user: "sa"
    # NOTE(review): plaintext credential committed with the manifest. Consider
    # resolving it from a Kubernetes Secret via a Kafka Connect config
    # provider (requires the provider to be enabled on the Connect cluster).
    camel.component.sql.dataSource.password: "change-mssql-pwd"
    # Accepts the server certificate without validating it.
    camel.component.sql.dataSource.trustServerCertificate: true

    # MERGE keyed on `id`:
    #   - matched row and source.deleted = 'true'    -> DELETE
    #   - matched row otherwise                      -> UPDATE
    #   - no matching row and the event is no delete -> INSERT
    # The INSERT branch is guarded so that a delete event for an id that is
    # absent from the target does not insert the deleted row.
    # `:#name` placeholders are Camel SQL named parameters.
    camel.sink.path.query: " MERGE INTO dbo.order1 WITH (HOLDLOCK) AS target USING (SELECT cast(isnull(:#__deleted,'false') as varchar) AS deleted, cast(isnull(:#id,1) as int) AS id, cast(:#order_date as int) as order_date, cast(:#purchaser as int) as purchaser, cast(:#quantity as int) as quantity, cast(:#product_id as int) as product_id ) AS source (deleted, id,order_date, purchaser, quantity,product_id) ON (target.id = source.id ) WHEN MATCHED AND source.deleted = 'true' THEN DELETE WHEN MATCHED THEN UPDATE SET purchaser = :#purchaser,order_date = :#order_date,quantity = :#quantity ,product_id = :#product_id WHEN NOT MATCHED AND source.deleted <> 'true' THEN INSERT (id, order_date, purchaser, quantity, product_id) VALUES (:#id, :#order_date, :#purchaser, :#quantity , :#product_id); "