diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java
index e4b0d0f9c..92a61a361 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/ArrowLogWriteBatch.java
@@ -100,8 +100,8 @@ public boolean isClosed() {
     }
 
     @Override
-    public int sizeInBytes() {
-        return recordsBuilder.getSizeInBytes();
+    public int estimatedSizeInBytes() {
+        return recordsBuilder.estimatedSizeInBytes();
     }
 
     @Override
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java
index d8ddcc189..15f33b545 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/IndexedLogWriteBatch.java
@@ -124,7 +124,7 @@ public void resetWriterState(long writerId, int batchSequence) {
     }
 
     @Override
-    public int sizeInBytes() {
+    public int estimatedSizeInBytes() {
         return recordsBuilder.getSizeInBytes();
     }
 }
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java
index bb9e0e2df..445a760e5 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/KvWriteBatch.java
@@ -116,7 +116,7 @@ public boolean isClosed() {
     }
 
     @Override
-    public int sizeInBytes() {
+    public int estimatedSizeInBytes() {
         return recordsBuilder.getSizeInBytes();
     }
 
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
index 1cecb40b0..380357c87 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
@@ -22,7 +22,6 @@
 import com.alibaba.fluss.cluster.BucketLocation;
 import com.alibaba.fluss.cluster.Cluster;
 import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.compression.ArrowCompressionType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussRuntimeException;
@@ -108,8 +107,6 @@ public final class RecordAccumulator {
 
     private final IdempotenceManager idempotenceManager;
 
-    private final ArrowCompressionType arrowCompressionType;
-
     // TODO add retryBackoffMs to retry the produce request upon receiving an error.
     // TODO add deliveryTimeoutMs to report success or failure on record delivery.
     // TODO add nextBatchExpiryTimeMs
@@ -129,8 +126,6 @@ public final class RecordAccumulator {
 
         this.batchSize =
                 Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());
-        this.arrowCompressionType = conf.get(ConfigOptions.CLIENT_WRITER_ARROW_COMPRESSION_TYPE);
-
         this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
         this.pagesPerBatch = Math.max(1, batchSize / writerBufferPool.pageSize());
         this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
@@ -509,7 +504,7 @@ private RecordAppendResult appendNewBatch(
                             schemaId,
                             outputView.getPreAllocatedSize(),
                             tableInfo.getTableDescriptor().getSchema().toRowType(),
-                            arrowCompressionType);
+                            tableInfo.getTableDescriptor().getArrowCompressionInfo());
             batch =
                     new ArrowLogWriteBatch(
                             tb, physicalTablePath, schemaId, arrowWriter, outputView);
@@ -578,7 +573,7 @@ private List<WriteBatch> drainBatchesForOneNode(Cluster cluster, ServerNode node
 
                 // TODO retry back off check.
-                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
+                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                     // there is a rare case that a single batch size is larger than the request size
                    // due to compression; in this case we will still eventually send this batch in
                    // a single request.
 
@@ -626,7 +621,7 @@ private List<WriteBatch> drainBatchesForOneNode(Cluster cluster, ServerNode node
                 // the rest of the work by processing outside the lock close() is particularly expensive
                 Preconditions.checkNotNull(batch, "batch should not be null");
                 batch.close();
-                size += batch.sizeInBytes();
+                size += batch.estimatedSizeInBytes();
                 ready.add(batch);
                 // mark the batch as drained.
                 batch.drained(System.currentTimeMillis());
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
index b56de51f3..9f22a6f08 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/Sender.java
@@ -214,11 +214,13 @@ private void sendWriteData() throws Exception {
 
         if (!batches.isEmpty()) {
             addToInflightBatches(batches);
-            updateWriterMetrics(batches);
 
             // TODO add logic for batch expire.
 
             sendWriteRequests(batches);
+
+            // move metrics update to the end to make sure the batches have been built.
+            updateWriterMetrics(batches);
         }
     }
 
@@ -527,10 +529,14 @@ private void updateWriterMetrics(Map<Integer, List<WriteBatch>> batches) {
                             int recordCount = batch.getRecordCount();
                             writerMetricGroup.recordsSendTotal().inc(recordCount);
                             writerMetricGroup.setBatchQueueTimeMs(batch.getQueueTimeMs());
-                            writerMetricGroup.bytesSendTotal().inc(batch.sizeInBytes());
+                            writerMetricGroup
+                                    .bytesSendTotal()
+                                    .inc(batch.estimatedSizeInBytes());
                             writerMetricGroup.recordPerBatch().update(recordCount);
-                            writerMetricGroup.bytesPerBatch().update(batch.sizeInBytes());
+                            writerMetricGroup
+                                    .bytesPerBatch()
+                                    .update(batch.estimatedSizeInBytes());
                         }
                     });
     }
 
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java
index 7bd8e12f5..439a4cf2e 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriteBatch.java
@@ -88,11 +88,11 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
     public abstract boolean isClosed();
 
     /**
-     * get size in bytes.
-     *
-     * @return the size in bytes
+     * Gets an estimate of the number of bytes written to the underlying buffer. The returned
+     * value is exact if the record set is not compressed, or once {@link #build()} has been
+     * called.
      */
-    public abstract int sizeInBytes();
+    public abstract int estimatedSizeInBytes();
 
     /**
      * get pooled memory segments to de-allocate. After produceLog/PutKv acks, the {@link
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java
index 5a4926bb7..b41dfce8d 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/DefaultCompletedFetchTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
 
 import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.toByteBuffer;
+import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
 import static com.alibaba.fluss.record.TestData.DATA2;
 import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID;
@@ -235,7 +236,7 @@ private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projec
 
         FileLogProjection fileLogProjection = new FileLogProjection();
         fileLogProjection.setCurrentProjection(
-                DATA2_TABLE_ID, rowType, projection.getProjectionInOrder());
+                DATA2_TABLE_ID, rowType, NO_COMPRESSION, projection.getProjectionInOrder());
         ByteBuffer buffer =
                 toByteBuffer(
                         fileLogProjection
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
index 4324b2dbf..e4cd9ca9c 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
@@ -28,7 +28,6 @@
 import com.alibaba.fluss.client.table.writer.TableWriter;
 import com.alibaba.fluss.client.table.writer.UpsertWrite;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
-import com.alibaba.fluss.compression.ArrowCompressionType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
@@ -54,19 +53,17 @@
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
@@ -937,9 +934,8 @@ void testFirstRowMergeEngine() throws Exception {
     }
 
     @ParameterizedTest
-    @MethodSource("arrowCompressionTypes")
-    void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
-            throws Exception {
+    @CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"})
+    void testArrowCompressionAndProject(String compression, String level) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -947,18 +943,21 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
                         .column("c", DataTypes.STRING())
                         .column("d", DataTypes.BIGINT())
                         .build();
-        TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), compression)
+                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key(), level)
+                        .build();
         TablePath tablePath = TablePath.of("test_db_1", "test_arrow_compression_and_project");
         createTable(tablePath, tableDescriptor, false);
 
-        Configuration conf = new Configuration(clientConf);
-        conf.set(ConfigOptions.CLIENT_WRITER_ARROW_COMPRESSION_TYPE, arrowCompressionType);
-        try (Connection conn = ConnectionFactory.createConnection(conf);
+        try (Connection conn = ConnectionFactory.createConnection(clientConf);
                 Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             int expectedSize = 30;
             for (int i = 0; i < expectedSize; i++) {
-                String value = i % 2 == 0 ? "hello, friend " + i : null;
+                String value = i % 2 == 0 ? "hello, friend " + i : null;
                 InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L});
                 appendWriter.append(row);
                 if (i % 10 == 0) {
@@ -980,7 +979,7 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
                 assertThat(scanRecord.getRow().getInt(1)).isEqualTo(100);
                 if (count % 2 == 0) {
                     assertThat(scanRecord.getRow().getString(2).toString())
-                            .isEqualTo("hello, friend " + count);
+                            .isEqualTo("hello, friend " + count);
                 } else {
                     // check null values
                     assertThat(scanRecord.getRow().isNullAt(2)).isTrue();
@@ -1003,7 +1002,7 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
                 assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count);
                 if (count % 2 == 0) {
                     assertThat(scanRecord.getRow().getString(1).toString())
-                            .isEqualTo("hello, friend " + count);
+                            .isEqualTo("hello, friend " + count);
                 } else {
                     // check null values
                     assertThat(scanRecord.getRow().isNullAt(1)).isTrue();
@@ -1015,8 +1014,4 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
             logScanner.close();
         }
     }
-
-    private static Stream<ArrowCompressionType> arrowCompressionTypes() {
-        return Arrays.stream(ArrowCompressionType.values());
-    }
 }
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java
index ed3ede965..ae74d53a9 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -16,7 +16,7 @@
 
 package com.alibaba.fluss.client.write;
 
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.memory.MemorySegment;
 import com.alibaba.fluss.memory.PreAllocatedPagedOutputView;
 import com.alibaba.fluss.memory.TestingMemorySegmentPool;
@@ -128,7 +128,7 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
                         DATA1_TABLE_INFO.getSchemaId(),
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO),
+                        ArrowCompressionInfo.NO_COMPRESSION),
                 new PreAllocatedPagedOutputView(memorySegmentList));
 
         assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);
@@ -180,7 +180,7 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
                         DATA1_TABLE_INFO.getSchemaId(),
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO),
+                        ArrowCompressionInfo.NO_COMPRESSION),
                 new UnmanagedPagedOutputView(128));
     }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/compression/FlussArrowCompressionFactory.java b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionFactory.java
similarity index 94%
rename from fluss-common/src/main/java/com/alibaba/fluss/compression/FlussArrowCompressionFactory.java
rename to fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionFactory.java
index 8c37239cd..2314389cb 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/compression/FlussArrowCompressionFactory.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionFactory.java
@@ -32,9 +32,9 @@
  * CommonsCompressionFactory.
  */
 @Internal
-public class FlussArrowCompressionFactory implements CompressionCodec.Factory {
+public class ArrowCompressionFactory implements CompressionCodec.Factory {
 
-    public static final FlussArrowCompressionFactory INSTANCE = new FlussArrowCompressionFactory();
+    public static final ArrowCompressionFactory INSTANCE = new ArrowCompressionFactory();
 
     @Override
     public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
@@ -67,7 +67,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int com
     public static CompressionUtil.CodecType toArrowCompressionCodecType(
             ArrowCompressionType compressionType) {
         switch (compressionType) {
-            case NO:
+            case NONE:
                 return CompressionUtil.CodecType.NO_COMPRESSION;
             case LZ4_FRAME:
                 return CompressionUtil.CodecType.LZ4_FRAME;
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionInfo.java b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionInfo.java
new file mode 100644
index 000000000..7f31a6e68
--- /dev/null
+++ b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.compression;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;
+
+/** Compression information for Arrow record batches. */
+public class ArrowCompressionInfo {
+
+    public static final ArrowCompressionInfo NO_COMPRESSION =
+            new ArrowCompressionInfo(ArrowCompressionType.NONE, -1);
+
+    private final ArrowCompressionType compressionType;
+    private final int compressionLevel;
+
+    public ArrowCompressionInfo(ArrowCompressionType compressionType, int compressionLevel) {
+        this.compressionType = compressionType;
+        this.compressionLevel = compressionLevel;
+    }
+
+    public ArrowCompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    /**
+     * Gets the compression level. If the compression level is not supported by the compression
+     * type, -1 is returned.
+     */
+    public int getCompressionLevel() {
+        return compressionLevel;
+    }
+
+    /** Creates an Arrow compression codec based on the compression type and level. */
+    public CompressionCodec createCompressionCodec() {
+        return ArrowCompressionFactory.INSTANCE.createCodec(
+                ArrowCompressionFactory.toArrowCompressionCodecType(compressionType),
+                compressionLevel);
+    }
+
+    @Override
+    public String toString() {
+        return compressionLevel == -1
+                ? compressionType.toString()
+                : compressionType + "-" + compressionLevel;
+    }
+
+    public static ArrowCompressionInfo fromConf(Configuration conf) {
+        ArrowCompressionType compressionType =
+                conf.get(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE);
+        if (compressionType == ArrowCompressionType.ZSTD) {
+            int compressionLevel = conf.get(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL);
+            if (compressionLevel < 1 || compressionLevel > 22) {
+                throw new IllegalArgumentException(
+                        "Invalid ZSTD compression level: "
+                                + compressionLevel
+                                + ". Expected a value between 1 and 22.");
+            }
+            return new ArrowCompressionInfo(compressionType, compressionLevel);
+        } else {
+            return new ArrowCompressionInfo(compressionType, -1);
+        }
+    }
+}
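`ArrowCompressionInfo` is the value object that now travels from the table descriptor down to the Arrow writers: `fromConf` resolves the type (and, for ZSTD only, the level), and `createCompressionCodec` turns it into a shaded Arrow codec. A minimal usage sketch, assuming only the classes touched by this patch:

```java
import com.alibaba.fluss.compression.ArrowCompressionInfo;
import com.alibaba.fluss.compression.ArrowCompressionType;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;

class ArrowCompressionInfoSketch {
    static void sketch() {
        Configuration conf = new Configuration();
        conf.set(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE, ArrowCompressionType.ZSTD);
        conf.set(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL, 3);

        // fromConf resolves the type and validates the ZSTD level (1..22);
        // for NONE and LZ4_FRAME the level is fixed to -1.
        ArrowCompressionInfo info = ArrowCompressionInfo.fromConf(conf);

        // The codec is created through ArrowCompressionFactory under the hood.
        CompressionCodec codec = info.createCompressionCodec();

        // toString() renders "ZSTD-3" (or just the type when no level applies),
        // which is also what ArrowWriterPool embeds in its writer cache keys.
        String key = info.toString();
    }
}
```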
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionType.java b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionType.java
index 1e7d4db51..54ebbf739 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionType.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/compression/ArrowCompressionType.java
@@ -18,15 +18,16 @@
 package com.alibaba.fluss.compression;
 
 import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.metadata.LogFormat;
 
 /**
- * Supported compression types for ARROW format.
+ * Supported compression types for ARROW {@link LogFormat}.
  *
  * @since 0.6
  */
 @PublicEvolving
 public enum ArrowCompressionType {
-    NO,
+    NONE,
     LZ4_FRAME,
-    ZSTD;
+    ZSTD
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
index e7dcd96c0..64de99626 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java
@@ -735,15 +735,6 @@ public class ConfigOptions {
                             + "requests per bucket exceeds this setting, the writer will wait for the inflight "
                             + "requests to complete before sending out new requests. This setting defaults to 5");
 
-    public static final ConfigOption<ArrowCompressionType> CLIENT_WRITER_ARROW_COMPRESSION_TYPE =
-            key("client.writer.arrow-compression-type")
-                    .enumType(ArrowCompressionType.class)
-                    .defaultValue(ArrowCompressionType.NO)
-                    .withDescription(
-                            "The compression type for the input log batch when the log format set as ARROW. "
" - + "The candidate compression type is " - + Arrays.toString(ArrowCompressionType.values())); - public static final ConfigOption CLIENT_REQUEST_TIMEOUT = key("client.request-timeout") .durationType() @@ -907,6 +898,24 @@ public class ConfigOptions { "The format of the log records in log store. The default value is 'arrow'. " + "The supported formats are 'arrow' and 'indexed'."); + public static final ConfigOption TABLE_LOG_ARROW_COMPRESSION_TYPE = + key("table.log.arrow.compression.type") + .enumType(ArrowCompressionType.class) + // TODO: change to ZSTD by default when it is stable + .defaultValue(ArrowCompressionType.NONE) + .withDescription( + "The compression type of the log records if the log format is set to 'ARROW'. " + + "The candidate compression type is " + + Arrays.toString(ArrowCompressionType.values())); + + public static final ConfigOption TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL = + key("table.log.arrow.compression.zstd.level") + .intType() + .defaultValue(3) + .withDescription( + "The compression level of ZSTD for the log records if the log format is set to 'ARROW' " + + "and the compression type is set to 'ZSTD'. The valid range is 1 to 22."); + public static final ConfigOption TABLE_KV_FORMAT = key("table.kv.format") .enumType(KvFormat.class) @@ -1102,15 +1111,6 @@ public class ConfigOptions { "The max size of the consumed memory for RocksDB batch write, " + "will flush just based on item count if this config set to 0."); - public static final ConfigOption KV_CDC_ARROW_COMPRESSION_TYPE = - key("kv.cdc.arrow-compression-type") - .enumType(ArrowCompressionType.class) - .defaultValue(ArrowCompressionType.NO) - .withDescription( - "The compression type for the cdc log generated by kv table when the log format set as ARROW. " - + "The candidate compression type is " - + Arrays.toString(ArrowCompressionType.values())); - // -------------------------------------------------------------------------- // Provided configurable ColumnFamilyOptions within Fluss // -------------------------------------------------------------------------- @@ -1141,18 +1141,18 @@ public class ConfigOptions { + "For more information, please refer to %s https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true" + "RocksDB's doc."); - public static final ConfigOption> KV_COMPRESSION_PER_LEVEL = + public static final ConfigOption> KV_COMPRESSION_PER_LEVEL = key("kv.rocksdb.compression.per.level") - .enumType(CompressionType.class) + .enumType(KvCompressionType.class) .asList() .defaultValues( - CompressionType.LZ4, - CompressionType.LZ4, - CompressionType.LZ4, - CompressionType.LZ4, - CompressionType.LZ4, - CompressionType.ZSTD, - CompressionType.ZSTD) + KvCompressionType.LZ4, + KvCompressionType.LZ4, + KvCompressionType.LZ4, + KvCompressionType.LZ4, + KvCompressionType.LZ4, + KvCompressionType.ZSTD, + KvCompressionType.ZSTD) .withDescription( "A comma-separated list of Compression Type. Different levels can have different " + "compression policies. In many cases, lower levels use fast compression algorithms," @@ -1351,7 +1351,7 @@ public enum NoKeyAssigner { } /** Compression type for Fluss's kv. Currently only exposes the following compression type. 
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
index 751c0d621..161d89013 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
@@ -18,6 +18,7 @@
 
 import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.annotation.PublicStable;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.config.ConfigOption;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
@@ -144,6 +145,20 @@ && getLogFormat() != LogFormat.ARROW) {
             throw new IllegalArgumentException(
                     "Merge-engine is only supported in primary key table.");
         }
+
+        // TODO: generalize the validation for ConfigOption
+        if (properties.containsKey(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key())) {
+            int compressionLevel =
+                    Integer.parseInt(
+                            properties.get(
+                                    ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key()));
+            if (compressionLevel < 1 || compressionLevel > 22) {
+                throw new IllegalArgumentException(
+                        "Invalid ZSTD compression level: "
+                                + compressionLevel
+                                + ". Expected a value between 1 and 22.");
+            }
+        }
     }
 
     /** Creates a builder for building table descriptor. */
@@ -289,6 +304,11 @@ public boolean isDataLakeEnabled() {
         return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
     }
 
+    /** Gets the Arrow compression type and compression level of the table. */
+    public ArrowCompressionInfo getArrowCompressionInfo() {
+        return ArrowCompressionInfo.fromConf(configuration());
+    }
+
     public TableDescriptor copy(Map<String, String> newProperties) {
         return new TableDescriptor(
                 schema, comment, partitionKeys, tableDistribution, newProperties, customProperties);
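On the descriptor side the options behave like any other table property: `getArrowCompressionInfo()` re-derives the compression info from the property map, and an out-of-range ZSTD level is rejected during descriptor validation. A sketch under those assumptions (table contents are illustrative):

```java
import com.alibaba.fluss.compression.ArrowCompressionInfo;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.types.DataTypes;

class TableDescriptorCompressionSketch {
    static void sketch() {
        TableDescriptor descriptor =
                TableDescriptor.builder()
                        .schema(Schema.newBuilder().column("a", DataTypes.INT()).build())
                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key(), "3")
                        .build();

        // Re-derived from the table properties via ArrowCompressionInfo.fromConf.
        ArrowCompressionInfo info = descriptor.getArrowCompressionInfo();

        // A level of '0' (or anything outside 1..22) fails validation with:
        // "Invalid ZSTD compression level: 0. Expected a value between 1 and 22."
    }
}
```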
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java
index 8c403510b..b16031088 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/FileLogProjection.java
@@ -17,15 +17,16 @@
 package com.alibaba.fluss.record;
 
 import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.exception.InvalidColumnProjectionException;
 import com.alibaba.fluss.record.bytesview.MultiBytesView;
 import com.alibaba.fluss.shaded.arrow.com.google.flatbuffers.FlatBufferBuilder;
-import com.alibaba.fluss.shaded.arrow.org.apache.arrow.flatbuf.BodyCompression;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.flatbuf.Buffer;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.flatbuf.FieldNode;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.flatbuf.Message;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.flatbuf.RecordBatch;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.TypeLayout;
+import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.WriteChannel;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowBodyCompression;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowBuffer;
@@ -56,9 +57,9 @@ import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
 import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET;
 import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
-import static com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
 import static com.alibaba.fluss.utils.FileUtils.readFullyOrFail;
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /** Column projection util on Arrow format {@link FileLogRecords}. */
 public class FileLogProjection {
@@ -90,7 +91,11 @@ public FileLogProjection() {
         this.arrowHeaderBuffer.order(ByteOrder.LITTLE_ENDIAN);
     }
 
-    public void setCurrentProjection(long tableId, RowType schema, int[] selectedFields) {
+    public void setCurrentProjection(
+            long tableId,
+            RowType schema,
+            ArrowCompressionInfo compressionInfo,
+            int[] selectedFields) {
         if (projectionsCache.containsKey(tableId)) {
             // the schema and projection should identical for the same table id.
             currentProjection = projectionsCache.get(tableId);
@@ -127,9 +132,20 @@ public void setCurrentProjection(long tableId, RowType schema, int[] selectedFie
             bufferIndex += bufferLayoutCount[i];
         }
 
+        Schema projectedArrowSchema = ArrowUtils.toArrowSchema(schema.project(selectedFields));
+        ArrowBodyCompression bodyCompression =
+                CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec());
+        int metadataLength =
+                ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema, bodyCompression);
         currentProjection =
                 new ProjectionInfo(
-                        nodesProjection, buffersProjection, bufferIndex, schema, selectedFields);
+                        nodesProjection,
+                        buffersProjection,
+                        bufferIndex,
+                        schema,
+                        metadataLength,
+                        bodyCompression,
+                        selectedFields);
         projectionsCache.put(tableId, currentProjection);
     }
@@ -187,20 +203,25 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int
                         currentProjection.bufferCount);
         long arrowBodyLength = projectedArrowBatch.bodyLength();
 
-        // 3. create new arrow batch metadata which already projected.
-        Tuple2<Integer, byte[]> lengthAndHeaderMetadata =
-                serializeArrowRecordBatchMetadata(projectedArrowBatch, arrowBodyLength);
         int newBatchSizeInBytes =
                 RECORD_BATCH_HEADER_SIZE
                         + rowKindBytes
-                        + lengthAndHeaderMetadata.f0
+                        + currentProjection.arrowMetadataLength
                         + (int) arrowBodyLength; // safe to cast to int
         if (newBatchSizeInBytes > maxBytes) {
             // the remaining bytes in the file are not enough to read a full batch
             return new BytesViewLogRecords(builder.build());
         }
 
-        byte[] headerMetadata = lengthAndHeaderMetadata.f1;
+        // 3. create new arrow batch metadata which already projected.
+        byte[] headerMetadata =
+                serializeArrowRecordBatchMetadata(
+                        projectedArrowBatch,
+                        arrowBodyLength,
+                        currentProjection.bodyCompression);
+        checkState(
+                headerMetadata.length == currentProjection.arrowMetadataLength,
+                "Invalid metadata length");
 
         // 4. update and copy log batch header
         logHeaderBuffer.position(LENGTH_OFFSET);
@@ -252,18 +273,7 @@ private ProjectedArrowBatch projectArrowBatch(
             newOffset += paddedLength;
         }
 
-        // Get compression codec and method. See ArrowRecordBatch#writeTo(FlatBufferBuilder).
-        BodyCompression compression = recordBatch.compression();
-        ArrowBodyCompression arrowBodyCompression;
-        if (compression != null) {
-            arrowBodyCompression =
-                    new ArrowBodyCompression(compression.codec(), compression.method());
-        } else {
-            arrowBodyCompression = DEFAULT_BODY_COMPRESSION;
-        }
-
-        return new ProjectedArrowBatch(
-                numRecords, newNodes, newBufferLayouts, selectedBuffers, arrowBodyCompression);
+        return new ProjectedArrowBatch(numRecords, newNodes, newBufferLayouts, selectedBuffers);
     }
 
     /**
@@ -273,18 +283,18 @@ private ProjectedArrowBatch projectArrowBatch(
      * @see MessageSerializer#serialize(WriteChannel, ArrowRecordBatch)
      * @see ArrowRecordBatch#writeTo(FlatBufferBuilder)
      */
-    private Tuple2<Integer, byte[]> serializeArrowRecordBatchMetadata(
-            ProjectedArrowBatch batch, long arrowBodyLength) throws IOException {
+    private byte[] serializeArrowRecordBatchMetadata(
+            ProjectedArrowBatch batch, long arrowBodyLength, ArrowBodyCompression bodyCompression)
+            throws IOException {
         outputStream.reset();
-        int newMetadataLength =
-                ArrowUtils.serializeArrowRecordBatchMetadata(
-                        writeChannel,
-                        batch.numRecords,
-                        batch.nodes,
-                        batch.buffersLayout,
-                        batch.arrowBodyCompression,
-                        arrowBodyLength);
-        return Tuple2.of(newMetadataLength, outputStream.toByteArray());
+        ArrowUtils.serializeArrowRecordBatchMetadata(
+                writeChannel,
+                batch.numRecords,
+                batch.nodes,
+                batch.buffersLayout,
+                bodyCompression,
+                arrowBodyLength);
+        return outputStream.toByteArray();
     }
 
     private void resizeArrowMetadataBuffer(int metadataSize) {
@@ -356,6 +366,8 @@ static final class ProjectionInfo {
         final BitSet buffersProjection;
         final int bufferCount;
         final RowType schema;
+        final int arrowMetadataLength;
+        final ArrowBodyCompression bodyCompression;
         final int[] selectedFields;
 
         private ProjectionInfo(
@@ -363,11 +375,15 @@ private ProjectionInfo(
                 BitSet buffersProjection,
                 int bufferCount,
                 RowType schema,
+                int arrowMetadataLength,
+                ArrowBodyCompression bodyCompression,
                 int[] selectedFields) {
             this.nodesProjection = nodesProjection;
             this.buffersProjection = buffersProjection;
             this.bufferCount = bufferCount;
             this.schema = schema;
+            this.arrowMetadataLength = arrowMetadataLength;
+            this.bodyCompression = bodyCompression;
             this.selectedFields = selectedFields;
         }
     }
@@ -386,20 +402,15 @@ public static final class ProjectedArrowBatch {
         /** The projected buffer positions of {@link ArrowRecordBatch#getBuffers()}. */
         final List buffers;
 
-        /** The arrow body compression. */
-        final ArrowBodyCompression arrowBodyCompression;
-
         public ProjectedArrowBatch(
                 long numRecords,
                 List<ArrowFieldNode> nodes,
                 List<ArrowBuffer> buffersLayout,
-                List buffers,
-                ArrowBodyCompression arrowBodyCompression) {
+                List buffers) {
             this.numRecords = numRecords;
             this.nodes = nodes;
             this.buffersLayout = buffersLayout;
             this.buffers = buffers;
-            this.arrowBodyCompression = arrowBodyCompression;
         }
 
         public long bodyLength() {
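The key idea behind the projection change: for a fixed projected schema and body compression, the serialized Arrow metadata has a constant length, so `setCurrentProjection` can compute it once per table and `project` can size every batch without serializing its header first; the new `checkState` guards that invariant. A hedged sketch with a hypothetical helper mirroring what the patch precomputes:

```java
import com.alibaba.fluss.compression.ArrowCompressionInfo;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowBodyCompression;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.ArrowUtils;

class ProjectionMetadataSketch {
    // Hypothetical helper (not in the patch) showing the precomputation done
    // once per (table, projection, compression) in setCurrentProjection().
    static int projectedMetadataLength(
            RowType schema, int[] selectedFields, ArrowCompressionInfo compressionInfo) {
        Schema projected = ArrowUtils.toArrowSchema(schema.project(selectedFields));
        ArrowBodyCompression bodyCompression =
                CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec());
        // Constant for a fixed (projected schema, compression) pair, which is why
        // project() can assert headerMetadata.length against it via checkState.
        return ArrowUtils.estimateArrowMetadataLength(projected, bodyCompression);
    }
}
```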
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
index e89746ef5..4db3e96f3 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -42,7 +42,6 @@
 
 /** Builder for {@link MemoryLogRecords} of log records in {@link LogFormat#ARROW} format. */
 public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
-    private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     private static final int BUILDER_DEFAULT_OFFSET = 0;
 
     private final long baseLogOffset;
@@ -57,8 +56,7 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
     private MultiBytesView bytesView = null;
     private long writerId;
     private int batchSequence;
-    private int sizeInBytes;
-    private int sizeInBytesAfterCompression;
+    private int estimatedSizeInBytes;
     private int recordCount;
     private boolean isClosed;
     private boolean reCalculateSizeInBytes = false;
@@ -90,8 +88,7 @@ private MemoryLogRecordsArrowBuilder(
                             + ARROW_ROWKIND_OFFSET
                             + " bytes.");
         this.rowKindWriter = new RowKindVectorWriter(firstSegment, ARROW_ROWKIND_OFFSET);
-        this.sizeInBytes = ARROW_ROWKIND_OFFSET;
-        this.sizeInBytesAfterCompression = sizeInBytes;
+        this.estimatedSizeInBytes = ARROW_ROWKIND_OFFSET;
         this.recordCount = 0;
     }
@@ -118,17 +115,16 @@ public MultiBytesView build() throws IOException {
         }
 
         // serialize the arrow batch to dynamically allocated memory segments
-        int serializeSizeInBytes =
-                arrowWriter.serializeToOutputView(
-                        pagedOutputView, ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes());
-        sizeInBytesAfterCompression += serializeSizeInBytes;
-        arrowWriter.recycle(writerEpoch);
-
-        writeBatchHeader();
+        arrowWriter.serializeToOutputView(
+                pagedOutputView, ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes());
+        recordCount = arrowWriter.getRecordsCount();
         bytesView =
                 MultiBytesView.builder()
                         .addMemorySegmentByteViewList(pagedOutputView.getWrittenSegments())
                         .build();
+        arrowWriter.recycle(writerEpoch);
+
+        writeBatchHeader();
         return bytesView;
     }
@@ -182,8 +178,6 @@ public void close() throws Exception {
             return;
         }
 
-        // update sizeInBytes and recordCount if needed
-        getSizeInBytes();
         isClosed = true;
     }
@@ -191,19 +185,21 @@ public void recycleArrowWriter() {
         arrowWriter.recycle(writerEpoch);
     }
 
-    public int getSizeInBytes() {
+    public int estimatedSizeInBytes() {
+        if (bytesView != null) {
+            // accurate total size in bytes
+            return bytesView.getBytesLength();
+        }
+
         if (reCalculateSizeInBytes) {
             // make size in bytes up-to-date
-            sizeInBytes =
+            // TODO: consider the compression ratio
+            estimatedSizeInBytes =
                     ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes() + arrowWriter.sizeInBytes();
-            // Before serialize to channel, the sizeInBytesAfterCompression only contains batch
-            // header size and rowKind vector size.
-            sizeInBytesAfterCompression = ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes();
-            recordCount = arrowWriter.getRecordsCount();
         }
 
         reCalculateSizeInBytes = false;
-        return sizeInBytes;
+        return estimatedSizeInBytes;
     }
 
     // ----------------------- internal methods -------------------------------
@@ -214,7 +210,7 @@ private void writeBatchHeader() throws IOException {
         outputView.setPosition(0);
         // update header.
         outputView.writeLong(baseLogOffset);
-        outputView.writeInt(sizeInBytesAfterCompression - BASE_OFFSET_LENGTH - LENGTH_LENGTH);
+        outputView.writeInt(bytesView.getBytesLength() - BASE_OFFSET_LENGTH - LENGTH_LENGTH);
         outputView.writeByte(magic);
 
         // write empty timestamp which will be overridden on server side
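The rename to `estimatedSizeInBytes` makes the contract explicit: before `build()` the builder can only estimate (the compression outcome is unknown), while after `build()` it reports the exact serialized length from the bytes view. A sketch of that lifecycle; the setup names (`provider`, `createMemoryLogRecordsArrowBuilder`, `DEFAULT_SCHEMA_ID`, `DATA1_ROW_TYPE`) are borrowed from `MemoryLogRecordsArrowBuilderTest` further down:

```java
// Sketch fragment; assumes the test fixtures from MemoryLogRecordsArrowBuilderTest.
ArrowWriter writer =
        provider.getOrCreateWriter(
                1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION);
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);

int estimate = builder.estimatedSizeInBytes(); // pre-build: header + row kinds + arrow estimate
builder.close();
builder.build();                               // serializes (and possibly compresses) the batch
int exact = builder.estimatedSizeInBytes();    // post-build: bytesView.getBytesLength(), exact
```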
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriter.java b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriter.java
index b63b6593a..ddc495ccc 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriter.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriter.java
@@ -17,8 +17,7 @@
 package com.alibaba.fluss.row.arrow;
 
 import com.alibaba.fluss.annotation.Internal;
-import com.alibaba.fluss.compression.ArrowCompressionType;
-import com.alibaba.fluss.compression.FlussArrowCompressionFactory;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.memory.AbstractPagedOutputView;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.row.arrow.writers.ArrowFieldWriter;
@@ -29,6 +28,7 @@
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorUnloader;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;
+import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionUtil;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.WriteChannel;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowBlock;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
@@ -102,18 +102,16 @@ public class ArrowWriter implements AutoCloseable {
             RowType schema,
             BufferAllocator allocator,
             ArrowWriterProvider provider,
-            ArrowCompressionType arrowCompressionType) {
+            ArrowCompressionInfo compressionInfo) {
         this.writerKey = writerKey;
         this.schema = schema;
         this.root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(schema), allocator);
         this.provider = Preconditions.checkNotNull(provider);
+        this.compressionCodec = compressionInfo.createCompressionCodec();
 
-        this.compressionCodec =
-                FlussArrowCompressionFactory.INSTANCE.createCodec(
-                        FlussArrowCompressionFactory.toArrowCompressionCodecType(
-                                arrowCompressionType));
-
-        this.metadataLength = ArrowUtils.estimateArrowMetadataLength(root.getSchema());
+        this.metadataLength =
+                ArrowUtils.estimateArrowMetadataLength(
+                        root.getSchema(), CompressionUtil.createBodyCompression(compressionCodec));
         this.writeLimitInBytes = (int) (bufferSizeInBytes * BUFFER_USAGE_RATIO);
         this.estimatedMaxRecordsCount = -1;
         this.recordsCount = 0;
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
index f086f9890..a280a3085 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
@@ -18,7 +18,7 @@
 
 import com.alibaba.fluss.annotation.Internal;
 import com.alibaba.fluss.annotation.VisibleForTesting;
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
@@ -82,8 +82,8 @@ public ArrowWriter getOrCreateWriter(
             int schemaId,
             int bufferSizeInBytes,
             RowType schema,
-            ArrowCompressionType arrowCompressionType) {
-        final String writerKey = tableId + "-" + schemaId + "-" + arrowCompressionType;
+            ArrowCompressionInfo compressionInfo) {
+        final String writerKey = tableId + "-" + schemaId + "-" + compressionInfo.toString();
         return inLock(
                 lock,
                 () -> {
@@ -102,7 +102,7 @@ public ArrowWriter getOrCreateWriter(
                                             schema,
                                             allocator,
                                             this,
-                                            arrowCompressionType),
+                                            compressionInfo),
                                     bufferSizeInBytes);
                         }
                 });
     }
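Because compression is now part of a writer's identity, the pool key embeds `ArrowCompressionInfo.toString()` rather than just the compression type, so writers with different codecs (or different ZSTD levels) are never shared. A sketch with keys as asserted in `ArrowWriterPoolTest` below (`pool` is an assumed `ArrowWriterPool` instance):

```java
// Cached under "1-1-NONE": table 1, schema 1, no compression.
ArrowWriter plain =
        pool.getOrCreateWriter(1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION);

// Cached under "1-1-ZSTD-3": same table and schema, but a distinct pooled writer,
// so codecs are never mixed between compression settings.
ArrowWriter zstd =
        pool.getOrCreateWriter(
                1L, 1, 1024, DATA1_ROW_TYPE,
                new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 3));
```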
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterProvider.java b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterProvider.java
index bb3263a6a..0bfd373b4 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterProvider.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterProvider.java
@@ -17,7 +17,7 @@
 package com.alibaba.fluss.row.arrow;
 
 import com.alibaba.fluss.annotation.Internal;
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.types.RowType;
 
 /** The provider used for requesting and releasing {@link ArrowWriter}. */
@@ -28,7 +28,7 @@ ArrowWriter getOrCreateWriter(
             int schemaId,
             int bufferSizeInBytes,
             RowType schema,
-            ArrowCompressionType arrowCompressionType);
+            ArrowCompressionInfo compressionInfo);
 
     void recycleWriter(ArrowWriter arrowWriter);
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrowUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrowUtils.java
index 7d4976c94..8ac688cc8 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrowUtils.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/ArrowUtils.java
@@ -17,7 +17,7 @@
 package com.alibaba.fluss.utils;
 
 import com.alibaba.fluss.annotation.Internal;
-import com.alibaba.fluss.compression.FlussArrowCompressionFactory;
+import com.alibaba.fluss.compression.ArrowCompressionFactory;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.memory.MemorySegment;
 import com.alibaba.fluss.row.InternalRow;
@@ -127,7 +127,6 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.NoCompressionCodec.DEFAULT_BODY_COMPRESSION;
 import static com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeRecordBatch;
 import static com.alibaba.fluss.utils.Preconditions.checkArgument;
@@ -159,7 +158,7 @@ public static ArrowReader createArrowReader(
                         new ReadChannel(new ByteBufferReadableChannel(arrowBatchBuffer));
                 ArrowRecordBatch batch = deserializeRecordBatch(channel, allocator)) {
             VectorLoader vectorLoader =
-                    new VectorLoader(schemaRoot, FlussArrowCompressionFactory.INSTANCE);
+                    new VectorLoader(schemaRoot, ArrowCompressionFactory.INSTANCE);
             vectorLoader.load(batch);
             List<ColumnVector> columnVectors = new ArrayList<>();
             List<FieldVector> fieldVectors = schemaRoot.getFieldVectors();
@@ -221,7 +220,8 @@ public static int serializeArrowRecordBatchMetadata(
     }
 
     /** Estimates the size of {@link ArrowRecordBatch} metadata for the given schema. */
-    public static int estimateArrowMetadataLength(Schema arrowSchema) {
+    public static int estimateArrowMetadataLength(
+            Schema arrowSchema, ArrowBodyCompression bodyCompression) {
         List<Field> fields = flattenFields(arrowSchema.getFields());
         List<ArrowFieldNode> nodes = createFieldNodes(fields);
         List<ArrowBuffer> buffersLayout = createBuffersLayout(fields);
@@ -230,7 +230,7 @@ public static int estimateArrowMetadataLength(Schema arrowSchema) {
         WriteChannel writeChannel = new WriteChannel(Channels.newChannel(out));
         try {
             return ArrowUtils.serializeArrowRecordBatchMetadata(
-                    writeChannel, 1L, nodes, buffersLayout, DEFAULT_BODY_COMPRESSION, 8L);
+                    writeChannel, 1L, nodes, buffersLayout, bodyCompression, 8L);
         } catch (IOException e) {
             throw new FlussRuntimeException("Failed to estimate Arrow metadata size", e);
         }
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/compression/ArrowCompressionCodecTest.java b/fluss-common/src/test/java/com/alibaba/fluss/compression/ArrowCompressionCodecTest.java
index 5240f9a42..4027244a1 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/compression/ArrowCompressionCodecTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/compression/ArrowCompressionCodecTest.java
@@ -193,7 +193,7 @@ void testUnloadCompressed(CompressionUtil.CodecType codec) {
         CompressionCodec.Factory factory =
                 codec == CompressionUtil.CodecType.NO_COMPRESSION
                         ? NoCompressionCodec.Factory.INSTANCE
-                        : FlussArrowCompressionFactory.INSTANCE;
+                        : ArrowCompressionFactory.INSTANCE;
         try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
             final IntVector ints = (IntVector) root.getVector(0);
             final VarCharVector strings = (VarCharVector) root.getVector(1);
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java
index 34e8bea86..3af224cfd 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/FileLogProjectionTest.java
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.stream.Stream;
 
+import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
 import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
 import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static com.alibaba.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
@@ -50,7 +51,8 @@ class FileLogProjectionTest {
     @Test
     void testSetCurrentProjection() {
         FileLogProjection projection = new FileLogProjection();
-        projection.setCurrentProjection(1L, TestData.DATA2_ROW_TYPE, new int[] {0, 2});
+        projection.setCurrentProjection(
+                1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
         FileLogProjection.ProjectionInfo info1 = projection.currentProjection;
         assertThat(info1).isNotNull();
         assertThat(info1.nodesProjection.stream().toArray()).isEqualTo(new int[] {0, 2});
@@ -59,7 +61,7 @@ void testSetCurrentProjection() {
         assertThat(projection.projectionsCache).hasSize(1);
         assertThat(projection.projectionsCache.get(1L)).isSameAs(info1);
 
-        projection.setCurrentProjection(2L, TestData.DATA2_ROW_TYPE, new int[] {1});
+        projection.setCurrentProjection(2L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {1});
         FileLogProjection.ProjectionInfo info2 = projection.currentProjection;
         assertThat(info2).isNotNull();
         assertThat(info2.nodesProjection.stream().toArray()).isEqualTo(new int[] {1});
@@ -68,13 +70,14 @@
         assertThat(projection.projectionsCache).hasSize(2);
         assertThat(projection.projectionsCache.get(2L)).isSameAs(info2);
 
-        projection.setCurrentProjection(1L, TestData.DATA2_ROW_TYPE, new int[] {0, 2});
+        projection.setCurrentProjection(
+                1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
         assertThat(projection.currentProjection).isNotNull().isSameAs(info1);
 
         assertThatThrownBy(
                         () ->
                                 projection.setCurrentProjection(
-                                        1L, TestData.DATA1_ROW_TYPE, new int[] {1}))
+                                        1L, TestData.DATA1_ROW_TYPE, NO_COMPRESSION, new int[] {1}))
                 .isInstanceOf(InvalidColumnProjectionException.class)
                 .hasMessage("The schema and projection should be identical for the same table id.");
     }
@@ -86,21 +89,27 @@ void testIllegalSetCurrentProjection() {
 
         assertThatThrownBy(
                         () ->
                                 projection.setCurrentProjection(
-                                        1L, TestData.DATA2_ROW_TYPE, new int[] {3}))
+                                        1L, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {3}))
                 .isInstanceOf(InvalidColumnProjectionException.class)
                 .hasMessage("Projected fields [3] is out of bound for schema with 3 fields.");
 
         assertThatThrownBy(
                         () ->
                                 projection.setCurrentProjection(
-                                        1L, TestData.DATA2_ROW_TYPE, new int[] {1, 0}))
+                                        1L,
+                                        TestData.DATA2_ROW_TYPE,
+                                        NO_COMPRESSION,
+                                        new int[] {1, 0}))
                 .isInstanceOf(InvalidColumnProjectionException.class)
                 .hasMessage("The projection indexes should be in field order, but is [1, 0]");
 
         assertThatThrownBy(
                         () ->
                                 projection.setCurrentProjection(
-                                        1L, TestData.DATA2_ROW_TYPE, new int[] {0, 0, 0}))
+                                        1L,
+                                        TestData.DATA2_ROW_TYPE,
+                                        NO_COMPRESSION,
+                                        new int[] {0, 0, 0}))
                 .isInstanceOf(InvalidColumnProjectionException.class)
                 .hasMessage(
                         "The projection indexes should not contain duplicated fields, but is [0, 0, 0]");
@@ -225,7 +234,7 @@ private List doProjection(
             int[] projectedFields,
             int fetchMaxBytes)
             throws Exception {
-        projection.setCurrentProjection(1L, rowType, projectedFields);
+        projection.setCurrentProjection(1L, rowType, NO_COMPRESSION, projectedFields);
         RowType projectedType = rowType.project(projectedFields);
         LogRecords project =
                 projection.project(
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
index d9731c4eb..1fab5036b 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
@@ -16,6 +16,7 @@
 
 package com.alibaba.fluss.record;
 
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.compression.ArrowCompressionType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
@@ -30,6 +31,7 @@
 import com.alibaba.fluss.utils.CloseableIterator;
 
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -80,7 +82,7 @@ void testAppendWithEmptyRecord() throws Exception {
                         DEFAULT_SCHEMA_ID,
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO);
+                        ArrowCompressionInfo.NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
                 createMemoryLogRecordsArrowBuilder(0, writer, 10, 100);
         assertThat(builder.isFull()).isFalse();
@@ -106,7 +108,7 @@ void testAppend() throws Exception {
                         DEFAULT_SCHEMA_ID,
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO);
+                        ArrowCompressionInfo.NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
                 createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
         List<RowKind> rowKinds =
@@ -139,22 +141,22 @@ void testAppend() throws Exception {
     }
 
     @ParameterizedTest
-    @MethodSource("compressionTypes")
-    void testCompression(ArrowCompressionType compressionType) throws Exception {
+    @MethodSource("compressionInfos")
+    void testCompression(ArrowCompressionInfo compressionInfo) throws Exception {
         int maxSizeInBytes = 1024;
         // create a compression-able data set.
         List<Object[]> dataSet =
                 Arrays.asList(
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "},
-                        new Object[] {1, " "});
+                        new Object[] {1, StringUtils.repeat("a", 10)},
+                        new Object[] {2, StringUtils.repeat("b", 11)},
+                        new Object[] {3, StringUtils.repeat("c", 12)},
+                        new Object[] {4, StringUtils.repeat("d", 13)},
+                        new Object[] {5, StringUtils.repeat("e", 14)},
+                        new Object[] {6, StringUtils.repeat("f", 15)},
+                        new Object[] {7, StringUtils.repeat("g", 16)},
+                        new Object[] {8, StringUtils.repeat("h", 17)},
+                        new Object[] {9, StringUtils.repeat("i", 18)},
+                        new Object[] {10, StringUtils.repeat("j", 19)});
 
         // first create an un-compression batch.
         ArrowWriter writer1 =
@@ -163,31 +165,28 @@ void testCompression(ArrowCompressionType compressionType) throws Exception {
                 provider.getOrCreateWriter(
                         1L,
                         DEFAULT_SCHEMA_ID,
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO);
+                        ArrowCompressionInfo.NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
-                createMemoryLogRecordsArrowBuilder(writer1, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024);
         for (Object[] data : dataSet) {
             builder.append(RowKind.APPEND_ONLY, row(DATA1_ROW_TYPE, data));
         }
         builder.close();
-        MemoryLogRecords records1 =
-                MemoryLogRecords.pointToByteBuffer(builder.build().getByteBuf().nioBuffer());
+        MemoryLogRecords records1 = MemoryLogRecords.pointToBytesView(builder.build());
         int sizeInBytes1 = records1.sizeInBytes();
         assertLogRecordsEquals(DATA1_ROW_TYPE, records1, dataSet);
 
         // second create a compression batch.
         ArrowWriter writer2 =
                 provider.getOrCreateWriter(
-                        1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, compressionType);
+                        1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, compressionInfo);
         MemoryLogRecordsArrowBuilder builder2 =
-                createMemoryLogRecordsArrowBuilder(writer2, 10, 1024);
+                createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024);
         for (Object[] data : dataSet) {
             builder2.append(RowKind.APPEND_ONLY, row(DATA1_ROW_TYPE, data));
         }
         builder2.close();
-        MemoryLogRecords records2 =
-                MemoryLogRecords.pointToByteBuffer(builder2.build().getByteBuf().nioBuffer());
-
+        MemoryLogRecords records2 = MemoryLogRecords.pointToBytesView(builder2.build());
         int sizeInBytes2 = records2.sizeInBytes();
         assertLogRecordsEquals(DATA1_ROW_TYPE, records2, dataSet);
 
@@ -206,7 +205,7 @@ void testIllegalArgument() {
                             DEFAULT_SCHEMA_ID,
                             maxSizeInBytes,
                             DATA1_ROW_TYPE,
-                            ArrowCompressionType.NO)) {
+                            ArrowCompressionInfo.NO_COMPRESSION)) {
                 createMemoryLogRecordsArrowBuilder(0, writer, 10, 30);
             }
         })
@@ -220,7 +219,11 @@ void testClose() throws Exception {
         int maxSizeInBytes = 1024;
         ArrowWriter writer =
                 provider.getOrCreateWriter(
-                        1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE, ArrowCompressionType.NO);
+                        1L,
+                        DEFAULT_SCHEMA_ID,
+                        1024,
+                        DATA1_ROW_TYPE,
+                        ArrowCompressionInfo.NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
                 createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024);
         List<RowKind> rowKinds =
@@ -235,14 +238,14 @@ void testClose() throws Exception {
         }
 
         assertThat(builder.isFull()).isTrue();
-        String tableSchemaId = 1L + "-" + 1 + "-" + "NO";
+        String tableSchemaId = 1L + "-" + 1 + "-" + "NONE";
         assertThat(provider.freeWriters().size()).isEqualTo(0);
-        int sizeInBytesBeforeClose = builder.getSizeInBytes();
+        int sizeInBytesBeforeClose = builder.estimatedSizeInBytes();
         builder.close();
         builder.setWriterState(1L, 0);
         builder.build();
         assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(1);
-        int sizeInBytes = builder.getSizeInBytes();
+        int sizeInBytes = builder.estimatedSizeInBytes();
         assertThat(sizeInBytes).isEqualTo(sizeInBytesBeforeClose);
         // get writer again, writer will be initial.
         ArrowWriter writer1 =
@@ -251,11 +254,11 @@ void testClose() throws Exception {
                         DEFAULT_SCHEMA_ID,
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO);
+                        ArrowCompressionInfo.NO_COMPRESSION);
         assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(0);
 
         // Even if the writer has re-initialized, the sizeInBytes should be the same.
-        assertThat(builder.getSizeInBytes()).isEqualTo(sizeInBytes);
+        assertThat(builder.estimatedSizeInBytes()).isEqualTo(sizeInBytes);
 
         writer.close();
         writer1.close();
@@ -265,7 +268,12 @@ void testNoRecordAppend() throws Exception {
         // 1. no record append with base offset as 0.
         ArrowWriter writer =
-                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
+                provider.getOrCreateWriter(
+                        1L,
+                        DEFAULT_SCHEMA_ID,
+                        1024 * 10,
+                        DATA1_ROW_TYPE,
+                        ArrowCompressionInfo.NO_COMPRESSION);
         MemoryLogRecordsArrowBuilder builder =
                 createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10);
         MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
@@ -290,7 +298,12 @@ void testNoRecordAppend() throws Exception {
 
         // 2. no record append with base offset as 0.
         ArrowWriter writer2 =
-                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE);
+                provider.getOrCreateWriter(
+                        1L,
+                        DEFAULT_SCHEMA_ID,
+                        1024 * 10,
+                        DATA1_ROW_TYPE,
+                        ArrowCompressionInfo.NO_COMPRESSION);
         builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10);
         memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build());
         // only contains batch header.
@@ -313,8 +326,11 @@ void testNoRecordAppend() throws Exception {
         }
     }
 
-    private static List<ArrowCompressionType> compressionTypes() {
-        return Arrays.asList(ArrowCompressionType.LZ4_FRAME, ArrowCompressionType.ZSTD);
+    private static List<ArrowCompressionInfo> compressionInfos() {
+        return Arrays.asList(
+                new ArrowCompressionInfo(ArrowCompressionType.LZ4_FRAME, -1),
+                new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 3),
+                new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 9));
     }
 
     private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java
index 683d39de0..a1283b7ca 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowReaderWriterTest.java
@@ -16,7 +16,7 @@
 
 package com.alibaba.fluss.row.arrow;
 
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.memory.AbstractPagedOutputView;
 import com.alibaba.fluss.memory.ManagedPagedOutputView;
 import com.alibaba.fluss.memory.MemorySegment;
@@ -136,7 +136,11 @@ void testReaderWriter() throws IOException {
                 ArrowWriterPool provider = new ArrowWriterPool(allocator);
                 ArrowWriter writer =
                         provider.getOrCreateWriter(
-                                1L, 1, Integer.MAX_VALUE, rowType, ArrowCompressionType.NO)) {
+                                1L,
+                                1,
+                                Integer.MAX_VALUE,
+                                rowType,
+                                ArrowCompressionInfo.NO_COMPRESSION)) {
             for (InternalRow row : TEST_DATA) {
                 writer.writeRow(row);
             }
@@ -169,7 +173,7 @@ void testWriterExceedMaxSizeInBytes() {
                 ArrowWriterPool provider = new ArrowWriterPool(allocator);
                 ArrowWriter writer =
                         provider.getOrCreateWriter(
-                                1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionType.NO)) {
+                                1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION)) {
             while (!writer.isFull()) {
                 writer.writeRow(row(DATA1_ROW_TYPE, DATA1.get(0)));
             }
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowWriterPoolTest.java b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowWriterPoolTest.java
index 537b1b588..cd0bf8af8 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowWriterPoolTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/row/arrow/ArrowWriterPoolTest.java
@@ -16,7 +16,7 @@
 
 package com.alibaba.fluss.row.arrow;
 
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 
@@ -51,37 +51,37 @@ void testWriterMap() {
         Map<String, Deque<ArrowWriter>> freeWritersMap = arrowWriterPool.freeWriters();
         ArrowWriter writer1 =
                 arrowWriterPool.getOrCreateWriter(
-                        1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionType.NO);
+                        1L, 1, 1024, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION);
         assertThat(writer1.getWriteLimitInBytes()).isEqualTo((int) (1024 * BUFFER_USAGE_RATIO));
         assertThat(freeWritersMap.isEmpty()).isTrue();
 
         long epoch = writer1.getEpoch();
writer1.getEpoch(); writer1.recycle(epoch); assertThat(freeWritersMap.size()).isEqualTo(1); - assertThat(freeWritersMap.get("1-1-NO")).hasSize(1); + assertThat(freeWritersMap.get("1-1-NONE")).hasSize(1); assertThat(writer1.getEpoch()).isEqualTo(epoch + 1); // recycle the same epoch again, doesn't add it to pool writer1.recycle(epoch); assertThat(freeWritersMap.size()).isEqualTo(1); - assertThat(freeWritersMap.get("1-1-NO")).hasSize(1); + assertThat(freeWritersMap.get("1-1-NONE")).hasSize(1); ArrowWriter writer2 = arrowWriterPool.getOrCreateWriter( - 1L, 2, 10, DATA1_ROW_TYPE, ArrowCompressionType.NO); + 1L, 2, 10, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION); assertThat(freeWritersMap.size()).isEqualTo(1); writer2.recycle(writer2.getEpoch()); assertThat(freeWritersMap.size()).isEqualTo(2); // test key1: "tableId_schemaId" - Deque arrowWriters = freeWritersMap.get("1-1-NO"); + Deque arrowWriters = freeWritersMap.get("1-1-NONE"); assertThat(arrowWriters.size()).isEqualTo(1); writer1 = arrowWriterPool.getOrCreateWriter( - 1L, 1, 1000, DATA1_ROW_TYPE, ArrowCompressionType.NO); + 1L, 1, 1000, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION); assertThat(arrowWriters.size()).isEqualTo(0); assertThat(writer1.getWriteLimitInBytes()).isEqualTo((int) (1000 * BUFFER_USAGE_RATIO)); ArrowWriter writer3WithKey1 = arrowWriterPool.getOrCreateWriter( - 1L, 1, 100, DATA1_ROW_TYPE, ArrowCompressionType.NO); + 1L, 1, 100, DATA1_ROW_TYPE, ArrowCompressionInfo.NO_COMPRESSION); writer3WithKey1.recycle(writer3WithKey1.getEpoch()); writer1.recycle(writer1.getEpoch()); assertThat(arrowWriters.size()).isEqualTo(2); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java index 534eae1a6..4908b7b9b 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java @@ -16,7 +16,7 @@ package com.alibaba.fluss.testutils; -import com.alibaba.fluss.compression.ArrowCompressionType; +import com.alibaba.fluss.compression.ArrowCompressionInfo; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.memory.ManagedPagedOutputView; @@ -456,7 +456,11 @@ private static MemoryLogRecords createArrowMemoryLogRecords( ArrowWriterPool provider = new ArrowWriterPool(allocator)) { ArrowWriter writer = provider.getOrCreateWriter( - 1L, schemaId, Integer.MAX_VALUE, rowType, ArrowCompressionType.NO); + 1L, + schemaId, + Integer.MAX_VALUE, + rowType, + ArrowCompressionInfo.NO_COMPRESSION); MemoryLogRecordsArrowBuilder builder = MemoryLogRecordsArrowBuilder.builder( baseLogOffset, diff --git a/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrowUtilsTest.java b/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrowUtilsTest.java index 6ca67c802..7767515ff 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrowUtilsTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/utils/ArrowUtilsTest.java @@ -55,6 +55,7 @@ import java.util.Arrays; import java.util.List; +import static com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.NoCompressionCodec.DEFAULT_BODY_COMPRESSION; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link com.alibaba.fluss.utils.ArrowUtils}. 
*/ @@ -83,7 +84,7 @@ void testEstimateArrowMetadataSizeInBytes() throws IOException { DataTypes.FIELD("f2", DataTypes.STRING()), DataTypes.FIELD("f3", DataTypes.DOUBLE())); Schema schema = ArrowUtils.toArrowSchema(rowType); - int metadataSize = ArrowUtils.estimateArrowMetadataLength(schema); + int metadataSize = ArrowUtils.estimateArrowMetadataLength(schema, DEFAULT_BODY_COMPRESSION); ByteArrayOutputStream out = new ByteArrayOutputStream(); try (BufferAllocator allocator = new RootAllocator(); @@ -122,7 +123,7 @@ void testRandomEstimateArrowMetadataSizeInBytes() throws IOException { } RowType rowType = new RowType(fields); Schema schema = ArrowUtils.toArrowSchema(rowType); - int metadataSize = ArrowUtils.estimateArrowMetadataLength(schema); + int metadataSize = ArrowUtils.estimateArrowMetadataLength(schema, DEFAULT_BODY_COMPRESSION); ByteArrayOutputStream out = new ByteArrayOutputStream(); int rowCount = RandomUtils.nextInt(1, 1000); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java index 16fbe0bb6..dd9e0fb78 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java @@ -165,6 +165,20 @@ void testCreateUnSupportedTable() { .cause() .isInstanceOf(InvalidTableException.class) .hasMessageContaining("Currently, partitioned table must enable auto partition"); + + // test invalid property + assertThatThrownBy( + () -> + tEnv.executeSql( + "create table test_arrow_compression" + + " (a int, b int) with (" + + " 'table.log.format' = 'arrow'," + + " 'table.log.arrow.compression.type' = 'zstd'," + + " 'table.log.arrow.compression.zstd.level' = '0')")) + .cause() + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Invalid ZSTD compression level: 0. Expected a value between 1 and 22."); } @Test diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index 29a61ddbd..86ae8b527 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -125,11 +125,18 @@ void after() { tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)); } - @Test - void testAppendLog() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testAppendLog(boolean compressed) throws Exception { + String compressedProperties = + compressed + ? 
",'table.log.format' = 'arrow', 'table.log.arrow.compression.type' = 'zstd'" + : ""; tEnv.executeSql( "create table sink_test (a int not null, b bigint, c string) with " - + "('bucket.num' = '3')"); + + "('bucket.num' = '3'" + + compressedProperties + + ")"); tEnv.executeSql( "INSERT INTO sink_test(a, b, c) " + "VALUES (1, 3501, 'Tim'), " diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index f12cd425c..414adc7ff 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -65,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.UncheckedIOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -141,7 +142,18 @@ public CompletableFuture createTable(CreateTableRequest req return FutureUtils.failedFuture(e); } - TableDescriptor tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson()); + TableDescriptor tableDescriptor = null; + try { + tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson()); + } catch (Exception e) { + if (e instanceof UncheckedIOException) { + throw new InvalidTableException( + "Failed to parse table descriptor: " + e.getMessage()); + } else { + // wrap the validate message to InvalidTableException + throw new InvalidTableException(e.getMessage()); + } + } int bucketCount = defaultBucketNumber; // not set distribution diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java index 571ffc298..a295c27d0 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.kv; +import com.alibaba.fluss.compression.ArrowCompressionInfo; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.KvStorageException; @@ -150,7 +151,8 @@ public KvTablet getOrCreateKv( TableBucket tableBucket, LogTablet logTablet, KvFormat kvFormat, - @Nullable MergeEngine mergeEngine) + @Nullable MergeEngine mergeEngine, + ArrowCompressionInfo arrowCompressionInfo) throws Exception { return inLock( tabletCreationOrDeletionLock, @@ -169,7 +171,8 @@ public KvTablet getOrCreateKv( arrowBufferAllocator, memorySegmentPool, kvFormat, - mergeEngine); + mergeEngine, + arrowCompressionInfo); currentKvs.put(tableBucket, tablet); LOG.info( @@ -271,7 +274,8 @@ public KvTablet loadKv(File tabletDir) throws Exception { arrowBufferAllocator, memorySegmentPool, tableDescriptor.getKvFormat(), - tableDescriptor.getMergeEngine()); + tableDescriptor.getMergeEngine(), + tableDescriptor.getArrowCompressionInfo()); if (this.currentKvs.containsKey(tableBucket)) { throw new IllegalStateException( String.format( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index be848951f..e01023049 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -17,7 +17,7 @@ package com.alibaba.fluss.server.kv; import 
com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.compression.ArrowCompressionType; +import com.alibaba.fluss.compression.ArrowCompressionInfo; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.KvStorageException; @@ -104,7 +104,7 @@ public final class KvTablet { private final LogFormat logFormat; private final KvFormat kvFormat; private final @Nullable MergeEngine mergeEngine; - private final ArrowCompressionType arrowCompressionType; + private final ArrowCompressionInfo arrowCompressionInfo; /** * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been @@ -127,7 +127,7 @@ private KvTablet( MemorySegmentPool memorySegmentPool, KvFormat kvFormat, @Nullable MergeEngine mergeEngine, - ArrowCompressionType arrowCompressionType) { + ArrowCompressionInfo arrowCompressionInfo) { this.physicalPath = physicalPath; this.tableBucket = tableBucket; this.logTablet = logTablet; @@ -142,7 +142,7 @@ private KvTablet( this.partialUpdaterCache = new PartialUpdaterCache(); this.kvFormat = kvFormat; this.mergeEngine = mergeEngine; - this.arrowCompressionType = arrowCompressionType; + this.arrowCompressionInfo = arrowCompressionInfo; } public static KvTablet create( @@ -152,7 +152,8 @@ public static KvTablet create( BufferAllocator arrowBufferAllocator, MemorySegmentPool memorySegmentPool, KvFormat kvFormat, - @Nullable MergeEngine mergeEngine) + @Nullable MergeEngine mergeEngine, + ArrowCompressionInfo arrowCompressionInfo) throws IOException { Tuple2 tablePathAndBucket = FlussPaths.parseTabletDir(kvTabletDir); @@ -165,7 +166,8 @@ public static KvTablet create( arrowBufferAllocator, memorySegmentPool, kvFormat, - mergeEngine); + mergeEngine, + arrowCompressionInfo); } public static KvTablet create( @@ -177,7 +179,8 @@ public static KvTablet create( BufferAllocator arrowBufferAllocator, MemorySegmentPool memorySegmentPool, KvFormat kvFormat, - @Nullable MergeEngine mergeEngine) + @Nullable MergeEngine mergeEngine, + ArrowCompressionInfo arrowCompressionInfo) throws IOException { RocksDBKv kv = buildRocksDBKv(conf, kvTabletDir); return new KvTablet( @@ -192,7 +195,7 @@ public static KvTablet create( memorySegmentPool, kvFormat, mergeEngine, - conf.get(ConfigOptions.KV_CDC_ARROW_COMPRESSION_TYPE)); + arrowCompressionInfo); } private static RocksDBKv buildRocksDBKv(Configuration configuration, File kvDir) @@ -389,7 +392,7 @@ private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Except // changelogs should be in a single batch Integer.MAX_VALUE, rowType, - arrowCompressionType), + arrowCompressionInfo), memorySegmentPool); default: throw new IllegalArgumentException("Unsupported log format: " + logFormat); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainer.java index c99f1d84d..22b7d7b4e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainer.java @@ -294,16 +294,16 @@ File getInstanceRocksDBPath() { } private List toRocksDbCompressionTypes( - List compressionTypes) { + List compressionTypes) { List rocksdbCompressionTypes = new ArrayList<>(); - for (ConfigOptions.CompressionType compressionType : compressionTypes) { + for (ConfigOptions.KvCompressionType compressionType : 
compressionTypes) { rocksdbCompressionTypes.add(toRocksDbCompressionType(compressionType)); } return rocksdbCompressionTypes; } private CompressionType toRocksDbCompressionType( - ConfigOptions.CompressionType compressionType) { + ConfigOptions.KvCompressionType compressionType) { switch (compressionType) { case NO: return CompressionType.NO_COMPRESSION; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/FetchParams.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/FetchParams.java index da57cf452..5b082c283 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/FetchParams.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/FetchParams.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.server.log; import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.compression.ArrowCompressionInfo; import com.alibaba.fluss.record.FileLogProjection; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.types.RowType; @@ -96,6 +97,7 @@ public void setCurrentFetch( long fetchOffset, int maxFetchBytes, RowType schema, + ArrowCompressionInfo compressionInfo, @Nullable int[] projectedFields) { this.fetchOffset = fetchOffset; this.maxFetchBytes = maxFetchBytes; @@ -104,7 +106,8 @@ public void setCurrentFetch( if (fileLogProjection == null) { fileLogProjection = new FileLogProjection(); } - fileLogProjection.setCurrentProjection(tableId, schema, projectedFields); + fileLogProjection.setCurrentProjection( + tableId, schema, compressionInfo, projectedFields); } else { projectionEnabled = false; } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 6edd70028..b1f05aff3 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.server.replica; import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.compression.ArrowCompressionInfo; import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.FencedLeaderEpochException; import com.alibaba.fluss.exception.InvalidColumnProjectionException; @@ -168,6 +169,7 @@ public final class Replica { private final List partitionKeys; private final Schema schema; private final LogFormat logFormat; + private final ArrowCompressionInfo arrowCompressionInfo; private final KvFormat kvFormat; private final @Nullable MergeEngine mergeEngine; private final long logTTLMs; @@ -233,6 +235,7 @@ public Replica( this.bucketMetricGroup = bucketMetricGroup; this.schema = tableDescriptor.getSchema(); this.logFormat = tableDescriptor.getLogFormat(); + this.arrowCompressionInfo = tableDescriptor.getArrowCompressionInfo(); this.kvFormat = tableDescriptor.getKvFormat(); this.logTTLMs = tableDescriptor.getLogTTLMs(); this.dataLakeEnabled = tableDescriptor.isDataLakeEnabled(); @@ -271,6 +274,10 @@ public RowType getRowType() { return schema.toRowType(); } + public ArrowCompressionInfo getArrowCompressionInfo() { + return arrowCompressionInfo; + } + public int getLeaderEpoch() { return leaderEpoch; } @@ -611,7 +618,12 @@ private Optional initKvTablet() { // if it exists before init kv tablet kvTablet = kvManager.getOrCreateKv( - physicalPath, tableBucket, logTablet, kvFormat, mergeEngine); + physicalPath, + tableBucket, + logTablet, + kvFormat, + mergeEngine, + arrowCompressionInfo); } 
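Note: the reader side has to know the table's codec as well, because a projected fetch rebuilds Arrow batch metadata whose length now depends on the body compression (see the new estimateArrowMetadataLength(schema, DEFAULT_BODY_COMPRESSION) signature above). The sketch below shows the wiring, mirroring the ReplicaManager.readFromLog hunk that follows; tableId, fetchOffset, maxFetchBytes, and projectedFields are illustrative placeholders, and the FetchParams constructor arguments follow the test usage in this diff.

    // Thread the table's codec, now exposed on Replica, into the fetch parameters
    // so FileLogProjection can size compressed Arrow metadata correctly.
    FetchParams fetchParams = new FetchParams(-1, 16 * 1024 * 1024);
    fetchParams.setCurrentFetch(
            tableId,
            fetchOffset,
            maxFetchBytes,
            replica.getRowType(),
            replica.getArrowCompressionInfo(),
            projectedFields);
    LogReadInfo readInfo = replica.fetchRecords(fetchParams);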
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
index 4cd8330d6..d07776a03 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
@@ -983,6 +983,7 @@ public Map readFromLog(
                             fetchOffset,
                             adjustedMaxBytes,
                             replica.getRowType(),
+                            replica.getArrowCompressionInfo(),
                             fetchData.getProjectFields());
                     LogReadInfo readInfo = replica.fetchRecords(fetchParams);
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
index 3a266fd61..872f28a4b 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
@@ -16,6 +16,7 @@
 package com.alibaba.fluss.server.kv;

+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.metadata.KvFormat;
@@ -260,7 +261,12 @@ private KvTablet getOrCreateKv(
         LogTablet logTablet =
                 logManager.getOrCreateLog(physicalTablePath, tableBucket, LogFormat.ARROW, 1, true);
         return kvManager.getOrCreateKv(
-                physicalTablePath, tableBucket, logTablet, KvFormat.COMPACTED, null);
+                physicalTablePath,
+                tableBucket,
+                logTablet,
+                KvFormat.COMPACTED,
+                null,
+                ArrowCompressionInfo.NO_COMPRESSION);
     }

     private byte[] valueOf(KvRecord kvRecord) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
index 89674aaa3..31fce67bd 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java
@@ -16,6 +16,7 @@
 package com.alibaba.fluss.server.kv;

+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.InvalidTargetColumnException;
 import com.alibaba.fluss.exception.OutOfOrderSequenceException;
@@ -146,7 +147,8 @@ private KvTablet createKvTablet(
                 new RootAllocator(Long.MAX_VALUE),
                 new TestingMemorySegmentPool(10 * 1024),
                 KvFormat.COMPACTED,
-                mergeEngine);
+                mergeEngine,
+                ArrowCompressionInfo.NO_COMPRESSION);
     }

     @Test
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
index f5e9111ff..1021db926 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainerTest.java
@@ -152,9 +152,9 @@ void testConfigurationOptionsFromConfig() throws Exception {
         configuration.set(
                 ConfigOptions.KV_COMPRESSION_PER_LEVEL,
                 Arrays.asList(
-                        ConfigOptions.CompressionType.NO,
-                        ConfigOptions.CompressionType.LZ4,
-                        ConfigOptions.CompressionType.ZSTD));
+                        ConfigOptions.KvCompressionType.NO,
+                        ConfigOptions.KvCompressionType.LZ4,
+                        ConfigOptions.KvCompressionType.ZSTD));

         try (RocksDBResourceContainer optionsContainer =
                 new RocksDBResourceContainer(configuration, null, true)) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilderTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilderTest.java
index 466d25fa4..d7b9abaa3 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilderTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilderTest.java
@@ -16,7 +16,7 @@
 package com.alibaba.fluss.server.kv.wal;

-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
@@ -159,7 +159,7 @@ private WalBuilder createWalBuilder(
                         DEFAULT_SCHEMA_ID,
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO),
+                        ArrowCompressionInfo.NO_COMPRESSION),
                 memorySegmentPool);
     }
 }
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/FetchParamsTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/FetchParamsTest.java
index 454d7dd78..ac1172382 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/FetchParamsTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/FetchParamsTest.java
@@ -21,6 +21,7 @@
 import org.junit.jupiter.api.Test;

+import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

 /** Tests for {@link com.alibaba.fluss.server.log.FetchParams}. */
@@ -29,22 +30,24 @@ class FetchParamsTest {
     @Test
     void testSetCurrentFetch() {
         FetchParams fetchParams = new FetchParams(1, 100);
-        fetchParams.setCurrentFetch(1L, 20L, 1024, TestData.DATA1_ROW_TYPE, null);
+        fetchParams.setCurrentFetch(1L, 20L, 1024, TestData.DATA1_ROW_TYPE, NO_COMPRESSION, null);
         assertThat(fetchParams.fetchOffset()).isEqualTo(20L);
         assertThat(fetchParams.maxFetchBytes()).isEqualTo(1024);
         assertThat(fetchParams.projection()).isNull();

-        fetchParams.setCurrentFetch(2L, 30L, 512, TestData.DATA2_ROW_TYPE, new int[] {0, 2});
+        fetchParams.setCurrentFetch(
+                2L, 30L, 512, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
         assertThat(fetchParams.fetchOffset()).isEqualTo(30L);
         assertThat(fetchParams.maxFetchBytes()).isEqualTo(512);
         assertThat(fetchParams.projection()).isNotNull();
         FileLogProjection prevProjection = fetchParams.projection();

-        fetchParams.setCurrentFetch(1L, 40L, 256, TestData.DATA1_ROW_TYPE, null);
+        fetchParams.setCurrentFetch(1L, 40L, 256, TestData.DATA1_ROW_TYPE, NO_COMPRESSION, null);
         assertThat(fetchParams.projection()).isNull();

-        fetchParams.setCurrentFetch(2L, 30L, 512, TestData.DATA2_ROW_TYPE, new int[] {0, 2});
+        fetchParams.setCurrentFetch(
+                2L, 30L, 512, TestData.DATA2_ROW_TYPE, NO_COMPRESSION, new int[] {0, 2});
         // the FileLogProjection should be cached
         assertThat(fetchParams.projection()).isNotNull().isSameAs(prevProjection);
     }
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
index 521a9796e..146901728 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTest.java
@@ -53,6 +53,7 @@
 import java.util.Iterator;
 import java.util.List;

+import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
 import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
 import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
 import static com.alibaba.fluss.record.TestData.DATA1;
@@ -120,7 +121,8 @@ void testAppendRecordsToLeader() throws Exception {
                         (int)
                                 conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES)
                                         .getBytes());
-        fetchParams.setCurrentFetch(DATA1_TABLE_ID, 0, Integer.MAX_VALUE, DATA1_ROW_TYPE, null);
+        fetchParams.setCurrentFetch(
+                DATA1_TABLE_ID, 0, Integer.MAX_VALUE, DATA1_ROW_TYPE, NO_COMPRESSION, null);
         LogReadInfo logReadInfo = logReplica.fetchRecords(fetchParams);
         assertLogRecordsEquals(DATA1_ROW_TYPE, logReadInfo.getFetchedData().getRecords(), DATA1);
     }
@@ -522,6 +524,7 @@ private static LogRecords fetchRecords(Replica replica, long offset) throws IOException {
                         offset,
                         Integer.MAX_VALUE,
                         replica.getRowType(),
+                        NO_COMPRESSION,
                         null);
         LogReadInfo logReadInfo = replica.fetchRecords(fetchParams);
         return logReadInfo.getFetchedData().getRecords();
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 823d0c502..731dad08a 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -144,7 +144,6 @@ during the Fluss cluster working.
 | kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0 | Bits per key that the bloom filter will use; this only takes effect when the bloom filter is used. The default value is 10.0. |
 | kv.rocksdb.bloom-filter.block-based-mode | Boolean | false | If true, RocksDB will use a block-based filter instead of a full filter; this only takes effect when the bloom filter is used. The default value is 'false'. |
 | kv.recover.log-record-batch.max-size | MemorySize | 16mb | The max fetch size for fetching log to apply to kv during recovering kv. |
-| kv.cdc.arrow-compression-type | Enum | NO | The compression type for the cdc log generated by kv table when the log format set as ARROW. The candidate compression type is NO, LZ4_FRAMO, ZSTD |

 ### Metrics
@@ -175,6 +174,8 @@ during the Fluss cluster working.
 | table.auto_partitioning.time-zone | String | the system time zone | The time zone for auto partitions, which is by default the same as the system time zone. |
 | table.replication.factor | Integer | (None) | The replication factor for the log of the new table. When it's not set, Fluss will use the cluster's default replication factor configured by default.replication.factor. It should be a positive number and not larger than the number of tablet servers in the Fluss cluster. A value larger than the number of tablet servers in the Fluss cluster will result in an error when the new table is created. |
 | table.log.format | Enum | ARROW | The format of the log records in the log store. The default value is 'ARROW'. The supported formats are 'ARROW' and 'INDEXED'. |
+| table.log.arrow.compression.type | Enum | NONE | The compression type of the log records if the log format is set to 'ARROW'. The candidate compression types are 'NONE', 'LZ4_FRAME' and 'ZSTD'. The default value is 'NONE'. |
+| table.log.arrow.compression.zstd.level | Integer | 3 | The compression level of the log records if the log format is set to 'ARROW' and the compression type is set to 'ZSTD'. The valid range is 1 to 22. The default value is 3. |
 | table.kv.format | Enum | COMPACTED | The format of the kv records in the kv store. The default value is 'COMPACTED'. The supported formats are 'COMPACTED' and 'INDEXED'. |
 | table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain locally for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. |
 | table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. |
@@ -217,7 +218,6 @@ Currently, we don't support alter table configuration by Flink. This will be sup
 | client.writer.retries | Integer | Integer.MAX_VALUE | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. |
 | client.writer.enable-idempotence | Boolean | true | Writer idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and writer idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown. |
 | client.writer.max-inflight-requests-per-bucket | Integer | 5 | The maximum number of unacknowledged requests per bucket for the writer. This configuration can work only if 'client.writer.enable-idempotence' is set to true. When the number of inflight requests per bucket exceeds this setting, the writer will wait for the inflight requests to complete before sending out new requests. This setting defaults to 5. |
-| client.writer.arrow-compression-type | Enum | NO | The compression type for the input log batch when the log format set as ARROW. The candidate compression type is NO, LZ4_FRAME, ZSTD. |
 | client.request-timeout | Duration | 30s | The timeout for a request to complete. If the user sets the write ack to -1, this timeout is the max time that a delayed write may take to complete. The default setting is 30 seconds. |
 | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC32 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. |
 | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and return them incrementally from each poll. |
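Note: with the cluster-wide kv.cdc.arrow-compression-type and client.writer.arrow-compression-type options removed, compression is now opted into per table through the two new table properties documented above. The sketch below mirrors the Flink ITCases earlier in this diff (the table and column names are illustrative, and a TableEnvironment tEnv registered with the Fluss catalog is assumed); a zstd level outside 1..22, such as 0, fails at create time with the IllegalArgumentException exercised in FlinkCatalogITCase.

    // Create an Arrow-format log table with per-table ZSTD compression.
    tEnv.executeSql(
            "create table compressed_log (a int, b string) with ("
                    + " 'table.log.format' = 'arrow',"
                    + " 'table.log.arrow.compression.type' = 'zstd',"
                    // optional; defaults to 3, valid range is 1 to 22
                    + " 'table.log.arrow.compression.zstd.level' = '3')");

Because the codec now lives in the TableDescriptor, both writers (RecordAccumulator) and readers (ReplicaManager/FetchParams) pick it up from table metadata rather than from their own configuration, which is what the plumbing changes in this diff implement.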