Skip to content

Commit

Permalink
Add current roles to the node pool status (#9458)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Dec 13, 2023
1 parent 58882a1 commit 1612b71
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({ "conditions", "observedGeneration", "nodeIds", "clusterId", "replicas", "labelSelector" })
@JsonPropertyOrder({ "conditions", "observedGeneration", "nodeIds", "clusterId", "roles", "replicas", "labelSelector" })
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaNodePoolStatus extends Status {
private static final long serialVersionUID = 1L;

private List<Integer> nodeIds;
private String clusterId;
private List<ProcessRoles> roles;

// Replicas and label selector are required for scale subresource
private int replicas;
Expand All @@ -54,6 +55,15 @@ public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}

@Description("The roles currently assigned to this pool.")
public List<ProcessRoles> getRoles() {
return roles;
}

public void setRoles(List<ProcessRoles> roles) {
this.roles = roles;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Description("The current number of pods being used to provide this resource.")
public int getReplicas() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public KafkaNodePoolStatus generateNodePoolStatus(String clusterId) {
return new KafkaNodePoolStatusBuilder()
.withClusterId(clusterId)
.withNodeIds(new ArrayList<>(idAssignment.desired()))
.withRoles(processRoles.stream().sorted().toList())
.withReplicas(idAssignment.desired().size())
.withLabelSelector(getSelectorLabels().toSelectorString())
.withConditions(warningConditions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static KafkaNodePool convertKafkaToVirtualNodePool(Kafka kafka, Integer e
.endSpec()
.withNewStatus()
.withNodeIds(nodeIds)
.withRoles(ProcessRoles.BROKER)
.endStatus()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@ public void testNodesAndStatuses() {
assertThat(statuses.get("controllers").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=controllers"));
assertThat(statuses.get("controllers").getNodeIds().size(), is(3));
assertThat(statuses.get("controllers").getNodeIds(), hasItems(0, 1, 2));
assertThat(statuses.get("controllers").getRoles().size(), is(1));
assertThat(statuses.get("controllers").getRoles(), hasItems(ProcessRoles.CONTROLLER));
assertThat(statuses.get("brokers").getReplicas(), is(3));
assertThat(statuses.get("brokers").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=brokers"));
assertThat(statuses.get("brokers").getNodeIds().size(), is(3));
assertThat(statuses.get("brokers").getNodeIds(), hasItems(1000, 1001, 1002));
assertThat(statuses.get("brokers").getRoles().size(), is(1));
assertThat(statuses.get("brokers").getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ public void testNodesAndStatuses() {
assertThat(statuses.get("pool-a").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool-a"));
assertThat(statuses.get("pool-a").getNodeIds().size(), is(3));
assertThat(statuses.get("pool-a").getNodeIds(), hasItems(0, 1, 2));
assertThat(statuses.get("pool-a").getRoles().size(), is(1));
assertThat(statuses.get("pool-a").getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(statuses.get("pool-b").getReplicas(), is(2));
assertThat(statuses.get("pool-b").getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool-b"));
assertThat(statuses.get("pool-b").getNodeIds().size(), is(2));
assertThat(statuses.get("pool-b").getNodeIds(), hasItems(10, 11));
assertThat(statuses.get("pool-b").getRoles().size(), is(1));
assertThat(statuses.get("pool-b").getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public void testKafkaPool() {
assertThat(status.getLabelSelector(), is("strimzi.io/cluster=my-cluster,strimzi.io/name=my-cluster-kafka,strimzi.io/kind=Kafka,strimzi.io/pool-name=pool"));
assertThat(status.getNodeIds().size(), is(3));
assertThat(status.getNodeIds(), hasItems(10, 11, 13));
assertThat(status.getRoles().size(), is(1));
assertThat(status.getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ public void testConvertMinimalKafka() {

// Status
assertThat(pool.getStatus().getNodeIds(), is(nullValue()));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand All @@ -206,6 +208,8 @@ public void testConvertKafkaWithExistingReplicas() {
// Status
assertThat(pool.getStatus().getNodeIds().size(), is(3));
assertThat(pool.getStatus().getNodeIds(), hasItems(0, 1, 2));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}

@Test
Expand Down Expand Up @@ -267,5 +271,7 @@ public void testConvertMaximalKafka() {
// Status
assertThat(pool.getStatus().getNodeIds().size(), is(3));
assertThat(pool.getStatus().getNodeIds(), hasItems(0, 1, 2));
assertThat(pool.getStatus().getRoles().size(), is(1));
assertThat(pool.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,14 @@ public void testReconcileKafkaScaleDown(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(2));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -543,10 +547,14 @@ public void testReconcileKafkaScaleUp(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(4));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2, 5)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -601,14 +609,20 @@ public void testReconcileAddPool(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(3));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolC = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-c").get();
assertThat(poolC.getStatus().getReplicas(), is(2));
assertThat(poolC.getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(poolC.getStatus().getRoles().size(), is(1));
assertThat(poolC.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

async.flag();
})));
Expand Down Expand Up @@ -658,14 +672,20 @@ public void testReconcileAndRemovePool(VertxTestContext context) {
KafkaNodePool poolA = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-a").get();
assertThat(poolA.getStatus().getReplicas(), is(3));
assertThat(poolA.getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(poolA.getStatus().getRoles().size(), is(1));
assertThat(poolA.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolB = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").get();
assertThat(poolB.getStatus().getReplicas(), is(2));
assertThat(poolB.getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(poolB.getStatus().getRoles().size(), is(1));
assertThat(poolB.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

KafkaNodePool poolC = Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-c").get();
assertThat(poolC.getStatus().getReplicas(), is(2));
assertThat(poolC.getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(poolC.getStatus().getRoles().size(), is(1));
assertThat(poolC.getStatus().getRoles(), hasItems(ProcessRoles.BROKER));

// Remove pool-b
Crds.kafkaNodePoolOperation(client).inNamespace(NAMESPACE).withName("pool-b").withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -359,9 +360,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -778,9 +783,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -955,9 +964,13 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -1119,11 +1132,17 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getNodeIds(), is(List.of(5, 6)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getObservedGeneration(), is(1L));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(2).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b", "pool-c")));

// Assert the info passed over for Cruise Control
Expand Down Expand Up @@ -1295,8 +1314,12 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
assertThat(kafkaNodePoolStatusCaptor.getAllValues().size(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getReplicas(), is(3));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getNodeIds(), is(List.of(0, 1, 2)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(0).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getReplicas(), is(2));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getNodeIds(), is(List.of(3, 4)));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles().size(), is(1));
assertThat(kafkaNodePoolStatusCaptor.getAllValues().get(1).getStatus().getRoles(), hasItems(ProcessRoles.BROKER));
assertThat(kao.state.kafkaStatus.getKafkaNodePools().stream().map(UsedNodePoolStatus::getName).toList(), is(List.of("pool-a", "pool-b")));

// Assert the info passed over for Cruise Control
Expand Down
2 changes: 2 additions & 0 deletions documentation/modules/appendix_crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3499,6 +3499,8 @@ Used in: xref:type-KafkaNodePool-{context}[`KafkaNodePool`]
|integer array
|clusterId 1.2+<.<a|Kafka cluster ID.
|string
|roles 1.2+<.<a|The roles currently assigned to this pool.
|string (one or more of [controller, broker]) array
|replicas 1.2+<.<a|The current number of pods being used to provide this resource.
|integer
|labelSelector 1.2+<.<a|Label selector for pods providing this resource.
Expand Down
Loading

0 comments on commit 1612b71

Please sign in to comment.