Skip to content

Commit

Permalink
DBZ-4812 Add customization config options
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed Jun 5, 2024
1 parent 6bb6c48 commit a05714b
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 94 deletions.
165 changes: 83 additions & 82 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,13 @@ public class Db2Connection extends JdbcConnection {

private static Logger LOGGER = LoggerFactory.getLogger(Db2Connection.class);

private final String CDC_SCHEMA;
private final String TABLE_CDC_SCHEMA;
private final String DB_TYPE;

private static final String STATEMENTS_PLACEHOLDER = "#";
private final String GET_MAX_LSN;

private static final String LOCK_TABLE = "SELECT * FROM # WITH CS"; // DB2

private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'";

private final String GET_MAX_LSN;
private final String GET_ALL_CHANGES_FOR_TABLE;
private final String GET_LIST_OF_CDC_ENABLED_TABLES;
private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES;
Expand Down Expand Up @@ -92,91 +88,96 @@ public class Db2Connection extends JdbcConnection {

private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;

private final Db2ConnectorConfig connectorConfig;

/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
*/
public Db2Connection(JdbcConfiguration config) {
super(config, FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);
public Db2Connection(Db2ConnectorConfig config) {
super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);
connectorConfig = config;
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
this.CDC_SCHEMA = config.getString("custom.cdc.program.schema");
this.TABLE_CDC_SCHEMA = config.getString("custom.cdc.table.schema");

this.DB_TYPE = config.getString("custom.db.type");

LOGGER.info("===========================================================================================================================================");
LOGGER.info("CDC_SCHEMA: {}", this.CDC_SCHEMA);
LOGGER.info("TABLE_CDC_SCHEMA: {}", this.TABLE_CDC_SCHEMA);
LOGGER.info("DB_TYPE: {}", this.DB_TYPE);
LOGGER.info("CDC_SCHEMA: {}", connectorConfig.getCdcControlSchema());
LOGGER.info("TABLE_CDC_SCHEMA: {}", connectorConfig.getCdcChangeTablesSchema());
LOGGER.info("DB_TYPE: {}", connectorConfig.getDb2Platform());
LOGGER.info("===========================================================================================================================================");

if ("ZOS".equals(this.DB_TYPE)) {
LOGGER.info("ZOS choice");

this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t for read only with ur";

this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " +
"ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM "
+ this.TABLE_CDC_SCHEMA + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " +
" order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " +
" tmp2 AS (SELECT " +
" CASE " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " +
" END " +
" OPCODE, " +
" cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " +
" FROM tmp cdc left JOIN tmp cdc2 " +
" ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " +
" ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " +
" OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " +
" select res.OPCODE, cdc.* from " + this.TABLE_CDC_SCHEMA
+ ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";

this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from "
+ this.CDC_SCHEMA
+ ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur";

this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " +
"WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur";

}
else {
LOGGER.info("LUW choice");
this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER) t";
this.GET_ALL_CHANGES_FOR_TABLE = "SELECT "
+ "CASE "
+ "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 "
+ "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 "
+ "WHEN IBMSNAP_OPERATION = 'D' THEN 1 "
+ "WHEN IBMSNAP_OPERATION = 'I' THEN 2 "
+ "END "
+ "OPCODE,"
+ "cdc.* "
+ "FROM " + this.TABLE_CDC_SCHEMA + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";
this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from "
+ CDC_SCHEMA
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''";

// No new Tables 1=0
this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + this.CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " +
"WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)";
switch (connectorConfig.getDb2Platform()) {
case Z:
LOGGER.info("ZOS choice");

this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER) t for read only with ur";

this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " +
"ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM "
+ connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " +
" order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " +
" tmp2 AS (SELECT " +
" CASE " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " +
" WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " +
" WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " +
" END " +
" OPCODE, " +
" cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " +
" FROM tmp cdc left JOIN tmp cdc2 " +
" ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " +
" ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " +
" OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " +
" select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema()
+ ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";

this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from "
+ connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur";

this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " +
"WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur";
break;
case LUW:
LOGGER.info("LUW choice");
this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t";
this.GET_ALL_CHANGES_FOR_TABLE = "SELECT "
+ "CASE "
+ "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 "
+ "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 "
+ "WHEN IBMSNAP_OPERATION = 'D' THEN 1 "
+ "WHEN IBMSNAP_OPERATION = 'I' THEN 2 "
+ "END "
+ "OPCODE,"
+ "cdc.* "
+ "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";
this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from "
+ connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''";

// No new Tables 1=0
this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from " + connectorConfig.getCdcControlSchema()
+ ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " +
"WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)";
break;
default:
throw new DebeziumException("Unsupported platform");
}
}

Expand Down Expand Up @@ -353,7 +354,7 @@ public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
rs.getInt(9),
Lsn.valueOf(rs.getBytes(5)),
Lsn.valueOf(rs.getBytes(6)),
this.TABLE_CDC_SCHEMA
connectorConfig.getCdcChangeTablesSchema()

));
}
Expand All @@ -377,7 +378,7 @@ public Set<Db2ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws
rs.getInt(1),
Lsn.valueOf(rs.getBytes(3)),
Lsn.valueOf(rs.getBytes(4)),
this.TABLE_CDC_SCHEMA));
connectorConfig.getCdcChangeTablesSchema()));
}
return changeTables;
});
Expand Down Expand Up @@ -631,7 +632,7 @@ public boolean validateLogPosition(Partition partition, OffsetContext offset, Co

final Lsn storedLsn = ((Db2OffsetContext) offset).getChangePosition().getCommitLsn();

String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", CDC_SCHEMA);
String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", connectorConfig.getCdcControlSchema());

try {
final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/debezium/connector/db2/Db2Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
// Try to connect to the database ...
Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config);
try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) {
try (Db2Connection connection = new Db2Connection(connectorConfig)) {
try {
connection.connect();
connection.execute("SELECT 1 FROM sysibm.sysdummy1");
Expand All @@ -102,7 +102,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
@Override
public List<TableId> getMatchingCollections(Configuration config) {
Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config);
try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) {
try (Db2Connection connection = new Db2Connection(connectorConfig)) {
return new ArrayList<>(
connection.readTableNames(null, null, null, new String[]{ "TABLE" }));
}
Expand Down
Loading

0 comments on commit a05714b

Please sign in to comment.