Skip to content

Commit

Permalink
Restart the correct Pod when resizing PVCs (strimzi#10396)
Browse files (browse the repository at this point in the history)
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Jul 30, 2024
1 parent a6531bc commit 0407c52
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public KafkaPool nodePoolForNodeId(int nodeId) {
}
}

throw new RuntimeException("Node ID " + nodeId + " does not belong to any known node pool!");
throw new NodePoolNotFoundException("Node ID " + nodeId + " does not belong to any known node pool!");
}

/**
Expand Down Expand Up @@ -2000,4 +2000,18 @@ private Labels brokersSelector() {
return labels.strimziSelectorLabels();
}
}

/**
* Unchecked exception used to indicate that a matching Node Pool was not found. In this class, it is thrown by
* {@code nodePoolForNodeId(int)} when the requested node ID does not belong to any known node pool, which lets
* callers catch this specific case instead of a generic {@link RuntimeException}.
*/
public static final class NodePoolNotFoundException extends RuntimeException {
/**
* Creates a new exception carrying the given error message.
*
* @param message Error message; passed unchanged to the {@link RuntimeException} constructor
*/
public NodePoolNotFoundException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,19 @@ protected Future<Void> pvcs(KafkaStatus kafkaStatus) {
List<PersistentVolumeClaim> pvcs = kafka.generatePersistentVolumeClaims();

return new PvcReconciler(reconciliation, pvcOperator, storageClassOperator)
.resizeAndReconcilePvcs(kafkaStatus, podIndex -> KafkaResources.kafkaPodName(reconciliation.name(), podIndex), pvcs)
.compose(podsToRestart -> {
fsResizingRestartRequest.addAll(podsToRestart);
.resizeAndReconcilePvcs(kafkaStatus, pvcs)
.compose(podIdsToRestart -> {
for (Integer podId : podIdsToRestart) {
try {
fsResizingRestartRequest.add(kafka.nodePoolForNodeId(podId).nodeRef(podId).podName());
} catch (KafkaCluster.NodePoolNotFoundException e) {
// We might have triggered some resizing on a PVC not belonging to this cluster anymore.
// This could happen for example with old PVCs from removed nodes. We will ignore it with
// a warning.
LOGGER.warnCr(reconciliation, "Node with ID {} does not seem to belong to this Kafka cluster and cannot be marked for restart due to storage resizing", podId);
}
}

return Future.succeededFuture();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.function.Function;

/**
* This class reconciles the PVCs for the Kafka and ZooKeeper clusters. It has two public methods:
Expand Down Expand Up @@ -50,18 +49,17 @@ public PvcReconciler(Reconciliation reconciliation, PvcOperator pvcOperator, Sto

/**
* Resizes and reconciles the PVCs based on the model and list of PVCs passed to it. It will return a future with
* collection containing a list of pods which need restart to complete the filesystem resizing. The PVCs are only
* created or updated. This method does not delete any PVCs. This is done by a separate method which should be
* collection containing a list of IDs of nodes which need restart to complete the filesystem resizing. The PVCs are
* only created or updated. This method does not delete any PVCs. This is done by a separate method which should be
* called separately at the end of the reconciliation.
*
* @param kafkaStatus Status of the Kafka custom resource where warnings about any issues with resizing will be added
* @param podNameProvider Function to generate a pod name from its index
* @param pvcs List of desired PVC used by this controller
* @param kafkaStatus Status of the Kafka custom resource where warnings about any issues with resizing will be added
* @param pvcs List of desired PVC used by this controller
*
* @return Future with list of pod names which should be restarted to complete the filesystem resizing
* @return Future with set of node IDs which should be restarted to complete the filesystem resizing
*/
public Future<Collection<String>> resizeAndReconcilePvcs(KafkaStatus kafkaStatus, Function<Integer, String> podNameProvider, List<PersistentVolumeClaim> pvcs) {
Set<String> podsToRestart = new HashSet<>();
public Future<Collection<Integer>> resizeAndReconcilePvcs(KafkaStatus kafkaStatus, List<PersistentVolumeClaim> pvcs) {
Set<Integer> podIdsToRestart = new HashSet<>();
List<Future<Void>> futures = new ArrayList<>(pvcs.size());

for (PersistentVolumeClaim desiredPvc : pvcs) {
Expand All @@ -79,9 +77,8 @@ public Future<Collection<String>> resizeAndReconcilePvcs(KafkaStatus kafkaStatus
return Future.succeededFuture();
} else if (currentPvc.getStatus().getConditions().stream().anyMatch(cond -> "FileSystemResizePending".equals(cond.getType()) && "true".equals(cond.getStatus().toLowerCase(Locale.ENGLISH)))) {
// The PVC is Bound and resized but waiting for FS resizing => We need to restart the pod which is using it
String podName = podNameProvider.apply(getPodIndexFromPvcName(desiredPvc.getMetadata().getName()));
podsToRestart.add(podName);
LOGGER.infoCr(reconciliation, "The PVC {} is waiting for file system resizing and the pod {} needs to be restarted.", desiredPvc.getMetadata().getName(), podName);
podIdsToRestart.add(getPodIndexFromPvcName(desiredPvc.getMetadata().getName()));
LOGGER.infoCr(reconciliation, "The PVC {} is waiting for file system resizing and the pod using it might need to be restarted.", desiredPvc.getMetadata().getName());
return Future.succeededFuture();
} else {
// The PVC is Bound and resizing is not in progress => We should check if the SC supports resizing and check if size changed
Expand All @@ -103,7 +100,7 @@ public Future<Collection<String>> resizeAndReconcilePvcs(KafkaStatus kafkaStatus
}

return Future.all(futures)
.map(podsToRestart);
.map(podIdsToRestart);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,9 @@ protected Future<Void> pvcs(KafkaStatus kafkaStatus) {
List<PersistentVolumeClaim> pvcs = zk.generatePersistentVolumeClaims();

return new PvcReconciler(reconciliation, pvcOperator, storageClassOperator)
.resizeAndReconcilePvcs(kafkaStatus, podIndex -> KafkaResources.zookeeperPodName(reconciliation.name(), podIndex), pvcs)
.compose(podsToRestart -> {
fsResizingRestartRequest.addAll(podsToRestart);
.resizeAndReconcilePvcs(kafkaStatus, pvcs)
.compose(podIdsToRestart -> {
fsResizingRestartRequest.addAll(podIdsToRestart.stream().map(podId -> KafkaResources.zookeeperPodName(reconciliation.name(), podId)).collect(Collectors.toSet()));
return Future.succeededFuture();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,4 +687,24 @@ public void testCustomKRaftMetadataVersion() {

assertThat(kc.getMetadataVersion(), is("3.5-IV1"));
}

/**
 * Checks the node pool lookup by node ID: a known node ID resolves to its pool, while an unknown node ID
 * raises {@code KafkaCluster.NodePoolNotFoundException} with the expected message.
 */
@Test
public void testNodePoolForNodeId() {
    KafkaVersionChange versionChange = new KafkaVersionChange(
            VERSIONS.defaultVersion(),
            VERSIONS.defaultVersion(),
            null,
            null,
            "3.5-IV1");

    KafkaCluster cluster = KafkaCluster.fromCrd(
            Reconciliation.DUMMY_RECONCILIATION,
            KAFKA,
            List.of(KAFKA_POOL_CONTROLLERS, KAFKA_POOL_BROKERS),
            VERSIONS,
            versionChange,
            KafkaMetadataConfigurationState.KRAFT,
            null,
            SHARED_ENV_PROVIDER);

    // A node ID that is part of the cluster resolves to the brokers pool
    assertThat(cluster.nodePoolForNodeId(1001), is(KAFKA_POOL_BROKERS));

    // A node ID that is not part of any pool must fail with the dedicated exception type
    KafkaCluster.NodePoolNotFoundException notFound = assertThrows(
            KafkaCluster.NodePoolNotFoundException.class,
            () -> cluster.nodePoolForNodeId(1874));
    assertThat(notFound.getMessage(), is("Node ID 1874 does not belong to any known node pool!"));
}
}
Loading

0 comments on commit 0407c52

Please sign in to comment.