Skip to content

Commit

Permalink
apache#2963 Fix NPE during task rebalancing
Browse files Browse the repository at this point in the history
Reformatted touched files according to "helix-format"
  • Loading branch information
jacob-netguardians committed Dec 9, 2024
1 parent dfe0076 commit 3645d2b
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 437 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
Expand All @@ -46,7 +47,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Global Rebalance does the baseline recalculation when certain changes happen.
* The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled.
Expand All @@ -60,9 +60,9 @@ class GlobalRebalanceRunner implements AutoCloseable {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
ImmutableSet
.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.IDEAL_STATE, HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG);

// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
Expand All @@ -77,10 +77,8 @@ class GlobalRebalanceRunner implements AutoCloseable {
private boolean _asyncGlobalRebalanceEnabled;

public GlobalRebalanceRunner(AssignmentManager assignmentManager,
AssignmentMetadataStore assignmentMetadataStore,
MetricCollector metricCollector,
LatencyMetric writeLatency,
CountMetric rebalanceFailureCount,
AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
LatencyMetric writeLatency, CountMetric rebalanceFailureCount,
boolean isAsyncGlobalRebalanceEnabled) {
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
_assignmentManager = assignmentManager;
Expand All @@ -106,14 +104,17 @@ public GlobalRebalanceRunner(AssignmentManager assignmentManager,
* @param algorithm
* @throws HelixRebalanceException
*/
public void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
public void globalRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have content changed.
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
_changeDetector.getAllChanges();
Set<String> allAssignableInstances = clusterData.getAssignableInstances();

if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
if (clusterChanges.keySet().stream()
.anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
Expand Down Expand Up @@ -153,8 +154,8 @@ public void globalRebalance(ResourceControllerDataProvider clusterData, Map<Stri
*/
private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, Set<String> allAssignableInstances,
RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput,
boolean shouldTriggerMainPipeline, Map<HelixConstants.ChangeType, Set<String>> clusterChanges)
throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
Expand All @@ -165,7 +166,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
// 1. Ignore node status (disable/offline).
// 2. Use the previous Baseline as the only parameter about the previous assignment.
Map<String, ResourceAssignment> currentBaseline =
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap,
Expand All @@ -175,7 +177,8 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}

Map<String, ResourceAssignment> newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
Map<String, ResourceAssignment> newBaseline =
WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
boolean isBaselineChanged =
_assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
// Write the new baseline to metadata store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
Expand All @@ -44,7 +45,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Compute the best possible assignment based on the Baseline and the previous Best Possible assignment.
* The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a
Expand All @@ -68,10 +68,8 @@ class PartialRebalanceRunner implements AutoCloseable {
private Future<Boolean> _asyncPartialRebalanceResult;

public PartialRebalanceRunner(AssignmentManager assignmentManager,
AssignmentMetadataStore assignmentMetadataStore,
MetricCollector metricCollector,
CountMetric rebalanceFailureCount,
boolean isAsyncPartialRebalanceEnabled) {
AssignmentMetadataStore assignmentMetadataStore, MetricCollector metricCollector,
CountMetric rebalanceFailureCount, boolean isAsyncPartialRebalanceEnabled) {
_assignmentManager = assignmentManager;
_assignmentMetadataStore = assignmentMetadataStore;
_bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
Expand All @@ -82,16 +80,16 @@ public PartialRebalanceRunner(AssignmentManager assignmentManager,
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
CountMetric.class);
_partialRebalanceLatency = metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
.name(),
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
LatencyMetric.class);
_baselineDivergenceGauge = metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
BaselineDivergenceGauge.class);
}

public void partialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
public void partialRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, Set<String> activeNodes,
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
// If partial rebalance is async and the previous result is not completed yet,
// do not start another partial rebalance.
Expand All @@ -100,19 +98,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
return;
}

_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
} catch (HelixRebalanceException e) {
if (_asyncPartialRebalanceEnabled) {
_rebalanceFailureCount.increment(1L);
}
LOG.error("Failed to calculate best possible assignment!", e);
return false;
}
return true;
}));
_asyncPartialRebalanceResult =
_bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
try {
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
currentStateOutput);
} catch (HelixRebalanceException e) {
if (_asyncPartialRebalanceEnabled) {
_rebalanceFailureCount.increment(1L);
}
LOG.error("Failed to calculate best possible assignment!", e);
return false;
}
return true;
}));
if (!_asyncPartialRebalanceEnabled) {
try {
if (!_asyncPartialRebalanceResult.get()) {
Expand All @@ -131,9 +130,9 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
* If the result differ from the persisted result, persist it to memory (only if the version is not stale);
* If persisted, trigger the pipeline so that main thread logic can run again.
*/
private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
private void doPartialRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, Set<String> activeNodes, RebalanceAlgorithm algorithm,
CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
_partialRebalanceCounter.increment(1L);
_partialRebalanceLatency.startMeasuringLatency();
Expand All @@ -142,27 +141,30 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<
if (_assignmentMetadataStore != null) {
newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
} else {
LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
LOG.debug(
"Assignment Metadata Store is null. Skip getting best possible assignment version.");
}

// Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
_assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());

// Read the best possible assignment from metadata store
Map<String, ResourceAssignment> currentBestPossibleAssignment =
_assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider
.generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
currentBaseline, currentBestPossibleAssignment);
clusterModel =
ClusterModelProvider.generateClusterModelForPartialRebalance(clusterData, resourceMap,
activeNodes, currentBaseline, currentBestPossibleAssignment);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
Map<String, ResourceAssignment> newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
Map<String, ResourceAssignment> newAssignment =
WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);

// Asynchronously report baseline divergence metric before persisting to metadata store,
// just in case if persisting fails, we still have the metric.
Expand All @@ -177,12 +179,14 @@ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<
currentBaseline, newAssignmentCopy);

boolean bestPossibleUpdateSuccessful = false;
if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(
newAssignment)) {
// This will not persist the new Best Possible Assignment into ZK. It will only update the in-memory cache.
// If this is done successfully, the new Best Possible Assignment will be persisted into ZK the next time that
// the pipeline is triggered. We schedule the pipeline to run below.
bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
newBestPossibleAssignmentVersion);
bestPossibleUpdateSuccessful =
_assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
newBestPossibleAssignmentVersion);
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
Expand Down
Loading

0 comments on commit 3645d2b

Please sign in to comment.