diff --git a/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java b/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java index afabc811..d63604fa 100644 --- a/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java +++ b/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaData.java @@ -2,7 +2,7 @@ import java.time.OffsetDateTime; -public record KafkaData(String topic, String key, byte[] value, String KAFKA_SERVER_URL, OffsetDateTime timeStamp) { +public record KafkaData(String topic, byte[] key, byte[] value, String KAFKA_SERVER_URL, OffsetDateTime timeStamp) { } diff --git a/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java b/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java index 296e893f..af28be24 100644 --- a/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java +++ b/connectors/source-kafka/src/main/java/com/linkall/source/kafka/KafkaWorker.java @@ -76,8 +76,7 @@ public void doWork() { for (ConsumerRecord record : partitionRecords) { OffsetDateTime timeStamp = new Date(record.timestamp()).toInstant().atOffset( ZoneOffset.UTC ); - String key64 = Base64.getEncoder().encodeToString(record.key()); - KafkaData kafkaData = new KafkaData(record.topic(), key64, record.value(), KAFKA_SERVER_URL, timeStamp); + KafkaData kafkaData = new KafkaData(record.topic(), record.key(), record.value(), KAFKA_SERVER_URL, timeStamp); CloudEvent event = adapter.adapt(kafkaData); String sink = ConfigUtil.getVanceSink(); System.out.println("message: " + Arrays.toString(record.value()));