From 9bb2ca6812ed854d467d74a6f85688d9a003d313 Mon Sep 17 00:00:00 2001 From: kwall Date: Tue, 28 May 2024 20:37:53 +0100 Subject: [PATCH] Allow test Kafka Clusters to use SASL SCRAM-SHA and OAUTH bearer wip - only KafkaCluster API refactored at the moment. Signed-off-by: kwall --- .../kafka/common/KafkaClusterConfig.java | 113 ++++++++++++++---- .../testing/kafka/common/Utils.java | 40 +++++-- .../testing/kafka/invm/InVMKafkaCluster.java | 8 +- .../TestcontainersKafkaCluster.java | 6 +- .../testing/kafka/KafkaClusterTest.java | 56 ++++++++- 5 files changed, 184 insertions(+), 39 deletions(-) diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java index 4acba090..9c9b7bf7 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/KafkaClusterConfig.java @@ -14,6 +14,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -31,6 +32,9 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.security.scram.ScramLoginModule; import org.apache.kafka.common.utils.AppInfoParser; import org.junit.jupiter.api.TestInfo; @@ -61,6 +65,9 @@ public class KafkaClusterConfig { public static final String INTERNAL_LISTENER_NAME = "INTERNAL"; public static final String ANON_LISTENER_NAME = "ANON"; + private static final String SASL_SCRAM_SHA_MECHANISM_PREFIX = "SCRAM-SHA-"; + private static final String SASL_PLAIN_MECHANISM_NAME = "PLAIN"; + private TestInfo testInfo; private KeytoolCertificateGenerator brokerKeytoolCertificateGenerator; private KeytoolCertificateGenerator clientKeytoolCertificateGenerator; @@ -90,6 +97,13 @@ public class KafkaClusterConfig { * will be used. */ private final String saslMechanism; + + /** + * name of login module that will be used to for client and broker. if null, the login module will be + * derived from the saslMechanism. + */ + private String loginModule; + private final String securityProtocol; @Builder.Default private Integer brokersNum = 1; @@ -105,6 +119,12 @@ public class KafkaClusterConfig { @Singular private final Map users; + @Singular + private final Map jaasServerOptions; + + @Singular + private final Map jaasClientOptions; + @Singular private final Map brokerConfigs; @@ -164,7 +184,7 @@ public static KafkaClusterConfig fromConstraints(List annotations, T } } if (annotation instanceof SaslPlainAuth.List saslPlainAuthList) { - builder.saslMechanism("PLAIN"); + builder.saslMechanism(SASL_PLAIN_MECHANISM_NAME); sasl = true; Map users = new HashMap<>(); for (var user : saslPlainAuthList.value()) { @@ -173,7 +193,7 @@ public static KafkaClusterConfig fromConstraints(List annotations, T builder.users(users); } else if (annotation instanceof SaslPlainAuth saslPlainAuth) { - builder.saslMechanism("PLAIN"); + builder.saslMechanism(SASL_PLAIN_MECHANISM_NAME); sasl = true; builder.users(Map.of(saslPlainAuth.user(), saslPlainAuth.password())); } @@ -361,18 +381,43 @@ private void configureSasl(Properties server) { if (saslMechanism != null) { putConfig(server, "sasl.enabled.mechanisms", saslMechanism); - var saslPairs = new StringBuilder(); + var lm = loginModule; + if (lm == null) { + lm = deriveLoginModuleFromSasl(saslMechanism); + } + + var serverOptions = Optional.ofNullable(jaasServerOptions).orElse(Map.of()).entrySet().stream(); + Stream> userOptions = Stream.empty(); + // Note Scram users are added to the server at after startup. + if (isSaslPlain()) { + userOptions = Optional.ofNullable(users).orElse(Map.of()).entrySet() + .stream() + .collect(Collectors.toMap(e -> String.format("user_%s", e.getKey()), Map.Entry::getValue)).entrySet().stream(); + } + + var moduleOptions = Stream.concat(serverOptions, userOptions) + .map(e -> String.join("=", e.getKey(), e.getValue())) + .collect(Collectors.joining(" ")); - Optional.ofNullable(users).orElse(Map.of()).forEach((key, value) -> { - saslPairs.append(String.format("user_%s", key)); - saslPairs.append("="); - saslPairs.append(value); - saslPairs.append(" "); - }); + var moduleConfig = String.format("%s required %s;", lm, moduleOptions); + var configKey = String.format("listener.name.%s.%s.sasl.jaas.config", EXTERNAL_LISTENER_NAME.toLowerCase(), saslMechanism.toLowerCase(Locale.ROOT)); - // TODO support other than PLAIN - String plainModuleConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required %s;", saslPairs); - putConfig(server, String.format("listener.name.%s.plain.sasl.jaas.config", EXTERNAL_LISTENER_NAME.toLowerCase()), plainModuleConfig); + putConfig(server, configKey, moduleConfig); + } + } + + private String deriveLoginModuleFromSasl(String saslMechanism) { + switch (saslMechanism.toUpperCase(Locale.ROOT)) { + case SASL_PLAIN_MECHANISM_NAME -> { + return PlainLoginModule.class.getName(); + } + case "SCRAM-SHA-256", "SCRAM-SHA-512" -> { + return ScramLoginModule.class.getName(); + } + case "OAUTHBEARER" -> { + return OAuthBearerLoginModule.class.getName(); + } + default -> throw new IllegalArgumentException("Cannot derive login module from saslMechanism %s".formatted(saslMechanism)); } } @@ -453,9 +498,9 @@ public Map getAnonConnectConfigForCluster(String bootstrapServer */ public Map getConnectConfigForCluster(String bootstrapServers) { if (saslMechanism != null) { - Map users = getUsers(); - if (!users.isEmpty()) { - Map.Entry first = users.entrySet().iterator().next(); + var externalUsers = getUsers(); + if (!externalUsers.isEmpty()) { + Map.Entry first = externalUsers.entrySet().iterator().next(); return getConnectConfigForCluster(bootstrapServers, first.getKey(), first.getValue(), getSecurityProtocol(), getSaslMechanism()); } else { @@ -536,21 +581,33 @@ public Map getConnectConfigForCluster(String bootstrapServers, S } if (saslMechanism != null) { + kafkaConfig.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + var lm = loginModule; + if (lm == null) { + lm = deriveLoginModuleFromSasl(saslMechanism); + } + if (securityProtocol == null) { kafkaConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name()); } - kafkaConfig.put(SaslConfigs.SASL_MECHANISM, saslMechanism); - if ("PLAIN".equals(saslMechanism)) { - if (user != null && password != null) { - kafkaConfig.put(SaslConfigs.SASL_JAAS_CONFIG, - String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", - user, password)); + var jaasOptions = new HashMap<>(jaasClientOptions == null ? Map.of() : jaasClientOptions); + + if (isSaslPlain() || isSaslScram()) { + if (user != null && !jaasOptions.containsKey("username")) { + jaasOptions.put("username", user); + } + if (password != null && !jaasOptions.containsKey("password")) { + jaasOptions.put("password", password); } } - else { - throw new IllegalStateException(String.format("unsupported SASL mechanism %s", saslMechanism)); - } + + var moduleOptions = jaasOptions.entrySet().stream() + .map(e -> String.join("=", e.getKey(), e.getValue())) + .collect(Collectors.joining(" ")); + + kafkaConfig.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("%s required %s;", lm, moduleOptions)); } kafkaConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); @@ -558,6 +615,14 @@ public Map getConnectConfigForCluster(String bootstrapServers, S return kafkaConfig; } + private boolean isSaslPlain() { + return this.saslMechanism != null && this.saslMechanism.toUpperCase(Locale.ROOT).equals(SASL_PLAIN_MECHANISM_NAME); + } + + public boolean isSaslScram() { + return this.saslMechanism != null && this.saslMechanism.toUpperCase(Locale.ROOT).startsWith(SASL_SCRAM_SHA_MECHANISM_PREFIX); + } + /** * Is the cluster coppering using Kraft Controller nodes. * diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/common/Utils.java b/impl/src/main/java/io/kroxylicious/testing/kafka/common/Utils.java index 8f03bc35..f393d0e7 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/common/Utils.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/common/Utils.java @@ -24,7 +24,11 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.ScramCredentialInfo; +import org.apache.kafka.clients.admin.ScramMechanism; import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.UserScramCredentialAlteration; +import org.apache.kafka.clients.admin.UserScramCredentialUpsertion; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.InvalidReplicationFactorException; @@ -46,6 +50,7 @@ public class Utils { private static final Logger log = getLogger(Utils.class); private static final String CONSISTENCY_TEST = "__org_kroxylicious_testing_consistencyTest"; + private static final int SCRAM_ITERATIONS = 4096; private Utils() { } @@ -55,10 +60,10 @@ private Utils() { * have at least one replica elsewhere in the cluster. * * @param connectionConfig the connection config - * @param fromNodeId nodeId being evacuated - * @param toNodeId replacement nodeId - * @param timeout the timeout - * @param timeUnit the time unit + * @param fromNodeId nodeId being evacuated + * @param toNodeId replacement nodeId + * @param timeout the timeout + * @param timeUnit the time unit */ public static void awaitReassignmentOfKafkaInternalTopicsIfNecessary(Map connectionConfig, int fromNodeId, int toNodeId, int timeout, TimeUnit timeUnit) { @@ -113,9 +118,9 @@ public static void awaitReassignmentOfKafkaInternalTopicsIfNecessary(Map connectionConfig, int timeout, TimeUnit timeUnit, Integer expectedBrokerCount) { @@ -163,6 +168,22 @@ public static void awaitExpectedBrokerCountInClusterViaTopic(Map } } + public static void createUsersOnClusterIfNecessary(Map connectionConfig, KafkaClusterConfig clusterConfig) { + var users = clusterConfig.getUsers(); + var saslMechanism = clusterConfig.getSaslMechanism(); + if (users.isEmpty() || !clusterConfig.isSaslScram()) { + return; + } + var sci = new ScramCredentialInfo(ScramMechanism.fromMechanismName(saslMechanism), SCRAM_ITERATIONS); + try (var admin = Admin.create(connectionConfig)) { + // TODO fail gracefully if KRaft and Metaddata version does not yet support SCRAM + admin.alterUserScramCredentials(users.entrySet().stream() + .map(e -> new UserScramCredentialUpsertion(e.getKey(), sci, e.getValue())) + .map(UserScramCredentialAlteration.class::cast) + .toList()).all().toCompletionStage().toCompletableFuture().join(); + } + } + /* * There are edge cases where deleting the topic isn't possible. Primarily `delete.topic.enable==false`. * Rather than attempt to detect that in advance (as that requires an RPC) we catch that and return normally @@ -229,14 +250,15 @@ private static boolean isRetryable(Throwable potentiallyWrapped) { var throwable = potentiallyWrapped instanceof CompletionException && potentiallyWrapped.getCause() != null ? potentiallyWrapped.getCause() : potentiallyWrapped; boolean retriable = throwable instanceof RetriableException && (throwable.getMessage() == null - || !throwable.getMessage().contains("The AdminClient is not accepting new calls") /* workaround for KAFKA-15507 */ ); + || !throwable.getMessage().contains("The AdminClient is not accepting new calls") /* workaround for KAFKA-15507 */); return retriable || throwable instanceof InvalidReplicationFactorException || (throwable instanceof TopicExistsException && throwable.getMessage().contains("is marked for deletion")); } /** * Factory for {@link Awaitility#await()} preconfigured with defaults. - * @param timeout at most timeout + * + * @param timeout at most timeout * @param timeUnit at most {@link TimeUnit} * @return preconfigured factory */ diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java index 50cd540a..b11bb342 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/invm/InVMKafkaCluster.java @@ -56,8 +56,6 @@ import io.kroxylicious.testing.kafka.common.Utils; import io.kroxylicious.testing.kafka.internal.AdminSource; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION; - /** * Configures and manages an in process (within the JVM) Kafka cluster. */ @@ -134,7 +132,7 @@ private static void prepareLogDirsForKraft(String clusterId, KafkaConfig config, boolean.class); // note ignoreFormatter=true so tolerate a log directory which is already formatted. this is // required to support start/stop. - formatCommand.invoke(null, LOGGING_PRINT_STREAM, directories, metaProperties, MINIMUM_BOOTSTRAP_VERSION, true); + formatCommand.invoke(null, LOGGING_PRINT_STREAM, directories, metaProperties, MetadataVersion.latestProduction(), true); } catch (Exception e) { throw new RuntimeException("failed to prepare log dirs for KRaft", e); @@ -207,10 +205,14 @@ public synchronized void start() { tryToStartServerWithRetry(configHolder, server); servers.put(configHolder.getBrokerNum(), server); }); + var anonConnectConfigForCluster = clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))); Utils.awaitExpectedBrokerCountInClusterViaTopic( clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), 120, TimeUnit.SECONDS, clusterConfig.getBrokersNum()); + + Utils.createUsersOnClusterIfNecessary(anonConnectConfigForCluster, clusterConfig); + } private void tryToStartServerWithRetry(KafkaClusterConfig.ConfigHolder configHolder, Server server) { diff --git a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java index 7f5c9154..edf729a4 100644 --- a/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java +++ b/impl/src/main/java/io/kroxylicious/testing/kafka/testcontainers/TestcontainersKafkaCluster.java @@ -334,10 +334,14 @@ public synchronized void start() { zookeeper.start(); } Startables.deepStart(nodes.values().stream()).get(READY_TIMEOUT_SECONDS, TimeUnit.SECONDS); + + var anonConnectConfigForCluster = clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))); awaitExpectedBrokerCountInClusterViaTopic( - clusterConfig.getAnonConnectConfigForCluster(buildBrokerList(nodeId -> getEndpointPair(Listener.ANON, nodeId))), + anonConnectConfigForCluster, READY_TIMEOUT_SECONDS, TimeUnit.SECONDS, clusterConfig.getBrokersNum()); + + Utils.createUsersOnClusterIfNecessary(anonConnectConfigForCluster, clusterConfig); } catch (InterruptedException | ExecutionException | TimeoutException e) { if (e instanceof InterruptedException) { diff --git a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java index 6b11f190..6bb5eb56 100644 --- a/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java +++ b/impl/src/test/java/io/kroxylicious/testing/kafka/KafkaClusterTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.awaitility.Awaitility; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -337,7 +338,7 @@ void kafkaClusterKraftDisallowsControllerRemoval() throws Exception { } @Test - void kafkaClusterKraftModeWithAuth() throws Exception { + void kafkaClusterKraftModeWithPlainAuth() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .kraftMode(true) .testInfo(testInfo) @@ -351,7 +352,7 @@ void kafkaClusterKraftModeWithAuth() throws Exception { } @Test - void kafkaClusterZookeeperModeWithAuth() throws Exception { + void kafkaClusterZookeeperModeWithPlainAuth() throws Exception { try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() .testInfo(testInfo) .kraftMode(false) @@ -364,6 +365,28 @@ void kafkaClusterZookeeperModeWithAuth() throws Exception { } } + static Stream kafkaClusterWithSaslScramAuth() { + return Stream.of( + Arguments.of("SCRAM-SHA-256", true), + Arguments.of("SCRAM-SHA-512", true), + Arguments.of("SCRAM-SHA-512", false)); + } + + @ParameterizedTest + @MethodSource + void kafkaClusterWithSaslScramAuth(String mechanism, boolean kraft) throws Exception { + try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() + .kraftMode(kraft) + .testInfo(testInfo) + .securityProtocol("SASL_PLAINTEXT") + .saslMechanism(mechanism) + .user("guest", "pass") + .build())) { + cluster.start(); + verifyRecordRoundTrip(1, cluster); + } + } + @Test void kafkaClusterKraftModeSASL_SSL_ClientUsesSSLClientAuth() throws Exception { createClientCertificate(); @@ -484,6 +507,35 @@ void kafkaClusterZookeeperModeSASL_SSL_ClientNoAuth() throws Exception { } } + @Test + void kafkaClusterKraftModeWithOAuthBearerAuth() throws Exception { + try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() + .kraftMode(true) + .testInfo(testInfo) + .saslMechanism("OAUTHBEARER") + .securityProtocol("SASL_PLAINTEXT") + .jaasClientOption("unsecuredLoginStringClaim_sub", "thePrincipalName") + .build())) { + cluster.start(); + verifyRecordRoundTrip(1, cluster); + } + } + + @Test + @Disabled + void kafkaClusterKraftModeWithOAuthBearerAuthFail() throws Exception { + try (var cluster = KafkaClusterFactory.create(KafkaClusterConfig.builder() + .kraftMode(true) + .testInfo(testInfo) + .saslMechanism("OAUTHBEARER") + .securityProtocol("SASL_PLAINTEXT") + // .jaasClientOption("unsecuredLoginStringClaim_sub", "") + .build())) { + cluster.start(); + verifyRecordRoundTrip(1, cluster); + } + } + private void verifyRecordRoundTrip(int expected, KafkaCluster cluster) throws Exception { var topic = "roundTrip" + Uuid.randomUuid(); var message = "Hello, world!";