Commit

continuously check database schema from inline_verifier
Manan007224 committed Jul 8, 2021
1 parent e605f11 commit 2d0fab2
Showing 6 changed files with 158 additions and 27 deletions.
2 changes: 1 addition & 1 deletion config.go
@@ -219,7 +219,7 @@ func (c *InlineVerifierConfig) Validate() error {
}

if c.VerifyBinlogEventsInterval == "" {
c.VerifyBinlogEventsInterval = "1s"
c.VerifyBinlogEventsInterval = "50ms"
}

c.verifyBinlogEventsInterval, err = time.ParseDuration(c.VerifyBinlogEventsInterval)
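As background on the new default: both interval values are plain Go duration strings parsed with time.ParseDuration, so "50ms" is just as valid as the previous "1s"; Validate only substitutes the default when the configured string is empty. A standalone sketch (not part of this commit) showing how those strings parse:

package main

import (
    "fmt"
    "time"
)

func main() {
    // The old default ("1s") and the new default ("50ms") both parse cleanly.
    // An empty or malformed string would make ParseDuration return an error,
    // which is why Validate fills in the default before parsing.
    for _, s := range []string{"1s", "50ms"} {
        d, err := time.ParseDuration(s)
        if err != nil {
            fmt.Println("invalid interval:", s, err)
            continue
        }
        fmt.Printf("%s parses to %v\n", s, d)
    }
}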
42 changes: 27 additions & 15 deletions ferry.go
@@ -225,23 +225,31 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier {
binlogVerifyStore = NewBinlogVerifyStore()
}

schemaFingerPrint := map[string]string{}
if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil {
schemaFingerPrint = f.StateToResumeFrom.SchemaFingerPrint
f.logger.Info("schema fingerprint found in state to resume from")
}

return &InlineVerifier{
SourceDB: f.SourceDB,
TargetDB: f.TargetDB,
DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
BatchSize: f.Config.BinlogEventBatchSize,
VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime,
SourceDB: f.SourceDB,
TargetDB: f.TargetDB,
DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
TableSchemaCache: f.Tables,
BatchSize: f.Config.BinlogEventBatchSize,
VerifyBinlogEventsInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
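// There is no dedicated config option for the schema fingerprint check yet,
// so it reuses the binlog events verification interval.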
VerifiySchemaFingerPrintInterval: f.Config.InlineVerifierConfig.verifyBinlogEventsInterval,
MaxExpectedDowntime: f.Config.InlineVerifierConfig.maxExpectedDowntime,

StateTracker: f.StateTracker,
ErrorHandler: f.ErrorHandler,

reverifyStore: binlogVerifyStore,
sourceStmtCache: NewStmtCache(),
targetStmtCache: NewStmtCache(),
logger: logrus.WithField("tag", "inline-verifier"),
schemaFingerPrints: schemaFingerPrint,
reverifyStore: binlogVerifyStore,
sourceStmtCache: NewStmtCache(),
targetStmtCache: NewStmtCache(),
logger: logrus.WithField("tag", "inline-verifier"),
}
}

@@ -908,12 +916,16 @@ func (f *Ferry) SerializeStateToJSON() (string, error) {
err := errors.New("no valid StateTracker")
return "", err
}
var binlogVerifyStore *BinlogVerifyStore = nil
var (
binlogVerifyStore *BinlogVerifyStore = nil
schemaFingerPrint map[string]string = nil
)
if f.inlineVerifier != nil {
binlogVerifyStore = f.inlineVerifier.reverifyStore
schemaFingerPrint = f.inlineVerifier.schemaFingerPrints
}

serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore)
serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, schemaFingerPrint)

if f.Config.DoNotIncludeSchemaCacheInStateDump {
serializedState.LastKnownTableSchemaCache = nil
@@ -945,7 +957,7 @@ func (f *Ferry) Progress() *Progress {
}

// Table Progress
serializedState := f.StateTracker.Serialize(nil, nil)
serializedState := f.StateTracker.Serialize(nil, nil, nil)
// Note the below will not necessarily be synchronized with serializedState.
// This is fine as we don't need to be super precise with performance data.
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()
109 changes: 100 additions & 9 deletions inline_verifier.go
@@ -3,13 +3,17 @@ package ghostferry
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

"crypto/md5"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/golang/snappy"
@@ -233,21 +237,24 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
}

type InlineVerifier struct {
SourceDB *sql.DB
TargetDB *sql.DB
DatabaseRewrites map[string]string
TableRewrites map[string]string
TableSchemaCache TableSchemaCache
BatchSize int
VerifyBinlogEventsInterval time.Duration
MaxExpectedDowntime time.Duration
SourceDB *sql.DB
TargetDB *sql.DB
DatabaseRewrites map[string]string
TableRewrites map[string]string
TableSchemaCache TableSchemaCache
BatchSize int
VerifyBinlogEventsInterval time.Duration
VerifiySchemaFingerPrintInterval time.Duration
MaxExpectedDowntime time.Duration

StateTracker *StateTracker
ErrorHandler ErrorHandler

reverifyStore *BinlogVerifyStore
verifyDuringCutoverStarted AtomicBoolean

schemaFingerPrints map[string]string

sourceStmtCache *StmtCache
targetStmtCache *StmtCache
logger *logrus.Entry
@@ -368,6 +375,9 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target
func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
v.logger.Info("starting periodic reverifier")
ticker := time.NewTicker(v.VerifyBinlogEventsInterval)
ticker1 := time.NewTicker(v.VerifiySchemaFingerPrintInterval)

defer ticker1.Stop()
defer ticker.Stop()

for {
@@ -383,12 +393,16 @@ func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) {
v.logger.WithFields(logrus.Fields{
"remainingRowCount": v.reverifyStore.currentRowCount,
}).Debug("reverified")
case <-ticker1.C:
err := v.verifySchemaFingerPrint()
if err != nil {
v.ErrorHandler.Fatal("inline_verifier", err)
}
case <-ctx.Done():
v.logger.Info("shutdown periodic reverifier")
return
}
}

}

func (v *InlineVerifier) VerifyBeforeCutover() error {
@@ -423,12 +437,23 @@ func (v *InlineVerifier) VerifyBeforeCutover() error {
return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify)
}

err := v.verifySchemaFingerPrint()
if err != nil {
v.ErrorHandler.Fatal("inline_verifier", err)
}

return nil
}

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) {
v.verifyDuringCutoverStarted.Set(true)

err := v.verifySchemaFingerPrint()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
return VerificationResult{}, err
}

mismatchFound, mismatches, err := v.verifyAllEventsInStore()
if err != nil {
v.logger.WithError(err).Error("failed to VerifyDuringCutover")
@@ -763,3 +788,69 @@ func (v *InlineVerifier) verifyBinlogBatch(batch BinlogVerifyBatch) ([]uint64, e

return v.compareHashesAndData(sourceFingerprints, targetFingerprints, sourceDecompressedData, targetDecompressedData), nil
}

func (v *InlineVerifier) verifySchemaFingerPrint() error {
newSchemaFingerPrint, err := v.getSchemaFingerPrint()
if err != nil {
return err
}

oldSchemaFingerPrint := v.schemaFingerPrints
if len(oldSchemaFingerPrint) == 0 {
v.schemaFingerPrints = newSchemaFingerPrint
return nil
}

for _, table := range v.TableSchemaCache {
if newSchemaFingerPrint[table.Schema] != oldSchemaFingerPrint[table.Schema] {
return fmt.Errorf("failed to verify schema fingerprint for %s", table.Schema)
}
}

v.schemaFingerPrints = newSchemaFingerPrint
return nil
}

func (v *InlineVerifier) getSchemaFingerPrint() (map[string]string, error) {
schemaFingerPrints := map[string]string{}
dbSet := map[string]struct{}{}

for _, table := range v.TableSchemaCache {
if _, found := dbSet[table.Schema]; found {
continue
}
dbSet[table.Schema] = struct{}{}

query := fmt.Sprintf("SELECT * FROM information_schema.columns WHERE table_schema = '%s' ORDER BY table_name, column_name", table.Schema)
rows, err := v.SourceDB.Query(query)
if err != nil {
return schemaFingerPrints, err
}

schemaData := [][]interface{}{}
for rows.Next() {
rowData, err := ScanGenericRow(rows, 21)
if err != nil {
return schemaFingerPrints, err
}

_, isIgnored := table.IgnoredColumnsForVerification[string(rowData[3].([]byte))]
_, isBlacklisted := v.TableRewrites[string(rowData[2].([]byte))]

if !isIgnored && !isBlacklisted {
schemaData = append(schemaData, rowData)
}
}

schemaDataInBytes, err := json.Marshal(schemaData)
if err != nil {
return schemaFingerPrints, err
}

hash := md5.Sum([]byte(schemaDataInBytes))
schemaFingerPrints[table.Schema] = hex.EncodeToString(hash[:])
}

return schemaFingerPrints, nil
}
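To make the fingerprinting approach easier to follow outside the diff, here is a condensed, hypothetical sketch of the same idea: read a schema's column metadata from information_schema.columns in a deterministic order, serialize it, and hash it. The helper name fingerprintSchema and the reduced column list are illustrative only; the committed getSchemaFingerPrint above also skips ignored and rewritten columns and scans the full 21-column information_schema row.

package schemafingerprint

import (
    "crypto/md5"
    "database/sql"
    "encoding/hex"
    "encoding/json"
)

// fingerprintSchema hashes one schema's column metadata, mirroring the idea behind
// InlineVerifier.getSchemaFingerPrint. It is a simplified illustration, not the
// committed implementation.
func fingerprintSchema(db *sql.DB, schema string) (string, error) {
    // ORDER BY keeps the serialization deterministic across runs.
    rows, err := db.Query(
        "SELECT table_name, column_name, column_type FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, column_name",
        schema,
    )
    if err != nil {
        return "", err
    }
    defer rows.Close() // release the result set before moving on to the next schema

    var columns [][]string
    for rows.Next() {
        var tableName, columnName, columnType string
        if err := rows.Scan(&tableName, &columnName, &columnType); err != nil {
            return "", err
        }
        columns = append(columns, []string{tableName, columnName, columnType})
    }
    if err := rows.Err(); err != nil {
        return "", err
    }

    serialized, err := json.Marshal(columns)
    if err != nil {
        return "", err
    }

    sum := md5.Sum(serialized)
    return hex.EncodeToString(sum[:]), nil
}

verifySchemaFingerPrint then compares the digest computed on each pass against the one captured earlier (or restored from a resume state) and fails the run on any mismatch, which is what turns a mid-copy ALTER TABLE into a hard verification error.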
7 changes: 6 additions & 1 deletion state_tracker.go
@@ -40,6 +40,7 @@ type SerializableState struct {
BinlogVerifyStore BinlogVerifySerializedStore
LastStoredBinlogPositionForInlineVerifier mysql.Position
LastStoredBinlogPositionForTargetVerifier mysql.Position
SchemaFingerPrint map[string]string
}

func (s *SerializableState) MinSourceBinlogPosition() mysql.Position {
@@ -253,7 +254,7 @@ func (s *StateTracker) updateSpeedLog(deltaPaginationKey uint64) {
}
}

func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState {
func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore, schemaFingerPrint map[string]string) *SerializableState {
s.BinlogRWMutex.RLock()
defer s.BinlogRWMutex.RUnlock()

@@ -274,6 +275,10 @@ func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, bin
state.BinlogVerifyStore = binlogVerifyStore.Serialize()
}

if schemaFingerPrint != nil {
state.SchemaFingerPrint = schemaFingerPrint
}

// Need a copy because lastSuccessfulPaginationKeys may change after Serialize
// returns. This would inaccurately reflect the state of Ghostferry when
// Serialize is called.
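Because SchemaFingerPrint is part of SerializableState, the fingerprints now travel with the state dump produced by SerializeStateToJSON and come back through StateToResumeFrom on a resumed run. A hypothetical round-trip sketch (only the new field is populated; the digest value is made up):

package main

import (
    "encoding/json"
    "fmt"

    "github.com/Shopify/ghostferry"
)

func main() {
    // State captured at interruption time; every other field is left at its zero value.
    state := ghostferry.SerializableState{
        SchemaFingerPrint: map[string]string{
            "gftest": "9e107d9d372bb6826bd81d3542a419d6", // md5 hex digest of gftest's column metadata
        },
    }

    dump, err := json.Marshal(&state)
    if err != nil {
        panic(err)
    }

    // On resume, the fingerprints are restored with the rest of the state and handed
    // to the new InlineVerifier, which keeps comparing the live schema against them.
    var resumed ghostferry.SerializableState
    if err := json.Unmarshal(dump, &resumed); err != nil {
        panic(err)
    }
    fmt.Println(resumed.SchemaFingerPrint["gftest"])
}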
2 changes: 1 addition & 1 deletion test/go/data_iterator_test.go
@@ -163,7 +163,7 @@ func (this *DataIteratorTestSuite) TestDoneListenerGetsNotifiedWhenDone() {
}

func (this *DataIteratorTestSuite) completedTables() map[string]bool {
return this.di.StateTracker.Serialize(nil, nil).CompletedTables
return this.di.StateTracker.Serialize(nil, nil, nil).CompletedTables
}

func (this *DataIteratorTestSuite) TestDataIterationBatchSizePerTableOverride() {
23 changes: 23 additions & 0 deletions test/integration/inline_verifier_test.rb
@@ -1,4 +1,5 @@
require "test_helper"
require "pry"

class InlineVerifierTest < GhostferryTestCase
INSERT_TRIGGER_NAME = "corrupting_insert_trigger"
@@ -536,6 +537,28 @@ def test_null_in_different_order
assert_equal "cutover verification failed for: gftest.test_table_1 [paginationKeys: 1 ] ", ghostferry.error_lines.last["msg"]
end

def test_inline_verifier_fails_if_database_schema_is_changed_during_data_copy
seed_simple_database_with_single_table

ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" })

verification_ran = false
batches_written = 0
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
batches_written += 1
if batches_written == 1
source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN extracolumn VARCHAR(15);")
end
end

error_occurred = false
ghostferry.on_callback("error") do |err|
error_occurred = true
end

ghostferry.run
end

###################
# Collation Tests #
###################
