From a5ef20ce11994ec295288143237ba33e1b3a15eb Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Wed, 25 Sep 2019 14:47:30 -0400 Subject: [PATCH 1/3] Removed iterative verifier --- compression_verifier.go | 246 ------ config.go | 70 +- ferry.go | 69 +- inline_verifier.go | 23 +- iterative_verifier.go | 700 ------------------ table_schema_cache.go | 8 + test/go/iterative_verifier_collation_test.go | 118 --- .../go/iterative_verifier_integration_test.go | 293 -------- test/go/iterative_verifier_test.go | 477 ------------ test/integration/iterative_verifier_test.rb | 56 -- test/lib/go/integrationferry.go | 10 +- 11 files changed, 30 insertions(+), 2040 deletions(-) delete mode 100644 compression_verifier.go delete mode 100644 iterative_verifier.go delete mode 100644 test/go/iterative_verifier_collation_test.go delete mode 100644 test/go/iterative_verifier_integration_test.go delete mode 100644 test/go/iterative_verifier_test.go delete mode 100644 test/integration/iterative_verifier_test.rb diff --git a/compression_verifier.go b/compression_verifier.go deleted file mode 100644 index 546519020..000000000 --- a/compression_verifier.go +++ /dev/null @@ -1,246 +0,0 @@ -package ghostferry - -import ( - "crypto/md5" - "database/sql" - "encoding/hex" - "errors" - "fmt" - "strconv" - "strings" - - sq "github.com/Masterminds/squirrel" - "github.com/golang/snappy" - "github.com/siddontang/go-mysql/schema" - "github.com/sirupsen/logrus" -) - -const ( - // CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data - CompressionSnappy = "SNAPPY" -) - -type ( - // TableColumnCompressionConfig represents compression configuration for a - // column in a table as table -> column -> compression-type - // ex: books -> contents -> snappy - TableColumnCompressionConfig map[string]map[string]string -) - -// UnsupportedCompressionError is used to identify errors resulting -// from attempting to decompress unsupported algorithms -type UnsupportedCompressionError struct { - table string - column string - algorithm string -} - -func (e UnsupportedCompressionError) Error() string { - return "Compression algorithm: " + e.algorithm + - " not supported on table: " + e.table + - " for column: " + e.column -} - -// CompressionVerifier provides support for verifying the payload of compressed columns that -// may have different hashes for the same data by first decompressing the compressed -// data before fingerprinting -type CompressionVerifier struct { - logger *logrus.Entry - - supportedAlgorithms map[string]struct{} - tableColumnCompressions TableColumnCompressionConfig -} - -// GetCompressedHashes compares the source data with the target data to ensure the integrity of the -// data being copied. -// -// The GetCompressedHashes method checks if the existing table contains compressed data -// and will apply the decompression algorithm to the applicable columns if necessary. -// After the columns are decompressed, the hashes of the data are used to verify equality -func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) { - c.logger.WithFields(logrus.Fields{ - "tag": "compression_verifier", - "table": table, - }).Info("decompressing table data before verification") - - tableCompression := c.tableColumnCompressions[table] - - // Extract the raw rows using SQL to be decompressed - rows, err := getRows(db, schema, table, pkColumn, columns, pks) - if err != nil { - return nil, err - } - defer rows.Close() - - // Decompress applicable columns and hash the resulting column values for comparison - resultSet := make(map[uint64][]byte) - for rows.Next() { - rowData, err := ScanByteRow(rows, len(columns)+1) - if err != nil { - return nil, err - } - - pk, err := strconv.ParseUint(string(rowData[0]), 10, 64) - if err != nil { - return nil, err - } - - // Decompress the applicable columns and then hash them together - // to create a fingerprint. decompressedRowData contains a map of all - // the non-compressed columns and associated decompressed values by the - // index of the column - decompressedRowData := [][]byte{} - for idx, column := range columns { - if algorithm, ok := tableCompression[column.Name]; ok { - // rowData contains the result of "SELECT pkColumn, * FROM ...", so idx+1 to get each column - decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1]) - if err != nil { - return nil, err - } - decompressedRowData = append(decompressedRowData, decompressedColData) - } else { - decompressedRowData = append(decompressedRowData, rowData[idx+1]) - } - } - - // Hash the data of the row to be added to the result set - decompressedRowHash, err := c.HashRow(decompressedRowData) - if err != nil { - return nil, err - } - - resultSet[pk] = decompressedRowHash - } - - metrics.Gauge( - "compression_verifier_decompress_rows", - float64(len(resultSet)), - []MetricTag{{"table", table}}, - 1.0, - ) - - logrus.WithFields(logrus.Fields{ - "tag": "compression_verifier", - "rows": len(resultSet), - "table": table, - }).Debug("decompressed rows will be compared") - - return resultSet, nil -} - -// Decompress will apply the configured decompression algorithm to the configured columns data -func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error) { - var decompressed []byte - switch strings.ToUpper(algorithm) { - case CompressionSnappy: - return snappy.Decode(decompressed, compressed) - default: - return nil, UnsupportedCompressionError{ - table: table, - column: column, - algorithm: algorithm, - } - } - -} - -// HashRow will fingerprint the non-primary columns of the row to verify data equality -func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error) { - if len(decompressedRowData) == 0 { - return nil, errors.New("Row data to fingerprint must not be empty") - } - - hash := md5.New() - var rowFingerprint []byte - for _, colData := range decompressedRowData { - rowFingerprint = append(rowFingerprint, colData...) - } - - _, err := hash.Write(rowFingerprint) - if err != nil { - return nil, err - } - - return []byte(hex.EncodeToString(hash.Sum(nil))), nil -} - -// IsCompressedTable will identify whether or not a table is compressed -func (c *CompressionVerifier) IsCompressedTable(table string) bool { - if _, ok := c.tableColumnCompressions[table]; ok { - return true - } - return false -} - -func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompressions TableColumnCompressionConfig) error { - for table, columns := range tableColumnCompressions { - for column, algorithm := range columns { - if _, ok := c.supportedAlgorithms[strings.ToUpper(algorithm)]; !ok { - return &UnsupportedCompressionError{ - table: table, - column: column, - algorithm: algorithm, - } - } - } - } - - return nil -} - -// NewCompressionVerifier first checks the map for supported compression algorithms before -// initializing and returning the initialized instance. -func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) { - supportedAlgorithms := make(map[string]struct{}) - supportedAlgorithms[CompressionSnappy] = struct{}{} - - compressionVerifier := &CompressionVerifier{ - logger: logrus.WithField("tag", "compression_verifier"), - supportedAlgorithms: supportedAlgorithms, - tableColumnCompressions: tableColumnCompressions, - } - - if err := compressionVerifier.verifyConfiguredCompression(tableColumnCompressions); err != nil { - return nil, err - } - - return compressionVerifier, nil -} - -func getRows(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (*sql.Rows, error) { - quotedPK := quoteField(pkColumn) - sql, args, err := rowSelector(columns, pkColumn). - From(QuotedTableNameFromString(schema, table)). - Where(sq.Eq{quotedPK: pks}). - OrderBy(quotedPK). - ToSql() - - if err != nil { - return nil, err - } - - // This query must be a prepared query. If it is not, querying will use - // MySQL's plain text interface, which will scan all values into []uint8 - // if we give it []interface{}. - stmt, err := db.Prepare(sql) - if err != nil { - return nil, err - } - - defer stmt.Close() - rows, err := stmt.Query(args...) - if err != nil { - return nil, err - } - - return rows, nil -} - -func rowSelector(columns []schema.TableColumn, pkColumn string) sq.SelectBuilder { - columnStrs := make([]string, len(columns)) - for idx, column := range columns { - columnStrs[idx] = column.Name - } - - return sq.Select(fmt.Sprintf("%s, %s", quoteField(pkColumn), strings.Join(columnStrs, ","))) -} diff --git a/config.go b/config.go index 49061cd5b..2e4dd8c7c 100644 --- a/config.go +++ b/config.go @@ -15,7 +15,6 @@ import ( const ( VerifierTypeChecksumTable = "ChecksumTable" - VerifierTypeIterative = "Iterative" VerifierTypeInline = "Inline" VerifierTypeNoVerification = "NoVerification" ) @@ -179,58 +178,6 @@ func (c *InlineVerifierConfig) Validate() error { return nil } -type IterativeVerifierConfig struct { - // List of tables that should be ignored by the IterativeVerifier. - IgnoredTables []string - - // List of columns that should be ignored by the IterativeVerifier. - // This is in the format of table_name -> [list of column names] - IgnoredColumns map[string][]string - - // The number of concurrent verifiers. Note that a single table can only be - // assigned to one goroutine and currently multiple goroutines per table - // is not supported. - Concurrency int - - // The maximum expected downtime during cutover, in the format of - // time.ParseDuration. - MaxExpectedDowntime string - - // Map of the table and column identifying the compression type - // (if any) of the column. This is used during verification to ensure - // the data was successfully copied as some compression algorithms can - // output different compressed data with the same input data. - // - // The data structure is a map of table names to a map of column names - // to the compression algorithm. - // ex: {books: {contents: snappy}} - // - // Currently supported compression algorithms are: - // 1. Snappy (https://google.github.io/snappy/) as "SNAPPY" - // - // Optional: defaults to empty map/no compression - // - // Note that the IterativeVerifier is in the process of being deprecated. - // If this is specified, ColumnCompressionConfig should also be filled out in - // the main Config. - TableColumnCompression TableColumnCompressionConfig -} - -func (c *IterativeVerifierConfig) Validate() error { - if c.MaxExpectedDowntime != "" { - _, err := time.ParseDuration(c.MaxExpectedDowntime) - if err != nil { - return err - } - } - - if c.Concurrency == 0 { - c.Concurrency = 4 - } - - return nil -} - // SchemaName => TableName => ColumnName => CompressionAlgorithm // Example: blog1 => articles => body => snappy // (SELECT body FROM blog1.articles => returns compressed blob) @@ -379,19 +326,12 @@ type Config struct { // The verifier to use during the run. Valid choices are: // ChecksumTable - // Iterative // NoVerification // // If it is left blank, the Verifier member variable on the Ferry will be // used. If that member variable is nil, no verification will be done. VerifierType string - // Only useful if VerifierType == Iterative. - // This specifies the configurations to the IterativeVerifier. - // - // This option is in the process of being deprecated. - IterativeVerifierConfig IterativeVerifierConfig - // Only useful if VerifierType == Inline. // This specifies the configurations to the InlineVerifierConfig. InlineVerifierConfig InlineVerifierConfig @@ -415,10 +355,6 @@ type Config struct { // uncompressed data is equal. // - This column signals to the InlineVerifier that it needs to decompress // the data to compare identity. - // - // Note: a similar option exists in IterativeVerifier. However, the - // IterativeVerifier is being deprecated and this will be the correct place - // to specify it if you don't need the IterativeVerifier. CompressedColumnsForVerification ColumnCompressionConfig // This config is also for inline verification for the same special case of @@ -452,11 +388,7 @@ func (c *Config) ValidateConfig() error { return fmt.Errorf("StateToResumeFrom version mismatch: resume = %s, current = %s", c.StateToResumeFrom.GhostferryVersion, VersionString) } - if c.VerifierType == VerifierTypeIterative { - if err := c.IterativeVerifierConfig.Validate(); err != nil { - return fmt.Errorf("IterativeVerifierConfig invalid: %v", err) - } - } else if c.VerifierType == VerifierTypeInline { + if c.VerifierType == VerifierTypeInline { if err := c.InlineVerifierConfig.Validate(); err != nil { return fmt.Errorf("InlineVerifierConfig invalid: %v", err) } diff --git a/ferry.go b/ferry.go index 8df629130..6755c7d6b 100644 --- a/ferry.go +++ b/ferry.go @@ -71,8 +71,7 @@ type Ferry struct { // returned in Initialize. // // If VerifierType is specified and this is nil on Ferry initialization, a - // Verifier will be created by Initialize. If an IterativeVerifier is to be - // created, IterativeVerifierConfig will be used to create the verifier. + // Verifier will be created by Initialize. Verifier Verifier inlineVerifier *InlineVerifier @@ -226,65 +225,6 @@ func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier { return v } -func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error) { - f.ensureInitialized() - - var err error - config := f.Config.IterativeVerifierConfig - - var maxExpectedDowntime time.Duration - if config.MaxExpectedDowntime != "" { - maxExpectedDowntime, err = time.ParseDuration(config.MaxExpectedDowntime) - if err != nil { - return nil, fmt.Errorf("invalid MaxExpectedDowntime: %v. this error should have been caught via .Validate()", err) - } - } - - var compressionVerifier *CompressionVerifier - if config.TableColumnCompression != nil { - compressionVerifier, err = NewCompressionVerifier(config.TableColumnCompression) - if err != nil { - return nil, err - } - } - - ignoredColumns := make(map[string]map[string]struct{}) - for table, columns := range config.IgnoredColumns { - ignoredColumns[table] = make(map[string]struct{}) - for _, column := range columns { - ignoredColumns[table][column] = struct{}{} - } - } - - v := &IterativeVerifier{ - CursorConfig: &CursorConfig{ - DB: f.SourceDB, - BatchSize: f.Config.DataIterationBatchSize, - ReadRetries: f.Config.DBReadRetries, - }, - - BinlogStreamer: f.BinlogStreamer, - SourceDB: f.SourceDB, - TargetDB: f.TargetDB, - CompressionVerifier: compressionVerifier, - - Tables: f.Tables.AsSlice(), - TableSchemaCache: f.Tables, - IgnoredTables: config.IgnoredTables, - IgnoredColumns: ignoredColumns, - DatabaseRewrites: f.Config.DatabaseRewrites, - TableRewrites: f.Config.TableRewrites, - Concurrency: config.Concurrency, - MaxExpectedDowntime: maxExpectedDowntime, - } - - if f.CopyFilter != nil { - v.CursorConfig.BuildSelect = f.CopyFilter.BuildSelect - } - - return v, v.Initialize() -} - // Initialize all the components of Ghostferry and connect to the Database func (f *Ferry) Initialize() (err error) { f.StartTime = time.Now().Truncate(time.Second) @@ -431,8 +371,6 @@ func (f *Ferry) Initialize() (err error) { f.Tables = f.StateToResumeFrom.LastKnownTableSchemaCache } - // The iterative verifier needs the binlog streamer so this has to be first. - // Eventually this can be moved below the verifier initialization. f.BinlogStreamer = f.NewBinlogStreamer() f.BinlogWriter = f.NewBinlogWriter() f.DataIterator = f.NewDataIterator() @@ -444,11 +382,6 @@ func (f *Ferry) Initialize() (err error) { } switch f.Config.VerifierType { - case VerifierTypeIterative: - f.Verifier, err = f.NewIterativeVerifier() - if err != nil { - return err - } case VerifierTypeChecksumTable: f.Verifier = f.NewChecksumTableVerifier() case VerifierTypeInline: diff --git a/inline_verifier.go b/inline_verifier.go index 49d7bde6a..d4545eab2 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -14,10 +14,25 @@ import ( "github.com/sirupsen/logrus" ) -// This struct is very similar to ReverifyStore, but it is more optimized -// for serialization into JSON. -// -// TODO: remove IterativeVerifier and remove this comment. +const ( + // CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data + CompressionSnappy = "SNAPPY" +) + +// UnsupportedCompressionError is used to identify errors resulting +// from attempting to decompress unsupported algorithms +type UnsupportedCompressionError struct { + table string + column string + algorithm string +} + +func (e UnsupportedCompressionError) Error() string { + return "Compression algorithm: " + e.algorithm + + " not supported on table: " + e.table + + " for column: " + e.column +} + type BinlogVerifyStore struct { EmitLogPerRowsAdded uint64 diff --git a/iterative_verifier.go b/iterative_verifier.go deleted file mode 100644 index c8c3b07e1..000000000 --- a/iterative_verifier.go +++ /dev/null @@ -1,700 +0,0 @@ -package ghostferry - -import ( - "bytes" - "database/sql" - "errors" - "fmt" - "math" - "strconv" - "strings" - "sync" - "time" - - sq "github.com/Masterminds/squirrel" - "github.com/siddontang/go-mysql/schema" - "github.com/sirupsen/logrus" -) - -type ReverifyBatch struct { - Pks []uint64 - Table TableIdentifier -} - -type ReverifyEntry struct { - Pk uint64 - Table *TableSchema -} - -type ReverifyStore struct { - MapStore map[TableIdentifier]map[uint64]struct{} - mapStoreMutex *sync.Mutex - BatchStore []ReverifyBatch - RowCount uint64 - EmitLogPerRowCount uint64 -} - -func NewReverifyStore() *ReverifyStore { - r := &ReverifyStore{ - mapStoreMutex: &sync.Mutex{}, - RowCount: uint64(0), - EmitLogPerRowCount: uint64(10000), - } - - r.flushStore() - return r -} - -func (r *ReverifyStore) Add(entry ReverifyEntry) { - r.mapStoreMutex.Lock() - defer r.mapStoreMutex.Unlock() - - tableId := NewTableIdentifierFromSchemaTable(entry.Table) - if _, exists := r.MapStore[tableId]; !exists { - r.MapStore[tableId] = make(map[uint64]struct{}) - } - - if _, exists := r.MapStore[tableId][entry.Pk]; !exists { - r.MapStore[tableId][entry.Pk] = struct{}{} - r.RowCount++ - if r.RowCount%r.EmitLogPerRowCount == 0 { - metrics.Gauge("iterative_verifier_store_rows", float64(r.RowCount), []MetricTag{}, 1.0) - logrus.WithFields(logrus.Fields{ - "tag": "reverify_store", - "rows": r.RowCount, - }).Debug("added rows will be reverified") - } - } -} - -func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch { - r.mapStoreMutex.Lock() - defer r.mapStoreMutex.Unlock() - - r.BatchStore = make([]ReverifyBatch, 0) - for tableId, pkSet := range r.MapStore { - pkBatch := make([]uint64, 0, batchsize) - for pk, _ := range pkSet { - pkBatch = append(pkBatch, pk) - delete(pkSet, pk) - if len(pkBatch) >= batchsize { - r.BatchStore = append(r.BatchStore, ReverifyBatch{ - Pks: pkBatch, - Table: tableId, - }) - pkBatch = make([]uint64, 0, batchsize) - } - } - - if len(pkBatch) > 0 { - r.BatchStore = append(r.BatchStore, ReverifyBatch{ - Pks: pkBatch, - Table: tableId, - }) - } - - delete(r.MapStore, tableId) - } - - r.flushStore() - return r.BatchStore -} - -func (r *ReverifyStore) flushStore() { - r.MapStore = make(map[TableIdentifier]map[uint64]struct{}) - r.RowCount = 0 -} - -type verificationResultAndError struct { - Result VerificationResult - Error error -} - -func (r verificationResultAndError) ErroredOrFailed() bool { - return r.Error != nil || !r.Result.DataCorrect -} - -type IterativeVerifier struct { - CompressionVerifier *CompressionVerifier - CursorConfig *CursorConfig - BinlogStreamer *BinlogStreamer - TableSchemaCache TableSchemaCache - SourceDB *sql.DB - TargetDB *sql.DB - - Tables []*TableSchema - IgnoredTables []string - IgnoredColumns map[string]map[string]struct{} - DatabaseRewrites map[string]string - TableRewrites map[string]string - Concurrency int - MaxExpectedDowntime time.Duration - - reverifyStore *ReverifyStore - logger *logrus.Entry - - beforeCutoverVerifyDone bool - verifyDuringCutoverStarted AtomicBoolean - - // Variables for verification in the background - verificationResultAndStatus VerificationResultAndStatus - verificationErr error - backgroundVerificationWg *sync.WaitGroup - backgroundStartTime time.Time - backgroundDoneTime time.Time -} - -func (v *IterativeVerifier) SanityCheckParameters() error { - if v.CursorConfig == nil { - return errors.New("CursorConfig must not be nil") - } - - if v.BinlogStreamer == nil { - return errors.New("BinlogStreamer must not be nil") - } - - if v.SourceDB == nil { - return errors.New("SourceDB must not be nil") - } - - if v.TargetDB == nil { - return errors.New("TargetDB must not be nil") - } - - if v.Concurrency <= 0 { - return fmt.Errorf("iterative verifier concurrency must be greater than 0, not %d", v.Concurrency) - } - - return nil -} - -func (v *IterativeVerifier) Initialize() error { - v.logger = logrus.WithField("tag", "iterative_verifier") - - if err := v.SanityCheckParameters(); err != nil { - v.logger.WithError(err).Error("iterative verifier parameter sanity check failed") - return err - } - - v.reverifyStore = NewReverifyStore() - return nil -} - -func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error) { - v.logger.Info("starting one-off verification of all tables") - - err := v.iterateAllTables(func(pk uint64, tableSchema *TableSchema) error { - return VerificationResult{ - DataCorrect: false, - Message: fmt.Sprintf("verification failed on table: %s for pk: %d", tableSchema.String(), pk), - IncorrectTables: []string{tableSchema.String()}, - } - }) - - v.logger.Info("one-off verification complete") - - switch e := err.(type) { - case VerificationResult: - return e, nil - default: - return NewCorrectVerificationResult(), e - } -} - -func (v *IterativeVerifier) VerifyBeforeCutover() error { - if v.TableSchemaCache == nil { - return fmt.Errorf("iterative verifier must be given the table schema cache before starting verify before cutover") - } - - v.logger.Info("starting pre-cutover verification") - - v.logger.Debug("attaching binlog event listener") - v.BinlogStreamer.AddEventListener(v.binlogEventListener) - - v.logger.Debug("verifying all tables") - err := v.iterateAllTables(func(pk uint64, tableSchema *TableSchema) error { - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: tableSchema}) - return nil - }) - - if err == nil { - // This reverification phase is to reduce the size of the set of rows - // that need to be reverified during cutover. Failures during - // reverification at this point could have been caused by still - // ongoing writes and we therefore just re-add those rows to the - // store rather than failing the move prematurely. - err = v.reverifyUntilStoreIsSmallEnough(30) - } - - v.logger.Info("pre-cutover verification complete") - v.beforeCutoverVerifyDone = true - - return err -} - -func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error) { - v.logger.Info("starting verification during cutover") - v.verifyDuringCutoverStarted.Set(true) - result, err := v.verifyStore("iterative_verifier_during_cutover", []MetricTag{}) - v.logger.Info("cutover verification complete") - - return result, err -} - -func (v *IterativeVerifier) StartInBackground() error { - if v.logger == nil { - return errors.New("Initialize() must be called before this") - } - - if !v.beforeCutoverVerifyDone { - return errors.New("VerifyBeforeCutover() must be called before this") - } - - if v.verifyDuringCutoverStarted.Get() { - return errors.New("verification during cutover has already been started") - } - - v.verificationResultAndStatus = VerificationResultAndStatus{ - StartTime: time.Now(), - DoneTime: time.Time{}, - } - v.verificationErr = nil - v.backgroundVerificationWg = &sync.WaitGroup{} - - v.logger.Info("starting iterative verification in the background") - - v.backgroundVerificationWg.Add(1) - go func() { - defer func() { - v.backgroundDoneTime = time.Now() - v.backgroundVerificationWg.Done() - }() - - v.verificationResultAndStatus.VerificationResult, v.verificationErr = v.VerifyDuringCutover() - v.verificationResultAndStatus.DoneTime = time.Now() - }() - - return nil -} - -func (v *IterativeVerifier) Wait() { - v.backgroundVerificationWg.Wait() -} - -func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error) { - return v.verificationResultAndStatus, v.verificationErr -} - -func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error) { - sql, args, err := GetMd5HashesSql(schema, table, pkColumn, columns, pks) - if err != nil { - return nil, err - } - - // This query must be a prepared query. If it is not, querying will use - // MySQL's plain text interface, which will scan all values into []uint8 - // if we give it []interface{}. - stmt, err := db.Prepare(sql) - if err != nil { - return nil, err - } - - defer stmt.Close() - - rows, err := stmt.Query(args...) - if err != nil { - return nil, err - } - - defer rows.Close() - - resultSet := make(map[uint64][]byte) - for rows.Next() { - rowData, err := ScanGenericRow(rows, 2) - if err != nil { - return nil, err - } - - pk, err := rowData.GetUint64(0) - if err != nil { - return nil, err - } - - resultSet[pk] = rowData[1].([]byte) - } - return resultSet, nil -} - -func (v *IterativeVerifier) reverifyUntilStoreIsSmallEnough(maxIterations int) error { - var timeToVerify time.Duration - - for iteration := 0; iteration < maxIterations; iteration++ { - before := v.reverifyStore.RowCount - start := time.Now() - - _, err := v.verifyStore("reverification_before_cutover", []MetricTag{{"iteration", string(iteration)}}) - if err != nil { - return err - } - - after := v.reverifyStore.RowCount - timeToVerify = time.Now().Sub(start) - - v.logger.WithFields(logrus.Fields{ - "store_size_before": before, - "store_size_after": after, - "iteration": iteration, - }).Infof("completed re-verification iteration %d", iteration) - - if after <= 1000 || after >= before { - break - } - } - - if v.MaxExpectedDowntime != 0 && timeToVerify > v.MaxExpectedDowntime { - return fmt.Errorf("cutover stage verification will not complete within max downtime duration (took %s)", timeToVerify) - } - - return nil -} - -func (v *IterativeVerifier) iterateAllTables(mismatchedPkFunc func(uint64, *TableSchema) error) error { - pool := &WorkerPool{ - Concurrency: v.Concurrency, - Process: func(tableIndex int) (interface{}, error) { - table := v.Tables[tableIndex] - - if v.tableIsIgnored(table) { - return nil, nil - } - - err := v.iterateTableFingerprints(table, mismatchedPkFunc) - if err != nil { - v.logger.WithError(err).WithField("table", table.String()).Error("error occured during table verification") - } - return nil, err - }, - } - - _, err := pool.Run(len(v.Tables)) - - return err -} - -func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatchedPkFunc func(uint64, *TableSchema) error) error { - // The cursor will stop iterating when it cannot find anymore rows, - // so it will not iterate until MaxUint64. - cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64) - - // It only needs the PKs, not the entire row. - cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPKColumn(0).Name)} - return cursor.Each(func(batch *RowBatch) error { - metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{ - MetricTag{"table", table.Name}, - MetricTag{"source", "iterative_verifier_before_cutover"}, - }, 1.0) - - pks := make([]uint64, 0, batch.Size()) - - for _, rowData := range batch.Values() { - pk, err := rowData.GetUint64(batch.PkIndex()) - if err != nil { - return err - } - - pks = append(pks, pk) - } - - mismatchedPks, err := v.compareFingerprints(pks, batch.TableSchema()) - if err != nil { - v.logger.WithError(err).Errorf("failed to fingerprint table %s", batch.TableSchema().String()) - return err - } - - if len(mismatchedPks) > 0 { - v.logger.WithFields(logrus.Fields{ - "table": batch.TableSchema().String(), - "mismatched_pks": mismatchedPks, - }).Info("found mismatched rows") - - for _, pk := range mismatchedPks { - err := mismatchedPkFunc(pk, batch.TableSchema()) - if err != nil { - return err - } - } - } - - return nil - }) -} - -func (v *IterativeVerifier) verifyStore(sourceTag string, additionalTags []MetricTag) (VerificationResult, error) { - allBatches := v.reverifyStore.FlushAndBatchByTable(int(v.CursorConfig.BatchSize)) - v.logger.WithField("batches", len(allBatches)).Debug("reverifying") - - if len(allBatches) == 0 { - return NewCorrectVerificationResult(), nil - } - - erroredOrFailed := errors.New("verification of store errored or failed") - - pool := &WorkerPool{ - Concurrency: v.Concurrency, - Process: func(reverifyBatchIndex int) (interface{}, error) { - reverifyBatch := allBatches[reverifyBatchIndex] - table := v.TableSchemaCache.Get(reverifyBatch.Table.SchemaName, reverifyBatch.Table.TableName) - - tags := append([]MetricTag{ - MetricTag{"table", table.Name}, - MetricTag{"source", sourceTag}, - }, additionalTags...) - - metrics.Count("RowEvent", int64(len(reverifyBatch.Pks)), tags, 1.0) - - v.logger.WithFields(logrus.Fields{ - "table": table.String(), - "len(pks)": len(reverifyBatch.Pks), - }).Debug("received pk batch to reverify") - - verificationResult, mismatchedPks, err := v.reverifyPks(table, reverifyBatch.Pks) - resultAndErr := verificationResultAndError{verificationResult, err} - - // If we haven't entered the cutover phase yet, then reverification failures - // could have been caused by ongoing writes. We will just re-add the rows for - // the cutover verification and ignore the failure at this point here. - if err == nil && !v.beforeCutoverVerifyDone { - for _, pk := range mismatchedPks { - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: table}) - } - - resultAndErr.Result = NewCorrectVerificationResult() - } - - if resultAndErr.ErroredOrFailed() { - if resultAndErr.Error != nil { - v.logger.WithError(resultAndErr.Error).Error("error occured in reverification") - } else { - v.logger.Errorf("failed reverification: %s", resultAndErr.Result.Message) - } - - return resultAndErr, erroredOrFailed - } - - return resultAndErr, nil - }, - } - - results, _ := pool.Run(len(allBatches)) - - var result VerificationResult - var err error - for i := 0; i < v.Concurrency; i++ { - if results[i] == nil { - // This means the worker pool exited early and another goroutine - // must have returned an error. - continue - } - - resultAndErr := results[i].(verificationResultAndError) - result = resultAndErr.Result - err = resultAndErr.Error - - if resultAndErr.ErroredOrFailed() { - break - } - } - - return result, err -} - -func (v *IterativeVerifier) reverifyPks(table *TableSchema, pks []uint64) (VerificationResult, []uint64, error) { - mismatchedPks, err := v.compareFingerprints(pks, table) - if err != nil { - return VerificationResult{}, mismatchedPks, err - } - - if len(mismatchedPks) == 0 { - return NewCorrectVerificationResult(), mismatchedPks, nil - } - - pkStrings := make([]string, len(mismatchedPks)) - for idx, pk := range mismatchedPks { - pkStrings[idx] = strconv.FormatUint(pk, 10) - } - - return VerificationResult{ - DataCorrect: false, - Message: fmt.Sprintf("verification failed on table: %s for pks: %s", table.String(), strings.Join(pkStrings, ",")), - IncorrectTables: []string{table.String()}, - }, mismatchedPks, nil -} - -func (v *IterativeVerifier) binlogEventListener(evs []DMLEvent) error { - if v.verifyDuringCutoverStarted.Get() { - return fmt.Errorf("cutover has started but received binlog event!") - } - - for _, ev := range evs { - if v.tableIsIgnored(ev.TableSchema()) { - continue - } - - pk, err := ev.PK() - if err != nil { - return err - } - - v.reverifyStore.Add(ReverifyEntry{Pk: pk, Table: ev.TableSchema()}) - } - - return nil -} - -func (v *IterativeVerifier) tableIsIgnored(table *TableSchema) bool { - for _, ignored := range v.IgnoredTables { - if table.Name == ignored { - return true - } - } - - return false -} - -func (v *IterativeVerifier) columnsToVerify(table *TableSchema) []schema.TableColumn { - ignoredColsSet, containsIgnoredColumns := v.IgnoredColumns[table.Name] - if !containsIgnoredColumns { - return table.Columns - } - - var columns []schema.TableColumn - for _, column := range table.Columns { - if _, isIgnored := ignoredColsSet[column.Name]; !isIgnored { - columns = append(columns, column) - } - } - - return columns -} - -func (v *IterativeVerifier) compareFingerprints(pks []uint64, table *TableSchema) ([]uint64, error) { - targetDb := table.Schema - if targetDbName, exists := v.DatabaseRewrites[targetDb]; exists { - targetDb = targetDbName - } - - targetTable := table.Name - if targetTableName, exists := v.TableRewrites[targetTable]; exists { - targetTable = targetTableName - } - - wg := &sync.WaitGroup{} - wg.Add(2) - - var sourceHashes map[uint64][]byte - var sourceErr error - go func() { - defer wg.Done() - sourceErr = WithRetries(5, 0, v.logger, "get fingerprints from source db", func() (err error) { - sourceHashes, err = v.GetHashes(v.SourceDB, table.Schema, table.Name, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - return - }) - }() - - var targetHashes map[uint64][]byte - var targetErr error - go func() { - defer wg.Done() - targetErr = WithRetries(5, 0, v.logger, "get fingerprints from target db", func() (err error) { - targetHashes, err = v.GetHashes(v.TargetDB, targetDb, targetTable, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - return - }) - }() - - wg.Wait() - if sourceErr != nil { - return nil, sourceErr - } - if targetErr != nil { - return nil, targetErr - } - - mismatches := compareHashes(sourceHashes, targetHashes) - if len(mismatches) > 0 && v.CompressionVerifier != nil && v.CompressionVerifier.IsCompressedTable(table.Name) { - return v.compareCompressedHashes(targetDb, targetTable, table, pks) - } - - return mismatches, nil -} - -func (v *IterativeVerifier) compareCompressedHashes(targetDb, targetTable string, table *TableSchema, pks []uint64) ([]uint64, error) { - sourceHashes, err := v.CompressionVerifier.GetCompressedHashes(v.SourceDB, table.Schema, table.Name, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - if err != nil { - return nil, err - } - - targetHashes, err := v.CompressionVerifier.GetCompressedHashes(v.TargetDB, targetDb, targetTable, table.GetPKColumn(0).Name, v.columnsToVerify(table), pks) - if err != nil { - return nil, err - } - - return compareHashes(sourceHashes, targetHashes), nil -} - -func compareHashes(source, target map[uint64][]byte) []uint64 { - mismatchSet := map[uint64]struct{}{} - - for pk, targetHash := range target { - sourceHash, exists := source[pk] - if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[pk] = struct{}{} - } - } - - for pk, sourceHash := range source { - targetHash, exists := target[pk] - if !bytes.Equal(sourceHash, targetHash) || !exists { - mismatchSet[pk] = struct{}{} - } - } - - mismatches := make([]uint64, 0, len(mismatchSet)) - for mismatch, _ := range mismatchSet { - mismatches = append(mismatches, mismatch) - } - - return mismatches -} - -func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error) { - quotedPK := quoteField(pkColumn) - return rowMd5Selector(columns, pkColumn). - From(QuotedTableNameFromString(schema, table)). - Where(sq.Eq{quotedPK: pks}). - OrderBy(quotedPK). - ToSql() -} - -func rowMd5Selector(columns []schema.TableColumn, pkColumn string) sq.SelectBuilder { - quotedPK := quoteField(pkColumn) - - hashStrs := make([]string, len(columns)) - for idx, column := range columns { - quotedCol := normalizeAndQuoteColumn(column) - hashStrs[idx] = fmt.Sprintf("MD5(COALESCE(%s, 'NULL'))", quotedCol) - } - - return sq.Select(fmt.Sprintf( - "%s, MD5(CONCAT(%s)) AS row_fingerprint", - quotedPK, - strings.Join(hashStrs, ","), - )) -} - -func normalizeAndQuoteColumn(column schema.TableColumn) (quoted string) { - quoted = quoteField(column.Name) - if column.Type == schema.TYPE_FLOAT { - quoted = fmt.Sprintf("(if (%s = '-0', 0, %s))", quoted, quoted) - } - return -} diff --git a/table_schema_cache.go b/table_schema_cache.go index 0a557cd02..4a3986e6b 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -306,3 +306,11 @@ func maxPk(db *sql.DB, table *TableSchema) (uint64, bool, error) { return maxPrimaryKey, true, nil } } + +func normalizeAndQuoteColumn(column schema.TableColumn) (quoted string) { + quoted = quoteField(column.Name) + if column.Type == schema.TYPE_FLOAT { + quoted = fmt.Sprintf("(if (%s = '-0', 0, %s))", quoted, quoted) + } + return +} diff --git a/test/go/iterative_verifier_collation_test.go b/test/go/iterative_verifier_collation_test.go deleted file mode 100644 index 4a82b4a1f..000000000 --- a/test/go/iterative_verifier_collation_test.go +++ /dev/null @@ -1,118 +0,0 @@ -package test - -import ( - "database/sql" - "fmt" - "testing" - - "github.com/Shopify/ghostferry/testhelpers" - "github.com/stretchr/testify/suite" -) - -type IterativeVerifierCollationTestSuite struct { - *IterativeVerifierTestSuite - - unsafeDb *sql.DB - asciiData string - utf8mb3Data string - utf8mb4Data string -} - -func (t *IterativeVerifierCollationTestSuite) SetupTest() { - t.IterativeVerifierTestSuite.SetupTest() - - unsafeDbConfig := t.Ferry.Source - t.Require().Equal("'STRICT_ALL_TABLES,NO_BACKSLASH_ESCAPES'", unsafeDbConfig.Params["sql_mode"]) - unsafeDbConfig.Params["sql_mode"] = "'NO_BACKSLASH_ESCAPES'" - - unsafeConfig, err := t.Ferry.Source.MySQLConfig() - t.Require().Nil(err) - - unsafeDSN := unsafeConfig.FormatDSN() - - t.unsafeDb, err = sql.Open("mysql", unsafeDSN) - t.Require().Nil(err) - - t.asciiData = "foobar" - t.utf8mb3Data = "これは普通なストリングです" - t.utf8mb4Data = "𠜎𠜱𠝹𠱓𠱸𠲖𠳏𠳕𠴕𠵼𠵿𠸎𠸏𠹷" -} - -func (t *IterativeVerifierCollationTestSuite) TearDownTest() { - t.IterativeVerifierTestSuite.TearDownTest() -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfAsciiValueDoesNotChangeFromUtf8Mb3ToUtf8Mb4() { - t.AssertIdentical(t.asciiData, "utf8mb3", "utf8mb4") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfAsciiValueDoesNotChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertIdentical(t.asciiData, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb3ValueDoesNotChangeFromUtf8Mb3ToUtf8Mb4() { - t.AssertIdentical(t.utf8mb3Data, "utf8mb3", "utf8mb4") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb3ValueDoesNotChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertIdentical(t.utf8mb3Data, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) TestFingerprintOfUtf8Mb4ValueDoesChangeFromUtf8Mb4ToUtf8Mb3() { - t.AssertDifferent(t.utf8mb4Data, "utf8mb4", "utf8mb3") -} - -func (t *IterativeVerifierCollationTestSuite) AssertIdentical(data, from, to string) { - fingerprints := t.GetHashesFromDifferentCollations(data, from, to) - t.Require().Equal(fingerprints[0], fingerprints[1]) -} - -func (t *IterativeVerifierCollationTestSuite) AssertDifferent(data, from, to string) { - fingerprints := t.GetHashesFromDifferentCollations(data, from, to) - t.Require().NotEqual(fingerprints[0], fingerprints[1]) -} - -func (t *IterativeVerifierCollationTestSuite) GetHashesFromDifferentCollations(data, from, to string) []string { - var fingerprints []string - - t.SetDataColumnCollation(from) - t.InsertRow(42, data) - fingerprints = append(fingerprints, t.GetHashes([]uint64{42})[0]) - - t.SetDataColumnCollation(to) - fingerprints = append(fingerprints, t.GetHashes([]uint64{42})[0]) - - for _, fingerprint := range fingerprints { - t.Require().True(fingerprint != "") - } - - return fingerprints -} - -func (t *IterativeVerifierCollationTestSuite) SetDataColumnCollation(charset string) { - var collation string - if charset == "utf8mb4" { - collation = "utf8mb4_unicode_ci" - } else if charset == "utf8mb3" { - collation = "utf8_unicode_ci" - } - t.Require().True(collation != "") - - _, err := t.unsafeDb.Exec(fmt.Sprintf( - "ALTER TABLE %s.%s MODIFY data VARCHAR(255) CHARACTER SET %s COLLATE %s", - testhelpers.TestSchemaName, - testhelpers.TestTable1Name, - charset, - collation, - )) - t.Require().Nil(err) -} - -func TestIterativeVerifierCollationTestSuite(t *testing.T) { - testhelpers.SetupTest() - suite.Run(t, &IterativeVerifierCollationTestSuite{ - IterativeVerifierTestSuite: &IterativeVerifierTestSuite{ - GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}, - }, - }) -} diff --git a/test/go/iterative_verifier_integration_test.go b/test/go/iterative_verifier_integration_test.go deleted file mode 100644 index 15850abea..000000000 --- a/test/go/iterative_verifier_integration_test.go +++ /dev/null @@ -1,293 +0,0 @@ -package test - -import ( - "database/sql" - "testing" - - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/testhelpers" - "github.com/siddontang/go-mysql/schema" - "github.com/stretchr/testify/assert" -) - -func TestHashesSql(t *testing.T) { - columns := []schema.TableColumn{schema.TableColumn{Name: "id"}, schema.TableColumn{Name: "data"}, schema.TableColumn{Name: "float_col", Type: schema.TYPE_FLOAT}} - pks := []uint64{1, 5, 42} - - sql, args, err := ghostferry.GetMd5HashesSql("gftest", "test_table", "id", columns, pks) - - assert.Nil(t, err) - assert.Equal(t, "SELECT `id`, MD5(CONCAT(MD5(COALESCE(`id`, 'NULL')),MD5(COALESCE(`data`, 'NULL')),MD5(COALESCE((if (`float_col` = '-0', 0, `float_col`)), 'NULL')))) "+ - "AS row_fingerprint FROM `gftest`.`test_table` WHERE `id` IN (?,?,?) ORDER BY `id`", sql) - for idx, arg := range args { - assert.Equal(t, pks[idx], arg.(uint64)) - } -} - -func TestVerificationFailsDeletedRow(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - deleteTestRowsToTriggerFailure(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.False(t, result.DataCorrect) - assert.Regexp(t, "verification failed.*gftest.table1.*pks: (43)|(42)|(43,42)|(42,43)", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestVerificationFailsUpdatedRow(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyDataColumnInSourceDB(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.False(t, result.DataCorrect) - assert.Regexp(t, "verification failed.*gftest.table1.*pks: (42)|(43)|(43,42)|(42,43)", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestIgnoresColumns(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupSingleTableDatabase(ferry, sourceDB, targetDB) - iterativeVerifier.IgnoredColumns = map[string]map[string]struct{}{"table1": {"data": struct{}{}}} - }, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyDataColumnInSourceDB(ferry) - - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.True(t, result.DataCorrect) - assert.Equal(t, "", result.Message) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestIgnoresTables(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupSingleTableDatabase(ferry, sourceDB, targetDB) - iterativeVerifier.IgnoredTables = []string{"table1"} - }, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - BeforeStoppingBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - ensureTestRowsAreReverified(ferry) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - modifyAllRows(ferry) - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.True(t, result.DataCorrect) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - DisableChecksumVerifier: true, - } - - testcase.Run() - assert.True(t, ran) -} - -func TestVerificationPasses(t *testing.T) { - ferry := testhelpers.NewTestFerry() - iterativeVerifier := &ghostferry.IterativeVerifier{} - ran := false - - testcase := &testhelpers.IntegrationTestCase{ - T: t, - SetupAction: setupSingleTableDatabase, - AfterRowCopyIsComplete: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - setupIterativeVerifierFromFerry(iterativeVerifier, ferry.Ferry) - - err := iterativeVerifier.Initialize() - testhelpers.PanicIfError(err) - - err = iterativeVerifier.VerifyBeforeCutover() - testhelpers.PanicIfError(err) - }, - AfterStoppedBinlogStreaming: func(ferry *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) { - result, err := iterativeVerifier.VerifyDuringCutover() - assert.Nil(t, err) - assert.True(t, result.DataCorrect) - ran = true - }, - DataWriter: &testhelpers.MixedActionDataWriter{ - ProbabilityOfInsert: 1.0 / 3.0, - ProbabilityOfUpdate: 1.0 / 3.0, - ProbabilityOfDelete: 1.0 / 3.0, - NumberOfWriters: 4, - Tables: []string{"gftest.table1"}, - }, - Ferry: ferry, - } - - testcase.Run() - assert.True(t, ran) -} - -func setupIterativeVerifierFromFerry(v *ghostferry.IterativeVerifier, f *ghostferry.Ferry) { - v.CursorConfig = &ghostferry.CursorConfig{ - DB: f.SourceDB, - BatchSize: f.Config.DataIterationBatchSize, - ReadRetries: f.Config.DBReadRetries, - } - - v.BinlogStreamer = f.BinlogStreamer - v.SourceDB = f.SourceDB - v.TargetDB = f.TargetDB - v.Tables = f.Tables.AsSlice() - v.TableSchemaCache = f.Tables - v.Concurrency = 2 -} - -func ensureTestRowsAreReverified(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (42, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (43, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"43\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("INSERT IGNORE INTO gftest.table1 VALUES (44, \"OK\")") - testhelpers.PanicIfError(err) - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"OK\" WHERE id = \"44\"") - testhelpers.PanicIfError(err) -} - -func modifyDataColumnInSourceDB(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\" WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.SourceDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\" WHERE id = \"43\"") - testhelpers.PanicIfError(err) -} - -func modifyAllRows(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.TargetDB.Exec("UPDATE gftest.table1 SET data=\"FAIL\"") - testhelpers.PanicIfError(err) -} - -func deleteTestRowsToTriggerFailure(ferry *testhelpers.TestFerry) { - _, err := ferry.Ferry.TargetDB.Exec("DELETE FROM gftest.table1 WHERE id = \"42\"") - testhelpers.PanicIfError(err) - - _, err = ferry.Ferry.TargetDB.Exec("DELETE FROM gftest.table1 WHERE id = \"43\"") - testhelpers.PanicIfError(err) -} diff --git a/test/go/iterative_verifier_test.go b/test/go/iterative_verifier_test.go deleted file mode 100644 index a211896e5..000000000 --- a/test/go/iterative_verifier_test.go +++ /dev/null @@ -1,477 +0,0 @@ -package test - -import ( - "database/sql" - "fmt" - "sort" - "testing" - "time" - - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/testhelpers" - "github.com/siddontang/go-mysql/schema" - "github.com/stretchr/testify/suite" -) - -type IterativeVerifierTestSuite struct { - *testhelpers.GhostferryUnitTestSuite - - verifier *ghostferry.IterativeVerifier - db *sql.DB - table *ghostferry.TableSchema -} - -func (t *IterativeVerifierTestSuite) SetupTest() { - t.GhostferryUnitTestSuite.SetupTest() - t.SeedSourceDB(0) - t.SeedTargetDB(0) - - tableCompressions := make(ghostferry.TableColumnCompressionConfig) - tableCompressions[testhelpers.TestCompressedTable1Name] = make(map[string]string) - tableCompressions[testhelpers.TestCompressedTable1Name][testhelpers.TestCompressedColumn1Name] = ghostferry.CompressionSnappy - - compressionVerifier, err := ghostferry.NewCompressionVerifier(tableCompressions) - if err != nil { - t.FailNow(err.Error()) - } - - t.verifier = &ghostferry.IterativeVerifier{ - CompressionVerifier: compressionVerifier, - CursorConfig: &ghostferry.CursorConfig{ - DB: t.Ferry.SourceDB, - BatchSize: t.Ferry.Config.DataIterationBatchSize, - ReadRetries: t.Ferry.Config.DBReadRetries, - }, - BinlogStreamer: t.Ferry.BinlogStreamer, - SourceDB: t.Ferry.SourceDB, - TargetDB: t.Ferry.TargetDB, - - Concurrency: 1, - } - - t.db = t.Ferry.SourceDB - t.reloadTables() - - err = t.verifier.Initialize() - testhelpers.PanicIfError(err) -} - -func (t *IterativeVerifierTestSuite) TearDownTest() { - t.GhostferryUnitTestSuite.TearDownTest() -} - -func (t *IterativeVerifierTestSuite) TestNothingToVerify() { - err := t.verifier.VerifyBeforeCutover() - t.Require().Nil(err) - - result, err := t.verifier.VerifyDuringCutover() - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestVerifyOnceWithIgnoredColumns() { - ignoredColumns := map[string]map[string]struct{}{"test_table_1": {"data": struct{}{}}} - t.verifier.IgnoredColumns = ignoredColumns - - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "bar", t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestVerifyOnceFails() { - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "bar", t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().False(result.DataCorrect) - t.Require().Equal("verification failed on table: gftest.test_table_1 for pk: 42", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestVerifyCompressedOnceFails() { - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB) - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData2, t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().False(result.DataCorrect) - t.Require().Equal( - fmt.Sprintf("verification failed on table: %s.%s for pk: %s", testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name, "42"), - result.Message, - ) -} - -func (t *IterativeVerifierTestSuite) TestVerifyOncePass() { - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "foo", t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestVerifyCompressedOncePass() { - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB) - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestVerifyDifferentCompressedSameDecompressedDataOncePass() { - t.Require().NotEqual(testhelpers.TestCompressedData3, testhelpers.TestCompressedData4) - - t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData3, t.Ferry.SourceDB) - t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData4, t.Ferry.TargetDB) - - result, err := t.verifier.VerifyOnce() - t.Require().NotNil(result) - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestBeforeCutoverFailuresFailAgainDuringCutover() { - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "bar", t.Ferry.TargetDB) - - err := t.verifier.VerifyBeforeCutover() - t.Require().Nil(err) - - result, err := t.verifier.VerifyDuringCutover() - t.Require().Nil(err) - t.Require().False(result.DataCorrect) - t.Require().Equal("verification failed on table: gftest.test_table_1 for pks: 42", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestBeforeCutoverCompressionFailuresFailAgainDuringCutover() { - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData1, t.Ferry.SourceDB) - t.InsertCompressedRowInDb(42, testhelpers.TestCompressedData2, t.Ferry.TargetDB) - - err := t.verifier.VerifyBeforeCutover() - t.Require().Nil(err) - - result, err := t.verifier.VerifyDuringCutover() - t.Require().Nil(err) - t.Require().False(result.DataCorrect) - t.Require().Equal(fmt.Sprintf("verification failed on table: %s.%s for pks: %s", "gftest", testhelpers.TestCompressedTable1Name, "42"), result.Message) -} - -func (t *IterativeVerifierTestSuite) TestBeforeCutoverDifferentCompressedSameDecompressedDataPassDuringCutover() { - t.Require().NotEqual(testhelpers.TestCompressedData3, testhelpers.TestCompressedData4) - - t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData3, t.Ferry.SourceDB) - t.InsertCompressedRowInDb(43, testhelpers.TestCompressedData4, t.Ferry.TargetDB) - - err := t.verifier.VerifyBeforeCutover() - t.Require().Nil(err) - - result, err := t.verifier.VerifyDuringCutover() - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestErrorsIfMaxDowntimeIsSurpassed() { - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "bar", t.Ferry.TargetDB) - - t.verifier.MaxExpectedDowntime = 1 * time.Nanosecond - err := t.verifier.VerifyBeforeCutover() - t.Require().Regexp("cutover stage verification will not complete within max downtime duration \\(took .*\\)", err.Error()) -} - -func (t *IterativeVerifierTestSuite) TestBeforeCutoverFailuresPassDuringCutover() { - t.InsertRowInDb(42, "foo", t.Ferry.SourceDB) - t.InsertRowInDb(42, "bar", t.Ferry.TargetDB) - - err := t.verifier.VerifyBeforeCutover() - t.Require().Nil(err) - - t.UpdateRowInDb(42, "foo", t.Ferry.TargetDB) - - result, err := t.verifier.VerifyDuringCutover() - t.Require().Nil(err) - t.Require().True(result.DataCorrect) - t.Require().Equal("", result.Message) -} - -func (t *IterativeVerifierTestSuite) TestChangingDataChangesHash() { - t.InsertRow(42, "foo") - old := t.GetHashes([]uint64{42})[0] - - t.UpdateRow(42, "bar") - new := t.GetHashes([]uint64{42})[0] - - t.Require().NotEqual(old, new) -} - -func (t *IterativeVerifierTestSuite) TestDeduplicatesHashes() { - t.InsertRow(42, "foo") - - hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, []uint64{42, 42}) - t.Require().Nil(err) - t.Require().Equal(1, len(hashes)) -} - -func (t *IterativeVerifierTestSuite) TestDoesntReturnHashIfRecordDoesntExist() { - hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, []uint64{42, 42}) - t.Require().Nil(err) - t.Require().Equal(0, len(hashes)) -} - -func (t *IterativeVerifierTestSuite) TestUnrelatedRowsDontAffectHash() { - t.InsertRow(42, "foo") - expected := t.GetHashes([]uint64{42})[0] - - t.InsertRow(43, "bar") - actual := t.GetHashes([]uint64{42})[0] - - t.Require().Equal(expected, actual) -} - -func (t *IterativeVerifierTestSuite) TestRowsWithSameDataButDifferentPKs() { - t.InsertRow(42, "foo") - t.InsertRow(43, "foo") - - hashes := t.GetHashes([]uint64{42, 43}) - t.Require().NotEqual(hashes[0], hashes[1]) -} - -func (t *IterativeVerifierTestSuite) TestPositiveAndNegativeZeroFloat() { - _, err := t.db.Exec("ALTER TABLE gftest.test_table_1 MODIFY data float") - t.Require().Nil(err) - t.reloadTables() - - _, err = t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, \"0.0\")") - t.Require().Nil(err) - - expected := t.GetHashes([]uint64{42})[0] - - _, err = t.db.Exec("UPDATE gftest.test_table_1 SET data=\"-0.0\" WHERE id=42") - t.Require().Nil(err) - - actual := t.GetHashes([]uint64{42})[0] - - t.Require().Equal(expected, actual) -} - -func (t *IterativeVerifierTestSuite) TestChangingNumberValueChangesHash() { - _, err := t.db.Exec("ALTER TABLE gftest.test_table_1 MODIFY data bigint(20)") - t.Require().Nil(err) - t.reloadTables() - - _, err = t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, -100)") - t.Require().Nil(err) - - neg := t.GetHashes([]uint64{42})[0] - - _, err = t.db.Exec("UPDATE gftest.test_table_1 SET data=100 WHERE id=42") - t.Require().Nil(err) - - pos := t.GetHashes([]uint64{42})[0] - - t.Require().NotEqual(neg, pos) -} - -func (t *IterativeVerifierTestSuite) TestNULLValues() { - _, err := t.db.Exec("INSERT INTO gftest.test_table_1 VALUES (42, NULL)") - t.Require().Nil(err) - null := t.GetHashes([]uint64{42})[0] - - t.UpdateRow(42, "") - empty := t.GetHashes([]uint64{42})[0] - - t.UpdateRow(42, "foo") - foo := t.GetHashes([]uint64{42})[0] - - t.Require().NotEqual(null, empty) - t.Require().NotEqual(foo, empty) - t.Require().NotEqual(foo, null) -} - -func (t *IterativeVerifierTestSuite) InsertRow(id int, data string) { - t.InsertRowInDb(id, data, t.db) -} - -func (t *IterativeVerifierTestSuite) InsertRowInDb(id int, data string, db *sql.DB) { - _, err := db.Exec(fmt.Sprintf("INSERT INTO %s.%s VALUES (%d,\"%s\")", testhelpers.TestSchemaName, testhelpers.TestTable1Name, id, data)) - t.Require().Nil(err) -} - -func (t *IterativeVerifierTestSuite) InsertCompressedRowInDb(id int, data string, db *sql.DB) { - t.SetColumnType(testhelpers.TestSchemaName, testhelpers.TestCompressedTable1Name, testhelpers.TestCompressedColumn1Name, "MEDIUMBLOB", db) - _, err := db.Exec("INSERT INTO "+testhelpers.TestSchemaName+"."+testhelpers.TestCompressedTable1Name+" VALUES (?,?)", id, data) - t.Require().Nil(err) -} - -func (t *IterativeVerifierTestSuite) SetColumnType(schema, table, column, columnType string, db *sql.DB) { - t.Require().True(columnType != "") - - _, err := db.Exec(fmt.Sprintf( - "ALTER TABLE %s.%s MODIFY %s %s", - schema, - table, - column, - columnType, - )) - t.Require().Nil(err) -} - -func (t *IterativeVerifierTestSuite) UpdateRow(id int, data string) { - t.UpdateRowInDb(id, data, t.db) -} - -func (t *IterativeVerifierTestSuite) UpdateRowInDb(id int, data string, db *sql.DB) { - _, err := db.Exec(fmt.Sprintf("UPDATE %s.%s SET data=\"%s\" WHERE id=%d", testhelpers.TestSchemaName, testhelpers.TestTable1Name, data, id)) - t.Require().Nil(err) -} - -func (t *IterativeVerifierTestSuite) DeleteRow(id int) { - _, err := t.db.Exec(fmt.Sprintf("DELETE FROM %s.%s WHERE id=%d", testhelpers.TestSchemaName, testhelpers.TestTable1Name, id)) - t.Require().Nil(err) -} - -func (t *IterativeVerifierTestSuite) GetHashes(ids []uint64) []string { - hashes, err := t.verifier.GetHashes(t.db, t.table.Schema, t.table.Name, t.table.GetPKColumn(0).Name, t.table.Columns, ids) - t.Require().Nil(err) - t.Require().Equal(len(hashes), len(ids)) - - res := make([]string, len(ids)) - - for idx, id := range ids { - hash, ok := hashes[id] - t.Require().True(ok) - t.Require().True(len(hash) > 0) - - res[idx] = string(hash) - } - - return res -} - -func (t *IterativeVerifierTestSuite) reloadTables() { - tableFilter := &testhelpers.TestTableFilter{ - DbsFunc: testhelpers.DbApplicabilityFilter([]string{testhelpers.TestSchemaName}), - TablesFunc: nil, - } - - tables, err := ghostferry.LoadTables(t.db, tableFilter, nil, nil) - t.Require().Nil(err) - - t.Ferry.Tables = tables - t.verifier.Tables = tables.AsSlice() - t.verifier.TableSchemaCache = tables - - t.table = tables.Get(testhelpers.TestSchemaName, testhelpers.TestTable1Name) - t.Require().NotNil(t.table) -} - -type ReverifyStoreTestSuite struct { - suite.Suite - - store *ghostferry.ReverifyStore -} - -func (t *ReverifyStoreTestSuite) SetupTest() { - t.store = ghostferry.NewReverifyStore() -} - -func (t *ReverifyStoreTestSuite) TestAddEntryIntoReverifyStoreWillDeduplicate() { - pk1 := uint64(100) - pk2 := uint64(101) - table1 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table1"}} - t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1}) - t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1}) - t.store.Add(ghostferry.ReverifyEntry{Pk: pk1, Table: table1}) - t.store.Add(ghostferry.ReverifyEntry{Pk: pk2, Table: table1}) - t.store.Add(ghostferry.ReverifyEntry{Pk: pk2, Table: table1}) - - t.Require().Equal(uint64(2), t.store.RowCount) - t.Require().Equal(1, len(t.store.MapStore)) - t.Require().Equal( - map[uint64]struct{}{ - pk1: struct{}{}, - pk2: struct{}{}, - }, - t.store.MapStore[ghostferry.TableIdentifier{"gftest", "table1"}], - ) -} - -func (t *ReverifyStoreTestSuite) TestFlushAndBatchByTableWillCreateReverifyBatchesAndClearTheMapStore() { - expectedTable1Pks := make([]uint64, 0, 55) - table1 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table1"}} - table2 := &ghostferry.TableSchema{Table: &schema.Table{Schema: "gftest", Name: "table2"}} - for i := uint64(100); i < 155; i++ { - t.store.Add(ghostferry.ReverifyEntry{Pk: i, Table: table1}) - expectedTable1Pks = append(expectedTable1Pks, i) - } - - expectedTable2Pks := make([]uint64, 0, 45) - for i := uint64(200); i < 245; i++ { - t.store.Add(ghostferry.ReverifyEntry{Pk: i, Table: table2}) - expectedTable2Pks = append(expectedTable2Pks, i) - } - - batches := t.store.FlushAndBatchByTable(10) - t.Require().Equal(11, len(batches)) - table1Batches := make([]ghostferry.ReverifyBatch, 0) - table2Batches := make([]ghostferry.ReverifyBatch, 0) - - for _, batch := range batches { - switch batch.Table.TableName { - case "table1": - table1Batches = append(table1Batches, batch) - case "table2": - table2Batches = append(table2Batches, batch) - } - } - - t.Require().Equal(6, len(table1Batches)) - t.Require().Equal(5, len(table2Batches)) - - actualTable1Pks := make([]uint64, 0) - for _, batch := range table1Batches { - for _, pk := range batch.Pks { - actualTable1Pks = append(actualTable1Pks, pk) - } - } - - sort.Slice(actualTable1Pks, func(i, j int) bool { return actualTable1Pks[i] < actualTable1Pks[j] }) - t.Require().Equal(expectedTable1Pks, actualTable1Pks) - - actualTable2Pks := make([]uint64, 0) - for _, batch := range table2Batches { - for _, pk := range batch.Pks { - actualTable2Pks = append(actualTable2Pks, pk) - } - } - - sort.Slice(actualTable2Pks, func(i, j int) bool { return actualTable2Pks[i] < actualTable2Pks[j] }) - t.Require().Equal(expectedTable2Pks, actualTable2Pks) - - t.Require().Equal(0, len(t.store.MapStore)) -} - -func TestIterativeVerifierTestSuite(t *testing.T) { - testhelpers.SetupTest() - suite.Run(t, &IterativeVerifierTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}}) -} - -func TestReverifyStoreTestSuite(t *testing.T) { - testhelpers.SetupTest() - suite.Run(t, &ReverifyStoreTestSuite{}) -} diff --git a/test/integration/iterative_verifier_test.rb b/test/integration/iterative_verifier_test.rb deleted file mode 100644 index cae3b86c7..000000000 --- a/test/integration/iterative_verifier_test.rb +++ /dev/null @@ -1,56 +0,0 @@ -require "test_helper" - -class IterativeVerifierTest < GhostferryTestCase - def setup - seed_simple_database_with_single_table - end - - def test_iterative_verifier_succeeds_in_normal_run - datawriter = new_source_datawriter - ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Iterative" }) - - start_datawriter_with_ghostferry(datawriter, ghostferry) - stop_datawriter_during_cutover(datawriter, ghostferry) - - verification_ran = false - ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables| - verification_ran = true - assert_equal 0, incorrect_tables.length - end - - ghostferry.run - assert verification_ran - assert_test_table_is_identical - end - - def test_iterative_verifier_fails_if_binlog_streamer_incorrectly_copies_data - datawriter = new_source_datawriter - ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Iterative" }) - - table_name = DEFAULT_FULL_TABLE_NAME - - chosen_id = 0 - verification_ran = false - ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do - result = source_db.query("SELECT id FROM #{table_name} ORDER BY id LIMIT 1") - chosen_id = result.first["id"] - - refute chosen_id == 0 - source_db.query("UPDATE #{table_name} SET data = 'something' WHERE id = #{chosen_id}") - end - - ghostferry.on_status(Ghostferry::Status::VERIFY_DURING_CUTOVER) do - refute chosen_id == 0 - source_db.query("DELETE FROM #{table_name} WHERE id = #{chosen_id}") - end - - ghostferry.on_status(Ghostferry::Status::VERIFIED) do |*incorrect_tables| - verification_ran = true - - assert_equal ["gftest.test_table_1"], incorrect_tables - end - - ghostferry.run - assert verification_ran - end -end diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go index 5a30cdaa6..868a42603 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -224,15 +224,7 @@ func NewStandardConfig() (*ghostferry.Config, error) { } } - verifierType := os.Getenv("GHOSTFERRY_VERIFIER_TYPE") - if verifierType == ghostferry.VerifierTypeIterative { - config.VerifierType = ghostferry.VerifierTypeIterative - config.IterativeVerifierConfig = ghostferry.IterativeVerifierConfig{ - Concurrency: 2, - } - } else if verifierType != "" { - config.VerifierType = verifierType - } + config.VerifierType = os.Getenv("GHOSTFERRY_VERIFIER_TYPE") return config, config.ValidateConfig() } From 176cddfbf1ddb15c00456f9b2cce8dbe6eddde01 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 8 Oct 2019 13:14:07 -0400 Subject: [PATCH 2/3] Updated documentation to refer to InlineVerifier --- docs/source/copydbinterruptresume.rst | 4 +- docs/source/verifiers.rst | 100 ++++++++++++++------------ 2 files changed, 58 insertions(+), 46 deletions(-) diff --git a/docs/source/copydbinterruptresume.rst b/docs/source/copydbinterruptresume.rst index 5a9a983dd..7cdf157ae 100644 --- a/docs/source/copydbinterruptresume.rst +++ b/docs/source/copydbinterruptresume.rst @@ -111,8 +111,8 @@ Some other considerations/notes: runs. * To test resuming errored runs further, see :ref:`prodtesting`. -* Verifiers are not resumable, including the IterativeVerifier. This may change - in the future. +* ChecksumTableVerifier is not resumable, but the InlineVerifier is as long as + cut over didn't begin. * While we are confident that the algorithm to be correct, this is still a highly experimental feature. USE AT YOUR OWN RISK. diff --git a/docs/source/verifiers.rst b/docs/source/verifiers.rst index d925e25c0..7487b8baa 100644 --- a/docs/source/verifiers.rst +++ b/docs/source/verifiers.rst @@ -6,28 +6,34 @@ Verifiers Verifiers in Ghostferry is designed to ensure that Ghostferry did not corrupt/miss data. There are two different verifiers: the -``ChecksumTableVerifier`` and the ``IterativeVerifier``. A comparison of them +``ChecksumTableVerifier`` and the ``InlineVerifier``. A comparison of them are given below: +-----------------------+-----------------------+-----------------------------+ -| | ChecksumTableVerifier | IterativeVerifier | +| | ChecksumTableVerifier | InlineVerifier | +-----------------------+-----------------------+-----------------------------+ -|Mechanism | ``CHECKSUM TABLE`` | Verify row before cutover; | -| | | Reverify changed rows during| -| | | cutover. | +|Mechanism | ``CHECKSUM TABLE`` | Each row is validated via a | +| | | MD5 type query on the MySQL | +| | | database after it is copied.| +| | | Any entries copied due to | +| | | binlog activity is verified | +| | | periodically during the copy| +| | | process and ultimately | +| | | during the cutover. | +-----------------------+-----------------------+-----------------------------+ -|Impacts on Cutover Time| Linear w.r.t data size| Linear w.r.t. change rate | -| | | [1]_ | +|Impacts on Cutover Time| Linear w.r.t data | Linear w.r.t. change rate | +| | size. | [1]_. | +-----------------------+-----------------------+-----------------------------+ -|Impacts on Copy Time | None | Linear w.r.t data size | +|Impacts on Copy Time | None. | Linear w.r.t data size [3]_.| |[2]_ | | | +-----------------------+-----------------------+-----------------------------+ -|Memory Usage | Minimal | Linear w.r.t rows changed | +|Memory Usage | Minimal. | Minimal. | +-----------------------+-----------------------+-----------------------------+ -|Partial table copy | Not supported | Supported | +|Partial table copy | Not supported. | Supported. | +-----------------------+-----------------------+-----------------------------+ -|Worst Case Scenario | Large databases causes| Verification is slower than | -| | unacceptable downtime | the change rate of the DB; | +|Worst Case Scenario | Large databases causes| Verification during cutover | +| | unacceptable downtime.| is slower than the change | +| | | rate of the DB. | +-----------------------+-----------------------+-----------------------------+ .. [1] Additional improvements could be made to reduce this as long as @@ -37,39 +43,45 @@ are given below: .. [2] Increase in copy time does not increase downtime. Downtime occurs only in cutover. +.. [3] The increase should be minimal as the verification is done immediately + after copy, when the data most likely still live in RAM. + If you want verification, you should try with the ``ChecksumTableVerifier`` first if you're copying whole tables at a time. If that takes too long, you can -try using the ``IterativeVerifier``. Alternatively, you can verify in a staging +try using the ``InlineVerifier``. Alternatively, you can verify in a staging run and not verify during the production run (see :ref:`copydbinprod`). -IterativeVerifier ------------------ - -IterativeVerifier verifies the source and target in a couple of steps: - -1. After the data copy, it first compares the hashes of each applicable rows - of the source and the target together to make sure they are the same. This - is known as the initial verification. - - a. If they are the same: the verification for that row is complete. - b. If they are not the same: add it into a reverify queue. - -2. For any rows changed during the initial verification process, add it into - the reverify queue. - -3. After the initial verification, verify the rows' hashes in the - reverification queue again. This is done to reduce the time needed to - reverify during the cutover as we assume the reverification queue will - become smaller during this process. - -4. During the cutover stage, verify all rows' hashes in the reverify queue. - - a. If they are the same: the verification for that row is complete. - b. If they are not the same: the verification fails. - -5. If no verification failure occurs, the source and the target are identical. - If verification failure does occur (4b), then the source and target are not - identical. - -A proof of concept TLA+ verification of this algorithm is done in -``_. +InlineVerifier +-------------- + +Ghostferry's core algorithm has ran for millions of times and is backed by a +TLA+ specification. There's a high degree of confidence in its correctness. +However, correctness analysis assumed that the data is perfectly copied from +the source to the target MySQL. However, this may not be the case as the data +is transformed from MySQL -> Go -> MySQL. Different encodings could change the +unintentionally data, resulting in corruptions. Some observed (and fixed) cases +includes: floating point values and datetime columns. + +The InlineVerifier is designed to catch these type of problems and fail the +run if discrepencies are detected. **It is not designed to verify that certain +records are missing**. Only the ChecksumTableVerifier can do that. The way the +InlineVerifier catches encoding type issues are as follows: + +* During the DataIterator + 1. While selecting a row to copy, a MD5 checksum is calculated on the source + MySQL server. + 2. After the data is copied onto the target, the same MD5 checksum is + calculated on the target. + 3. The hash is compared. If it is different, the run is aborted with an + error. +* During the BinlogStreamer + 1. Since the MD5 checksum cannot be synchronously generated with the Binlog + event, any rows seen in the BinlogStreamer is added to a background + verification queue. + 2. Periodically in the background, the checksum for all rows within the + queue are computed both on the source and the target database. + 3. The checksums are compared. If they match, the row is removed from the + verification queue. Otherwise, it remains in the queue. + 4. During the cutover, when the source and target are read-only, checksums + for all rows within the verification queue are checked. If any + mismatches are detected, an error is raised. From c459035c313fc655299748199dd4fd7bc314a8de Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Oct 2019 10:00:10 -0400 Subject: [PATCH 3/3] Fixed typos in documentation --- docs/source/verifiers.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/source/verifiers.rst b/docs/source/verifiers.rst index 7487b8baa..56f390594 100644 --- a/docs/source/verifiers.rst +++ b/docs/source/verifiers.rst @@ -12,7 +12,7 @@ are given below: +-----------------------+-----------------------+-----------------------------+ | | ChecksumTableVerifier | InlineVerifier | +-----------------------+-----------------------+-----------------------------+ -|Mechanism | ``CHECKSUM TABLE`` | Each row is validated via a | +|Mechanism | ``CHECKSUM TABLE`` | Each row is validated via an| | | | MD5 type query on the MySQL | | | | database after it is copied.| | | | Any entries copied due to | @@ -54,7 +54,7 @@ run and not verify during the production run (see :ref:`copydbinprod`). InlineVerifier -------------- -Ghostferry's core algorithm has ran for millions of times and is backed by a +Ghostferry's core algorithm has run for millions of times and is backed by a TLA+ specification. There's a high degree of confidence in its correctness. However, correctness analysis assumed that the data is perfectly copied from the source to the target MySQL. However, this may not be the case as the data