diff --git a/src/main/java/io/debezium/connector/db2/Db2Connection.java b/src/main/java/io/debezium/connector/db2/Db2Connection.java index 2605390..95220ab 100644 --- a/src/main/java/io/debezium/connector/db2/Db2Connection.java +++ b/src/main/java/io/debezium/connector/db2/Db2Connection.java @@ -30,6 +30,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.db2.platform.Db2PlatformAdapter; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; @@ -58,11 +59,6 @@ public class Db2Connection extends JdbcConnection { private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'"; - private final String GET_MAX_LSN; - private final String GET_ALL_CHANGES_FOR_TABLE; - private final String GET_LIST_OF_CDC_ENABLED_TABLES; - private final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES; - private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT " + "CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) as objectid, " + "c.colname,c.colno,c.keyseq " @@ -89,6 +85,7 @@ public class Db2Connection extends JdbcConnection { private final BoundedConcurrentHashMap lsnToInstantCache; private final Db2ConnectorConfig connectorConfig; + private final Db2PlatformAdapter platform; /** * Creates a new connection using the supplied configuration. @@ -97,95 +94,18 @@ public class Db2Connection extends JdbcConnection { */ public Db2Connection(Db2ConnectorConfig config) { super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER); + connectorConfig = config; lsnToInstantCache = new BoundedConcurrentHashMap<>(100); realDatabaseName = retrieveRealDatabaseName(); - - LOGGER.info("==========================================================================================================================================="); - LOGGER.info("CDC_SCHEMA: {}", connectorConfig.getCdcControlSchema()); - LOGGER.info("TABLE_CDC_SCHEMA: {}", connectorConfig.getCdcChangeTablesSchema()); - LOGGER.info("DB_TYPE: {}", connectorConfig.getDb2Platform()); - LOGGER.info("==========================================================================================================================================="); - - switch (connectorConfig.getDb2Platform()) { - case Z: - LOGGER.info("ZOS choice"); - - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER) t for read only with ur"; - - this.GET_ALL_CHANGES_FOR_TABLE = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + - "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " - + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + - " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + - " tmp2 AS (SELECT " + - " CASE " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + - " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + - " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + - " END " + - " OPCODE, " + - " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + - " FROM tmp cdc left JOIN tmp cdc2 " + - " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + - " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + - " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + - " select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema() - + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " - + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; - - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + - "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; - break; - case LUW: - LOGGER.info("LUW choice"); - this.GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t"; - this.GET_ALL_CHANGES_FOR_TABLE = "SELECT " - + "CASE " - + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " - + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " - + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " - + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " - + "END " - + "OPCODE," - + "cdc.* " - + "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " - + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; - this.GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " - + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; - - // No new Tables 1=0 - this.GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + - " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + - " CD_NEW_SYNCHPOINT, " + - " CD_OLD_SYNCHPOINT " + - "from " + connectorConfig.getCdcControlSchema() - + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + - "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; - break; - default: - throw new DebeziumException("Unsupported platform"); - } + platform = connectorConfig.getDb2Platform().createAdapter(connectorConfig); } /** * @return the current largest log sequence number */ public Lsn getMaxLsn() throws SQLException { - return queryAndMap(GET_MAX_LSN, singleResultMapper(rs -> { + return queryAndMap(platform.getMaxLsnQuery(), singleResultMapper(rs -> { final Lsn ret = Lsn.valueOf(rs.getBytes(1)); LOGGER.trace("Current maximum lsn is {}", ret); return ret; @@ -202,7 +122,7 @@ public Lsn getMaxLsn() throws SQLException { * @throws SQLException */ public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException { - final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId)); + final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId)); prepareQuery(query, statement -> { statement.setBytes(1, fromLsn.getBinary()); statement.setBytes(2, toLsn.getBinary()); @@ -226,7 +146,7 @@ public void getChangesForTables(Db2ChangeTable[] changeTables, Lsn intervalFromL int idx = 0; for (Db2ChangeTable changeTable : changeTables) { - final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()); + final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance()); queries[idx] = query; // If the table was added in the middle of queried buffer we need // to adjust from to the first LSN available @@ -331,9 +251,8 @@ public Lsn getFromLsn() { } public Set listOfChangeTables() throws SQLException { - final String query = GET_LIST_OF_CDC_ENABLED_TABLES; - return queryAndMap(query, rs -> { + return queryAndMap(platform.getListOfCdcEnabledTablesQuery(), rs -> { final Set changeTables = new HashSet<>(); while (rs.next()) { /** @@ -363,9 +282,8 @@ public Set listOfChangeTables() throws SQLException { } public Set listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQLException { - final String query = GET_LIST_OF_NEW_CDC_ENABLED_TABLES; - return prepareQueryAndMap(query, + return prepareQueryAndMap(platform.getListOfNewCdcEnabledTablesQuery(), ps -> { ps.setBytes(1, fromLsn.getBinary()); ps.setBytes(2, toLsn.getBinary()); diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java index e8a816a..166d157 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorConfig.java @@ -22,6 +22,9 @@ import io.debezium.config.Field; import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.SourceInfoStructMaker; +import io.debezium.connector.db2.platform.Db2PlatformAdapter; +import io.debezium.connector.db2.platform.LuwPlatform; +import io.debezium.connector.db2.platform.ZOsPlatform; import io.debezium.document.Document; import io.debezium.heartbeat.DatabaseHeartbeatImpl; import io.debezium.relational.ColumnFilterMode; @@ -303,12 +306,32 @@ public enum Db2Platform implements EnumeratedValue { /** * Linux, Unix, Windows */ - LUW("LUW"), + LUW("LUW") { + @Override + public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) { + return new LuwPlatform(config); + } + + @Override + public String platfromName() { + return "LUW"; + } + }, /** * z/OS */ - Z("ZOS"); + Z("ZOS") { + @Override + public Db2PlatformAdapter createAdapter(Db2ConnectorConfig config) { + return new ZOsPlatform(config); + } + + @Override + public String platfromName() { + return "z/OS"; + } + }; private final String value; @@ -321,6 +344,10 @@ public String getValue() { return value; } + public abstract Db2PlatformAdapter createAdapter(Db2ConnectorConfig config); + + public abstract String platfromName(); + /** * Determine if the supplied value is one of the predefined options. * diff --git a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java index 55103f4..05e3ff6 100644 --- a/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java +++ b/src/main/java/io/debezium/connector/db2/Db2ConnectorTask.java @@ -69,6 +69,10 @@ public ChangeEventSourceCoordinator start(Config final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY); final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); + LOGGER.info("Using Db2 {} platfrom, CDC control schema is {}, schema with change tables is {}", + connectorConfig.getDb2Platform(), connectorConfig.getCdcControlSchema(), + connectorConfig.getCdcChangeTablesSchema()); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( () -> new Db2Connection(connectorConfig)); dataConnection = connectionFactory.mainConnection(); diff --git a/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java b/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java new file mode 100644 index 0000000..ec3c67a --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/Db2PlatformAdapter.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +/** + * Implementation details differing between Db2 flavours + * + * @author Jiri Pechanec + */ +public interface Db2PlatformAdapter { + + String getMaxLsnQuery(); + + String getAllChangesForTableQuery(); + + String getListOfCdcEnabledTablesQuery(); + + String getListOfNewCdcEnabledTablesQuery(); + +} diff --git a/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java b/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java new file mode 100644 index 0000000..78fa530 --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/LuwPlatform.java @@ -0,0 +1,72 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +import io.debezium.connector.db2.Db2ConnectorConfig; + +/** + * Implementation details for LUW Platform (Linux, Unix, Windows) + * + * @author Jiri Pechanec + */ +public class LuwPlatform implements Db2PlatformAdapter { + + private final String getMaxLsn; + private final String getAllChangesForTable; + private final String getListOfCdcEnabledTables; + private final String getListOfNewCdcEnabledTables; + + public LuwPlatform(Db2ConnectorConfig connectorConfig) { + + this.getMaxLsn = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + ".IBMSNAP_REGISTER) t"; + + this.getAllChangesForTable = "SELECT " + + "CASE " + + "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 " + + "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 " + + "WHEN IBMSNAP_OPERATION = 'D' THEN 1 " + + "WHEN IBMSNAP_OPERATION = 'I' THEN 2 " + + "END " + + "OPCODE," + + "cdc.* " + + "FROM " + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.getListOfCdcEnabledTables = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''"; + + // No new Tables 1=0 + this.getListOfNewCdcEnabledTables = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " + + "WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)"; + } + + @Override + public String getMaxLsnQuery() { + return getMaxLsn; + } + + @Override + public String getAllChangesForTableQuery() { + return getAllChangesForTable; + } + + @Override + public String getListOfCdcEnabledTablesQuery() { + return getListOfCdcEnabledTables; + } + + @Override + public String getListOfNewCdcEnabledTablesQuery() { + return getListOfNewCdcEnabledTables; + } +} diff --git a/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java b/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java new file mode 100644 index 0000000..e0d1f47 --- /dev/null +++ b/src/main/java/io/debezium/connector/db2/platform/ZOsPlatform.java @@ -0,0 +1,81 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.db2.platform; + +import io.debezium.connector.db2.Db2ConnectorConfig; + +/** + * Implementation details for z/OS + * + * @author Jiri Pechanec + */ +public class ZOsPlatform implements Db2PlatformAdapter { + + private final String getMaxLsn; + private final String getAllChangesForTable; + private final String getListOfCdcEnabledTables; + private final String getListOfNewCdcEnabledTables; + + public ZOsPlatform(Db2ConnectorConfig connectorConfig) { + + this.getMaxLsn = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER) t for read only with ur"; + + this.getAllChangesForTable = "WITH tmp AS (SELECT cdc.IBMSNAP_OPERATION, cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, " + + "ROW_NUMBER() OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ) rn FROM " + + connectorConfig.getCdcChangeTablesSchema() + ".# cdc WHERE cdc.IBMSNAP_COMMITSEQ >= ? AND cdc.IBMSNAP_COMMITSEQ <= ? " + + " order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ), " + + " tmp2 AS (SELECT " + + " CASE " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' AND cdc2.IBMSNAP_OPERATION ='I' THEN 3 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' AND cdc2.IBMSNAP_OPERATION ='D' THEN 4 " + + " WHEN cdc.IBMSNAP_OPERATION = 'D' THEN 1 " + + " WHEN cdc.IBMSNAP_OPERATION = 'I' THEN 2 " + + " END " + + " OPCODE, " + + " cdc.IBMSNAP_COMMITSEQ, cdc.IBMSNAP_INTENTSEQ, cdc.IBMSNAP_OPERATION " + + " FROM tmp cdc left JOIN tmp cdc2 " + + " ON cdc.IBMSNAP_COMMITSEQ = cdc2.IBMSNAP_COMMITSEQ AND " + + " ((cdc.IBMSNAP_OPERATION = 'D' AND cdc.rn = cdc2.rn - 1) " + + " OR (cdc.IBMSNAP_OPERATION = 'I' AND cdc.rn = cdc2.rn + 1))) " + + " select res.OPCODE, cdc.* from " + connectorConfig.getCdcChangeTablesSchema() + + ".# cdc inner join tmp2 res on cdc.IBMSNAP_COMMITSEQ=res.IBMSNAP_COMMITSEQ and cdc.IBMSNAP_INTENTSEQ=res.IBMSNAP_INTENTSEQ " + + "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ"; + + this.getListOfCdcEnabledTables = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.DBID, t.OBID , CAST((t.DBID * 65536 + t.OBID )AS INTEGER )from " + + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME WHERE r.SOURCE_OWNER <> '' for read only with ur"; + + this.getListOfNewCdcEnabledTables = "select CAST((t.DBID * 65536 + t.OBID )AS INTEGER ) AS OBJECTID, " + + " CD_OWNER CONCAT '.' CONCAT CD_TABLE, " + + " CD_NEW_SYNCHPOINT, " + + " CD_OLD_SYNCHPOINT " + + "from " + connectorConfig.getCdcControlSchema() + + ".IBMSNAP_REGISTER r left JOIN SYSIBM.SYSTABLES t ON r.SOURCE_OWNER = t.CREATOR AND r.SOURCE_TABLE = t.NAME " + + "WHERE r.SOURCE_OWNER <> '' AND 1=0 AND CD_NEW_SYNCHPOINT > ? AND CD_OLD_SYNCHPOINT < ? for read only with ur"; + } + + @Override + public String getMaxLsnQuery() { + return getMaxLsn; + } + + @Override + public String getAllChangesForTableQuery() { + return getAllChangesForTable; + } + + @Override + public String getListOfCdcEnabledTablesQuery() { + return getListOfCdcEnabledTables; + } + + @Override + public String getListOfNewCdcEnabledTablesQuery() { + return getListOfNewCdcEnabledTables; + } +}