Skip to content

Commit

Permalink
Add ability to download schema references
Browse files Browse the repository at this point in the history
  • Loading branch information
Feroze Daud committed Jan 4, 2024
1 parent 9015b82 commit a0f5014
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ data class DownloadSubject(
val outputPath: String,
val version: Int? = null,
val regex: Boolean = false,
val outputFileName: String? = null
val outputFileName: String? = null,
val downloadReferences: Boolean = false
)

data class MetadataExtension(
val enabled: Boolean = false,
val outputPath: String? = null
) {
constructor(enabled: Boolean) : this(enabled, null)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,37 @@ class DownloadTaskAction(
val metadata = getSchemaMetadata(downloadSubject)
val outputDir = File(rootDir.toURI()).resolve(downloadSubject.outputPath)
outputDir.mkdirs()
if (metadataConfiguration.enabled) {
val metadataDirectory = if (metadataConfiguration.enabled) {
val metadataDirectory = metadataConfiguration.outputPath?.run {
File(rootDir.toURI()).resolve(this)
} ?: outputDir
metadataDirectory.mkdirs()
metadataDirectory
} else {
outputDir
}

if (metadataConfiguration.enabled) {
writeSchemaMetadata(downloadSubject, metadata, metadataDirectory)
}
writeSchemaFile(downloadSubject, metadata, outputDir)

if (downloadSubject.downloadReferences) {
metadata.references.forEach {
logger.infoIfNotQuiet("Start downloading referenced schema ${it.subject}/${it.version} for ${downloadSubject.subject}")
val referenceSubject = DownloadSubject(
subject = it.subject,
outputPath = downloadSubject.outputPath,
version = it.version
)
val referenceMetadata = getSchemaMetadata(referenceSubject)

if (metadataConfiguration.enabled) {
writeSchemaMetadata(referenceSubject, referenceMetadata, metadataDirectory)
}
writeSchemaFile(referenceSubject, referenceMetadata, outputDir)
}
}
} catch (e: Exception) {
logger.error("Error during schema retrieval for ${downloadSubject.subject}", e)
errorCount++
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.github.imflog.schema.registry.tasks.download
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider
import org.assertj.core.api.Assertions
Expand Down Expand Up @@ -63,7 +64,7 @@ class DownloadTaskActionTest {
DownloadSubject(testSubject, outputDir),
DownloadSubject(fooSubject, outputDir)
),
MetadataExtension(),
MetadataExtension()
).run()

// then
Expand Down Expand Up @@ -136,7 +137,7 @@ class DownloadTaskActionTest {
arrayListOf(
DownloadSubject("te.*", outputDir, null, true)
),
MetadataExtension(),
MetadataExtension()
).run()

// then
Expand Down Expand Up @@ -184,7 +185,7 @@ class DownloadTaskActionTest {
registryClient,
folderRoot,
arrayListOf(DownloadSubject(subject, outputDir)),
MetadataExtension(),
MetadataExtension()
).run()

// then
Expand Down Expand Up @@ -220,7 +221,7 @@ class DownloadTaskActionTest {
DownloadSubject(invalidSubjectPattern, outputDir, null, true),
DownloadSubject("test", outputDir)
),
MetadataExtension(),
MetadataExtension()
).run()

// then
Expand Down Expand Up @@ -274,7 +275,7 @@ class DownloadTaskActionTest {
arrayListOf(
DownloadSubject("test", outputDir, v1Id)
),
MetadataExtension(),
MetadataExtension()
).run()

// Then
Expand Down Expand Up @@ -317,7 +318,7 @@ class DownloadTaskActionTest {
registryClient,
folderRoot,
arrayListOf(DownloadSubject(testSubject, outputDir)),
MetadataExtension(true),
MetadataExtension(true)
).run()

// then
Expand Down Expand Up @@ -365,7 +366,7 @@ class DownloadTaskActionTest {
registryClient,
folderRoot,
arrayListOf(DownloadSubject(testSubject, outputDir)),
MetadataExtension(true, metadataDir),
MetadataExtension(true, metadataDir)
).run()

// then
Expand All @@ -380,4 +381,149 @@ class DownloadTaskActionTest {
.containsIgnoringCase("\"schema\" :")
.containsIgnoringCase("\"references\" :")
}

@Test
fun `Should download referenced schemas with metadata in the specified output dir`() {
// given
val testSubject = "test"
val testLibSubject = "test_lib"
val outputDir = "src/main/avro/external"
val metadataDir = "src/main/avro/metadata"

val registryClient = MockSchemaRegistryClient(listOf(AvroSchemaProvider()))

registryClient.register(
testLibSubject,
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test_lib",
"fields": [
{ "name": "name", "type": "string" }
]
}""",
listOf()
).get()
)

registryClient.register(
testSubject,
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test",
"fields": [
{ "name": "name", "type": "test_lib" }
]
}""",
listOf(
SchemaReference(
"name",
testLibSubject,
1
)
)
).get()
)

