Skip to content

Commit

Permalink
Squash types, make sure mismatchType is always there
Browse files Browse the repository at this point in the history
  • Loading branch information
driv3r committed Sep 10, 2024
1 parent 3045341 commit abf8691
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 89 deletions.
117 changes: 66 additions & 51 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package ghostferry
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -233,11 +235,23 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
return s.store.Copy()
}

type mismatchType string

const (
MismatchColumnMissingOnSource mismatchType = "column missing on source"
MismatchColumnMissingOnTarget mismatchType = "column missing on target"
MismatchRowMissingOnSource mismatchType = "row missing on source"
MismatchRowMissingOnTarget mismatchType = "row missing on target"
MismatchContentDifference mismatchType = "content difference"
MismatchChecksumDifference mismatchType = "rows checksum difference"
)

type InlineVerifierMismatches struct {
Pk uint64
SourceChecksum string
TargetChecksum string
Mismatch mismatch
MismatchColumn string
MismatchType mismatchType
}

type InlineVerifier struct {
Expand Down Expand Up @@ -452,18 +466,23 @@ func formatMismatches(mismatches map[string]map[string][]InlineVerifierMismatche
incorrectTables = append(incorrectTables, tableNameWithSchema)

messageBuf.WriteString(tableNameWithSchema)
messageBuf.WriteString(" [paginationKeys: ")
messageBuf.WriteString(" [PKs: ")
for _, mismatch := range mismatches[schemaName][tableName] {
messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
messageBuf.WriteString(" (source: ")
messageBuf.WriteString(mismatch.SourceChecksum)
messageBuf.WriteString(", target: ")
messageBuf.WriteString(mismatch.TargetChecksum)
messageBuf.WriteString(", type: ")
messageBuf.WriteString(string(mismatch.Mismatch.mismatchType))
if mismatch.Mismatch.column != "" {
messageBuf.WriteString(" (type: ")
messageBuf.WriteString(string(mismatch.MismatchType))
if mismatch.SourceChecksum != "" {
messageBuf.WriteString(", source: ")
messageBuf.WriteString(mismatch.SourceChecksum)
}
if mismatch.TargetChecksum != "" {
messageBuf.WriteString(", target: ")
messageBuf.WriteString(mismatch.TargetChecksum)
}

if mismatch.MismatchColumn != "" {
messageBuf.WriteString(", column: ")
messageBuf.WriteString(mismatch.Mismatch.column)
messageBuf.WriteString(mismatch.MismatchColumn)
}

messageBuf.WriteString(") ")
Expand Down Expand Up @@ -592,69 +611,68 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin

for paginationKey, targetHash := range target {
sourceHash, exists := source[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnSource,
}
} else if !bytes.Equal(sourceHash, targetHash) {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchChecksumDifference,
SourceChecksum: string(sourceHash),
TargetChecksum: string(targetHash),
}
}
}

for paginationKey, sourceHash := range source {
targetHash, exists := target[paginationKey]
if !bytes.Equal(sourceHash, targetHash) || !exists {
for paginationKey, _ := range source {
_, exists := target[paginationKey]
if !exists {
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
SourceChecksum: string(sourceHash),
TargetChecksum: string(targetHash),
MismatchType: MismatchRowMissingOnTarget,
}
}
}

return mismatchSet
}

type mismatchType string

const (
MismatchColumnMissingOnSource mismatchType = "column missing on source"
MismatchColumnMissingOnTarget mismatchType = "column missing on target"
MismatchRowMissingOnSource mismatchType = "row missing on source"
MismatchRowMissingOnTarget mismatchType = "row missing on target"
MismatchContentDifference mismatchType = "content difference"
)

type mismatch struct {
column string
mismatchType mismatchType
}

func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]mismatch {
mismatchSet := map[uint64]mismatch{}
func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]InlineVerifierMismatches {
mismatchSet := map[uint64]InlineVerifierMismatches{}

for paginationKey, targetDecompressedColumns := range target {
sourceDecompressedColumns, exists := source[paginationKey]
if !exists {
// row missing on source
mismatchSet[paginationKey] = mismatch{
mismatchType: MismatchRowMissingOnSource,
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnSource,
}
continue
}

for colName, targetData := range targetDecompressedColumns {
sourceData, exists := sourceDecompressedColumns[colName]
if !exists {
mismatchSet[paginationKey] = mismatch{
column: colName,
mismatchType: MismatchColumnMissingOnSource,
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchColumnMissingOnSource,
MismatchColumn: colName,
}
break // no need to compare other columns
} else if !bytes.Equal(sourceData, targetData) {
mismatchSet[paginationKey] = mismatch{
column: colName,
mismatchType: MismatchContentDifference,
sourceChecksum := md5.Sum(sourceData)
targetChecksum := md5.Sum(targetData)

mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchContentDifference,
MismatchColumn: colName,
SourceChecksum: hex.EncodeToString(sourceChecksum[:]),
TargetChecksum: hex.EncodeToString(targetChecksum[:]),
}
break // no need to compare other columns
}
Expand All @@ -665,18 +683,20 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
targetDecompressedColumns, exists := target[paginationKey]
if !exists {
// row missing on target
mismatchSet[paginationKey] = mismatch{
mismatchType: MismatchRowMissingOnTarget,
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchType: MismatchRowMissingOnTarget,
}
continue
}

for colName := range sourceDecompressedColumns {
_, exists := targetDecompressedColumns[colName]
if !exists {
mismatchSet[paginationKey] = mismatch{
column: colName,
mismatchType: MismatchColumnMissingOnTarget,
mismatchSet[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
MismatchColumn: colName,
MismatchType: MismatchColumnMissingOnTarget,
}
}
}
Expand All @@ -689,12 +709,7 @@ func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uin
mismatches := v.compareHashes(sourceHashes, targetHashes)
compressedMismatch := compareDecompressedData(sourceData, targetData)
for paginationKey, mismatch := range compressedMismatch {
mismatches[paginationKey] = InlineVerifierMismatches{
Pk: paginationKey,
SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
TargetChecksum: "compressed-data-mismatch",
Mismatch: mismatch,
}
mismatches[paginationKey] = mismatch
}

mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
Expand Down
50 changes: 21 additions & 29 deletions inline_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestCompareDecompressedDataNoDifference(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{}, result)
assert.Equal(t, map[uint64]InlineVerifierMismatches{}, result)
}

func TestCompareDecompressedDataContentDifference(t *testing.T) {
Expand All @@ -29,7 +29,15 @@ func TestCompareDecompressedDataContentDifference(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchContentDifference, column: "name"}}, result)
assert.Equal(t, map[uint64]InlineVerifierMismatches{
1: {
Pk: 1,
MismatchType: MismatchContentDifference,
MismatchColumn: "name",
SourceChecksum: "e356a972989f87a1531252cfa2152797",
TargetChecksum: "81b8a1b77068d06e1c8190825253066f",
},
}, result)
}

func TestCompareDecompressedDataMissingTarget(t *testing.T) {
Expand All @@ -40,7 +48,7 @@ func TestCompareDecompressedDataMissingTarget(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchRowMissingOnTarget}}, result)
assert.Equal(t, map[uint64]InlineVerifierMismatches{1: {Pk: 1, MismatchType: MismatchRowMissingOnTarget}}, result)
}

