WAGED rebalancer Hard Constraint Scope Expansion
The WAGED Rebalancer uses hard constraints to ensure that the best possible state has certain properties. Most notably, hard constraints ensure that the partition weight usage on each node does not exceed the capacity limit.
Under the impression that hard constraints will always be enforced, users rely on hard constraints for real hardware limitations - for example, users will configure DISK weights and capacities; when DISK capacities are violated, the nodes encounter out of disk errors. However, hard constraints are in fact not always enforced, so users start to encounter out of disk errors, as well as other issues due to hard constraint violation.
This design focuses on the problem where WAGED hard constraints are not honored in some temporary states. Both corner cases last only for a limited time and eventually converge to a compliant state:
- N to N+1: in the middle of a state transition, an instance may hold more partitions than its defined capacity allows; the excess is eventually removed and the constraint is satisfied again.
- Rebalance overwrites: a special process that maintains the minimum number of active replicas while reducing overall partition movement. A picked instance may "temporarily" be assigned a partition until a time threshold passes and a formal assignment is made.

DISK overload is a critical issue for applications like databases. In practice, extra resources are usually provisioned as headroom to avoid server crashes.
The above are the only two known scenarios for hard constraint violation, and this design focuses on enforcing hard constraints in these cases. It does not optimize either of the mentioned processes themselves. In fact, we make an effort to maintain backward compatibility as far as possible, except for fixing the hard constraint issue.
There are known tradeoffs with this new feature:
- End-to-end partition movement will be slower: some state transitions that were previously allowed become illegal and can only be issued once the capacity condition is met.
- A heavily loaded cluster (close to its capacity) may be unable to move any partition due to capacity. This was possible before, but it is more likely now because of the stricter check.
https://github.com/apache/helix/wiki/Weight-aware-Globally-even-distribute-Rebalancer
https://github.com/apache/helix/wiki/WAGED-Rebalance-Pipeline-Redesign
In the original design, hard constraints are meant to be a part of WagedRebalancer and are used to calculate the "ideal assignment" that WagedRebalancer outputs. Based on that design, hard constraints are enforced in the ConstraintBasedAlgorithm class, which is invoked during several rebalance pipelines, such as Emergency Rebalance, Global Rebalance, and Partial Rebalance. During the ConstraintBasedAlgorithm.calculate() function and, by extension, during WagedRebalancer.calculateAssignment(), hard constraints are enforced and the return value - a mapping between resource names and assignments - fully respects all hard constraints. However, outside the scope of WagedRebalancer.calculateAssignment(), assignments can still be altered, and they may not respect hard constraints.
There are many cases in which a node can be disconnected from Helix but is expected to come back soon: for example, scheduled maintenance, a node bounce, a long GC pause, an intermittent network partition, or a node that crashes and recovers within a short period of time. In these scenarios, we would like Helix to still take measures to maintain availability requirements (such as the minimum number of active replicas) but minimize redundant shuffling work, because the node that went offline may come back online shortly.
The delayed window is configured at cluster level.
Helix always tries to maintain a minimal number of active (non-offline) replicas for each partition of a resource, minActiveReplicas, which is configured per resource.
This effectively means that when an instance goes offline and causes the active replica count of some partitions to drop below the minimal active replica count, Helix should immediately bring up as many replicas as needed on other live instances to keep the active replica count greater than or equal to the required minimum. No delay should be applied in this scenario.
When some nodes are not live but are still in the delay period, partitions remain assigned to them. However, since these nodes are actually offline, we need to ensure that minActiveReplicas is met; therefore we need RebalanceOverwrite to optionally move some replicas from those offline nodes to other live nodes.
To avoid confusion and for the sake of consistency, we will use the following terms in this document:
- Temporarily downed instance: an instance that is offline but still within the delayed window. The partitions on it are offline but are not yet moved out (unless minActiveReplicas is not met).
- Permanently downed instance: a "dead" instance that stays offline after the delayed window ends. The partitions on it are dropped and reassigned.
- N+1: one area where hard constraints are temporarily violated: the temporary state during a state transition in which hard constraints may not be honored (see the Problem Statement for more).
- Rebalance Overwrites: another area where hard constraints are violated. It is a previous effort that patches the assignment result, as an afterthought, so that it meets minActiveReplicas even during the delayed window (see the Problem Statement for more).
The original concept of hard constraints only applies to the eventual, stable state of the cluster. In some temporary states, the property may not be met. The temporary states that break hard constraints are N+1 and rebalance overwrites.
Note: This design specifically focuses on the hard constraint violation issue in the above two scenarios. It is NOT about improving "N+1" state transitions or "rebalance overwrites" themselves. It aims to remain backward compatible in those areas while fixing only the hard constraint violation issue.
During state transition, some instances may exceed the capacity temporarily. The algorithm only ensures the eventual state after state transition satisfies the hard constraints.
As an emergency reaction to recover minActiveReplicas during the delayed window, rebalance overwrites are applied as an additional patch on the calculated results. They are temporary in the sense that they only happen during the delayed window and only when some partitions fail to meet minActiveReplicas.
- Only when nodes are down and delayed rebalance is enabled: create an assignment based on immediately available nodes (excluding all downed nodes regardless of delayed rebalance settings).
- This assignment is merged with the Best Possible Assignment if the original Best Possible Assignment doesn't meet minActiveReplicas.

This design explores options to expand the scope of hard constraints so that they are met at all times, including temporary states like "N+1" and "rebalance overwrites".
Before getting into the design, we want to clarify the two problems we are trying to solve.
Even though we listed the two areas that break hard constraints side by side, they are quite different. We determine that N+1 and overwrites are two different problems based on the following criteria:
- Whether the rebalancer output satisfies the constraints
  - N+1: YES
  - Rebalance Overwrites: NO
- Whether the problem can be solved with proper execution planning and throttling
  - N+1: YES
  - Rebalance Overwrites: NO

As a result, the N+1 and overwrites problems should be solved differently.
There are two main directions to solve this problem.
- Modify ideal assignment computation logic. Let each ideal assignment computation be aware of old assignment and pending state transitions. As a result, the weights of moving partition will be counted twice: once in the old location, another in the target location.
- (Preferred) Modify downstream message selection and throttling logic. The end result is the state transition will only be triggered if it satisfies the constraints.
The two approaches attempt to solve the N+1 problem in different places. They have one thing in common: both block certain state transitions if hard constraints are not met on the instance, in the hope that the transition will succeed in a later cycle.
To understand the basis of this approach, please be reminded that the current ideal assignment output from rebalancer is eventually valid in "N+1" scenario. The eventual resource mapping satisfies the constraints after all state transitions finish.
As a result, we can have a clear cutoff between resource mapping calculation (a target to get to) and execution (how we get there). Another way to think of this: if we push throttling to the extreme, so that only one state transition is allowed at a time across the whole cluster, there shouldn't be any overloading. (The assumption is that the cluster has the minimum functional headroom to allow at least one extra partition; otherwise no state transition can be issued.)
In other words, the N+1 problem is essentially an execution planning problem. It doesn't necessarily have to be solved in the WAGED rebalancer scope in BestPossibleStateCalcStage. In fact, it shouldn't be, given a clear division of responsibility between components and stages.
- Compute instance weights based on currentState.
- Add additional throttling logic based on hard constraints, and apply that to partition replicas following the same sorting order.
First, let's review the current sorting and ordering in IntermediateStateCalcStage:
- (IntermediateStateCalcStage.compute) Resource priority sorting, looping through resource list in prioritized order, resource with higher priority will be first computed and assigned
- (computeIntermediatePartitionState) For each resource, priority sorting on partition
- For each partition, extract messages for that resource partition, sorting based on message priority
- For each message, apply throttling logic accordingly
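The nested ordering above can be pictured as a single sort over pending messages. The following is a minimal sketch under stated assumptions: `PendingMessage` and its three integer priority fields are hypothetical stand-ins (lower value meaning higher priority), not the actual Helix message or stage classes, and the real stage uses nested loops rather than one flat sort.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical flattening of the nested priority loops; not the actual Helix classes. */
class MessageOrderingSketch {
  static final class PendingMessage {
    final String id;
    final int resourcePriority;   // assumption: lower value = higher priority
    final int partitionPriority;
    final int messagePriority;

    PendingMessage(String id, int resourcePriority, int partitionPriority, int messagePriority) {
      this.id = id;
      this.resourcePriority = resourcePriority;
      this.partitionPriority = partitionPriority;
      this.messagePriority = messagePriority;
    }
  }

  /** Returns message ids in the order in which throttling would consider them. */
  static List<String> throttleOrder(List<PendingMessage> messages) {
    List<PendingMessage> sorted = new ArrayList<>(messages);
    // Resource priority first, then partition priority, then message priority.
    sorted.sort(Comparator.comparingInt((PendingMessage m) -> m.resourcePriority)
        .thenComparingInt(m -> m.partitionPriority)
        .thenComparingInt(m -> m.messagePriority));
    List<String> ids = new ArrayList<>();
    for (PendingMessage m : sorted) {
      ids.add(m.id);
    }
    return ids;
  }
}
```

Any capacity-based throttling added later would walk messages in this same order, so a higher-priority message gets the first claim on an instance's remaining headroom.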
Where/how to integrate the hard constraints into throttling? -- StateTransitionThrottleController.java
StateTransitionThrottleController handles the throttling logic in 3 different levels: Cluster, Resource, and Instance. It's instantiated in each pipeline based on cached data and/or event context.
We should expand the instance-level throttle with an additional hard constraint limit. The updated StateTransitionThrottleController should achieve the following.
It should know:
- Current live instances
- The instance capacity and current utilization
- Resource partition weights
It should do:
- Keep track of the utilization on each host, ideally populated from current states and cluster/resource configs
- Always fetch the latest instance and resource weights, keeping the controller instance technically stateless across pipelines
- Determine the weight of each incoming message and make a decision based on the current instance capacity
- When "charging" an instance for a message, update the remaining capacity accordingly
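The responsibilities listed above could be sketched roughly as follows. This is an illustration only: `CapacityThrottleSketch` and `tryCharge` are hypothetical names and do not reflect the actual StateTransitionThrottleController API. The sketch assumes capacities, usage, and partition weights are integer maps keyed by capacity name (for example DISK).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical capacity-aware instance throttle; not Helix's actual throttle controller API. */
class CapacityThrottleSketch {
  // instance -> (capacity key -> remaining headroom)
  private final Map<String, Map<String, Integer>> remaining = new HashMap<>();

  /** Rebuilt in every pipeline from configs and current-state usage, so nothing is carried over. */
  CapacityThrottleSketch(Map<String, Map<String, Integer>> capacity,
      Map<String, Map<String, Integer>> currentUsage) {
    for (Map.Entry<String, Map<String, Integer>> e : capacity.entrySet()) {
      Map<String, Integer> used =
          currentUsage.getOrDefault(e.getKey(), new HashMap<String, Integer>());
      Map<String, Integer> left = new HashMap<>();
      for (Map.Entry<String, Integer> c : e.getValue().entrySet()) {
        left.put(c.getKey(), c.getValue() - used.getOrDefault(c.getKey(), 0));
      }
      remaining.put(e.getKey(), left);
    }
  }

  /** Charges the instance and returns true if the weight fits; otherwise changes nothing. */
  boolean tryCharge(String instance, Map<String, Integer> partitionWeight) {
    Map<String, Integer> left = remaining.get(instance);
    if (left == null) {
      return false; // unknown or non-live instance: throttle the message
    }
    for (Map.Entry<String, Integer> w : partitionWeight.entrySet()) {
      if (left.getOrDefault(w.getKey(), 0) < w.getValue()) {
        return false; // at least one capacity dimension would be exceeded
      }
    }
    for (Map.Entry<String, Integer> w : partitionWeight.entrySet()) {
      left.put(w.getKey(), left.getOrDefault(w.getKey(), 0) - w.getValue());
    }
    return true;
  }
}
```

Constructing the object from scratch in each pipeline keeps it stateless across pipelines, while the in-pipeline "charge" ensures that two accepted messages cannot both claim the same headroom.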
Suppose there is a resource with 4 partitions, with replication factor 2, and there are 6 instances {A, B, C, D, E, F}.
For simplicity, the ONLINE-OFFLINE model is used in this example.
Partition view (ideal states)
Instance view (current states)
To finish the transitions, the sequence of best possible states will be:
DB_1: [A, B, C] -> [A, B, C, D] -> [A, B, D]
DB_2: [D, E, F] -> [C, D, E, F] -> [C, E, F]
State transitions needed:
ST1 D-(DB_1 UP)
ST2 C-(DB_1 DOWN)
ST3 C-(DB_2 UP)
ST4 D-(DB_2 DOWN)
Assume node capacity:
C: 2
D: 3
Assume all partitions have size 1.
//The throttler will block the state transition if the projected capacity (currentstates + ST) exceeds the node's capacity.
//State transitions are checked based on priority order. Instance capacity is reduced in the throttler after each upward ST is accepted.
C: {DB_1, DB_3} + DB_2 UP => 3, exceeding C's capacity, so it shouldn't be allowed
D: {DB_2, DB_3} + DB_1 UP => 3, leading to projected current state {DB_1, DB_2, DB_3}, it should be allowed
In the first pipeline, DB_2 UP is throttled, while DB_1 UP is allowed
The resulting sequence of current states and the order of state transitions are:
====== pipeline 1 ======
best possible:
DB_1: [A, B, C, D]
DB_2: [C, D, E, F]
externalview:
DB_1: [A, B, C, D] -- ST1 D-(DB_1 UP)
DB_2: [D, E, F]
current states:
C: {DB_1, DB_3}
D: {DB_1, DB_2, DB_3}
====== pipeline 2 ======
best possible:
DB_1: [A, B, D]
DB_2: [C, D, E, F]
externalview:
DB_1: [A, B, D] -- ST2 C-(DB_1 DOWN)
DB_2: [D, E, F]
current states:
C: {DB_3}
D: {DB_1, DB_2, DB_3}
====== pipeline 3 ======
best possible:
DB_1: [A, B, D]
DB_2: [C, D, E, F]
externalview:
DB_1: [A, B, D]
DB_2: [C, D, E, F] -- ST3 C-(DB_2 UP)
current states:
C: {DB_2, DB_3}
D: {DB_1, DB_2, DB_3}
====== pipeline 4 ======
best possible:
DB_1: [A, B, D]
DB_2: [C, E, F]
externalview:
DB_1: [A, B, D]
DB_2: [C, E, F] -- ST4 D-(DB_2 DOWN)
current states:
C: {DB_2, DB_3}
D: {DB_1, DB_3}
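The four pipelines above can be replayed with a small simulation. This is a sketch under stated assumptions, not Helix code: `Transition`, `simulate`, and `example` are hypothetical names; every partition has weight 1; each DOWN transition waits until its paired UP transition completed in an earlier pipeline; and capacity freed by a DOWN only becomes visible in the next pipeline, when it shows up in the current state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical replay of the example; class and method names are illustrative only. */
class PipelineSimulationSketch {
  static final class Transition {
    final String name;
    final String node;
    final String partition;
    final boolean up;       // true = bring a replica up, false = drop a replica
    final String dependsOn; // ST that must have completed in an earlier pipeline, or null

    Transition(String name, String node, String partition, boolean up, String dependsOn) {
      this.name = name;
      this.node = node;
      this.partition = partition;
      this.up = up;
      this.dependsOn = dependsOn;
    }
  }

  /** Returns the names of the transitions issued in each pipeline, in order. */
  static List<List<String>> simulate(Map<String, Integer> capacity,
      Map<String, Set<String>> currentState, List<Transition> pending) {
    List<List<String>> issued = new ArrayList<>();
    Set<String> completed = new HashSet<>();
    List<Transition> remaining = new ArrayList<>(pending);
    while (!remaining.isEmpty()) {
      // Usage is recomputed from current states at the start of every pipeline.
      Map<String, Integer> projected = new HashMap<>();
      for (Map.Entry<String, Set<String>> e : currentState.entrySet()) {
        projected.put(e.getKey(), e.getValue().size());
      }
      List<Transition> accepted = new ArrayList<>();
      for (Transition t : remaining) { // pending list is already in priority order
        if (t.dependsOn != null && !completed.contains(t.dependsOn)) {
          continue; // prerequisite has not finished yet
        }
        if (t.up) {
          int usage = projected.get(t.node) + 1;
          if (usage > capacity.get(t.node)) {
            continue; // throttled: projected usage would exceed capacity
          }
          projected.put(t.node, usage); // charge the instance
        }
        accepted.add(t);
      }
      if (accepted.isEmpty()) {
        break; // nothing can move; the cluster is stuck
      }
      List<String> names = new ArrayList<>();
      for (Transition t : accepted) {
        if (t.up) {
          currentState.get(t.node).add(t.partition);
        } else {
          currentState.get(t.node).remove(t.partition);
        }
        completed.add(t.name);
        names.add(t.name);
      }
      remaining.removeAll(accepted);
      issued.add(names);
    }
    return issued;
  }

  /** The example from the text: C has capacity 2, D has capacity 3. */
  static List<List<String>> example() {
    Map<String, Integer> capacity = new HashMap<>();
    capacity.put("C", 2);
    capacity.put("D", 3);
    Map<String, Set<String>> state = new HashMap<>();
    state.put("C", new HashSet<>(Arrays.asList("DB_1", "DB_3")));
    state.put("D", new HashSet<>(Arrays.asList("DB_2", "DB_3")));
    List<Transition> pending = Arrays.asList(
        new Transition("ST1", "D", "DB_1", true, null),
        new Transition("ST2", "C", "DB_1", false, "ST1"),
        new Transition("ST3", "C", "DB_2", true, null),
        new Transition("ST4", "D", "DB_2", false, "ST3"));
    return simulate(capacity, state, pending);
  }
}
```

Under these assumptions, `PipelineSimulationSketch.example()` yields one transition per pipeline, in the order ST1, ST2, ST3, ST4, which matches the trace above.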
We apply hard constraints at throttling time. As a result, only "legal" state transitions are dispatched, so that an instance is never over-utilized.
It is a relatively general solution for constraint-based problems during state transitions, and it is not necessarily bound to WAGED.
Messages with higher priority (at the resource, partition, and message levels) won't be throttled before messages with lower priority.
There is a tradeoff between overall fairness/evenness and transition efficiency
- The tradeoff arises from how we handle the case when an ST is throttled on capacity.
- The target instance of an ST message is fixed as long as the ideal state mapping stays the same.
- The algorithm and pipeline setup DOES NOT allow "regret" or downgrade to a sub-par location for the time being.
- On the bright side, this ensures better eventual evenness and reduces unnecessary partition movement.
- Messages with higher priority are considered first and are thus less likely to be throttled, as always.
Pros:
- A clean separation of responsibility between ideal mapping calculation and execution planning
- The logic is integrated with throttling while current sorting logic and special downgrade options are also satisfied
Cons:
- WAGED cluster will have longer convergence time, e.g. it will take more pipelines to complete a collection of state transitions
Is it specific to WAGED, or could it be generalized?
In ideal case, the rules applied at throttling stage should be independent from the algorithm we use. In this particular case, the update to StateTransitionThrottleController should be made as general as possible. The relationship between the new throttling and WAGED weights should be loosely coupled. From the throttling controller's point of view, the input could come from anywhere as long as it specifies the quota, capacity and rules.
Why is minActiveReplica not a hard constraint to begin with?
The concepts of delayed rebalancer and minActiveReplica predate WAGED, but the contract and interface for hard constraints are different. Hard constraints designed in WAGED check one partition against one candidate node, while minActiveReplica is an aggregated view. To ensure minActiveReplica, we produce a "temporary" state to apply to the cluster in order to reduce movement. The results are not persisted. This is different from other flows in the constraint-based algorithm.
How will this impact CRUSHed?
For now, the logic should only be applied to WAGED, behind a config. CRUSHed behavior should remain the same for backward compatibility. CRUSHed doesn't have the concept of "weights" to begin with, so adding this new fundamental concept to it is inappropriate. We have to recognize that these are two very different algorithms and approaches. The goal is not to unify throttling for CRUSHed and WAGED. Instead, CRUSHed will eventually enter maintenance mode, and no new features will be added there.
How does this align with long-term strategic goals?
We'll be moving toward a microservices architecture. In the new Helix architecture, the current rebalance pipeline will be replaced by several independent components: Calculation, Scheduling/Planning, Execution, etc. The new throttling logic should be implemented as a component that will be integrated into the scheduling part.
When creating an AssignableNode, we not only assign the partitions that would not move (current behavior), but also the partitions that would move (new behavior). The partitions that would move are then assigned again to their new positions; in other words, the weights of the moving partitions are accounted for twice.
This approach is not selected because it applies a "short-visioned" strategy and is not ideal overall.
- Create AssignableNodes and assign all non-candidate partitions (meaning partitions that won't move) (current behavior);
- Start rebalance algorithm;
- For each partition placement, assign the partition both in the old location, and the new location (new behavior).
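The double counting in the steps above can be illustrated with a short sketch. The names `DoubleCountingSketch`, `Move`, and `projectedUsage` are hypothetical and are not part of the WAGED code; the sketch only shows the arithmetic of charging a moving partition on both its old and its new node.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of counting a moving partition's weight twice. */
class DoubleCountingSketch {
  static final class Move {
    final String partition;
    final String from;
    final String to;

    Move(String partition, String from, String to) {
      this.partition = partition;
      this.from = from;
      this.to = to;
    }
  }

  /**
   * Starts from the usage of partitions that do not move, then charges every
   * moving partition on both its old node and its new node.
   */
  static Map<String, Integer> projectedUsage(Map<String, Integer> stationaryUsage,
      Move[] moves, Map<String, Integer> partitionWeight) {
    Map<String, Integer> usage = new HashMap<>(stationaryUsage);
    for (Move m : moves) {
      int weight = partitionWeight.get(m.partition);
      usage.merge(m.from, weight, Integer::sum); // still occupies the old location
      usage.merge(m.to, weight, Integer::sum);   // already reserved in the new location
    }
    return usage;
  }
}
```

For instance, if nodes C and D each keep one stationary partition of weight 1 and swap one partition of weight 1 each, both nodes are projected at usage 3. A node with capacity 2 would then reject the placement during calculation rather than at dispatch time.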
Instead of failing the rebalance (current behavior), as long as any partition succeeds in moving, we continue the rebalance. Since WAGED algorithm is deterministic, the failed movements will be attempted again in the next pipeline. Eventually, as long as at least 1 partition moves each time, the movements will be concluded in the end.
Places to be changed:
ConstraintBasedAlgorithm.calculate()
Need to find candidate replicas and assign them to their old positions.
The calculate() method needs to run for all nodes even if some replicas cannot find an assignable node due to capacity issues.
Replicas that can't find an assignable node will be skipped in this cycle.
If no replica can be assigned in a cycle, throw a FAILED_TO_CALCULATE exception. This means even a baby step doesn't work, and this is when we call a stop.
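The skip-and-continue behavior described above might look roughly like the following. This is a simplified sketch, not the ConstraintBasedAlgorithm implementation: `assignWhatFits` is a hypothetical helper, every replica is assumed to have weight 1, the "best" node is simply the one with the most headroom, and an IllegalStateException stands in for the FAILED_TO_CALCULATE failure.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: place what fits, skip the rest, fail only if nothing moves. */
class PartialAssignmentSketch {
  static Map<String, String> assignWhatFits(List<String> replicas, Map<String, Integer> headroom) {
    Map<String, Integer> left = new TreeMap<>(headroom); // sorted for deterministic tie-breaks
    Map<String, String> assignment = new LinkedHashMap<>();
    for (String replica : replicas) {
      String best = null;
      for (Map.Entry<String, Integer> e : left.entrySet()) {
        if (e.getValue() >= 1 && (best == null || e.getValue() > left.get(best))) {
          best = e.getKey();
        }
      }
      if (best == null) {
        continue; // no node has room: skip this replica in this cycle
      }
      left.put(best, left.get(best) - 1);
      assignment.put(replica, best);
    }
    if (assignment.isEmpty() && !replicas.isEmpty()) {
      // Stand-in for the FAILED_TO_CALCULATE failure mentioned in the text.
      throw new IllegalStateException("FAILED_TO_CALCULATE");
    }
    return assignment;
  }
}
```

Skipped replicas are simply absent from the returned map, so a later cycle can retry them once capacity has been freed.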
Overall comment: It changes the previous assumptions about the WAGED rebalancer:
Now the "temporary state" is included as algorithm input. The calculation result is still deterministic, but it becomes more dynamic. Overall, WAGED becomes more complicated and its results less explainable.
This approach is a solution tailored for WAGED, at the cost of blurring the responsibility boundary between components. The ideal state computation now takes into account whether a state transition is possible at a given time, which, strictly speaking, belongs to the scheduling stage.
Pros:
- A relatively straightforward approach: count weights in both the old and new positions.
- Change set is limited to WAGED rebalancer.
Cons:
- A "short-visioned" strategy is applied, and the eventual cluster evenness will suffer:
Because the algorithm takes in the "temporary" state, including the partitions that are moving out, the picked assignable node may no longer be optimal once the previous state transition finishes.
Even worse, "flip-flop": the strategy can lead to a cascading effect where the cluster enters a loop of load balancing when it is configured to favor evenness over less movement.
- Another level of complexity is added to the WAGED rebalancer core: another patch that diverges from its original design. There is less accountability and explainability for why an ST happens (or does not happen).
To recap the current behavior in WAGED:
During each pipeline iteration, if there are temporarily downed instances and there are partitions falling short of minActiveReplicas, rebalance overwrites are applied:
- Find all replicas that are not on immediately available nodes (excluding all downed nodes regardless of delayed rebalance settings);
- Sort Candidate Replica based on their sizes;
- For each Candidate Replica, find the best fitting node and assign it.
This maintains the minActiveReplicas requirement while minimizing partition movement, an idea and behavior inherited from the delayed rebalancer. The logic is in WagedRebalancer.applyRebalanceOverwrite.
This is an assignment of a temporary state: it is not persisted, but it is in the best possible state and is therefore applied to the cluster.
An overwrite applied as a patch shouldn't have been used to begin with; it shouldn't be an afterthought. It should happen together with the main computation logic.
We are going to repurpose the emergency rebalance.
The complicated part is the combination of permanently downed nodes and partitions missing minActiveReplicas, especially when both happen together.
Below are all possible cases:
Two checks should be done:
- node-less partitions ? Check if partition is assigned to permanent down instance (same in current emergency rebalance)
- missing minActiveReplica ? Check if assignment needs rebalance overwrites (existing logic in WagedRebalancer.requireRebalanceOverwrite)
The first row of the table is for the current behavior in the emergency rebalance process.
A new clusterModel scope should be created to address minActiveReplica (rebalance overwrite process):
- The to-be-assigned replicas are partitions that are missing activeReplicas AND are assigned to temporarily downed instance
- The assignable instances are current immediate live instances
Therefore the rebalance overwrites are formally computed by the WAGED algorithm and thus satisfy the hard constraints.
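The scope selection described above can be sketched as follows. The names are hypothetical and this is not the WagedRebalancer implementation. As an additional assumption, the sketch only asks for as many replicas as are needed to close the minActiveReplicas shortfall, capped by the number of replicas that sit on temporarily downed instances.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical selection of the to-be-assigned replicas for rebalance overwrites. */
class OverwriteScopeSketch {
  /**
   * For each partition, returns how many replicas should be placed on live
   * instances. Partitions that already meet minActiveReplicas, or that have no
   * replica on a temporarily downed instance, are left out of the scope.
   */
  static Map<String, Integer> replicasToPlace(Map<String, List<String>> assignment,
      Set<String> liveInstances, Set<String> temporarilyDown, int minActiveReplicas) {
    Map<String, Integer> toPlace = new TreeMap<>();
    for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
      int active = 0;
      int onTemporarilyDown = 0;
      for (String instance : e.getValue()) {
        if (liveInstances.contains(instance)) {
          active++;
        } else if (temporarilyDown.contains(instance)) {
          onTemporarilyDown++;
        }
      }
      int shortfall = minActiveReplicas - active;
      if (shortfall > 0 && onTemporarilyDown > 0) {
        toPlace.put(e.getKey(), Math.min(shortfall, onTemporarilyDown));
      }
    }
    return toPlace;
  }
}
```

The resulting replicas, together with the live instances as the only assignable nodes, would form the input of the new cluster model scope, so the regular constraint checks apply to the temporary assignment as well.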
If both checks are positive, we need to handle both the partitions on permanently downed instances and the partitions that are missing minActiveReplicas while their instances are in the delayed window.
The two rebalance calculations then happen in sequential order:
- First compute for partitions on permanent downed instances
- Then compute for rebalance overwrites.
The two calculations also differ in whether their results are persisted:
- For the node-less partitions, the results should be persisted because these are stable, non-temporary assignments.
- Rebalance overwrite assignments shouldn't be persisted because they are temporary; we decided to do so to minimize movement.
The above inherits the current behavior as far as possible. Any change to persistence is not directly related to the hard constraint expansion and is outside the scope of this design.
Hard constraints are met because there is no "afterthought" anymore. All constraints are honored because the assignment is calculated directly by the rebalance algorithm.
The rebalance scope is restricted to the partitions that are missing minActiveReplicas and sit on temporarily downed instances. Those partitions are temporarily assigned to active nodes.
If minActiveReplicas is not met during the delayed window, the assignment result is not persisted, which preserves the behavior of reduced partition movement.