diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java index 2a762d481def..539acd26b115 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/CLPForwardIndexCreatorV2.java @@ -129,9 +129,10 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator { private final ChunkCompressionType _chunkCompressionType; /** - * Initializes a forward index creator for the given column using the provided base directory and column statistics. - * This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index - * constructors, this one handles the entire process of converting a mutable forward index into an immutable one. + * Initializes a forward index creator for the given column using the provided base directory, column statistics and + * chunk compressor type. This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other + * immutable forward index constructors, this one handles the entire process of converting a mutable forward index + * into an immutable one. * *

<p>The {@code columnStatistics} object passed into this constructor should contain a reference to the mutable * forward index ({@link CLPMutableForwardIndexV2}). The data from the mutable index is efficiently copied over @@ -142,12 +143,26 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator { * @param baseIndexDir The base directory where the forward index files will be stored. * @param columnStatistics The column statistics containing the CLP forward index information, including a reference * to the mutable forward index. + * @param chunkCompressionType The chunk compressor type used to compress internal data columns. * @throws IOException If there is an error during initialization or while accessing the file system. */ - public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics) + public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics, + ChunkCompressionType chunkCompressionType) throws IOException { this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(), - ChunkCompressionType.ZSTANDARD); + chunkCompressionType); + } + + /** + * Same as the three-argument constructor, except that the chunk compression type defaults to ZStandard. + * @param baseIndexDir The base directory where the forward index files will be stored. + * @param columnStatistics The column statistics containing the CLP forward index information, including a reference + * to the mutable forward index. + * @throws IOException If there is an error during initialization or while accessing the file system. 
+ */ + public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics) + throws IOException { + this(baseIndexDir, columnStatistics, ChunkCompressionType.ZSTANDARD); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java index 87cb7262225f..6084c77b4eeb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java @@ -73,11 +73,19 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex // Dictionary disabled columns DataType storedType = fieldSpec.getDataType().getStoredType(); if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) { + // CLP (V1) uses hard-coded chunk compressor which is set to `PassThrough` return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs, context.getColumnStatistics()); } if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) { + // Use the default chunk compression codec for CLP, currently configured to use ZStandard return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics()); } + if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_ZSTD) { + return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.ZSTANDARD); + } + if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_LZ4) { + return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.LZ4); + } ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType(); if (chunkCompressionType == null) { chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java index c9b49bbc3692..c23dac3f916b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java @@ -256,7 +256,9 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex // CLP (V1) always have clp encoding enabled whereas V2 is dynamic clpMutableForwardIndex.forceClpEncoding(); return clpMutableForwardIndex; - } else if (config.getCompressionCodec() == CompressionCodec.CLPV2) { + } else if (config.getCompressionCodec() == CompressionCodec.CLPV2 + || config.getCompressionCodec() == CompressionCodec.CLPV2_ZSTD + || config.getCompressionCodec() == CompressionCodec.CLPV2_LZ4) { CLPMutableForwardIndexV2 clpMutableForwardIndex = new CLPMutableForwardIndexV2(column, context.getMemoryManager()); return clpMutableForwardIndex; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index ad792016c35a..ddab35608529 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -1207,10 +1207,12 @@ private static void validateFieldConfigList(TableConfig tableConfig, @Nullable S switch (encodingType) { case RAW: Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex() - || compressionCodec == CompressionCodec.CLP || compressionCodec == 
CompressionCodec.CLPV2, + || compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2 + || compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4, "Compression codec: %s is not applicable to raw index", compressionCodec); - if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2) + if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2 + || compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4) && schema != null) { Preconditions.checkArgument( schema.getFieldSpecFor(columnName).getDataType().getStoredType() == DataType.STRING, diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java index 32732e4cad80..65152152e455 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/CLPForwardIndexCreatorV2Test.java @@ -114,12 +114,12 @@ public void testCLPWriter() Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19); } - private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength) + private long createStringRawForwardIndex(ChunkCompressionType chunkCompressionType, int maxLength) throws IOException { // Create a raw string immutable forward index TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); SingleValueVarByteRawIndexCreator index = - new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(), + new SingleValueVarByteRawIndexCreator(TEMP_DIR, chunkCompressionType, COLUMN_NAME, _logMessages.size(), 
FieldSpec.DataType.STRING, maxLength); for (String logMessage : _logMessages) { index.putString(logMessage); @@ -132,9 +132,9 @@ private long createStringRawForwardIndex(ChunkCompressionType compressionType, i } private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2, - ChunkCompressionType compressionType) + ChunkCompressionType chunkCompressionType) throws IOException { - long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, compressionType); + long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, chunkCompressionType); // Read from immutable forward index and validate the content File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); @@ -149,12 +149,12 @@ private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 } private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2, - ChunkCompressionType compressionType) + ChunkCompressionType chunkCompressionType) throws IOException { // Create a CLP immutable forward index from mutable forward index TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 = - new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, compressionType); + new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, chunkCompressionType); for (int i = 0; i < _logMessages.size(); i++) { clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i)); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index fe2cfbbd2e7a..b2a794ac2ab9 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ 
-116,6 +116,8 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec case PASS_THROUGH: case CLP: case CLPV2: + case CLPV2_ZSTD: + case CLPV2_LZ4: _chunkCompressionType = ChunkCompressionType.PASS_THROUGH; _dictIdCompressionType = null; break; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 3a5eaf775aa1..cf02527deb35 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -144,7 +144,10 @@ public enum CompressionCodec { // CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a special // handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link CLPForwardIndexCreatorV2}) CLP(false, false), - CLPV2(false, false); + CLPV2(false, false), + CLPV2_ZSTD(false, false), + CLPV2_LZ4(false, false); + //@formatter:on private final boolean _applicableToRawIndex;