Skip to content

Commit

Permalink
move scheme_fingerprint_verification out of inline_verifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Manan007224 committed Jul 8, 2021
1 parent 2d0fab2 commit 908f39f
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 116 deletions.
9 changes: 7 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *InlineVerifierConfig) Validate() error {
}

if c.VerifyBinlogEventsInterval == "" {
c.VerifyBinlogEventsInterval = "50ms"
c.VerifyBinlogEventsInterval = "1s"
}

c.verifyBinlogEventsInterval, err = time.ParseDuration(c.VerifyBinlogEventsInterval)
Expand Down Expand Up @@ -763,7 +763,8 @@ type Config struct {
// NOTE:
// The ghostferry target user should have SUPER permissions to actually write to the target DB,
// if ghostferry is ran with AllowSuperUserOnReadOnly = true and the target DB is set to read_only.
AllowSuperUserOnReadOnly bool
AllowSuperUserOnReadOnly bool
PeriodicallyVerifySchemaFingerPrintInterval string
}

func (c *Config) ValidateConfig() error {
Expand Down Expand Up @@ -833,5 +834,9 @@ func (c *Config) ValidateConfig() error {
c.CutoverRetryWaitSeconds = 1
}

if len(c.PeriodicallyVerifySchemaFingerPrintInterval) == 0 {
c.PeriodicallyVerifySchemaFingerPrintInterval = "60s"
}

return nil
}
72 changes: 49 additions & 23 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Ferry struct {
DataIterator *DataIterator
BatchWriter *BatchWriter

SchemaFingerPrintVerifier *SchemaFingerPrintVerifier

StateTracker *StateTracker
ErrorHandler ErrorHandler
Throttler Throttler
Expand Down Expand Up @@ -225,31 +227,44 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier {
binlogVerifyStore = NewBinlogVerifyStore()
}

schemaFingerPrint := map[string]string{}
if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil {
schemaFingerPrint = f.StateToResumeFrom.SchemaFingerPrint
f.logger.Info("SCHEMA FINGERPRINT FOUND")
}

return &InlineVerifier{
SourceDB: f.SourceDB,
TargetDB: f.TargetDB,
DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
BatchSize: f.Config.BinlogEventBatchSize,
VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
VerifiySchemaFingerPrintInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime,
SourceDB: f.SourceDB,
TargetDB: f.TargetDB,
DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
BatchSize: f.Config.BinlogEventBatchSize,
VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime,

StateTracker: f.StateTracker,
ErrorHandler: f.ErrorHandler,

schemaFingerPrints: schemaFingerPrint,
reverifyStore: binlogVerifyStore,
sourceStmtCache: NewStmtCache(),
targetStmtCache: NewStmtCache(),
logger: logrus.WithField("tag", "inline-verifier"),
reverifyStore: binlogVerifyStore,
sourceStmtCache: NewStmtCache(),
targetStmtCache: NewStmtCache(),
logger: logrus.WithField("tag", "inline-verifier"),
}
}

func (f *Ferry) NewSchemaFingerPrintVerifier() *SchemaFingerPrintVerifier {
fingerPrint := map[string]string{}
if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil {
fingerPrint = f.StateToResumeFrom.SchemaFingerPrint
}

periodicallyVerifyInterval, _ := time.ParseDuration(f.Config.PeriodicallyVerifySchemaFingerPrintInterval)

return &SchemaFingerPrintVerifier{
SourceDB: f.SourceDB,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
ErrorHandler: f.ErrorHandler,
PeriodicallyVerifyInterval: periodicallyVerifyInterval,

FingerPrints: fingerPrint,

logger: logrus.WithField("tag", "schema_fingerprint"),
}
}

Expand Down Expand Up @@ -497,6 +512,8 @@ func (f *Ferry) Initialize() (err error) {
}
}

f.SchemaFingerPrintVerifier = f.NewSchemaFingerPrintVerifier()

// The iterative verifier needs the binlog streamer so this has to be first.
// Eventually this can be moved below the verifier initialization.
f.BinlogStreamer = f.NewSourceBinlogStreamer()
Expand Down Expand Up @@ -694,6 +711,14 @@ func (f *Ferry) Run() {
}()
}

schemaFingerVerifierPrintWg := &sync.WaitGroup{}
schemaFingerPrintVerifierContext, stopSchemaFingerprintVerifier := context.WithCancel(ctx)
schemaFingerVerifierPrintWg.Add(1)
go func() {
defer schemaFingerVerifierPrintWg.Done()
f.SchemaFingerPrintVerifier.PeriodicallyVerifySchemaFingerprints(schemaFingerPrintVerifierContext)
}()

