diff --git a/config.go b/config.go index 40aed482..f29b7657 100644 --- a/config.go +++ b/config.go @@ -219,7 +219,7 @@ func (c *InlineVerifierConfig) Validate() error { } if c.VerifyBinlogEventsInterval == "" { - c.VerifyBinlogEventsInterval = "50ms" + c.VerifyBinlogEventsInterval = "1s" } c.verifyBinlogEventsInterval, err = time.ParseDuration(c.VerifyBinlogEventsInterval) @@ -753,6 +753,8 @@ type Config struct { // in the Progress. This behaviour is perfectly okay and doesn't mean there are no rows being written // to the target DB. EnableRowBatchSize bool + + PeriodicallyVerifySchemaFingerPrintInterval string } func (c *Config) ValidateConfig() error { @@ -822,5 +824,9 @@ func (c *Config) ValidateConfig() error { c.CutoverRetryWaitSeconds = 1 } + if len(c.PeriodicallyVerifySchemaFingerPrintInterval) == 0 { + c.PeriodicallyVerifySchemaFingerPrintInterval = "60s" + } + return nil } diff --git a/ferry.go b/ferry.go index ab040707..0ffda881 100644 --- a/ferry.go +++ b/ferry.go @@ -67,6 +67,8 @@ type Ferry struct { DataIterator *DataIterator BatchWriter *BatchWriter + SchemaFingerPrintVerifier *SchemaFingerPrintVerifier + StateTracker *StateTracker ErrorHandler ErrorHandler Throttler Throttler @@ -229,31 +231,44 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier { binlogVerifyStore = NewBinlogVerifyStore() } - schemaFingerPrint := map[string]string{} - if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil { - schemaFingerPrint = f.StateToResumeFrom.SchemaFingerPrint - f.logger.Info("SCHEMA FINGERPRINT FOUND") - } - return &InlineVerifier{ - SourceDB: f.SourceDB, - TargetDB: f.TargetDB, - DatabaseRewrites: f.Config.DatabaseRewrites, - TableRewrites: f.Config.TableRewrites, - TableSchemaCache: f.Tables, - BatchSize: f.Config.BinlogEventBatchSize, - VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval, - VerifiySchemaFingerPrintInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval, - MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime, + SourceDB: f.SourceDB, + TargetDB: f.TargetDB, + DatabaseRewrites: f.Config.DatabaseRewrites, + TableRewrites: f.Config.TableRewrites, + TableSchemaCache: f.Tables, + BatchSize: f.Config.BinlogEventBatchSize, + VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval, + MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime, StateTracker: f.StateTracker, ErrorHandler: f.ErrorHandler, - schemaFingerPrints: schemaFingerPrint, - reverifyStore: binlogVerifyStore, - sourceStmtCache: NewStmtCache(), - targetStmtCache: NewStmtCache(), - logger: logrus.WithField("tag", "inline-verifier"), + reverifyStore: binlogVerifyStore, + sourceStmtCache: NewStmtCache(), + targetStmtCache: NewStmtCache(), + logger: logrus.WithField("tag", "inline-verifier"), + } +} + +func (f *Ferry) NewSchemaFingerPrintVerifier() *SchemaFingerPrintVerifier { + fingerPrint := map[string]string{} + if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil { + fingerPrint = f.StateToResumeFrom.SchemaFingerPrint + } + + periodicallyVerifyInterval, _ := time.ParseDuration(f.Config.PeriodicallyVerifySchemaFingerPrintInterval) + + return &SchemaFingerPrintVerifier{ + SourceDB: f.SourceDB, + TableRewrites: f.Config.TableRewrites, + TableSchemaCache: f.Tables, + ErrorHandler: f.ErrorHandler, + PeriodicallyVerifyInterval: periodicallyVerifyInterval, + + FingerPrints: fingerPrint, + + logger: logrus.WithField("tag", "schema_fingerprint"), } } @@ -499,6 +514,8 @@ func (f *Ferry) Initialize() (err error) { } } + f.SchemaFingerPrintVerifier = f.NewSchemaFingerPrintVerifier() + // The iterative verifier needs the binlog streamer so this has to be first. // Eventually this can be moved below the verifier initialization. f.BinlogStreamer = f.NewSourceBinlogStreamer() @@ -696,6 +713,14 @@ func (f *Ferry) Run() { }() } + schemaFingerVerifierPrintWg := &sync.WaitGroup{} + schemaFingerPrintVerifierContext, stopSchemaFingerprintVerifier := context.WithCancel(ctx) + schemaFingerVerifierPrintWg.Add(1) + go func() { + defer schemaFingerVerifierPrintWg.Done() + f.SchemaFingerPrintVerifier.PeriodicallyVerifySchemaFingerprints(schemaFingerPrintVerifierContext) + }() + inlineVerifierWg := &sync.WaitGroup{} inlineVerifierContext, stopInlineVerifier := context.WithCancel(ctx) if f.inlineVerifier != nil { @@ -792,6 +817,9 @@ func (f *Ferry) Run() { binlogWg.Wait() + stopSchemaFingerprintVerifier() + schemaFingerVerifierPrintWg.Wait() + f.logger.Info("ghostferry run is complete, shutting down auxiliary services") f.OverallState.Store(StateDone) f.DoneTime = time.Now() @@ -920,14 +948,12 @@ func (f *Ferry) SerializeStateToJSON() (string, error) { } var ( binlogVerifyStore *BinlogVerifyStore = nil - schemaFingerPrint map[string]string = nil ) if f.inlineVerifier != nil { binlogVerifyStore = f.inlineVerifier.reverifyStore - schemaFingerPrint = f.inlineVerifier.schemaFingerPrints } - serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, schemaFingerPrint) + serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, f.SchemaFingerPrintVerifier.FingerPrints) if f.Config.DoNotIncludeSchemaCacheInStateDump { serializedState.LastKnownTableSchemaCache = nil diff --git a/inline_verifier.go b/inline_verifier.go index 0a9680c3..4fa38ad1 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -3,8 +3,6 @@ package ghostferry import ( "bytes" "context" - "encoding/hex" - "encoding/json" "errors" "fmt" "strconv" @@ -12,8 +10,6 @@ import ( "sync" "time" - "crypto/md5" - sql "github.com/Shopify/ghostferry/sqlwrapper" "github.com/golang/snappy" @@ -253,8 +249,6 @@ type InlineVerifier struct { reverifyStore *BinlogVerifyStore verifyDuringCutoverStarted AtomicBoolean - schemaFingerPrints map[string]string - sourceStmtCache *StmtCache targetStmtCache *StmtCache logger *logrus.Entry @@ -375,9 +369,7 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) { v.logger.Info("starting periodic reverifier") ticker := time.NewTicker(v.VerifyBinlogEventsInterval) - ticker1 := time.NewTicker(v.VerifiySchemaFingerPrintInterval) - defer ticker1.Stop() defer ticker.Stop() for { @@ -393,11 +385,6 @@ func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) { v.logger.WithFields(logrus.Fields{ "remainingRowCount": v.reverifyStore.currentRowCount, }).Debug("reverified") - case <-ticker1.C: - err := v.verifySchemaFingerPrint() - if err != nil { - v.ErrorHandler.Fatal("inline_verifier", err) - } case <-ctx.Done(): v.logger.Info("shutdown periodic reverifier") return @@ -437,23 +424,12 @@ func (v *InlineVerifier) VerifyBeforeCutover() error { return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify) } - err := v.verifySchemaFingerPrint() - if err != nil { - v.ErrorHandler.Fatal("inline_verifier", err) - } - return nil } func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) { v.verifyDuringCutoverStarted.Set(true) - err := v.verifySchemaFingerPrint() - if err != nil { - v.logger.WithError(err).Error("failed to VerifyDuringCutover") - return VerificationResult{}, err - } - mismatchFound, mismatches, err := v.verifyAllEventsInStore() if err != nil { v.logger.WithError(err).Error("failed to VerifyDuringCutover") @@ -788,69 +764,3 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e return v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData), nil } - -func (v *InlineVerifier) verifySchemaFingerPrint() error { - newSchemaFingerPrint, err := v.getSchemaFingerPrint() - if err != nil { - return err - } - - oldSchemaFingerPrint := v.schemaFingerPrints - if len(oldSchemaFingerPrint) == 0 { - v.schemaFingerPrints = newSchemaFingerPrint - return nil - } - - for _, table := range v.TableSchemaCache { - if newSchemaFingerPrint[table.Schema] != oldSchemaFingerPrint[table.Schema] { - return fmt.Errorf("failed to verifiy schema fingerprint for %s", table.Schema) - } - } - - v.schemaFingerPrints = newSchemaFingerPrint - return nil -} - -func (v *InlineVerifier) getSchemaFingerPrint() (map[string]string, error) { - schemaFingerPrints := map[string]string{} - dbSet := map[string]struct{}{} - - for _, table := range v.TableSchemaCache { - if _, found := dbSet[table.Schema]; found { - continue - } - dbSet[table.Schema] = struct{}{} - - query := fmt.Sprintf("SELECT * FROM information_schema.columns WHERE table_schema = '%s' ORDER BY table_name, column_name", table.Schema) - rows, err := v.SourceDB.Query(query) - if err != nil { - fmt.Println(err) - return schemaFingerPrints, err - } - - schemaData := [][]interface{}{} - for rows.Next() { - rowData, err := ScanGenericRow(rows, 21) - if err != nil { - return schemaFingerPrints, err - } - - _, isIgnored := table.IgnoredColumnsForVerification[string(rowData[3].([]byte))] - _, isBlacklisted := v.TableRewrites[string(rowData[2].([]byte))] - - if !isIgnored && !isBlacklisted { - schemaData = append(schemaData, rowData) - } - } - - schemaDataInBytes, err := json.Marshal(schemaData) - if err != nil { - return schemaFingerPrints, err - } - - hash := md5.Sum([]byte(schemaDataInBytes)) - schemaFingerPrints[table.Schema] = hex.EncodeToString(hash[:]) - } - - return schemaFingerPrints, nil -} diff --git a/schema_fingerprint_verifier.go b/schema_fingerprint_verifier.go new file mode 100644 index 00000000..2673392a --- /dev/null +++ b/schema_fingerprint_verifier.go @@ -0,0 +1,108 @@ +package ghostferry + +import ( + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "fmt" + "time" + + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/sirupsen/logrus" +) + +type SchemaFingerPrintVerifier struct { + SourceDB *sql.DB + TableRewrites map[string]string + TableSchemaCache TableSchemaCache + ErrorHandler ErrorHandler + PeriodicallyVerifyInterval time.Duration + + FingerPrints map[string]string + + logger *logrus.Entry +} + +func (sf *SchemaFingerPrintVerifier) PeriodicallyVerifySchemaFingerprints(ctx context.Context) { + sf.logger.Info("starting periodic schema fingerprint verification") + ticker := time.NewTicker(sf.PeriodicallyVerifyInterval) + + for { + select { + case <-ticker.C: + err := sf.VerifySchemaFingerPrint() + if err != nil { + sf.ErrorHandler.Fatal("schema_fingerprint", err) + } + case <-ctx.Done(): + sf.logger.Info("shutdown periodic schema_fingerprint verification") + } + } +} + +func (sf *SchemaFingerPrintVerifier) VerifySchemaFingerPrint() error { + newSchemaFingerPrint, err := sf.GetSchemaFingerPrint() + if err != nil { + return err + } + + oldSchemaFingerPrint := sf.FingerPrints + if len(oldSchemaFingerPrint) == 0 { + sf.FingerPrints = newSchemaFingerPrint + return nil + } + + for _, table := range sf.TableSchemaCache { + if newSchemaFingerPrint[table.Schema] != oldSchemaFingerPrint[table.Schema] { + return fmt.Errorf("failed to verifiy schema fingerprint for %s", table.Schema) + } + } + + sf.FingerPrints = newSchemaFingerPrint + return nil +} + +func (sf *SchemaFingerPrintVerifier) GetSchemaFingerPrint() (map[string]string, error) { + schemaFingerPrints := map[string]string{} + dbSet := map[string]struct{}{} + + for _, table := range sf.TableSchemaCache { + if _, found := dbSet[table.Schema]; found { + continue + } + dbSet[table.Schema] = struct{}{} + + query := fmt.Sprintf("SELECT * FROM information_schema.columns WHERE table_schema = '%s' ORDER BY table_name, column_name", table.Schema) + rows, err := sf.SourceDB.Query(query) + if err != nil { + fmt.Println(err) + return schemaFingerPrints, err + } + + schemaData := [][]interface{}{} + for rows.Next() { + rowData, err := ScanGenericRow(rows, 21) + if err != nil { + return schemaFingerPrints, err + } + + _, isIgnored := table.IgnoredColumnsForVerification[string(rowData[3].([]byte))] + _, isBlacklisted := sf.TableRewrites[string(rowData[2].([]byte))] + + if !isIgnored && !isBlacklisted { + schemaData = append(schemaData, rowData) + } + } + + schemaDataInBytes, err := json.Marshal(schemaData) + if err != nil { + return schemaFingerPrints, err + } + + hash := md5.Sum([]byte(schemaDataInBytes)) + schemaFingerPrints[table.Schema] = hex.EncodeToString(hash[:]) + } + + return schemaFingerPrints, nil +} diff --git a/test/go/schema_fingerprint_verifier_test.go b/test/go/schema_fingerprint_verifier_test.go new file mode 100644 index 00000000..6f0673e0 --- /dev/null +++ b/test/go/schema_fingerprint_verifier_test.go @@ -0,0 +1,53 @@ +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/testhelpers" + "github.com/stretchr/testify/suite" +) + +type SchemaFingerPrintVerifierTestSuite struct { + *testhelpers.GhostferryUnitTestSuite + tablename string + sf *ghostferry.SchemaFingerPrintVerifier +} + +func alterTestTableSchema(this *SchemaFingerPrintVerifierTestSuite) { + query := fmt.Sprintf("ALTER TABLE IF EXISTS %s.%s ADD COLUMN extracol VARCHAR(15)", testhelpers.TestSchemaName, this.tablename) + this.Ferry.SourceDB.Query(query) +} + +func (this *SchemaFingerPrintVerifierTestSuite) SetupTest() { + this.GhostferryUnitTestSuite.SetupTest() + + this.tablename = "test_table_1" + testhelpers.SeedInitialData(this.Ferry.SourceDB, testhelpers.TestSchemaName, this.tablename, 0) + + periodicallyVerifyInterval, _ := time.ParseDuration(this.Ferry.Config.PeriodicallyVerifySchemaFingerPrintInterval) + + this.sf = &ghostferry.SchemaFingerPrintVerifier{ + SourceDB: this.Ferry.SourceDB, + ErrorHandler: this.Ferry.ErrorHandler, + TableRewrites: this.Ferry.TableRewrites, + TableSchemaCache: this.Ferry.Tables, + PeriodicallyVerifyInterval: periodicallyVerifyInterval, + FingerPrints: map[string]string{}, + } +} + +func (this *SchemaFingerPrintVerifierTestSuite) TestVerifySchemaFingerprint() { + err := this.sf.VerifySchemaFingerPrint() + this.Require().Nil(err) + + alterTestTableSchema(this) + err = this.sf.VerifySchemaFingerPrint() + this.Require().Error(fmt.Errorf("failed to verifiy schema fingerprint for")) +} + +func TestSchemaFingerPrintVerifierTestSuite(t *testing.T) { + suite.Run(t, new(SchemaFingerPrintVerifierTestSuite)) +} diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb index dd161a63..8b470259 100644 --- a/test/integration/inline_verifier_test.rb +++ b/test/integration/inline_verifier_test.rb @@ -553,7 +553,7 @@ def test_inline_verifier_fails_if_database_schema_is_changed_during_data_copy error_occured = false ghostferry.on_callback("error") do |err| - error_occured = true + raise "ghostferry error ocurred - #{err}" end ghostferry.run diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go index 9f43c3e3..7d54741a 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -206,6 +206,8 @@ func NewStandardConfig() (*ghostferry.Config, error) { SkipTargetVerification: (os.Getenv("GHOSTFERRY_SKIP_TARGET_VERIFICATION") == "true"), EnableRowBatchSize: true, DumpStateToStdoutOnError: true, + + PeriodicallyVerifySchemaFingerPrintInterval: "1s", } integrationPort := os.Getenv(portEnvName)