Skip to content

Commit

Permalink
Add more vstream metrics for vstream manager
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Feb 24, 2025
1 parent 81ce29c commit 6359a99
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 17 deletions.
41 changes: 31 additions & 10 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ type vstreamManager struct {
toposerv srvtopo.Server
cell string

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
vstreamsCount *stats.CountersWithMultiLabels
vstreamsEventsStreamed *stats.CountersWithMultiLabels
vstreamsEndedWithErrors *stats.CountersWithMultiLabels
}

// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
Expand Down Expand Up @@ -151,10 +154,22 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
[]string{"Keyspace", "ShardName", "TabletType"}),
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
vstreamsLag: exporter.NewGaugesWithMultiLabels(
"VStreamsLag",
"Difference between event current time and the binlog event timestamp",
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
vstreamsCount: exporter.NewCountersWithMultiLabels(
"VStreamsCount",
"Number of active vstreams",
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels(
"VStreamsEventsStreamed",
"Number of vstreams events sent",
[]string{"Keyspace", "ShardName", "TabletType", "TabletHostname"}),
vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels(
"VStreamsEndedWithErrors",
"Number of vstreams ended with errors",
[]string{"Keyspace", "ShardName", "TabletType"}),
}
}
Expand Down Expand Up @@ -378,11 +393,17 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
vs.wg.Add(1)
go func() {
defer vs.wg.Done()

labels := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
vs.vsm.vstreamsEndedWithErrors.Add(labels, 0)

err := vs.streamFromTablet(ctx, sgtid)

// Set the error on exit. First one wins.
if err != nil {
log.Errorf("Error in vstream for %+v: %s", sgtid, err)

vs.vsm.vstreamsEndedWithErrors.Add(labels, 1)
vs.once.Do(func() {
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
vs.cancel()
Expand Down Expand Up @@ -613,18 +634,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
TableLastPKs: sgtid.TablePKs,
Options: options,
}
var vstreamCreatedOnce sync.Once

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String(), tablet.Hostname}
vs.vsm.vstreamsCreated.Add(labels, 1)
vs.vsm.vstreamsCount.Add(labels, 1)

log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

vstreamCreatedOnce.Do(func() {
vs.vsm.vstreamsCreated.Add(labels, 1)
})

select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
Expand All @@ -646,6 +665,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for i, event := range events {
vs.vsm.vstreamsEventsStreamed.Add(labels, 1)
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
Expand Down Expand Up @@ -762,6 +782,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
return nil
})
vs.vsm.vstreamsCount.Add(labels, -1)
// If stream was ended (by a journal event), return nil without checking for error.
select {
case <-journalDone:
Expand Down
97 changes: 90 additions & 7 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) {
}
}

func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
func TestVStreamsMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cell := "aa"
Expand All @@ -346,9 +346,11 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
vsm := newTestVStreamManager(ctx, hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
hostname1 := "host1"
hostname2 := "host2"
sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())

send0 := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -377,15 +379,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
ch := startVStream(ctx, t, vsm, vgtid, nil)
<-ch
<-ch
expectedLabels1Prefix := "TestVStream.-20.PRIMARY"
expectedLabels2Prefix := "TestVStream.20-40.PRIMARY"
expectedLabels1 := expectedLabels1Prefix + "." + hostname1
expectedLabels2 := expectedLabels2Prefix + "." + hostname2
wantVStreamsCreated := make(map[string]int64)
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
wantVStreamsCreated[expectedLabels1] = 1
wantVStreamsCreated[expectedLabels2] = 1
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

wantVStreamsLag := make(map[string]int64)
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
wantVStreamsLag[expectedLabels1] = 5
wantVStreamsLag[expectedLabels2] = 7
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")

wantVStreamsCount := make(map[string]int64)
wantVStreamsCount[expectedLabels1] = 1
wantVStreamsCount[expectedLabels2] = 1
assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches")

wantVStreamsEventsStreamed := make(map[string]int64)
wantVStreamsEventsStreamed[expectedLabels1] = 2
wantVStreamsEventsStreamed[expectedLabels2] = 2
assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches")

wantVStreamsEndedWithErrors := make(map[string]int64)
wantVStreamsEndedWithErrors[expectedLabels1Prefix] = 0
wantVStreamsEndedWithErrors[expectedLabels2Prefix] = 0
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
}

func TestVStreamsMetricsErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cell := "aa"
ks := "TestVStream"
_ = createSandbox(ks)
hc := discovery.NewFakeHealthCheck(nil)
st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
vsm := newTestVStreamManager(ctx, hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
hostname1 := "host1"
hostname2 := "host2"
sbc0 := hc.AddTestTablet(cell, hostname1, 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, hostname2, 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())

const wantErr = "Invalid arg message"
sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))

send1 := []*binlogdatapb.VEvent{
{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
}
sbc1.AddVStreamEvents(send1, nil)

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: ks,
Shard: "-20",
Gtid: "pos",
}, {
Keyspace: ks,
Shard: "20-40",
Gtid: "pos",
}},
}
ch := make(chan *binlogdatapb.VStreamResponse)
done := make(chan struct{})
go func() {
err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
ch <- &binlogdatapb.VStreamResponse{Events: events}
return nil
})

if err == nil || !strings.Contains(err.Error(), wantErr) {
t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr)
}
close(done)
}()
<-done

expectedLabels1 := "TestVStream.-20.PRIMARY"
expectedLabels2 := "TestVStream.20-40.PRIMARY"

wantVStreamsEndedWithErrors := make(map[string]int64)
wantVStreamsEndedWithErrors[expectedLabels1] = 1
wantVStreamsEndedWithErrors[expectedLabels2] = 1
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
}

func TestVStreamRetriableErrors(t *testing.T) {
Expand Down

0 comments on commit 6359a99

Please sign in to comment.