Skip to content

Commit

Permalink
[add] test release package
Browse files Browse the repository at this point in the history
  • Loading branch information
tuancamtbtx committed Dec 4, 2023
1 parent a636dc6 commit 750dc98
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 24 deletions.
30 changes: 22 additions & 8 deletions .github/workflows/sink_package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,27 @@ jobs:
packages: write

steps:
- name: Checkout repository
uses: actions/checkout@v3

- name: Log in to the Container registry
uses: docker/login-action@v2
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
server-id: github # Value of the distributionManagement/repository/id field of the pom.xml
settings-path: ${{ github.workspace }} # location for the settings.xml file
- name: Set up Maven
id: setup
uses: stCarolas/setup-maven@v4.5
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.DOCKER_GITHUB_TOKEN }}
maven-version: 3.8.2
- name: Build with Maven
id: build_maven
run: mvn -B package --file pom.xml
- name: Publish package
id: publish_package
run: mvn --batch-mode deploy
env:
GITHUB_TOKEN: ${{ secrets.REGISTRY_GITHUB_TOKEN }}



1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ Follow via https://debezium.io/documentation/reference/stable/operations/debeziu

### Architecture
![alt text](./images/cdc_debezium_server.gif)

18 changes: 17 additions & 1 deletion cdc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,21 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>${version.jsonpath}</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${version.json.org}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${version.gson}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,61 @@
package vn.dataplatform.core.common;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.GsonJsonProvider;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* @author tuan.nguyen3
*/
public class JsonPathUtils {
public static Gson gson = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();

public static GsonJsonProvider gsonJsonProvider = new GsonJsonProvider(gson);

public static final Configuration jConf =
Configuration.builder().jsonProvider(gsonJsonProvider).options(Option.DEFAULT_PATH_LEAF_TO_NULL, Option.SUPPRESS_EXCEPTIONS).build();

public static final Configuration jPathConf = Configuration.builder().jsonProvider(gsonJsonProvider)
.options(Option.DEFAULT_PATH_LEAF_TO_NULL, Option.SUPPRESS_EXCEPTIONS, Option.AS_PATH_LIST, Option.ALWAYS_RETURN_LIST).build();

public static Map<String, String> pathToValueMap(String json, List<String> paths) {
if (json == null || json.trim().isEmpty() || paths.isEmpty()) {
return Collections.emptyMap();
}

DocumentContext jVals = JsonPath.using(jConf).parse(json);
DocumentContext jPaths = JsonPath.using(jPathConf).parse(json);

return new HashSet<>(paths).stream() // Make sure that keys are unique
.flatMap(p -> StreamSupport.stream(jPaths.read(p, JsonArray.class).spliterator(), false).map(JsonElement::getAsString))
.collect(Collectors.toMap(p -> p, p -> {
JsonElement e = jVals.read(p, JsonElement.class);
if (e.isJsonArray()) {
return gson.toJson(e.getAsJsonArray());
}
return e.isJsonNull() ? "" : e.getAsString();
})).entrySet().stream().filter(kv -> !kv.getValue().trim().isEmpty()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static <T> String setJsonValues(String json, Map<String, T> updatedPaths) {
if (json == null || updatedPaths == null || json.trim().isEmpty() || updatedPaths.isEmpty()) {
return json;
}

DocumentContext doc = JsonPath.using(jConf).parse(json);
updatedPaths.forEach((path, newVal) -> doc.map(path, (val, conf) -> newVal));
return doc.jsonString();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,55 @@
package vn.dataplatform.core.common;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
* @author tuan.nguyen3
*/
public class StringUtils {
public static final String EMPTY = "";

public static final String[] EMPTY_STRING_ARRAY = new String[0];

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL = new String[]{null};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY = new String[]{EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_NULL = new String[]{EMPTY, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_EMPTY = new String[]{null, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_NULL = new String[]{null, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_EMPTY = new String[]{EMPTY, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_NULL_AND_EMPTY = new String[]{EMPTY, null, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_EMPTY_AND_NULL = new String[]{null, EMPTY, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_NULL_AND_NULL = new String[]{null, null, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_EMPTY_AND_EMPTY = new String[]{EMPTY, EMPTY, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_NULL_AND_NULL = new String[]{EMPTY, null, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_EMPTY_AND_EMPTY = new String[]{null, EMPTY, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_NULL_AND_EMPTY = new String[]{null, null, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_EMPTY_AND_NULL = new String[]{EMPTY, EMPTY, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_NULL_AND_EMPTY_AND_NULL = new String[]{EMPTY, null, EMPTY, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_EMPTY_AND_NULL_AND_EMPTY = new String[]{null, EMPTY, null, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_NULL_AND_NULL_AND_NULL = new String[]{null, null, null, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_EMPTY_AND_EMPTY_AND_EMPTY = new String[]{EMPTY, EMPTY, EMPTY, EMPTY};

public static final String[] EMPTY_STRING_ARRAY_WITH_EMPTY_AND_NULL_AND_NULL_AND_NULL = new String[]{EMPTY, null, null, null};

public static final String[] EMPTY_STRING_ARRAY_WITH_NULL_AND_EMPTY_AND_NULL_AND_NULL = new String[]{null, EMPTY, null, null};
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void configure(Properties props) {
@Override
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
if("isbn".equals(field.typeName())) {
registration.register(schemaBuilder, x -> x.toString());
registration.register(schemaBuilder, Object::toString);
}
}
}
9 changes: 7 additions & 2 deletions cdc-debezium-sinks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cdc-debezium-sinks</artifactId>

<distributionManagement>
<repository>
<id>github</id>
<name>GitHub Packages</name>
<url>https://maven.pkg.github.com/tuancamtbtx/cdc-debezium-sinks</url>
</repository>
</distributionManagement>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,9 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
for (Map.Entry<String, List<DebeziumBigqueryEvent>> destinationEvents : events.entrySet()) {
// group list of events by their schema, if in the batch we have schema change events grouped by their schema
// so with this uniform schema is guaranteed for each batch
// if(destinationEvents.getValue() != null) {
// destinationEvents.getValue().forEach(e -> LOGGER.info("Event '{}'", e.valueSchema));
// LOGGER.info("Destination {} got {} records", destinationEvents.getKey(), destinationEvents.getValue().size());
// return ;
// }
Map<JsonNode, List<DebeziumBigqueryEvent>> eventsGroupedBySchema =
destinationEvents.getValue().stream()
.filter(e -> e.valueSchema() != null)
.collect(Collectors.groupingBy(DebeziumBigqueryEvent::valueSchema));
LOGGER.debug("Destination {} got {} records with {} different schema!!", destinationEvents.getKey(),
destinationEvents.getValue().size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vn.dataplatform.cdc.sinks.bigquery.BatchUtil;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class BigquerySchemaHistory extends AbstractSchemaHistory {
private final AtomicBoolean running = new AtomicBoolean();
BigquerySchemaHistoryConfig historyConfig;
BigQuery bqClient;
@Getter
private String tableFullName;
private TableId tableId;

Expand Down Expand Up @@ -101,10 +103,6 @@ public void start() {
});
}

public String getTableFullName() {
return tableFullName;
}

@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (record == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
debezium.sink.type=bigquerystream
debezium.sink.bigquerystream.project=${GCP_PROJECT_ID}
debezium.sink.bigquerystream.dataset=tuan_test
debezium.sink.bigquerybatch.location=US
debezium.sink.bigquerystream.create-if-needed=true
debezium.sink.bigquerystream.partition-field=create_on
debezium.source.offset.storage.bigquery.table-name=debezium_offset_storage_custom_table
Expand Down
7 changes: 7 additions & 0 deletions cdc-debezium-transform/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>cdc-debezium-transform</artifactId>
<version>1.0-SNAPSHOT</version>
<distributionManagement>
<repository>
<id>github</id>
<name>GitHub Packages</name>
<url>https://maven.pkg.github.com/tuancamtbtx/cdc-debezium-transform</url>
</repository>
</distributionManagement>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@
<module>cdc-debezium-transform</module>
<module>cdc-debezium-server</module>
<module>cdc-core</module>
<module>cdc-schema-registry</module>
<module>cdc-debezium-converter</module>
<module>cdc-debezium-pipeline</module>
<module>cdc-security</module>
<module>cdc-debezium-sinks</module>
<module>cdc-debezium-storage</module>
<module>cdc-engine</module>
<module>cdc-debezium-dist</module>
</modules>
<properties>
<maven.compiler.source>11</maven.compiler.source>
Expand Down Expand Up @@ -91,6 +89,8 @@
<version.junit>5.10.0</version.junit>
<!-- Json Org -->
<version.json.org>20230618</version.json.org>
<version.gson>2.10.1</version.gson>

</properties>
<dependencyManagement>
<dependencies>
Expand Down

0 comments on commit 750dc98

Please sign in to comment.