diff --git a/README.md b/README.md index e2fe5ac..3ae8384 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,7 @@ However, for stable images tag a release. ## Run - `./ccloud-schema-exporter -batchExport` : Running the app with this flag will perform a batch export. +Starting v1.1, `-batchExport` can be declared with `-syncDeletes` to perform an export of soft deleted schemas. - `./ccloud-schema-exporter -sync` : Running the app with this flag will start a continuous sync between the source and destination schema registries. - `./ccloud-schema-exporter -getLocalCopy` : Running the app with this flag will get a snapshot of your Schema Registry @@ -148,10 +149,10 @@ NOTE: Lists aren't respected with the utility `-deleteAllFromDestination` #### A note on syncing hard deletions -As of v1.1, `ccloud-schema-exporter` provides an efficient way of syncing hard deletions. +Starting v1.1, `ccloud-schema-exporter` provides an efficient way of syncing hard deletions. In previous versions, this was done through inefficient lookups. -Support for syncing hard deletions is only when the source and destination are both a Confluent Cloud Schema Registry. +Support for syncing hard deletions applies when the source and destination are both a Confluent Cloud Schema Registries. #### Non-Interactive Run @@ -172,8 +173,8 @@ If you'd like more info on how to change the Schema Registry mode to enable non- #### Extendability: Custom Sources and Destinations -`ccloud-schema-exporter` supports custom implementations of source registries and destination registries. -If you'd like to leverage the already built back-end, all you have to do is an implementation the `CustomSource` or `CustomDestination` interface. +`ccloud-schema-exporter` supports custom implementations of sources and destinations. +If you'd like to leverage the already built back-end, all you have to do is an implementation of the `CustomSource` or `CustomDestination` interfaces. A copy of the interface definitions is below for convenience: ```` @@ -181,8 +182,7 @@ type CustomSource interface { // Perform any set-up behavior before start of sync/batch export SetUp() error // An implementation should handle the retrieval of a schema from the source. - // The id should be a unique identifier for the schema. - GetSchema(SchemaSourceID int64) (subject string, version int64, id int64, stype string, schema string, err error) + GetSchema(subject string, version int64) (id int64, stype string, schema string, err error) // An implementation should be able to send exactly one map describing the state of the source // This map should be minimal. Describing only the Subject and Versions that exist. GetSourceState() (map[string][]int64, error) @@ -227,7 +227,6 @@ var customSrcFactory = map[string]client.CustomSource{ You will see that these maps already have one entry, that is because `ccloud-schema-exporter` comes with sample implementations of the interface under `cmd/internals/customDestination.go` and `cmd/internals/customSource.go`, check them out! -Make sure to add your implementation to this map. For the custom source example, there is an implementation to allow sourcing schemas from Apicurio into Schema Registry. It defaults to looking for Apicurio in `http://localhost:8081`, but you can override it by providing a mapping @@ -238,7 +237,7 @@ Note: The schemas get exported using record names (all treated as `-value`), so Once added, all you have to do is indicate you will want to run with a custom source/destination with the `-customSource | -customDestination` flag. The value of this flag must be the name you gave it in the factory mapping. -The following options are respected for custom destinations as well: +The following options are respected for custom sources / destinations as well: ```` -allowList value diff --git a/cmd/ccloud-schema-exporter/ccloud-schema-exporter.go b/cmd/ccloud-schema-exporter/ccloud-schema-exporter.go index 7f640c1..b92de42 100644 --- a/cmd/ccloud-schema-exporter/ccloud-schema-exporter.go +++ b/cmd/ccloud-schema-exporter/ccloud-schema-exporter.go @@ -2,7 +2,7 @@ package main // // ccloud-schema-exporter.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -33,6 +33,9 @@ func main() { if client.CustomSourceName != "" { destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst") + if !client.NoPrompt { + preflightWriteChecks(destClient) + } if client.ThisRun == client.BATCH { client.RunCustomSourceBatch(destClient, customSrcFactory[client.CustomSourceName]) @@ -53,6 +56,9 @@ func main() { } destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst") + if !client.NoPrompt { + preflightWriteChecks(destClient) + } client.WriteFromFS(destClient, client.PathToWrite, workingDir) @@ -95,23 +101,64 @@ func main() { } destClient := client.NewSchemaRegistryClient(client.DestSRUrl, client.DestSRKey, client.DestSRSecret, "dst") + if !client.NoPrompt { + preflightWriteChecks(destClient) + } + + if (!strings.HasSuffix(srcClient.SRUrl, "confluent.cloud") || + !strings.HasSuffix(destClient.SRUrl, "confluent.cloud")) && + client.ThisRun == client.SYNC && client.SyncHardDeletes && !client.NoPrompt { + + fmt.Println("It looks like you are trying to sync hard deletions between non-Confluent Cloud Schema Registries") + fmt.Println("Starting v1.1, ccloud-schema-exporter only supports hard deletion sync between Confluent Cloud Schema Registries") + fmt.Println("------------------------------------------------------") + fmt.Println("Do you wish to continue? (Y/n)") + + var text string + + _, err := fmt.Scanln(&text) + if err != nil { + log.Fatal(err) + } + + if !strings.EqualFold(text, "Y") { + os.Exit(0) + } + } + + if client.ThisRun == client.SYNC { + client.Sync(srcClient, destClient) + } + if client.ThisRun == client.BATCH { + client.BatchExport(srcClient, destClient) + } + + log.Println("-----------------------------------------------") + + if client.ThisRun == client.BATCH { + log.Println("Resetting target to READWRITE") + destClient.SetMode(client.READWRITE) + } + + log.Println("All Done! Thanks for using ccloud-schema-exporter!") + +} + +func preflightWriteChecks (destClient *client.SchemaRegistryClient) { + if !destClient.IsReachable() { log.Println("Could not reach destination registry. Possible bad credentials?") os.Exit(0) } - destChan := make(chan map[string][]int64) - go destClient.GetSubjectsWithVersions(destChan) - destSubjects := <-destChan - close(destChan) - - if len(destSubjects) != 0 && client.ThisRun != client.SYNC && !client.NoPrompt { + destSubjects := client.GetCurrentSubjectState(destClient) + if len(destSubjects) != 0 && client.ThisRun != client.SYNC { log.Println("You have existing subjects registered in the destination registry, exporter cannot write schemas when " + "previous schemas exist in batch mode.") os.Exit(0) } - if !destClient.IsImportModeReady() && !client.NoPrompt { + if !destClient.IsImportModeReady() { fmt.Println("Destination Schema Registry is not set to IMPORT mode!") fmt.Println("------------------------------------------------------") @@ -136,7 +183,7 @@ func main() { } } - if !destClient.IsCompatReady() && !client.NoPrompt { + if !destClient.IsCompatReady() { fmt.Println("Destination Schema Registry is not set to NONE global compatibility level!") fmt.Println("We assume the source to be maintaining correct compatibility between registrations, per subject compatibility changes are not supported.") @@ -159,42 +206,4 @@ func main() { log.Println("Continuing without NONE Global Compatibility Level. Note this might arise some failures in registration of some schemas.") } } - - if (!strings.HasSuffix(srcClient.SRUrl, "confluent.cloud") || - !strings.HasSuffix(destClient.SRUrl, "confluent.cloud")) && - client.ThisRun == client.SYNC && client.SyncHardDeletes && !client.NoPrompt { - - fmt.Println("It looks like you are trying to sync hard deletions between non-Confluent Cloud Schema Registries") - fmt.Println("Starting v1.1, ccloud-schema-exporter only supports hard deletion sync between Confluent Cloud Schema Registries") - fmt.Println("------------------------------------------------------") - fmt.Println("Do you wish to continue? (Y/n)") - - var text string - - _, err := fmt.Scanln(&text) - if err != nil { - log.Fatal(err) - } - - if !strings.EqualFold(text, "Y") { - os.Exit(0) - } - } - - if client.ThisRun == client.SYNC { - client.Sync(srcClient, destClient) - } - if client.ThisRun == client.BATCH { - client.BatchExport(srcClient, destClient) - } - - log.Println("-----------------------------------------------") - - if client.ThisRun == client.BATCH { - log.Println("Resetting target to READWRITE") - destClient.SetMode(client.READWRITE) - } - - log.Println("All Done! Thanks for using ccloud-schema-exporter!") - } diff --git a/cmd/integrationTests/exporter-integration_test.go b/cmd/integrationTests/exporter-integration_test.go index 7f5dacd..5e6be2a 100644 --- a/cmd/integrationTests/exporter-integration_test.go +++ b/cmd/integrationTests/exporter-integration_test.go @@ -1,5 +1,11 @@ package integration +// +// exporter-integration_test.go +// Copyright 2020 Abraham Leal +// + + import ( client "github.com/abraham-leal/ccloud-schema-exporter/cmd/internals" "github.com/stretchr/testify/assert" diff --git a/cmd/integrationTests/hardDeletetionTesting/hardDeleteCloud_test.go b/cmd/integrationTests/hardDeletetionTesting/hardDeleteCloud_test.go index 707067e..8fd96c1 100644 --- a/cmd/integrationTests/hardDeletetionTesting/hardDeleteCloud_test.go +++ b/cmd/integrationTests/hardDeletetionTesting/hardDeleteCloud_test.go @@ -1,5 +1,11 @@ package integration_deletion +// +// hardDeleteCloud_test.go +// Copyright 2020 Abraham Leal +// + + import ( client "github.com/abraham-leal/ccloud-schema-exporter/cmd/internals" "github.com/stretchr/testify/assert" @@ -98,7 +104,7 @@ func printSubjectTestResult(srcSubjects map[string][]int64, destSubjects map[str log.Printf("Destination subject-version mapping contents: %v", destSubjects) } -func printIDTestResult(srcIDs map[int64]map[string]int64, dstIDs map[int64]map[string]int64) { +func printIDTestResult(srcIDs map[int64]map[string][]int64, dstIDs map[int64]map[string][]int64) { log.Printf("Source IDs contents: %v", srcIDs) log.Printf("Destination IDs contents: %v", dstIDs) } diff --git a/cmd/internals/context.go b/cmd/internals/context.go index 631bcb7..3fd7e22 100644 --- a/cmd/internals/context.go +++ b/cmd/internals/context.go @@ -2,7 +2,7 @@ package client // // context.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( diff --git a/cmd/internals/customDestination.go b/cmd/internals/customDestination.go index e186d0b..e35b689 100644 --- a/cmd/internals/customDestination.go +++ b/cmd/internals/customDestination.go @@ -1,5 +1,10 @@ package client +// +// customDestination.go +// Copyright 2020 Abraham Leal +// + import ( "log" "reflect" @@ -72,7 +77,7 @@ func RunCustomDestinationBatch(srcClient *SchemaRegistryClient, customDest Custo return } for _, v := range srcVersions { - schema := srcClient.GetSchema(srcSubject, v) + schema := srcClient.GetSchema(srcSubject, v, false) log.Printf("Registering schema: %s with version: %d and ID: %d and Type: %s", schema.Subject, schema.Version, schema.Id, schema.SType) err := customDest.RegisterSchema(schema) @@ -86,7 +91,7 @@ func customDestSync(diff map[string][]int64, srcClient *SchemaRegistryClient, cu log.Println("Source registry has values that Destination does not, syncing...") for subject, versions := range diff { for _, v := range versions { - schema := srcClient.GetSchema(subject, v) + schema := srcClient.GetSchema(subject, v, false) log.Println("Registering new schema: " + schema.Subject + " with version: " + strconv.FormatInt(schema.Version, 10) + " and ID: " + strconv.FormatInt(schema.Id, 10) + @@ -104,7 +109,7 @@ func customDestSyncDeletes(destSubjects map[string][]int64, srcSubjects map[stri log.Println("Source registry has deletes that Destination does not, syncing...") for subject, versions := range diff { for _, v := range versions { - schema := srcClient.GetSchema(subject, v) + schema := srcClient.GetSchema(subject, v, false) err := customDest.DeleteSchema(schema) checkCouldNotRegister(err) } diff --git a/cmd/internals/customDestination_test.go b/cmd/internals/customDestination_test.go index 3bc52a4..853e7be 100644 --- a/cmd/internals/customDestination_test.go +++ b/cmd/internals/customDestination_test.go @@ -1,5 +1,10 @@ package client +// +// customDestination_test.go +// Copyright 2020 Abraham Leal +// + import ( "github.com/stretchr/testify/assert" "log" diff --git a/cmd/internals/customSource.go b/cmd/internals/customSource.go index fb52dd4..30f97aa 100644 --- a/cmd/internals/customSource.go +++ b/cmd/internals/customSource.go @@ -1,5 +1,10 @@ package client +// +// customSource.go +// Copyright 2020 Abraham Leal +// + import ( "encoding/json" "fmt" diff --git a/cmd/internals/customSource_test.go b/cmd/internals/customSource_test.go index f5dd824..cd5f27d 100644 --- a/cmd/internals/customSource_test.go +++ b/cmd/internals/customSource_test.go @@ -1,5 +1,11 @@ package client +// +// customSource_test.go +// Copyright 2020 Abraham Leal +// + + import ( "github.com/stretchr/testify/assert" "log" diff --git a/cmd/internals/definitions.go b/cmd/internals/definitions.go index 0c73ef7..f0f8a54 100644 --- a/cmd/internals/definitions.go +++ b/cmd/internals/definitions.go @@ -2,7 +2,7 @@ package client // // definitions.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -18,8 +18,6 @@ type SchemaRegistryClient struct { SRUrl string SRApiKey string SRApiSecret string - InMemSchemas map[string][]int64 - srcInMemDeletedIDs map[int64]map[string]int64 } /* @@ -56,6 +54,7 @@ type CustomSource interface { TearDown() error } +// Holding struct that describes a schema record type SchemaRecord struct { Subject string `json:"subject"` Schema string `json:"schema"` @@ -73,6 +72,7 @@ func (srs SchemaRecord) setTypeIfEmpty() SchemaRecord { return srs } +// Holding struct for registering a schema in an SR compliant way type SchemaToRegister struct { Schema string `json:"schema"` Id int64 `json:"id,omitempty"` @@ -80,6 +80,7 @@ type SchemaToRegister struct { SType string `json:"schemaType"` } +// Holding struct for retrieving a schema type SchemaExtraction struct { Schema string `json:"schema"` Id int64 `json:"id"` @@ -113,9 +114,9 @@ func (i *StringArrayFlag) String() string { func (i *StringArrayFlag) Set(value string) error { currentPath, _ := os.Getwd() + path := CheckPath(value, currentPath) - if strings.LastIndexAny(value, "/.") != -1 { - path := CheckPath(value, currentPath) + if fileExists(path) { f, err := ioutil.ReadFile(path) if err != nil { panic(err) @@ -144,8 +145,3 @@ func (i *StringArrayFlag) removeSpaces(str string) string { return r }, str) } - -type idSubjectVersion struct { - Id int64 `json:"Id"` - SubjectAndVersion map[string]int64 `json:"SubjectAndVersion"` -} diff --git a/cmd/internals/exportSchemas.go b/cmd/internals/exportSchemas.go index 3839aba..bc2ece6 100644 --- a/cmd/internals/exportSchemas.go +++ b/cmd/internals/exportSchemas.go @@ -2,7 +2,7 @@ package client // // exportSchemas.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -12,9 +12,12 @@ import ( func BatchExport(srcClient *SchemaRegistryClient, destClient *SchemaRegistryClient) { listenForInterruption() - srcChan := make(chan map[string][]int64) - go srcClient.GetSubjectsWithVersions(srcChan) - srcSubjects := <-srcChan + srcSubjects := GetCurrentSubjectState(srcClient) + + // Set up soft Deleted IDs in destination for interpretation by the destination registry + if SyncDeletes { + syncExistingSoftDeletedSubjects(srcClient,destClient) + } log.Println("Registering all schemas from " + srcClient.SRUrl) for srcSubject, srcVersions := range srcSubjects { @@ -22,7 +25,7 @@ func BatchExport(srcClient *SchemaRegistryClient, destClient *SchemaRegistryClie return } for _, v := range srcVersions { - schema := srcClient.GetSchema(srcSubject, v) + schema := srcClient.GetSchema(srcSubject, v, false) log.Printf("Registering schema: %s with version: %d and ID: %d and Type: %s", schema.Subject, schema.Version, schema.Id, schema.SType) destClient.RegisterSchemaBySubjectAndIDAndVersion(schema.Schema, diff --git a/cmd/internals/getInfo.go b/cmd/internals/getInfo.go index ff68b41..0b950bb 100644 --- a/cmd/internals/getInfo.go +++ b/cmd/internals/getInfo.go @@ -2,7 +2,7 @@ package client // // getInfo.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( diff --git a/cmd/internals/helpers.go b/cmd/internals/helpers.go index 916a467..dbbb51f 100644 --- a/cmd/internals/helpers.go +++ b/cmd/internals/helpers.go @@ -2,7 +2,7 @@ package client // // helpers.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -36,6 +36,23 @@ func printVersion() { fmt.Printf("ccloud-schema-exporter: %s\n", Version) } +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + +func isInSlice(i int64, list []int64) bool { + for _, current := range list { + if current == i { + return true + } + } + return false +} + // Returns an HTTP request with the given information to execute func GetNewRequest(method string, endpoint string, key string, secret string, headers map[string]string, reader io.Reader) *http.Request { req, err := http.NewRequest(method, endpoint, reader) @@ -174,7 +191,7 @@ func filterListedSubjectsVersions(response []SubjectVersion) []SubjectVersion { } // Filters the provided map of [ID]:[Subject:Version] according to what is provided in AllowList and DisallowList -func filterIDs(candidate map[int64]map[string]int64) map[int64]map[string]int64 { +func filterIDs(candidate map[int64]map[string][]int64) map[int64]map[string][]int64 { for id, subjects := range candidate { // Filter out for allow lists for sbj, _ := range subjects { @@ -265,23 +282,49 @@ func GetVersionsDiff(a1 []int64, a2 []int64) []int64 { // Returns the difference between the provided maps of [ID][Subject:Version] // The difference will be what is contained in the left map that is not contained in the right map -func getIDDiff(m1 map[int64]map[string]int64, m2 map[int64]map[string]int64) map[int64]map[string]int64 { - diffMap := map[int64]map[string]int64{} - - for idDest, subjectValDestMap := range m2 { // Iterate through destination id -> (subject->version) mapping - subjValSrcMap, idExistsSrc := m1[idDest] // Check if source has this mapping, if it does, retrieve it - if !idExistsSrc { // if the source does NOT have this mapping - diffMap[idDest] = subjectValDestMap // This whole mapping gets added to the map of things to be deleted - } else { // if the source DOES have the ID - toDelete := map[string]int64{} // Holder for schema/version references to delete - for subDest, verDest := range subjectValDestMap { // iterate through subject/versions for current id - _, verSrcExists := subjValSrcMap[subDest] // check if they exist in source - if !verSrcExists { // if not exists - toDelete[subDest] = verDest // Add to holder for queueing for deletion +func GetIDDiff(m1 map[int64]map[string][]int64, m2 map[int64]map[string][]int64) map[int64]map[string][]int64 { + diffMap := map[int64]map[string][]int64{} + + for idLeft, subjectVersionsLeftMap := range m1 { // Iterate through the left id -> (subject->version) mapping + subjVersionsRightMap, idExistsRight := m2[idLeft] // Check if right has this mapping, if it does, retrieve it + if !idExistsRight { // if the right does NOT have this mapping + diffMap[idLeft] = subjectVersionsLeftMap // This whole mapping gets added to the map of things to be deleted + } else { // if the right DOES have the ID + toDelete := map[int64]map[string][]int64{} // Holder for schema/version references to delete + for subjectLeft, versionsLeft := range subjectVersionsLeftMap { // iterate through subject/versions for current id + subjectRightVersions, subjectExistsRight := subjVersionsRightMap[subjectLeft] + if subjectExistsRight { + for _, singleVersionLeft := range versionsLeft { // Iterate through versions on left + if !isInSlice(singleVersionLeft, subjectRightVersions) { // if not exists on right + _, idInQueue := toDelete[idLeft] + if idInQueue { + _, subjectInQueue := toDelete[idLeft][subjectLeft] + if subjectInQueue { + toDelete[idLeft][subjectLeft] = append(toDelete[idLeft][subjectLeft],singleVersionLeft) // Add to holder for queueing for deletion + } else { + tmpIDContents := toDelete[idLeft] + tmpIDContents[subjectLeft] = []int64{singleVersionLeft} + toDelete[idLeft] = tmpIDContents + } + } else { + tempMap := map[string][]int64{subjectLeft:{singleVersionLeft}} + toDelete[idLeft] = tempMap + } + } + } + } else { + _, idInQueue := toDelete[idLeft] + if idInQueue { + tempMap := toDelete[idLeft] + tempMap[subjectLeft] = versionsLeft + toDelete[idLeft] = tempMap + } else { + toDelete[idLeft] = map[string][]int64{subjectLeft : versionsLeft} + } } } if len(toDelete) != 0 { - diffMap[idDest] = toDelete // Add deletion queue to diffMap + diffMap[idLeft] = toDelete[idLeft] // Add deletion queue to diffMap } } } diff --git a/cmd/internals/helpers_test.go b/cmd/internals/helpers_test.go new file mode 100644 index 0000000..5505e7e --- /dev/null +++ b/cmd/internals/helpers_test.go @@ -0,0 +1,193 @@ +package client + +// +// helpers_test.go +// Copyright 2020 Abraham Leal +// + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestMainStackHelpers(t *testing.T) { + t.Run("TisInSlice", func(t *testing.T) { TisInSlice(t) }) + t.Run("TcheckSubjectIsAllowed", func(t *testing.T) { TcheckSubjectIsAllowed(t) }) + t.Run("TfilterListedSubjects", func(t *testing.T) { TfilterListedSubjects(t) }) + t.Run("TfilterListedSubjectsVersions", func(t *testing.T) { TfilterListedSubjectsVersions(t) }) + t.Run("TfilterIDs", func(t *testing.T) { TfilterIDs(t) }) + t.Run("TGetSubjectDiff", func(t *testing.T) { TGetSubjectDiff(t) }) + t.Run("TGetVersionsDiff", func(t *testing.T) { TGetVersionsDiff(t) }) + t.Run("TGetIDDiff", func(t *testing.T) { TGetIDDiff(t) }) +} + +func TisInSlice (t *testing.T) { + sliceOne := []int64{1,2,3,4,5} + + assert.True(t, isInSlice(3, sliceOne)) + assert.True(t, !isInSlice(6, sliceOne)) +} + +func TcheckSubjectIsAllowed (t *testing.T) { + AllowList = map[string]bool{ + "testingSubjectKey": true, + "SomeOtherValue": true, + } + + DisallowList = map[string]bool{ + "someValuePartTwo": true, + "SomeOtherValue": true, + } + + assert.True(t, checkSubjectIsAllowed("testingSubjectKey")) + assert.True(t, !checkSubjectIsAllowed("SomeOtherValue")) + + AllowList = nil + DisallowList = nil + +} + + +func TfilterListedSubjects (t *testing.T) { + + AllowList = map[string]bool{ + "testingSubjectKey": true, + "SomeOtherValue": true, + } + + DisallowList = map[string]bool{ + "someValuePartTwo": true, + "SomeOtherValue": true, + } + + toFiler := []string{ + "someValuePartTwo", + "testingSubjectKey", + "SomeOtherValue", + } + + result := filterListedSubjects(toFiler) + + expected := map[string]bool{"testingSubjectKey": true} + + AllowList = nil + DisallowList = nil + + assert.True(t, reflect.DeepEqual(result,expected)) +} + +func TfilterListedSubjectsVersions (t *testing.T) { + AllowList = map[string]bool{ + "testingSubjectKey": true, + "SomeOtherValue": true, + } + + DisallowList = map[string]bool{ + "someValuePartTwo": true, + "SomeOtherValue": true, + } + + toFilter := []SubjectVersion{ + {Subject: "testingSubjectKey", Version: 1}, + {Subject: "someValuePartTwo", Version: 1}, + {Subject: "SomeOtherValue", Version: 1}, + } + + result := filterListedSubjectsVersions(toFilter) + + expected := []SubjectVersion{ + {Subject: "testingSubjectKey", Version: 1}, + } + + AllowList = nil + DisallowList = nil + + assert.True(t, reflect.DeepEqual(result,expected)) + +} + +func TfilterIDs (t *testing.T) { + + AllowList = map[string]bool{ + "testingSubjectKey": true, + "SomeOtherValue" : true, + } + + DisallowList = map[string]bool{ + "someValuePartTwo": true, + "SomeOtherValue" : true, + } + + toFilter := map[int64]map[string][]int64{ + 1001: {"testingSubjectKey" : {1,2,3}, "someValuePartTwo" : {1,2}}, + 5000: {"SomeOtherValue" : {1,2}}, + } + + result := filterIDs(toFilter) + + expected := map[int64]map[string][]int64{ + 1001: {"testingSubjectKey" : {1,2,3}}, + } + + AllowList = nil + DisallowList = nil + + assert.True(t, reflect.DeepEqual(result,expected)) +} + +func TGetSubjectDiff (t *testing.T) { + subjectMapOne := map[string][]int64{ + "someValue" : {1,2,3}, + "SomeOtherValue" : {1,2}, + } + + subjectMapTwo := map[string][]int64{ + "someValue" : {1,3}, + "SomeOtherValue" : {2}, + } + + result := GetSubjectDiff(subjectMapOne,subjectMapTwo) + + expected := map[string][]int64{ + "someValue" : {2}, + "SomeOtherValue" : {1}, + } + + // Assert values that are contained in left but not right are returned + assert.True(t,reflect.DeepEqual(expected,result)) +} + +func TGetVersionsDiff(t *testing.T) { + versionArrayOne := []int64{1,2,3} + versionArrayTwo := []int64{1,3} + + result := GetVersionsDiff(versionArrayOne,versionArrayTwo) + + expected := []int64{2} + + // Assert values that are contained in left but not right are returned + assert.Equal(t,expected,result) +} + +func TGetIDDiff (t *testing.T) { + + idMapOne := map[int64]map[string][]int64{ + 1001: {"someValue" : {1,2,3}, "someValuePartTwo" : {1,2}}, + 5000: {"SomeOtherValue" : {1,2}}, + } + + idMapTwo := map[int64]map[string][]int64{ + 1001: {"someValue" : {1,3}, "someValuePartTwo" : {1}}, + 5000: {"SomeOtherValue" : {2}}, + } + + results := GetIDDiff(idMapOne, idMapTwo) + + expected := map[int64]map[string][]int64{ + 1001: {"someValue" : {2}, "someValuePartTwo" : {2}}, + 5000: {"SomeOtherValue" : {1}}, + } + + assert.True(t, reflect.DeepEqual(expected,results)) +} diff --git a/cmd/internals/localFSFunctions.go b/cmd/internals/localFSFunctions.go index 1fb0aac..73306b7 100644 --- a/cmd/internals/localFSFunctions.go +++ b/cmd/internals/localFSFunctions.go @@ -2,7 +2,7 @@ package client // // writeToLocal.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -21,9 +21,7 @@ func WriteToFS(srcClient *SchemaRegistryClient, definedPath string, workingDirec definedPath = CheckPath(definedPath, workingDirectory) - srcChan := make(chan map[string][]int64) - go srcClient.GetSubjectsWithVersions(srcChan) - srcSubjects := <-srcChan + srcSubjects := GetCurrentSubjectState(srcClient) var aGroup sync.WaitGroup log.Printf("Writing all schemas from %s to path %s", srcClient.SRUrl, definedPath) @@ -64,11 +62,12 @@ func writeSchemaToSR(dstClient *SchemaRegistryClient, filepath string) { return } id, version, subject, stype := parseFileName(filepath) - checkSubjectIsAllowed(subject) - rawSchema, err := ioutil.ReadFile(filepath) - check(err) - log.Printf("Registering Schema with Subject: %s. Version: %v, and ID: %v", subject, version, id) - dstClient.RegisterSchemaBySubjectAndIDAndVersion(string(rawSchema), subject, id, version, stype) + if checkSubjectIsAllowed(subject) { + rawSchema, err := ioutil.ReadFile(filepath) + check(err) + log.Printf("Registering Schema with Subject: %s. Version: %v, and ID: %v", subject, version, id) + dstClient.RegisterSchemaBySubjectAndIDAndVersion(string(rawSchema), subject, id, version, stype) + } } // Returns schema metadata for a file given its path @@ -94,7 +93,7 @@ func parseFileName(filepath string) (int64, int64, string, string) { // Writes the provided schema in the given path func writeSchemaLocally(srcClient *SchemaRegistryClient, pathToWrite string, subject string, version int64, wg *sync.WaitGroup) { - rawSchema := srcClient.GetSchema(subject, version) + rawSchema := srcClient.GetSchema(subject, version, false) defer wg.Done() if CancelRun == true { return diff --git a/cmd/internals/localFSFunctions_test.go b/cmd/internals/localFSFunctions_test.go index ded40c7..dca18ad 100644 --- a/cmd/internals/localFSFunctions_test.go +++ b/cmd/internals/localFSFunctions_test.go @@ -1,5 +1,10 @@ package client +// +// localFSFunctions_test.go +// Copyright 2020 Abraham Leal +// + import ( "github.com/stretchr/testify/assert" "testing" diff --git a/cmd/internals/meta.go b/cmd/internals/meta.go index 51d9bd8..4253dd8 100644 --- a/cmd/internals/meta.go +++ b/cmd/internals/meta.go @@ -2,7 +2,7 @@ package client // // meta.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -11,7 +11,7 @@ import ( var HttpCallTimeout int var ScrapeInterval int -var Version = "1.1-SNAPSHOT" +var Version = "1.1" var httpClient http.Client var SrcSRUrl string diff --git a/cmd/internals/schema-registry-light-client.go b/cmd/internals/schema-registry-light-client.go index c0f0d67..2f0273b 100644 --- a/cmd/internals/schema-registry-light-client.go +++ b/cmd/internals/schema-registry-light-client.go @@ -2,7 +2,7 @@ package client // // schema-registry-light-client.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -33,14 +33,14 @@ func NewSchemaRegistryClient(SR string, apiKey string, apiSecret string, target // If the parameters are empty, go fetch from env if SR == "" || apiKey == "" || apiSecret == "" { if target == "dst" { - client = SchemaRegistryClient{SRUrl: DestGetSRUrl(), SRApiKey: DestGetAPIKey(), SRApiSecret: DestGetAPISecret(), InMemSchemas: map[string][]int64{}, srcInMemDeletedIDs: map[int64]map[string]int64{}} + client = SchemaRegistryClient{SRUrl: DestGetSRUrl(), SRApiKey: DestGetAPIKey(), SRApiSecret: DestGetAPISecret()} } if target == "src" { - client = SchemaRegistryClient{SRUrl: SrcGetSRUrl(), SRApiKey: SrcGetAPIKey(), SRApiSecret: SrcGetAPISecret(), InMemSchemas: map[string][]int64{}, srcInMemDeletedIDs: map[int64]map[string]int64{}} + client = SchemaRegistryClient{SRUrl: SrcGetSRUrl(), SRApiKey: SrcGetAPIKey(), SRApiSecret: SrcGetAPISecret()} } } else { // Enables passing in the vars through flags - client = SchemaRegistryClient{SRUrl: SR, SRApiKey: apiKey, SRApiSecret: apiSecret, InMemSchemas: map[string][]int64{}, srcInMemDeletedIDs: map[int64]map[string]int64{}} + client = SchemaRegistryClient{SRUrl: SR, SRApiKey: apiKey, SRApiSecret: apiSecret} } httpClient = http.Client{ @@ -69,7 +69,6 @@ func (src *SchemaRegistryClient) IsReachable() bool { // Returns all non-deleted (soft or hard deletions) subjects with their versions in the form of a map. func (src *SchemaRegistryClient) GetSubjectsWithVersions(chanY chan<- map[string][]int64) { - src.InMemSchemas = make(map[string][]int64) endpoint := fmt.Sprintf("%s/subjects", src.SRUrl) req := GetNewRequest("GET", endpoint, src.SRApiKey, src.SRApiSecret, nil, nil) @@ -121,12 +120,13 @@ func (src *SchemaRegistryClient) GetSubjectsWithVersions(chanY chan<- map[string }() //Collect SubjectWithVersions + tmpSchemaMap := make(map[string][]int64) for item := range aChan { - src.InMemSchemas[item.Subject] = item.Versions + tmpSchemaMap[item.Subject] = item.Versions } // Send back to main thread - chanY <- src.InMemSchemas + chanY <- tmpSchemaMap } // Returns all non-deleted versions (soft or hard deleted) that exist for a given subject. @@ -249,8 +249,11 @@ func (src *SchemaRegistryClient) SetMode(modeToSet Mode) bool { } // Returns a SchemaRecord for the given subject and version by querying the backing Schema Registry -func (src *SchemaRegistryClient) GetSchema(subject string, version int64) SchemaRecord { +func (src *SchemaRegistryClient) GetSchema(subject string, version int64, deleted bool) SchemaRecord { endpoint := fmt.Sprintf("%s/subjects/%s/versions/%d", src.SRUrl, subject, version) + if deleted { + endpoint = fmt.Sprintf("%s/subjects/%s/versions/%d?deleted=true", src.SRUrl, subject, version) + } req := GetNewRequest("GET", endpoint, src.SRApiKey, src.SRApiSecret, nil, nil) res, err := httpClient.Do(req) @@ -367,38 +370,24 @@ func (src *SchemaRegistryClient) PerformHardDelete(subject string, version int64 return handleDeletesHTTPResponse(res.Body, res.StatusCode, req.Method, endpoint, "Hard", subject, version) } -// Returns a map with the [ID][Subject:Version] state of the backing Schema Registry for only the soft deleted SubjectVersions -func (src *SchemaRegistryClient) GetSoftDeletedIDs() map[int64]map[string]int64 { - src.srcInMemDeletedIDs = map[int64]map[string]int64{} // Map of ID -> (Map of subject -> Version) +// Returns a map with the [ID][Subject:Versions] state of the backing Schema Registry for only the soft deleted SubjectVersions +func (src *SchemaRegistryClient) GetSoftDeletedIDs() map[int64]map[string][]int64 { responseWithDeletes := src.getSchemaList(true) responseWithOutDeletes := src.getSchemaList(false) - listHolder := make(map[int64]map[string]int64) - for id, data := range responseWithDeletes { - _, contains := responseWithOutDeletes[id] - if !contains { - holder, seenBefore := listHolder[id] - if seenBefore { - holder[data.Subject] = data.Version - listHolder[id] = holder - } else { - newIdHolder := make(map[string]int64) - newIdHolder[data.Subject] = data.Version - listHolder[id] = newIdHolder - } - } - } - - src.srcInMemDeletedIDs = filterIDs(listHolder) - return src.srcInMemDeletedIDs + diff := GetIDDiff(responseWithDeletes, responseWithOutDeletes) + return filterIDs(diff) } // Returns a dump of all Schemas mapped to their IDs from the backing Schema Registry // The parameter specifies whether to show soft deleted schemas as well -func (src SchemaRegistryClient) getSchemaList(deleted bool) map[int64]SchemaExtraction { - +func (src *SchemaRegistryClient) getSchemaList(deleted bool) map[int64]map[string][]int64 { endpoint := fmt.Sprintf("%s/schemas?deleted=%v", src.SRUrl, deleted) + if !deleted { + endpoint = fmt.Sprintf("%s/schemas", src.SRUrl) + } + req := GetNewRequest("GET", endpoint, src.SRApiKey, src.SRApiSecret, nil, nil) res, err := httpClient.Do(req) if err != nil { @@ -418,10 +407,23 @@ func (src SchemaRegistryClient) getSchemaList(deleted bool) map[int64]SchemaExtr log.Printf(err.Error()) } - responseMap := make(map[int64]SchemaExtraction) + responseMap := make(map[int64]map[string][]int64) for _, schema := range response { - responseMap[schema.Id] = schema + currentStateOfID, haveSeenIDBefore := responseMap[schema.Id] + if haveSeenIDBefore { + _, haveSeenSubject := currentStateOfID[schema.Subject] + if haveSeenSubject { + responseMap[schema.Id][schema.Subject] = append(responseMap[schema.Id][schema.Subject],schema.Version) + } else { + tempMap := responseMap[schema.Id] + tempMap[schema.Subject] = []int64{schema.Version} + responseMap[schema.Id] = tempMap + } + } else { + responseMap[schema.Id] = map[string][]int64{schema.Subject : {schema.Version}} + } + } return responseMap diff --git a/cmd/internals/schema-registry-light-client_test.go b/cmd/internals/schema-registry-light-client_test.go index 2f65059..e72e986 100644 --- a/cmd/internals/schema-registry-light-client_test.go +++ b/cmd/internals/schema-registry-light-client_test.go @@ -1,8 +1,8 @@ package client // -// schema-registry-light-client-test.go -// Author: Abraham Leal +// schema-registry-light-client_test.go +// Copyright 2020 Abraham Leal // import ( @@ -41,6 +41,7 @@ func TestMainStack(t *testing.T) { t.Run("TFilterListedSubjectsVersions", func(t *testing.T) { TFilterListedSubjectsVersions(t) }) t.Run("TPerformSoftDelete", func(t *testing.T) { TPerformSoftDelete(t) }) t.Run("TPerformHardDelete", func(t *testing.T) { TPerformHardDelete(t) }) + t.Run("TGetSoftDeletedIds", func(t *testing.T) { TPerformHardDelete(t) }) t.Run("TDeleteAllSubjectsPermanently", func(t *testing.T) { TDeleteAllSubjectsPermanently(t) }) tearDown() } @@ -144,13 +145,13 @@ func TGetVersions(t *testing.T) { } func TGetSchema(t *testing.T) { - record := testClient.GetSchema(testingSubject, 1) + record := testClient.GetSchema(testingSubject, 1,false) assert.Equal(t, mockSchema, record.Schema) } func TRegisterSchemaBySubjectAndIDAndVersion(t *testing.T) { testClient.RegisterSchemaBySubjectAndIDAndVersion(mockSchema, newSubject, 10001, 1, "AVRO") - record := testClient.GetSchema(newSubject, 1) + record := testClient.GetSchema(newSubject, 1,false) assert.Equal(t, mockSchema, record.Schema) testClient.PerformSoftDelete(newSubject, 1) @@ -170,9 +171,9 @@ func TGetSoftDeletedIDs(t *testing.T) { func TFilterIDs(t *testing.T) { - myIDs := map[int64]map[string]int64{ - 10001: {testingSubject: 1}, - 10002: {newSubject: 1}, + myIDs := map[int64]map[string][]int64{ + 10001: {testingSubject: []int64{1}}, + 10002: {newSubject: []int64{1}}, } // Test Allow lists @@ -181,8 +182,8 @@ func TFilterIDs(t *testing.T) { } DisallowList = nil - expected := map[int64]map[string]int64{ - 10002: {newSubject: 1}, + expected := map[int64]map[string][]int64{ + 10002: {newSubject: []int64{1}}, } filtered := filterIDs(myIDs) @@ -195,13 +196,13 @@ func TFilterIDs(t *testing.T) { newSubject: true, } - myIDs = map[int64]map[string]int64{ - 10001: {testingSubject: 1}, - 10002: {newSubject: 1}, + myIDs = map[int64]map[string][]int64{ + 10001: {testingSubject: []int64{1}}, + 10002: {newSubject: []int64{1}}, } - expected = map[int64]map[string]int64{ - 10001: {testingSubject: 1}, + expected = map[int64]map[string][]int64{ + 10001: {testingSubject: []int64{1}}, } filtered = filterIDs(myIDs) @@ -209,11 +210,11 @@ func TFilterIDs(t *testing.T) { assert.Equal(t, expected, filtered) // Test Both - myIDs = map[int64]map[string]int64{ - 10001: {testingSubject: 1}, - 10002: {newSubject: 1}, - 10003: {"hello": 1}, - 10004: {"IAmSubject": 1}, + myIDs = map[int64]map[string][]int64{ + 10001: {testingSubject: []int64{1}}, + 10002: {newSubject: []int64{1}}, + 10003: {"hello": []int64{1}}, + 10004: {"IAmSubject": []int64{1}}, } AllowList = StringArrayFlag{ newSubject: true, @@ -225,9 +226,9 @@ func TFilterIDs(t *testing.T) { } // Expect hello to be disallowed - expected = map[int64]map[string]int64{ - 10001: {testingSubject: 1}, - 10002: {newSubject: 1}, + expected = map[int64]map[string][]int64{ + 10001: {testingSubject: []int64{1}}, + 10002: {newSubject: []int64{1}}, } filtered = filterIDs(myIDs) @@ -341,7 +342,7 @@ func TPerformSoftDelete(t *testing.T) { //Soft delete it testClient.PerformSoftDelete(testingSubject, 1) //Check for it - checkIfSchemaRegistered := testClient.GetSchema(testingSubject, 1) + checkIfSchemaRegistered := testClient.GetSchema(testingSubject, 1, false) assert.Equal(t, "", checkIfSchemaRegistered.Schema) } diff --git a/cmd/internals/syncSchemas.go b/cmd/internals/syncSchemas.go index f1386cc..408ec59 100644 --- a/cmd/internals/syncSchemas.go +++ b/cmd/internals/syncSchemas.go @@ -2,7 +2,7 @@ package client // // syncSchemas.go -// Author: Abraham Leal +// Copyright 2020 Abraham Leal // import ( @@ -16,6 +16,11 @@ func Sync(srcClient *SchemaRegistryClient, destClient *SchemaRegistryClient) { listenForInterruption() + // Set up soft Deleted IDs in destination for interpretation by the destination registry + if SyncDeletes { + syncExistingSoftDeletedSubjects(srcClient,destClient) + } + //Begin sync for { if CancelRun == true { @@ -53,7 +58,7 @@ func initialSync(diff map[string][]int64, srcClient *SchemaRegistryClient, destC log.Println("Source registry has values that Destination does not, syncing...") for subject, versions := range diff { for _, v := range versions { - schema := srcClient.GetSchema(subject, v) + schema := srcClient.GetSchema(subject, v, false) log.Println("Registering new schema: " + schema.Subject + " with version: " + strconv.FormatInt(schema.Version, 10) + " and ID: " + strconv.FormatInt(schema.Id, 10) + @@ -81,13 +86,32 @@ func syncSoftDeletes(destSubjects map[string][]int64, srcSubjects map[string][]i } func syncHardDeletes(srcClient *SchemaRegistryClient, destClient *SchemaRegistryClient) { - permDel := getIDDiff(srcClient.GetSoftDeletedIDs(), destClient.GetSoftDeletedIDs()) + permDel := GetIDDiff(destClient.GetSoftDeletedIDs(),srcClient.GetSoftDeletedIDs()) if len(permDel) != 0 { - for id, subjectVersionMap := range permDel { - for subject, version := range subjectVersionMap { - log.Printf("Discovered Hard Deleted Schema with ID %d, Subject %s, and Version: %d", - id, subject, version) - destClient.PerformHardDelete(subject, version) + for id, subjectVersionsMap := range permDel { + for subject, versions := range subjectVersionsMap { + for _, version := range versions { + log.Printf("Discovered Hard Deleted Schema with ID %d, Subject %s, and Version: %d", + id, subject, version) + destClient.PerformHardDelete(subject, version) + } + } + } + } +} + +func syncExistingSoftDeletedSubjects (srcClient *SchemaRegistryClient, destClient *SchemaRegistryClient) { + softDel := GetIDDiff(srcClient.GetSoftDeletedIDs(),destClient.GetSoftDeletedIDs()) + if len(softDel) != 0 { + log.Println("There are soft Deleted IDs in the source. Sinking to the destination at startup...") + for _ , meta := range softDel { + for sbj, versions := range meta { + for _, version := range versions { + softDeletedSchema := srcClient.GetSchema(sbj,version,true) + destClient.RegisterSchemaBySubjectAndIDAndVersion(softDeletedSchema.Schema, + softDeletedSchema.Subject,softDeletedSchema.Id,softDeletedSchema.Version,softDeletedSchema.SType) + destClient.PerformSoftDelete(softDeletedSchema.Subject,softDeletedSchema.Version) + } } } }