Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-4812 Incubating support for z/OS on Debezium 2.x #154

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/main/java/io/debezium/connector/db2/Db2ChangeTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
public class Db2ChangeTable extends ChangeTable {

private static final String CDC_SCHEMA = "ASNCDC";

/**
* A LSN from which the data in the change table are relevant
*/
Expand All @@ -35,15 +33,15 @@ public class Db2ChangeTable extends ChangeTable {
*/
private final String db2CaptureInstance;

public Db2ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance), changeTableObjectId);
public Db2ChangeTable(TableId sourceTableId, String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn, String tableCdcSchema) {
super(captureInstance, sourceTableId, resolveChangeTableId(sourceTableId, captureInstance, tableCdcSchema), changeTableObjectId);
this.startLsn = startLsn;
this.stopLsn = stopLsn;
this.db2CaptureInstance = Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance);
}

public Db2ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn) {
this(null, captureInstance, changeTableObjectId, startLsn, stopLsn);
public Db2ChangeTable(String captureInstance, int changeTableObjectId, Lsn startLsn, Lsn stopLsn, String tableCdcSchema) {
this(null, captureInstance, changeTableObjectId, startLsn, stopLsn, tableCdcSchema);
}

public String getCaptureInstance() {
Expand All @@ -69,7 +67,7 @@ public String toString() {
+ getChangeTableObjectId() + ", stopLsn=" + stopLsn + "]";
}

private static TableId resolveChangeTableId(TableId sourceTableId, String captureInstance) {
return sourceTableId != null ? new TableId(sourceTableId.catalog(), CDC_SCHEMA, Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance)) : null;
private static TableId resolveChangeTableId(TableId sourceTableId, String captureInstance, String cdcSchema) {
return sourceTableId != null ? new TableId(sourceTableId.catalog(), cdcSchema, Db2ObjectNameQuoter.quoteNameIfNecessary(captureInstance)) : null;
}
}
58 changes: 19 additions & 39 deletions src/main/java/io/debezium/connector/db2/Db2Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.db2.platform.Db2PlatformAdapter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
Expand All @@ -52,39 +53,12 @@ public class Db2Connection extends JdbcConnection {

private static Logger LOGGER = LoggerFactory.getLogger(Db2Connection.class);

private static final String CDC_SCHEMA = "ASNCDC";

private static final String STATEMENTS_PLACEHOLDER = "#";
private static final String GET_MAX_LSN = "SELECT max(t.SYNCHPOINT) FROM ( SELECT CD_NEW_SYNCHPOINT AS SYNCHPOINT FROM " + CDC_SCHEMA
+ ".IBMSNAP_REGISTER UNION ALL SELECT SYNCHPOINT AS SYNCHPOINT FROM " + CDC_SCHEMA + ".IBMSNAP_REGISTER) t";

private static final String LOCK_TABLE = "SELECT * FROM # WITH CS"; // DB2

private static final String LSN_TO_TIMESTAMP = "SELECT CURRENT TIMEstamp FROM sysibm.sysdummy1 WHERE ? > X'00000000000000000000000000000000'";

private static final String GET_ALL_CHANGES_FOR_TABLE = "SELECT "
+ "CASE "
+ "WHEN IBMSNAP_OPERATION = 'D' AND (LEAD(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='I' THEN 3 "
+ "WHEN IBMSNAP_OPERATION = 'I' AND (LAG(cdc.IBMSNAP_OPERATION,1,'X') OVER (PARTITION BY cdc.IBMSNAP_COMMITSEQ ORDER BY cdc.IBMSNAP_INTENTSEQ)) ='D' THEN 4 "
+ "WHEN IBMSNAP_OPERATION = 'D' THEN 1 "
+ "WHEN IBMSNAP_OPERATION = 'I' THEN 2 "
+ "END "
+ "OPCODE,"
+ "cdc.* "
+ "FROM ASNCDC.# cdc WHERE IBMSNAP_COMMITSEQ >= ? AND IBMSNAP_COMMITSEQ <= ? "
+ "order by IBMSNAP_COMMITSEQ, IBMSNAP_INTENTSEQ";

private static final String GET_LIST_OF_CDC_ENABLED_TABLES = "select r.SOURCE_OWNER, r.SOURCE_TABLE, r.CD_OWNER, r.CD_TABLE, r.CD_NEW_SYNCHPOINT, r.CD_OLD_SYNCHPOINT, t.TBSPACEID, t.TABLEID , CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER )from "
+ CDC_SCHEMA + ".IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME WHERE r.SOURCE_OWNER <> ''";

// No new Tabels 1=0
private static final String GET_LIST_OF_NEW_CDC_ENABLED_TABLES = "select CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) AS OBJECTID, " +
" CD_OWNER CONCAT '.' CONCAT CD_TABLE, " +
" CD_NEW_SYNCHPOINT, " +
" CD_OLD_SYNCHPOINT " +
"from ASNCDC.IBMSNAP_REGISTER r left JOIN SYSCAT.TABLES t ON r.SOURCE_OWNER = t.TABSCHEMA AND r.SOURCE_TABLE = t.TABNAME " +
"WHERE r.SOURCE_OWNER <> '' AND CD_NEW_SYNCHPOINT > ? AND (CD_OLD_SYNCHPOINT < ? OR CD_OLD_SYNCHPOINT IS NULL)";

private static final String GET_LIST_OF_KEY_COLUMNS = "SELECT "
+ "CAST((t.TBSPACEID * 65536 + t.TABLEID )AS INTEGER ) as objectid, "
+ "c.colname,c.colno,c.keyseq "
Expand All @@ -110,22 +84,28 @@ public class Db2Connection extends JdbcConnection {

private final BoundedConcurrentHashMap<Lsn, Instant> lsnToInstantCache;

private final Db2ConnectorConfig connectorConfig;
private final Db2PlatformAdapter platform;

/**
* Creates a new connection using the supplied configuration.
*
* @param config {@link Configuration} instance, may not be null.
*/
public Db2Connection(JdbcConfiguration config) {
super(config, FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);
public Db2Connection(Db2ConnectorConfig config) {
super(config.getJdbcConfig(), FACTORY, QUOTED_CHARACTER, QUOTED_CHARACTER);

connectorConfig = config;
lsnToInstantCache = new BoundedConcurrentHashMap<>(100);
realDatabaseName = retrieveRealDatabaseName();
platform = connectorConfig.getDb2Platform().createAdapter(connectorConfig);
}

/**
* @return the current largest log sequence number
*/
public Lsn getMaxLsn() throws SQLException {
return queryAndMap(GET_MAX_LSN, singleResultMapper(rs -> {
return queryAndMap(platform.getMaxLsnQuery(), singleResultMapper(rs -> {
final Lsn ret = Lsn.valueOf(rs.getBytes(1));
LOGGER.trace("Current maximum lsn is {}", ret);
return ret;
Expand All @@ -142,7 +122,7 @@ public Lsn getMaxLsn() throws SQLException {
* @throws SQLException
*/
public void getChangesForTable(TableId tableId, Lsn fromLsn, Lsn toLsn, ResultSetConsumer consumer) throws SQLException {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, cdcNameForTable(tableId));
prepareQuery(query, statement -> {
statement.setBytes(1, fromLsn.getBinary());
statement.setBytes(2, toLsn.getBinary());
Expand All @@ -166,7 +146,7 @@ public void getChangesForTables(Db2ChangeTable[] changeTables, Lsn intervalFromL

int idx = 0;
for (Db2ChangeTable changeTable : changeTables) {
final String query = GET_ALL_CHANGES_FOR_TABLE.replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
final String query = platform.getAllChangesForTableQuery().replace(STATEMENTS_PLACEHOLDER, changeTable.getCaptureInstance());
queries[idx] = query;
// If the table was added in the middle of queried buffer we need
// to adjust from to the first LSN available
Expand Down Expand Up @@ -271,9 +251,8 @@ public Lsn getFromLsn() {
}

public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
final String query = GET_LIST_OF_CDC_ENABLED_TABLES;

return queryAndMap(query, rs -> {
return queryAndMap(platform.getListOfCdcEnabledTablesQuery(), rs -> {
final Set<Db2ChangeTable> changeTables = new HashSet<>();
while (rs.next()) {
/**
Expand All @@ -293,7 +272,8 @@ public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
rs.getString(4),
rs.getInt(9),
Lsn.valueOf(rs.getBytes(5)),
Lsn.valueOf(rs.getBytes(6))
Lsn.valueOf(rs.getBytes(6)),
connectorConfig.getCdcChangeTablesSchema()

));
}
Expand All @@ -302,9 +282,8 @@ public Set<Db2ChangeTable> listOfChangeTables() throws SQLException {
}

public Set<Db2ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws SQLException {
final String query = GET_LIST_OF_NEW_CDC_ENABLED_TABLES;

return prepareQueryAndMap(query,
return prepareQueryAndMap(platform.getListOfNewCdcEnabledTablesQuery(),
ps -> {
ps.setBytes(1, fromLsn.getBinary());
ps.setBytes(2, toLsn.getBinary());
Expand All @@ -316,7 +295,8 @@ public Set<Db2ChangeTable> listOfNewChangeTables(Lsn fromLsn, Lsn toLsn) throws
rs.getString(2),
rs.getInt(1),
Lsn.valueOf(rs.getBytes(3)),
Lsn.valueOf(rs.getBytes(4))));
Lsn.valueOf(rs.getBytes(4)),
connectorConfig.getCdcChangeTablesSchema()));
}
return changeTables;
});
Expand Down Expand Up @@ -570,7 +550,7 @@ public boolean validateLogPosition(Partition partition, OffsetContext offset, Co

final Lsn storedLsn = ((Db2OffsetContext) offset).getChangePosition().getCommitLsn();

String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", CDC_SCHEMA);
String oldestFirstChangeQuery = String.format("SELECT min(RESTART_SEQ) FROM %s.IBMSNAP_CAPMON;", connectorConfig.getCdcControlSchema());

try {
final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1));
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/debezium/connector/db2/Db2Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
// Try to connect to the database ...
Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config);
try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) {
try (Db2Connection connection = new Db2Connection(connectorConfig)) {
try {
connection.connect();
connection.execute("SELECT 1 FROM sysibm.sysdummy1");
Expand All @@ -102,7 +102,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
@Override
public List<TableId> getMatchingCollections(Configuration config) {
Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config);
try (Db2Connection connection = new Db2Connection(connectorConfig.getJdbcConfig())) {
try (Db2Connection connection = new Db2Connection(connectorConfig)) {
return new ArrayList<>(
connection.readTableNames(null, null, null, new String[]{ "TABLE" }));
}
Expand Down
Loading
Loading