From 2d0fab251f88d48c8d2fe5a7052c73ab071d0523 Mon Sep 17 00:00:00 2001
From: Manan007224
Date: Sun, 4 Jul 2021 16:47:12 -0700
Subject: [PATCH] continuously check database schema from inline_verifier

---
 config.go                                |   2 +-
 ferry.go                                 |  42 ++++----
 inline_verifier.go                       | 110 ++++++++++++++++++++--
 state_tracker.go                         |   7 +-
 test/go/data_iterator_test.go            |   2 +-
 test/integration/inline_verifier_test.rb |  27 +++++
 6 files changed, 163 insertions(+), 27 deletions(-)

diff --git a/config.go b/config.go
index 2054e7670..91256e2c8 100644
--- a/config.go
+++ b/config.go
@@ -219,7 +219,7 @@ func (c *InlineVerifierConfig) Validate() error {
 	}
 
 	if c.VerifyBinlogEventsInterval == "" {
-		c.VerifyBinlogEventsInterval = "1s"
+		c.VerifyBinlogEventsInterval = "50ms"
 	}
 
 	c.verifyBinlogEventsInterval, err = time.ParseDuration(c.VerifyBinlogEventsInterval)
diff --git a/ferry.go b/ferry.go
index 3e9a9d5d4..bf23ea36f 100644
--- a/ferry.go
+++ b/ferry.go
@@ -225,23 +225,31 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier {
 		binlogVerifyStore = NewBinlogVerifyStore()
 	}
 
+	schemaFingerPrint := map[string]string{}
+	if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil {
+		schemaFingerPrint = f.StateToResumeFrom.SchemaFingerPrint
+		f.logger.Info("resuming inline verifier with schema fingerprint from state dump")
+	}
+
 	return &InlineVerifier{
-		SourceDB:                   f.SourceDB,
-		TargetDB:                   f.TargetDB,
-		DatabaseRewrites:           f.Config.DatabaseRewrites,
-		TableRewrites:              f.Config.TableRewrites,
-		TableSchemaCache:           f.Tables,
-		BatchSize:                  f.Config.BinlogEventBatchSize,
-		VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
-		MaxExpectedDowntime:        f.Config.InlineVerifierConfig.maxExpectedDowntime,
+		SourceDB:                        f.SourceDB,
+		TargetDB:                        f.TargetDB,
+		DatabaseRewrites:                f.Config.DatabaseRewrites,
+		TableRewrites:                   f.Config.TableRewrites,
+		TableSchemaCache:                f.Tables,
+		BatchSize:                       f.Config.BinlogEventBatchSize,
+		VerifyBinlogEventsInterval:      f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
+		VerifySchemaFingerPrintInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval, // reuses the binlog verify interval; no separate config knob yet
+		MaxExpectedDowntime:             f.Config.InlineVerifierConfig.maxExpectedDowntime,
 
 		StateTracker: f.StateTracker,
 		ErrorHandler: f.ErrorHandler,
 
-		reverifyStore:   binlogVerifyStore,
-		sourceStmtCache: NewStmtCache(),
-		targetStmtCache: NewStmtCache(),
-		logger:          logrus.WithField("tag", "inline-verifier"),
+		schemaFingerPrints: schemaFingerPrint,
+		reverifyStore:      binlogVerifyStore,
+		sourceStmtCache:    NewStmtCache(),
+		targetStmtCache:    NewStmtCache(),
+		logger:             logrus.WithField("tag", "inline-verifier"),
 	}
 }
 
@@ -908,12 +916,16 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {
 		err := errors.New("no valid StateTracker")
 		return "", err
 	}
-	var binlogVerifyStore *BinlogVerifyStore = nil
+	var (
+		binlogVerifyStore *BinlogVerifyStore = nil
+		schemaFingerPrint map[string]string  = nil
+	)
 
 	if f.inlineVerifier != nil {
 		binlogVerifyStore = f.inlineVerifier.reverifyStore
+		schemaFingerPrint = f.inlineVerifier.schemaFingerPrints
 	}
 
-	serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore)
+	serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, schemaFingerPrint)
 
 	if f.Config.DoNotIncludeSchemaCacheInStateDump {
 		serializedState.LastKnownTableSchemaCache = nil
@@ -945,7 +957,7 @@ func (f *Ferry) Progress() *Progress {
 	}
 
 	// Table Progress
-	serializedState := f.StateTracker.Serialize(nil, nil)
+	serializedState := f.StateTracker.Serialize(nil, nil, nil)
 
 	// Note the below will not necessarily be synchronized with serializedState.
 	// This is fine as we don't need to be super precise with performance data.
 	rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()
diff --git a/inline_verifier.go b/inline_verifier.go
index d221479a3..0a9680c36 100644
--- a/inline_verifier.go
+++ b/inline_verifier.go
@@ -3,6 +3,8 @@ package ghostferry
 import (
 	"bytes"
 	"context"
+	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"strconv"
@@ -10,6 +12,8 @@ import (
 	"sync"
 	"time"
 
+	"crypto/md5"
+
 	sql "github.com/Shopify/ghostferry/sqlwrapper"
 
 	"github.com/golang/snappy"
@@ -233,14 +237,15 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
 }
 
 type InlineVerifier struct {
-	SourceDB                   *sql.DB
-	TargetDB                   *sql.DB
-	DatabaseRewrites           map[string]string
-	TableRewrites              map[string]string
-	TableSchemaCache           TableSchemaCache
-	BatchSize                  int
-	VerifyBinlogEventsInterval time.Duration
-	MaxExpectedDowntime        time.Duration
+	SourceDB                        *sql.DB
+	TargetDB                        *sql.DB
+	DatabaseRewrites                map[string]string
+	TableRewrites                   map[string]string
+	TableSchemaCache                TableSchemaCache
+	BatchSize                       int
+	VerifyBinlogEventsInterval      time.Duration
+	VerifySchemaFingerPrintInterval time.Duration
+	MaxExpectedDowntime             time.Duration
 
 	StateTracker *StateTracker
 	ErrorHandler ErrorHandler
@@ -248,6 +253,8 @@ type InlineVerifier struct {
 	reverifyStore              *BinlogVerifyStore
 	verifyDuringCutoverStarted AtomicBoolean
 
+	schemaFingerPrints map[string]string
+
 	sourceStmtCache *StmtCache
 	targetStmtCache *StmtCache
 	logger          *logrus.Entry
@@ -368,6 +375,9 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target
 func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
 	v.logger.Info("starting periodic reverifier")
 	ticker := time.NewTicker(v.VerifyBinlogEventsInterval)
+	schemaFingerprintTicker := time.NewTicker(v.VerifySchemaFingerPrintInterval)
+
+	defer schemaFingerprintTicker.Stop()
 	defer ticker.Stop()
 
 	for {
@@ -383,12 +393,16 @@ func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
 			v.logger.WithFields(logrus.Fields{
 				"remainingRowCount": v.reverifyStore.currentRowCount,
 			}).Debug("reverified")
+		case <-schemaFingerprintTicker.C:
+			err := v.verifySchemaFingerPrint()
+			if err != nil {
+				v.ErrorHandler.Fatal("inline_verifier", err)
+			}
 		case <-ctx.Done():
 			v.logger.Info("shutdown periodic reverifier")
 			return
 		}
 	}
-
 }
 
@@ -423,12 +437,23 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
 		return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify)
 	}
 
+	err := v.verifySchemaFingerPrint()
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
 
 func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
 	v.verifyDuringCutoverStarted.Set(true)
 
+	err := v.verifySchemaFingerPrint()
+	if err != nil {
+		v.logger.WithError(err).Error("failed to VerifyDuringCutover")
+		return VerificationResult{}, err
+	}
+
 	mismatchFound, mismatches, err := v.verifyAllEventsInStore()
 	if err != nil {
 		v.logger.WithError(err).Error("failed to VerifyDuringCutover")
@@ -763,3 +788,70 @@
 
 	return v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData), nil
 }
+
+func (v *InlineVerifier) verifySchemaFingerPrint() error {
+	newSchemaFingerPrint, err := v.getSchemaFingerPrint()
+	if err != nil {
+		return err
+	}
+
+	oldSchemaFingerPrint := v.schemaFingerPrints
+	if len(oldSchemaFingerPrint) == 0 {
+		v.schemaFingerPrints = newSchemaFingerPrint
+		return nil
+	}
+
+	for _, table := range v.TableSchemaCache {
+		if newSchemaFingerPrint[table.Schema] != oldSchemaFingerPrint[table.Schema] {
+			return fmt.Errorf("failed to verify schema fingerprint for %s", table.Schema)
+		}
+	}
+
+	v.schemaFingerPrints = newSchemaFingerPrint
+	return nil
+}
+
+func (v *InlineVerifier) getSchemaFingerPrint() (map[string]string, error) {
+	schemaFingerPrints := map[string]string{}
+	dbSet := map[string]struct{}{}
+
+	for _, table := range v.TableSchemaCache {
+		if _, found := dbSet[table.Schema]; found {
+			continue
+		}
+		dbSet[table.Schema] = struct{}{}
+
+		query := "SELECT * FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, column_name"
+		rows, err := v.SourceDB.Query(query, table.Schema)
+		if err != nil {
+			v.logger.WithError(err).Error("failed to query information_schema.columns")
+			return schemaFingerPrints, err
+		}
+
+		schemaData := [][]interface{}{}
+		for rows.Next() {
+			rowData, err := ScanGenericRow(rows, 21) // information_schema.columns has 21 columns on MySQL 5.7
+			if err != nil {
+				return schemaFingerPrints, err
+			}
+
+			_, isIgnored := table.IgnoredColumnsForVerification[string(rowData[3].([]byte))] // rowData[3] = COLUMN_NAME
+			_, isRewritten := v.TableRewrites[string(rowData[2].([]byte))]                   // rowData[2] = TABLE_NAME
+
+			if !isIgnored && !isRewritten {
+				schemaData = append(schemaData, rowData)
+			}
+		}
+		rows.Close()
+
+		schemaDataInBytes, err := json.Marshal(schemaData)
+		if err != nil {
+			return schemaFingerPrints, err
+		}
+
+		hash := md5.Sum(schemaDataInBytes)
+		schemaFingerPrints[table.Schema] = hex.EncodeToString(hash[:])
+	}
+
+	return schemaFingerPrints, nil
+}
diff --git a/state_tracker.go b/state_tracker.go
index f346a5c58..1d1475b99 100644
--- a/state_tracker.go
+++ b/state_tracker.go
@@ -40,6 +40,7 @@ type SerializableState struct {
 	BinlogVerifyStore                         BinlogVerifySerializedStore
 	LastStoredBinlogPositionForInlineVerifier mysql.Position
 	LastStoredBinlogPositionForTargetVerifier mysql.Position
+	SchemaFingerPrint                         map[string]string
 }
 
 func (s *SerializableState) MinSourceBinlogPosition() mysql.Position {
@@ -253,7 +254,7 @@ func (s *StateTracker) updateSpeedLog(deltaPaginationKey uint64) {
 	}
 }
 
-func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState {
+func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore, schemaFingerPrint map[string]string) *SerializableState {
 	s.BinlogRWMutex.RLock()
 	defer s.BinlogRWMutex.RUnlock()
 
@@ -274,6 +275,10 @@ func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, bin
 		state.BinlogVerifyStore = binlogVerifyStore.Serialize()
 	}
 
+	if schemaFingerPrint != nil {
+		state.SchemaFingerPrint = schemaFingerPrint
+	}
+
 	// Need a copy because lastSuccessfulPaginationKeys may change after Serialize
 	// returns. This would inaccurately reflect the state of Ghostferry when
 	// Serialize is called.
diff --git a/test/go/data_iterator_test.go b/test/go/data_iterator_test.go
index bbacf529a..8254c8869 100644
--- a/test/go/data_iterator_test.go
+++ b/test/go/data_iterator_test.go
@@ -163,7 +163,7 @@ func (this *DataIteratorTestSuite) TestDoneListenerGetsNotifiedWhenDone() {
 }
 
 func (this *DataIteratorTestSuite) completedTables() map[string]bool {
-	return this.di.StateTracker.Serialize(nil, nil).CompletedTables
+	return this.di.StateTracker.Serialize(nil, nil, nil).CompletedTables
 }
 
 func (this *DataIteratorTestSuite) TestDataIterationBatchSizePerTableOverride() {
diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb
index 97c209f89..dd161a636 100644
--- a/test/integration/inline_verifier_test.rb
+++ b/test/integration/inline_verifier_test.rb
@@ -536,6 +536,33 @@ def test_null_in_different_order
     assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
   end
 
+  def test_inline_verifier_fails_if_database_schema_is_changed_during_data_copy
+    seed_simple_database_with_single_table
+
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })
+
+    batches_written = 0
+    ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
+      batches_written += 1
+      if batches_written == 1
+        source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN extracolumn VARCHAR(15);")
+      end
+    end
+
+    error_occurred = false
+    ghostferry.on_callback("error") do |err|
+      error_occurred = true
+    end
+
+    # The schema change should make ghostferry abort; this assumes the test
+    # helper raises GhostferryExitFailure when the run exits with a failure.
+    assert_raises(GhostferryExitFailure) do
+      ghostferry.run
+    end
+
+    assert error_occurred, "expected the inline verifier to report an error after the schema change"
+  end
+
   ###################
   # Collation Tests #
   ###################
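
Reviewer note: the fingerprinting above boils down to "JSON-encode the ordered
column metadata for each schema, then MD5 it". Below is a standalone sketch of
that idea; the columnRow type and the sample values are hypothetical stand-ins
for rows of information_schema.columns (the patch reads those via
ScanGenericRow instead), so this is an illustration, not code from the patch.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// columnRow is a hypothetical stand-in for one row of information_schema.columns.
type columnRow struct {
	TableName  string
	ColumnName string
	ColumnType string
}

// fingerprint hashes the JSON encoding of the column metadata, mirroring
// what getSchemaFingerPrint does for each source schema.
func fingerprint(rows []columnRow) (string, error) {
	encoded, err := json.Marshal(rows)
	if err != nil {
		return "", err
	}
	sum := md5.Sum(encoded)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	before := []columnRow{
		{"test_table_1", "data", "text"},
		{"test_table_1", "id", "bigint(20)"},
	}

	// Simulate the ALTER TABLE ... ADD COLUMN from the integration test.
	after := append(append([]columnRow{}, before...), columnRow{"test_table_1", "extracolumn", "varchar(15)"})

	a, _ := fingerprint(before)
	b, _ := fingerprint(after)
	fmt.Println(a != b) // true: any column change flips the fingerprint
}

Because the query orders the metadata by table_name, column_name before
hashing, the fingerprint is stable across runs and changes only when the
schema itself changes.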