func TestCompareDecompressedDataMissingSource(t *testing.T) {
Expand All @@ -51,7 +59,7 @@ func TestCompareDecompressedDataMissingSource(t *testing.T) {

result := compareDecompressedData(source, target)

assert.Equal(t, map[uint64]mismatch{3: {mismatchType: MismatchRowMissingOnSource}}, result)
assert.Equal(t, map[uint64]InlineVerifierMismatches{3: {Pk: 3, MismatchType: MismatchRowMissingOnSource}}, result)
}

func TestFormatMismatch(t *testing.T) {
Expand All @@ -60,18 +68,14 @@ func TestFormatMismatch(t *testing.T) {
"users": {
InlineVerifierMismatches{
Pk: 1,
SourceChecksum: "",
TargetChecksum: "bar",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnSource,
},
MismatchType: MismatchRowMissingOnSource,
},
},
},
}
message, tables := formatMismatches(mismatches)

assert.Equal(t, string("cutover verification failed for: default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) ] "), message)
assert.Equal(t, string("cutover verification failed for: default.users [PKs: 1 (type: row missing on source) ] "), message)
assert.Equal(t, []string{string("default.users")}, tables)
}

Expand All @@ -81,47 +85,35 @@ func TestFormatMismatches(t *testing.T) {
"users": {
InlineVerifierMismatches{
Pk: 1,
SourceChecksum: "",
TargetChecksum: "bar",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnSource,
},
MismatchType: MismatchRowMissingOnSource,
},
InlineVerifierMismatches{
Pk: 5,
SourceChecksum: "baz",
TargetChecksum: "",
Mismatch: mismatch{
mismatchType: MismatchRowMissingOnTarget,
},
MismatchType: MismatchRowMissingOnTarget,
},
},
"posts": {
InlineVerifierMismatches{
Pk: 9,
MismatchType: MismatchContentDifference,
MismatchColumn: string("title"),
SourceChecksum: "boo",
TargetChecksum: "aaa",
Mismatch: mismatch{
mismatchType: MismatchContentDifference,
column: string("title"),
},
},
},
"attachments": {
InlineVerifierMismatches{
Pk: 7,
MismatchType: MismatchContentDifference,
MismatchColumn: string("name"),
SourceChecksum: "boo",
TargetChecksum: "aaa",
Mismatch: mismatch{
mismatchType: MismatchContentDifference,
column: string("name"),
},
},
},
},
}
message, tables := formatMismatches(mismatches)

assert.Equal(t, string("cutover verification failed for: default.attachments [paginationKeys: 7 (source: boo, target: aaa, type: content difference, column: name) ] default.posts [paginationKeys: 9 (source: boo, target: aaa, type: content difference, column: title) ] default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) 5 (source: baz, target: , type: row missing on target) ] "), message)
assert.Equal(t, string("cutover verification failed for: default.attachments [PKs: 7 (type: content difference, source: boo, target: aaa, column: name) ] default.posts [PKs: 9 (type: content difference, source: boo, target: aaa, column: title) ] default.users [PKs: 1 (type: row missing on source) 5 (type: row missing on target) ] "), message)
assert.Equal(t, []string{string("default.attachments"), string("default.posts"), string("default.users")}, tables)
}
Loading

0 comments on commit abf8691

Please sign in to comment.