From 6359a99ff2f28462d5b61c74ed0f4d42ec031f0b Mon Sep 17 00:00:00 2001 From: twthorn Date: Mon, 24 Feb 2025 17:55:50 -0500 Subject: [PATCH] Add more vstream metrics for vstream manager --- go/vt/vtgate/vstream_manager.go | 41 +++++++++--- go/vt/vtgate/vstream_manager_test.go | 97 ++++++++++++++++++++++++++-- 2 files changed, 121 insertions(+), 17 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index ada1ddb131c..57016cc0a7d 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -51,8 +51,11 @@ type vstreamManager struct { toposerv srvtopo.Server cell string - vstreamsCreated *stats.CountersWithMultiLabels - vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCreated *stats.CountersWithMultiLabels + vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCount *stats.CountersWithMultiLabels + vstreamsEventsStreamed *stats.CountersWithMultiLabels + vstreamsEndedWithErrors *stats.CountersWithMultiLabels } // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set @@ -151,10 +154,22 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", - []string{"Keyspace", "ShardName", "TabletType"}), + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), vstreamsLag: exporter.NewGaugesWithMultiLabels( "VStreamsLag", "Difference between event current time and the binlog event timestamp", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsCount: exporter.NewCountersWithMultiLabels( + "VStreamsCount", + "Number of active vstreams", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels( + "VStreamsEventsStreamed", + "Number of vstreams events sent", + []string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}), + vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels( + "VStreamsEndedWithErrors", + "Number of vstreams ended with errors", []string{"Keyspace", "ShardName", "TabletType"}), } } @@ -378,11 +393,17 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard vs.wg.Add(1) go func() { defer vs.wg.Done() + + labels := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} + vs.vsm.vstreamsEndedWithErrors.Add(labels, 0) + err := vs.streamFromTablet(ctx, sgtid) // Set the error on exit. First one wins. if err != nil { log.Errorf("Error in vstream for %+v: %s", sgtid, err) + + vs.vsm.vstreamsEndedWithErrors.Add(labels, 1) vs.once.Do(func() { vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid)) vs.cancel() @@ -613,18 +634,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha TableLastPKs: sgtid.TablePKs, Options: options, } - var vstreamCreatedOnce sync.Once + + labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String(), tablet.Hostname} + vs.vsm.vstreamsCreated.Add(labels, 1) + vs.vsm.vstreamsCount.Add(labels, 1) + log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 - labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()} - - vstreamCreatedOnce.Do(func() { - vs.vsm.vstreamsCreated.Add(labels, 1) - }) - select { case <-ctx.Done(): return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", @@ -646,6 +665,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { + vs.vsm.vstreamsEventsStreamed.Add(labels, 1) switch event.Type { case binlogdatapb.VEventType_FIELD: // Update table names and send. @@ -762,6 +782,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } return nil }) + vs.vsm.vstreamsCount.Add(labels, -1) // If stream was ended (by a journal event), return nil without checking for error. select { case <-journalDone: diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index e209e15fb3d..f34345a767e 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) { } } -func TestVStreamsCreatedAndLagMetrics(t *testing.T) { +func TestVStreamsMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() cell := "aa" @@ -346,9 +346,11 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { vsm := newTestVStreamManager(ctx, hc, st, cell) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() - sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + hostname1 := "host1" + hostname2 := "host2" + sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) - sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) send0 := []*binlogdatapb.VEvent{ @@ -377,15 +379,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { ch := startVStream(ctx, t, vsm, vgtid, nil) <-ch <-ch + expectedLabels1Prefix := "TestVStream.-20.PRIMARY" + expectedLabels2Prefix := "TestVStream.20-40.PRIMARY" + expectedLabels1 := expectedLabels1Prefix + "." + hostname1 + expectedLabels2 := expectedLabels2Prefix + "." + hostname2 wantVStreamsCreated := make(map[string]int64) - wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1 - wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1 + wantVStreamsCreated[expectedLabels1] = 1 + wantVStreamsCreated[expectedLabels2] = 1 assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches") wantVStreamsLag := make(map[string]int64) - wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5 - wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7 + wantVStreamsLag[expectedLabels1] = 5 + wantVStreamsLag[expectedLabels2] = 7 assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches") + + wantVStreamsCount := make(map[string]int64) + wantVStreamsCount[expectedLabels1] = 1 + wantVStreamsCount[expectedLabels2] = 1 + assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches") + + wantVStreamsEventsStreamed := make(map[string]int64) + wantVStreamsEventsStreamed[expectedLabels1] = 2 + wantVStreamsEventsStreamed[expectedLabels2] = 2 + assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches") + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1Prefix] = 0 + wantVStreamsEndedWithErrors[expectedLabels2Prefix] = 0 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") +} + +func TestVStreamsMetricsErrors(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cell := "aa" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsCreated.ResetAll() + vsm.vstreamsLag.ResetAll() + hostname1 := "host1" + hostname2 := "host2" + sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + const wantErr = "Invalid arg message" + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr)) + + send1 := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, + } + sbc1.AddVStreamEvents(send1, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + ch := make(chan *binlogdatapb.VStreamResponse) + done := make(chan struct{}) + go func() { + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + ch <- &binlogdatapb.VStreamResponse{Events: events} + return nil + }) + + if err == nil || !strings.Contains(err.Error(), wantErr) { + t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + } + close(done) + }() + <-done + + expectedLabels1 := "TestVStream.-20.PRIMARY" + expectedLabels2 := "TestVStream.20-40.PRIMARY" + + wantVStreamsEndedWithErrors := make(map[string]int64) + wantVStreamsEndedWithErrors[expectedLabels1] = 1 + wantVStreamsEndedWithErrors[expectedLabels2] = 1 + assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } func TestVStreamRetriableErrors(t *testing.T) {