Skip to content

Commit

Permalink
Allow setting ForwardIndexConfig default settings via cluster config
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Jan 8, 2025
1 parent 82bf63c commit ffb0dde
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.spi.utils.CommonConstants.CONFIG_OF_TIMEZONE;


public class ServiceStartableUtils {
private ServiceStartableUtils() {
Expand All @@ -44,7 +43,10 @@ private ServiceStartableUtils() {
protected static String _timeZone;

/**
* Applies the ZK cluster config to the given instance config if it does not already exist.
* Applies the ZK cluster config to:
* - The given instance config if it does not already exist.
* - Set the timezone.
* - Initialize the default values in {@link ForwardIndexConfig}.
*
* In the ZK cluster config:
* - pinot.all.* will be replaced to role specific config, e.g. pinot.controller.* for controllers
Expand All @@ -70,7 +72,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
zkClient.readData(String.format(CLUSTER_CONFIG_ZK_PATH_TEMPLATE, clusterName, clusterName), true);
if (clusterConfigZNRecord == null) {
LOGGER.warn("Failed to find cluster config for cluster: {}, skipping applying cluster config", clusterName);
setupTimezone(instanceConfig);
setTimezone(instanceConfig);
initForwardIndexConfig(instanceConfig);
return;
}

Expand All @@ -92,7 +95,8 @@ public static void applyClusterConfig(PinotConfiguration instanceConfig, String
} finally {
zkClient.close();
}
setupTimezone(instanceConfig);
setTimezone(instanceConfig);
initForwardIndexConfig(instanceConfig);
}

private static void addConfigIfNotExists(PinotConfiguration instanceConfig, String key, String value) {
Expand All @@ -101,10 +105,31 @@ private static void addConfigIfNotExists(PinotConfiguration instanceConfig, Stri
}
}

private static void setupTimezone(PinotConfiguration instanceConfig) {
private static void setTimezone(PinotConfiguration instanceConfig) {
TimeZone localTimezone = TimeZone.getDefault();
_timeZone = instanceConfig.getProperty(CONFIG_OF_TIMEZONE, localTimezone.getID());
_timeZone = instanceConfig.getProperty(CommonConstants.CONFIG_OF_TIMEZONE, localTimezone.getID());
System.setProperty("user.timezone", _timeZone);
LOGGER.info("Timezone: {}", _timeZone);
}

private static void initForwardIndexConfig(PinotConfiguration instanceConfig) {
String defaultRawIndexWriterVersion =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_RAW_INDEX_WRITER_VERSION);
if (defaultRawIndexWriterVersion != null) {
LOGGER.info("Setting forward index default raw index writer version to: {}", defaultRawIndexWriterVersion);
ForwardIndexConfig.setDefaultRawIndexWriterVersion(Integer.parseInt(defaultRawIndexWriterVersion));
}
String defaultTargetMaxChunkSize =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_MAX_CHUNK_SIZE);
if (defaultTargetMaxChunkSize != null) {
LOGGER.info("Setting forward index default target max chunk size to: {}", defaultTargetMaxChunkSize);
ForwardIndexConfig.setDefaultTargetMaxChunkSize(defaultTargetMaxChunkSize);
}
String defaultTargetDocsPerChunk =
instanceConfig.getProperty(CommonConstants.ForwardIndexConfigs.CONFIG_OF_DEFAULT_TARGET_DOCS_PER_CHUNK);
if (defaultTargetDocsPerChunk != null) {
LOGGER.info("Setting forward index default target docs per chunk to: {}", defaultTargetDocsPerChunk);
ForwardIndexConfig.setDefaultTargetDocsPerChunk(Integer.parseInt(defaultTargetDocsPerChunk));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
this(indexFile, compressionType, totalDocs, valueType, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
writerVersion, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
writerVersion, ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
maxRowLengthInBytes, maxNumberOfElements, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.getDefaultRawWriterVersion(),
maxRowLengthInBytes, maxNumberOfElements, ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class SingleValueFixedByteRawIndexCreator implements ForwardIndexCreator
public SingleValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
int totalDocs, DataType valueType)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
this(baseIndexDir, compressionType, column, totalDocs, valueType, ForwardIndexConfig.getDefaultRawWriterVersion(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public SingleValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int maxLength)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
ForwardIndexConfig.getDefaultRawWriterVersion(), ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void oldConfEnableRawWithCompression(String compression,
.withCompressionType(expectedChunkCompression)
.withDictIdCompressionType(expectedDictCompression)
.withDeriveNumDocsPerChunk(false)
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.withRawIndexWriterVersion(ForwardIndexConfig.getDefaultRawWriterVersion())
.build()
);
}
Expand All @@ -248,7 +248,7 @@ public void oldConfEnableRawWithDeriveNumDocs()
assertEquals(new ForwardIndexConfig.Builder()
.withCompressionType(null)
.withDeriveNumDocsPerChunk(true)
.withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
.withRawIndexWriterVersion(ForwardIndexConfig.getDefaultRawWriterVersion())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public void testCalculateForwardIndexConfig()
assertTrue(forwardIndexConfig.isEnabled());
assertNull(forwardIndexConfig.getCompressionCodec());
assertFalse(forwardIndexConfig.isDeriveNumDocsPerChunk());
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE);
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion());
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.getDefaultTargetMaxChunkSize());
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.getDefaultTargetDocsPerChunk());

// Check custom settings
//@formatter:off
Expand Down Expand Up @@ -242,8 +242,8 @@ public void testCalculateForwardIndexConfig()
assertFalse(forwardIndexConfig.isEnabled());
assertNull(forwardIndexConfig.getCompressionCodec());
assertFalse(forwardIndexConfig.isDeriveNumDocsPerChunk());
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION);
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE);
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
assertEquals(forwardIndexConfig.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion());
assertEquals(forwardIndexConfig.getTargetMaxChunkSize(), ForwardIndexConfig.getDefaultTargetMaxChunkSize());
assertEquals(forwardIndexConfig.getTargetDocsPerChunk(), ForwardIndexConfig.getDefaultTargetDocsPerChunk());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.segment.spi.index;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -36,14 +35,47 @@

public class ForwardIndexConfig extends IndexConfig {
public static final int DEFAULT_RAW_WRITER_VERSION = 2;
public static final int DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES = 1024 * 1024; // 1MB
public static final String DEFAULT_TARGET_MAX_CHUNK_SIZE =
DataSizeUtils.fromBytes(DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES);
public static final String DEFAULT_TARGET_MAX_CHUNK_SIZE = "1MB";
public static final int DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES = 1024 * 1024;
public static final int DEFAULT_TARGET_DOCS_PER_CHUNK = 1000;
public static final ForwardIndexConfig DISABLED =
new ForwardIndexConfig(true, null, null, null, null, null, null, null);
public static final ForwardIndexConfig DEFAULT = new Builder().build();

private static int _defaultRawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
private static String _defaultTargetMaxChunkSize = DEFAULT_TARGET_MAX_CHUNK_SIZE;
private static int _defaultTargetMaxChunkSizeBytes = DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES;
private static int _defaultTargetDocsPerChunk = DEFAULT_TARGET_DOCS_PER_CHUNK;

public static int getDefaultRawWriterVersion() {
return _defaultRawIndexWriterVersion;
}

public static void setDefaultRawIndexWriterVersion(int defaultRawIndexWriterVersion) {
_defaultRawIndexWriterVersion = defaultRawIndexWriterVersion;
}

public static String getDefaultTargetMaxChunkSize() {
return _defaultTargetMaxChunkSize;
}

public static int getDefaultTargetMaxChunkSizeBytes() {
return _defaultTargetMaxChunkSizeBytes;
}

public static void setDefaultTargetMaxChunkSize(String defaultTargetMaxChunkSize) {
_defaultTargetMaxChunkSize = defaultTargetMaxChunkSize;
_defaultTargetMaxChunkSizeBytes = (int) DataSizeUtils.toBytes(defaultTargetMaxChunkSize);
}

public static int getDefaultTargetDocsPerChunk() {
return _defaultTargetDocsPerChunk;
}

public static void setDefaultTargetDocsPerChunk(int defaultTargetDocsPerChunk) {
_defaultTargetDocsPerChunk = defaultTargetDocsPerChunk;
}

@Nullable
private final CompressionCodec _compressionCodec;
private final boolean _deriveNumDocsPerChunk;
Expand All @@ -61,15 +93,14 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
@Nullable Boolean deriveNumDocsPerChunk, @Nullable Integer rawIndexWriterVersion,
@Nullable String targetMaxChunkSize, @Nullable Integer targetDocsPerChunk) {
super(disabled);
_deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);
_rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
_compressionCodec = compressionCodec;
_deriveNumDocsPerChunk = Boolean.TRUE.equals(deriveNumDocsPerChunk);

_targetMaxChunkSizeBytes = targetMaxChunkSize == null ? DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES
: (int) DataSizeUtils.toBytes(targetMaxChunkSize);
_targetMaxChunkSize =
targetMaxChunkSize == null ? DEFAULT_TARGET_MAX_CHUNK_SIZE : targetMaxChunkSize;
_targetDocsPerChunk = targetDocsPerChunk == null ? DEFAULT_TARGET_DOCS_PER_CHUNK : targetDocsPerChunk;
_rawIndexWriterVersion = rawIndexWriterVersion == null ? _defaultRawIndexWriterVersion : rawIndexWriterVersion;
_targetMaxChunkSize = targetMaxChunkSize == null ? _defaultTargetMaxChunkSize : targetMaxChunkSize;
_targetMaxChunkSizeBytes =
targetMaxChunkSize == null ? _defaultTargetMaxChunkSizeBytes : (int) DataSizeUtils.toBytes(targetMaxChunkSize);
_targetDocsPerChunk = targetDocsPerChunk == null ? _defaultTargetDocsPerChunk : targetDocsPerChunk;

if (compressionCodec != null) {
switch (compressionCodec) {
Expand Down Expand Up @@ -115,10 +146,10 @@ public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
@Deprecated @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType,
@JsonProperty("deriveNumDocsPerChunk") @Nullable Boolean deriveNumDocsPerChunk,
@JsonProperty("rawIndexWriterVersion") @Nullable Integer rawIndexWriterVersion,
@JsonProperty("targetMaxChunkSize") @Nullable String targetMaxChunkSizeBytes,
@JsonProperty("targetMaxChunkSize") @Nullable String targetMaxChunkSize,
@JsonProperty("targetDocsPerChunk") @Nullable Integer targetDocsPerChunk) {
this(disabled, getActualCompressionCodec(compressionCodec, chunkCompressionType, dictIdCompressionType),
deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSizeBytes, targetDocsPerChunk);
deriveNumDocsPerChunk, rawIndexWriterVersion, targetMaxChunkSize, targetDocsPerChunk);
}

public static CompressionCodec getActualCompressionCodec(@Nullable CompressionCodec compressionCodec,
Expand Down Expand Up @@ -219,9 +250,9 @@ public static class Builder {
@Nullable
private CompressionCodec _compressionCodec;
private boolean _deriveNumDocsPerChunk = false;
private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
private int _rawIndexWriterVersion = _defaultRawIndexWriterVersion;
private String _targetMaxChunkSize;
private int _targetDocsPerChunk = DEFAULT_TARGET_DOCS_PER_CHUNK;
private int _targetDocsPerChunk = _defaultTargetDocsPerChunk;

public Builder() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public AggregationSpec(StarTreeAggregationConfig aggregationConfig) {
public AggregationSpec(@Nullable CompressionCodec compressionCodec, @Nullable Boolean deriveNumDocsPerChunk,
@Nullable Integer indexVersion, @Nullable Integer targetMaxChunkSizeBytes, @Nullable Integer targetDocsPerChunk,
@Nullable Map<String, Object> functionParameters) {
_indexVersion = indexVersion != null ? indexVersion : ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION;
_indexVersion = indexVersion != null ? indexVersion : ForwardIndexConfig.getDefaultRawWriterVersion();
_compressionCodec = compressionCodec != null ? compressionCodec : DEFAULT_COMPRESSION_CODEC;
_deriveNumDocsPerChunk = deriveNumDocsPerChunk != null ? deriveNumDocsPerChunk : false;
_targetMaxChunkSizeBytes = targetMaxChunkSizeBytes != null ? targetMaxChunkSizeBytes
: ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES;
: ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes();
_targetDocsPerChunk =
targetDocsPerChunk != null ? targetDocsPerChunk : ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK;
targetDocsPerChunk != null ? targetDocsPerChunk : ForwardIndexConfig.getDefaultTargetDocsPerChunk();
_functionParameters = functionParameters == null ? Map.of() : functionParameters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void withEmptyConf()
assertFalse(config.isDisabled(), "Unexpected disabled");
assertNull(config.getChunkCompressionType(), "Unexpected chunkCompressionType");
assertFalse(config.isDeriveNumDocsPerChunk(), "Unexpected deriveNumDocsPerChunk");
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion(),
"Unexpected rawIndexWriterVersion");
}

Expand All @@ -50,7 +50,7 @@ public void withDisabledNull()
assertFalse(config.isDisabled(), "Unexpected disabled");
assertNull(config.getChunkCompressionType(), "Unexpected chunkCompressionType");
assertFalse(config.isDeriveNumDocsPerChunk(), "Unexpected deriveNumDocsPerChunk");
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultTargetDocsPerChunk(),
"Unexpected rawIndexWriterVersion");
}

Expand All @@ -63,7 +63,7 @@ public void withDisabledFalse()
assertFalse(config.isDisabled(), "Unexpected disabled");
assertNull(config.getChunkCompressionType(), "Unexpected chunkCompressionType");
assertFalse(config.isDeriveNumDocsPerChunk(), "Unexpected deriveNumDocsPerChunk");
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion(),
"Unexpected rawIndexWriterVersion");
}

Expand All @@ -76,7 +76,7 @@ public void withDisabledTrue()
assertTrue(config.isDisabled(), "Unexpected disabled");
assertNull(config.getChunkCompressionType(), "Unexpected chunkCompressionType");
assertFalse(config.isDeriveNumDocsPerChunk(), "Unexpected deriveNumDocsPerChunk");
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION,
assertEquals(config.getRawIndexWriterVersion(), ForwardIndexConfig.getDefaultRawWriterVersion(),
"Unexpected rawIndexWriterVersion");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,4 +1347,13 @@ public static class CursorConfigs {
public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
"controller.cluster.response.store.cleaner.initialDelay";
}

public static class ForwardIndexConfigs {
public static final String CONFIG_OF_DEFAULT_RAW_INDEX_WRITER_VERSION =
"pinot.forward.index.default.raw.index.writer.version";
public static final String CONFIG_OF_DEFAULT_TARGET_MAX_CHUNK_SIZE =
"pinot.forward.index.default.target.max.chunk.size";
public static final String CONFIG_OF_DEFAULT_TARGET_DOCS_PER_CHUNK =
"pinot.forward.index.default.target.docs.per.chunk";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ private void convertOneColumn(IndexSegment segment, String column, File newSegme

try (ForwardIndexCreator rawIndexCreator = ForwardIndexCreatorFactory.getRawIndexCreatorForSVColumn(newSegment,
compressionType, column, storedType, numDocs, lengthOfLongestEntry, false,
ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION, ForwardIndexConfig.DEFAULT_TARGET_MAX_CHUNK_SIZE_BYTES,
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
ForwardIndexConfig.getDefaultRawWriterVersion(), ForwardIndexConfig.getDefaultTargetMaxChunkSizeBytes(),
ForwardIndexConfig.getDefaultTargetDocsPerChunk());
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
switch (storedType) {
case INT:
Expand Down

0 comments on commit ffb0dde

Please sign in to comment.