From 85b6bb1e858112b0d07e2e25be820008f21bb50a Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Wed, 24 Jul 2024 15:20:54 +0200 Subject: [PATCH] ST: Check that messages are not available in local storage when tiered storage is used (#10351) Signed-off-by: Jakub Stejskal --- .../io/strimzi/systemtest/Environment.java | 1 + .../internalClients/admin/AdminClient.java | 21 +++++ .../resources/minio/SetupMinio.java | 52 ----------- .../systemtest/utils/AdminClientUtils.java | 17 ++++ .../systemtest/utils/specific/MinioUtils.java | 90 +++++++++++++++++++ .../systemtest/kafka/TieredStorageST.java | 56 +++++++++++- 6 files changed, 184 insertions(+), 53 deletions(-) create mode 100644 systemtest/src/main/java/io/strimzi/systemtest/utils/specific/MinioUtils.java diff --git a/systemtest/src/main/java/io/strimzi/systemtest/Environment.java b/systemtest/src/main/java/io/strimzi/systemtest/Environment.java index caceb9eb9be..f374e3e9bfa 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/Environment.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/Environment.java @@ -209,6 +209,7 @@ public class Environment { private static final String ST_KAFKA_VERSION_DEFAULT = TestKafkaVersion.getDefaultSupportedKafkaVersion(); private static final String ST_CLIENTS_KAFKA_VERSION_DEFAULT = "3.7.0"; + // TODO - remove overridden version from TieredStorageST - see TODO in the class public static final String TEST_CLIENTS_VERSION_DEFAULT = "0.8.1"; public static final String ST_FILE_PLUGIN_URL_DEFAULT = "https://repo1.maven.org/maven2/org/apache/kafka/connect-file/" + ST_KAFKA_VERSION_DEFAULT + "/connect-file-" + ST_KAFKA_VERSION_DEFAULT + ".jar"; public static final String OLM_OPERATOR_VERSION_DEFAULT = "0.42.0"; diff --git a/systemtest/src/main/java/io/strimzi/systemtest/kafkaclients/internalClients/admin/AdminClient.java b/systemtest/src/main/java/io/strimzi/systemtest/kafkaclients/internalClients/admin/AdminClient.java index 6d2bfc3df20..aff7959e4c0 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/kafkaclients/internalClients/admin/AdminClient.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/kafkaclients/internalClients/admin/AdminClient.java @@ -121,6 +121,17 @@ public void configureFromEnv() { cmdKubeClient(namespaceName).execInPod(podName, CMD, "configure", "common", "--from-env"); } + public String fetchOffsets(String topicName, String time) { + AdminTopicCommand adminTopicCommand = new AdminTopicCommand() + .withFetchOffsetsSubCommand() + .withTopicName(topicName) + .withTime(time) + .withOutputJson(); + + ExecResult result = cmdKubeClient(namespaceName).execInPod(podName, false, adminTopicCommand.getCommand()); + return result.returnCode() == 0 ? result.out() : result.err(); + } + static class AdminTopicCommand { private final static String TOPIC_SUBCOMMAND = "topic"; private List command = new ArrayList<>(List.of(CMD, TOPIC_SUBCOMMAND)); @@ -155,6 +166,11 @@ public AdminTopicCommand withListSubCommand() { return this; } + public AdminTopicCommand withFetchOffsetsSubCommand() { + this.command.add("fetch-offsets"); + return this; + } + public AdminTopicCommand withTopicPrefix(String topicPrefix) { this.command.addAll(List.of("-tpref", topicPrefix)); return this; @@ -190,6 +206,11 @@ public AdminTopicCommand withAll() { return this; } + public AdminTopicCommand withTime(String time) { + this.command.addAll(List.of("--time", time)); + return this; + } + public String[] getCommand() { return this.command.toArray(new String[0]); } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/resources/minio/SetupMinio.java b/systemtest/src/main/java/io/strimzi/systemtest/resources/minio/SetupMinio.java index 9ea57e1a21a..19c39bbde8f 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/resources/minio/SetupMinio.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/resources/minio/SetupMinio.java @@ -16,13 +16,10 @@ import io.strimzi.systemtest.enums.DeploymentTypes; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.kubernetes.NetworkPolicyResource; -import io.strimzi.test.TestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class SetupMinio { private static final Logger LOGGER = LogManager.getLogger(SetupMinio.class); @@ -133,53 +130,4 @@ public static void createBucket(String namespace, String bucketName) { "mb", MINIO_STORAGE_ALIAS + "/" + bucketName); } - - /** - * Collect data from Minio about usage of a specific bucket - * @param namespace - * @param bucketName - * @return Overall statistics about the bucket in String format - */ - public static String getBucketSizeInfo(String namespace, String bucketName) { - final String minioPod = ResourceManager.kubeClient().listPods(namespace, Map.of(TestConstants.APP_POD_LABEL, MINIO)).get(0).getMetadata().getName(); - - return ResourceManager.cmdKubeClient().namespace(namespace).execInPod(minioPod, - "mc", - "stat", - "local/" + bucketName).out(); - - } - - /** - * Parse out total size of bucket from the information about usage. - * @param bucketInfo String containing all stat info about bucket - * @return Map consists of parsed size and it's unit - */ - private static Map parseTotalSize(String bucketInfo) { - Pattern pattern = Pattern.compile("Total size:\\s*(?[\\d.]+)\\s*(?.*)"); - Matcher matcher = pattern.matcher(bucketInfo); - - if (matcher.find()) { - return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit")); - } else { - throw new IllegalArgumentException("Total size not found in the provided string"); - } - } - - /** - * Wait until size of the bucket is not 0 B. - * @param namespace Minio location - * @param bucketName bucket name - */ - public static void waitForDataInMinio(String namespace, String bucketName) { - TestUtils.waitFor("data sync from Kafka to Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> { - String bucketSizeInfo = SetupMinio.getBucketSizeInfo(namespace, bucketName); - Map parsedSize = parseTotalSize(bucketSizeInfo); - double bucketSize = (Double) parsedSize.get("size"); - LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit")); - LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo); - - return bucketSize > 0; - }); - } } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/AdminClientUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/AdminClientUtils.java index f9049a56b5e..47d1fa8b97c 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/AdminClientUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/AdminClientUtils.java @@ -4,6 +4,9 @@ */ package io.strimzi.systemtest.utils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.strimzi.systemtest.TestConstants; @@ -115,4 +118,18 @@ private static LabelSelector getLabelSelector(String adminName) { .withMatchLabels(matchLabels) .build(); } + + public static long getPartitionsOffset(String data, String partition) throws JsonProcessingException { + // Create ObjectMapper instance + ObjectMapper mapper = new ObjectMapper(); + + // Read JSON string as JsonNode + JsonNode rootNode = mapper.readTree(data); + + // Get the node for the partition number + JsonNode partitionNode = rootNode.get(partition); + + // Get the offset value + return partitionNode.get("offset").asLong(); + } } diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/specific/MinioUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/specific/MinioUtils.java new file mode 100644 index 00000000000..480d4c3b028 --- /dev/null +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/specific/MinioUtils.java @@ -0,0 +1,90 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.systemtest.utils.specific; + +import io.strimzi.systemtest.TestConstants; +import io.strimzi.systemtest.resources.ResourceManager; +import io.strimzi.systemtest.resources.minio.SetupMinio; +import io.strimzi.test.TestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MinioUtils { + private static final Logger LOGGER = LogManager.getLogger(SetupMinio.class); + + private MinioUtils() { + + } + + /** + * Collect data from Minio about usage of a specific bucket + * @param namespace + * @param bucketName + * @return Overall statistics about the bucket in String format + */ + public static String getBucketSizeInfo(String namespace, String bucketName) { + final String minioPod = ResourceManager.kubeClient().listPods(namespace, Map.of(TestConstants.APP_POD_LABEL, SetupMinio.MINIO)).get(0).getMetadata().getName(); + + return ResourceManager.cmdKubeClient().namespace(namespace).execInPod(minioPod, + "mc", + "stat", + "local/" + bucketName).out(); + + } + + /** + * Parse out total size of bucket from the information about usage. + * @param bucketInfo String containing all stat info about bucket + * @return Map consists of parsed size and it's unit + */ + private static Map parseTotalSize(String bucketInfo) { + Pattern pattern = Pattern.compile("Total size:\\s*(?[\\d.]+)\\s*(?.*)"); + Matcher matcher = pattern.matcher(bucketInfo); + + if (matcher.find()) { + return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit")); + } else { + throw new IllegalArgumentException("Total size not found in the provided string"); + } + } + + /** + * Wait until size of the bucket is not 0 B. + * @param namespace Minio location + * @param bucketName bucket name + */ + public static void waitForDataInMinio(String namespace, String bucketName) { + TestUtils.waitFor("data sync from Kafka to Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> { + String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName); + Map parsedSize = parseTotalSize(bucketSizeInfo); + double bucketSize = (Double) parsedSize.get("size"); + LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit")); + LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo); + + return bucketSize > 0; + }); + } + + /** + * Wait until size of the bucket is 0 B. + * @param namespace Minio location + * @param bucketName bucket name + */ + public static void waitForNoDataInMinio(String namespace, String bucketName) { + TestUtils.waitFor("data deletion in Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> { + String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName); + Map parsedSize = parseTotalSize(bucketSizeInfo); + double bucketSize = (Double) parsedSize.get("size"); + LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit")); + LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo); + + return bucketSize == 0; + }); + } +} diff --git a/systemtest/src/test/java/io/strimzi/systemtest/kafka/TieredStorageST.java b/systemtest/src/test/java/io/strimzi/systemtest/kafka/TieredStorageST.java index 6c88c5a209b..a831c2bcf70 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/kafka/TieredStorageST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/kafka/TieredStorageST.java @@ -4,22 +4,31 @@ */ package io.strimzi.systemtest.kafka; +import com.fasterxml.jackson.core.JsonProcessingException; +import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.systemtest.AbstractST; import io.strimzi.systemtest.Environment; +import io.strimzi.systemtest.TestConstants; import io.strimzi.systemtest.annotations.MicroShiftNotSupported; import io.strimzi.systemtest.annotations.ParallelTest; import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients; +import io.strimzi.systemtest.kafkaclients.internalClients.admin.AdminClient; import io.strimzi.systemtest.resources.NamespaceManager; import io.strimzi.systemtest.resources.NodePoolsConverter; import io.strimzi.systemtest.resources.ResourceManager; +import io.strimzi.systemtest.resources.crd.KafkaTopicResource; import io.strimzi.systemtest.resources.imageBuild.ImageBuild; import io.strimzi.systemtest.resources.minio.SetupMinio; import io.strimzi.systemtest.storage.TestStorage; import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates; import io.strimzi.systemtest.templates.crd.KafkaTemplates; import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates; +import io.strimzi.systemtest.templates.specific.AdminClientTemplates; +import io.strimzi.systemtest.utils.AdminClientUtils; import io.strimzi.systemtest.utils.ClientUtils; +import io.strimzi.systemtest.utils.specific.MinioUtils; import io.strimzi.test.TestUtils; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.BeforeAll; @@ -137,11 +146,56 @@ void testTieredStorageWithAivenPlugin() { resourceManager.createResourceWithWait(clients.producerStrimzi()); - SetupMinio.waitForDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME); + MinioUtils.waitForDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME); + + // Create admin-client to check offsets + resourceManager.createResourceWithWait( + AdminClientTemplates.plainAdminClient( + testStorage.getNamespaceName(), + testStorage.getAdminName(), + KafkaResources.plainBootstrapAddress(testStorage.getClusterName()) + ) + .editSpec() + .editOrNewTemplate() + .editSpec() + .editFirstContainer() + // TODO - remove this when new version of clients will be available + .withImage("quay.io/strimzi-test-clients/test-clients:latest-kafka-" + Environment.ST_KAFKA_VERSION) + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .build() + ); + final AdminClient adminClient = AdminClientUtils.getConfiguredAdminClient(testStorage.getNamespaceName(), testStorage.getAdminName()); + + TestUtils.waitFor("earliest-local offset to be higher than 0", + TestConstants.GLOBAL_POLL_INTERVAL_5_SECS, TestConstants.GLOBAL_TIMEOUT_LONG, + () -> { + // Fetch earliest-local offsets + // Check that data are not present locally, earliest-local offset should be higher than 0 + String offsetData = adminClient.fetchOffsets(testStorage.getTopicName(), String.valueOf(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)); + long earliestLocalOffset = 0; + try { + earliestLocalOffset = AdminClientUtils.getPartitionsOffset(offsetData, "0"); + LOGGER.info("earliest-local offset for topic {} is {}", testStorage.getTopicName(), earliestLocalOffset); + } catch (JsonProcessingException e) { + return false; + } + return earliestLocalOffset > 0; + }); + ClientUtils.waitForInstantProducerClientSuccess(testStorage); resourceManager.createResourceWithWait(clients.consumerStrimzi()); ClientUtils.waitForInstantConsumerClientSuccess(testStorage); + + // Delete data + KafkaTopicResource.replaceTopicResourceInSpecificNamespace( + testStorage.getTopicName(), + topic -> topic.getSpec().getConfig().put("retention.ms", 10000), testStorage.getNamespaceName()); + + MinioUtils.waitForNoDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME); } @BeforeAll