inlineVerifierWg := &sync.WaitGroup{}
inlineVerifierContext, stopInlineVerifier := context.WithCancel(ctx)
if f.inlineVerifier != nil {
Expand Down Expand Up @@ -790,6 +815,9 @@ func (f *Ferry) Run() {

binlogWg.Wait()

stopSchemaFingerprintVerifier()
schemaFingerVerifierPrintWg.Wait()

f.logger.Info("ghostferry run is complete, shutting down auxiliary services")
f.OverallState.Store(StateDone)
f.DoneTime = time.Now()
Expand Down Expand Up @@ -918,14 +946,12 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {
}
var (
binlogVerifyStore *BinlogVerifyStore = nil
schemaFingerPrint map[string]string = nil
)
if f.inlineVerifier != nil {
binlogVerifyStore = f.inlineVerifier.reverifyStore
schemaFingerPrint = f.inlineVerifier.schemaFingerPrints
}

serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, schemaFingerPrint)
serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, f.SchemaFingerPrintVerifier.FingerPrints)

if f.Config.DoNotIncludeSchemaCacheInStateDump {
serializedState.LastKnownTableSchemaCache = nil
Expand Down
90 changes: 0 additions & 90 deletions inline_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ package ghostferry
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"crypto/md5"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/golang/snappy"
Expand Down Expand Up @@ -253,8 +249,6 @@ type InlineVerifier struct {
reverifyStore *BinlogVerifyStore
verifyDuringCutoverStarted AtomicBoolean

schemaFingerPrints map[string]string

sourceStmtCache *StmtCache
targetStmtCache *StmtCache
logger *logrus.Entry
Expand Down Expand Up @@ -375,9 +369,7 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target
func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
v.logger.Info("starting periodic reverifier")
ticker := time.NewTicker(v.VerifyBinlogEventsInterval)
ticker1 := time.NewTicker(v.VerifiySchemaFingerPrintInterval)

defer ticker1.Stop()
defer ticker.Stop()

for {
Expand All @@ -393,11 +385,6 @@ func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
v.logger.WithFields(logrus.Fields{
"remainingRowCount": v.reverifyStore.currentRowCount,
}).Debug("reverified")
case <-ticker1.C:
err := v.verifySchemaFingerPrint()
if err != nil {
v.ErrorHandler.Fatal("inline_verifier", err)
}
case <-ctx.Done():
v.logger.Info("shutdown periodic reverifier")
return
Expand Down Expand Up @@ -437,23 +424,12 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify)
}

err := v.verifySchemaFingerPrint()
if err != nil {
v.ErrorHandler.Fatal("inline_verifier", err)
}

return nil
}

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

err := v.verifySchemaFingerPrint()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
return VerificationResult{}, err
}

mismatchFound, mismatches, err := v.verifyAllEventsInStore()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
Expand Down Expand Up @@ -788,69 +764,3 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e

return v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData), nil
}

func (v *InlineVerifier) verifySchemaFingerPrint() error {
newSchemaFingerPrint, err := v.getSchemaFingerPrint()
if err != nil {
return err
}

oldSchemaFingerPrint := v.schemaFingerPrints
if len(oldSchemaFingerPrint) == 0 {
v.schemaFingerPrints = newSchemaFingerPrint
return nil
}

for _, table := range v.TableSchemaCache {
if newSchemaFingerPrint[table.Schema] != oldSchemaFingerPrint[table.Schema] {
return fmt.Errorf("failed to verifiy schema fingerprint for %s", table.Schema)
}
}

v.schemaFingerPrints = newSchemaFingerPrint
return nil
}

func (v *InlineVerifier) getSchemaFingerPrint() (map[string]string, error) {
schemaFingerPrints := map[string]string{}
dbSet := map[string]struct{}{}

for _, table := range v.TableSchemaCache {
if _, found := dbSet[table.Schema]; found {
continue
}
dbSet[table.Schema] = struct{}{}

query := fmt.Sprintf("SELECT * FROM information_schema.columns WHERE table_schema = '%s' ORDER BY table_name, column_name", table.Schema)
rows, err := v.SourceDB.Query(query)
if err != nil {
fmt.Println(err)
return schemaFingerPrints, err
}

schemaData := [][]interface{}{}
for rows.Next() {
rowData, err := ScanGenericRow(rows, 21)
if err != nil {
return schemaFingerPrints, err
}

_, isIgnored := table.IgnoredColumnsForVerification[string(rowData[3].([]byte))]
_, isBlacklisted := v.TableRewrites[string(rowData[2].([]byte))]

if !isIgnored && !isBlacklisted {
schemaData = append(schemaData, rowData)
}
}

schemaDataInBytes, err := json.Marshal(schemaData)
if err != nil {
return schemaFingerPrints, err
}

hash := md5.Sum([]byte(schemaDataInBytes))
schemaFingerPrints[table.Schema] = hex.EncodeToString(hash[:])
}

return schemaFingerPrints, nil
}
Loading

0 comments on commit 908f39f

Please sign in to comment.