folderRule.resolve("src/main/avro/external").toFile().mkdir()
val folderRoot = folderRule.toFile()

// when
val errorCount = DownloadTaskAction(
registryClient,
folderRoot,
arrayListOf(DownloadSubject(testSubject, outputDir, downloadReferences = true)),
MetadataExtension(true, metadataDir)
).run()

// then
Assertions.assertThat(errorCount).isEqualTo(0)
Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).isNotNull
Assertions.assertThat(File(folderRoot, "src/main/avro/metadata/test-metadata.json")).isNotNull
// Would be cleaner to use a JSON assertion library but I am not sure this is really required for now
Assertions.assertThat(File(folderRoot, "src/main/avro/metadata/test-metadata.json").readText())
.containsIgnoringCase("\"id\" :")
.containsIgnoringCase("\"version\" :")
.containsIgnoringCase("\"schema_type\" :")
.containsIgnoringCase("\"schema\" :")
.containsIgnoringCase("\"references\" :")

Assertions.assertThat(File(folderRoot, "src/main/avro/external/test_lib.avsc")).isNotNull
Assertions.assertThat(File(folderRoot, "src/main/avro/metadata/test_lib-metadata.json")).isNotNull
// Would be cleaner to use a JSON assertion library but I am not sure this is really required for now
Assertions.assertThat(File(folderRoot, "src/main/avro/metadata/test_lib-metadata.json").readText())
.containsIgnoringCase("\"id\" :")
.containsIgnoringCase("\"version\" :")
.containsIgnoringCase("\"schema_type\" :")
.containsIgnoringCase("\"schema\" :")
.containsIgnoringCase("\"references\" :")
}

@Test
fun `Should download referenced schemas in the specified output dir`() {
// given
val testSubject = "test"
val testLibSubject = "test_lib"
val outputDir = "src/main/avro/external"
val metadataDir = "src/main/avro/metadata"

val registryClient = MockSchemaRegistryClient(listOf(AvroSchemaProvider()))

registryClient.register(
testLibSubject,
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test_lib",
"fields": [
{ "name": "name", "type": "string" }
]
}""",
listOf()
).get()
)

registryClient.register(
testSubject,
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test",
"fields": [
{ "name": "name", "type": "test_lib" }
]
}""",
listOf(
SchemaReference(
"name",
testLibSubject,
1
)
)
).get()
)

folderRule.resolve("src/main/avro/external").toFile().mkdir()
val folderRoot = folderRule.toFile()

// when
val errorCount = DownloadTaskAction(
registryClient,
folderRoot,
arrayListOf(DownloadSubject(testSubject, outputDir, downloadReferences = true)),
MetadataExtension()
).run()

// then
Assertions.assertThat(errorCount).isEqualTo(0)
Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).isNotNull

Assertions.assertThat(File(folderRoot, "src/main/avro/external/test_lib.avsc")).isNotNull
// Would be cleaner to use a JSON assertion library but I am not sure this is really required for now
}
}

0 comments on commit a0f5014

Please sign in to comment.