Skip to content

Commit

Permalink
[improvement] 2pc commit or abort retry and interval configurable. (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeCooker17 authored Sep 14, 2023
1 parent 0daf6c4 commit bcccb0d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,16 @@ public interface ConfigurationOptions {
String DORIS_SINK_STREAMING_PASSTHROUGH = "doris.sink.streaming.passthrough";
boolean DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT = false;

/**
* Interval, in milliseconds, between retry attempts when committing or aborting a transaction.
*/
String DORIS_SINK_TXN_INTERVAL_MS = "doris.sink.txn.interval.ms";
int DORIS_SINK_TXN_INTERVAL_MS_DEFAULT = 50;

/**
* Number of times a transaction commit or abort is retried.
*/
String DORIS_SINK_TXN_RETRIES = "doris.sink.txn.retries";
int DORIS_SINK_TXN_RETRIES_DEFAULT = 3;

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}

class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], dorisStreamLoad: DorisStreamLoad)
class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], dorisStreamLoad: DorisStreamLoad, sinkTnxIntervalMs: Int, sinkTxnRetries: Int)
extends SparkListener {

val logger: Logger = LoggerFactory.getLogger(classOf[DorisTransactionListener])
Expand All @@ -45,7 +45,7 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d
}
logger.info("job run succeed, start committing transactions")
txnIds.foreach(txnId =>
Utils.retry(3, Duration.ofSeconds(1), logger) {
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.commit(txnId)
} match {
case Success(_) =>
Expand All @@ -66,7 +66,7 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d
}
logger.info("job run failed, start aborting transactions")
txnIds.foreach(txnId =>
Utils.retry(3, Duration.ofSeconds(1), logger) {
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.abort(txnId)
} match {
case Success(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class DorisWriter(settings: SparkSettings) extends Serializable {

private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter])

val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES,
ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
Expand All @@ -49,6 +51,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {

private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
private val sinkTxnIntervalMs: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_TXN_INTERVAL_MS_DEFAULT)
private val sinkTxnRetries: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
ConfigurationOptions.DORIS_SINK_TXN_RETRIES_DEFAULT)

private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)

Expand All @@ -67,7 +73,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
val sc = dataFrame.sqlContext.sparkContext
val preCommittedTxnAcc = sc.collectionAccumulator[Int]("preCommittedTxnAcc")
if (enable2PC) {
sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader))
sc.addSparkListener(new DorisTransactionListener(preCommittedTxnAcc, dorisStreamLoader, sinkTxnIntervalMs, sinkTxnRetries))
}

var resultRdd = dataFrame.queryExecution.toRdd
Expand Down Expand Up @@ -106,7 +112,7 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
}
val abortFailedTxnIds = mutable.Buffer[Int]()
acc.value.asScala.foreach(txnId => {
Utils.retry[Unit, Exception](3, Duration.ofSeconds(1), logger) {
Utils.retry[Unit, Exception](sinkTxnRetries, Duration.ofMillis(sinkTxnIntervalMs), logger) {
dorisStreamLoader.abort(txnId)
} match {
case Success(_) =>
Expand Down

0 comments on commit bcccb0d

Please sign in to comment.