diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index 3aa760426f3..9c97ae3af37 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -341,7 +341,11 @@ func (t *App) initBlockBuilder() (services.Service, error) { t.cfg.BlockBuilder.AssignedPartitions = map[string][]int32{t.cfg.BlockBuilder.InstanceID: {0}} } - t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store) + bb, err := blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store) + if err != nil { + return nil, fmt.Errorf("failed to create block-builder: %w", err) + } + t.blockBuilder = bb return t.blockBuilder, nil } diff --git a/docs/docs.mk b/docs/docs.mk index c0aae10ba50..46b2afb6547 100644 --- a/docs/docs.mk +++ b/docs/docs.mk @@ -120,3 +120,11 @@ update: ## Fetch the latest version of this Makefile and the `make-docs` script curl -s -LO https://raw.githubusercontent.com/grafana/writers-toolkit/main/docs/docs.mk curl -s -LO https://raw.githubusercontent.com/grafana/writers-toolkit/main/docs/make-docs chmod +x make-docs + +.PHONY: topic/% +topic/%: ## Create a topic from the Writers' Toolkit template. Specify the topic type as the target, for example, `make topic/task TOPIC_PATH=sources/my-new-topic.md`. +topic/%: + $(if $(TOPIC_PATH),,$(error "You must set the TOPIC_PATH variable to the path where the $(@) topic will be created. + For example: make $(@) TOPIC_PATH=sources/my-new-topic.md")) + mkdir -p $(dir $(TOPIC_PATH)) + curl -s -o $(TOPIC_PATH) https://raw.githubusercontent.com/grafana/writers-toolkit/refs/heads/main/docs/static/templates/$(@F)-template.md diff --git a/docs/sources/tempo/setup/size.md b/docs/sources/tempo/setup/size.md new file mode 100644 index 00000000000..16c5afdf8b2 --- /dev/null +++ b/docs/sources/tempo/setup/size.md @@ -0,0 +1,71 @@ +--- +title: Size the cluster +menuTitle: Size the cluster +description: Plan the size of your Tempo cluster. +aliases: + - /docs/tempo/deployment + - /docs/tempo/deployment/deployment + - /docs/tempo/setup/deployment +weight: 250 +--- + +# Size the cluster + +Resource requirements for your Grafana Tempo cluster depend on the amount and rate of data processed, retained, and queried. + +This document provides basic configuration guidelines that you can use as a starting point to help size your own deployment. + +{{< admonition type="note" >}} +Tempo is under continuous development. These requirements can change with each release. +{{< /admonition >}} + +## Factors impacting cluster sizing + +The size of the cluster you deploy depends on the resources it needs for a given ingestion rate, query load, and retention: the number of spans received per unit of time, the average span size in bytes, the rate of querying, and the number of days of retention. + +Tracing instrumentation also affects your Tempo cluster requirements. +Refer to [Best practices](https://grafana.com/docs/tempo//getting-started/best-practices/) for suggestions on determining where to add spans, span length, and attributes. + +## Sample cluster sizing + +Distributor: + +* 1 replica per every 10MB/s of received traffic +* CPU: 2 cores +* Mem: 2 GB + +Ingester: + +* 1 replica per every 3-5MB/s of received traffic. +* CPU: 2.5 cores +* Mem: 4-20GB, determined by trace composition + +Querier: + +* 1 replica per every 1-2MB/s of received traffic. +* CPU: dependent on trace size and queries +* Mem: 4-20GB, determined by trace composition and queries +* This number of queriers should give good performance for typical search patterns and time ranges.
Can scale up or down to fit the specific workload. + +Query-Frontend: + +* 2 replicas, for high availability +* CPU: dependent on trace size and queries +* Mem: 4-20GB, dependent on trace size and queries + +Compactor: + +* 1 replica per every 3-5 MB/s of received traffic. +* CPU: 1 core (compactors are primarily I/O bound, therefore do not require much CPU) +* Mem: 4-20GB, determined by trace composition + +## Performance tuning resources + +Refer to these documents for additional information on tuning your Tempo cluster: + +* [Monitor Tempo](https://grafana.com/docs/tempo//operations/monitor/) +* [Tune search performance](https://grafana.com/docs/tempo//operations/backend_search/) +* [Improve performance with caching](https://grafana.com/docs/tempo//operations/caching/) +* [Dedicated attribute columns](https://grafana.com/docs/tempo//operations/dedicated_columns/) + +For information on more advanced system options, refer to [Manage advanced systems](https://grafana.com/docs/tempo//operations/manage-advanced-systems/). \ No newline at end of file diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index d550c6d9403..8e9f89361ad 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -102,7 +102,11 @@ func New( partitionRing ring.PartitionRingReader, overrides Overrides, store storage.Store, -) *BlockBuilder { +) (*BlockBuilder, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + b := &BlockBuilder{ logger: logger, cfg: cfg, @@ -113,7 +117,7 @@ func New( } b.Service = services.NewBasicService(b.starting, b.running, b.stopping) - return b + return b, nil } func (b *BlockBuilder) starting(ctx context.Context) (err error) { @@ -244,7 +248,6 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov init bool writer *writer lastRec *kgo.Record - nextCut time.Time end time.Time ) @@ -329,8 +332,7 @@ outer: if !init { end = rec.Timestamp.Add(dur) // When block will be cut metricPartitionLagSeconds.WithLabelValues(partLabel).Set(time.Since(rec.Timestamp).Seconds()) - writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) - nextCut = rec.Timestamp.Add(cutTime) + writer = newPartitionSectionWriter(b.logger, uint64(partition), uint64(rec.Offset), rec.Timestamp, dur, b.cfg.WAL.IngestionSlack, b.cfg.BlockConfig, b.overrides, b.wal, b.enc) init = true } @@ -346,15 +348,6 @@ outer: break outer } - if rec.Timestamp.After(nextCut) { - // Cut before appending this trace - err = writer.cutidle(rec.Timestamp.Add(-cutTime), false) - if err != nil { - return false, err - } - nextCut = rec.Timestamp.Add(cutTime) - } - err := b.pushTraces(rec.Timestamp, rec.Key, rec.Value, writer) if err != nil { return false, err @@ -378,12 +371,6 @@ outer: return false, nil } - // Cut any remaining - err = writer.cutidle(time.Time{}, true) - if err != nil { - return false, err - } - err = writer.flush(ctx, b.writer) if err != nil { return false, err diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index e98b73ed015..be463ea6748 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -54,7 +54,8 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address) - b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, 
err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) @@ -109,7 +110,8 @@ func TestBlockbuilder_startWithCommit(t *testing.T) { admClient := kadm.NewClient(client) require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)) - b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) @@ -159,7 +161,8 @@ func TestBlockbuilder_flushingFails(t *testing.T) { client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka) producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles - b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) @@ -194,7 +197,8 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address) - b := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) @@ -283,7 +287,8 @@ func TestBlockbuilder_committingFails(t *testing.T) { client := newKafkaClient(t, cfg.IngestStorageConfig.Kafka) producedRecords := sendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond) // Send for 1 second, <1 consumption cycles - b := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) @@ -525,26 +530,27 @@ func BenchmarkBlockBuilder(b *testing.B) { cfg.ConsumeCycleDuration = 1 * time.Hour - bb := New(cfg, logger, newPartitionRingReader(), o, store) + bb, err := New(cfg, logger, newPartitionRingReader(), o, store) + require.NoError(b, err) defer func() { require.NoError(b, bb.stopping(nil)) }() // Startup (without starting the background consume cycle) - err := bb.starting(ctx) + err = bb.starting(ctx) require.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - var records []*kgo.Record + // Send more data + b.StopTimer() + size := 0 for i := 0; i < 1000; i++ { - records = append(records, sendReq(b, ctx, client)...) 
- } - - var size int - for _, r := range records { - size += len(r.Value) + for _, r := range sendReq(b, ctx, client) { + size += len(r.Value) + } } + b.StartTimer() err = bb.consume(ctx) require.NoError(b, err) diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index 5783462e08d..92667510caf 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" + "golang.org/x/sync/errgroup" ) type partitionSectionWriter interface { @@ -23,10 +24,12 @@ type partitionSectionWriter interface { type writer struct { logger log.Logger - blockCfg BlockConfig - partition, firstOffset uint64 - startSectionTime time.Time - cycleDuration time.Duration + blockCfg BlockConfig + partition uint64 + startOffset uint64 + startTime time.Time + cycleDuration time.Duration + slackDuration time.Duration overrides Overrides wal *wal.WAL @@ -36,19 +39,20 @@ type writer struct { m map[string]*tenantStore } -func newPartitionSectionWriter(logger log.Logger, partition, firstOffset uint64, startSectionTime time.Time, cycleDuration time.Duration, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { +func newPartitionSectionWriter(logger log.Logger, partition, firstOffset uint64, startTime time.Time, cycleDuration, slackDuration time.Duration, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { return &writer{ - logger: logger, - partition: partition, - firstOffset: firstOffset, - startSectionTime: startSectionTime, - cycleDuration: cycleDuration, - blockCfg: blockCfg, - overrides: overrides, - wal: wal, - enc: enc, - mtx: sync.Mutex{}, - m: make(map[string]*tenantStore), + logger: logger, + partition: partition, + startOffset: firstOffset, + startTime: startTime, + cycleDuration: cycleDuration, + slackDuration: slackDuration, + blockCfg: blockCfg, + overrides: overrides, + wal: wal, + enc: enc, + mtx: sync.Mutex{}, + m: make(map[string]*tenantStore), } } @@ -74,24 +78,28 @@ func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRe return nil } -func (p *writer) cutidle(since time.Time, immediate bool) error { - for _, i := range p.m { - if err := i.CutIdle(p.startSectionTime, p.cycleDuration, since, immediate); err != nil { - return err - } - } - return nil -} - func (p *writer) flush(ctx context.Context, store tempodb.Writer) error { // TODO - Retry with backoff? 
+ + // Flush tenants concurrently + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(flushConcurrency) + for _, i := range p.m { - level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID) - if err := i.Flush(ctx, store); err != nil { - return err - } + g.Go(func() error { + i := i + st := time.Now() + + level.Info(p.logger).Log("msg", "flushing tenant", "tenant", i.tenantID) + err := i.Flush(ctx, store) + if err != nil { + return err + } + level.Info(p.logger).Log("msg", "flushed tenant", "tenant", i.tenantID, "elapsed", time.Since(st)) + return nil + }) } - return nil + return g.Wait() } func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) { @@ -102,7 +110,7 @@ func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) { return i, nil } - i, err := newTenantStore(tenant, p.partition, p.firstOffset, p.blockCfg, p.logger, p.wal, p.enc, p.overrides) + i, err := newTenantStore(tenant, p.partition, p.startOffset, p.startTime, p.cycleDuration, p.slackDuration, p.blockCfg, p.logger, p.wal, p.enc, p.overrides) if err != nil { return nil, err } diff --git a/modules/blockbuilder/partition_writer_test.go b/modules/blockbuilder/partition_writer_test.go index d3a6dc106a7..46ab8e06e86 100644 --- a/modules/blockbuilder/partition_writer_test.go +++ b/modules/blockbuilder/partition_writer_test.go @@ -13,11 +13,16 @@ import ( ) func getPartitionWriter(t *testing.T) *writer { - logger := log.NewNopLogger() - startTime := time.Now() - cycleDuration := 1 * time.Minute - blockCfg := BlockConfig{} - tmpDir := t.TempDir() + var ( + logger = log.NewNopLogger() + blockCfg = BlockConfig{} + tmpDir = t.TempDir() + partition = uint64(1) + startOffset = uint64(1) + startTime = time.Now() + cycleDuration = time.Minute + slackDuration = time.Minute + ) w, err := wal.New(&wal.Config{ Filepath: tmpDir, Encoding: backend.EncNone, @@ -26,7 +31,7 @@ func getPartitionWriter(t *testing.T) *writer { }) require.NoError(t, err) - return newPartitionSectionWriter(logger, 1, 1, startTime, cycleDuration, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) + return newPartitionSectionWriter(logger, partition, startOffset, startTime, cycleDuration, slackDuration, blockCfg, &mockOverrides{}, w, encoding.DefaultEncoding()) } func TestPushBytes(t *testing.T) { diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index c4f9570816e..793bb030634 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -12,12 +12,9 @@ import ( "github.com/google/uuid" "github.com/grafana/tempo/modules/blockbuilder/util" "github.com/grafana/tempo/modules/overrides" - "github.com/grafana/tempo/pkg/boundedwaitgroup" "github.com/grafana/tempo/pkg/dataquality" "github.com/grafana/tempo/pkg/livetraces" - "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" - "github.com/grafana/tempo/pkg/tracesizes" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" @@ -25,7 +22,6 @@ import ( "github.com/grafana/tempo/tempodb/wal" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" ) var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec( @@ -41,92 +37,43 @@ const ( flushConcurrency = 4 ) -// TODO - This needs locking type tenantStore struct { - tenantID string - idGenerator util.IDGenerator - - cfg BlockConfig - logger log.Logger - overrides Overrides - enc 
encoding.VersionedEncoding - - wal *wal.WAL - - headBlockMtx sync.Mutex - headBlock common.WALBlock - - blocksMtx sync.Mutex - walBlocks []common.WALBlock + tenantID string + idGenerator util.IDGenerator + cfg BlockConfig + startTime time.Time + cycleDuration time.Duration + slackDuration time.Duration + logger log.Logger + overrides Overrides + enc encoding.VersionedEncoding + wal *wal.WAL liveTraces *livetraces.LiveTraces[[]byte] - traceSizes *tracesizes.Tracker } -func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { +func newTenantStore(tenantID string, partitionID, startOffset uint64, startTime time.Time, cycleDuration, slackDuration time.Duration, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { s := &tenantStore{ - tenantID: tenantID, - idGenerator: util.NewDeterministicIDGenerator(tenantID, partitionID, endTimestamp), - cfg: cfg, - logger: logger, - overrides: o, - wal: wal, - headBlockMtx: sync.Mutex{}, - blocksMtx: sync.Mutex{}, - enc: enc, - liveTraces: livetraces.New[[]byte](func(b []byte) uint64 { return uint64(len(b)) }), - traceSizes: tracesizes.New(), + tenantID: tenantID, + idGenerator: util.NewDeterministicIDGenerator(tenantID, partitionID, startOffset), + startTime: startTime, + cycleDuration: cycleDuration, + slackDuration: slackDuration, + cfg: cfg, + logger: logger, + overrides: o, + wal: wal, + enc: enc, + liveTraces: livetraces.New[[]byte](func(b []byte) uint64 { return uint64(len(b)) }), } - return s, s.resetHeadBlock() -} - -// TODO - periodically flush -func (s *tenantStore) cutHeadBlock(immediate bool) error { - s.headBlockMtx.Lock() - defer s.headBlockMtx.Unlock() - - dataLen := s.headBlock.DataLength() - - if s.headBlock == nil || dataLen == 0 { - return nil - } - - if !immediate && dataLen < s.cfg.MaxBlockBytes { - return nil - } - - s.blocksMtx.Lock() - defer s.blocksMtx.Unlock() - - if err := s.headBlock.Flush(); err != nil { - return err - } - s.walBlocks = append(s.walBlocks, s.headBlock) - s.headBlock = nil - - return s.resetHeadBlock() -} - -func (s *tenantStore) resetHeadBlock() error { - meta := &backend.BlockMeta{ - BlockID: s.idGenerator.NewID(), - TenantID: s.tenantID, - DedicatedColumns: s.overrides.DedicatedColumns(s.tenantID), - ReplicationFactor: backend.MetricsGeneratorReplicationFactor, - } - block, err := s.wal.NewBlock(meta, model.CurrentEncoding) - if err != nil { - return err - } - s.headBlock = block - return nil + return s, nil } func (s *tenantStore) AppendTrace(traceID []byte, tr []byte, ts time.Time) error { maxSz := s.overrides.MaxBytesPerTrace(s.tenantID) - if maxSz > 0 && !s.traceSizes.Allow(traceID, len(tr), maxSz) { + if !s.liveTraces.PushWithTimestampAndLimits(ts, traceID, tr, 0, uint64(maxSz)) { // Record dropped spans due to trace too large // We have to unmarhal to count the number of spans. 
// TODO - There might be a better way @@ -141,213 +88,245 @@ func (s *tenantStore) AppendTrace(traceID []byte, tr []byte, ts time.Time) error } } overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID) - return nil } - s.liveTraces.PushWithTimestamp(ts, traceID, tr, 0) - return nil } -func (s *tenantStore) CutIdle(startSectionTime time.Time, cycleDuration time.Duration, since time.Time, immediate bool) error { - idle := s.liveTraces.CutIdle(since, immediate) +func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { + if s.liveTraces.Len() == 0 { + // This can happen if the tenant instance was created but + // no live traces were successfully pushed. i.e. all exceeded max trace size. + return nil + } - slices.SortFunc(idle, func(a, b *livetraces.LiveTrace[[]byte]) int { - return bytes.Compare(a.ID, b.ID) - }) + // Initial meta for creating the block + meta := backend.NewBlockMeta(s.tenantID, uuid.UUID(s.idGenerator.NewID()), s.enc.Version(), backend.EncNone, "") + meta.DedicatedColumns = s.overrides.DedicatedColumns(s.tenantID) + meta.ReplicationFactor = 1 + meta.TotalObjects = int64(s.liveTraces.Len()) var ( - unmarshalWg = sync.WaitGroup{} - unmarshalErr = atomic.NewError(nil) - unmarshaled = make([]*tempopb.Trace, len(idle)) - starts = make([]uint32, len(idle)) - ends = make([]uint32, len(idle)) + st = time.Now() + l = s.wal.LocalBackend() + reader = backend.NewReader(l) + writer = backend.NewWriter(l) + iter = newLiveTracesIter(s.liveTraces) ) - // Unmarshal and process in parallel, each goroutine handles 1/Nth - for i := 0; i < len(idle) && i < flushConcurrency; i++ { - unmarshalWg.Add(1) - go func(i int) { - defer unmarshalWg.Done() - - for j := i; j < len(idle); j += flushConcurrency { - tr := new(tempopb.Trace) - - for _, b := range idle[j].Batches { - // This unmarshal appends the batches onto the existing tempopb.Trace - // so we don't need to allocate another container temporarily - err := tr.Unmarshal(b) - if err != nil { - unmarshalErr.Store(err) - return - } - } - - // Get trace timestamp bounds - var start, end uint64 - for _, b := range tr.ResourceSpans { - for _, ss := range b.ScopeSpans { - for _, s := range ss.Spans { - if start == 0 || s.StartTimeUnixNano < start { - start = s.StartTimeUnixNano - } - if s.EndTimeUnixNano > end { - end = s.EndTimeUnixNano - } - } - } - } + level.Info(s.logger).Log( + "msg", "Flushing block", + "tenant", s.tenantID, + "blockid", meta.BlockID, + "meta", meta, + ) - // Convert from unix nanos to unix seconds - starts[j] = uint32(start / uint64(time.Second)) - ends[j] = uint32(end / uint64(time.Second)) - unmarshaled[j] = tr - } - }(i) + newMeta, err := s.enc.CreateBlock(ctx, &s.cfg.BlockCfg, meta, iter, reader, writer) + if err != nil { + return err } - unmarshalWg.Wait() - if err := unmarshalErr.Load(); err != nil { + // Update meta timestamps which couldn't be known until we unmarshaled + // all of the traces. 
+ start, end := iter.MinMaxTimestamps() + newMeta.StartTime, newMeta.EndTime = s.adjustTimeRangeForSlack(time.Unix(0, int64(start)), time.Unix(0, int64(end))) + + newBlock, err := s.enc.OpenBlock(newMeta, reader) + if err != nil { return err } - for i, tr := range unmarshaled { - start, end := s.adjustTimeRangeForSlack(startSectionTime, cycleDuration, starts[i], ends[i]) - if err := s.headBlock.AppendTrace(idle[i].ID, tr, start, end, false); err != nil { - return err - } + if err := store.WriteBlock(ctx, NewWriteableBlock(newBlock, reader, writer)); err != nil { + return err } - // Return prealloc slices to the pool - for _, i := range idle { - tempopb.ReuseByteSlices(i.Batches) - } + metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc() - err := s.headBlock.Flush() - if err != nil { + if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(newMeta.BlockID), s.tenantID); err != nil { return err } - // Cut head block if needed - return s.cutHeadBlock(false) + level.Info(s.logger).Log( + "msg", "Flushed block", + "tenant", s.tenantID, + "blockid", newMeta.BlockID, + "elapsed", time.Since(st), + "meta", newMeta, + ) + + return nil } -func (s *tenantStore) adjustTimeRangeForSlack(startSectionTime time.Time, cycleDuration time.Duration, start, end uint32) (uint32, uint32) { - startOfRange := uint32(startSectionTime.Add(-s.headBlock.IngestionSlack()).Unix()) - endOfRange := uint32(startSectionTime.Add(s.headBlock.IngestionSlack() + cycleDuration).Unix()) +func (s *tenantStore) adjustTimeRangeForSlack(start, end time.Time) (time.Time, time.Time) { + startOfRange := s.startTime.Add(-s.slackDuration) + endOfRange := s.startTime.Add(s.slackDuration + s.cycleDuration) warn := false - if start < startOfRange { + if start.Before(startOfRange) { warn = true - start = uint32(startSectionTime.Unix()) + start = s.startTime } - if end > endOfRange || end < start { + if end.After(endOfRange) || end.Before(start) { warn = true - end = uint32(startSectionTime.Unix()) + end = s.startTime } if warn { - dataquality.WarnBlockBuilderOutsideIngestionSlack(s.headBlock.BlockMeta().TenantID) + dataquality.WarnBlockBuilderOutsideIngestionSlack(s.tenantID) } return start, end } -func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { - // TODO - Advance some of this work if possible +type entry struct { + id common.ID + hash uint64 +} - // Cut head block - if err := s.cutHeadBlock(true); err != nil { - return err +type chEntry struct { + id common.ID + tr *tempopb.Trace + err error +} + +type liveTracesIter struct { + mtx sync.Mutex + entries []entry + liveTraces *livetraces.LiveTraces[[]byte] + ch chan []chEntry + chBuf []chEntry + cancel func() + start, end uint64 +} + +func newLiveTracesIter(liveTraces *livetraces.LiveTraces[[]byte]) *liveTracesIter { + // Get the list of all traces sorted by ID + tids := make([]entry, 0, len(liveTraces.Traces)) + for hash, t := range liveTraces.Traces { + tids = append(tids, entry{t.ID, hash}) } - s.blocksMtx.Lock() - defer s.blocksMtx.Unlock() + slices.SortFunc(tids, func(a, b entry) int { + return bytes.Compare(a.id, b.id) + }) - var ( - completeBlocks = make([]tempodb.WriteableBlock, len(s.walBlocks)) - jobErr = atomic.NewError(nil) - wg = boundedwaitgroup.New(flushConcurrency) - ) + ctx, cancel := context.WithCancel(context.Background()) - // Convert WALs to backend blocks - for i, block := range s.walBlocks { - wg.Add(1) - go func(i int, block common.WALBlock) { - defer wg.Done() + l := &liveTracesIter{ + entries: tids, + liveTraces: liveTraces, 
+ ch: make(chan []chEntry, 1), + cancel: cancel, + } - completeBlock, err := s.buildWriteableBlock(ctx, block) - if err != nil { - jobErr.Store(err) - return - } + go l.iter(ctx) - err = block.Clear() - if err != nil { - jobErr.Store(err) - return - } + return l +} - completeBlocks[i] = completeBlock - }(i, block) +func (i *liveTracesIter) Next(ctx context.Context) (common.ID, *tempopb.Trace, error) { + if len(i.chBuf) == 0 { + select { + case entries, ok := <-i.ch: + if !ok { + return nil, nil, nil + } + i.chBuf = entries + case <-ctx.Done(): + return nil, nil, ctx.Err() + } } - wg.Wait() - if err := jobErr.Load(); err != nil { - return err + // Pop next entry + if len(i.chBuf) > 0 { + entry := i.chBuf[0] + i.chBuf = i.chBuf[1:] + return entry.id, entry.tr, entry.err } - // Write all blocks to the store - level.Info(s.logger).Log("msg", "writing blocks to storage", "num_blocks", len(completeBlocks)) - for _, block := range completeBlocks { - wg.Add(1) - go func(block tempodb.WriteableBlock) { - defer wg.Done() - level.Info(s.logger).Log("msg", "writing block to storage", "block_id", block.BlockMeta().BlockID.String()) - if err := store.WriteBlock(ctx, block); err != nil { - jobErr.Store(err) - return - } + // Channel is open but buffer is empty? + return nil, nil, nil +} - metricBlockBuilderFlushedBlocks.WithLabelValues(s.tenantID).Inc() +func (i *liveTracesIter) iter(ctx context.Context) { + i.mtx.Lock() + defer i.mtx.Unlock() + defer close(i.ch) - if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil { - jobErr.Store(err) - } - }(block) + // Get the list of all traces sorted by ID + tids := make([]entry, 0, len(i.liveTraces.Traces)) + for hash, t := range i.liveTraces.Traces { + tids = append(tids, entry{t.ID, hash}) } + slices.SortFunc(tids, func(a, b entry) int { + return bytes.Compare(a.id, b.id) + }) - wg.Wait() - if err := jobErr.Load(); err != nil { - return err - } + // Begin sending to channel in chunks to reduce channel overhead. 
+ seq := slices.Chunk(i.entries, 10) + for entries := range seq { + output := make([]chEntry, 0, len(entries)) - // Clear the blocks - s.walBlocks = s.walBlocks[:0] + for _, e := range entries { - return nil -} + entry := i.liveTraces.Traces[e.hash] -func (s *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) { - level.Debug(s.logger).Log("msg", "building writeable block", "block_id", b.BlockMeta().BlockID.String()) + tr := new(tempopb.Trace) - iter, err := b.Iterator() - if err != nil { - return nil, err - } - defer iter.Close() + for _, b := range entry.Batches { + // This unmarshal appends the batches onto the existing tempopb.Trace + // so we don't need to allocate another container temporarily + err := tr.Unmarshal(b) + if err != nil { + i.ch <- []chEntry{{err: err}} + return + } + } + + // Update block timestamp bounds + for _, b := range tr.ResourceSpans { + for _, ss := range b.ScopeSpans { + for _, s := range ss.Spans { + if i.start == 0 || s.StartTimeUnixNano < i.start { + i.start = s.StartTimeUnixNano + } + if s.EndTimeUnixNano > i.end { + i.end = s.EndTimeUnixNano + } + } + } + } - reader, writer := backend.NewReader(s.wal.LocalBackend()), backend.NewWriter(s.wal.LocalBackend()) + tempopb.ReuseByteSlices(entry.Batches) + delete(i.liveTraces.Traces, e.hash) - newMeta, err := s.enc.CreateBlock(ctx, &s.cfg.BlockCfg, b.BlockMeta(), iter, reader, writer) - if err != nil { - return nil, err - } + output = append(output, chEntry{ + id: entry.ID, + tr: tr, + err: nil, + }) + } - newBlock, err := s.enc.OpenBlock(newMeta, reader) - if err != nil { - return nil, err + select { + case i.ch <- output: + case <-ctx.Done(): + return + } } +} + +// MinMaxTimestamps returns the earliest start, and latest end span timestamps, +// which can't be known until all contents are unmarshaled. The iterated must +// be exhausted before this can be accessed. 
+func (i *liveTracesIter) MinMaxTimestamps() (uint64, uint64) { + i.mtx.Lock() + defer i.mtx.Unlock() - return NewWriteableBlock(newBlock, reader, writer), nil + return i.start, i.end } + +func (i *liveTracesIter) Close() { + i.cancel() +} + +var _ common.Iterator = (*liveTracesIter)(nil) diff --git a/modules/blockbuilder/tenant_store_test.go b/modules/blockbuilder/tenant_store_test.go index 4a7dbb5a3f1..f9635feee7e 100644 --- a/modules/blockbuilder/tenant_store_test.go +++ b/modules/blockbuilder/tenant_store_test.go @@ -8,14 +8,18 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func getTenantStore(t *testing.T) (*tenantStore, error) { - logger := log.NewNopLogger() - blockCfg := BlockConfig{} - tmpDir := t.TempDir() +func getTenantStore(t *testing.T, startTime time.Time, cycleDuration, slackDuration time.Duration) (*tenantStore, error) { + var ( + logger = log.NewNopLogger() + blockCfg = BlockConfig{} + tmpDir = t.TempDir() + partition = uint64(1) + startOffset = uint64(1) + ) + w, err := wal.New(&wal.Config{ Filepath: tmpDir, Encoding: backend.EncNone, @@ -23,58 +27,61 @@ func getTenantStore(t *testing.T) (*tenantStore, error) { Version: encoding.DefaultEncoding().Version(), }) require.NoError(t, err) - return newTenantStore("test-tenant", 1, 1, blockCfg, logger, w, encoding.DefaultEncoding(), &mockOverrides{}) + return newTenantStore("test-tenant", partition, startOffset, startTime, cycleDuration, slackDuration, blockCfg, logger, w, encoding.DefaultEncoding(), &mockOverrides{}) } func TestAdjustTimeRangeForSlack(t *testing.T) { - store, err := getTenantStore(t) - require.NoError(t, err) + var ( + startCycleTime = time.Now() + cycleDuration = time.Minute + slackDuration = 3 * time.Minute + ) - startCycleTime := time.Now() - cycleDuration := 1 * time.Minute + store, err := getTenantStore(t, startCycleTime, cycleDuration, slackDuration) + require.NoError(t, err) tests := []struct { name string - start uint32 - end uint32 - expectedStart uint32 - expectedEnd uint32 + start time.Time + end time.Time + expectedStart time.Time + expectedEnd time.Time }{ { name: "within slack range", - start: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - end: uint32(startCycleTime.Add(2 * time.Minute).Unix()), - expectedStart: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - expectedEnd: uint32(startCycleTime.Add(2 * time.Minute).Unix()), + start: startCycleTime.Add(-2 * time.Minute), + end: startCycleTime.Add(2 * time.Minute), + expectedStart: startCycleTime.Add(-2 * time.Minute), + expectedEnd: startCycleTime.Add(2 * time.Minute), }, { name: "start before slack range", - start: uint32(startCycleTime.Add(-10 * time.Minute).Unix()), - end: uint32(startCycleTime.Add(2 * time.Minute).Unix()), - expectedStart: uint32(startCycleTime.Unix()), - expectedEnd: uint32(startCycleTime.Add(2 * time.Minute).Unix()), + start: startCycleTime.Add(-10 * time.Minute), + end: startCycleTime.Add(2 * time.Minute), + expectedStart: startCycleTime, + expectedEnd: startCycleTime.Add(2 * time.Minute), }, { name: "end after slack range", - start: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - end: uint32(startCycleTime.Add(20 * time.Minute).Unix()), - expectedStart: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - expectedEnd: uint32(startCycleTime.Unix()), + start: startCycleTime.Add(-2 * time.Minute), + end: startCycleTime.Add(20 * 
time.Minute), + expectedStart: startCycleTime.Add(-2 * time.Minute), + expectedEnd: startCycleTime, }, { name: "end before start", - start: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - end: uint32(startCycleTime.Add(-3 * time.Minute).Unix()), - expectedStart: uint32(startCycleTime.Add(-2 * time.Minute).Unix()), - expectedEnd: uint32(startCycleTime.Unix()), + start: startCycleTime.Add(-2 * time.Minute), + end: startCycleTime.Add(-3 * time.Minute), + expectedStart: startCycleTime.Add(-2 * time.Minute), + expectedEnd: startCycleTime, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - start, end := store.adjustTimeRangeForSlack(startCycleTime, cycleDuration, tt.start, tt.end) - assert.Equal(t, tt.expectedStart, start) - assert.Equal(t, tt.expectedEnd, end) + start, end := store.adjustTimeRangeForSlack(tt.start, tt.end) + require.Equal(t, tt.expectedStart, start) + require.Equal(t, tt.expectedEnd, end) }) } } diff --git a/pkg/livetraces/livetraces.go b/pkg/livetraces/livetraces.go index 425f70d6285..7d7759f8a6a 100644 --- a/pkg/livetraces/livetraces.go +++ b/pkg/livetraces/livetraces.go @@ -51,10 +51,10 @@ func (l *LiveTraces[T]) Size() uint64 { } func (l *LiveTraces[T]) Push(traceID []byte, batch T, max uint64) bool { - return l.PushWithTimestamp(time.Now(), traceID, batch, max) + return l.PushWithTimestampAndLimits(time.Now(), traceID, batch, max, 0) } -func (l *LiveTraces[T]) PushWithTimestamp(ts time.Time, traceID []byte, batch T, max uint64) bool { +func (l *LiveTraces[T]) PushWithTimestampAndLimits(ts time.Time, traceID []byte, batch T, maxLiveTraces, maxTraceSize uint64) (exceededLimits bool) { token := l.token(traceID) tr := l.Traces[token] @@ -62,7 +62,7 @@ func (l *LiveTraces[T]) PushWithTimestamp(ts time.Time, traceID []byte, batch T, // Before adding this check against max // Zero means no limit - if max > 0 && uint64(len(l.Traces)) >= max { + if maxLiveTraces > 0 && uint64(len(l.Traces)) >= maxLiveTraces { return false } @@ -73,6 +73,12 @@ func (l *LiveTraces[T]) PushWithTimestamp(ts time.Time, traceID []byte, batch T, } sz := l.szFunc(batch) + + // Before adding check against max trace size + if maxTraceSize > 0 && (tr.sz+sz > maxTraceSize) { + return false + } + tr.sz += sz l.sz += sz diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index aacb3072fe6..ede15e667fb 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -23,16 +23,20 @@ import ( "cloud.google.com/go/storage" "github.com/cristalhq/hedgedhttp" + gkLog "github.com/go-kit/log" + "github.com/go-kit/log/level" "google.golang.org/api/iterator" "google.golang.org/api/option" google_http "google.golang.org/api/transport/http" "github.com/grafana/tempo/pkg/blockboundary" tempo_io "github.com/grafana/tempo/pkg/io" + "github.com/grafana/tempo/pkg/util/log" "github.com/grafana/tempo/tempodb/backend" ) type readerWriter struct { + logger gkLog.Logger cfg *Config bucket *storage.BucketHandle hedgedBucket *storage.BucketHandle @@ -100,6 +104,7 @@ func internalNew(cfg *Config, confirm bool) (*readerWriter, error) { } rw := &readerWriter{ + logger: log.Logger, cfg: cfg, bucket: bucket, hedgedBucket: hedgedBucket, @@ -118,14 +123,21 @@ func (rw *readerWriter) Write(ctx context.Context, name string, keypath backend. 
w := rw.writer(derivedCtx, backend.ObjectFileName(keypath, name), nil) - _, err := io.Copy(w, data) + written, err := io.Copy(w, data) if err != nil { w.Close() span.RecordError(err) return fmt.Errorf("failed to write: %w", err) } - return w.Close() + err = w.Close() + if err != nil { + return fmt.Errorf("failed to close: %w", err) + } + + level.Debug(rw.logger).Log("msg", "object uploaded to gcs", "objectName", backend.ObjectFileName(keypath, name), "size", written) + + return nil } // Append implements backend.Writer diff --git a/tempodb/encoding/common/config.go b/tempodb/encoding/common/config.go index 85a3ef9efe4..884f90a8120 100644 --- a/tempodb/encoding/common/config.go +++ b/tempodb/encoding/common/config.go @@ -57,7 +57,7 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("positive index page size required") } - if b.BloomFP <= 0.0 { + if b.BloomFP <= 0.0 || b.BloomFP >= 1.0 { return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) } diff --git a/tempodb/encoding/vparquet4/create.go b/tempodb/encoding/vparquet4/create.go index 0067fc11ddc..83edfd9232a 100644 --- a/tempodb/encoding/vparquet4/create.go +++ b/tempodb/encoding/vparquet4/create.go @@ -6,8 +6,10 @@ import ( "errors" "fmt" "io" + "runtime" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/dataquality" tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" @@ -37,49 +39,78 @@ func (b *backendWriter) Close() error { func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.BlockMeta, i common.Iterator, r backend.Reader, to backend.Writer) (*backend.BlockMeta, error) { s := newStreamingBlock(ctx, cfg, meta, r, to, tempo_io.NewBufferedWriter) - var next func(context.Context) (common.ID, parquet.Row, error) + var next func(context.Context) error if ii, ok := i.(*commonIterator); ok { - // Use interal iterator and avoid translation to/from proto - next = ii.NextRow + next = func(ctx context.Context) error { + // Use interal iterator and avoid translation to/from proto + id, row, err := ii.NextRow(ctx) + if err != nil { + return err + } + if row == nil { + return io.EOF + } + err = s.AddRaw(id, row, 0, 0) // start and end time of the wal meta are used. + if err != nil { + return err + } + + completeBlockRowPool.Put(row) + return nil + } } else { // Need to convert from proto->parquet obj - trp := &Trace{} - sch := parquet.SchemaOf(trp) - next = func(context.Context) (common.ID, parquet.Row, error) { + var ( + buffer = &Trace{} + connected bool + resMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) + spanMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) + ) + next = func(context.Context) error { id, tr, err := i.Next(ctx) - if errors.Is(err, io.EOF) || tr == nil { - return id, nil, err + if err != nil { + return err + } + if tr == nil { + return io.EOF } // Copy ID to allow it to escape the iterator. id = append([]byte(nil), id...) - trp, _ = traceToParquet(meta, id, tr, trp) // this logic only executes when we are transitioning from one block version to another. 
just ignore connected here + buffer, connected = traceToParquetWithMapping(id, tr, buffer, resMapping, spanMapping) + if !connected { + dataquality.WarnDisconnectedTrace(meta.TenantID, dataquality.PhaseTraceWalToComplete) + } + if buffer.RootSpanName == "" { + dataquality.WarnRootlessTrace(meta.TenantID, dataquality.PhaseTraceWalToComplete) + } - row := sch.Deconstruct(completeBlockRowPool.Get(), trp) + err = s.Add(buffer, 0, 0) // start and end time are set outside + if err != nil { + return err + } - return id, row, nil + return nil } } for { - id, row, err := next(ctx) - if errors.Is(err, io.EOF) || row == nil { + err := next(ctx) + if errors.Is(err, io.EOF) { break } - - err = s.AddRaw(id, row, 0, 0) // start and end time of the wal meta are used. if err != nil { return nil, err } - completeBlockRowPool.Put(row) if s.EstimatedBufferedBytes() > cfg.RowGroupSizeBytes { _, err = s.Flush() if err != nil { return nil, err } + runtime.GC() } }
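
For reviewers, a minimal sketch (not part of the patch) of how the revised `livetraces` API introduced above is intended to be called, mirroring `tenantStore.AppendTrace` in this diff. The payload and the size limit used here are illustrative assumptions, not values taken from the change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/tempo/pkg/livetraces"
)

func main() {
	// Track live traces keyed by trace ID; the size function reports the
	// per-batch cost in bytes (raw slice length, as in the block-builder).
	lt := livetraces.New[[]byte](func(b []byte) uint64 { return uint64(len(b)) })

	traceID := []byte{0x01, 0x02}              // illustrative trace ID
	batch := []byte("serialized ResourceSpans") // illustrative payload

	// PushWithTimestampAndLimits replaces PushWithTimestamp: the fourth
	// argument caps the number of live traces, the fifth caps the per-trace
	// size in bytes. Zero means no limit. It returns false when a limit
	// would be exceeded, which the block-builder records as a
	// trace-too-large discard. The 5 MB cap below is an assumed value.
	if !lt.PushWithTimestampAndLimits(time.Now(), traceID, batch, 0, 5_000_000) {
		fmt.Println("dropped: trace exceeded the configured max size")
	}
}
```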