Skip to content

Commit

Permalink
[Feature] Support insert overwrite (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Jan 20, 2025
1 parent 067eaf1 commit 869396b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -46,13 +51,14 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;

/** DorisDynamicTableSink. */
public class DorisDynamicTableSink implements DynamicTableSink {
public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
private final DorisOptions options;
private final DorisReadOptions readOptions;
private final DorisExecutionOptions executionOptions;
private final TableSchema tableSchema;
private final Integer sinkParallelism;
private boolean overwrite = false;

public DorisDynamicTableSink(
DorisOptions options,
Expand Down Expand Up @@ -115,13 +121,46 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
DorisSink<RowData> dorisSink = dorisSinkBuilder.build();

// for insert overwrite
if (overwrite) {
if (context.isBounded()) {
// execute jdbc query to truncate table
Preconditions.checkArgument(
options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode.");
// todo: should be written to a temporary table first,
// and then use GlobalCommitter to perform the rename.
truncateTable();
} else {
throw new IllegalStateException("Streaming mode not support overwrite.");
}
}
return SinkV2Provider.of(dorisSink, sinkParallelism);
}

/**
 * Empties the target Doris table by issuing a {@code TRUNCATE TABLE} statement over JDBC.
 * Invoked before an INSERT OVERWRITE write so the sink starts from a clean table.
 *
 * @throws DorisSystemException if the truncate statement cannot be executed
 */
private void truncateTable() {
    final String truncateQuery = "TRUNCATE TABLE " + options.getTableIdentifier();
    final SimpleJdbcConnectionProvider connectionProvider =
            new SimpleJdbcConnectionProvider(options);
    // try-with-resources guarantees both the connection and the statement are closed,
    // even when execution fails.
    try (Connection conn = connectionProvider.getOrEstablishConnection();
            Statement stmt = conn.createStatement()) {
        LOG.info("Executing truncate query: {}", truncateQuery);
        stmt.execute(truncateQuery);
    } catch (Exception e) {
        // Log with the failing SQL for diagnosis, then surface as a connector-level error
        // with the original exception preserved as the cause.
        LOG.error("Failed to execute truncate query: {}", truncateQuery, e);
        throw new DorisSystemException(
                String.format("Failed to execute truncate query: %s", truncateQuery), e);
    }
}

/**
 * Returns a copy of this sink for planner use.
 *
 * <p>The {@code overwrite} flag is applied by the planner via
 * {@link #applyOverwrite(boolean)} after construction, so it is not a constructor
 * argument and must be carried over explicitly — otherwise an INSERT OVERWRITE
 * would silently lose its truncate-before-write semantics on the copied sink.
 */
@Override
public DynamicTableSink copy() {
    DorisDynamicTableSink sink =
            new DorisDynamicTableSink(
                    options, readOptions, executionOptions, tableSchema, sinkParallelism);
    sink.overwrite = overwrite;
    return sink;
}

@Override
Expand All @@ -142,11 +181,18 @@ public boolean equals(Object o) {
&& Objects.equals(readOptions, that.readOptions)
&& Objects.equals(executionOptions, that.executionOptions)
&& Objects.equals(tableSchema, that.tableSchema)
&& Objects.equals(sinkParallelism, that.sinkParallelism);
&& Objects.equals(sinkParallelism, that.sinkParallelism)
&& Objects.equals(overwrite, that.overwrite);
}

/**
 * Hash code consistent with {@code equals}: includes the {@code overwrite} flag
 * alongside the configuration fields, since two sinks differing only in overwrite
 * mode are not equal.
 */
@Override
public int hashCode() {
    return Objects.hash(
            options, readOptions, executionOptions, tableSchema, sinkParallelism, overwrite);
}

/**
 * Called by the Flink planner when the query is an {@code INSERT OVERWRITE}.
 *
 * <p>Records the flag; the actual table truncation happens later in
 * {@code getSinkRuntimeProvider} when the flag is set and the job is bounded.
 *
 * @param overwrite whether existing table data should be overwritten
 */
@Override
public void applyOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.StringUtils;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.api.common.JobStatus.FINISHED;
import static org.apache.flink.api.common.JobStatus.RUNNING;
Expand All @@ -68,6 +70,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
static final String TABLE_OVERWRITE = "tbl_overwrite";
static final String TABLE_GZ_FORMAT = "tbl_gz_format";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
Expand Down Expand Up @@ -556,6 +559,59 @@ public void testTaskManagerFailoverSink() throws Exception {
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

@Test
public void testTableOverwrite() throws Exception {
// Create the target table, then seed it with a pre-existing row that the
// INSERT OVERWRITE is expected to wipe out.
initializeTable(TABLE_OVERWRITE);
// mock data
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
String.format(
"INSERT INTO %s.%s values('history-data',12)", DATABASE, TABLE_OVERWRITE));

// Sanity check: the seeded row is visible before the overwrite runs.
List<String> expected_his = Arrays.asList("history-data,12");
String query =
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_OVERWRITE);
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected_his, query, 2);

// BATCH mode is required: the connector only supports overwrite for bounded jobs.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Sink DDL pointing at the seeded Doris table; a random label prefix avoids
// stream-load label collisions across test runs.
String sinkDDL =
String.format(
"CREATE TABLE doris_overwrite_sink ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'jdbc-url' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'sink.label-prefix' = '"
+ UUID.randomUUID()
+ "'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_OVERWRITE,
getDorisQueryUrl(),
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sinkDDL);
TableResult tableResult =
tEnv.executeSql(
"INSERT OVERWRITE doris_overwrite_sink SELECT 'doris',1 union all SELECT 'overwrite',2 union all SELECT 'flink',3");

// Wait for the bounded job to finish, then verify ONLY the three overwritten
// rows remain — the 'history-data' row must be gone.
tableResult.await(25000, TimeUnit.MILLISECONDS);
List<String> expected = Arrays.asList("doris,1", "flink,3", "overwrite,2");
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
}

private void initializeTable(String table) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
Expand Down

0 comments on commit 869396b

Please sign in to comment.