[add] sinks
tuancamtbtx committed Dec 3, 2023
1 parent 4072da9 commit 15dd414
Showing 42 changed files with 2,696 additions and 115 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -16,4 +16,6 @@ buildNumber.properties
# Eclipse Core
.project
# JDT-specific (Eclipse Java Development Tools)
.classpath
*.iml
application.properties
21 changes: 21 additions & 0 deletions cdc-debezium-dist/pom.xml
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>vn.dataplatform.cdc</groupId>
<artifactId>cdc-debezium-common</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<groupId>org.example</groupId>
<artifactId>cdc-debezium-dist</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

</project>
11 changes: 11 additions & 0 deletions cdc-debezium-dist/src/main/java/org/example/Main.java
@@ -0,0 +1,11 @@
package org.example;

/**
* @author tuan.nguyen3
*/
public class Main {

public static void main(String[] args) {
System.out.println("Hello world!");
}
}
24 changes: 20 additions & 4 deletions cdc-debezium-server/pom.xml
@@ -8,7 +8,12 @@
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<artifactId>cdc-debezium-server</artifactId>
<packaging>jar</packaging>
<properties>
@@ -61,9 +66,20 @@
<artifactId>connect-api</artifactId>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-serde-tools-package</artifactId>
<version>7.3.2</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>7.5.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>vn.dataplatform.cdc</groupId>
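The Confluent additions above (the serde tools package, the Avro converter, and the Avro serializer) wire schema-registry-backed Avro into the connector runtime. A minimal sketch of the Kafka Connect-style converter settings they enable, assuming a local Schema Registry at http://localhost:8081; the URL and the standalone class below are illustrative, not part of this commit:

import java.util.Properties;

// Sketch: converter properties that the new kafka-connect-avro-converter
// dependency makes available to the runtime.
public class AvroConverterConfigSketch {
    public static Properties converterProps() {
        Properties props = new Properties();
        // io.confluent.connect.avro.AvroConverter ships in kafka-connect-avro-converter
        props.setProperty("key.converter", "io.confluent.connect.avro.AvroConverter");
        props.setProperty("value.converter", "io.confluent.connect.avro.AvroConverter");
        // the registry location is an assumption for this sketch
        props.setProperty("key.converter.schema.registry.url", "http://localhost:8081");
        props.setProperty("value.converter.schema.registry.url", "http://localhost:8081");
        return props;
    }
}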
3 changes: 2 additions & 1 deletion cdc-debezium-server/src/main/resources/.example.env
@@ -1,3 +1,4 @@
HUB_NAME=
EVENTHUB_CONNECTION_STRING=
KAFKA_BOOTSTRAP_SERVERS=
GCP_PROJECT_ID=
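The template now spans Azure Event Hubs (HUB_NAME, EVENTHUB_CONNECTION_STRING), Kafka, and GCP targets. A small sketch of reading these variables at startup; the fail-fast helper is an assumption for illustration, not code from this commit:

// Reads the variables declared in .example.env and fails fast when one is missing.
public class EnvConfigSketch {
    static String require(String name) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        String bootstrapServers = require("KAFKA_BOOTSTRAP_SERVERS");
        String gcpProjectId = require("GCP_PROJECT_ID");
        System.out.println("Kafka at " + bootstrapServers + ", GCP project " + gcpProjectId);
    }
}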
Empty file.
6 changes: 0 additions & 6 deletions cdc-debezium-sinks/pom.xml
@@ -162,12 +162,6 @@
<artifactId>json-path</artifactId>
<version>${version.jsonpath}</version>
</dependency>
<dependency>
<groupId>vn.dataplatform.cdc</groupId>
<version>1.0-SNAPSHOT</version>
<artifactId>cdc-security</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
@@ -1,29 +1,151 @@
package vn.dataplatform.cdc.sinks.bigquery;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vn.dataplatform.cdc.sinks.bigquery.batchsizewait.InterfaceBatchSizeWait;


/**
* @author tuan.nguyen3
*/
@Slf4j
public abstract class AbstractChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15);
protected static final ConcurrentHashMap<String, Object> uploadLock = new ConcurrentHashMap<>();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final ObjectMapper mapper = new ObjectMapper();
static Deserializer<JsonNode> keyDeserializer;
protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
protected final Clock clock = Clock.system();
protected Deserializer<JsonNode> valDeserializer;
protected long consumerStart = clock.currentTimeInMillis();
protected long numConsumedEvents = 0;
protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;
@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

public void initizalize() throws InterruptedException {
// configure the value serde and obtain its deserializer
valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();
// configure the key serde and obtain its deserializer
keySerde.configure(Collections.emptyMap(), true);
keyDeserializer = keySerde.deserializer();

if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
}

if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
}

batchSizeWait = BatchUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
LOGGER.info("Using {} to optimize batch size", batchSizeWait.getClass().getSimpleName());
batchSizeWait.initizalize();
}
public abstract void sendEvent(ChangeEvent<Object, Object> event);

@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
LOGGER.trace("Received {} events", records.size());

Instant start = Instant.now();
Map<String, List<DebeziumBigqueryEvent>> events = records.stream()
.map((ChangeEvent<Object, Object> e)
-> {
try {
return new DebeziumBigqueryEvent(e.destination(),
valDeserializer.deserialize(e.destination(), getBytes(e.value())),
e.key() == null ? null : keyDeserializer.deserialize(e.destination(), getBytes(e.key())),
mapper.readTree(getBytes(e.value())).get("schema"),
e.key() == null ? null : mapper.readTree(getBytes(e.key())).get("schema")
);
} catch (IOException ex) {
throw new DebeziumException(ex);
}
})
.collect(Collectors.groupingBy(DebeziumBigqueryEvent::destination));
long numUploadedEvents = 0;
for (Map.Entry<String, List<DebeziumBigqueryEvent>> destinationEvents : events.entrySet()) {
// Group this destination's events by value schema: when the batch contains
// schema-change events, grouping guarantees a uniform schema for each upload.
Map<JsonNode, List<DebeziumBigqueryEvent>> eventsGroupedBySchema =
destinationEvents.getValue().stream()
.collect(Collectors.groupingBy(DebeziumBigqueryEvent::valueSchema));
LOGGER.debug("Destination {} got {} records with {} different schema!!", destinationEvents.getKey(),
destinationEvents.getValue().size(),
eventsGroupedBySchema.keySet().size());

for (List<DebeziumBigqueryEvent> schemaEvents : eventsGroupedBySchema.values()) {
numUploadedEvents += this.uploadDestination(destinationEvents.getKey(), schemaEvents);
}
}
// Workaround: the offset is not saved to the offset file unless committer.markProcessed is called,
// even though it should be flushed periodically.
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
committer.markProcessed(record);
}
committer.markBatchFinished();
this.logConsumerProgress(numUploadedEvents);
LOGGER.debug("Received:{} Processed:{} events", records.size(), numUploadedEvents);

batchSizeWait.waitMs(numUploadedEvents, (int) Duration.between(start, Instant.now()).toMillis());

}

protected void logConsumerProgress(long numUploadedEvents) {
numConsumedEvents += numUploadedEvents;
if (logTimer.expired()) {
LOGGER.info("Consumed {} records after {}", numConsumedEvents, Strings.duration(clock.currentTimeInMillis() - consumerStart));
numConsumedEvents = 0;
consumerStart = clock.currentTimeInMillis();
logTimer = Threads.timer(clock, LOG_INTERVAL);
}
}

public abstract long uploadDestination(String destination, List<DebeziumBigqueryEvent> data);
}
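handleBatch and initizalize above lean on two collaborators that this hunk only references. First, DebeziumBigqueryEvent: the constructor call and the DebeziumBigqueryEvent::destination / ::valueSchema method references imply a shape like the following sketch (field and accessor names are inferred from the call sites, not copied from the class shipped in this commit):

package vn.dataplatform.cdc.sinks.bigquery;

import com.fasterxml.jackson.databind.JsonNode;

// Sketch of the event carrier implied by the grouping code above.
public class DebeziumBigqueryEvent {
    private final String destination;
    private final JsonNode value;
    private final JsonNode key;
    private final JsonNode valueSchema;
    private final JsonNode keySchema;

    public DebeziumBigqueryEvent(String destination, JsonNode value, JsonNode key,
                                 JsonNode valueSchema, JsonNode keySchema) {
        this.destination = destination;
        this.value = value;
        this.key = key;
        this.valueSchema = valueSchema;
        this.keySchema = keySchema;
    }

    public String destination() { return destination; }
    public JsonNode value() { return value; }
    public JsonNode key() { return key; }
    public JsonNode valueSchema() { return valueSchema; }
    public JsonNode keySchema() { return keySchema; }
}

Second, the batch-size-wait strategy that BatchUtil.selectInstance resolves by name (default NoBatchSizeWait). Assuming InterfaceBatchSizeWait mirrors the two calls made above, a pass-through default could look like this:

package vn.dataplatform.cdc.sinks.bigquery.batchsizewait;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;

// Pass-through strategy: never throttles the consumer. The interface
// shape (initizalize, waitMs) is assumed from the call sites in
// AbstractChangeConsumer, not from this commit.
@Dependent
@Named("NoBatchSizeWait")
public class NoBatchSizeWait implements InterfaceBatchSizeWait {
    @Override
    public void initizalize() {
        // nothing to configure for the no-op strategy
    }

    @Override
    public void waitMs(long numRecordsProcessed, int processingTimeMs) {
        // return immediately; a periodic/dynamic variant would sleep here
    }
}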

This file was deleted.
