diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 3b556b7dd7f99..0bd8bf2895f05 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -56,7 +56,6 @@ import ( "go.uber.org/atomic" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/queue" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" @@ -206,10 +205,16 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk log.With(g.logger, "tenant", tenantID), "bloomgateway.FilterChunkRefs", ) - defer sp.Finish() + + stats, ctx := ContextWithEmptyStats(ctx) + defer func() { + level.Info(sp).Log(stats.KVArgs()...) + sp.Finish() + }() // start time == end time --> empty response if req.From.Equal(req.Through) { + stats.Status = labelSuccess return &logproto.FilterChunkRefResponse{ ChunkRefs: []*logproto.GroupedChunkRefs{}, }, nil @@ -217,23 +222,28 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // start time > end time --> error response if req.Through.Before(req.From) { + stats.Status = labelFailure return nil, errors.New("from time must not be after through time") } filters := v1.ExtractTestableLineFilters(req.Plan.AST) + stats.NumFilters = len(filters) g.metrics.receivedFilters.Observe(float64(len(filters))) // Shortcut if request does not contain filters if len(filters) == 0 { + stats.Status = labelSuccess return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil } seriesByDay := partitionRequest(req) + stats.NumTasks = len(seriesByDay) // no tasks --> empty response if len(seriesByDay) == 0 { + stats.Status = labelSuccess return &logproto.FilterChunkRefResponse{ ChunkRefs: []*logproto.GroupedChunkRefs{}, }, nil @@ -255,15 +265,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // TODO(owen-d): include capacity in 
constructor? task.responses = responsesPool.Get(len(seriesForDay.series)) - - level.Debug(sp).Log( - "msg", "created task for day", - "task", task.ID, - "day", seriesForDay.day, - "interval", seriesForDay.interval.String(), - "nSeries", len(seriesForDay.series), - "filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }), - ) tasks = append(tasks, task) } @@ -285,13 +286,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk // When enqueuing, we also add the task to the pending tasks _ = g.pendingTasks.Inc() }); err != nil { + stats.Status = labelFailure return nil, errors.Wrap(err, "failed to enqueue task") } // TODO(owen-d): use `concurrency` lib, bound parallelism go g.consumeTask(ctx, task, tasksCh) } - sp.Log("enqueue_duration", time.Since(queueStart).String()) + sp.Log("msg", "enqueued tasks", "duration", time.Since(queueStart).String()) remaining := len(tasks) @@ -305,10 +307,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk for remaining > 0 { select { case <-ctx.Done(): + stats.Status = "cancel" return nil, errors.Wrap(ctx.Err(), "request failed") case task := <-tasksCh: level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err()) if task.Err() != nil { + stats.Status = labelFailure return nil, errors.Wrap(task.Err(), "request failed") } responses = append(responses, task.responses) @@ -318,7 +322,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk sp.Log("msg", "received all responses") + start := time.Now() filtered := filterChunkRefs(req, responses) + duration := time.Since(start) + stats.AddPostProcessingTime(duration) // free up the responses for _, resp := range responses { @@ -335,13 +342,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk g.metrics.requestedChunks.Observe(float64(preFilterChunks)) g.metrics.filteredChunks.Observe(float64(preFilterChunks - 
postFilterChunks)) - level.Info(sp).Log( - "msg", "return filtered chunk refs", - "requested_series", preFilterSeries, - "filtered_series", preFilterSeries-postFilterSeries, - "requested_chunks", preFilterChunks, - "filtered_chunks", preFilterChunks-postFilterChunks, - ) + stats.Status = labelSuccess + stats.SeriesRequested = preFilterSeries + stats.SeriesFiltered = preFilterSeries - postFilterSeries + stats.ChunksRequested = preFilterChunks + stats.ChunksFiltered = preFilterChunks - postFilterChunks + + sp.Log("msg", "return filtered chunk refs") + return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil } diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index dc58d0a0664ca..74cb19d06911b 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -45,7 +45,7 @@ func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.M "msg", "process tasks with bounds", "tenant", tenant, "tasks", len(tasks), - "bounds", JoinFunc(bounds, ",", func(e v1.FingerprintBounds) string { return e.String() }), + "bounds", len(bounds), ) for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) { @@ -73,23 +73,30 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
Interval: interval, Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max), } + + start := time.Now() metas, err := p.store.FetchMetas(ctx, metaSearch) + duration := time.Since(start) + level.Debug(p.logger).Log("msg", "fetched metas", "count", len(metas), "duration", duration, "err", err) + + for _, t := range tasks { + FromContext(t.ctx).AddMetasFetchTime(duration) + } + if err != nil { return err } blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces) - level.Info(p.logger).Log("msg", "blocks for metas", "num_metas", len(metas), "num_blocks", len(blocksRefs)) - return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs)) -} -func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error { + data := partitionTasks(tasks, blocksRefs) + refs := make([]bloomshipper.BlockRef, 0, len(data)) for _, block := range data { refs = append(refs, block.ref) } - start := time.Now() + start = time.Now() bqs, err := p.store.FetchBlocks( ctx, refs, @@ -101,12 +108,21 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er // the underlying bloom []byte outside of iteration bloomshipper.WithPool(true), ) - level.Debug(p.logger).Log("msg", "fetch blocks", "count", len(bqs), "duration", time.Since(start), "err", err) + duration = time.Since(start) + level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err) + + for _, t := range tasks { + FromContext(t.ctx).AddBlocksFetchTime(duration) + } if err != nil { return err } + return p.processBlocks(ctx, bqs, data) +} + +func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error { defer func() { for i := range bqs { if bqs[i] == nil { @@ -124,13 +140,6 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er } block := data[i] - level.Debug(p.logger).Log( - "msg", "process block with tasks", - "job", i+1, - "of_jobs", len(bqs), - 
"block", block.ref, - "num_tasks", len(block.tasks), - ) if !block.ref.Bounds.Equal(bq.Bounds) { return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds) @@ -178,10 +187,16 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie start := time.Now() err = fq.Run() + duration := time.Since(start) + if err != nil { - p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(time.Since(start).Seconds()) + p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(duration.Seconds()) } else { - p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(time.Since(start).Seconds()) + p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(duration.Seconds()) + } + + for _, task := range tasks { + FromContext(task.ctx).AddProcessingTime(duration) } return err diff --git a/pkg/bloomgateway/stats.go b/pkg/bloomgateway/stats.go new file mode 100644 index 0000000000000..308657a944214 --- /dev/null +++ b/pkg/bloomgateway/stats.go @@ -0,0 +1,90 @@ +package bloomgateway + +import ( + "context" + "time" + + "go.uber.org/atomic" +) + +type Stats struct { + Status string + NumTasks, NumFilters int + ChunksRequested, ChunksFiltered, SeriesRequested, SeriesFiltered int + QueueTime, MetasFetchTime, BlocksFetchTime, ProcessingTime, PostProcessingTime atomic.Duration +} + +type statsKey int + +var ctxKey = statsKey(0) + +// ContextWithEmptyStats returns a context with empty stats. +func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) { + stats := &Stats{Status: "unknown"} + ctx = context.WithValue(ctx, ctxKey, stats) + return stats, ctx +} + +// FromContext gets the Stats out of the Context. Returns nil if stats have not +// been initialised in the context. 
+func FromContext(ctx context.Context) *Stats { + o := ctx.Value(ctxKey) + if o == nil { + return nil + } + return o.(*Stats) +} + +func (s *Stats) KVArgs() []any { + if s == nil { + return []any{} + } + return []any{ + "status", s.Status, + "tasks", s.NumTasks, + "series_requested", s.SeriesRequested, + "series_filtered", s.SeriesFiltered, + "chunks_requested", s.ChunksRequested, + "chunks_filtered", s.ChunksFiltered, + "queue_time", s.QueueTime.Load(), + "metas_fetch_time", s.MetasFetchTime.Load(), + "blocks_fetch_time", s.BlocksFetchTime.Load(), + "processing_time", s.ProcessingTime.Load(), + "post_processing_time", s.PostProcessingTime.Load(), + } +} + +func (s *Stats) AddQueueTime(t time.Duration) { + if s == nil { + return + } + s.QueueTime.Add(t) +} + +func (s *Stats) AddMetasFetchTime(t time.Duration) { + if s == nil { + return + } + s.MetasFetchTime.Add(t) +} + +func (s *Stats) AddBlocksFetchTime(t time.Duration) { + if s == nil { + return + } + s.BlocksFetchTime.Add(t) +} + +func (s *Stats) AddProcessingTime(t time.Duration) { + if s == nil { + return + } + s.ProcessingTime.Add(t) +} + +func (s *Stats) AddPostProcessingTime(t time.Duration) { + if s == nil { + return + } + s.PostProcessingTime.Add(t) +} diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 52de8155d7783..eadbd2fa33c91 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -103,6 +103,7 @@ func (w *worker) running(_ context.Context) error { level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID) _ = w.pending.Dec() w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds()) + FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime)) tasks = append(tasks, task) first, last := getFirstLast(task.series)