From bcccb0d6a88d673e0a5f5ccb9a3ab5f3ed1a05d9 Mon Sep 17 00:00:00 2001 From: Chuang Li <64473732+CodeCooker17@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:31:08 +0800 Subject: [PATCH] [improvement] 2pc commit or abort retry and interval configurable. (#136) --- .../apache/doris/spark/cfg/ConfigurationOptions.java | 12 ++++++++++++ .../spark/listener/DorisTransactionListener.scala | 6 +++--- .../org/apache/doris/spark/writer/DorisWriter.scala | 10 ++++++++-- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 09c0416f..a6767f08 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -106,4 +106,16 @@ public interface ConfigurationOptions { String DORIS_SINK_STREAMING_PASSTHROUGH = "doris.sink.streaming.passthrough"; boolean DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT = false; + /** + * txnId commit or abort interval + */ + String DORIS_SINK_TXN_INTERVAL_MS = "doris.sink.txn.interval.ms"; + int DORIS_SINK_TXN_INTERVAL_MS_DEFAULT = 50; + + /** + * txnId commit or abort retry times + */ + String DORIS_SINK_TXN_RETRIES = "doris.sink.txn.retries"; + int DORIS_SINK_TXN_RETRIES_DEFAULT = 3; + } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala index a36e634a..262ad193 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala @@ -28,7 +28,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Success} -class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], dorisStreamLoad: DorisStreamLoad) +class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], dorisStreamLoad: DorisStreamLoad, sinkTnxIntervalMs: Int, sinkTxnRetries: Int) extends SparkListener { val logger: Logger = LoggerFactory.getLogger(classOf[DorisTransactionListener]) @@ -45,7 +45,7 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d } logger.info("job run succeed, start committing transactions") txnIds.foreach(txnId => - Utils.retry(3, Duration.ofSeconds(1), logger) { + Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) { dorisStreamLoad.commit(txnId) } match { case Success(_) => @@ -66,7 +66,7 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d } logger.info("job run failed, start aborting transactions") txnIds.foreach(txnId => - Utils.retry(3, Duration.ofSeconds(1), logger) { + Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) { dorisStreamLoad.abort(txnId) } match { case Success(_) => diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index b278a385..3fdfb793 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -39,6 +39,8 @@ class DorisWriter(settings: SparkSettings) extends Serializable { private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter]) + val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, + ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) @@ -49,6 +51,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable { private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC, ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT); + private val sinkTxnIntervalMs: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS, + ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT) + private val sinkTxnRetries: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES, + ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT) private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) @@ -67,7 +73,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable { val sc = dataFrame.sqlContext.sparkContext val preCommittedTxnAcc = sc.collectionAccumulator[Int]("preCommittedTxnAcc") if (enable2PC) { - sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader)) + sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader, sinkTxnIntervalMs, sinkTxnRetries)) } var resultRdd = dataFrame.queryExecution.toRdd @@ -106,7 +112,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable { } val abortFailedTxnIds = mutable.Buffer[Int]() acc.value.asScala.foreach(txnId => { - Utils.retry[Unit, Exception](3, Duration.ofSeconds(1), logger) { + Utils.retry[Unit, Exception](sinkTxnRetries, Duration.ofMillis(sinkTxnIntervalMs), logger) { dorisStreamLoader.abort(txnId) } match { case Success(_) =>