diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d4216f..218fa1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). ## [0.5-SNAPSHOT] -- +- New custom delimiter option for CsvAclParser +- Refactoring to allow new AclParser formats ## [0.4] - 09/09/2018 - Added S3 Acl Source (#27) diff --git a/README.md b/README.md index 6c27768..597f1ff 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,7 @@ The [default configurations](src/main/resources/application.conf) can be overwri - `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka. - `com.github.simplesteph.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging - `com.github.simplesteph.ksm.notification.SlackNotification`: Send notifications to a Slack channel (useful for devops / admin team) +- `ACL_PARSER_CSV_DELIMITER`: Change the delimiter character for the CSV Parser (useful when you have SSL) # Running on Docker diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 0a42e61..f5c8fb1 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -12,8 +12,12 @@ ksm { readonly = true readonly = ${?KSM_READONLY} +} + +parser { csv { delimiter = "," + delimiter = ${?ACL_PARSER_CSV_DELIMITER} } } diff --git a/src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala b/src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala index 6303be9..9aca6c2 100644 --- a/src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala +++ b/src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala @@ -1,9 +1,11 @@ package com.github.simplesteph.ksm import com.github.simplesteph.ksm.notification.Notification +import com.github.simplesteph.ksm.parser.AclParser import com.github.simplesteph.ksm.source.{SourceAcl, SourceAclResult} import kafka.security.auth.{Acl, Authorizer, Resource} import org.slf4j.{Logger, LoggerFactory} + import scala.util.{Failure, Success, Try} object AclSynchronizer { @@ -52,6 +54,7 @@ object AclSynchronizer { class AclSynchronizer(authorizer: Authorizer, sourceAcl: SourceAcl, notification: Notification, + aclParser: AclParser, readOnly: Boolean = false) { import AclSynchronizer._ @@ -70,7 +73,7 @@ class AclSynchronizer(authorizer: Authorizer, def run(): Unit = if (!readOnly) { // parse the source of the ACL - Try(sourceAcl.refresh()) match { + Try(sourceAcl.refresh(aclParser)) match { case Success(result) => result match { // the source has not changed diff --git a/src/main/scala/com/github/simplesteph/ksm/AppConfig.scala b/src/main/scala/com/github/simplesteph/ksm/AppConfig.scala index 371692c..0a9f7fd 100644 --- a/src/main/scala/com/github/simplesteph/ksm/AppConfig.scala +++ b/src/main/scala/com/github/simplesteph/ksm/AppConfig.scala @@ -50,7 +50,11 @@ class AppConfig(config: Config) { val refreshFrequencyMs: Int = ksmConfig.getInt("refresh.frequency.ms") val extract: Boolean = ksmConfig.getBoolean("extract") val readOnly: Boolean = ksmConfig.getBoolean("readonly") - val csvDelimiter: String = ksmConfig.getString("csv.delimiter") + } + + object Parser { + private val aclParserConfig = config.getConfig("parser") + val csvDelimiter: Char = aclParserConfig.getString("csv.delimiter").charAt(0) } object GRPC { diff --git a/src/main/scala/com/github/simplesteph/ksm/KafkaSecurityManager.scala b/src/main/scala/com/github/simplesteph/ksm/KafkaSecurityManager.scala index d3e5517..973ffb1 100644 --- a/src/main/scala/com/github/simplesteph/ksm/KafkaSecurityManager.scala +++ b/src/main/scala/com/github/simplesteph/ksm/KafkaSecurityManager.scala @@ -19,13 +19,15 @@ object KafkaSecurityManager extends App { var isCancelled: AtomicBoolean = new AtomicBoolean(false) var grpcServer: KsmGrpcServer = _ var aclSynchronizer: AclSynchronizer = _ + val aclParser = new CsvAclParser(appConfig.Parser.csvDelimiter) if (appConfig.KSM.extract) { - new ExtractAcl(appConfig.Authorizer.authorizer, CsvAclParser).extract() + new ExtractAcl(appConfig.Authorizer.authorizer, aclParser).extract() } else { aclSynchronizer = new AclSynchronizer(appConfig.Authorizer.authorizer, appConfig.Source.sourceAcl, appConfig.Notification.notification, + aclParser, appConfig.KSM.readOnly) Try { diff --git a/src/main/scala/com/github/simplesteph/ksm/grpc/KsmGrpcServer.scala b/src/main/scala/com/github/simplesteph/ksm/grpc/KsmGrpcServer.scala index 6a45abf..27cb8da 100644 --- a/src/main/scala/com/github/simplesteph/ksm/grpc/KsmGrpcServer.scala +++ b/src/main/scala/com/github/simplesteph/ksm/grpc/KsmGrpcServer.scala @@ -1,6 +1,6 @@ package com.github.simplesteph.ksm.grpc -import com.github.simplesteph.ksm.{AclSynchronizer, KafkaSecurityManager} +import com.github.simplesteph.ksm.AclSynchronizer import com.security.kafka.pb.ksm.{KsmServiceGrpc, KsmServiceHandler} import grpcgateway.server.{GrpcGatewayServer, GrpcGatewayServerBuilder} import io.grpc.protobuf.services.ProtoReflectionService diff --git a/src/main/scala/com/github/simplesteph/ksm/grpc/KsmServiceImpl.scala b/src/main/scala/com/github/simplesteph/ksm/grpc/KsmServiceImpl.scala index f6baeb6..148a8f9 100644 --- a/src/main/scala/com/github/simplesteph/ksm/grpc/KsmServiceImpl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/grpc/KsmServiceImpl.scala @@ -2,12 +2,12 @@ package com.github.simplesteph.ksm.grpc import com.github.simplesteph.ksm.AclSynchronizer import com.github.simplesteph.ksm.utils.ProtoConversionUtils +import com.security.kafka.pb.ksm.KsmServiceGrpc.KsmService import com.security.kafka.pb.ksm.{ GetAllAclsRequest, GetAllAclsResponse, ResourceAndAclPb } -import com.security.kafka.pb.ksm.KsmServiceGrpc.KsmService import kafka.security.auth.{Acl, Resource} import scala.concurrent.Future diff --git a/src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala b/src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala index 221e5b1..772da91 100644 --- a/src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala +++ b/src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala @@ -2,10 +2,8 @@ package com.github.simplesteph.ksm.parser import java.io.Reader -import com.github.simplesteph.ksm.AppConfig import com.github.simplesteph.ksm.source.SourceAclResult import com.github.tototoshi.csv.{CSVFormat, CSVReader, QUOTE_MINIMAL, Quoting} -import com.typesafe.config.ConfigFactory import kafka.security.auth._ import org.apache.kafka.common.resource.PatternType import org.apache.kafka.common.utils.SecurityUtils @@ -14,41 +12,19 @@ import org.slf4j.LoggerFactory import scala.collection.immutable import scala.util.{Failure, Success, Try} -class CsvAclParser - /** * Parser that assumes that all ACLs are flattened * and live under a CSV format. * The CSV is expected to have headers as outlined below and in the example * Empty lines in the CSV should be ignored */ -object CsvAclParser extends AclParser { +class CsvAclParser(delimiterInput: Char = ',') extends AclParser { - private val log = LoggerFactory.getLogger(classOf[CsvAclParser]) - - final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal" - final val RESOURCE_TYPE_COL = "ResourceType" - final val RESOURCE_NAME_COL = "ResourceName" - final val OPERATION_COL = "Operation" - final val PERMISSION_TYPE_COL = "PermissionType" - final val HOST_COL = "Host" - final val PATTERN_TYPE_COL = "PatternType" - - final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL, - RESOURCE_TYPE_COL, - PATTERN_TYPE_COL, - RESOURCE_NAME_COL, - OPERATION_COL, - PERMISSION_TYPE_COL, - HOST_COL, - ) - - val config = ConfigFactory.load() - val appConfig: AppConfig = new AppConfig(config) + import CsvAclParser._ // we treat empty lines as Nil hence the format override implicit val csvFormat: CSVFormat = new CSVFormat { - val delimiter: Char = appConfig.KSM.csvDelimiter.charAt(0) + val delimiter: Char = delimiterInput val quoteChar: Char = '"' val escapeChar: Char = '"' val lineTerminator: String = "\r\n" @@ -75,10 +51,11 @@ object CsvAclParser extends AclParser { case Success(pt) => pt case Failure(e: NoSuchElementException) => // column is missing - log.warn(s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL. - |The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host - |For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command - |For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin) + log.warn( + s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL. + |The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host + |For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command + |For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin) // Default PatternType.LITERAL case Failure(e) => @@ -120,13 +97,13 @@ object CsvAclParser extends AclParser { r.name, a.operation.toString, a.permissionType.toString, - a.host).mkString(appConfig.KSM.csvDelimiter) + a.host).mkString(delimiterInput.toString) } override def formatAcls(acls: List[(Resource, Acl)]): String = { val sb = new StringBuilder // header - sb.append(EXPECTED_COLS.mkString(appConfig.KSM.csvDelimiter)) + sb.append(EXPECTED_COLS.mkString(delimiterInput.toString)) sb.append(System.getProperty("line.separator")) // rows acls.foreach { @@ -136,5 +113,27 @@ object CsvAclParser extends AclParser { } sb.toString() } +} + +object CsvAclParser { + + private val log = LoggerFactory.getLogger(classOf[CsvAclParser]) + + final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal" + final val RESOURCE_TYPE_COL = "ResourceType" + final val RESOURCE_NAME_COL = "ResourceName" + final val OPERATION_COL = "Operation" + final val PERMISSION_TYPE_COL = "PermissionType" + final val HOST_COL = "Host" + final val PATTERN_TYPE_COL = "PatternType" + + final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL, + RESOURCE_TYPE_COL, + PATTERN_TYPE_COL, + RESOURCE_NAME_COL, + OPERATION_COL, + PERMISSION_TYPE_COL, + HOST_COL, + ) } diff --git a/src/main/scala/com/github/simplesteph/ksm/source/FileSourceAcl.scala b/src/main/scala/com/github/simplesteph/ksm/source/FileSourceAcl.scala index 98fe307..1a79bb6 100644 --- a/src/main/scala/com/github/simplesteph/ksm/source/FileSourceAcl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/source/FileSourceAcl.scala @@ -2,7 +2,7 @@ package com.github.simplesteph.ksm.source import java.io.{File, FileReader} -import com.github.simplesteph.ksm.parser.CsvAclParser +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config class FileSourceAcl extends SourceAcl { @@ -27,11 +27,11 @@ class FileSourceAcl extends SourceAcl { * Uses a CSV parser on the file afterwards * @return */ - override def refresh(): Option[SourceAclResult] = { + override def refresh(aclParser: AclParser): Option[SourceAclResult] = { val file = new File(filename) if (file.lastModified() > lastModified) { val reader = new FileReader(file) - val res = CsvAclParser.aclsFromReader(reader) + val res = aclParser.aclsFromReader(reader) reader.close() lastModified = file.lastModified() Some(res) diff --git a/src/main/scala/com/github/simplesteph/ksm/source/GitHubSourceAcl.scala b/src/main/scala/com/github/simplesteph/ksm/source/GitHubSourceAcl.scala index 2009deb..308478d 100644 --- a/src/main/scala/com/github/simplesteph/ksm/source/GitHubSourceAcl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/source/GitHubSourceAcl.scala @@ -5,7 +5,7 @@ import java.nio.charset.Charset import java.util.Base64 import com.fasterxml.jackson.databind.ObjectMapper -import com.github.simplesteph.ksm.parser.CsvAclParser +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config import org.slf4j.LoggerFactory import skinny.http.{HTTP, HTTPException, Request, Response} @@ -48,7 +48,7 @@ class GitHubSourceAcl extends SourceAcl { tokenOpt = Try(config.getString(AUTH_TOKEN_CONFIG)).toOption } - override def refresh(): Option[SourceAclResult] = { + override def refresh(aclParser: AclParser): Option[SourceAclResult] = { val url = s"https://$hostname/repos/$user/$repo/contents/$filepath?ref=$branch" val request: Request = new Request(url) @@ -76,7 +76,7 @@ class GitHubSourceAcl extends SourceAcl { b64encodedContent.replace("\n", "").replace("\r", "")), Charset.forName("UTF-8")) // use the CSV Parser - Some(CsvAclParser.aclsFromReader(new StringReader(data))) + Some(aclParser.aclsFromReader(new StringReader(data))) case 304 => None case _ => diff --git a/src/main/scala/com/github/simplesteph/ksm/source/NoSourceAcl.scala b/src/main/scala/com/github/simplesteph/ksm/source/NoSourceAcl.scala index 8162980..4c8ef6c 100644 --- a/src/main/scala/com/github/simplesteph/ksm/source/NoSourceAcl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/source/NoSourceAcl.scala @@ -1,4 +1,5 @@ package com.github.simplesteph.ksm.source +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config class NoSourceAcl extends SourceAcl { @@ -24,7 +25,7 @@ class NoSourceAcl extends SourceAcl { * * @return */ - override def refresh(): Option[SourceAclResult] = None + override def refresh(aclParser: AclParser): Option[SourceAclResult] = None /** * Close all the necessary underlying objects or connections belonging to this instance diff --git a/src/main/scala/com/github/simplesteph/ksm/source/S3SourceAcl.scala b/src/main/scala/com/github/simplesteph/ksm/source/S3SourceAcl.scala index 550006d..848b30f 100644 --- a/src/main/scala/com/github/simplesteph/ksm/source/S3SourceAcl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/source/S3SourceAcl.scala @@ -1,14 +1,14 @@ package com.github.simplesteph.ksm.source +import java.io._ +import java.util.Date + import com.amazonaws.regions.Regions import com.amazonaws.services.s3._ import com.amazonaws.services.s3.model._ -import java.io._ -import com.github.simplesteph.ksm.parser.CsvAclParser -import org.slf4j.LoggerFactory +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config - -import java.util.Date +import org.slf4j.LoggerFactory class S3SourceAcl extends SourceAcl { @@ -48,7 +48,7 @@ class S3SourceAcl extends SourceAcl { * * @return */ - override def refresh(): Option[SourceAclResult] = { + override def refresh(aclParser: AclParser): Option[SourceAclResult] = { val s3Client = AmazonS3ClientBuilder.standard.withRegion(Regions.fromName(region)).build val s3object = Option( @@ -60,7 +60,7 @@ class S3SourceAcl extends SourceAcl { val reader = new BufferedReader( new InputStreamReader(bucket.getObjectContent)) lastModified = bucket.getObjectMetadata.getLastModified - val res = CsvAclParser.aclsFromReader(reader) + val res = aclParser.aclsFromReader(reader) reader.close() bucket.close() Some(res) diff --git a/src/main/scala/com/github/simplesteph/ksm/source/SourceAcl.scala b/src/main/scala/com/github/simplesteph/ksm/source/SourceAcl.scala index 67e6338..4eca78c 100644 --- a/src/main/scala/com/github/simplesteph/ksm/source/SourceAcl.scala +++ b/src/main/scala/com/github/simplesteph/ksm/source/SourceAcl.scala @@ -1,5 +1,6 @@ package com.github.simplesteph.ksm.source +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config trait SourceAcl { @@ -24,7 +25,7 @@ trait SourceAcl { * Kafka Security Manager will not update Acls in Kafka until there are no errors in the result * @return */ - def refresh(): Option[SourceAclResult] + def refresh(aclParser: AclParser): Option[SourceAclResult] /** * Close all the necessary underlying objects or connections belonging to this instance diff --git a/src/test/scala/com/github/simplesteph/ksm/AclSynchronizerTest.scala b/src/test/scala/com/github/simplesteph/ksm/AclSynchronizerTest.scala index 7adeb37..ac7e82f 100644 --- a/src/test/scala/com/github/simplesteph/ksm/AclSynchronizerTest.scala +++ b/src/test/scala/com/github/simplesteph/ksm/AclSynchronizerTest.scala @@ -1,7 +1,8 @@ package com.github.simplesteph.ksm import com.github.simplesteph.ksm.notification.{ConsoleNotification, DummyNotification} -import com.github.simplesteph.ksm.source.{DummySourceAcl, NoSourceAcl, SourceAcl, SourceAclResult} +import com.github.simplesteph.ksm.parser.{AclParser, CsvAclParser} +import com.github.simplesteph.ksm.source.{DummySourceAcl, SourceAcl, SourceAclResult} import com.typesafe.config.Config import kafka.security.auth._ import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} @@ -15,6 +16,9 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with import TestFixtures._ + val aclParser = new CsvAclParser() + + val kafkaGroupedAcls: Map[Resource, Set[Acl]] = Map( res1 -> Set(acl1, acl2), res2 -> Set(acl3), @@ -50,12 +54,9 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with val dummySourceAcl = new DummySourceAcl val dummyNotification = new DummyNotification + val aclParser = new CsvAclParser() - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( - simpleAclAuthorizer, - dummySourceAcl, - dummyNotification, - ) + val aclSynchronizer: AclSynchronizer = new AclSynchronizer(simpleAclAuthorizer, dummySourceAcl, dummyNotification, aclParser, false) // first iteration @@ -110,11 +111,9 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with val dummySourceAcl = new DummySourceAcl - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( - simpleAclAuthorizer, - dummySourceAcl, - new ConsoleNotification - ) + val aclParser = new CsvAclParser() + + val aclSynchronizer: AclSynchronizer = new AclSynchronizer(simpleAclAuthorizer, dummySourceAcl, new ConsoleNotification, aclParser) // first iteration aclSynchronizer.run() @@ -154,11 +153,8 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with val dummySourceAcl = new DummySourceAcl val dummyNotification = new DummyNotification - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( - simpleAclAuthorizer, - dummySourceAcl, - dummyNotification, - ) + + val aclSynchronizer: AclSynchronizer = new AclSynchronizer(simpleAclAuthorizer, dummySourceAcl, dummyNotification, aclParser) // first iteration @@ -208,11 +204,7 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with val dummySourceAcl = new DummySourceAcl val dummyNotification = new DummyNotification - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( - simpleAclAuthorizer, - dummySourceAcl, - dummyNotification, - ) + val aclSynchronizer: AclSynchronizer = new AclSynchronizer(simpleAclAuthorizer, dummySourceAcl, dummyNotification, aclParser) // first iteration @@ -243,7 +235,7 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with var refreshCalled = false override val CONFIG_PREFIX: String = "" override def configure(config: Config): Unit = {} - override def refresh(): Option[SourceAclResult] = { + override def refresh(aclParser: AclParser): Option[SourceAclResult] = { refreshCalled = true None } @@ -252,12 +244,7 @@ class AclSynchronizerTest extends FlatSpec with EmbeddedKafka with Matchers with val dummyNotification = new DummyNotification - val aclSynchronizer: AclSynchronizer = new AclSynchronizer( - simpleAclAuthorizer, - controlSourceAcl, - dummyNotification, - readOnly = true - ) + val aclSynchronizer: AclSynchronizer = new AclSynchronizer(simpleAclAuthorizer, controlSourceAcl, dummyNotification, aclParser , readOnly = true) simpleAclAuthorizer.addAcls(Set(acl1), res1) diff --git a/src/test/scala/com/github/simplesteph/ksm/DummyAuthorizer.scala b/src/test/scala/com/github/simplesteph/ksm/DummyAuthorizer.scala index d294f25..679d94d 100644 --- a/src/test/scala/com/github/simplesteph/ksm/DummyAuthorizer.scala +++ b/src/test/scala/com/github/simplesteph/ksm/DummyAuthorizer.scala @@ -4,7 +4,7 @@ import java.util import com.github.simplesteph.ksm.TestFixtures._ import kafka.network.RequestChannel -import kafka.security.auth.{ Acl, Authorizer, Operation, Resource } +import kafka.security.auth.{Acl, Authorizer, Operation, Resource} import org.apache.kafka.common.security.auth.KafkaPrincipal class DummyAuthorizer() extends Authorizer { diff --git a/src/test/scala/com/github/simplesteph/ksm/grpc/KsmServiceImplTest.scala b/src/test/scala/com/github/simplesteph/ksm/grpc/KsmServiceImplTest.scala index b632419..fc91dbc 100644 --- a/src/test/scala/com/github/simplesteph/ksm/grpc/KsmServiceImplTest.scala +++ b/src/test/scala/com/github/simplesteph/ksm/grpc/KsmServiceImplTest.scala @@ -1,25 +1,20 @@ package com.github.simplesteph.ksm.grpc -import com.github.simplesteph.ksm.{ AclSynchronizer, DummyAuthorizer } import com.github.simplesteph.ksm.notification.DummyNotification +import com.github.simplesteph.ksm.parser.CsvAclParser import com.github.simplesteph.ksm.source.DummySourceAcl +import com.github.simplesteph.ksm.{AclSynchronizer, DummyAuthorizer} import com.security.kafka.pb.ksm.OperationTypePb._ import com.security.kafka.pb.ksm.PermissionTypePb._ import com.security.kafka.pb.ksm.ResourceTypePb._ import com.security.kafka.pb.ksm._ -import org.scalatest.{ AsyncFlatSpec, FlatSpec, Matchers } - -import scala.collection.Map -import scala.concurrent.Future +import org.scalatest.{AsyncFlatSpec, Matchers} class KsmServiceImplTest extends AsyncFlatSpec with Matchers { val dummySourceAcl = new DummySourceAcl - val ksmServiceImpl = new KsmServiceImpl(new AclSynchronizer( - new DummyAuthorizer(), - dummySourceAcl, - new DummyNotification)) + val ksmServiceImpl = new KsmServiceImpl(new AclSynchronizer(new DummyAuthorizer(), dummySourceAcl, new DummyNotification, new CsvAclParser)) "getAllAcls" should "return all Acls" in { ksmServiceImpl.getAllAcls(new GetAllAclsRequest) map { diff --git a/src/test/scala/com/github/simplesteph/ksm/notification/DummyNotification.scala b/src/test/scala/com/github/simplesteph/ksm/notification/DummyNotification.scala index 68adea8..a69ae30 100644 --- a/src/test/scala/com/github/simplesteph/ksm/notification/DummyNotification.scala +++ b/src/test/scala/com/github/simplesteph/ksm/notification/DummyNotification.scala @@ -1,8 +1,6 @@ package com.github.simplesteph.ksm.notification -import java.util.concurrent.atomic.AtomicInteger - import com.typesafe.config.Config -import kafka.security.auth.{ Acl, Resource } +import kafka.security.auth.{Acl, Resource} import scala.collection.mutable import scala.util.Try diff --git a/src/test/scala/com/github/simplesteph/ksm/parser/CsvAclParserTest.scala b/src/test/scala/com/github/simplesteph/ksm/parser/CsvAclParserTest.scala index 66af8df..f42a0b9 100644 --- a/src/test/scala/com/github/simplesteph/ksm/parser/CsvAclParserTest.scala +++ b/src/test/scala/com/github/simplesteph/ksm/parser/CsvAclParserTest.scala @@ -21,9 +21,10 @@ class CsvAclParserTest extends FlatSpec with Matchers { val resource = Resource(Topic, "test", PatternType.LITERAL) val acl = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read) + val csvAclParser = new CsvAclParser(',') "parseRow" should "correctly parse a Row" in { - CsvAclParser.parseRow(row) shouldBe((resource, acl)) + csvAclParser.parseRow(row) shouldBe((resource, acl)) } "aclsFromCsv" should "correctly parse a Correct CSV" in { @@ -44,7 +45,7 @@ class CsvAclParserTest extends FlatSpec with Matchers { val res2 = Resource(Group, "bar", PatternType.LITERAL) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - val res = CsvAclParser.aclsFromReader(new StringReader(csv)) + val res = csvAclParser.aclsFromReader(new StringReader(csv)) res.errs shouldBe List() @@ -56,7 +57,6 @@ class CsvAclParserTest extends FlatSpec with Matchers { } - "aclsFromCsv" should "catch all errors and catch all correct" in { // 1 correct, 1 wrong data, 1 missing column @@ -76,7 +76,7 @@ class CsvAclParserTest extends FlatSpec with Matchers { val res2 = Resource(Group, "bar", PatternType.LITERAL) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - val res = CsvAclParser.aclsFromReader(new StringReader(csv)) + val res = csvAclParser.aclsFromReader(new StringReader(csv)) res.errs.size shouldBe 2 val throwable1 = res.errs.head.get @@ -103,7 +103,7 @@ class CsvAclParserTest extends FlatSpec with Matchers { |User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow |""".stripMargin - val res = CsvAclParser.aclsFromReader(new StringReader(csv)) + val res = csvAclParser.aclsFromReader(new StringReader(csv)) res.errs.size shouldBe 3 @@ -112,7 +112,7 @@ class CsvAclParserTest extends FlatSpec with Matchers { "asCsv" should "correctly write CSV Row" in { val acl1 = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read) val res1 = Resource(Topic, "foo", PatternType.LITERAL) - val res = CsvAclParser.asCsv(res1, acl1) + val res = csvAclParser.asCsv(res1, acl1) res shouldBe "User:alice,Topic,LITERAL,foo,Read,Allow,*" } @@ -133,9 +133,50 @@ class CsvAclParserTest extends FlatSpec with Matchers { val res2 = Resource(Group, "bar", PatternType.PREFIXED) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - val res = CsvAclParser.formatAcls(List((res1, acl1),(res2, acl2), (res3, acl3))) + val res = csvAclParser.formatAcls(List((res1, acl1),(res2, acl2), (res3, acl3))) res shouldBe csv } + + + "aclsFromCsv" should "correctly parse a Correct CSV with different delimiter" in { + val funnyCsvAclParser = new CsvAclParser('?') + val csv = + """KafkaPrincipal?ResourceType?PatternType?ResourceName?Operation?PermissionType?Host + |User:alice?Topic?LITERAL?foo?Read?Allow?* + |User:bob?Group?LITERAL?bar?Write?Deny?12.34.56.78 + | + |User:peter?Cluster?LITERAL?kafka-cluster?Create?Allow?* + |""".stripMargin + + + val acl1 = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read) + val acl2 = Acl(SecurityUtils.parseKafkaPrincipal("User:bob"), Deny, "12.34.56.78", Write) + val acl3 = Acl(SecurityUtils.parseKafkaPrincipal("User:peter"), Allow, "*", Create) + + val res1 = Resource(Topic, "foo", PatternType.LITERAL) + val res2 = Resource(Group, "bar", PatternType.LITERAL) + val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) + + val res = funnyCsvAclParser.aclsFromReader(new StringReader(csv)) + + res.errs shouldBe List() + + res.acls shouldBe Set( + res1 -> acl1, + res2 -> acl2, + res3 -> acl3 + ) + + } + + + "asCsv" should "correctly write CSV Row with different delimiter" in { + val funnyCsvAclParser = new CsvAclParser('?') + val acl1 = Acl(SecurityUtils.parseKafkaPrincipal("User:alice"), Allow, "*", Read) + val res1 = Resource(Topic, "foo", PatternType.LITERAL) + val res = funnyCsvAclParser.asCsv(res1, acl1) + res shouldBe "User:alice?Topic?LITERAL?foo?Read?Allow?*" + } } diff --git a/src/test/scala/com/github/simplesteph/ksm/source/DummySourceAcl.scala b/src/test/scala/com/github/simplesteph/ksm/source/DummySourceAcl.scala index e84e962..942d893 100644 --- a/src/test/scala/com/github/simplesteph/ksm/source/DummySourceAcl.scala +++ b/src/test/scala/com/github/simplesteph/ksm/source/DummySourceAcl.scala @@ -1,11 +1,10 @@ package com.github.simplesteph.ksm.source +import com.github.simplesteph.ksm.TestFixtures._ +import com.github.simplesteph.ksm.parser.AclParser import com.typesafe.config.Config -import kafka.security.auth._ -import org.apache.kafka.common.utils.SecurityUtils import scala.util.Try -import com.github.simplesteph.ksm.TestFixtures._ class DummySourceAcl extends SourceAcl { @@ -33,7 +32,7 @@ class DummySourceAcl extends SourceAcl { // all state changes val sars: Iterator[SourceAclResult] = List(sar1, sar2, sar3).iterator - override def refresh(): Option[SourceAclResult] = { + override def refresh(aclParser: AclParser): Option[SourceAclResult] = { if(noneNext){ noneNext = false None diff --git a/src/test/scala/com/github/simplesteph/ksm/source/FileSourceAclTest.scala b/src/test/scala/com/github/simplesteph/ksm/source/FileSourceAclTest.scala index 676cc2d..0077042 100644 --- a/src/test/scala/com/github/simplesteph/ksm/source/FileSourceAclTest.scala +++ b/src/test/scala/com/github/simplesteph/ksm/source/FileSourceAclTest.scala @@ -4,6 +4,7 @@ import java.io.File import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} +import com.github.simplesteph.ksm.parser.CsvAclParser import kafka.security.auth._ import org.apache.kafka.common.resource.PatternType import org.apache.kafka.common.utils.SecurityUtils @@ -11,6 +12,8 @@ import org.scalatest.{FlatSpec, Matchers} class FileSourceAclTest extends FlatSpec with Matchers { + val csvlAclParser = new CsvAclParser() + "fileSourceAcl Refresh" should "correctly parse a file" in { val file = File.createTempFile("ksm", "test") @@ -35,7 +38,7 @@ class FileSourceAclTest extends FlatSpec with Matchers { val res2 = Resource(Group, "bar", PatternType.PREFIXED) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - fileSourceAcl.refresh() shouldBe Some(SourceAclResult(Set(res1 -> acl1, res2 -> acl2, res3 -> acl3), List())) + fileSourceAcl.refresh(csvlAclParser) shouldBe Some(SourceAclResult(Set(res1 -> acl1, res2 -> acl2, res3 -> acl3), List())) } "fileSourceAcl Refresh" should "correctly parse a file and then refresh after changes" in { @@ -62,7 +65,7 @@ class FileSourceAclTest extends FlatSpec with Matchers { val res2 = Resource(Group, "bar", PatternType.PREFIXED) val res3 = Resource(Cluster, "kafka-cluster", PatternType.LITERAL) - fileSourceAcl.refresh() shouldBe Some(SourceAclResult(Set(res1 -> acl1, res2 -> acl2, res3 -> acl3), List())) + fileSourceAcl.refresh(csvlAclParser) shouldBe Some(SourceAclResult(Set(res1 -> acl1, res2 -> acl2, res3 -> acl3), List())) val content2 = """KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host @@ -73,7 +76,7 @@ class FileSourceAclTest extends FlatSpec with Matchers { // we force the modification of the time of the file so that the test passes file.setLastModified(System.currentTimeMillis() + 10000) - fileSourceAcl.refresh() shouldBe Some(SourceAclResult(Set(res1 -> acl1), List())) + fileSourceAcl.refresh(csvlAclParser) shouldBe Some(SourceAclResult(Set(res1 -> acl1), List())) }