Skip to content

Commit

Permalink
Extend to add Default Server level configs for Dedup Tables (#14684)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepthi912 authored Dec 27, 2024
1 parent 00f0721 commit 86a82dd
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ protected void doInit() {
List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns),
"Primary key columns must be configured for dedup");
_tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics);
_tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics,
_instanceDataManagerConfig.getDedupConfig());
}

UpsertConfig upsertConfig = _tableConfig.getUpsertConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pinot.integration.tests;

import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManagerFactory;
import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.HashFunction;
Expand Down Expand Up @@ -76,6 +79,9 @@ public void setUp()
protected void overrideServerConf(PinotConfiguration serverConf) {
  // Both overrides live under the instance data manager config prefix.
  String instanceConfigPrefix = CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX;

  // Set "max.segment.preload.threads" to 1 for the server under test.
  String preloadThreadsKey = instanceConfigPrefix + ".max.segment.preload.threads";
  serverConf.setProperty(preloadThreadsKey, "1");

  // Turn on the server level dedup default for preload: <prefix>.dedup.default.enable.preload = true.
  String dedupPreloadKey = Joiner.on(".").join(instanceConfigPrefix,
      HelixInstanceDataManagerConfig.DEDUP_CONFIG_PREFIX,
      TableDedupMetadataManagerFactory.DEDUP_DEFAULT_ENABLE_PRELOAD);
  serverConf.setProperty(dedupPreloadKey, "true");
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.pinot.segment.local.dedup;

import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,15 +36,30 @@ private TableDedupMetadataManagerFactory() {
}

private static final Logger LOGGER = LoggerFactory.getLogger(TableDedupMetadataManagerFactory.class);
// The keys below are looked up in the instance (server) level dedup config passed to create().
// Server level default for the metadata manager class; consulted only when the table's DedupConfig does not
// specify a metadata manager class.
public static final String DEDUP_DEFAULT_METADATA_MANAGER_CLASS = "default.metadata.manager.class";
// Server level default for enabling preload (parsed as a boolean, "false" when absent); consulted only when the
// table's DedupConfig does not have preload enabled.
public static final String DEDUP_DEFAULT_ENABLE_PRELOAD = "default.enable.preload";

public static TableDedupMetadataManager create(TableConfig tableConfig, Schema schema,
TableDataManager tableDataManager, ServerMetrics serverMetrics) {
TableDataManager tableDataManager, ServerMetrics serverMetrics,
@Nullable PinotConfiguration instanceDedupConfig) {
String tableNameWithType = tableConfig.getTableName();
DedupConfig dedupConfig = tableConfig.getDedupConfig();
Preconditions.checkArgument(dedupConfig != null, "Must provide dedup config for table: %s", tableNameWithType);

TableDedupMetadataManager metadataManager;
String metadataManagerClass = dedupConfig.getMetadataManagerClass();

if (instanceDedupConfig != null) {
if (metadataManagerClass == null) {
metadataManagerClass = instanceDedupConfig.getProperty(DEDUP_DEFAULT_METADATA_MANAGER_CLASS);
}

// Server level config honoured only when table level config is not set to true
if (!dedupConfig.isEnablePreload()) {
dedupConfig.setEnablePreload(
Boolean.parseBoolean(instanceDedupConfig.getProperty(DEDUP_DEFAULT_ENABLE_PRELOAD, "false")));
}
}
if (StringUtils.isNotEmpty(metadataManagerClass)) {
LOGGER.info("Creating TableDedupMetadataManager with class: {} for table: {}", metadataManagerClass,
tableNameWithType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ public void testEnablePreload() {
when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(null);
TableDedupMetadataManager tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null, null);
assertNotNull(tableDedupMetadataManager);
assertFalse(tableDedupMetadataManager.isEnablePreload());

// Enabled as enablePreload is true and there is preloading thread.
tableDataManager = mock(TableDataManager.class);
when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(mock(ExecutorService.class));
tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null,
null);
assertNotNull(tableDedupMetadataManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private static TableDedupMetadataManager getTableDedupMetadataManager(Schema sch
TableDataManager tableDataManager = Mockito.mock(TableDataManager.class);
Mockito.when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR);
return TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager,
Mockito.mock(ServerMetrics.class));
Mockito.mock(ServerMetrics.class), null);
}

public List<Map<String, String>> loadJsonFile(String filePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
public static final String SEGMENT_DIRECTORY_LOADER = "segment.directory.loader";
// Prefix for upsert config
public static final String UPSERT_CONFIG_PREFIX = "upsert";
// Prefix for dedup config
public static final String DEDUP_CONFIG_PREFIX = "dedup";
// Prefix for auth config
public static final String AUTH_CONFIG_PREFIX = "auth";
// Prefix for tier configs
Expand Down Expand Up @@ -118,6 +120,7 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig

private final PinotConfiguration _serverConfig;
private final PinotConfiguration _upsertConfig;
private final PinotConfiguration _dedupConfig;
private final PinotConfiguration _authConfig;
private final Map<String, Map<String, String>> _tierConfigs;

Expand All @@ -133,6 +136,7 @@ public HelixInstanceDataManagerConfig(PinotConfiguration serverConfig)

_authConfig = serverConfig.subset(AUTH_CONFIG_PREFIX);
_upsertConfig = serverConfig.subset(UPSERT_CONFIG_PREFIX);
_dedupConfig = serverConfig.subset(DEDUP_CONFIG_PREFIX);

PinotConfiguration tierConfigs = getConfig().subset(TIER_CONFIGS_PREFIX);
List<String> tierNames = tierConfigs.getProperty(TIER_NAMES, Collections.emptyList());
Expand Down Expand Up @@ -289,6 +293,11 @@ public PinotConfiguration getUpsertConfig() {
return _upsertConfig;
}

/**
 * Returns the dedup configs, i.e. the subset of the configuration passed to the constructor that sits under
 * {@link #DEDUP_CONFIG_PREFIX}.
 */
@Override
public PinotConfiguration getDedupConfig() {
return _dedupConfig;
}

@Override
public PinotConfiguration getAuthConfig() {
return _authConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface InstanceDataManagerConfig {

PinotConfiguration getUpsertConfig();

/**
 * Returns the instance level dedup configs.
 */
PinotConfiguration getDedupConfig();

PinotConfiguration getAuthConfig();

Map<String, Map<String, String>> getTierConfigs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class DedupConfig extends BaseJsonConfig {
private final String _dedupTimeColumn;

@JsonPropertyDescription("Whether to preload segments for fast dedup metadata recovery")
private final boolean _enablePreload;
private boolean _enablePreload;

public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) boolean dedupEnabled,
@JsonProperty(value = "hashFunction") HashFunction hashFunction) {
Expand Down Expand Up @@ -96,4 +96,8 @@ public String getDedupTimeColumn() {
/**
 * Returns whether segments should be preloaded for fast dedup metadata recovery.
 */
public boolean isEnablePreload() {
return _enablePreload;
}

/**
 * Sets whether segments should be preloaded for fast dedup metadata recovery. This mutates the config instance
 * in place.
 *
 * @param enablePreload the new value of the preload flag
 */
public void setEnablePreload(boolean enablePreload) {
_enablePreload = enablePreload;
}
}

0 comments on commit 86a82dd

Please sign in to comment.