Skip to content

Commit

Permalink
Add CLPV2_ZSTD and CLPV2_LZ4 raw forward index compression codecs. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackluo923 authored Jan 13, 2025
1 parent 9259a85 commit b787ad4
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,10 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
private final ChunkCompressionType _chunkCompressionType;

/**
* Initializes a forward index creator for the given column using the provided base directory and column statistics.
* This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index
* constructors, this one handles the entire process of converting a mutable forward index into an immutable one.
* Initializes a forward index creator for the given column using the provided base directory, column statistics and
* chunk compressor type. This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other
* immutable forward index constructors, this one handles the entire process of converting a mutable forward index
* into an immutable one.
*
* <p>The {@code columnStatistics} object passed into this constructor should contain a reference to the mutable
* forward index ({@link CLPMutableForwardIndexV2}). The data from the mutable index is efficiently copied over
Expand All @@ -142,12 +143,26 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
* @param baseIndexDir The base directory where the forward index files will be stored.
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
* to the mutable forward index.
* @param chunkCompressionType The chunk compressor type used to compress internal data columns
* @throws IOException If there is an error during initialization or while accessing the file system.
*/
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics,
ChunkCompressionType chunkCompressionType)
throws IOException {
this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
ChunkCompressionType.ZSTANDARD);
chunkCompressionType);
}

/**
* Same as above, except with chunk compressor set to ZStandard by default
* @param baseIndexDir The base directory where the forward index files will be stored.
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
* to the mutable forward index.
* @throws IOException If there is an error during initialization or while accessing the file system.
*/
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
throws IOException {
this(baseIndexDir, columnStatistics, ChunkCompressionType.ZSTANDARD);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,19 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
// Dictionary disabled columns
DataType storedType = fieldSpec.getDataType().getStoredType();
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
// CLP (V1) uses hard-coded chunk compressor which is set to `PassThrough`
return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs, context.getColumnStatistics());
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) {
// Use the default chunk compression codec for CLP, currently configured to use ZStandard
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics());
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_ZSTD) {
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.ZSTANDARD);
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_LZ4) {
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.LZ4);
}
ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
if (chunkCompressionType == null) {
chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex
// CLP (V1) always have clp encoding enabled whereas V2 is dynamic
clpMutableForwardIndex.forceClpEncoding();
return clpMutableForwardIndex;
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2) {
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2
|| config.getCompressionCodec() == CompressionCodec.CLPV2_ZSTD
|| config.getCompressionCodec() == CompressionCodec.CLPV2_LZ4) {
CLPMutableForwardIndexV2 clpMutableForwardIndex =
new CLPMutableForwardIndexV2(column, context.getMemoryManager());
return clpMutableForwardIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1207,10 +1207,12 @@ private static void validateFieldConfigList(TableConfig tableConfig, @Nullable S
switch (encodingType) {
case RAW:
Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex()
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2,
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4,
"Compression codec: %s is not applicable to raw index",
compressionCodec);
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2)
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4)
&& schema != null) {
Preconditions.checkArgument(
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == DataType.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public void testCLPWriter()
Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19);
}

private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength)
private long createStringRawForwardIndex(ChunkCompressionType chunkCompressionType, int maxLength)
throws IOException {
// Create a raw string immutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
SingleValueVarByteRawIndexCreator index =
new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(),
new SingleValueVarByteRawIndexCreator(TEMP_DIR, chunkCompressionType, COLUMN_NAME, _logMessages.size(),
FieldSpec.DataType.STRING, maxLength);
for (String logMessage : _logMessages) {
index.putString(logMessage);
Expand All @@ -132,9 +132,9 @@ private long createStringRawForwardIndex(ChunkCompressionType compressionType, i
}

private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
ChunkCompressionType chunkCompressionType)
throws IOException {
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, compressionType);
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, chunkCompressionType);

// Read from immutable forward index and validate the content
File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
Expand All @@ -149,12 +149,12 @@ private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2
}

private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
ChunkCompressionType chunkCompressionType)
throws IOException {
// Create a CLP immutable forward index from mutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, compressionType);
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, chunkCompressionType);
for (int i = 0; i < _logMessages.size(); i++) {
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
case PASS_THROUGH:
case CLP:
case CLPV2:
case CLPV2_ZSTD:
case CLPV2_LZ4:
_chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
_dictIdCompressionType = null;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ public enum CompressionCodec {
// CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a special
// handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link CLPForwardIndexCreatorV2)
CLP(false, false),
CLPV2(false, false);
CLPV2(false, false),
CLPV2_ZSTD(false, false),
CLPV2_LZ4(false, false);

//@formatter:on

private final boolean _applicableToRawIndex;
Expand Down

0 comments on commit b787ad4

Please sign in to comment.