Skip to content

Commit

Permalink
DBZ-7755 Refactor exporting to CloudEvents: remove RecordParser
Browse files Browse the repository at this point in the history
  • Loading branch information
rkudryashov authored and jpechane committed Apr 8, 2024
1 parent d5cafc2 commit ba8d984
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
*/
package io.debezium.connector.db2.converters;

import java.util.Set;

import io.debezium.connector.AbstractSourceInfo;
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;
import io.debezium.data.Envelope;
import io.debezium.util.Collect;

/**
* CloudEvents maker for records produced by the Db2 connector.
Expand All @@ -17,14 +21,27 @@
*/
public class Db2CloudEventsMaker extends CloudEventsMaker {

public Db2CloudEventsMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
super(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
static final String CHANGE_LSN_KEY = "change_lsn";
static final String COMMIT_LSN_KEY = "commit_lsn";

static final Set<String> DB2_SOURCE_FIELDS = Collect.unmodifiableSet(
CHANGE_LSN_KEY,
COMMIT_LSN_KEY);

public Db2CloudEventsMaker(RecordAndMetadata recordAndMetadata, SerializerType dataContentType, String dataSchemaUriBase,
String cloudEventsSchemaName) {
super(recordAndMetadata, dataContentType, dataSchemaUriBase, cloudEventsSchemaName, Envelope.FieldName.BEFORE, Envelope.FieldName.AFTER);
}

@Override
public String ceId() {
return "name:" + recordParser.getMetadata(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";change_lsn:" + recordParser.getMetadata(Db2RecordParser.CHANGE_LSN_KEY)
+ ";commit_lsn:" + recordParser.getMetadata(Db2RecordParser.COMMIT_LSN_KEY);
return "name:" + sourceField(AbstractSourceInfo.SERVER_NAME_KEY)
+ ";change_lsn:" + sourceField(CHANGE_LSN_KEY)
+ ";commit_lsn:" + sourceField(COMMIT_LSN_KEY);
}

@Override
public Set<String> connectorSpecificSourceFields() {
return DB2_SOURCE_FIELDS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.CloudEventsProvider;
import io.debezium.converters.spi.RecordParser;
import io.debezium.converters.spi.SerializerType;

/**
Expand All @@ -24,12 +23,7 @@ public String getName() {
}

@Override
public RecordParser createParser(RecordAndMetadata recordAndMetadata) {
return new Db2RecordParser(recordAndMetadata);
}

@Override
public CloudEventsMaker createMaker(RecordParser parser, SerializerType contentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new Db2CloudEventsMaker(parser, contentType, dataSchemaUriBase, cloudEventsSchemaName);
public CloudEventsMaker createMaker(RecordAndMetadata recordAndMetadata, SerializerType dataContentType, String dataSchemaUriBase, String cloudEventsSchemaName) {
return new Db2CloudEventsMaker(recordAndMetadata, dataContentType, dataSchemaUriBase, cloudEventsSchemaName);
}
}

This file was deleted.

0 comments on commit ba8d984

Please sign in to comment.