-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
schema fingerprint verifier #292
base: main
Are you sure you want to change the base?
Changes from 7 commits
2d0fab2
df0c644
d83ec42
4e34389
fbb08e4
9bf1bcc
031bd4c
0f58419
2afb0c3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,8 @@ type Ferry struct { | |
DataIterator *DataIterator | ||
BatchWriter *BatchWriter | ||
|
||
SchemaFingerPrintVerifier *SchemaFingerPrintVerifier | ||
|
||
StateTracker *StateTracker | ||
ErrorHandler ErrorHandler | ||
Throttler Throttler | ||
|
@@ -245,6 +247,29 @@ func (f *Ferry) NewInlineVerifier() *InlineVerifier { | |
} | ||
} | ||
|
||
func (f *Ferry) NewSchemaFingerPrintVerifier() (*SchemaFingerPrintVerifier, error) { | ||
fingerPrint := map[string]string{} | ||
if f.StateToResumeFrom != nil && f.StateToResumeFrom.SchemaFingerPrint != nil { | ||
fingerPrint = f.StateToResumeFrom.SchemaFingerPrint | ||
} | ||
periodicallyVerifyInterval, err := time.ParseDuration(f.Config.PeriodicallyVerifySchemaFingerPrintInterval) | ||
if err != nil { | ||
return nil, fmt.Errorf("invalid PeriodicallyVerifySchemaFingerPrintInterval: %v. this error should have been caught via .Validate()", err) | ||
} | ||
|
||
return &SchemaFingerPrintVerifier{ | ||
SourceDB: f.SourceDB, | ||
TableRewrites: f.Config.TableRewrites, | ||
TableSchemaCache: f.Tables, | ||
ErrorHandler: f.ErrorHandler, | ||
PeriodicallyVerifyInterval: periodicallyVerifyInterval, | ||
|
||
FingerPrints: fingerPrint, | ||
|
||
logger: logrus.WithField("tag", "schema_fingerprint_verifier"), | ||
}, nil | ||
} | ||
|
||
func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier { | ||
v := f.NewInlineVerifier() | ||
v.StateTracker = nil | ||
|
@@ -489,6 +514,11 @@ func (f *Ferry) Initialize() (err error) { | |
} | ||
} | ||
|
||
f.SchemaFingerPrintVerifier, err = f.NewSchemaFingerPrintVerifier() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// The iterative verifier needs the binlog streamer so this has to be first. | ||
// Eventually this can be moved below the verifier initialization. | ||
f.BinlogStreamer = f.NewSourceBinlogStreamer() | ||
|
@@ -686,6 +716,13 @@ func (f *Ferry) Run() { | |
}() | ||
} | ||
|
||
schemaFingerVerifierPrintWg := &sync.WaitGroup{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could probably just reuse the |
||
schemaFingerVerifierPrintWg.Add(1) | ||
go func() { | ||
defer schemaFingerVerifierPrintWg.Done() | ||
f.SchemaFingerPrintVerifier.PeriodicallyVerifySchemaFingerprints(ctx) | ||
}() | ||
|
||
inlineVerifierWg := &sync.WaitGroup{} | ||
inlineVerifierContext, stopInlineVerifier := context.WithCancel(ctx) | ||
if f.inlineVerifier != nil { | ||
|
@@ -787,6 +824,8 @@ func (f *Ferry) Run() { | |
f.DoneTime = time.Now() | ||
|
||
shutdown() | ||
|
||
schemaFingerVerifierPrintWg.Wait() | ||
supportingServicesWg.Wait() | ||
|
||
if f.Config.ProgressCallback.URI != "" { | ||
|
@@ -908,12 +947,14 @@ func (f *Ferry) SerializeStateToJSON() (string, error) { | |
err := errors.New("no valid StateTracker") | ||
return "", err | ||
} | ||
var binlogVerifyStore *BinlogVerifyStore = nil | ||
var ( | ||
binlogVerifyStore *BinlogVerifyStore = nil | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question for my own knowledge: what prompted this change? A linting error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, seems like it. Will change it back. |
||
if f.inlineVerifier != nil { | ||
binlogVerifyStore = f.inlineVerifier.reverifyStore | ||
} | ||
|
||
serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore) | ||
serializedState := f.StateTracker.Serialize(f.Tables, binlogVerifyStore, f.SchemaFingerPrintVerifier.FingerPrints) | ||
|
||
if f.Config.DoNotIncludeSchemaCacheInStateDump { | ||
serializedState.LastKnownTableSchemaCache = nil | ||
|
@@ -945,7 +986,7 @@ func (f *Ferry) Progress() *Progress { | |
} | ||
|
||
// Table Progress | ||
serializedState := f.StateTracker.Serialize(nil, nil) | ||
serializedState := f.StateTracker.Serialize(nil, nil, nil) | ||
// Note the below will not necessarily be synchronized with serializedState. | ||
// This is fine as we don't need to be super precise with performance data. | ||
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reading this code gives me a bit of pause. There's currently a race condition I think:
Did I miss something and this situation is not possible?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, for clarity, the following is what currently happens when you run the migration with copydb:
Also, just to avoid confusion: by `table schema` I mean the information we retrieve from the `information_schema.columns` table in MySQL, and not the `TableSchema` struct we use internally in ghostferry.
I am not sure I understand your point about why the table schema would have changed after ghostferry starts copying rows. Copying rows shouldn't alter the schema structure of the table. Here is an example of the output we get when we try to retrieve the schema of a table.