diff --git a/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsMaker.java b/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsMaker.java index a20509c..a33e108 100644 --- a/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsMaker.java +++ b/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsMaker.java @@ -5,10 +5,14 @@ */ package io.debezium.connector.db2.converters; +import java.util.Set; + import io.debezium.connector.AbstractSourceInfo; +import io.debezium.converters.recordandmetadata.RecordAndMetadata; import io.debezium.converters.spi.CloudEventsMaker; -import io.debezium.converters.spi.RecordParser; import io.debezium.converters.spi.SerializerType; +import io.debezium.data.Envelope; +import io.debezium.util.Collect; /** * CloudEvents maker for records produced by the Db2 connector. @@ -17,14 +21,27 @@ */ public class Db2CloudEventsMaker extends CloudEventsMaker { - public Db2CloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { - super(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName); + static final String CHANGE_LSN_KEY = "change_lsn"; + static final String COMMIT_LSN_KEY = "commit_lsn"; + + static final Set DB2_SOURCE_FIELDS = Collect.unmodifiableSet( + CHANGE_LSN_KEY, + COMMIT_LSN_KEY); + + public Db2CloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerType dataContentType, String dataSchemaUriBase, + String cloudEventsSchemaName) { + super(recordAndMetadata, dataContentType, dataSchemaUriBase, cloudEventsSchemaName, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); } @Override public String ceId() { - return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY) - + ";change_lsn:" + recordParser.getMetadata(Db2RecordParser.CHANGE_LSN_KEY) - + ";commit_lsn:" + recordParser.getMetadata(Db2RecordParser.COMMIT_LSN_KEY); + return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY) + + ";change_lsn:" + sourceField(CHANGE_LSN_KEY) + + ";commit_lsn:" + sourceField(COMMIT_LSN_KEY); + } + + @Override + public Set connectorSpecificSourceFields() { + return DB2_SOURCE_FIELDS; } } diff --git a/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsProvider.java b/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsProvider.java index 18c745f..f614cc8 100644 --- a/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsProvider.java +++ b/src/main/java/io/debezium/connector/db2/converters/Db2CloudEventsProvider.java @@ -9,7 +9,6 @@ import io.debezium.converters.recordandmetadata.RecordAndMetadata; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.converters.spi.CloudEventsProvider; -import io.debezium.converters.spi.RecordParser; import io.debezium.converters.spi.SerializerType; /** @@ -24,12 +23,7 @@ public String getName() { } @Override - public RecordParser createParser(RecordAndMetadata recordAndMetadata) { - return new Db2RecordParser(recordAndMetadata); - } - - @Override - public CloudEventsMaker createMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) { - return new Db2CloudEventsMaker(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName); + public CloudEventsMaker createMaker(RecordAndMetadata recordAndMetadata, SerializerType dataContentType, String dataSchemaUriBase, String cloudEventsSchemaName) { + return new Db2CloudEventsMaker(recordAndMetadata, dataContentType, dataSchemaUriBase, cloudEventsSchemaName); } } diff --git a/src/main/java/io/debezium/connector/db2/converters/Db2RecordParser.java b/src/main/java/io/debezium/connector/db2/converters/Db2RecordParser.java deleted file mode 100644 index 636ae5c..0000000 --- a/src/main/java/io/debezium/connector/db2/converters/Db2RecordParser.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.db2.converters; - -import java.util.Set; - -import org.apache.kafka.connect.errors.DataException; - -import io.debezium.converters.recordandmetadata.RecordAndMetadata; -import io.debezium.converters.spi.RecordParser; -import io.debezium.data.Envelope; -import io.debezium.util.Collect; - -/** - * Parser for records produced by the Db2 connector. - * - * @author Chris Cranford - */ -public class Db2RecordParser extends RecordParser { - - static final String CHANGE_LSN_KEY = "change_lsn"; - static final String COMMIT_LSN_KEY = "commit_lsn"; - - static final Set DB2_SOURCE_FIELD = Collect.unmodifiableSet( - CHANGE_LSN_KEY, - COMMIT_LSN_KEY); - - public Db2RecordParser(RecordAndMetadata recordAndMetadata) { - super(recordAndMetadata, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER); - } - - @Override - public Object getMetadata(String name) { - if (SOURCE_FIELDS.contains(name)) { - return source().get(name); - } - if (DB2_SOURCE_FIELD.contains(name)) { - return source().get(name); - } - - throw new DataException("No such field \"" + name + "\" in the \"source\" field of events from Db2 connector"); - } -}