From bbaa1cc6092013197c65dab3e016282646812d02 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 28 Jan 2025 13:12:37 +0100 Subject: [PATCH 1/5] Fix race when a vdiff resumes on vttablet restart Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/shard_sync.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/controller.go | 3 --- go/vt/vttablet/tabletmanager/vdiff/engine.go | 5 +++++ .../vttablet/tabletmanager/vdiff/framework_test.go | 14 ++++++++++++-- .../tabletmanager/vdiff/workflow_differ_test.go | 8 ++------ 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index ab995ec14b1..e457ba03567 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -85,7 +85,7 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st // We don't use the watch event except to know that we should // re-read the shard record, and to know if the watch dies. log.Info("Change in shard record") - if event.Err != nil { + if event != nil && event.Err != nil { // The watch failed. Stop it so we start a new one if needed. log.Errorf("Shard watch failed: %v", event.Err) shardWatch.stop() diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 20c1501989e..31316d8431a 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -104,9 +104,6 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } - ctx, ct.cancel = context.WithCancel(ctx) - go ct.run(ctx) - return ct, nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index b2285a070fa..d3a762a479b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -221,6 +221,11 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman globalStats.mu.Lock() defer globalStats.mu.Unlock() globalStats.controllers[ct.id] = ct + + // run() can start a vdiff in pending/started state, so we should init the stats first before starting it. + controllerCtx, cancel := context.WithCancel(vde.ctx) + ct.cancel = cancel + go ct.run(controllerCtx) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 563741fcd23..78f50e3d182 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -676,8 +676,7 @@ func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller { fmt.Sprintf("%d|%s|%s|%s|%s|%s|%s|%s|", id, uuid.New(), tvde.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS), ) tvde.dbClient.ExpectRequest(fmt.Sprintf("select * from _vt.vdiff where id = %d", id), noResults, nil) - ct, err := newController(context.Background(), controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts) - require.NoError(t, err) + ct := tvde.newController(t, controllerQR) ct.sources = map[string]*migrationSource{ tstenv.ShardName: { vrID: 1, @@ -688,5 +687,16 @@ func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller { }, } ct.sourceKeyspace = tstenv.KeyspaceName + + return ct +} + +func (tvde *testVDiffEnv) newController(t *testing.T, controllerQR *sqltypes.Result) *controller { + ctx := context.Background() + ct, err := newController(ctx, controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts) + require.NoError(t, err) + ctx2, cancel := context.WithCancel(ctx) + ct.cancel = cancel + go ct.run(ctx2) return ct } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go index 5ac0fabd726..9d6f4050a64 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ_test.go @@ -17,7 +17,6 @@ limitations under the License. package vdiff import ( - "context" "fmt" "strings" "testing" @@ -49,8 +48,7 @@ func TestBuildPlanSuccess(t *testing.T) { ) vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil) - ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts) - require.NoError(t, err) + ct := vdenv.newController(t, controllerQR) ct.sources = map[string]*migrationSource{ tstenv.ShardName: { vrID: 1, @@ -698,9 +696,7 @@ func TestBuildPlanFailure(t *testing.T) { fmt.Sprintf("1|%s|%s|%s|%s|%s|%s|%s|", UUID, vdiffenv.workflow, tstenv.KeyspaceName, tstenv.ShardName, vdiffDBName, PendingState, optionsJS), ) vdiffenv.dbClient.ExpectRequest("select * from _vt.vdiff where id = 1", noResults, nil) - ct, err := newController(context.Background(), controllerQR.Named().Row(), vdiffenv.dbClientFactory, tstenv.TopoServ, vdiffenv.vde, vdiffenv.opts) - require.NoError(t, err) - + ct := vdenv.newController(t, controllerQR) testcases := []struct { input *binlogdatapb.Rule err string From e0cc80110a740634090a265abba7cebe0b8d4642 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 28 Jan 2025 15:16:09 +0100 Subject: [PATCH 2/5] Log if nil event was received for visibility Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/shard_sync.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/shard_sync.go b/go/vt/vttablet/tabletmanager/shard_sync.go index e457ba03567..4afbaa1a639 100644 --- a/go/vt/vttablet/tabletmanager/shard_sync.go +++ b/go/vt/vttablet/tabletmanager/shard_sync.go @@ -85,10 +85,15 @@ func (tm *TabletManager) shardSyncLoop(ctx context.Context, notifyChan <-chan st // We don't use the watch event except to know that we should // re-read the shard record, and to know if the watch dies. log.Info("Change in shard record") - if event != nil && event.Err != nil { - // The watch failed. Stop it so we start a new one if needed. - log.Errorf("Shard watch failed: %v", event.Err) - shardWatch.stop() + + if event != nil { + if event.Err != nil { + // The watch failed. Stop it so we start a new one if needed. + log.Errorf("Shard watch failed: %v", event.Err) + shardWatch.stop() + } + } else { + log.Infof("Got a nil event from the shard watcher for %s. This should not happen.", tm.tabletAlias) } case <-ctx.Done(): // Our context was cancelled. Terminate the loop. From ff0279f5375a5fc5c9147486fbed26830058ebed Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 28 Jan 2025 18:36:59 +0100 Subject: [PATCH 3/5] Init map while initing stats Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vdiff/engine.go | 14 +------------- go/vt/vttablet/tabletmanager/vdiff/stats.go | 1 + 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index d3a762a479b..7cd486a55e4 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -153,12 +153,12 @@ func (vde *Engine) openLocked(ctx context.Context) error { if err != nil { return err } + vde.ctx, vde.cancel = context.WithCancel(ctx) vde.isOpen = true // now we are open and have things to close if err := vde.initControllers(rows); err != nil { return err } - vde.updateStats() // At this point we've fully and successfully opened so begin // retrying error'd VDiffs until the engine is closed. @@ -400,16 +400,4 @@ func (vde *Engine) resetControllers() { ct.Stop() } vde.controllers = make(map[int64]*controller) - vde.updateStats() -} - -// updateStats must only be called while holding the engine lock. -func (vre *Engine) updateStats() { - globalStats.mu.Lock() - defer globalStats.mu.Unlock() - - globalStats.controllers = make(map[int64]*controller, len(vre.controllers)) - for id, ct := range vre.controllers { - globalStats.controllers[id] = ct - } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 04cda6ac0c1..265f0407d89 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -49,6 +49,7 @@ func (vds *vdiffStats) register() { globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table") globalStats.RowsDiffedCount = stats.NewCounter("", "") + globalStats.controllers = make(map[int64]*controller) stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) From 853bb6f414676b62c31bf7ec8d5c36c05af6dcb0 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 28 Jan 2025 20:56:44 +0100 Subject: [PATCH 4/5] Remove unnecessary param Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vdiff/controller.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/engine.go | 2 +- go/vt/vttablet/tabletmanager/vdiff/framework_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 31316d8431a..7b5e1311066 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -83,7 +83,7 @@ type controller struct { TableDiffPhaseTimings *stats.Timings } -func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, +func newController(row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, ts *topo.Server, vde *Engine, options *tabletmanagerdata.VDiffOptions) (*controller, error) { log.Infof("VDiff controller initializing for %+v", row) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 7cd486a55e4..e3abe651c58 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -212,7 +212,7 @@ func (vde *Engine) retry(ctx context.Context, err error) { // addController creates a new controller using the given vdiff record and adds it to the engine. // You must already have the main engine mutex (mu) locked before calling this. func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletmanagerdata.VDiffOptions) error { - ct, err := newController(vde.ctx, row, vde.dbClientFactoryDba, vde.ts, vde, options) + ct, err := newController(row, vde.dbClientFactoryDba, vde.ts, vde, options) if err != nil { return fmt.Errorf("controller could not be initialized for stream %+v on tablet %v", row, vde.thisTablet.Alias) diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index 78f50e3d182..4c1c7e8f3b8 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -693,7 +693,7 @@ func (tvde *testVDiffEnv) createController(t *testing.T, id int) *controller { func (tvde *testVDiffEnv) newController(t *testing.T, controllerQR *sqltypes.Result) *controller { ctx := context.Background() - ct, err := newController(ctx, controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts) + ct, err := newController(controllerQR.Named().Row(), tvde.dbClientFactory, tstenv.TopoServ, tvde.vde, tvde.opts) require.NoError(t, err) ctx2, cancel := context.WithCancel(ctx) ct.cancel = cancel From 2cdeec5b89132f3c1b8c548e36074bf99d9b9c55 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 4 Feb 2025 14:27:21 +0100 Subject: [PATCH 5/5] Remove obsolete comment. Reinit controller stats on engine open Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vdiff/engine.go | 3 ++- go/vt/vttablet/tabletmanager/vdiff/stats.go | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index e3abe651c58..cd3771accfd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -146,6 +146,8 @@ func (vde *Engine) openLocked(ctx context.Context) error { vde.resetControllers() } + globalStats.initControllerStats() + // At this point the tablet has no controllers running. So // we want to start any VDiffs that have not been explicitly // stopped or otherwise finished. @@ -222,7 +224,6 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman defer globalStats.mu.Unlock() globalStats.controllers[ct.id] = ct - // run() can start a vdiff in pending/started state, so we should init the stats first before starting it. controllerCtx, cancel := context.WithCancel(vde.ctx) ct.cancel = cancel go ct.run(controllerCtx) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 265f0407d89..ae59884e6c2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -44,12 +44,18 @@ type vdiffStats struct { RowsDiffedCount *stats.Counter } +func (vds *vdiffStats) initControllerStats() { + vds.mu.Lock() + defer vds.mu.Unlock() + vds.controllers = make(map[int64]*controller) +} + func (vds *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table") globalStats.RowsDiffedCount = stats.NewCounter("", "") - globalStats.controllers = make(map[int64]*controller) + globalStats.initControllerStats() stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers)