Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more vstream metrics for vstream manager #17858

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtgate

import (
"context"
"errors"
"fmt"
"io"
"regexp"
Expand Down Expand Up @@ -51,8 +52,11 @@ type vstreamManager struct {
toposerv srvtopo.Server
cell string

vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
vstreamsCreated *stats.CountersWithMultiLabels
vstreamsLag *stats.GaugesWithMultiLabels
vstreamsCount *stats.CountersWithMultiLabels
vstreamsEventsStreamed *stats.CountersWithMultiLabels
vstreamsEndedWithErrors *stats.CountersWithMultiLabels
}

// maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set
Expand Down Expand Up @@ -143,6 +147,7 @@ type journalEvent struct {

func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager {
exporter := servenv.NewExporter(cell, "VStreamManager")
labels := []string{"Keyspace", "ShardName", "TabletType"}

return &vstreamManager{
resolver: resolver,
Expand All @@ -151,11 +156,23 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str
vstreamsCreated: exporter.NewCountersWithMultiLabels(
"VStreamsCreated",
"Number of vstreams created",
[]string{"Keyspace", "ShardName", "TabletType"}),
labels),
vstreamsLag: exporter.NewGaugesWithMultiLabels(
"VStreamsLag",
"Difference between event current time and the binlog event timestamp",
[]string{"Keyspace", "ShardName", "TabletType"}),
labels),
vstreamsCount: exporter.NewCountersWithMultiLabels(
"VStreamsCount",
"Number of active vstreams",
labels),
vstreamsEventsStreamed: exporter.NewCountersWithMultiLabels(
"VStreamsEventsStreamed",
"Number of events sent across all vstreams",
labels),
vstreamsEndedWithErrors: exporter.NewCountersWithMultiLabels(
"VStreamsEndedWithErrors",
"Number of vstreams that ended with errors",
labels),
}
}

Expand Down Expand Up @@ -378,11 +395,26 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard
vs.wg.Add(1)
go func() {
defer vs.wg.Done()

labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}
// Initialize vstreamsEndedWithErrors metric to zero.
vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any value in adding 0. Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This initializes the counter to zero for this keyspace/shard/tablet type, which is a best practice: metrics are hard to work with if they only sometimes exist. The metrics unit test shows the same problem: without the initialization we can't assert that there were zero errors for a vstream that worked fine, because the metric is simply missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to Reset to hopefully make it clearer, since adding zero does not appear to be useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Reset what you really want? It's a counter and not a gauge. I assumed that it was meant to be a counter that spanned the life of the vtgate as the description is "Number of vstreams that ended with errors".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, good point: this could be called multiple times, so we will just stick with Add zero.

vs.vsm.vstreamsCreated.Add(labelValues, 1)
vs.vsm.vstreamsCount.Add(labelValues, 1)

err := vs.streamFromTablet(ctx, sgtid)

// Set the error on exit. First one wins.
if err != nil {
log.Errorf("Error in vstream for %+v: %s", sgtid, err)
// Get the original/base error.
uerr := vterrors.UnwrapAll(err)
if !errors.Is(uerr, context.Canceled) && !errors.Is(uerr, context.DeadlineExceeded) {
// The client did not intentionally end the stream so this was an error in the
// vstream itself.
vs.vsm.vstreamsEndedWithErrors.Add(labelValues, 1)
}
vs.vsm.vstreamsCount.Add(labelValues, -1)
vs.once.Do(func() {
vs.setError(err, fmt.Sprintf("error starting stream from shard GTID %+v", sgtid))
vs.cancel()
Expand Down Expand Up @@ -503,6 +535,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// It will be closed when all journal events converge.
var journalDone chan struct{}
ignoreTablets := make([]*topodatapb.TabletAlias, 0)
labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}

errCount := 0
for {
Expand Down Expand Up @@ -613,18 +646,11 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
TableLastPKs: sgtid.TablePKs,
Options: options,
}
var vstreamCreatedOnce sync.Once
log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req)
err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

labels := []string{sgtid.Keyspace, sgtid.Shard, req.Target.TabletType.String()}

vstreamCreatedOnce.Do(func() {
vs.vsm.vstreamsCreated.Add(labels, 1)
})

select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s",
Expand Down Expand Up @@ -755,7 +781,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
sendevents = append(sendevents, event)
}
lag := event.CurrentTime/1e9 - event.Timestamp
vs.vsm.vstreamsLag.Set(labels, lag)
vs.vsm.vstreamsLag.Set(labelValues, lag)
}
if len(sendevents) != 0 {
eventss = append(eventss, sendevents)
Expand Down Expand Up @@ -793,6 +819,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err)
}

}

// shouldRetry determines whether we should exit immediately or retry the vstream.
Expand Down Expand Up @@ -838,6 +865,7 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) {
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
defer vs.mu.Unlock()
labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()}

// Send all chunks while holding the lock.
for _, events := range eventss {
Expand Down Expand Up @@ -890,6 +918,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
case <-ctx.Done():
return nil
case vs.eventCh <- events:
vs.vsm.vstreamsEventsStreamed.Add(labelValues, int64(len(events)))
}
}
return nil
Expand Down
96 changes: 90 additions & 6 deletions go/vt/vtgate/vstream_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func TestVStreamMulti(t *testing.T) {
}
}

