Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Oct 25, 2023
2 parents 73a6f93 + bfa1e5e commit 36952bf
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 128 deletions.
9 changes: 9 additions & 0 deletions spark-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@
<artifactId>jackson-core</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public interface ConfigurationOptions {
// doris fe node address
String DORIS_FENODES = "doris.fenodes";
String DORIS_QUERY_PORT = "doris.query.port";

String DORIS_DEFAULT_CLUSTER = "default_cluster";

Expand Down Expand Up @@ -70,7 +71,7 @@ public interface ConfigurationOptions {
int SINK_BATCH_SIZE_DEFAULT = 100000;

String DORIS_SINK_MAX_RETRIES = "doris.sink.max-retries";
int SINK_MAX_RETRIES_DEFAULT = 1;
int SINK_MAX_RETRIES_DEFAULT = 0;

String DORIS_MAX_FILTER_RATIO = "doris.max.filter.ratio";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public class DorisStreamLoad implements Serializable {
private String FIELD_DELIMITER;
private final String LINE_DELIMITER;
private boolean streamingPassthrough = false;
private final Integer batchSize;
private final boolean enable2PC;
private final Integer txnRetries;
private final Integer txnIntervalMs;
Expand Down Expand Up @@ -133,8 +132,6 @@ public DorisStreamLoad(SparkSettings settings) {
LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
this.streamingPassthrough = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH,
ConfigurationOptions.DORIS_SINK_STREAMING_PASSTHROUGH_DEFAULT);
this.batchSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE,
ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT);
this.enable2PC = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
this.txnRetries = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TXN_RETRIES,
Expand Down Expand Up @@ -215,7 +212,6 @@ public long load(Iterator<InternalRow> rows, StructType schema)
this.loadUrlStr = loadUrlStr;
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC);
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
.batchSize(batchSize)
.format(fileType)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ public class RecordBatch {
*/
private final Iterator<InternalRow> iterator;

/**
* batch size for single load
*/
private final int batchSize;

/**
* stream load format
*/
Expand All @@ -63,10 +58,9 @@ public class RecordBatch {

private final boolean addDoubleQuotes;

private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
private RecordBatch(Iterator<InternalRow> iterator, String format, String sep, byte[] delim,
StructType schema, boolean addDoubleQuotes) {
this.iterator = iterator;
this.batchSize = batchSize;
this.format = format;
this.sep = sep;
this.delim = delim;
Expand All @@ -78,10 +72,6 @@ public Iterator<InternalRow> getIterator() {
return iterator;
}

public int getBatchSize() {
return batchSize;
}

public String getFormat() {
return format;
}
Expand Down Expand Up @@ -112,8 +102,6 @@ public static class Builder {

private final Iterator<InternalRow> iterator;

private int batchSize;

private String format;

private String sep;
Expand All @@ -128,11 +116,6 @@ public Builder(Iterator<InternalRow> iterator) {
this.iterator = iterator;
}

public Builder batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}

public Builder format(String format) {
this.format = format;
return this;
Expand All @@ -159,7 +142,7 @@ public Builder addDoubleQuotes(boolean addDoubleQuotes) {
}

public RecordBatch build() {
return new RecordBatch(iterator, batchSize, format, sep, delim, schema, addDoubleQuotes);
return new RecordBatch(iterator, format, sep, delim, schema, addDoubleQuotes);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ public class RecordBatchInputStream extends InputStream {

public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class);

private static final int DEFAULT_BUF_SIZE = 4096;

/**
* Load record batch
*/
Expand All @@ -55,12 +53,12 @@ public class RecordBatchInputStream extends InputStream {
/**
* record buffer
*/
private ByteBuffer buffer = ByteBuffer.allocate(0);

/**
* record count has been read
*/
private int readCount = 0;
private ByteBuffer lineBuf = ByteBuffer.allocate(0);;

private ByteBuffer delimBuf = ByteBuffer.allocate(0);

private final byte[] delim;

/**
* streaming mode pass through data without process
Expand All @@ -70,31 +68,42 @@ public class RecordBatchInputStream extends InputStream {
public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) {
this.recordBatch = recordBatch;
this.passThrough = passThrough;
this.delim = recordBatch.getDelim();
}

@Override
public int read() throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
if (lineBuf.remaining() == 0 && endOfBatch()) {
return -1;
}

if (delimBuf != null && delimBuf.remaining() > 0) {
return delimBuf.get() & 0xff;
}
} catch (DorisException e) {
throw new IOException(e);
}
return buffer.get() & 0xFF;
return lineBuf.get() & 0xFF;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
if (lineBuf.remaining() == 0 && endOfBatch()) {
return -1;
}

if (delimBuf != null && delimBuf.remaining() > 0) {
int bytesRead = Math.min(len, delimBuf.remaining());
delimBuf.get(b, off, bytesRead);
return bytesRead;
}
} catch (DorisException e) {
throw new IOException(e);
}
int bytesRead = Math.min(len, buffer.remaining());
buffer.get(b, off, bytesRead);
int bytesRead = Math.min(len, lineBuf.remaining());
lineBuf.get(b, off, bytesRead);
return bytesRead;
}

Expand All @@ -108,11 +117,12 @@ public int read(byte[] b, int off, int len) throws IOException {
*/
public boolean endOfBatch() throws DorisException {
Iterator<InternalRow> iterator = recordBatch.getIterator();
if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
return true;
if (iterator.hasNext()) {
readNext(iterator);
return false;
}
readNext(iterator);
return false;
delimBuf = null;
return true;
}

/**
Expand All @@ -125,60 +135,15 @@ private void readNext(Iterator<InternalRow> iterator) throws DorisException {
if (!iterator.hasNext()) {
throw new ShouldNeverHappenException();
}
byte[] delim = recordBatch.getDelim();
byte[] rowBytes = rowToByte(iterator.next());
if (isFirst) {
ensureCapacity(rowBytes.length);
buffer.put(rowBytes);
buffer.flip();
delimBuf = null;
lineBuf = ByteBuffer.wrap(rowBytes);
isFirst = false;
} else {
ensureCapacity(delim.length + rowBytes.length);
buffer.put(delim);
buffer.put(rowBytes);
buffer.flip();
}
readCount++;
}

/**
* Check if the buffer has enough capacity.
*
* @param need required buffer space
*/
private void ensureCapacity(int need) {

int capacity = buffer.capacity();

if (need <= capacity) {
buffer.clear();
return;
}

// need to extend
int newCapacity = calculateNewCapacity(capacity, need);
LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, capacity, newCapacity);
buffer = ByteBuffer.allocate(newCapacity);

}

/**
* Calculate new capacity for buffer expansion.
*
* @param capacity current buffer capacity
* @param minCapacity required min buffer space
* @return new capacity
*/
private int calculateNewCapacity(int capacity, int minCapacity) {
int newCapacity = 0;
if (capacity == 0) {
newCapacity = DEFAULT_BUF_SIZE;

}
while (newCapacity < minCapacity) {
newCapacity = newCapacity << 1;
delimBuf = ByteBuffer.wrap(delim);
lineBuf = ByteBuffer.wrap(rowBytes);
}
return newCapacity;
}

/**
Expand Down Expand Up @@ -220,5 +185,4 @@ private byte[] rowToByte(InternalRow row) throws DorisException {

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.jdbc

import java.sql.{Connection, DriverManager}
import java.util.Properties

object JdbcUtils {

def getJdbcUrl(host: String, port: Int): String = s"jdbc:mysql://$host:$port/information_schema"

def getConnection(url: String, props: Properties): Connection = {

DriverManager.getConnection(url, props)
}

def getTruncateQuery(table: String): String = s"TRUNCATE TABLE $table"

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Long],
txnIds.foreach(txnId =>
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.commit(txnId)
} match {
case Success(_) =>
} () match {
case Success(_) => // do nothing
case Failure(_) => failedTxnIds += txnId
}
)
Expand All @@ -68,8 +68,8 @@ class DorisTransactionListener(preCommittedTxnAcc: CollectionAccumulator[Long],
txnIds.foreach(txnId =>
Utils.retry(sinkTxnRetries, Duration.ofMillis(sinkTnxIntervalMs), logger) {
dorisStreamLoad.abortById(txnId)
} match {
case Success(_) =>
} () match {
case Success(_) => // do nothing
case Failure(_) => failedTxnIds += txnId
})
if (failedTxnIds.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -98,6 +98,7 @@ private[sql] class DorisRelation(
}
data.write.format(DorisSourceProvider.SHORT_NAME)
.options(insertCfg)
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.save()
}
}
Loading

0 comments on commit 36952bf

Please sign in to comment.