[improvement] batch load retry (#148)
Co-authored-by: gnehil <adamlee489@gamil.com>
gnehil authored Oct 25, 2023
1 parent f1e402a commit 9dd57b0
Showing 7 changed files with 107 additions and 62 deletions.
@@ -70,7 +70,7 @@ public interface ConfigurationOptions {
int SINK_BATCH_SIZE_DEFAULT = 100000;

String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
- int SINK_MAX_RETRIES_DEFAULT = 1;
+ int SINK_MAX_RETRIES_DEFAULT = 0;

String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";

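With SINK_MAX_RETRIES_DEFAULT dropped to 0, the sink now retries only when doris.sink.max-retries is set explicitly. A minimal usage sketch, not part of this commit: the doris.sink.max-retries key comes from the constants above, while the "doris" format name, the fenodes key, and the table identifier key are assumptions used only for illustration.

    // Hypothetical write path enabling three sink retries.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.range(10).toDF("id")
    df.write
      .format("doris")                               // assumed connector short name
      .option("doris.fenodes", "fe_host:8030")       // assumed FE address key and placeholder host
      .option("doris.table.identifier", "db.table")  // assumed table identifier key
      .option("doris.sink.max-retries", "3")         // key defined in ConfigurationOptions above
      .save()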
@@ -98,7 +98,6 @@ public class DorisStreamLoad implements Serializable {
private String FIELD_DELIMITER;
private final String LINE_DELIMITER;
private boolean streamingPassthrough = false;
- private final Integer batchSize;
private final boolean enable2PC;
private final Integer txnRetries;
private final Integer txnIntervalMs;
@@ -128,8 +127,6 @@ public DorisStreamLoad(SparkSettings settings) {
LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
this.streamingPassthrough = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT);
- this.batchSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
- ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT);
this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
@@ -200,7 +197,6 @@ public int load(Iterator<InternalRow> rows, StructType schema)
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
- .batchSize(batchSize)
.format(fileType)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
@@ -36,11 +36,6 @@ public class RecordBatch {
*/
private final Iterator<InternalRow> iterator;

- /**
-  * batch size for single load
-  */
- private final int batchSize;
-
/**
* stream load format
*/
@@ -63,10 +58,9 @@ public class RecordBatch {

private final boolean addDoubleQuotes;

- private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
+ private RecordBatch(Iterator<InternalRow> iterator, String format, String sep, byte[] delim,
StructType schema, boolean addDoubleQuotes) {
this.iterator = iterator;
- this.batchSize = batchSize;
this.format = format;
this.sep = sep;
this.delim = delim;
@@ -78,10 +72,6 @@ public Iterator<InternalRow> getIterator() {
return iterator;
}

- public int getBatchSize() {
- return batchSize;
- }
-
public String getFormat() {
return format;
}
@@ -112,8 +102,6 @@ public static class Builder {

private final Iterator<InternalRow> iterator;

- private int batchSize;
-
private String format;

private String sep;
@@ -128,11 +116,6 @@ public Builder(Iterator<InternalRow> iterator) {
this.iterator = iterator;
}

- public Builder batchSize(int batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
public Builder format(String format) {
this.format = format;
return this;
@@ -159,7 +142,7 @@ public Builder addDoubleQuotes(boolean addDoubleQuotes) {
}

public RecordBatch build() {
- return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
+ return new RecordBatch(iterator, format, sep, delim, schema, addDoubleQuotes);
}

}
@@ -60,11 +60,6 @@ public class RecordBatchInputStream extends InputStream {

private final byte[] delim;

- /**
-  * record count has been read
-  */
- private int readCount = 0;
-
/**
* streaming mode pass through data without process
*/
@@ -122,12 +117,12 @@ public int read(byte[] b, int off, int len) throws IOException {
*/
public boolean endOfBatch() throws DorisException {
Iterator<InternalRow> iterator = recordBatch.getIterator();
- if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
- delimBuf = null;
- return true;
+ if (iterator.hasNext()) {
+ readNext(iterator);
+ return false;
}
- readNext(iterator);
- return false;
+ delimBuf = null;
+ return true;
}

/**
@@ -149,7 +144,6 @@ private void readNext(Iterator<InternalRow> iterator) throws DorisException {
delimBuf = ByteBuffer.wrap(delim);
lineBuf = ByteBuffer.wrap(rowBytes);
}
- readCount++;
}

/**
@@ -47,8 +47,8 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d
txnIds.foreach(txnId =>
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.commit(txnId)
- } match {
- case Success(_) =>
+ } () match {
+ case Success(_) => // do nothing
case Failure(_) => failedTxnIds += txnId
}
)
@@ -68,8 +68,8 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Int], d
txnIds.foreach(txnId =>
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.abortById(txnId)
- } match {
- case Success(_) =>
+ } () match {
+ case Success(_) => // do nothing
case Failure(_) => failedTxnIds += txnId
})
if (failedTxnIds.nonEmpty) {
@@ -34,15 +34,17 @@ import scala.util.{Failure, Success, Try}
private[spark] object Utils {
/**
* quote column name
+ *
* @param colName column name
* @return quoted column name
*/
def quote(colName: String): String = s"`$colName`"

/**
* compile a filter to Doris FE filter format.
- * @param filter filter to be compile
- * @param dialect jdbc dialect to translate value to sql format
+ *
+ * @param filter filter to be compile
+ * @param dialect jdbc dialect to translate value to sql format
* @param inValueLengthLimit max length of in value array
* @return if Doris FE can handle this filter, return None if Doris FE can not handled it.
*/
@@ -87,6 +89,7 @@ private[spark] object Utils {

/**
* Escape special characters in SQL string literals.
+ *
* @param value The string to be escaped.
* @return Escaped string.
*/
@@ -95,6 +98,7 @@

/**
* Converts value to SQL expression.
+ *
* @param value The value to be converted.
* @return Converted value.
*/
@@ -108,16 +112,17 @@

/**
* check parameters validation and process it.
+ *
* @param parameters parameters from rdd and spark conf
- * @param logger slf4j logger
+ * @param logger slf4j logger
* @return processed parameters
*/
def params(parameters: Map[String, String], logger: Logger) = {
// '.' seems to be problematic when specifying the options
val dottedParams = parameters.map { case (k, v) =>
- if (k.startsWith("sink.properties.") || k.startsWith("doris.sink.properties.")){
- (k,v)
- }else {
+ if (k.startsWith("sink.properties.") || k.startsWith("doris.sink.properties.")) {
+ (k, v)
+ } else {
(k.replace('_', '.'), v)
}
}
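A small self-contained sketch, with made-up option keys, of the key normalization params performs: underscores become dots (the mapping above), then a doris. prefix is added (the first mapping in the next hunk), while sink.properties.* keys keep their underscores. The real params additionally rejects a plain-text password option and requires a table identifier, both omitted here.

    // Re-implementation of just the two mapping steps, runnable on its own.
    val raw = Map(
      "sink_batch_size"                  -> "10000", // hypothetical key: '_' -> '.' then "doris." prefix
      "sink.properties.column_separator" -> ","      // hypothetical key: kept verbatim before prefixing
    )
    val normalized = raw
      .map { case (k, v) =>
        if (k.startsWith("sink.properties.") || k.startsWith("doris.sink.properties.")) (k, v)
        else (k.replace('_', '.'), v)
      }
      .map { case (k, v) => if (k.startsWith("doris.")) (k, v) else ("doris." + k, v) }
    // normalized == Map("doris.sink.batch.size" -> "10000",
    //                   "doris.sink.properties.column_separator" -> ",")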
@@ -141,7 +146,7 @@
case (k, v) =>
if (k.startsWith("doris.")) (k, v)
else ("doris." + k, v)
- }.map{
+ }.map {
case (ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD, _) =>
logger.error(s"${ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD} cannot use in Doris Datasource.")
throw new DorisException(s"${ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD} cannot use in" +
@@ -165,13 +170,14 @@

// validate path is available
finalParams.getOrElse(ConfigurationOptions.DORIS_TABLE_IDENTIFIER,
throw new DorisException("table identifier must be specified for doris table identifier."))
throw new DorisException("table identifier must be specified for doris table identifier."))

finalParams
}

@tailrec
- def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)(f: => R): Try[R] = {
+ def retry[R, T <: Throwable : ClassTag](retryTimes: Int, interval: Duration, logger: Logger)
+ (f: => R)(h: => Unit): Try[R] = {
assert(retryTimes >= 0)
val result = Try(f)
result match {
@@ -182,7 +188,8 @@
logger.warn(s"Execution failed caused by: ", exception)
logger.warn(s"$retryTimes times retry remaining, the next attempt will be in ${interval.toMillis} ms")
LockSupport.parkNanos(interval.toNanos)
- retry(retryTimes - 1, interval, logger)(f)
+ h
+ retry(retryTimes - 1, interval, logger)(f)(h)
case Failure(exception) => Failure(exception)
}
}
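A minimal usage sketch of the reworked retry, with stand-in values rather than code from this commit: the first brace block is the operation f to attempt, and the new second block is the hook h that runs after the wait and before each re-attempt; DorisTransactionListener above passes an empty () when nothing extra needs to run between attempts.

    import java.time.Duration
    import org.slf4j.LoggerFactory
    import scala.util.{Failure, Success}

    val logger = LoggerFactory.getLogger("retry-demo")
    def doCommit(): Unit = { /* stand-in for a retryable action such as a transaction commit */ }

    Utils.retry(3, Duration.ofMillis(1000), logger) {
      doCommit()                        // f: the operation being retried
    } {
      logger.info("attempt failed")     // h: recovery hook, runs before the next attempt
    } match {
      case Success(_)  => // succeeded, possibly after retries
      case Failure(ex) => logger.error("still failing after all retries", ex)
    }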