Skip to content

Commit

Permalink
Allow setting ForwardIndexConfig default settings via cluster config
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 8, 2025
1 parent 82bf63c commit 37253e9
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_TIMEZONE;


public class ServiceStartableUtils {
private ServiceStartableUtils() {
Expand All @@ -44,7 +43,10 @@ private ServiceStartableUtils() {
protected static String _timeZone;

/**
* Applies the ZK cluster config to the given instance config if it does not already exist.
* Applies the ZK cluster config to:
* - The given instance config if it does not already exist.
* - Set the timezone.
* - Initialize the default values in {@link ForwardIndexConfig}.
*
* In the ZK cluster config:
* - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
Expand All @@ -70,7 +72,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
if (clusterConfigZNRecord == null) {
LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
setupTimezone(instanceConfig);
setTimezone(instanceConfig);
initForwardIndexConfig(instanceConfig);
return;
}

Expand All @@ -92,7 +95,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
} finally {
zkClient.close();
}
setupTimezone(instanceConfig);
setTimezone(instanceConfig);
initForwardIndexConfig(instanceConfig);
}

private static void addConfigIfNotExists(PinotConfiguration instanceConfig, String key, String value) {
Expand All @@ -101,10 +105,31 @@ private static void addConfigIfNotExists(PinotConfiguration instanceConfig, Stri
}
}

private static void setupTimezone(PinotConfiguration instanceConfig) {
private static void setTimezone(PinotConfiguration instanceConfig) {
TimeZone localTimezone = TimeZone.getDefault();
_timeZone = instanceConfig.getProperty(CONFIG_OF_TIMEZONE, localTimezone.getID());
_timeZone = instanceConfig.getProperty(CommonConstants.CONFIG_OF_TIMEZONE, localTimezone.getID());
System.setProperty("user.timezone", _timeZone);
LOGGER.info("Timezone: {}", _timeZone);
}

private static void initForwardIndexConfig(PinotConfiguration instanceConfig) {
String defaultRawIndexWriterVersion =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_RAW_INDEX_WRITER_VERSION);
if (defaultRawIndexWriterVersion != null) {
LOGGER.info("Setting forward index default raw index writer version to: {}", defaultRawIndexWriterVersion);
ForwardIndexConfig.setDefaultRawIndexWriterVersion(Integer.parseInt(defaultRawIndexWriterVersion));
}
String defaultTargetMaxChunkSize =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_MAX_CHUNK_SIZE);
if (defaultTargetMaxChunkSize != null) {
LOGGER.info("Setting forward index default target max chunk size to: {}", defaultTargetMaxChunkSize);
ForwardIndexConfig.setDefaultTargetMaxChunkSize(defaultTargetMaxChunkSize);
}
String defaultTargetDocsPerChunk =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_DOCS_PER_CHUNK);
if (defaultTargetDocsPerChunk != null) {
LOGGER.info("Setting forward index default target docs per chunk to: {}", defaultTargetDocsPerChunk);
ForwardIndexConfig.setDefaultTargetDocsPerChunk(Integer.parseInt(defaultTargetDocsPerChunk));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
this(indexFile, compressionType, totalDocs, valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
writerVersion, ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
maxRowLengthInBytes, maxNumberOfElements, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.getDefaultRawWriterVersion(),
maxRowLengthInBytes, maxNumberOfElements, ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class SingleValueFixedByteRawIndexCreator implements ForwardIndexCreator
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.getDefaultRawWriterVersion(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
ForwardIndexConfig.getDefaultRawWriterVersion(), ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Class<ForwardIndexConfig> getIndexConfigClass() {

@Override
public ForwardIndexConfig getDefaultConfig() {
return ForwardIndexConfig.DEFAULT;
return ForwardIndexConfig.getDefault();
}

@Override
Expand All @@ -112,7 +112,7 @@ public ColumnConfigDeserializer<ForwardIndexConfig> createDeserializer() {
fwdConfig.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
} else {
ForwardIndexConfig config = createConfigFromFieldConfig(fieldConfig);
if (!config.equals(ForwardIndexConfig.DEFAULT)) {
if (!config.equals(ForwardIndexConfig.getDefault())) {
fwdConfig.put(fieldConfig.getName(), config);
}
// It is important to do not explicitly add the default value here in order to avoid exclusive problems with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;


public class ForwardIndexTypeTest {
Expand Down Expand Up @@ -92,7 +93,7 @@ public void oldConfNotFound()
JsonUtils.stringToObject("[]", _fieldConfigListTypeRef)
);

assertEquals(ForwardIndexConfig.DEFAULT);
assertEquals(ForwardIndexConfig.getDefault());
}

@Test
Expand All @@ -108,7 +109,7 @@ public void oldConfDisabled()
+ " }]", _fieldConfigListTypeRef)
);

assertEquals(ForwardIndexConfig.DISABLED);
assertTrue(getActualConfig("dimInt", StandardIndexes.forward()).isDisabled());
}

@Test
Expand All @@ -120,7 +121,7 @@ public void oldConfEnableDefault()
+ " }"
);

assertEquals(ForwardIndexConfig.DEFAULT);
assertEquals(ForwardIndexConfig.getDefault());
}

@Test
Expand Down Expand Up @@ -177,7 +178,7 @@ public void oldConfEnableDict()
+ " \"encodingType\": \"DICTIONARY\"\n"
+ " }"
);
assertEquals(ForwardIndexConfig.DEFAULT);
assertEquals(ForwardIndexConfig.getDefault());
}

@Test
Expand All @@ -204,7 +205,7 @@ public void oldConfEnableRawDefault()
+ " }"
);

assertEquals(ForwardIndexConfig.DEFAULT);
assertEquals(ForwardIndexConfig.getDefault());
}

@Test(dataProvider = "allCompressionCodec", dataProviderClass = ForwardIndexTypeTest.class)
Expand All @@ -227,7 +228,7 @@ public void oldConfEnableRawWithCompression(String compression,
.withCompressionType(expectedChunkCompression)
.withDictIdCompressionType(expectedDictCompression)
.withDeriveNumDocsPerChunk(false)
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.withRawIndexWriterVersion(ForwardIndexConfig.getDefaultRawWriterVersion())
.build()
);
}
Expand All @@ -248,7 +249,7 @@ public void oldConfEnableRawWithDeriveNumDocs()
assertEquals(new ForwardIndexConfig.Builder()
.withCompressionType(null)
.withDeriveNumDocsPerChunk(true)
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.withRawIndexWriterVersion(ForwardIndexConfig.getDefaultRawWriterVersion())
.build());
}

Expand Down Expand Up @@ -284,7 +285,8 @@ public void newConfigDisabled()
+ " }\n"
+ " }"
);
assertEquals(ForwardIndexConfig.DISABLED);

assertTrue(getActualConfig("dimInt", StandardIndexes.forward()).isDisabled());
}

@Test
Expand All @@ -297,7 +299,7 @@ public void newConfigDefault()
+ " }"
);

assertEquals(ForwardIndexConfig.DEFAULT);
assertEquals(ForwardIndexConfig.getDefault());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public void testCalculateForwardIndexConfig()
assertTrue(forwardIndexConfig.isEnabled());
assertNull(forwardIndexConfig.getCompressionCodec());
assertFalse(forwardIndexConfig.isDeriveNumDocsPerChunk());
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE);
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion());
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.getDefaultTargetMaxChunkSize());
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.getDefaultTargetDocsPerChunk());

// Check custom settings
//@formatter:off
Expand Down Expand Up @@ -242,8 +242,8 @@ public void testCalculateForwardIndexConfig()
assertFalse(forwardIndexConfig.isEnabled());
assertNull(forwardIndexConfig.getCompressionCodec());
assertFalse(forwardIndexConfig.isDeriveNumDocsPerChunk());
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE);
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion());
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.getDefaultTargetMaxChunkSize());
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.segment.spi.index;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -35,14 +34,55 @@


public class ForwardIndexConfig extends IndexConfig {
@Deprecated
public static final int DEFAULT_RAW_WRITER_VERSION = 2;
public static final int DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES = 1024 * 1024; // 1MB
public static final String DEFAULT_TARGET_MAX_CHUNK_SIZE =
DataSizeUtils.fromBytes(DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES);
@Deprecated
public static final String DEFAULT_TARGET_MAX_CHUNK_SIZE = "1MB";
@Deprecated
public static final int DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES = 1024 * 1024;
@Deprecated
public static final int DEFAULT_TARGET_DOCS_PER_CHUNK = 1000;

public static final ForwardIndexConfig DISABLED =
new ForwardIndexConfig(true, null, null, null, null, null, null, null);
public static final ForwardIndexConfig DEFAULT = new Builder().build();

private static int _defaultRawIndexWriterVersion = 2;
private static String _defaultTargetMaxChunkSize = "1MB";
private static int _defaultTargetMaxChunkSizeBytes = 1024 * 1024;
private static int _defaultTargetDocsPerChunk = 1000;

public static int getDefaultRawWriterVersion() {
return _defaultRawIndexWriterVersion;
}

public static void setDefaultRawIndexWriterVersion(int defaultRawIndexWriterVersion) {
_defaultRawIndexWriterVersion = defaultRawIndexWriterVersion;
}

public static String getDefaultTargetMaxChunkSize() {
return _defaultTargetMaxChunkSize;
}

public static int getDefaultTargetMaxChunkSizeBytes() {
return _defaultTargetMaxChunkSizeBytes;
}

public static void setDefaultTargetMaxChunkSize(String defaultTargetMaxChunkSize) {
_defaultTargetMaxChunkSize = defaultTargetMaxChunkSize;
_defaultTargetMaxChunkSizeBytes = (int) DataSizeUtils.toBytes(defaultTargetMaxChunkSize);
}

public static int getDefaultTargetDocsPerChunk() {
return _defaultTargetDocsPerChunk;
}

public static void setDefaultTargetDocsPerChunk(int defaultTargetDocsPerChunk) {
_defaultTargetDocsPerChunk = defaultTargetDocsPerChunk;
}

public static ForwardIndexConfig getDefault() {
return new Builder().build();
}

@Nullable
private final CompressionCodec _compressionCodec;
Expand All @@ -61,15 +101,14 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
@Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer rawIndexWriterVersion,
@Nullable String targetMaxChunkSize, @Nullable Integer targetDocsPerChunk) {
super(disabled);
_deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);
_rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
_compressionCodec = compressionCodec;
_deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);

_targetMaxChunkSizeBytes = targetMaxChunkSize == null ? DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES
: (int) DataSizeUtils.toBytes(targetMaxChunkSize);
_targetMaxChunkSize =
targetMaxChunkSize == null ? DEFAULT_TARGET_MAX_CHUNK_SIZE : targetMaxChunkSize;
_targetDocsPerChunk = targetDocsPerChunk == null ? DEFAULT_TARGET_DOCS_PER_CHUNK : targetDocsPerChunk;
_rawIndexWriterVersion = rawIndexWriterVersion == null ? _defaultRawIndexWriterVersion : rawIndexWriterVersion;
_targetMaxChunkSize = targetMaxChunkSize == null ? _defaultTargetMaxChunkSize : targetMaxChunkSize;
_targetMaxChunkSizeBytes =
targetMaxChunkSize == null ? _defaultTargetMaxChunkSizeBytes : (int) DataSizeUtils.toBytes(targetMaxChunkSize);
_targetDocsPerChunk = targetDocsPerChunk == null ? _defaultTargetDocsPerChunk : targetDocsPerChunk;

if (compressionCodec != null) {
switch (compressionCodec) {
Expand Down Expand Up @@ -115,10 +154,10 @@ public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
@Deprecated @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType,
@JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk,
@JsonProperty("rawIndexWriterVersion") @Nullable Integer rawIndexWriterVersion,
@JsonProperty("targetMaxChunkSize") @Nullable String targetMaxChunkSizeBytes,
@JsonProperty("targetMaxChunkSize") @Nullable String targetMaxChunkSize,
@JsonProperty("targetDocsPerChunk") @Nullable Integer targetDocsPerChunk) {
this(disabled, getActualCompressionCodec(compressionCodec, chunkCompressionType, dictIdCompressionType),
deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSizeBytes, targetDocsPerChunk);
deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSize, targetDocsPerChunk);
}

public static CompressionCodec getActualCompressionCodec(@Nullable CompressionCodec compressionCodec,
Expand Down Expand Up @@ -219,9 +258,9 @@ public static class Builder {
@Nullable
private CompressionCodec _compressionCodec;
private boolean _deriveNumDocsPerChunk = false;
private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
private String _targetMaxChunkSize;
private int _targetDocsPerChunk = DEFAULT_TARGET_DOCS_PER_CHUNK;
private int _rawIndexWriterVersion = _defaultRawIndexWriterVersion;
private String _targetMaxChunkSize = _defaultTargetMaxChunkSize;
private int _targetDocsPerChunk = _defaultTargetDocsPerChunk;

public Builder() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public AggregationSpec(StarTreeAggregationConfig aggregationConfig) {
public AggregationSpec(@Nullable CompressionCodec compressionCodec, @Nullable Boolean deriveNumDocsPerChunk,
@Nullable Integer indexVersion, @Nullable Integer targetMaxChunkSizeBytes, @Nullable Integer targetDocsPerChunk,
@Nullable Map<String, Object> functionParameters) {
_indexVersion = indexVersion != null ? indexVersion : ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION;
_indexVersion = indexVersion != null ? indexVersion : ForwardIndexConfig.getDefaultRawWriterVersion();
_compressionCodec = compressionCodec != null ? compressionCodec : DEFAULT_COMPRESSION_CODEC;
_deriveNumDocsPerChunk = deriveNumDocsPerChunk != null ? deriveNumDocsPerChunk : false;
_targetMaxChunkSizeBytes = targetMaxChunkSizeBytes != null ? targetMaxChunkSizeBytes
: ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES;
: ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes();
_targetDocsPerChunk =
targetDocsPerChunk != null ? targetDocsPerChunk : ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK;
targetDocsPerChunk != null ? targetDocsPerChunk : ForwardIndexConfig.getDefaultTargetDocsPerChunk();
_functionParameters = functionParameters == null ? Map.of() : functionParameters;
}

Expand Down
Loading

0 comments on commit 37253e9

Please sign in to comment.