From a9d5e253037c6259aa44c56526072b73ef3859b1 Mon Sep 17 00:00:00 2001 From: LeiWang Date: Thu, 24 Oct 2024 17:59:04 +0800 Subject: [PATCH] [Enhancemant] add label prefix configuration item for doris sink to track writing (#235) --- .../org/apache/doris/spark/cfg/ConfigurationOptions.java | 3 +++ .../scala/org/apache/doris/spark/load/StreamLoader.scala | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index cf0630f7..3d601421 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -126,6 +126,9 @@ public interface ConfigurationOptions { String DORIS_SINK_AUTO_REDIRECT = "doris.sink.auto-redirect"; boolean DORIS_SINK_AUTO_REDIRECT_DEFAULT = true; + String DORIS_SINK_LABEL_PREFIX = "doris.sink.label.prefix"; + String DORIS_SINK_LABEL_PREFIX_DEFAULT = "spark_streamload"; + /** * compress_type */ diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala index 5f7765dd..52a88bc4 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/load/StreamLoader.scala @@ -83,6 +83,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader ConfigurationOptions.DORIS_ENABLE_HTTPS_DEFAULT) && autoRedirect private val enableGroupCommit: Boolean = streamLoadProps.contains(ConfigurationOptions.GROUP_COMMIT) + /** * execute stream load * @@ -384,7 +385,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader /** * generate load label * - * spark_streamload_YYYYMMDD_HHMMSS_{UUID} + * {label_prefix}_YYYYMMDD_HHMMSS_{UUID} * * @return load label */ @@ -393,7 +394,9 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader return null; } val calendar = Calendar.getInstance - "spark_streamload_" + + val labelPrefix = settings.getProperty(ConfigurationOptions.DORIS_SINK_LABEL_PREFIX, + ConfigurationOptions.DORIS_SINK_LABEL_PREFIX_DEFAULT) + labelPrefix + "_" + f"${calendar.get(Calendar.YEAR)}${calendar.get(Calendar.MONTH) + 1}%02d${calendar.get(Calendar.DAY_OF_MONTH)}%02d" + f"_${calendar.get(Calendar.HOUR_OF_DAY)}%02d${calendar.get(Calendar.MINUTE)}%02d${calendar.get(Calendar.SECOND)}%02d" + f"_${UUID.randomUUID.toString.replaceAll("-", "")}"