WAGED Rebalance Pipeline Redesign
After customers onboarded to the WAGED rebalancer, it became apparent that large clusters cause performance issues for the current WAGED implementation. As outlined in recent investigations, large clusters with 25k+ partitions suffer from slow controller pipelines due to WAGED calculation, and performance must be improved before wider adoption.
There are currently several places where the WAGED algorithm is executed:
Rebalance Type | Purpose | Scope | Async? | Produces |
---|---|---|---|---|
Global Rebalance | Creates Baseline Assignment. Baseline is used as an anchor/reference for partial rebalance. | Partitions based on cluster change, either all resources of the cluster or partial resources. (The cluster model for calculation is based on previous Baseline assignment) | Yes | Baseline Assignment |
Partial Rebalance | Creates Best Possible Assignment. This assignment is the ideal placement that's persisted to the cluster. | Partitions based on current states and difference with Baseline assignment. (The cluster model for calculation is based on previous Best Possible assignment) | No | Best Possible Assignment |
Rebalance Overwrite | Only when nodes are down and delayed rebalance is enabled - creates an assignment that's based on immediately available nodes (excluding all downed nodes regardless of delayed rebalance settings). This assignment is combined with Best Possible Assignment, if Best Possible doesn't meet minActiveReplicas. | Partitions based on downed nodes. (The cluster model for calculation is based on previous Baseline assignment) | No | (An assignment of a temporary state, which is not persisted, but is the final product applied to IdealState) |
During each pipeline iteration, the following steps happen during Partial Rebalance:
- Based on baseline divergence, find all replicas that need to have their placements recalculated (Candidate Replica);
- Sort Candidate Replica by size (the largest replicas are placed first);
- For each Candidate Replica, find the best fitting node and assign it.
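The three steps above can be sketched as a greedy loop. This is a minimal illustration and not Helix's implementation: the `Node` class, the `place_candidates` function, the single integer weight, and the "most remaining capacity" scoring rule are all assumptions made for the example, whereas the real rebalancer scores nodes against its own constraints.

```python
from dataclasses import dataclass


@dataclass
class Node:
    # Hypothetical node model: one capacity dimension only.
    name: str
    capacity: int
    used: int = 0

    def remaining(self) -> int:
        return self.capacity - self.used


def place_candidates(candidates, nodes):
    """Greedy placement of Candidate Replica.

    candidates: dict of replica name -> weight (step 1 is assumed already done).
    Returns a dict of replica name -> node name.
    """
    assignment = {}
    # Step 2: sort by weight, largest first; the name breaks ties deterministically.
    ordered = sorted(candidates.items(), key=lambda kv: (-kv[1], kv[0]))
    for replica, weight in ordered:
        # Step 3: keep only nodes that can hold the replica, then pick the
        # one with the most remaining capacity (assumed scoring rule).
        fitting = [n for n in nodes if n.remaining() >= weight]
        if not fitting:
            raise ValueError(f"no node can host {replica}")
        best = min(fitting, key=lambda n: (-n.remaining(), n.name))
        best.used += weight
        assignment[replica] = best.name
    return assignment


nodes = [Node("n1", 10), Node("n2", 10)]
result = place_candidates({"r1": 6, "r2": 5, "r3": 3}, nodes)
print(result)  # {'r1': 'n1', 'r2': 'n2', 'r3': 'n2'}
```

Because the loop is deterministic, running it again on an unchanged cluster state yields the same placements, which is the wasted work described later in this document.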
During each pipeline iteration, the following steps happen during Rebalance Overwrite:
- Find all replicas that are not on immediately available nodes (Candidate Replica);
- Sort Candidate Replica by size (the largest replicas are placed first);
- For each Candidate Replica, find the best fitting node and assign it.
(The steps only feature work done in the main thread; work done in the asynchronous thread does not significantly impact performance and is not listed.)
When WAGED calculation is slow, bottlenecks can be identified in the above steps; improvements and optimizations are necessary for such bottlenecks.
After reports of large clusters converging slowly, we observed partial rebalance latency (roughly the summed latency of the 3 Partial Rebalance steps above) as high as 30 to 40 seconds, while the cluster had 60% baseline divergence and rising. The main thread workload can be estimated from baseline divergence: for a cluster with 25k replicas, 60% baseline divergence means there are 15k Candidate Replica. So in each pipeline iteration, the controller works with 15k replicas in each of its 3 steps.
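The workload estimate can be written out as a short calculation. The variable names are illustrative, and the "touches" figure is only a rough lower bound that assumes each step visits every Candidate Replica at least once; sorting in particular does more work than that.

```python
total_replicas = 25_000
baseline_divergence_pct = 60
partial_rebalance_steps = 3

# Replicas whose placement must be recalculated in one pipeline iteration.
candidate_replicas = total_replicas * baseline_divergence_pct // 100

# Rough lower bound on replica visits per iteration (assumption: one visit per step).
min_replica_touches = candidate_replicas * partial_rebalance_steps

print(candidate_replicas)   # 15000
print(min_replica_touches)  # 45000
```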
Because of the massive cluster size and Candidate Replica load, 3 problems are revealed:
- The sorting algorithm cannot finish within a reasonable timeframe;
- Generating a cluster model for a large cluster is expensive;
- The find-best-node algorithm is doing mostly wasted work, since the algorithm is deterministic, and the cluster state remains the same.
Whether the workload comes from 60% baseline divergence in a 25k-replica cluster or from lower divergence in a larger cluster, these 3 problems degrade rebalancer performance whenever the workload is large. Therefore, all 3 problems must be addressed.
For problem 1, Helix developers have already sped up the sorting algorithm.
For problems 2 and 3, a new solution is required and is documented here.
The rebalancer's functionality can be further divided into several categories with different priorities:
- Improve evenness;
- Assign new partitions;
- Assign node-less partitions;
- Ensure minActiveReplica.
Each category can be further sorted by priorities:
Rebalancer Functionality | Description | Priority | Who does this? |
---|---|---|---|
Ensure minActiveReplica | Exclude downed or disabled nodes regardless of delayed rebalance window, then ensure minActiveReplica. | Immediately Required | Rebalance Overwrite (if nodes are down but within the delayed rebalance window, and partitions are missing minActiveReplica, Rebalance Overwrite adds assignments to fulfill minActiveReplica) |
Assign node-less partitions | If partitions were assigned to nodes that are permanently downed, assign the partitions somewhere else. | Preferably Immediately Required | Partial Rebalance (if node space permanently changes, partial rebalance will reassign partitions that no longer have a valid node assigned) |
Ensure evenness | Move existing partitions so that a better weight evenness is achieved. | Eventually Required | Partial Rebalance (scope based on Baseline); Global Rebalance (scope based on cluster changes) |
Assign new partitions | Assign partitions of newly added Helix resources. | Eventually Required | Partial Rebalance; Global Rebalance |
The proposed design moves or keeps every immediately required action in the synchronous (main) thread, and moves or keeps every eventually required action in the asynchronous thread.
After the change, the following steps happen in the main thread:
1) If necessary, calculate new partition assignment based on live and enabled nodes:
- Find all replicas that are assigned to permanently downed nodes; they need to have their placements recalculated (Candidate Replica);
- Sort Candidate Replica based on their sizes;
- For each Candidate Replica, find the best fitting node and assign it.
2) If necessary, start Rebalance Overwrite.
Main thread step 1 addresses "Assign node-less partitions". Main thread step 2 addresses "Ensure minActiveReplica". Note the ordering because the first step may already satisfy minActiveReplica.
Previously, Partial Rebalance ran in the main thread on every iteration, which meant recreating cluster models and rerunning the find-best-node algorithm each time. The main thread work is now conditional ("if necessary"). When it is not necessary (which is often the case), no cluster model is created and no wasted work is done; when it is necessary, the rebalance scope is much smaller.
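The conditional main-thread logic can be sketched as follows. This is an illustration under stated assumptions, not the Helix code: `emergency_rebalance`, its parameters, and the `place` callback (which returns a node name given a set of excluded nodes) are hypothetical, and assignments are modelled as plain dicts of partition name to node list.

```python
def emergency_rebalance(best_possible, permanently_down, unavailable, min_active, place):
    """Return (persisted, applied, computed).

    persisted: the Best Possible Assignment after step 1.
    applied:   the temporary state after step 2 (not persisted).
    computed:  False when neither step had anything to do.
    """
    persisted = {p: list(ns) for p, ns in best_possible.items()}
    computed = False

    # Step 1: only replicas on permanently downed nodes are Candidate Replica.
    for partition, nodes in persisted.items():
        for i, node in enumerate(nodes):
            if node in permanently_down:
                nodes[i] = place(partition, unavailable | set(nodes))
                computed = True

    # Step 2 (Rebalance Overwrite): top up partitions below minActiveReplica,
    # counting only immediately available nodes as active.
    applied = {p: list(ns) for p, ns in persisted.items()}
    for partition, nodes in applied.items():
        active = sum(1 for n in nodes if n not in unavailable)
        while active < min_active.get(partition, 0):
            nodes.append(place(partition, unavailable | set(nodes)))
            active += 1
            computed = True

    return persisted, applied, computed


cluster_nodes = ["n1", "n2", "n3", "n4"]


def first_free(partition, excluded):
    # Hypothetical placement rule: first node that is not excluded.
    return next(n for n in cluster_nodes if n not in excluded)


# Healthy cluster: nothing is computed.
quiet = emergency_rebalance({"p0": ["n1", "n2"]}, set(), set(), {"p0": 1}, first_free)

# n2 is permanently down; n3 is down but still inside the delayed rebalance window.
busy = emergency_rebalance({"p0": ["n2", "n3"]}, {"n2"}, {"n2", "n3"}, {"p0": 2}, first_free)
```

In the second call, step 1 moves the replica off `n2`, and step 2 still adds one more replica because `n3` does not count toward minActiveReplica. The ordering matters for the reason given above: step 1 alone may already satisfy minActiveReplica, in which case step 2 adds nothing.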
The proposed design adds a new set of steps to handle "Assign node-less partitions". Combined with the existing Rebalance Overwrite, all main thread logic is named Emergency Rebalance.
Rebalance Type | Description | Scope | Async? | Produces |
---|---|---|---|---|
Emergency Rebalance (New) | | Step 1: partitions assigned to permanently downed nodes. Step 4: partitions assigned to nodes that are not immediately available. | No | Step 1 optionally produces Best Possible Assignment. Step 4 creates a temporary state that's not persisted (like Rebalance Overwrite behavior). |
Global Rebalance | (Unchanged) | (Unchanged) | Yes | Baseline Assignment |
Partial Rebalance (Changed) | | (Unchanged) | Yes (Changed) | Best Possible Assignment, only if new result differs from the old persisted result. (Changed) |
Rebalance Overwrite (Combined) | (Now combined with Emergency Rebalance) | N/A | N/A | N/A |
Main thread before this change:
(Note that Rebalance Overwrite is omitted from the graph for the sake of simplicity. It could alter the Best Possible assignment before the assignment is applied to IdealStates.)
Assume that at all units of time, there is a cluster event, which triggers partial rebalance.
Assume that BestPossibleAssignment takes 2 units of time to compute. The t unit is arbitrary and does not scale with real latency.
Thread | Pipeline | t=n | t=n+1 | t=n+2 | t=n+3 | t=n+4 |
---|---|---|---|---|---|---|
Cluster event | | A cluster event happens and triggers Partial Rebalance. | A cluster event is queued because the main thread is blocked. | A cluster event is dequeued and triggers Partial Rebalance. A cluster event is queued because the main thread is blocked. | A cluster event is queued because the main thread is blocked. | A cluster event is dequeued and triggers Partial Rebalance. A cluster event is queued because the main thread is blocked. |
Main thread | Partial Rebalance | | | | | |
The purpose of this illustration is to show the current thread structure, and the fact that slow BestPossibleAssignment computation leads to a slow responding pipeline.
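The queueing behavior in the table can be reproduced with a small simulation. It is a sketch under the same assumptions as the illustration (one cluster event per time unit, 2 units per Partial Rebalance, a single main thread); `simulate_backlog` is a hypothetical helper, and it ignores any event merging or de-duplication a real controller may perform.

```python
def simulate_backlog(ticks, compute_time=2):
    """Return the number of queued cluster events at the end of each tick."""
    queued = 0
    busy_until = 0  # first tick at which the main thread is free again
    backlog = []
    for t in range(ticks):
        queued += 1  # a cluster event arrives at every tick
        if t >= busy_until:
            # The main thread is free: dequeue one event and start computing.
            queued -= 1
            busy_until = t + compute_time
        backlog.append(queued)
    return backlog


print(simulate_backlog(5))  # [0, 1, 1, 2, 2]
```

The backlog grows by one event every two ticks, so under these assumptions the pipeline falls further behind for as long as events keep arriving faster than BestPossibleAssignment can be computed.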
WAGED thread structure after this change:
Assume there are cluster events at t=n, t=n+1, t=n+2; the cluster events at t=n and t=n+1 are permanent node down events, so Emergency Rebalance kicks in.
Assume that Partial Rebalance takes 2 units of time to compute BestPossible, and Emergency Rebalance takes 1 unit of time to compute due to smaller scope. The t unit is arbitrary and does not scale with real latency.
Thread | Pipeline | t=n | t=n+1 | t=n+2 | t=n+3 | t=n+4 |
---|---|---|---|---|---|---|
Cluster event | | A cluster event happens, and triggers Emergency Rebalance. It's a permanent node down event so Emergency Rebalance computes. | A cluster event happens, and triggers Emergency Rebalance. It's a permanent node down event so Emergency Rebalance computes. | A cluster event happens, and triggers Emergency Rebalance. | Nothing happens. | The pipeline is triggered by the asynchronous thread because Partial Rebalance has finished computing. |
Main thread | Emergency Rebalance | | | | Nothing happens. | |
Asynchronous thread | Partial Rebalance (#1) | | | | | |
Asynchronous thread | Partial Rebalance (#2) | | | | | |
With the new thread structure, the main pipeline is doing minimal necessary computation work. The work is fast and is optional; unless invalid placements occur, the pipeline will be skipped.
As mentioned in previous sections, both the main thread and the asynchronous thread compute BestPossibleAssignments. While they are conceptually different, both are stored in the same place in ZooKeeper and both are written to IdealState. To avoid race conditions, priority is given to the main thread.
If main thread does not compute assignment:
Since Emergency Rebalance only computes due to nodes permanently down or minActiveReplicas, usually it does nothing.
In this case, the main thread will attempt to create a Partial Rebalance thread; if it already exists, it will not.
If main thread does compute assignment:
If Emergency Rebalance is triggered, it immediately cancels any existing Partial Rebalance thread. Only after it finishes computing and persisting does it start a new Partial Rebalance thread.
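The two branches above can be sketched with a small coordinator. Everything here is hypothetical and simplified: the class name, the callbacks, and the in-memory `best_possible` field (standing in for the persisted result) are assumptions. Python threads cannot be killed, so "cancel" is modelled as a flag that makes the old thread discard its result.

```python
import threading


class RebalanceCoordinator:
    """Toy model of the main thread / asynchronous thread handoff."""

    def __init__(self, partial_fn, emergency_fn):
        self._lock = threading.Lock()       # guards best_possible and the thread handle
        self._partial_fn = partial_fn       # base assignment -> new assignment
        self._emergency_fn = emergency_fn   # base assignment -> new assignment
        self.best_possible = {}             # stands in for the persisted result
        self.partial_thread = None
        self._cancel_flag = None

    def _run_partial(self, base, cancel_flag):
        result = self._partial_fn(base)
        with self._lock:
            # A cancelled computation must never overwrite the main thread's result.
            if not cancel_flag.is_set():
                self.best_possible = result

    def _start_partial_if_absent(self):
        # Caller must hold self._lock.
        if self.partial_thread is not None and self.partial_thread.is_alive():
            return False
        self._cancel_flag = threading.Event()
        self.partial_thread = threading.Thread(
            target=self._run_partial,
            args=(dict(self.best_possible), self._cancel_flag),
        )
        self.partial_thread.start()
        return True

    def on_cluster_event(self, needs_emergency):
        """Main thread entry point. Returns True if a new partial thread was started."""
        with self._lock:
            if needs_emergency:
                if self._cancel_flag is not None:
                    self._cancel_flag.set()   # cancel the old computation first
                self.partial_thread = None    # and forget the old thread
                # Compute and "persist" before any new partial thread starts.
                self.best_possible = self._emergency_fn(self.best_possible)
            return self._start_partial_if_absent()
```

Holding one lock around both the emergency computation and the asynchronous write is a design choice of this sketch: it gives the main thread priority at the cost of blocking the asynchronous write while an emergency computation is in progress.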
Thus there will be the following cases:
Case 1: Partial Rebalance → Partial Rebalance
This is the expected case during normal operations. Cluster events happen, and none of them are permanent node down or minActiveReplica violation. Emergency Rebalance does not compute; the main thread attempts to spawn Partial Rebalance threads, before it applies the previous BestPossibleAssignment that was persisted to the cluster.
Existing Partial Rebalance threads compute in peace until finishing. Each Partial Rebalance calculation will be based on the previous Partial Rebalance calculation.
Once a Partial Rebalance thread finishes computing, it persists the result and triggers the main thread which applies the new result to the cluster.
Note that there's no possibility that a thread spawning cycle starts: Partial Rebalance only persists and triggers the main thread if the computed result is different from the previous result; else, the thread quietly dies and the controller awaits the next real cluster events.
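The persist-only-if-different rule is what stops the cycle. A minimal sketch, assuming a dict-like `store` in place of ZooKeeper and a hypothetical `trigger_pipeline` callback:

```python
def finish_partial_rebalance(new_assignment, store, trigger_pipeline):
    """Persist and re-trigger only when the result changed.

    Returns True if the result was persisted, False if the thread ends quietly.
    """
    if new_assignment == store.get("best_possible"):
        return False  # identical result: nothing to persist, no new pipeline run
    store["best_possible"] = new_assignment
    trigger_pipeline()
    return True


store = {"best_possible": {"p0": ["n1"]}}
triggers = []

# Same result as before: no write, no trigger.
unchanged = finish_partial_rebalance({"p0": ["n1"]}, store, lambda: triggers.append("run"))

# Different result: persisted, and the main thread is triggered once.
changed = finish_partial_rebalance({"p0": ["n2"]}, store, lambda: triggers.append("run"))

# The triggered run recomputes the same result, so the chain stops here.
repeat = finish_partial_rebalance({"p0": ["n2"]}, store, lambda: triggers.append("run"))
```

The termination argument relies on the computation being deterministic for an unchanged cluster state, so the run triggered by a persisted result reproduces that result and exits.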
This case is similar to the current WAGED pipelines, except that only different results are persisted.
Case 2: Partial Rebalance → Emergency Rebalance
A cluster event of permanent node down or minActiveReplica violation happens. Emergency Rebalance kills the existing (singular) Partial Rebalance thread and starts computing. A new BestPossibleAssignment with limited scope is computed, persisted, and returned. Then, a new Partial Rebalance thread is spawned.
The Emergency Rebalance calculation will be based on the previous Partial Rebalance calculation (minimal movement). The future Partial Rebalance calculation will be based on the current Emergency Rebalance calculation (minimal movement).
Case 3: Emergency Rebalance → Emergency Rebalance
If cluster events such as permanent node down and minActiveReplica violation keep happening, every event will cause the main thread to compute. Therefore, 1) a new BestPossibleAssignment with limited scope will be computed (fast) every pipeline iteration in the main thread, 2) each Emergency Rebalance calculation will be based on the previous Emergency Rebalance calculation, 3) Partial Rebalance threads are created and then killed immediately.
In this scenario, every calculation has the minimal amount of scope, and the main thread is busy "putting out fire". One may note that the evenness is not reinforced, but when every cluster event is catastrophic, the priority should be resolving the problem instead of evenness.
Case 4: Emergency Rebalance → Partial Rebalance
When the cluster goes back to normal, Emergency Rebalance does not compute and the existing Partial Rebalance finishes in peace. The system is now back to case 1.
Note
In the unlikely event that the asynchronous partial rebalance finishes very fast, one of two situations happens:
- The main thread has started computation. If so, the main thread has already tried to kill the asynchronous thread before it can persist.
- The main thread has not started computation. If so, the asynchronous thread will successfully write its result to ZooKeeper. This is okay, because this result is based on a previous BestPossible, meaning movements are minimized, and the new computation will be based on this persisted result, meaning movements are again minimized.
(Diagram: dotted lines indicate asynchronous work.)
The main thread performance is expected to improve drastically due to 2 reasons:
- Without invalid placements, the main thread does nothing. The WAGED pipeline will have 0 calculation latency in this case.
- With invalid placements, the main thread only recalculates placements for the partitions on permanently down nodes or to fulfill minActiveReplica, which should be a relatively small number of partitions compared to the total partitions in a cluster. The pipeline will have the minimally required calculation latency in this case.
Under usual circumstances, we anticipate main pipeline latency on the order of 100 milliseconds.
N/A. This is a controller change with no impact on customers.
As the proposed design concerns race conditions and multithread operations, the development team must carefully verify the logic to avoid possible deadlocks or data-corrupting race conditions.
The evenness must also be ensured - test clusters must be put under prolonged cluster events to show how their evenness changes over time.