Skip to content

Commit

Permalink
Adding KafkaRebalance add/remove broker modes for rebalancing after…
Browse files Browse the repository at this point in the history
…/before scale up/down (#6800)

* Added rebalancing mode and brokers to the KafkaRebalance API

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Updated KafkaRebalanceAssemblyOperator to handle add-broker and
remove-broker rebalancing modes
Refactored rebalancing state machine tests to cover all modes

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Updated KafkaRebalanceAssemblyOperator tests to cover new add-broker and
remove-broker modes

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Fixed comments
Added rebalance examples for add-broker and remove-broker

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Fixed api test

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Fixed comments

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Renaming modes
Fix comments

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Fixed system test

Signed-off-by: Paolo Patierno <ppatierno@live.com>

Co-authored-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
ppatierno and scholzj authored May 13, 2022
1 parent 74f2ce4 commit 63c1891
Show file tree
Hide file tree
Showing 24 changed files with 1,323 additions and 583 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
* Support for the s390x platform
_(The s390x support is currently considered as experimental. We are not aware of any issues, but the s390x build doesn't at this point undergo the same level of testing as the AMD64 container images.)_
* Update Strimzi Kafka Bridge to 0.21.5
* Added rebalancing modes on the `KafkaRebalance` custom resource
* `full`: this mode runs a full rebalance moving replicas across all the brokers in the cluster. This is the default one if not specified.
* `add-brokers`: after scaling up the cluster, this mode is used to move replicas to the newly added brokers specified in the custom resource.
* `remove-brokers`: this mode is used to move replicas off the brokers that are going to be removed, before scaling down the cluster.
* **Experimental** KRaft mode (ZooKeeper-less Kafka) which can be enabled using the `UseKRaft` feature gate.
**Important: Use it for development and testing only!**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.strimzi.api.kafka.model.balancing.KafkaRebalanceMode;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.Minimum;
import io.sundr.builder.annotations.Buildable;
Expand All @@ -19,12 +20,16 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
@JsonPropertyOrder({ "mode", "brokers", "goals", "skipHardGoalCheck", "rebalanceDisk", "excludedTopics", "concurrentPartitionMovementsPerBroker",
"concurrentIntraBrokerPartitionMovements", "concurrentLeaderMovements", "replicationThrottle", "replicaMovementStrategies" })
@EqualsAndHashCode
public class KafkaRebalanceSpec extends Spec {
private static final long serialVersionUID = 1L;

// rebalancing modes
private KafkaRebalanceMode mode = KafkaRebalanceMode.FULL;
private List<Integer> brokers;

// Optimization goal configurations
private List<String> goals;
private boolean skipHardGoalCheck;
Expand All @@ -40,6 +45,31 @@ public class KafkaRebalanceSpec extends Spec {
private long replicationThrottle;
private List<String> replicaMovementStrategies;

@Description("Mode to run the rebalancing. " +
"The supported modes are `full`, `add-brokers`, `remove-brokers`.\n" +
"If not specified, the `full` mode is used by default. \n\n" +
"* `full` mode runs the rebalancing across all the brokers in the cluster.\n" +
"* `add-brokers` mode can be used after scaling up the cluster to move some replicas to the newly added brokers.\n" +
"* `remove-brokers` mode can be used before scaling down the cluster to move replicas out of the brokers to be removed.\n")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public KafkaRebalanceMode getMode() {
return mode;
}

public void setMode(KafkaRebalanceMode mode) {
this.mode = mode;
}

@Description("The list of newly added brokers in case of scaling up or the ones to be removed in case of scaling down to use for rebalancing. " +
"This list can be used only with rebalancing mode `add-brokers` and `removed-brokers`. It is ignored with `full` mode.")
public List<Integer> getBrokers() {
return brokers;
}

public void setBrokers(List<Integer> brokers) {
this.brokers = brokers;
}

@Description("A list of goals, ordered by decreasing priority, to use for generating and executing the rebalance proposal. " +
"The supported goals are available at https://github.com/linkedin/cruise-control#goals. " +
"If an empty goals list is provided, the goals declared in the default.goals Cruise Control configuration parameter are used.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.api.kafka.model.balancing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

public enum KafkaRebalanceMode {
FULL("full"),
ADD_BROKERS("add-brokers"),
REMOVE_BROKERS("remove-brokers");

private String name;

KafkaRebalanceMode(String name) {
this.name = name;
}

@JsonCreator
public static KafkaRebalanceMode forValue(String value) {
switch (value) {
case "full":
return FULL;
case "add-brokers":
return ADD_BROKERS;
case "remove-brokers":
return REMOVE_BROKERS;
default:
return null;
}
}

@JsonValue
public String toValue() {
return this.name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
Expand Down Expand Up @@ -52,6 +54,25 @@ void testKafkaRebalanceWithExcludedTopics() {
createDeleteCustomResource("KafkaRebalance-excluded-topics.yaml");
}

@Test
void testKafkaRebalanceAddBroker() {
createDeleteCustomResource("KafkaRebalance-add-brokers.yaml");
}

@Test
void testKafkaRebalanceRemoveBroker() {
createDeleteCustomResource("KafkaRebalance-remove-brokers.yaml");
}

@Test
void testKafkaRebalanceWrongMode() {
Throwable exception = assertThrows(
KubeClusterException.class,
() -> createDeleteCustomResource("KafkaRebalance-wrong-mode.yaml"));

assertThat(exception.getMessage(), containsString("spec.mode: Unsupported value: \"wrong-mode\": supported values: \"full\", \"add-brokers\", \"remove-brokers\""));
}

@BeforeAll
void setupEnvironment() {
cluster.createCustomResources(TestUtils.CRD_KAFKA_REBALANCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec:
mode: add-brokers
brokers: [3,4]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec:
mode: remove-brokers
brokers: [3,4]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
spec:
mode: wrong-mode
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ kind: "KafkaRebalance"
metadata:
name: "my-rebalance"
spec:
mode: "full"
goals:
- "DiskCapacityGoal"
- "CpuCapacityGoal"
Expand Down
Loading

0 comments on commit 63c1891

Please sign in to comment.