[Rhythm] Move group partition lag metric to ingest package, export from generators too #4571

Merged · Jan 22, 2025 · 7 commits
Changes from 4 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -4,6 +4,7 @@
* [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
* [ENHANCEMENT] Improve block-builder performance by flushing blocks concurrently [#4565](https://github.com/grafana/tempo/pull/4565) (@mdisibio)
* [ENHANCEMENT] Export new `tempo_ingest_group_partition_lag` metric from block-builders and metrics-generators [#4571](https://github.com/grafana/tempo/pull/4571) (@mdisibio)
* [BUGFIX] Choose a default step for a gRPC streaming query range request if none is provided. [#4546](https://github.com/grafana/tempo/pull/4546) (@joe-elliott)
  Fix an issue where the tempo-cli was not correctly dumping exemplar results.
* [BUGFIX] Fix performance bottleneck and file cleanup in block builder [#4550](https://github.com/grafana/tempo/pull/4550) (@mdisibio)
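For orientation: this PR removes the block-builder-local gauge tempo_block_builder_partition_lag (see the deletion in blockbuilder.go below) and replaces it with a shared tempo_ingest_group_partition_lag gauge exported by both block-builders and metrics-generators. A hypothetical scrape would change roughly as follows; the sample value is made up, and the group label carries the configured Kafka consumer group (shown here as "block-builder" purely for illustration):

# before: exported by the block-builder only
tempo_block_builder_partition_lag{partition="0"} 42
# after: shared gauge with an explicit consumer-group label
tempo_ingest_group_partition_lag{group="block-builder",partition="0"} 42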
77 changes: 6 additions & 71 deletions modules/blockbuilder/blockbuilder.go
@@ -20,7 +20,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
)

@@ -51,12 +50,6 @@ var (
Name: "fetch_records_total",
Help: "Total number of records fetched from Kafka",
}, []string{"partition"})
metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "partition_lag",
Help: "Lag of a partition.",
}, []string{"partition"})
metricPartitionLagSeconds = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "block_builder",
@@ -167,7 +160,12 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) {

b.kadm = kadm.NewClient(b.kafkaClient)

go b.metricLag(ctx)
ingest.ExportPartitionLagMetrics(
ctx,
b.kadm,
b.logger,
b.cfg.IngestStorageConfig,
b.getAssignedActivePartitions)

return nil
}
@@ -384,33 +382,6 @@ outer:
return more, nil
}

func (b *BlockBuilder) metricLag(ctx context.Context) {
var (
waitTime = time.Second * 15
topic = b.cfg.IngestStorageConfig.Kafka.Topic
group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
)

for {
select {
case <-time.After(waitTime):
lag, err := getGroupLag(ctx, b.kadm, topic, group)
if err != nil {
level.Error(b.logger).Log("msg", "metric lag failed:", "err", err)
continue
}
for _, p := range b.getAssignedActivePartitions() {
l, ok := lag.Lookup(topic, p)
if ok {
metricPartitionLag.WithLabelValues(strconv.Itoa(int(p))).Set(float64(l.Lag))
}
}
case <-ctx.Done():
return
}
}
}

func (b *BlockBuilder) stopping(err error) error {
if b.kafkaClient != nil {
b.kafkaClient.Close()
@@ -439,39 +410,3 @@ func (b *BlockBuilder) getAssignedActivePartitions() []int32 {
}
return assignedActivePartitions
}

// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and the partition's start offset.
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
return nil, fmt.Errorf("fetch offsets: %w", err)
}
}
if err := offsets.Error(); err != nil {
return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
}

startOffsets, err := admClient.ListStartOffsets(ctx, topic)
if err != nil {
return nil, err
}
endOffsets, err := admClient.ListEndOffsets(ctx, topic)
if err != nil {
return nil, err
}

descrGroup := kadm.DescribedGroup{
// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
// because we don't use group consumption.
State: "Empty",
}
return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
2 changes: 1 addition & 1 deletion modules/generator/generator.go
@@ -71,7 +71,7 @@ type Generator struct {
logger log.Logger

kafkaWG sync.WaitGroup
kafkaStop chan struct{}
kafkaStop func()
kafkaClient *kgo.Client
kafkaAdm *kadm.Client
partitionRing ring.PartitionRingReader
15 changes: 8 additions & 7 deletions modules/generator/generator_kafka.go
@@ -17,21 +17,24 @@ import (
)

func (g *Generator) startKafka() {
g.kafkaStop = make(chan struct{})
// Create context that will be used to stop the goroutines.
var ctx context.Context
ctx, g.kafkaStop = context.WithCancel(context.Background())

g.kafkaWG.Add(1)
go g.listenKafka()
go g.listenKafka(ctx)
go ingest.ExportPartitionLagMetrics(ctx, g.kafkaAdm, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions)
}

func (g *Generator) stopKafka() {
close(g.kafkaStop)
g.kafkaStop()
g.kafkaWG.Wait()
}

func (g *Generator) listenKafka() {
func (g *Generator) listenKafka(ctx context.Context) {
defer g.kafkaWG.Done()

level.Info(g.logger).Log("msg", "generator now listening to kafka")
ctx := context.Background()
for {
select {
case <-time.After(2 * time.Second):
@@ -44,8 +47,6 @@ func (g *Generator) listenKafka() {
level.Error(g.logger).Log("msg", "readKafka failed", "err", err)
continue
}
case <-g.kafkaStop:
return
case <-ctx.Done():
return
}
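The generator changes in the two files above replace a dedicated kafkaStop channel with a context.CancelFunc, so the Kafka listener and the new lag-metrics exporter share a single shutdown signal via ctx.Done(). A minimal, self-contained sketch of that pattern, with Tempo's types stubbed out:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	wg   sync.WaitGroup
	stop func() // was: stop chan struct{}
}

func (w *worker) start() {
	// One cancellable context stops every goroutine started here.
	var ctx context.Context
	ctx, w.stop = context.WithCancel(context.Background())

	w.wg.Add(1)
	go w.listen(ctx)
}

func (w *worker) listen(ctx context.Context) {
	defer w.wg.Done()
	for {
		select {
		case <-time.After(2 * time.Second):
			fmt.Println("poll kafka")
		case <-ctx.Done(): // replaces the old <-stop case
			return
		}
	}
}

func main() {
	w := &worker{}
	w.start()
	time.Sleep(5 * time.Second)
	w.stop()    // cancel the shared context...
	w.wg.Wait() // ...and wait for the listener to drain
}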
89 changes: 89 additions & 0 deletions pkg/ingest/metrics.go
@@ -0,0 +1,89 @@
package ingest

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
)

var metricPartitionLag = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Subsystem: "ingest",
Name: "group_partition_lag",
Help: "Lag of a partition.",
}, []string{"group", "partition"})

// TODO - Simplify signature to create client instead?
func ExportPartitionLagMetrics(ctx context.Context, admClient *kadm.Client, log log.Logger, cfg Config, getAssignedActivePartitions func() []int32) {
go func() {
var (
waitTime = time.Second * 15
topic = cfg.Kafka.Topic
group = cfg.Kafka.ConsumerGroup
)

for {
select {
case <-time.After(waitTime):
lag, err := getGroupLag(ctx, admClient, topic, group)
if err != nil {
level.Error(log).Log("msg", "metric lag failed:", "err", err)
continue
}
for _, p := range getAssignedActivePartitions() {
l, ok := lag.Lookup(topic, p)
if ok {
metricPartitionLag.WithLabelValues(group, strconv.Itoa(int(p))).Set(float64(l.Lag))
}
}
case <-ctx.Done():
return
}
}
}()
}

// getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and the partition's start offset.
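// Illustrative numbers, not part of the diff: for a partition with start
// offset 100 and high watermark (end offset) 1000, a committed offset of
// 950 gives lag = 1000 - 950 = 50 records; a partition with no commit yet
// falls back to the start offset, giving lag = 1000 - 100 = 900 records.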
func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
return nil, fmt.Errorf("fetch offsets: %w", err)
}
}
if err := offsets.Error(); err != nil {
return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
}

startOffsets, err := admClient.ListStartOffsets(ctx, topic)
if err != nil {
return nil, err
}
endOffsets, err := admClient.ListEndOffsets(ctx, topic)
if err != nil {
return nil, err
}

descrGroup := kadm.DescribedGroup{
// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
// because we don't use group consumption.
State: "Empty",
}
return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
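For reference, a hypothetical wiring of the new helper, mirroring the block-builder call site earlier in this diff. The broker address, partition list, and config population are placeholders; ingest.Config is assumed to expose the Kafka.Topic and Kafka.ConsumerGroup fields the function body reads:

package main

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"

	"github.com/grafana/tempo/pkg/ingest"
)

func main() {
	logger := log.NewNopLogger()

	// Placeholder broker; real components reuse their existing kgo client.
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer client.Close()
	adm := kadm.NewClient(client)

	var cfg ingest.Config // Kafka.Topic and Kafka.ConsumerGroup set elsewhere

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Spawns a goroutine that refreshes tempo_ingest_group_partition_lag
	// every 15 seconds until ctx is cancelled.
	ingest.ExportPartitionLagMetrics(ctx, adm, logger, cfg,
		func() []int32 { return []int32{0} }) // stubbed partition assignment

	time.Sleep(time.Minute) // stand-in for the component's run loop
}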