From 0407c527a6ea3795e2a79b38cace6a8232598f41 Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Tue, 30 Jul 2024 08:42:13 +0200 Subject: [PATCH] Restart the correct Pod when resizing PVCs (#10396) Signed-off-by: Jakub Scholz --- .../operator/cluster/model/KafkaCluster.java | 16 +- .../operator/assembly/KafkaReconciler.java | 16 +- .../operator/assembly/PvcReconciler.java | 23 +-- .../assembly/ZooKeeperReconciler.java | 6 +- .../model/KafkaClusterWithKRaftTest.java | 20 ++ .../KafkaAssemblyOperatorPodSetTest.java | 185 +++++++++++++++++- .../KafkaAssemblyOperatorWithKRaftTest.java | 139 +++++++++++++ .../operator/assembly/PvcReconcilerTest.java | 22 +-- 8 files changed, 391 insertions(+), 36 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 220d432749d..e5eb05fa4dd 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -535,7 +535,7 @@ public KafkaPool nodePoolForNodeId(int nodeId) { } } - throw new RuntimeException("Node ID " + nodeId + " does not belong to any known node pool!"); + throw new NodePoolNotFoundException("Node ID " + nodeId + " does not belong to any known node pool!"); } /** @@ -2000,4 +2000,18 @@ private Labels brokersSelector() { return labels.strimziSelectorLabels(); } } + + /** + * Exception used to indicate that a matching Node Pool was not found + */ + public static final class NodePoolNotFoundException extends RuntimeException { + /** + * Creates new exception + * + * @param message Error message + */ + public NodePoolNotFoundException(String message) { + super(message); + } + } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java index 93b90136089..d362f14dd16 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java @@ -483,9 +483,19 @@ protected Future pvcs(KafkaStatus kafkaStatus) { List pvcs = kafka.generatePersistentVolumeClaims(); return new PvcReconciler(reconciliation, pvcOperator, storageClassOperator) - .resizeAndReconcilePvcs(kafkaStatus, podIndex -> KafkaResources.kafkaPodName(reconciliation.name(), podIndex), pvcs) - .compose(podsToRestart -> { - fsResizingRestartRequest.addAll(podsToRestart); + .resizeAndReconcilePvcs(kafkaStatus, pvcs) + .compose(podIdsToRestart -> { + for (Integer podId : podIdsToRestart) { + try { + fsResizingRestartRequest.add(kafka.nodePoolForNodeId(podId).nodeRef(podId).podName()); + } catch (KafkaCluster.NodePoolNotFoundException e) { + // We might have triggered some resizing on a PVC not belonging to this cluster anymore. + // This could happen for example with old PVCs from removed nodes. We will ignore it with + // a warning. + LOGGER.warnCr(reconciliation, "Node with ID {} does not seem to belong to this Kafka cluster and cannot be marked for restart due to storage resizing", podId); + } + } + return Future.succeededFuture(); }); } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/PvcReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/PvcReconciler.java index 770ae2ad0f8..c7d3af05cb6 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/PvcReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/PvcReconciler.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.function.Function; /** * This class reconciles the PVCs for the Kafka and ZooKeeper clusters. It has two public methods: @@ -50,18 +49,17 @@ public PvcReconciler(Reconciliation reconciliation, PvcOperator pvcOperator, Sto /** * Resizes and reconciles the PVCs based on the model and list of PVCs passed to it. It will return a future with - * collection containing a list of pods which need restart to complete the filesystem resizing. The PVCs are only - * created or updated. This method does not delete any PVCs. This is done by a separate method which should be + * collection containing a list of IDs of nodes which need restart to complete the filesystem resizing. The PVCs are + * only created or updated. This method does not delete any PVCs. This is done by a separate method which should be * called separately at the end of the reconciliation. * - * @param kafkaStatus Status of the Kafka custom resource where warnings about any issues with resizing will be added - * @param podNameProvider Function to generate a pod name from its index - * @param pvcs List of desired PVC used by this controller + * @param kafkaStatus Status of the Kafka custom resource where warnings about any issues with resizing will be added + * @param pvcs List of desired PVC used by this controller * - * @return Future with list of pod names which should be restarted to complete the filesystem resizing + * @return Future with set of node IDs which should be restarted to complete the filesystem resizing */ - public Future> resizeAndReconcilePvcs(KafkaStatus kafkaStatus, Function podNameProvider, List pvcs) { - Set podsToRestart = new HashSet<>(); + public Future> resizeAndReconcilePvcs(KafkaStatus kafkaStatus, List pvcs) { + Set podIdsToRestart = new HashSet<>(); List> futures = new ArrayList<>(pvcs.size()); for (PersistentVolumeClaim desiredPvc : pvcs) { @@ -79,9 +77,8 @@ public Future> resizeAndReconcilePvcs(KafkaStatus kafkaStatus return Future.succeededFuture(); } else if (currentPvc.getStatus().getConditions().stream().anyMatch(cond -> "FileSystemResizePending".equals(cond.getType()) && "true".equals(cond.getStatus().toLowerCase(Locale.ENGLISH)))) { // The PVC is Bound and resized but waiting for FS resizing => We need to restart the pod which is using it - String podName = podNameProvider.apply(getPodIndexFromPvcName(desiredPvc.getMetadata().getName())); - podsToRestart.add(podName); - LOGGER.infoCr(reconciliation, "The PVC {} is waiting for file system resizing and the pod {} needs to be restarted.", desiredPvc.getMetadata().getName(), podName); + podIdsToRestart.add(getPodIndexFromPvcName(desiredPvc.getMetadata().getName())); + LOGGER.infoCr(reconciliation, "The PVC {} is waiting for file system resizing and the pod using it might need to be restarted.", desiredPvc.getMetadata().getName()); return Future.succeededFuture(); } else { // The PVC is Bound and resizing is not in progress => We should check if the SC supports resizing and check if size changed @@ -103,7 +100,7 @@ public Future> resizeAndReconcilePvcs(KafkaStatus kafkaStatus } return Future.all(futures) - .map(podsToRestart); + .map(podIdsToRestart); } /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java index 28807a665b8..8330f130e5a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java @@ -411,9 +411,9 @@ protected Future pvcs(KafkaStatus kafkaStatus) { List pvcs = zk.generatePersistentVolumeClaims(); return new PvcReconciler(reconciliation, pvcOperator, storageClassOperator) - .resizeAndReconcilePvcs(kafkaStatus, podIndex -> KafkaResources.zookeeperPodName(reconciliation.name(), podIndex), pvcs) - .compose(podsToRestart -> { - fsResizingRestartRequest.addAll(podsToRestart); + .resizeAndReconcilePvcs(kafkaStatus, pvcs) + .compose(podIdsToRestart -> { + fsResizingRestartRequest.addAll(podIdsToRestart.stream().map(podId -> KafkaResources.zookeeperPodName(reconciliation.name(), podId)).collect(Collectors.toSet())); return Future.succeededFuture(); }); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterWithKRaftTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterWithKRaftTest.java index c6db49a467d..cbbcabf4b6e 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterWithKRaftTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterWithKRaftTest.java @@ -687,4 +687,24 @@ public void testCustomKRaftMetadataVersion() { assertThat(kc.getMetadataVersion(), is("3.5-IV1")); } + + @Test + public void testNodePoolForNodeId() { + KafkaCluster kc = KafkaCluster.fromCrd( + Reconciliation.DUMMY_RECONCILIATION, + KAFKA, + List.of(KAFKA_POOL_CONTROLLERS, KAFKA_POOL_BROKERS), + VERSIONS, + new KafkaVersionChange(VERSIONS.defaultVersion(), VERSIONS.defaultVersion(), null, null, "3.5-IV1"), + KafkaMetadataConfigurationState.KRAFT, + null, + SHARED_ENV_PROVIDER); + + // Existing node + assertThat(kc.nodePoolForNodeId(1001), is(KAFKA_POOL_BROKERS)); + + // Non-existing node + KafkaCluster.NodePoolNotFoundException e = assertThrows(KafkaCluster.NodePoolNotFoundException.class, () -> kc.nodePoolForNodeId(1874)); + assertThat(e.getMessage(), is("Node ID 1874 does not belong to any known node pool!")); + } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorPodSetTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorPodSetTest.java index ec0f7dbb4da..4692efb89f2 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorPodSetTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorPodSetTest.java @@ -4,6 +4,8 @@ */ package io.strimzi.operator.cluster.operator.assembly; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimConditionBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder; @@ -13,6 +15,7 @@ import io.strimzi.api.kafka.model.kafka.KafkaList; import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.api.kafka.model.kafka.KafkaStatus; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.kafka.Storage; import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; @@ -43,6 +46,7 @@ import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.CrdOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator; +import io.strimzi.operator.cluster.operator.resource.kubernetes.PvcOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.StatefulSetOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.StrimziPodSetOperator; @@ -88,7 +92,7 @@ import static org.mockito.Mockito.when; @ExtendWith(VertxExtension.class) -@SuppressWarnings("checkstyle:ClassFanOutComplexity") +@SuppressWarnings({"checkstyle:ClassDataAbstractionCoupling", "checkstyle:ClassFanOutComplexity"}) public class KafkaAssemblyOperatorPodSetTest { private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider(); @@ -118,13 +122,15 @@ public class KafkaAssemblyOperatorPodSetTest { .withType(KafkaListenerType.INTERNAL) .withTls(false) .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() + .withNewJbodStorage() + .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()) + .endJbodStorage() .endKafka() .withNewZookeeper() .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() + .withNewPersistentClaimStorage() + .withSize("100Gi") + .endPersistentClaimStorage() .endZookeeper() .endSpec() .build(); @@ -209,6 +215,10 @@ public void testRegularReconciliation(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Zoo when(mockPodSetOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(zkPodSet)); @@ -320,6 +330,10 @@ public void testFirstReconciliation(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Zoo when(mockPodSetOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(null)); // The PodSet does not exist yet in the first reconciliation @@ -451,6 +465,10 @@ public void testReconciliationWithRoll(VertxTestContext context) { when(mockCmOps.reconcile(any(), any(), startsWith("my-cluster-kafka-"), any())).thenReturn(Future.succeededFuture()); when(mockCmOps.deleteAsync(any(), any(), eq("my-cluster-kafka-config"), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; when(mockPodSetOps.getAsync(any(), eq(newZkCluster.getComponentName()))).thenReturn(Future.succeededFuture(oldZkPodSet)); when(mockPodSetOps.reconcile(any(), any(), eq(newZkCluster.getComponentName()), any())).thenAnswer(i -> Future.succeededFuture(ReconcileResult.noop(i.getArgument(3)))); @@ -571,6 +589,10 @@ public void testScaleUp(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(oldZkPodSet)); @@ -710,6 +732,10 @@ public void testScaleDown(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Zoo when(mockPodSetOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(oldZkPodSet)); @@ -852,6 +878,10 @@ public void testScaleDownWithEmptyBrokersWithBrokerScaleDownCheckEnabled(VertxTe ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka @@ -918,6 +948,149 @@ public void testScaleDownWithEmptyBrokersWithBrokerScaleDownCheckEnabled(VertxTe }))); } + /** + * Tests that the Pod that needs a restart to finish PVC resizing is rolled + * + * @param context Test context + */ + @Test + public void testRollDueToPersistentVolumeResizing(VertxTestContext context) { + ZookeeperCluster zkCluster = ZookeeperCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, KAFKA, VERSIONS, SHARED_ENV_PROVIDER); + StrimziPodSet zkPodSet = zkCluster.generatePodSet(KAFKA.getSpec().getZookeeper().getReplicas(), false, null, null, podNum -> null); + StrimziPodSet kafkaPodSet = KAFKA_CLUSTER.generatePodSets(false, null, null, brokerId -> null).get(0); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + when(secretOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + when(secretOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + ConfigMapOperator mockCmOps = supplier.configMapOperations; + when(mockCmOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(List.of())); + ArgumentCaptor cmReconciliationCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.reconcile(any(), any(), cmReconciliationCaptor.capture(), any())).thenReturn(Future.succeededFuture()); + ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.getAsync(any(), eq("data-my-cluster-zookeeper-2"))).thenReturn(Future.succeededFuture( + new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withNamespace(NAMESPACE) + .withName("data-my-cluster-zookeeper-2") + .endMetadata() + .withNewSpec() + .endSpec() + .withNewStatus() + .withPhase("Bound") + .withConditions(new PersistentVolumeClaimConditionBuilder().withType("FileSystemResizePending").withStatus("True").build()) + .endStatus() + .build() + )); + when(mockPvcOps.getAsync(any(), eq("data-0-my-cluster-kafka-1"))).thenReturn(Future.succeededFuture( + new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withNamespace(NAMESPACE) + .withName("data-0-my-cluster-kafka-1") + .endMetadata() + .withNewSpec() + .endSpec() + .withNewStatus() + .withPhase("Bound") + .withConditions(new PersistentVolumeClaimConditionBuilder().withType("FileSystemResizePending").withStatus("True").build()) + .endStatus() + .build() + )); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; + // Zoo + when(mockPodSetOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(zkPodSet)); + when(mockPodSetOps.reconcile(any(), any(), eq(zkCluster.getComponentName()), any())).thenReturn(Future.succeededFuture(ReconcileResult.noop(zkPodSet))); + // Kafka + when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(List.of(kafkaPodSet))); + when(mockPodSetOps.batchReconcile(any(), any(), any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenAnswer(i -> { + List podSets = i.getArgument(2); + HashMap> result = new HashMap<>(); + + for (StrimziPodSet podSet : podSets) { + result.put(podSet.getMetadata().getName(), ReconcileResult.noop(kafkaPodSet)); + } + + return Future.succeededFuture(result); + }); + + StatefulSetOperator mockStsOps = supplier.stsOperations; + when(mockStsOps.getAsync(any(), eq(zkCluster.getComponentName()))).thenReturn(Future.succeededFuture(null)); // Zoo STS is queried and deleted if it still exists + when(mockStsOps.getAsync(any(), eq(KAFKA_CLUSTER.getComponentName()))).thenReturn(Future.succeededFuture(null)); // Kafka STS is queried and deleted if it still exists + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), eq(zkCluster.getSelectorLabels()))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(Collections.emptyList())); + + CrdOperator mockKafkaOps = supplier.kafkaOperator; + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); + when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); + when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); + + ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(VERSIONS); + + MockZooKeeperReconciler zr = new MockZooKeeperReconciler( + new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), + vertx, + config, + supplier, + new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + KAFKA, + VERSION_CHANGE, + null, + 0, + CLUSTER_CA); + + MockKafkaReconciler kr = new MockKafkaReconciler( + new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), + vertx, + config, + supplier, + new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + KAFKA, + KAFKA_CLUSTER, + CLUSTER_CA, + CLIENTS_CA); + + MockKafkaAssemblyOperator kao = new MockKafkaAssemblyOperator( + vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + CERT_MANAGER, + PASSWORD_GENERATOR, + supplier, + config, + zr, + kr); + + Checkpoint async = context.checkpoint(); + kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + assertThat(zr.maybeRollZooKeeperInvocations, is(1)); + assertThat(zr.zooPodNeedsRestart.apply(podFromPodSet(zkPodSet, "my-cluster-zookeeper-0")), empty()); + assertThat(zr.zooPodNeedsRestart.apply(podFromPodSet(zkPodSet, "my-cluster-zookeeper-1")), empty()); + assertThat(zr.zooPodNeedsRestart.apply(podFromPodSet(zkPodSet, "my-cluster-zookeeper-2")), is(List.of("File system needs to be resized"))); + + assertThat(kr.maybeRollKafkaInvocations, is(1)); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSet, "my-cluster-kafka-0")), is(RestartReasons.empty())); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSet, "my-cluster-kafka-1")), is(RestartReasons.of(RestartReason.FILE_SYSTEM_RESIZE_NEEDED))); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSet, "my-cluster-kafka-2")), is(RestartReasons.empty())); + + assertThat(cmReconciliationCaptor.getAllValues().size(), is(4)); + assertThat(cmReconciliationCaptor.getAllValues(), is(List.of("my-cluster-kafka-0", "my-cluster-kafka-1", "my-cluster-kafka-2", "my-cluster-kafka-config"))); + + assertThat(cmDeletionCaptor.getAllValues().size(), is(0)); + + async.flag(); + }))); + } + // Internal utility methods private Pod podFromPodSet(StrimziPodSet podSet, String name) { return PodSetUtils.podSetToPods(podSet).stream().filter(p -> name.equals(p.getMetadata().getName())).findFirst().orElse(null); @@ -975,6 +1148,7 @@ public MockZooKeeperReconciler(Reconciliation reconciliation, Vertx vertx, Clust public Future reconcile(KafkaStatus kafkaStatus, Clock clock) { return manualPodCleaning() .compose(i -> manualRollingUpdate()) + .compose(i -> pvcs(kafkaStatus)) .compose(i -> migrateFromStatefulSetToPodSet()) .compose(i -> podSet()) .compose(i -> scaleDown()) @@ -1002,6 +1176,7 @@ public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOp public Future reconcile(KafkaStatus kafkaStatus, Clock clock) { return manualPodCleaning() .compose(i -> manualRollingUpdate()) + .compose(i -> pvcs(kafkaStatus)) .compose(i -> scaleDown()) .compose(i -> listeners()) .compose(i -> brokerConfigurationConfigMaps()) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java index a2311124142..9484a7252d1 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaAssemblyOperatorWithKRaftTest.java @@ -4,6 +4,8 @@ */ package io.strimzi.operator.cluster.operator.assembly; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimBuilder; +import io.fabric8.kubernetes.api.model.PersistentVolumeClaimConditionBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Quantity; import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; @@ -49,6 +51,7 @@ import io.strimzi.operator.cluster.operator.resource.kubernetes.ConfigMapOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.CrdOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.PodOperator; +import io.strimzi.operator.cluster.operator.resource.kubernetes.PvcOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.StatefulSetOperator; import io.strimzi.operator.cluster.operator.resource.kubernetes.StrimziPodSetOperator; @@ -249,6 +252,10 @@ public void testRegularReconciliation(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(kafkaPodSets)); @@ -356,6 +363,10 @@ public void testFirstReconciliation(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(List.of())); @@ -458,6 +469,10 @@ public void testReconciliationWithRollDueToImageChange(VertxTestContext context) when(mockCmOps.reconcile(any(), any(), startsWith("my-cluster-"), any())).thenReturn(Future.succeededFuture()); when(mockCmOps.deleteAsync(any(), any(), eq("my-cluster-kafka-config"), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); when(mockPodSetOps.batchReconcile(any(), any(), any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenAnswer(i -> { @@ -553,6 +568,10 @@ public void testScaleUp(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); @@ -674,6 +693,10 @@ public void testScaleDown(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); @@ -809,6 +832,10 @@ public void testNewPool(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(kafkaPodSets)); @@ -952,6 +979,10 @@ public void testRemovePool(VertxTestContext context) { ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; // Kafka when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(oldKafkaPodSets)); @@ -1134,6 +1165,113 @@ vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), }))); } + /** + * Tests that the Pod that needs a restart to finish PVC resizing is rolled + * + * @param context Test context + */ + @Test + public void testRollDueToPersistentVolumeResizing(VertxTestContext context) { + List kafkaPodSets = KAFKA_CLUSTER.generatePodSets(false, null, null, brokerId -> null); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + SecretOperator secretOps = supplier.secretOperations; + when(secretOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + when(secretOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + ConfigMapOperator mockCmOps = supplier.configMapOperations; + when(mockCmOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(List.of())); + ArgumentCaptor cmReconciliationCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.reconcile(any(), any(), cmReconciliationCaptor.capture(), any())).thenReturn(Future.succeededFuture()); + ArgumentCaptor cmDeletionCaptor = ArgumentCaptor.forClass(String.class); + when(mockCmOps.deleteAsync(any(), any(), cmDeletionCaptor.capture(), anyBoolean())).thenReturn(Future.succeededFuture()); + + PvcOperator mockPvcOps = supplier.pvcOperations; + when(mockPvcOps.getAsync(any(), any())).thenReturn(Future.succeededFuture()); + when(mockPvcOps.getAsync(any(), eq("data-0-my-cluster-brokers-4"))).thenReturn(Future.succeededFuture( + new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withNamespace(NAMESPACE) + .withName("data-0-my-cluster-brokers-4") + .endMetadata() + .withNewSpec() + .endSpec() + .withNewStatus() + .withPhase("Bound") + .withConditions(new PersistentVolumeClaimConditionBuilder().withType("FileSystemResizePending").withStatus("True").build()) + .endStatus() + .build() + )); + when(mockPvcOps.reconcile(any(), any(), any(), any())).thenReturn(Future.succeededFuture()); + + StrimziPodSetOperator mockPodSetOps = supplier.strimziPodSetOperator; + // Kafka + when(mockPodSetOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(kafkaPodSets)); + when(mockPodSetOps.batchReconcile(any(), any(), any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenAnswer(i -> { + List podSets = i.getArgument(2); + HashMap> result = new HashMap<>(); + + for (StrimziPodSet podSet : podSets) { + StrimziPodSet patched = kafkaPodSets.stream().filter(sps -> podSet.getMetadata().getName().equals(sps.getMetadata().getName())).findFirst().orElse(null); + result.put(podSet.getMetadata().getName(), patched == null ? ReconcileResult.created(podSet) : ReconcileResult.noop(patched)); + } + + return Future.succeededFuture(result); + }); + + StatefulSetOperator mockStsOps = supplier.stsOperations; + when(mockStsOps.getAsync(any(), eq(KAFKA_CLUSTER.getComponentName()))).thenReturn(Future.succeededFuture(null)); // Kafka STS is queried and deleted if it still exists + + PodOperator mockPodOps = supplier.podOperations; + when(mockPodOps.listAsync(any(), eq(KAFKA_CLUSTER.getSelectorLabels()))).thenReturn(Future.succeededFuture(Collections.emptyList())); + when(mockPodOps.listAsync(any(), any(Labels.class))).thenReturn(Future.succeededFuture(Collections.emptyList())); + + CrdOperator mockKafkaOps = supplier.kafkaOperator; + when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(Future.succeededFuture(KAFKA)); + when(mockKafkaOps.get(eq(NAMESPACE), eq(CLUSTER_NAME))).thenReturn(KAFKA); + when(mockKafkaOps.updateStatusAsync(any(), any())).thenReturn(Future.succeededFuture()); + + CrdOperator mockKafkaNodePoolOps = supplier.kafkaNodePoolOperator; + ArgumentCaptor kafkaNodePoolStatusCaptor = ArgumentCaptor.forClass(KafkaNodePool.class); + when(mockKafkaNodePoolOps.updateStatusAsync(any(), kafkaNodePoolStatusCaptor.capture())).thenReturn(Future.succeededFuture()); + + MockKafkaReconciler kr = new MockKafkaReconciler( + new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), + vertx, + CONFIG, + supplier, + new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + KAFKA, + List.of(CONTROLLERS, BROKERS), + KAFKA_CLUSTER, + CLUSTER_CA, + CLIENTS_CA); + + MockKafkaAssemblyOperator kao = new MockKafkaAssemblyOperator( + vertx, new PlatformFeaturesAvailability(false, KUBERNETES_VERSION), + CERT_MANAGER, + PASSWORD_GENERATOR, + supplier, + CONFIG, + kr); + + Checkpoint async = context.checkpoint(); + kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME)) + .onComplete(context.succeeding(v -> context.verify(() -> { + assertThat(kr.maybeRollKafkaInvocations, is(1)); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(0), "my-cluster-controllers-0")), is(RestartReasons.empty())); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(0), "my-cluster-controllers-1")), is(RestartReasons.empty())); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(0), "my-cluster-controllers-2")), is(RestartReasons.empty())); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(1), "my-cluster-brokers-3")), is(RestartReasons.empty())); + // Only the pod that needs volume resizing is rolled + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(1), "my-cluster-brokers-4")), is(RestartReasons.of(RestartReason.FILE_SYSTEM_RESIZE_NEEDED))); + assertThat(kr.kafkaPodNeedsRestart.apply(podFromPodSet(kafkaPodSets.get(1), "my-cluster-brokers-5")), is(RestartReasons.empty())); + + async.flag(); + }))); + } + // Internal utility methods private Pod podFromPodSet(StrimziPodSet podSet, String name) { return PodSetUtils.podSetToPods(podSet).stream().filter(p -> name.equals(p.getMetadata().getName())).findFirst().orElse(null); @@ -1192,6 +1330,7 @@ public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOp public Future reconcile(KafkaStatus kafkaStatus, Clock clock) { return manualPodCleaning() .compose(i -> manualRollingUpdate()) + .compose(i -> pvcs(kafkaStatus)) .compose(i -> scaleDown()) .compose(i -> updateNodePoolStatuses(kafkaStatus)) .compose(i -> listeners()) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/PvcReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/PvcReconcilerTest.java index 034b706ed58..62f99afe580 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/PvcReconcilerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/PvcReconcilerTest.java @@ -89,7 +89,7 @@ public void testNoExistingVolumes(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -135,7 +135,7 @@ public void testNotBoundVolumes(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -199,7 +199,7 @@ public void testVolumesBoundExpandableStorageClass(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -264,7 +264,7 @@ public void testVolumesBoundExpandableStorageClassWithInvalidSize(VertxTestConte ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(false)); assertThat(res.cause(), is(instanceOf(IllegalArgumentException.class))); @@ -327,7 +327,7 @@ public void testVolumesBoundNonExpandableStorageClass(VertxTestContext context) KafkaStatus kafkaStatus = new KafkaStatus(); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(kafkaStatus, i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(kafkaStatus, pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -399,7 +399,7 @@ public void testVolumesBoundMissingStorageClass(VertxTestContext context) { KafkaStatus kafkaStatus = new KafkaStatus(); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(kafkaStatus, i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(kafkaStatus, pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -468,7 +468,7 @@ public void testVolumesBoundWithoutStorageClass(VertxTestContext context) { KafkaStatus kafkaStatus = new KafkaStatus(); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(kafkaStatus, i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(kafkaStatus, pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -536,7 +536,7 @@ public void testVolumesResizing(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); @@ -598,12 +598,12 @@ public void testVolumesWaitingForRestart(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true)); assertThat(res.result().size(), is(3)); - assertThat(res.result(), is(Set.of("pod-0", "pod-1", "pod-2"))); + assertThat(res.result(), is(Set.of(0, 1, 2))); assertThat(pvcCaptor.getAllValues().size(), is(0)); @@ -657,7 +657,7 @@ public void testVolumesResized(VertxTestContext context) { ); Checkpoint async = context.checkpoint(); - reconciler.resizeAndReconcilePvcs(new KafkaStatus(), i -> "pod-" + i, pvcs) + reconciler.resizeAndReconcilePvcs(new KafkaStatus(), pvcs) .onComplete(res -> { assertThat(res.succeeded(), is(true));