func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
func TestVStreamsMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cell := "aa"
Expand All @@ -346,9 +346,12 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
vsm := newTestVStreamManager(ctx, hc, st, cell)
vsm.vstreamsCreated.ResetAll()
vsm.vstreamsLag.ResetAll()
vsm.vstreamsCount.ResetAll()
vsm.vstreamsEventsStreamed.ResetAll()
vsm.vstreamsEndedWithErrors.ResetAll()
sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())

send0 := []*binlogdatapb.VEvent{
Expand Down Expand Up @@ -377,15 +380,96 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) {
ch := startVStream(ctx, t, vsm, vgtid, nil)
<-ch
<-ch
expectedLabels1 := "TestVStream.-20.PRIMARY"
expectedLabels2 := "TestVStream.20-40.PRIMARY"
wantVStreamsCreated := make(map[string]int64)
wantVStreamsCreated["TestVStream.-20.PRIMARY"] = 1
wantVStreamsCreated["TestVStream.20-40.PRIMARY"] = 1
wantVStreamsCreated[expectedLabels1] = 1
wantVStreamsCreated[expectedLabels2] = 1
assert.Equal(t, wantVStreamsCreated, vsm.vstreamsCreated.Counts(), "vstreamsCreated matches")

wantVStreamsLag := make(map[string]int64)
wantVStreamsLag["TestVStream.-20.PRIMARY"] = 5
wantVStreamsLag["TestVStream.20-40.PRIMARY"] = 7
wantVStreamsLag[expectedLabels1] = 5
wantVStreamsLag[expectedLabels2] = 7
assert.Equal(t, wantVStreamsLag, vsm.vstreamsLag.Counts(), "vstreamsLag matches")

wantVStreamsCount := make(map[string]int64)
wantVStreamsCount[expectedLabels1] = 1
wantVStreamsCount[expectedLabels2] = 1
assert.Equal(t, wantVStreamsCount, vsm.vstreamsCount.Counts(), "vstreamsCount matches")

wantVStreamsEventsStreamed := make(map[string]int64)
wantVStreamsEventsStreamed[expectedLabels1] = 2
wantVStreamsEventsStreamed[expectedLabels2] = 2
assert.Equal(t, wantVStreamsEventsStreamed, vsm.vstreamsEventsStreamed.Counts(), "vstreamsEventsStreamed matches")

wantVStreamsEndedWithErrors := make(map[string]int64)
wantVStreamsEndedWithErrors[expectedLabels1] = 0
wantVStreamsEndedWithErrors[expectedLabels2] = 0
assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
}

// TestVStreamsMetricsErrors checks the vstreamsEndedWithErrors counter when one
// of two shard streams fails: shard -20 is fed an INVALID_ARGUMENT error and is
// expected to report one errored stream, while shard 20-40 is fed regular
// events and is expected to report zero.
func TestVStreamsMetricsErrors(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cell := "aa"
	ks := "TestVStream"
	_ = createSandbox(ks)
	hc := discovery.NewFakeHealthCheck(nil)
	st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"})
	vsm := newTestVStreamManager(ctx, hc, st, cell)
	// Reset all vstream metrics before exercising the manager.
	vsm.vstreamsCreated.ResetAll()
	vsm.vstreamsLag.ResetAll()
	vsm.vstreamsCount.ResetAll()
	vsm.vstreamsEventsStreamed.ResetAll()
	vsm.vstreamsEndedWithErrors.ResetAll()
	sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil)
	addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet())
	sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil)
	addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet())

	// Shard -20 fails its stream with an error.
	const wantErr = "Invalid arg message"
	sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, wantErr))

	// Shard 20-40 is given a normal batch of events.
	send1 := []*binlogdatapb.VEvent{
		{Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"},
		{Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9},
	}
	sbc1.AddVStreamEvents(send1, nil)

	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: ks,
			Shard:    "-20",
			Gtid:     "pos",
		}, {
			Keyspace: ks,
			Shard:    "20-40",
			Gtid:     "pos",
		}},
	}
	ch := make(chan *binlogdatapb.VStreamResponse)
	done := make(chan struct{})

	// streamErr is written by the goroutine before done is closed and read only
	// after <-done, so the channel close orders the two accesses.
	var streamErr error
	go func() {
		// Always signal completion, even if VStream panics or exits early, so
		// the test goroutine never blocks forever on <-done.
		defer close(done)
		streamErr = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error {
			ch <- &binlogdatapb.VStreamResponse{Events: events}
			return nil
		})
	}()
	<-ch
	<-done

	// Assert on the test goroutine: require calls t.FailNow, which the testing
	// package only allows from the goroutine running the test function.
	require.ErrorContains(t, streamErr, wantErr)

	expectedLabels1 := "TestVStream.-20.PRIMARY"
	expectedLabels2 := "TestVStream.20-40.PRIMARY"

	wantVStreamsEndedWithErrors := make(map[string]int64)
	wantVStreamsEndedWithErrors[expectedLabels1] = 1
	wantVStreamsEndedWithErrors[expectedLabels2] = 0
	assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches")
}

func TestVStreamRetriableErrors(t *testing.T) {
Expand Down
Loading