Skip to content

Commit

Permalink
refactored AclParser to have proper class instantiantion (#37)
Browse files Browse the repository at this point in the history
* refactored AclParser to have proper class instantiantion

* added test on custom delimiter

* refactoring of delimiter and added documentation
  • Loading branch information
simplesteph authored Oct 3, 2018
1 parent 5262b20 commit 13fd92d
Show file tree
Hide file tree
Showing 21 changed files with 148 additions and 109 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.5-SNAPSHOT]
-
- New custom delimiter option for CsvAclParser
- Refactoring to allow new AclParser formats

## [0.4] - 09/09/2018
- Added S3 Acl Source (#27)
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The [default configurations](src/main/resources/application.conf) can be overwri
- `NOTIFICATION_CLASS`: Class for notification in case of ACL changes in Kafka.
- `com.github.simplesteph.ksm.notification.ConsoleNotification` (default): Print changes to the console. Useful for logging
- `com.github.simplesteph.ksm.notification.SlackNotification`: Send notifications to a Slack channel (useful for devops / admin team)
- `ACL_PARSER_CSV_DELIMITER`: Change the delimiter character for the CSV Parser (useful when you have SSL)

# Running on Docker

Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ ksm {
readonly = true
readonly = ${?KSM_READONLY}

}

parser {
csv {
delimiter = ","
delimiter = ${?ACL_PARSER_CSV_DELIMITER}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.github.simplesteph.ksm

import com.github.simplesteph.ksm.notification.Notification
import com.github.simplesteph.ksm.parser.AclParser
import com.github.simplesteph.ksm.source.{SourceAcl, SourceAclResult}
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success, Try}

object AclSynchronizer {
Expand Down Expand Up @@ -52,6 +54,7 @@ object AclSynchronizer {
class AclSynchronizer(authorizer: Authorizer,
sourceAcl: SourceAcl,
notification: Notification,
aclParser: AclParser,
readOnly: Boolean = false) {

import AclSynchronizer._
Expand All @@ -70,7 +73,7 @@ class AclSynchronizer(authorizer: Authorizer,
def run(): Unit = if (!readOnly) {

// parse the source of the ACL
Try(sourceAcl.refresh()) match {
Try(sourceAcl.refresh(aclParser)) match {
case Success(result) =>
result match {
// the source has not changed
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/com/github/simplesteph/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ class AppConfig(config: Config) {
val refreshFrequencyMs: Int = ksmConfig.getInt("refresh.frequency.ms")
val extract: Boolean = ksmConfig.getBoolean("extract")
val readOnly: Boolean = ksmConfig.getBoolean("readonly")
val csvDelimiter: String = ksmConfig.getString("csv.delimiter")
}

object Parser {
private val aclParserConfig = config.getConfig("parser")
val csvDelimiter: Char = aclParserConfig.getString("csv.delimiter").charAt(0)
}

object GRPC {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ object KafkaSecurityManager extends App {
var isCancelled: AtomicBoolean = new AtomicBoolean(false)
var grpcServer: KsmGrpcServer = _
var aclSynchronizer: AclSynchronizer = _
val aclParser = new CsvAclParser(appConfig.Parser.csvDelimiter)

if (appConfig.KSM.extract) {
new ExtractAcl(appConfig.Authorizer.authorizer, CsvAclParser).extract()
new ExtractAcl(appConfig.Authorizer.authorizer, aclParser).extract()
} else {
aclSynchronizer = new AclSynchronizer(appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Notification.notification,
aclParser,
appConfig.KSM.readOnly)

Try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.simplesteph.ksm.grpc

import com.github.simplesteph.ksm.{AclSynchronizer, KafkaSecurityManager}
import com.github.simplesteph.ksm.AclSynchronizer
import com.security.kafka.pb.ksm.{KsmServiceGrpc, KsmServiceHandler}
import grpcgateway.server.{GrpcGatewayServer, GrpcGatewayServerBuilder}
import io.grpc.protobuf.services.ProtoReflectionService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.github.simplesteph.ksm.grpc

import com.github.simplesteph.ksm.AclSynchronizer
import com.github.simplesteph.ksm.utils.ProtoConversionUtils
import com.security.kafka.pb.ksm.KsmServiceGrpc.KsmService
import com.security.kafka.pb.ksm.{
GetAllAclsRequest,
GetAllAclsResponse,
ResourceAndAclPb
}
import com.security.kafka.pb.ksm.KsmServiceGrpc.KsmService
import kafka.security.auth.{Acl, Resource}

import scala.concurrent.Future
Expand Down
65 changes: 32 additions & 33 deletions src/main/scala/com/github/simplesteph/ksm/parser/CsvAclParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package com.github.simplesteph.ksm.parser

import java.io.Reader

import com.github.simplesteph.ksm.AppConfig
import com.github.simplesteph.ksm.source.SourceAclResult
import com.github.tototoshi.csv.{CSVFormat, CSVReader, QUOTE_MINIMAL, Quoting}
import com.typesafe.config.ConfigFactory
import kafka.security.auth._
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.utils.SecurityUtils
Expand All @@ -14,41 +12,19 @@ import org.slf4j.LoggerFactory
import scala.collection.immutable
import scala.util.{Failure, Success, Try}

class CsvAclParser

/**
* Parser that assumes that all ACLs are flattened
* and live under a CSV format.
* The CSV is expected to have headers as outlined below and in the example
* Empty lines in the CSV should be ignored
*/
object CsvAclParser extends AclParser {
class CsvAclParser(delimiterInput: Char = ',') extends AclParser {

private val log = LoggerFactory.getLogger(classOf[CsvAclParser])

final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal"
final val RESOURCE_TYPE_COL = "ResourceType"
final val RESOURCE_NAME_COL = "ResourceName"
final val OPERATION_COL = "Operation"
final val PERMISSION_TYPE_COL = "PermissionType"
final val HOST_COL = "Host"
final val PATTERN_TYPE_COL = "PatternType"

final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL,
RESOURCE_TYPE_COL,
PATTERN_TYPE_COL,
RESOURCE_NAME_COL,
OPERATION_COL,
PERMISSION_TYPE_COL,
HOST_COL,
)

val config = ConfigFactory.load()
val appConfig: AppConfig = new AppConfig(config)
import CsvAclParser._

// we treat empty lines as Nil hence the format override
implicit val csvFormat: CSVFormat = new CSVFormat {
val delimiter: Char = appConfig.KSM.csvDelimiter.charAt(0)
val delimiter: Char = delimiterInput
val quoteChar: Char = '"'
val escapeChar: Char = '"'
val lineTerminator: String = "\r\n"
Expand All @@ -75,10 +51,11 @@ object CsvAclParser extends AclParser {
case Success(pt) => pt
case Failure(e: NoSuchElementException) =>
// column is missing
log.warn(s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL.
|The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
|For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command
|For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin)
log.warn(
s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL.
|The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
|For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command
|For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin)
// Default
PatternType.LITERAL
case Failure(e) =>
Expand Down Expand Up @@ -120,13 +97,13 @@ object CsvAclParser extends AclParser {
r.name,
a.operation.toString,
a.permissionType.toString,
a.host).mkString(appConfig.KSM.csvDelimiter)
a.host).mkString(delimiterInput.toString)
}

override def formatAcls(acls: List[(Resource, Acl)]): String = {
val sb = new StringBuilder
// header
sb.append(EXPECTED_COLS.mkString(appConfig.KSM.csvDelimiter))
sb.append(EXPECTED_COLS.mkString(delimiterInput.toString))
sb.append(System.getProperty("line.separator"))
// rows
acls.foreach {
Expand All @@ -136,5 +113,27 @@ object CsvAclParser extends AclParser {
}
sb.toString()
}
}

object CsvAclParser {

private val log = LoggerFactory.getLogger(classOf[CsvAclParser])

final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal"
final val RESOURCE_TYPE_COL = "ResourceType"
final val RESOURCE_NAME_COL = "ResourceName"
final val OPERATION_COL = "Operation"
final val PERMISSION_TYPE_COL = "PermissionType"
final val HOST_COL = "Host"
final val PATTERN_TYPE_COL = "PatternType"

final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL,
RESOURCE_TYPE_COL,
PATTERN_TYPE_COL,
RESOURCE_NAME_COL,
OPERATION_COL,
PERMISSION_TYPE_COL,
HOST_COL,
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.simplesteph.ksm.source

import java.io.{File, FileReader}

import com.github.simplesteph.ksm.parser.CsvAclParser
import com.github.simplesteph.ksm.parser.AclParser
import com.typesafe.config.Config

class FileSourceAcl extends SourceAcl {
Expand All @@ -27,11 +27,11 @@ class FileSourceAcl extends SourceAcl {
* Uses a CSV parser on the file afterwards
* @return
*/
override def refresh(): Option[SourceAclResult] = {
override def refresh(aclParser: AclParser): Option[SourceAclResult] = {
val file = new File(filename)
if (file.lastModified() > lastModified) {
val reader = new FileReader(file)
val res = CsvAclParser.aclsFromReader(reader)
val res = aclParser.aclsFromReader(reader)
reader.close()
lastModified = file.lastModified()
Some(res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.nio.charset.Charset
import java.util.Base64

import com.fasterxml.jackson.databind.ObjectMapper
import com.github.simplesteph.ksm.parser.CsvAclParser
import com.github.simplesteph.ksm.parser.AclParser
import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import skinny.http.{HTTP, HTTPException, Request, Response}
Expand Down Expand Up @@ -48,7 +48,7 @@ class GitHubSourceAcl extends SourceAcl {
tokenOpt = Try(config.getString(AUTH_TOKEN_CONFIG)).toOption
}

override def refresh(): Option[SourceAclResult] = {
override def refresh(aclParser: AclParser): Option[SourceAclResult] = {
val url =
s"https://$hostname/repos/$user/$repo/contents/$filepath?ref=$branch"
val request: Request = new Request(url)
Expand Down Expand Up @@ -76,7 +76,7 @@ class GitHubSourceAcl extends SourceAcl {
b64encodedContent.replace("\n", "").replace("\r", "")),
Charset.forName("UTF-8"))
// use the CSV Parser
Some(CsvAclParser.aclsFromReader(new StringReader(data)))
Some(aclParser.aclsFromReader(new StringReader(data)))
case 304 =>
None
case _ =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package com.github.simplesteph.ksm.source
import com.github.simplesteph.ksm.parser.AclParser
import com.typesafe.config.Config

class NoSourceAcl extends SourceAcl {
Expand All @@ -24,7 +25,7 @@ class NoSourceAcl extends SourceAcl {
*
* @return
*/
override def refresh(): Option[SourceAclResult] = None
override def refresh(aclParser: AclParser): Option[SourceAclResult] = None

/**
* Close all the necessary underlying objects or connections belonging to this instance
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.github.simplesteph.ksm.source

import java.io._
import java.util.Date

import com.amazonaws.regions.Regions
import com.amazonaws.services.s3._
import com.amazonaws.services.s3.model._
import java.io._
import com.github.simplesteph.ksm.parser.CsvAclParser
import org.slf4j.LoggerFactory
import com.github.simplesteph.ksm.parser.AclParser
import com.typesafe.config.Config

import java.util.Date
import org.slf4j.LoggerFactory

class S3SourceAcl extends SourceAcl {

Expand Down Expand Up @@ -48,7 +48,7 @@ class S3SourceAcl extends SourceAcl {
*
* @return
*/
override def refresh(): Option[SourceAclResult] = {
override def refresh(aclParser: AclParser): Option[SourceAclResult] = {
val s3Client =
AmazonS3ClientBuilder.standard.withRegion(Regions.fromName(region)).build
val s3object = Option(
Expand All @@ -60,7 +60,7 @@ class S3SourceAcl extends SourceAcl {
val reader = new BufferedReader(
new InputStreamReader(bucket.getObjectContent))
lastModified = bucket.getObjectMetadata.getLastModified
val res = CsvAclParser.aclsFromReader(reader)
val res = aclParser.aclsFromReader(reader)
reader.close()
bucket.close()
Some(res)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.simplesteph.ksm.source

import com.github.simplesteph.ksm.parser.AclParser
import com.typesafe.config.Config

trait SourceAcl {
Expand All @@ -24,7 +25,7 @@ trait SourceAcl {
* Kafka Security Manager will not update Acls in Kafka until there are no errors in the result
* @return
*/
def refresh(): Option[SourceAclResult]
def refresh(aclParser: AclParser): Option[SourceAclResult]

/**
* Close all the necessary underlying objects or connections belonging to this instance
Expand Down
Loading

0 comments on commit 13fd92d

Please sign in to comment.