Skip to content

Commit

Permalink
ST: Check that messages are not available in local storage when tiere…
Browse files Browse the repository at this point in the history
…d storage is used (strimzi#10351)

Signed-off-by: Jakub Stejskal <xstejs24@gmail.com>
  • Loading branch information
Frawless authored Jul 24, 2024
1 parent 0bc9fbe commit 85b6bb1
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public class Environment {

private static final String ST_KAFKA_VERSION_DEFAULT = TestKafkaVersion.getDefaultSupportedKafkaVersion();
private static final String ST_CLIENTS_KAFKA_VERSION_DEFAULT = "3.7.0";
// TODO - remove overridden version from TieredStorageST - see TODO in the class
public static final String TEST_CLIENTS_VERSION_DEFAULT = "0.8.1";
public static final String ST_FILE_PLUGIN_URL_DEFAULT = "https://repo1.maven.org/maven2/org/apache/kafka/connect-file/" + ST_KAFKA_VERSION_DEFAULT + "/connect-file-" + ST_KAFKA_VERSION_DEFAULT + ".jar";
public static final String OLM_OPERATOR_VERSION_DEFAULT = "0.42.0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ public void configureFromEnv() {
cmdKubeClient(namespaceName).execInPod(podName, CMD, "configure", "common", "--from-env");
}

public String fetchOffsets(String topicName, String time) {
AdminTopicCommand adminTopicCommand = new AdminTopicCommand()
.withFetchOffsetsSubCommand()
.withTopicName(topicName)
.withTime(time)
.withOutputJson();

ExecResult result = cmdKubeClient(namespaceName).execInPod(podName, false, adminTopicCommand.getCommand());
return result.returnCode() == 0 ? result.out() : result.err();
}

static class AdminTopicCommand {
private final static String TOPIC_SUBCOMMAND = "topic";
private List<String> command = new ArrayList<>(List.of(CMD, TOPIC_SUBCOMMAND));
Expand Down Expand Up @@ -155,6 +166,11 @@ public AdminTopicCommand withListSubCommand() {
return this;
}

public AdminTopicCommand withFetchOffsetsSubCommand() {
this.command.add("fetch-offsets");
return this;
}

public AdminTopicCommand withTopicPrefix(String topicPrefix) {
this.command.addAll(List.of("-tpref", topicPrefix));
return this;
Expand Down Expand Up @@ -190,6 +206,11 @@ public AdminTopicCommand withAll() {
return this;
}

public AdminTopicCommand withTime(String time) {
this.command.addAll(List.of("--time", time));
return this;
}

public String[] getCommand() {
return this.command.toArray(new String[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
import io.strimzi.systemtest.enums.DeploymentTypes;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.kubernetes.NetworkPolicyResource;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SetupMinio {
private static final Logger LOGGER = LogManager.getLogger(SetupMinio.class);
Expand Down Expand Up @@ -133,53 +130,4 @@ public static void createBucket(String namespace, String bucketName) {
"mb",
MINIO_STORAGE_ALIAS + "/" + bucketName);
}

/**
* Collect data from Minio about usage of a specific bucket
* @param namespace
* @param bucketName
* @return Overall statistics about the bucket in String format
*/
public static String getBucketSizeInfo(String namespace, String bucketName) {
final String minioPod = ResourceManager.kubeClient().listPods(namespace, Map.of(TestConstants.APP_POD_LABEL, MINIO)).get(0).getMetadata().getName();

return ResourceManager.cmdKubeClient().namespace(namespace).execInPod(minioPod,
"mc",
"stat",
"local/" + bucketName).out();

}

/**
* Parse out total size of bucket from the information about usage.
* @param bucketInfo String containing all stat info about bucket
* @return Map consists of parsed size and it's unit
*/
private static Map<String, Object> parseTotalSize(String bucketInfo) {
Pattern pattern = Pattern.compile("Total size:\\s*(?<size>[\\d.]+)\\s*(?<unit>.*)");
Matcher matcher = pattern.matcher(bucketInfo);

if (matcher.find()) {
return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit"));
} else {
throw new IllegalArgumentException("Total size not found in the provided string");
}
}

/**
* Wait until size of the bucket is not 0 B.
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForDataInMinio(String namespace, String bucketName) {
TestUtils.waitFor("data sync from Kafka to Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> {
String bucketSizeInfo = SetupMinio.getBucketSizeInfo(namespace, bucketName);
Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
double bucketSize = (Double) parsedSize.get("size");
LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return bucketSize > 0;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
*/
package io.strimzi.systemtest.utils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.strimzi.systemtest.TestConstants;
Expand Down Expand Up @@ -115,4 +118,18 @@ private static LabelSelector getLabelSelector(String adminName) {
.withMatchLabels(matchLabels)
.build();
}

public static long getPartitionsOffset(String data, String partition) throws JsonProcessingException {
// Create ObjectMapper instance
ObjectMapper mapper = new ObjectMapper();

// Read JSON string as JsonNode
JsonNode rootNode = mapper.readTree(data);

// Get the node for the partition number
JsonNode partitionNode = rootNode.get(partition);

// Get the offset value
return partitionNode.get("offset").asLong();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.systemtest.utils.specific;

import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.minio.SetupMinio;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MinioUtils {
private static final Logger LOGGER = LogManager.getLogger(SetupMinio.class);

private MinioUtils() {

}

/**
* Collect data from Minio about usage of a specific bucket
* @param namespace
* @param bucketName
* @return Overall statistics about the bucket in String format
*/
public static String getBucketSizeInfo(String namespace, String bucketName) {
final String minioPod = ResourceManager.kubeClient().listPods(namespace, Map.of(TestConstants.APP_POD_LABEL, SetupMinio.MINIO)).get(0).getMetadata().getName();

return ResourceManager.cmdKubeClient().namespace(namespace).execInPod(minioPod,
"mc",
"stat",
"local/" + bucketName).out();

}

/**
* Parse out total size of bucket from the information about usage.
* @param bucketInfo String containing all stat info about bucket
* @return Map consists of parsed size and it's unit
*/
private static Map<String, Object> parseTotalSize(String bucketInfo) {
Pattern pattern = Pattern.compile("Total size:\\s*(?<size>[\\d.]+)\\s*(?<unit>.*)");
Matcher matcher = pattern.matcher(bucketInfo);

if (matcher.find()) {
return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit"));
} else {
throw new IllegalArgumentException("Total size not found in the provided string");
}
}

/**
* Wait until size of the bucket is not 0 B.
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForDataInMinio(String namespace, String bucketName) {
TestUtils.waitFor("data sync from Kafka to Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> {
String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
double bucketSize = (Double) parsedSize.get("size");
LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return bucketSize > 0;
});
}

/**
* Wait until size of the bucket is 0 B.
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForNoDataInMinio(String namespace, String bucketName) {
TestUtils.waitFor("data deletion in Minio", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT_LONG, () -> {
String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
double bucketSize = (Double) parsedSize.get("size");
LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return bucketSize == 0;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,31 @@
*/
package io.strimzi.systemtest.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.annotations.MicroShiftNotSupported;
import io.strimzi.systemtest.annotations.ParallelTest;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.kafkaclients.internalClients.admin.AdminClient;
import io.strimzi.systemtest.resources.NamespaceManager;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaTopicResource;
import io.strimzi.systemtest.resources.imageBuild.ImageBuild;
import io.strimzi.systemtest.resources.minio.SetupMinio;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.templates.specific.AdminClientTemplates;
import io.strimzi.systemtest.utils.AdminClientUtils;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.specific.MinioUtils;
import io.strimzi.test.TestUtils;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -137,11 +146,56 @@ void testTieredStorageWithAivenPlugin() {

resourceManager.createResourceWithWait(clients.producerStrimzi());

SetupMinio.waitForDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME);
MinioUtils.waitForDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME);

// Create admin-client to check offsets
resourceManager.createResourceWithWait(
AdminClientTemplates.plainAdminClient(
testStorage.getNamespaceName(),
testStorage.getAdminName(),
KafkaResources.plainBootstrapAddress(testStorage.getClusterName())
)
.editSpec()
.editOrNewTemplate()
.editSpec()
.editFirstContainer()
// TODO - remove this when new version of clients will be available
.withImage("quay.io/strimzi-test-clients/test-clients:latest-kafka-" + Environment.ST_KAFKA_VERSION)
.endContainer()
.endSpec()
.endTemplate()
.endSpec()
.build()
);
final AdminClient adminClient = AdminClientUtils.getConfiguredAdminClient(testStorage.getNamespaceName(), testStorage.getAdminName());

TestUtils.waitFor("earliest-local offset to be higher than 0",
TestConstants.GLOBAL_POLL_INTERVAL_5_SECS, TestConstants.GLOBAL_TIMEOUT_LONG,
() -> {
// Fetch earliest-local offsets
// Check that data are not present locally, earliest-local offset should be higher than 0
String offsetData = adminClient.fetchOffsets(testStorage.getTopicName(), String.valueOf(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP));
long earliestLocalOffset = 0;
try {
earliestLocalOffset = AdminClientUtils.getPartitionsOffset(offsetData, "0");
LOGGER.info("earliest-local offset for topic {} is {}", testStorage.getTopicName(), earliestLocalOffset);
} catch (JsonProcessingException e) {
return false;
}
return earliestLocalOffset > 0;
});

ClientUtils.waitForInstantProducerClientSuccess(testStorage);

resourceManager.createResourceWithWait(clients.consumerStrimzi());
ClientUtils.waitForInstantConsumerClientSuccess(testStorage);

// Delete data
KafkaTopicResource.replaceTopicResourceInSpecificNamespace(
testStorage.getTopicName(),
topic -> topic.getSpec().getConfig().put("retention.ms", 10000), testStorage.getNamespaceName());

MinioUtils.waitForNoDataInMinio(suiteStorage.getNamespaceName(), BUCKET_NAME);
}

@BeforeAll
Expand Down

0 comments on commit 85b6bb1

Please sign in to comment.