Skip to content

Commit

Permalink
DBZ-4812 Introduce platform adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Jun 5, 2024
1 parent a05714b commit c9e74f7
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 93 deletions.
100 changes: 9 additions & 91 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.db2.platform.Db2PlatformAdapter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
Expand Down Expand Up @@ -58,11 +59,6 @@ public class Db2Connection extends JdbcConnection {

private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'";

private final String GET_MAX_LSN;
private final String GET_ALL_CHANGES_FOR_TABLE;
private final String GET_LIST_OF_CDC_ENABLED_TABLES;
private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES;

private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT "
+ "CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) as objectid, "
+ "c.colname,c.colno,c.keyseq "
Expand All @@ -89,6 +85,7 @@ public class Db2Connection extends JdbcConnection {
private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;

private final Db2ConnectorConfig connectorConfig;
private final Db2PlatformAdapter platform;

/**
* Creates a new connection using the supplied configuration.
Expand All @@ -97,95 +94,18 @@ public class Db2Connection extends JdbcConnection {
*/
public Db2Connection(Db2ConnectorConfig config) {
super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);

connectorConfig = config;
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();

LOGGER.info("===========================================================================================================================================");
LOGGER.info("CDC_SCHEMA: {}", connectorConfig.getCdcControlSchema());
LOGGER.info("TABLE_CDC_SCHEMA: {}", connectorConfig.getCdcChangeTablesSchema());
LOGGER.info("DB_TYPE: {}", connectorConfig.getDb2Platform());
LOGGER.info("===========================================================================================================================================");

switch (connectorConfig.getDb2Platform()) {
case Z:
LOGGER.info("ZOS choice");

this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER) t for read only with ur";

this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " +
"ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM "
+ connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " +
" order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " +
" tmp2 AS (SELECT " +
" CASE " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " +
" END " +
" OPCODE, " +
" cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " +
" FROM tmp cdc left JOIN tmp cdc2 " +
" ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " +
" ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " +
" OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " +
" select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema()
+ ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";

this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from "
+ connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur";

this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " +
"WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur";
break;
case LUW:
LOGGER.info("LUW choice");
this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t";
this.GET_ALL_CHANGES_FOR_TABLE = "SELECT "
+ "CASE "
+ "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 "
+ "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 "
+ "WHEN IBMSNAP_OPERATION = 'D' THEN 1 "
+ "WHEN IBMSNAP_OPERATION = 'I' THEN 2 "
+ "END "
+ "OPCODE,"
+ "cdc.* "
+ "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";
this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from "
+ connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''";

// No new Tables 1=0
this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " +
"WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)";
break;
default:
throw new DebeziumException("Unsupported platform");
}
platform = connectorConfig.getDb2Platform().createAdapter(connectorConfig);
}

