From 869396bae945103e44dd592f781166ae634a9e2b Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 20 Jan 2025 14:21:30 +0800 Subject: [PATCH] [Feature] Support insert overwrite (#544) --- .../flink/table/DorisDynamicTableSink.java | 58 +++++++++++++++++-- .../doris/flink/sink/DorisSinkITCase.java | 56 ++++++++++++++++++ 2 files changed, 108 insertions(+), 6 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 17db7d2e1..f43d49bc7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -21,18 +21,23 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkV2Provider; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider; +import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.Statement; import java.util.Arrays; import java.util.Objects; import java.util.Properties; @@ -46,13 +51,14 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; /** DorisDynamicTableSink. */ -public class DorisDynamicTableSink implements DynamicTableSink { +public class DorisDynamicTableSink implements DynamicTableSink, SupportsOverwrite { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class); private final DorisOptions options; private final DorisReadOptions readOptions; private final DorisExecutionOptions executionOptions; private final TableSchema tableSchema; private final Integer sinkParallelism; + private boolean overwrite = false; public DorisDynamicTableSink( DorisOptions options, @@ -115,13 +121,46 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) .setSerializer(serializerBuilder.build()); - return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism); + DorisSink dorisSink = dorisSinkBuilder.build(); + + // for insert overwrite + if (overwrite) { + if (context.isBounded()) { + // execute jdbc query to truncate table + Preconditions.checkArgument( + options.getJdbcUrl() != null, "jdbc-url is required for Overwrite mode."); + // todo: should be written to a temporary table first, + // and then use GlobalCommitter to perform the rename. + truncateTable(); + } else { + throw new IllegalStateException("Streaming mode not support overwrite."); + } + } + return SinkV2Provider.of(dorisSink, sinkParallelism); + } + + private void truncateTable() { + String truncateQuery = "TRUNCATE TABLE " + options.getTableIdentifier(); + SimpleJdbcConnectionProvider jdbcConnectionProvider = + new SimpleJdbcConnectionProvider(options); + try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection(); + Statement statement = connection.createStatement()) { + LOG.info("Executing truncate query: {}", truncateQuery); + statement.execute(truncateQuery); + } catch (Exception e) { + LOG.error("Failed to execute truncate query: {}", truncateQuery, e); + throw new DorisSystemException( + String.format("Failed to execute truncate query: %s", truncateQuery), e); + } } @Override public DynamicTableSink copy() { - return new DorisDynamicTableSink( - options, readOptions, executionOptions, tableSchema, sinkParallelism); + DorisDynamicTableSink sink = + new DorisDynamicTableSink( + options, readOptions, executionOptions, tableSchema, sinkParallelism); + sink.overwrite = overwrite; + return sink; } @Override @@ -142,11 +181,18 @@ public boolean equals(Object o) { && Objects.equals(readOptions, that.readOptions) && Objects.equals(executionOptions, that.executionOptions) && Objects.equals(tableSchema, that.tableSchema) - && Objects.equals(sinkParallelism, that.sinkParallelism); + && Objects.equals(sinkParallelism, that.sinkParallelism) + && Objects.equals(overwrite, that.overwrite); } @Override public int hashCode() { - return Objects.hash(options, readOptions, executionOptions, tableSchema, sinkParallelism); + return Objects.hash( + options, readOptions, executionOptions, tableSchema, sinkParallelism, overwrite); + } + + @Override + public void applyOverwrite(boolean overwrite) { + this.overwrite = overwrite; } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 80986ea3c..dd74bb1cd 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.StringUtils; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static org.apache.flink.api.common.JobStatus.FINISHED; import static org.apache.flink.api.common.JobStatus.RUNNING; @@ -68,6 +70,7 @@ public class DorisSinkITCase extends AbstractITCaseService { static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl"; static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS"; static final String TABLE_GROUP_COMMIT = "tbl_group_commit"; + static final String TABLE_OVERWRITE = "tbl_overwrite"; static final String TABLE_GZ_FORMAT = "tbl_gz_format"; static final String TABLE_CSV_JM = "tbl_csv_jm"; static final String TABLE_CSV_TM = "tbl_csv_tm"; @@ -556,6 +559,59 @@ public void testTaskManagerFailoverSink() throws Exception { ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); } + @Test + public void testTableOverwrite() throws Exception { + initializeTable(TABLE_OVERWRITE); + // mock data + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format( + "INSERT INTO %s.%s values('history-data',12)", DATABASE, TABLE_OVERWRITE)); + + List expected_his = Arrays.asList("history-data,12"); + String query = + String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_OVERWRITE); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected_his, query, 2); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sinkDDL = + String.format( + "CREATE TABLE doris_overwrite_sink (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'jdbc-url' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.label-prefix' = '" + + UUID.randomUUID() + + "'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_OVERWRITE, + getDorisQueryUrl(), + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sinkDDL); + TableResult tableResult = + tEnv.executeSql( + "INSERT OVERWRITE doris_overwrite_sink SELECT 'doris',1 union all SELECT 'overwrite',2 union all SELECT 'flink',3"); + + tableResult.await(25000, TimeUnit.MILLISECONDS); + List expected = Arrays.asList("doris,1", "flink,3", "overwrite,2"); + ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2); + } + private void initializeTable(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(),