From ba6228f520668dca8ad1e6dff690dd470d7446c1 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Wed, 25 Dec 2024 00:26:39 -0500 Subject: [PATCH] [v2][query] Implement helper to buffer sequence of traces (#6401) ## Which problem is this PR solving? - Towards #6337 ## Description of the changes - This PR implements a helper `AggregateTraces` in the `jptrace` package to aggregate a sequence of `[]ptrace.Traces` to `ptrace.Traces`. This was done by combining contiguous trace chunks together based on the traceID. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab --- internal/jptrace/aggregator.go | 55 +++++++++++ internal/jptrace/aggregator_test.go | 136 ++++++++++++++++++++++++++++ storage_v2/tracestore/reader.go | 1 + 3 files changed, 192 insertions(+) create mode 100644 internal/jptrace/aggregator.go create mode 100644 internal/jptrace/aggregator_test.go diff --git a/internal/jptrace/aggregator.go b/internal/jptrace/aggregator.go new file mode 100644 index 00000000000..fa8cde63646 --- /dev/null +++ b/internal/jptrace/aggregator.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "iter" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// AggregateTraces aggregates a sequence of trace batches into individual traces. +// +// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces. +func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] { + return func(yield func(trace ptrace.Traces, err error) bool) { + currentTrace := ptrace.NewTraces() + currentTraceID := pcommon.NewTraceIDEmpty() + + tracesSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + yield(ptrace.NewTraces(), err) + return false + } + for _, trace := range traces { + resources := trace.ResourceSpans() + traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + if currentTraceID == traceID { + mergeTraces(trace, currentTrace) + } else { + if currentTrace.ResourceSpans().Len() > 0 { + if !yield(currentTrace, nil) { + return false + } + } + currentTrace = trace + currentTraceID = traceID + } + } + return true + }) + if currentTrace.ResourceSpans().Len() > 0 { + yield(currentTrace, nil) + } + } +} + +func mergeTraces(src, dest ptrace.Traces) { + resources := src.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + resource.CopyTo(dest.ResourceSpans().AppendEmpty()) + } +} diff --git a/internal/jptrace/aggregator_test.go b/internal/jptrace/aggregator_test.go new file mode 100644 index 00000000000..abeeaaa0845 --- /dev/null +++ b/internal/jptrace/aggregator_test.go @@ -0,0 +1,136 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestAggregateTraces_AggregatesSpansWithSameTraceID(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + trace1Continued := ptrace.NewTraces() + resource2 := trace1Continued.ResourceSpans().AppendEmpty() + scope2 := resource2.ScopeSpans().AppendEmpty() + span2 := scope2.Spans().AppendEmpty() + span2.SetTraceID(pcommon.TraceID([16]byte{1})) + span2.SetName("span2") + + trace2 := ptrace.NewTraces() + resource3 := trace2.ResourceSpans().AppendEmpty() + scope3 := resource3.ScopeSpans().AppendEmpty() + span3 := scope3.Spans().AppendEmpty() + span3.SetTraceID(pcommon.TraceID([16]byte{2})) + span3.SetName("span3") + + trace3 := ptrace.NewTraces() + resource4 := trace3.ResourceSpans().AppendEmpty() + scope4 := resource4.ScopeSpans().AppendEmpty() + span4 := scope4.Spans().AppendEmpty() + span4.SetTraceID(pcommon.TraceID([16]byte{3})) + span4.SetName("span4") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace1, trace1Continued, trace2}, nil) + yield([]ptrace.Traces{trace3}, nil) + } + + var result []ptrace.Traces + AggregateTraces(tracesSeq)(func(trace ptrace.Traces, _ error) bool { + result = append(result, trace) + return true + }) + + require.Len(t, result, 3) + + require.Equal(t, 2, result[0].ResourceSpans().Len()) + require.Equal(t, 1, result[1].ResourceSpans().Len()) + require.Equal(t, 1, result[2].ResourceSpans().Len()) + + gotSpan1 := result[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan1.TraceID(), pcommon.TraceID([16]byte{1})) + require.Equal(t, "span1", gotSpan1.Name()) + + gotSpan2 := result[0].ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan2.TraceID(), pcommon.TraceID([16]byte{1})) + require.Equal(t, "span2", gotSpan2.Name()) + + gotSpan3 := result[1].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan3.TraceID(), pcommon.TraceID([16]byte{2})) + require.Equal(t, "span3", gotSpan3.Name()) + + gotSpan4 := result[2].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan4.TraceID(), pcommon.TraceID([16]byte{3})) + require.Equal(t, "span4", gotSpan4.Name()) +} + +func TestAggregateTraces_YieldsErrorFromTracesSeq(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + if !yield(nil, assert.AnError) { + return + } + yield([]ptrace.Traces{trace1}, nil) // should not get here + } + aggregatedSeq := AggregateTraces(tracesSeq) + + var lastResult ptrace.Traces + var lastErr error + aggregatedSeq(func(trace ptrace.Traces, e error) bool { + lastResult = trace + if e != nil { + lastErr = e + } + return true + }) + + require.ErrorIs(t, lastErr, assert.AnError) + require.Equal(t, ptrace.NewTraces(), lastResult) +} + +func TestAggregateTraces_RespectsEarlyReturn(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + trace2 := ptrace.NewTraces() + resource2 := trace2.ResourceSpans().AppendEmpty() + scope2 := resource2.ScopeSpans().AppendEmpty() + span2 := scope2.Spans().AppendEmpty() + span2.SetTraceID(pcommon.TraceID([16]byte{2})) + span2.SetName("span2") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace1}, nil) + yield([]ptrace.Traces{trace2}, nil) + } + aggregatedSeq := AggregateTraces(tracesSeq) + + var lastResult ptrace.Traces + aggregatedSeq(func(trace ptrace.Traces, _ error) bool { + lastResult = trace + return false + }) + + require.Equal(t, trace1, lastResult) +} diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index 5426ff8e8d8..5eef0ed8aca 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -22,6 +22,7 @@ type Reader interface { // Chunking requirements: // - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces. // - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks. + // - Each returned ptrace.Traces object MUST NOT be empty. // // Edge cases: // - If no spans are found for any given trace ID, the ID is ignored.