Skip to content

Commit

Permalink
make update segment.tier and rebalance table consistent on segment's …
Browse files Browse the repository at this point in the history
…tier (apache#14516)
  • Loading branch information
klsince authored and davecromberge committed Nov 22, 2024
1 parent 1ca7547 commit 7447533
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.spi.config.table.TierConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Factory class to create and sort {@link Tier}
*/
public final class TierFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(TierFactory.class);
public static final String TIME_SEGMENT_SELECTOR_TYPE = "time";
public static final String FIXED_SEGMENT_SELECTOR_TYPE = "fixed";
public static final String PINOT_SERVER_STORAGE_TYPE = "pinot_server";
Expand All @@ -41,11 +45,18 @@ private TierFactory() {
* Constructs a {@link Tier} from the {@link TierConfig} in the table config
*/
public static Tier getTier(TierConfig tierConfig, HelixManager helixManager) {
return getTier(tierConfig, helixManager, null);
}

public static Tier getTier(TierConfig tierConfig, HelixManager helixManager,
@Nullable Set<String> providedSegmentsForTier) {
TierSegmentSelector segmentSelector;
TierStorage storageSelector;

String segmentSelectorType = tierConfig.getSegmentSelectorType();
if (segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
if (providedSegmentsForTier != null) {
LOGGER.debug("Provided segments: {} for tier: {}", providedSegmentsForTier, tierConfig.getName());
segmentSelector = new FixedTierSegmentSelector(helixManager, providedSegmentsForTier);
} else if (segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
segmentSelector = new TimeBasedTierSegmentSelector(helixManager, tierConfig.getSegmentAge());
} else if (segmentSelectorType.equalsIgnoreCase(TierFactory.FIXED_SEGMENT_SELECTOR_TYPE)) {
segmentSelector = new FixedTierSegmentSelector(helixManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -68,8 +69,8 @@ public static String normalizeTierName(String tierName) {
* @return InstancePartitions if the one can be derived from the given sorted tiers, null otherwise
*/
@Nullable
public static InstancePartitions getTieredInstancePartitionsForSegment(String tableNameWithType,
String segmentName, @Nullable List<Tier> sortedTiers, HelixManager helixManager) {
public static InstancePartitions getTieredInstancePartitionsForSegment(String tableNameWithType, String segmentName,
@Nullable List<Tier> sortedTiers, HelixManager helixManager) {
if (CollectionUtils.isEmpty(sortedTiers)) {
return null;
}
Expand Down Expand Up @@ -139,10 +140,18 @@ public static String getDataDirForTier(TableConfig tableConfig, String tierName,
*/
public static List<Tier> getSortedTiersForStorageType(List<TierConfig> tierConfigList, String storageType,
HelixManager helixManager) {
return getSortedTiersForStorageType(tierConfigList, storageType, helixManager, null);
}

public static List<Tier> getSortedTiersForStorageType(List<TierConfig> tierConfigList, String storageType,
HelixManager helixManager, @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
List<Tier> sortedTiers = new ArrayList<>();
for (TierConfig tierConfig : tierConfigList) {
if (storageType.equalsIgnoreCase(tierConfig.getStorageType())) {
sortedTiers.add(TierFactory.getTier(tierConfig, helixManager));
String tierName = tierConfig.getName();
Set<String> providedSegmentsForTier =
providedTierToSegmentsMap == null ? null : providedTierToSegmentsMap.get(tierName);
sortedTiers.add(TierFactory.getTier(tierConfig, helixManager, providedSegmentsForTier));
}
}
sortedTiers.sort(TierConfigUtils.getTierComparator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand Down Expand Up @@ -107,6 +108,14 @@ public void testGetTier() {
Assert.assertEquals(tier.getStorage().getType(), TierFactory.PINOT_SERVER_STORAGE_TYPE);
Assert.assertEquals(((PinotServerTierStorage) tier.getStorage()).getServerTag(), "tier1_tag_OFFLINE");

// With provided segments, the time base selector is overwritten by a fixed selector.
tier = TierFactory.getTier(tierConfig, null, Set.of("segment1", "segment2"));
Assert.assertEquals(tier.getName(), "tier1");
Assert.assertTrue(tier.getSegmentSelector() instanceof FixedTierSegmentSelector);
Assert.assertEquals(tier.getSegmentSelector().getType(), TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
Assert.assertEquals(((FixedTierSegmentSelector) tier.getSegmentSelector()).getSegmentsToSelect(),
Sets.newHashSet("segment1", "segment2"));

tierConfig = new TierConfig("tier1", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
Lists.newArrayList("segment1", "segment2", "segment3"), TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier1_tag_OFFLINE", null, null);
Expand All @@ -117,9 +126,16 @@ public void testGetTier() {
Assert.assertEquals(((FixedTierSegmentSelector) tier.getSegmentSelector()).getSegmentsToSelect(),
Sets.newHashSet("segment1", "segment2", "segment3"));

tierConfig = new TierConfig("tier1", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier1_tag_OFFLINE", null, null);
// With provided segments, the fixed selector can be overwritten with different set of segments.
tier = TierFactory.getTier(tierConfig, null, Set.of("segment1a", "segment2b"));
Assert.assertEquals(tier.getName(), "tier1");
Assert.assertTrue(tier.getSegmentSelector() instanceof FixedTierSegmentSelector);
Assert.assertEquals(tier.getSegmentSelector().getType(), TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
Assert.assertEquals(((FixedTierSegmentSelector) tier.getSegmentSelector()).getSegmentsToSelect(),
Sets.newHashSet("segment1a", "segment2b"));

tierConfig = new TierConfig("tier1", TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null, null,
TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, null);
tier = TierFactory.getTier(tierConfig, null);
Assert.assertEquals(tier.getName(), "tier1");
Assert.assertTrue(tier.getSegmentSelector() instanceof FixedTierSegmentSelector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3520,35 +3520,42 @@ public RebalanceResult rebalanceTable(String tableNameWithType, RebalanceConfig

public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig tableConfig, String rebalanceJobId,
RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver) {
Map<String, Set<String>> tierToSegmentsMap = null;
if (rebalanceConfig.isUpdateTargetTier()) {
updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
}
TableRebalancer tableRebalancer =
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap);
}

/**
* Calculate the target tier the segment belongs to and set it in segment ZK metadata as goal state, which can be
* checked by servers when loading the segment to put it onto the target storage tier.
*/
@VisibleForTesting
void updateTargetTier(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) {
List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
List<Tier> sortedTiers =
tierCfgs == null ? Collections.emptyList() : TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager);
CollectionUtils.isNotEmpty(tierCfgs) ? TierConfigUtils.getSortedTiersForStorageType(tierCfgs,
TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager) : Collections.emptyList();
LOGGER.info("For rebalanceId: {}, updating target tiers for segments of table: {} with tierConfigs: {}",
rebalanceJobId, tableNameWithType, sortedTiers);
Map<String, Set<String>> tierToSegmentsMap = new HashMap<>();
for (String segmentName : getSegmentsFor(tableNameWithType, true)) {
updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
String tier = updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
if (tier != null) {
tierToSegmentsMap.computeIfAbsent(tier, t -> new HashSet<>()).add(segmentName);
}
}
return tierToSegmentsMap;
}

private void updateSegmentTargetTier(String tableNameWithType, String segmentName, List<Tier> sortedTiers) {
private String updateSegmentTargetTier(String tableNameWithType, String segmentName, List<Tier> sortedTiers) {
ZNRecord segmentMetadataZNRecord = getSegmentMetadataZnRecord(tableNameWithType, segmentName);
if (segmentMetadataZNRecord == null) {
LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType);
return;
return null;
}
Tier targetTier = null;
for (Tier tier : sortedTiers) {
Expand All @@ -3563,21 +3570,22 @@ private void updateSegmentTargetTier(String tableNameWithType, String segmentNam
if (targetTier == null) {
if (segmentZKMetadata.getTier() == null) {
LOGGER.debug("Segment: {} of table: {} is already set to go to default tier", segmentName, tableNameWithType);
return;
return null;
}
LOGGER.info("Segment: {} of table: {} is put back on default tier", segmentName, tableNameWithType);
} else {
targetTierName = targetTier.getName();
if (targetTierName.equals(segmentZKMetadata.getTier())) {
LOGGER.debug("Segment: {} of table: {} is already set to go to target tier: {}", segmentName, tableNameWithType,
targetTierName);
return;
return targetTierName;
}
LOGGER.info("Segment: {} of table: {} is put onto new tier: {}", segmentName, tableNameWithType, targetTierName);
}
// Update the tier in segment ZK metadata and write it back to ZK.
segmentZKMetadata.setTier(targetTierName);
updateZkMetadata(tableNameWithType, segmentZKMetadata, segmentMetadataZNRecord.getVersion());
return targetTierName;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,16 @@ public static String createUniqueRebalanceJobIdentifier() {

public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
@Nullable String rebalanceJobId) {
return rebalance(tableConfig, rebalanceConfig, rebalanceJobId, null);
}

public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
@Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
long startTime = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
RebalanceResult.Status status = RebalanceResult.Status.UNKNOWN_ERROR;
try {
RebalanceResult result = doRebalance(tableConfig, rebalanceConfig, rebalanceJobId);
RebalanceResult result = doRebalance(tableConfig, rebalanceConfig, rebalanceJobId, providedTierToSegmentsMap);
status = result.getStatus();
return result;
} finally {
Expand All @@ -159,7 +164,7 @@ public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig rebala
}

private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
@Nullable String rebalanceJobId) {
@Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
if (rebalanceJobId == null) {
Expand Down Expand Up @@ -238,7 +243,7 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
Map<String, InstancePartitions> tierToInstancePartitionsMap;
boolean tierInstancePartitionsUnchanged;
try {
sortedTiers = getSortedTiers(tableConfig);
sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged =
getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun);
tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft();
Expand Down Expand Up @@ -671,13 +676,14 @@ private Pair<InstancePartitions, Boolean> getInstancePartitions(TableConfig tabl
}

@Nullable
private List<Tier> getSortedTiers(TableConfig tableConfig) {
private List<Tier> getSortedTiers(TableConfig tableConfig,
@Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
List<TierConfig> tierConfigs = tableConfig.getTierConfigsList();
if (CollectionUtils.isNotEmpty(tierConfigs)) {
// Get tiers with storageType = "PINOT_SERVER". This is the only type available right now.
// Other types should be treated differently
return TierConfigUtils.getSortedTiersForStorageType(tierConfigs, TierFactory.PINOT_SERVER_STORAGE_TYPE,
_helixManager);
_helixManager, providedTierToSegmentsMap);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,21 +920,42 @@ public void testUpdateTargetTier()
assertNull(segmentZKMetadata.getTier());

// Move on to new tier
_helixResourceManager.updateTargetTier("j1", tableConfig.getTableName(), tableConfig);
Map<String, Set<String>> tierToSegmentsMap =
_helixResourceManager.updateTargetTier("j1", tableConfig.getTableName(), tableConfig);
List<SegmentZKMetadata> retrievedSegmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
SegmentZKMetadata retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
assertEquals(tierToSegmentsMap.size(), 1);
assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));

// No tier move
tierToSegmentsMap =
_helixResourceManager.updateTargetTier("j11", tableConfig.getTableName(), tableConfig);
retrievedSegmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
assertEquals(tierToSegmentsMap.size(), 1);
assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));

// Move back to default tier
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
.setServerTenant(SERVER_TENANT_NAME).build();
_helixResourceManager.updateTableConfig(tableConfig);
_helixResourceManager.updateTargetTier("j2", tableConfig.getTableName(), tableConfig);
tierToSegmentsMap = _helixResourceManager.updateTargetTier("j2", tableConfig.getTableName(), tableConfig);
retrievedSegmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
assertNull(retrievedSegmentZKMetadata.getTier());
assertTrue(tierToSegmentsMap.isEmpty());

// No tier move
tierToSegmentsMap =
_helixResourceManager.updateTargetTier("j22", tableConfig.getTableName(), tableConfig);
retrievedSegmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
assertNull(retrievedSegmentZKMetadata.getTier());
assertTrue(tierToSegmentsMap.isEmpty());
}

/**
Expand Down

0 comments on commit 7447533

Please sign in to comment.