[DMS-322] Convert Debezium __deleted flagged record to tombstone #6

Merged 9 commits on Sep 6, 2024
README.md (17 additions, 7 deletions)

@@ -15,18 +15,27 @@ more details about configuring transformations on how to install transforms.

### `RenameDmsTopicToOpenSearchIndex`

This transformation renames the DMS topic to be an OpenSearch index based on the document ProjectName and ResourceName.

- `org.edfi.kafka.connect.transforms.RenameDmsTopicToOpenSearchIndex`
- works on values.

Example of this transformation configuration:

```properties
transforms=RenameDmsTopicToOpenSearchIndex
transforms.RenameDmsTopicToOpenSearchIndex.type=org.edfi.kafka.connect.transforms.RenameDmsTopicToOpenSearchIndex
```
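Per the `indexNameFragmentFromField` helper shown later in this diff, each name fragment is derived by taking a required string field from the record value and replacing dots with dashes. A minimal, dependency-free sketch of that rule (standalone illustration only; the real transform reports failures through Kafka Connect's `DataException`, and the overall index-name format is not shown in this excerpt):

```java
import java.util.Map;

public class IndexFragmentSketch {

    // Reads a required non-blank string field from the record-value map
    // and converts any dots to dashes, mirroring the transform's helper.
    static String indexNameFragment(String fieldName, Map<String, Object> recordValues) {
        Object value = recordValues.get(fieldName);
        if (!(value instanceof String) || ((String) value).isBlank()) {
            throw new IllegalArgumentException(fieldName + " in value can't be null or empty");
        }
        return ((String) value).replace(".", "-");
    }

    public static void main(String[] args) {
        // "Ed-Fi.Ods" becomes "Ed-Fi-Ods"
        System.out.println(indexNameFragment("projectName", Map.of("projectName", "Ed-Fi.Ods")));
    }
}
```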

### `DebeziumDeletedToTombstone`

This transformation checks for the Debezium `__deleted=true` flag. If found, it converts the record into a tombstone by nulling out its value.

Example of this transformation configuration:

```properties
transforms=DebeziumDeletedToTombstone
transforms.DebeziumDeletedToTombstone.type=org.edfi.kafka.connect.transforms.DebeziumDeletedToTombstone
```
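The conversion itself, shown in full in the Java source later in this diff, amounts to: read `__deleted` from the value map, and if it parses as `true`, null out the value. A dependency-free sketch of that decision (simplified; the real SMT operates on Kafka Connect `ConnectRecord` objects and throws `DataException` on malformed values):

```java
import java.util.HashMap;
import java.util.Map;

public class TombstoneSketch {

    static final String DELETED_FIELD = "__deleted";

    // Returns null (a tombstone) when the value map carries __deleted=true,
    // otherwise returns the value unchanged. A null input is already a tombstone.
    static Map<String, Object> toTombstone(Map<String, Object> value) {
        if (value == null) {
            return null;
        }
        // Boolean.parseBoolean("true") -> true; anything else (including null) -> false
        boolean isDeleted = Boolean.parseBoolean((String) value.get(DELETED_FIELD));
        return isDeleted ? null : value;
    }

    public static void main(String[] args) {
        Map<String, Object> deleted = new HashMap<>();
        deleted.put(DELETED_FIELD, "true");
        System.out.println(toTombstone(deleted));                 // null
        System.out.println(toTombstone(new HashMap<>()) != null); // true
    }
}
```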


## Running transformations

### Prerequisites
@@ -43,9 +52,10 @@ Gradle 7.2.4
```

- To run the transforms locally for the first time, you need to build the
  gradle-wrapper.jar. To generate it, run `gradle wrapper` from the
  `ed-fi-kafka-connect-transforms` directory; this adds gradle-wrapper.jar to the `gradle\wrapper` folder.
- If you encounter an error message `java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)`, gradle could not find your Java cacert trustStore directory. Re-run and specify it explicitly e.g. `gradle wrapper -Djavax.net.ssl.trustStore=/usr/lib/jvm/default-java/lib/security/cacerts`.
- If you encounter file system watcher errors or warnings and don't need Gradle watching the file system in the background, add the `--no-watch-fs` flag.

### Tasks

@@ -60,7 +70,7 @@ This project includes a series of *gradle* tasks:

## Build container

To build the container with a dev tag, simply run `docker build -t edfialliance/ed-fi-kafka-connect:dev .` from the `kafka` directory.

## Legal Information

@@ -33,12 +33,6 @@
<property name="localeLanguage" value="en"/>
<property name="fileExtensions" value="java, properties, xml"/>

<!-- See http://checkstyle.sourceforge.net/config_header.html#RegexpHeader -->
<module name="RegexpHeader">
<property name="fileExtensions" value="java"/>
<property name="headerFile" value="${config_loc}/java.header" default="checkstyle/java.header"/>
</module>

> Contributor Author: Files have different license headers now

<!-- See http://checkstyle.sourceforge.net/config_whitespace.html#FileTabCharacter -->
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
@@ -0,0 +1,71 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

package org.edfi.kafka.connect.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

public class DebeziumDeletedToTombstone<R extends ConnectRecord<R>> implements Transformation<R> {

// Empty SMT configuration
public static final ConfigDef CONFIG_DEF = new ConfigDef();

public static final String DELETED_FIELD = "__deleted";

@Override
public ConfigDef config() {
// No configuration needed
return CONFIG_DEF;
}

// No configuration needed
@Override
public void configure(final Map<String, ?> settings) {
}

// If __deleted=true, nulls out the record value.
@Override
public R apply(final R record) {

// Ignore tombstone records
if (record.value() == null) {
return null;
}

if (!(record.value() instanceof Map)) {
throw new DataException("Value type must be castable to Map: " + record.toString());
}

boolean isDeleted = false;
try {
@SuppressWarnings("unchecked")
final Map<String, Object> recordValueAsMap = (Map<String, Object>) record.value();
final String isDeletedString = (String) recordValueAsMap.get(DELETED_FIELD);
isDeleted = Boolean.valueOf(isDeletedString);
} catch (final Exception e) {
throw new DataException(
"Value type must have __deleted as String: " + record.toString() + " " + e.toString());
}

return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
isDeleted ? null : record.valueSchema(),
isDeleted ? null : record.value(),
// Reviewer note: nulling out the record value makes it a tombstone
record.timestamp(),
record.headers());
}

@Override
public void close() {
}
}
@@ -22,7 +22,6 @@
package org.edfi.kafka.connect.transforms;

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -87,25 +86,7 @@ public R apply(final R record) {
// Returns the index name fragment from the given record field name, converting
// any dots to dashes first
private String indexNameFragmentFromField(final String fieldName, final Map<String, Object> recordValues) {
return getStringField(fieldName, recordValues).replace(".", "-");
}

// Returns string field with the given field name
private String getStringField(final String fieldName, final Map<String, Object> recordValues) {
final Optional<String> result = Optional.ofNullable(recordValues.get(fieldName))
.map(field -> {
if (!field.getClass().equals(String.class)) {
throw new DataException(fieldName + " must be a string in record: " + recordValues.toString());
}
return field;
})
.map(Object::toString);

if (result.isPresent() && !result.get().isBlank()) {
return result.get();
} else {
throw new DataException(fieldName + " in value can't be null or empty");
}
return Utility.getStringField(fieldName, recordValues).replace(".", "-");
}

@Override
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: Apache-2.0
// Licensed to the Ed-Fi Alliance under one or more agreements.
// The Ed-Fi Alliance licenses this file to you under the Apache License, Version 2.0.
// See the LICENSE and NOTICES files in the project root for more information.

package org.edfi.kafka.connect.transforms;

import java.util.Map;
import java.util.Optional;

import org.apache.kafka.connect.errors.DataException;

public class Utility {
// Returns string field with the given field name
public static String getStringField(final String fieldName, final Map<String, Object> recordValues) {
final Optional<String> result = Optional.ofNullable(recordValues.get(fieldName))
.map(field -> {
if (!field.getClass().equals(String.class)) {
throw new DataException(fieldName + " must be a string in record: " + recordValues.toString());
}
return field;
})
.map(Object::toString);

if (result.isPresent() && !result.get().isBlank()) {
return result.get();
} else {
throw new DataException(fieldName + " in value can't be null or empty");
}
}
}