/**
* @return the current largest log sequence number
*/
public Lsn getMaxLsn() throws SQLException {
return queryAndMap(GET_MAX_LSN, singleResultMapper(rs -> {
return queryAndMap(platform.getMaxLsnQuery(), singleResultMapper(rs -> {
final Lsn ret = Lsn.valueOf(rs.getBytes(1));
LOGGER.trace("Current maximum lsn is {}", ret);
return ret;
Expand All @@ -202,7 +122,7 @@ public Lsn getMaxLsn() throws SQLException {
* @throws SQLException
*/
public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
prepareQuery(query, statement -> {
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, toLsn.getBinary());
Expand All @@ -226,7 +146,7 @@ public void getChangesForTables(Db2ChangeTable[] changeTables, Lsn intervalFromL

int idx = 0;
for (Db2ChangeTable changeTable : changeTables) {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
Expand Down Expand Up @@ -331,9 +251,8 @@ public Lsn getFromLsn() {
}

public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
final String query = GET_LIST_OF_CDC_ENABLED_TABLES;

return queryAndMap(query, rs -> {
return queryAndMap(platform.getListOfCdcEnabledTablesQuery(), rs -> {
final Set<Db2ChangeTable> changeTables = new HashSet<>();
while (rs.next()) {
/**
Expand Down Expand Up @@ -363,9 +282,8 @@ public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
}

public Set<Db2ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQLException {
final String query = GET_LIST_OF_NEW_CDC_ENABLED_TABLES;

return prepareQueryAndMap(query,
return prepareQueryAndMap(platform.getListOfNewCdcEnabledTablesQuery(),
ps -> {
ps.setBytes(1, fromLsn.getBinary());
ps.setBytes(2, toLsn.getBinary());
Expand Down
31 changes: 29 additions & 2 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.db2.platform.Db2PlatformAdapter;
import io.debezium.connector.db2.platform.LuwPlatform;
import io.debezium.connector.db2.platform.ZOsPlatform;
import io.debezium.document.Document;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.relational.ColumnFilterMode;
Expand Down Expand Up @@ -303,12 +306,32 @@ public enum Db2Platform implements EnumeratedValue {
/**
* Linux, Unix, Windows
*/
LUW("LUW"),
LUW("LUW") {
@Override
public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) {
return new LuwPlatform(config);
}

@Override
public String platfromName() {
return "LUW";
}
},

/**
* z/OS
*/
Z("ZOS");
Z("ZOS") {
@Override
public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) {
return new ZOsPlatform(config);
}

@Override
public String platfromName() {
return "z/OS";
}
};

private final String value;

Expand All @@ -321,6 +344,10 @@ public String getValue() {
return value;
}

public abstract Db2PlatformAdapter createAdapter(Db2ConnectorConfig config);

public abstract String platfromName();

/**
* Determine if the supplied value is one of the predefined options.
*
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public ChangeEventSourceCoordinator<Db2Partition, Db2OffsetContext> start(Config
final TopicNamingStrategy<TableId> topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();

LOGGER.info("Using Db2 {} platfrom, CDC control schema is {}, schema with change tables is {}",
connectorConfig.getDb2Platform(), connectorConfig.getCdcControlSchema(),
connectorConfig.getCdcChangeTablesSchema());

MainConnectionProvidingConnectionFactory<Db2Connection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new Db2Connection(connectorConfig));
dataConnection = connectionFactory.mainConnection();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2.platform;

/**
* Implementation details differing between Db2 flavours
*
* @author Jiri Pechanec
*/
public interface Db2PlatformAdapter {

String getMaxLsnQuery();

String getAllChangesForTableQuery();

String getListOfCdcEnabledTablesQuery();

String getListOfNewCdcEnabledTablesQuery();

}
72 changes: 72 additions & 0 deletions src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.db2.platform;

import io.debezium.connector.db2.Db2ConnectorConfig;

/**
* Implementation details for LUW Platform (Linux, Unix, Windows)
*
* @author Jiri Pechanec
*/
public class LuwPlatform implements Db2PlatformAdapter {

private final String getMaxLsn;
private final String getAllChangesForTable;
private final String getListOfCdcEnabledTables;
private final String getListOfNewCdcEnabledTables;

public LuwPlatform(Db2ConnectorConfig connectorConfig) {

this.getMaxLsn = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t";

this.getAllChangesForTable = "SELECT "
+ "CASE "
+ "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 "
+ "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 "
+ "WHEN IBMSNAP_OPERATION = 'D' THEN 1 "
+ "WHEN IBMSNAP_OPERATION = 'I' THEN 2 "
+ "END "
+ "OPCODE,"
+ "cdc.* "
+ "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";

this.getListOfCdcEnabledTables = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from "
+ connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''";

// No new Tables 1=0
this.getListOfNewCdcEnabledTables = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " +
"WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)";
}

@Override
public String getMaxLsnQuery() {
return getMaxLsn;
}

@Override
public String getAllChangesForTableQuery() {
return getAllChangesForTable;
}

@Override
public String getListOfCdcEnabledTablesQuery() {
return getListOfCdcEnabledTables;
}

@Override
public String getListOfNewCdcEnabledTablesQuery() {
return getListOfNewCdcEnabledTables;
}
}
Loading

0 comments on commit c9e74f7

Please sign in to comment.