Commit 2dd6879

Merge branch 'main' into 1044-fix-links-for-mounts-doc

knylander-grafana authored Feb 6, 2025
2 parents 5b8a3c9 + f947882 commit 2dd6879

Showing 13 changed files with 475 additions and 351 deletions.
6 changes: 5 additions & 1 deletion cmd/tempo/app/modules.go
@@ -341,7 +341,11 @@ func (t *App) initBlockBuilder() (services.Service, error) {
t.cfg.BlockBuilder.AssignedPartitions = map[string][]int32{t.cfg.BlockBuilder.InstanceID: {0}}
}

t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store)
bb, err := blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store)
if err != nil {
return nil, fmt.Errorf("failed to create block-builder: %w", err)
}
t.blockBuilder = bb

return t.blockBuilder, nil
}
8 changes: 8 additions & 0 deletions docs/docs.mk
@@ -120,3 +120,11 @@ update: ## Fetch the latest version of this Makefile and the `make-docs` script
curl -s -LO https://raw.githubusercontent.com/grafana/writers-toolkit/main/docs/docs.mk
curl -s -LO https://raw.githubusercontent.com/grafana/writers-toolkit/main/docs/make-docs
chmod +x make-docs

.PHONY: topic/%
topic/%: ## Create a topic from the Writers' Toolkit template. Specify the topic type as the target, for example, `make topic/task TOPIC_PATH=sources/my-new-topic.md`.
topic/%:
$(if $(TOPIC_PATH),,$(error "You must set the TOPIC_PATH variable to the path where the $(@) topic will be created.
For example: make $(@) TOPIC_PATH=sources/my-new-topic.md"))
mkdir -p $(dir $(TOPIC_PATH))
curl -s -o $(TOPIC_PATH) https://raw.githubusercontent.com/grafana/writers-toolkit/refs/heads/main/docs/static/templates/$(@F)-template.md
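
Here `$(@F)` expands to the file part of the matched target (for example, `task` for `topic/task`), so `make topic/task TOPIC_PATH=sources/my-new-topic.md` fetches `task-template.md` into the given path.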
71 changes: 71 additions & 0 deletions docs/sources/tempo/setup/size.md
@@ -0,0 +1,71 @@
---
title: Size the cluster
menuTitle: Size the cluster
description: Plan the size of your Tempo cluster.
aliases:
- /docs/tempo/deployment
- /docs/tempo/deployment/deployment
- /docs/tempo/setup/deployment
weight: 250
---

# Size the cluster

Resource requirements for your Grafana Tempo cluster depend on the amount and rate of data processed, retained, and queried.

This document provides basic configuration guidelines that you can use as a starting point to help size your own deployment.

{{< admonition type="note" >}}
Tempo is under continuous development. These requirements can change with each release.
{{< /admonition >}}

## Factors impacting cluster sizing

The size of the cluster you deploy depends on the resources it needs for a given ingestion rate and retention: the number of spans per unit of time, the average span size in bytes, the rate of querying, and the retention period in days.

Tracing instrumentation also affects your Tempo cluster requirements.
Refer to [Best practices](https://grafana.com/docs/tempo/<TEMPO_VERSION>/getting-started/best-practices/) for suggestions on determining where to add spans, span length, and attributes.
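
As a rough, hypothetical illustration of how these factors combine: instrumentation emitting 50,000 spans per second at an average of 300 bytes per span yields an ingest rate of about 50,000 × 300 ≈ 15 MB/s, the kind of figure the per-component ratios in the next section are applied against.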

## Example cluster sizing

The following guidelines are rough per-component starting points. A worked example applying these ratios follows the lists.

Distributor:

* 1 replica for every 10 MB/s of received traffic
* CPU: 2 cores
* Mem: 2 GB

Ingester:

* 1 replica for every 3-5 MB/s of received traffic
* CPU: 2.5 cores
* Mem: 4-20 GB, determined by trace composition

Querier:

* 1 replica for every 1-2 MB/s of received traffic
* CPU: dependent on trace size and queries
* Mem: 4-20 GB, determined by trace composition and queries
* This replica count should give good performance for typical search patterns and time ranges. Scale up or down to fit your specific workload.

Query-Frontend:

* 2 replicas, for high availability
* CPU: dependent on trace size and queries
* Mem: 4-20 GB, dependent on trace size and queries

Compactor:

* 1 replica for every 3-5 MB/s of received traffic
* CPU: 1 core (compactors are primarily I/O bound and therefore do not require much CPU)
* Mem: 4-20 GB, determined by trace composition
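
As a hypothetical worked example (assumed load, not a benchmark), applying these ratios to a sustained ingest rate of 10 MB/s suggests roughly:

* Distributors: 10 / 10 = 1 replica
* Ingesters: 10 / 3-5 ≈ 2-4 replicas
* Queriers: 10 / 1-2 ≈ 5-10 replicas
* Query-frontends: 2 replicas, for high availability
* Compactors: 10 / 3-5 ≈ 2-4 replicas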

## Performance tuning resources

Refer to these documents for additional information on tuning your Tempo cluster:

* [Monitor Tempo](https://grafana.com/docs/tempo/<TEMPO_VERSION>/operations/monitor/)
* [Tune search performance](https://grafana.com/docs/tempo/<TEMPO_VERSION>/operations/backend_search/)
* [Improve performance with caching](https://grafana.com/docs/tempo/<TEMPO_VERSION>/operations/caching/)
* [Dedicated attribute columns](https://grafana.com/docs/tempo/<TEMPO_VERSION>/operations/dedicated_columns/)

For information on more advanced system options, refer to [Manage advanced systems](https://grafana.com/docs/tempo/<TEMPO_VERSION>/operations/manage-advanced-systems/).
27 changes: 7 additions & 20 deletions modules/blockbuilder/blockbuilder.go
@@ -102,7 +102,11 @@ func New(
partitionRing ring.PartitionRingReader,
overrides Overrides,
store storage.Store,
) *BlockBuilder {
) (*BlockBuilder, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}

b := &BlockBuilder{
logger: logger,
cfg: cfg,
@@ -113,7 +117,7 @@ }
}

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b
return b, nil
}

func (b *BlockBuilder) starting(ctx context.Context) (err error) {
@@ -244,7 +248,6 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
init bool
writer *writer
lastRec *kgo.Record
nextCut time.Time
end time.Time
)

@@ -329,8 +332,7 @@
if !init {
end = rec.Timestamp.Add(dur) // When block will be cut
metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds())
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
nextCut = rec.Timestamp.Add(cutTime)
writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.WAL.IngestionSlack, b.cfg.BlockConfig, b.overrides, b.wal, b.enc)
init = true
}

@@ -346,15 +348,6 @@
break outer
}

if rec.Timestamp.After(nextCut) {
// Cut before appending this trace
err = writer.cutidle(rec.Timestamp.Add(-cutTime), false)
if err != nil {
return false, err
}
nextCut = rec.Timestamp.Add(cutTime)
}

err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer)
if err != nil {
return false, err
@@ -378,12 +371,6 @@
return false, nil
}

// Cut any remaining
err = writer.cutidle(time.Time{}, true)
if err != nil {
return false, err
}

err = writer.flush(ctx, b.writer)
if err != nil {
return false, err
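
The net effect of the blockbuilder.go change: New validates its configuration up front and returns (*BlockBuilder, error), so bad settings fail at startup rather than mid-cycle. A minimal, self-contained sketch of the pattern follows; the Config shape, field, and error text here are illustrative stand-ins, not the real blockbuilder types:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative stand-in for blockbuilder.Config. Only
// ConsumeCycleDuration mirrors a field seen in this diff.
type Config struct {
	ConsumeCycleDuration time.Duration
}

func (c Config) Validate() error {
	if c.ConsumeCycleDuration <= 0 {
		return errors.New("consume cycle duration must be positive")
	}
	return nil
}

type BlockBuilder struct{ cfg Config }

// New mirrors the validate-then-construct signature above.
func New(cfg Config) (*BlockBuilder, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return &BlockBuilder{cfg: cfg}, nil
}

func main() {
	if _, err := New(Config{}); err != nil {
		// Callers wrap the error, as modules.go does above.
		fmt.Println(fmt.Errorf("failed to create block-builder: %w", err))
	}
}
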
34 changes: 20 additions & 14 deletions modules/blockbuilder/blockbuilder_test.go
@@ -54,7 +54,8 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address)

b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
@@ -109,7 +110,8 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
admClient := kadm.NewClient(client)
require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))

b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
@@ -159,7 +161,8 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles

b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
@@ -194,7 +197,8 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address)

b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
@@ -283,7 +287,8 @@ func TestBlockbuilder_committingFails(t *testing.T) {
client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka)
producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles

b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
@@ -525,26 +530,27 @@ func BenchmarkBlockBuilder(b *testing.B) {

cfg.ConsumeCycleDuration = 1 * time.Hour

bb := New(cfg, logger, newPartitionRingReader(), o, store)
bb, err := New(cfg, logger, newPartitionRingReader(), o, store)
require.NoError(b, err)
defer func() { require.NoError(b, bb.stopping(nil)) }()

// Startup (without starting the background consume cycle)
err := bb.starting(ctx)
err = bb.starting(ctx)
require.NoError(b, err)

b.ResetTimer()

for i := 0; i < b.N; i++ {

var records []*kgo.Record
// Send more data
b.StopTimer()
size := 0
for i := 0; i < 1000; i++ {
records = append(records, sendReq(b, ctx, client)...)
}

var size int
for _, r := range records {
size += len(r.Value)
for _, r := range sendReq(b, ctx, client) {
size += len(r.Value)
}
}
b.StartTimer()

err = bb.consume(ctx)
require.NoError(b, err)
70 changes: 39 additions & 31 deletions modules/blockbuilder/partition_writer.go
@@ -13,6 +13,7 @@ import (
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"golang.org/x/sync/errgroup"
)

type partitionSectionWriter interface {
@@ -23,10 +24,12 @@ type partitionSectionWriter interface {
type writer struct {
logger log.Logger

blockCfg BlockConfig
partition, firstOffset uint64
startSectionTime time.Time
cycleDuration time.Duration
blockCfg BlockConfig
partition uint64
startOffset uint64
startTime time.Time
cycleDuration time.Duration
slackDuration time.Duration

overrides Overrides
wal *wal.WAL
@@ -36,19 +39,20 @@
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, partition, firstOffset uint64, startSectionTime time.Time, cycleDuration time.Duration, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, firstOffset uint64, startTime time.Time, cycleDuration, slackDuration time.Duration, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
firstOffset: firstOffset,
startSectionTime: startSectionTime,
cycleDuration: cycleDuration,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
logger: logger,
partition: partition,
startOffset: firstOffset,
startTime: startTime,
cycleDuration: cycleDuration,
slackDuration: slackDuration,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
}
}

@@ -74,24 +78,28 @@ func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRe
return nil
}

func (p *writer) cutidle(since time.Time, immediate bool) error {
for _, i := range p.m {
if err := i.CutIdle(p.startSectionTime, p.cycleDuration, since, immediate); err != nil {
return err
}
}
return nil
}

func (p *writer) flush(ctx context.Context, store tempodb.Writer) error {
// TODO - Retry with backoff?

// Flush tenants concurrently
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(flushConcurrency)

for _, i := range p.m {
level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID)
if err := i.Flush(ctx, store); err != nil {
return err
}
g.Go(func() error {
i := i
st := time.Now()

level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID)
err := i.Flush(ctx, store)
if err != nil {
return err
}
level.Info(p.logger).Log("msg", "flushed tenant", "tenant", i.tenantID, "elapsed", time.Since(st))
return nil
})
}
return nil
return g.Wait()
}

func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
@@ -102,7 +110,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
return i, nil
}

i, err := newTenantStore(tenant, p.partition, p.firstOffset, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
i, err := newTenantStore(tenant, p.partition, p.startOffset, p.startTime, p.cycleDuration, p.slackDuration, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
if err != nil {
return nil, err
}
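
The rewritten flush path fans out one goroutine per tenant through golang.org/x/sync/errgroup, capped by flushConcurrency (a package constant not shown in this diff). With cutidle removed from the writer, idle-block cutting now appears to be handled inside the tenant store via the new slackDuration (IngestionSlack) parameter. A self-contained sketch of the bounded fan-out pattern, with a stand-in flush function and an assumed limit of 2:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// flushTenant stands in for tenantStore.Flush in the diff above.
func flushTenant(ctx context.Context, tenant string) error {
	select {
	case <-time.After(10 * time.Millisecond): // simulate backend I/O
		return nil
	case <-ctx.Done(): // give up if another flush already failed
		return ctx.Err()
	}
}

func main() {
	tenants := []string{"tenant-a", "tenant-b", "tenant-c", "tenant-d"}

	// WithContext cancels ctx once any goroutine returns an error;
	// SetLimit bounds how many run at once, like flushConcurrency
	// above (the value 2 here is an assumption for the sketch).
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(2)

	for _, tenant := range tenants {
		tenant := tenant // shadow for pre-Go 1.22 loop semantics
		g.Go(func() error {
			start := time.Now()
			if err := flushTenant(ctx, tenant); err != nil {
				return err
			}
			fmt.Printf("flushed %s in %s\n", tenant, time.Since(start))
			return nil
		})
	}

	// Wait blocks for all goroutines and returns the first error.
	if err := g.Wait(); err != nil {
		fmt.Println("flush failed:", err)
	}
}
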
17 changes: 11 additions & 6 deletions modules/blockbuilder/partition_writer_test.go
@@ -13,11 +13,16 @@
)

func getPartitionWriter(t *testing.T) *writer {
logger := log.NewNopLogger()
startTime := time.Now()
cycleDuration := 1 * time.Minute
blockCfg := BlockConfig{}
tmpDir := t.TempDir()
var (
logger = log.NewNopLogger()
blockCfg = BlockConfig{}
tmpDir = t.TempDir()
partition = uint64(1)
startOffset = uint64(1)
startTime = time.Now()
cycleDuration = time.Minute
slackDuration = time.Minute
)
w, err := wal.New(&wal.Config{
Filepath: tmpDir,
Encoding: backend.EncNone,
Expand All @@ -26,7 +31,7 @@ func getPartitionWriter(t *testing.T) *writer {
})
require.NoError(t, err)

return newPartitionSectionWriter(logger, 1, 1, startTime, cycleDuration, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding())
return newPartitionSectionWriter(logger, partition, startOffset, startTime, cycleDuration, slackDuration, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding())
}

func TestPushBytes(t *testing.T) {