Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: yunmaoQu <2643354262@qq.com>
  • Loading branch information
yunmaoQu committed Jan 30, 2025
1 parent 05dc78f commit 3c7ef35
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 133 deletions.
123 changes: 73 additions & 50 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,45 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var beamInitOnce sync.Once

type dependencyAggregator struct {
config *Config
telset component.TelemetrySettings
dependencyWriter *memory.Store
traces map[model.TraceID]*TraceState
dependencyWriter spanstore.Writer
traces map[pcommon.TraceID]*TraceState
tracesLock sync.RWMutex
closeChan chan struct{}
beamPipeline *beam.Pipeline
beamScope beam.Scope
}

type TraceState struct {
spans []*model.Span
spanMap map[model.SpanID]*model.Span
spans []*ptrace.Span
spanMap map[pcommon.SpanID]*ptrace.Span
lastUpdateTime time.Time
timer *time.Timer
serviceName string
}

func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
beam.Init()
func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
beamInitOnce.Do(func() {
beam.Init()
})
p, s := beam.NewPipelineWithRoot()
return &dependencyAggregator{
config: &cfg,
telset: telset,
dependencyWriter: dependencyWriter,
traces: make(map[model.TraceID]*TraceState),
traces: make(map[pcommon.TraceID]*TraceState),
beamPipeline: p,
beamScope: s,
}
Expand All @@ -56,9 +63,9 @@ func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
for {
select {
case <-ticker.C:
agg.processTraces(context.Background()) // Pass context
agg.processTraces(context.Background())
case <-agg.closeChan:
agg.processTraces(context.Background()) // Pass context
agg.processTraces(context.Background())
return
}
}
Expand All @@ -76,29 +83,34 @@ func (agg *dependencyAggregator) Close() error {
return nil
}

func (agg *dependencyAggregator) HandleSpan(span *model.Span) {
func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
agg.tracesLock.Lock()
defer agg.tracesLock.Unlock()

traceState, ok := agg.traces[span.TraceID]
traceID := span.TraceID()
spanID := span.SpanID()

traceState, ok := agg.traces[traceID]
if !ok {
traceState = &TraceState{
spans: []*model.Span{},
spanMap: make(map[model.SpanID]*model.Span),
spans: []*ptrace.Span{},
spanMap: make(map[pcommon.SpanID]*ptrace.Span),
lastUpdateTime: time.Now(),
serviceName: serviceName,
}
agg.traces[span.TraceID] = traceState
agg.traces[traceID] = traceState
}

traceState.spans = append(traceState.spans, span)
traceState.spanMap[span.SpanID] = span
spanCopy := span
traceState.spans = append(traceState.spans, &spanCopy)
traceState.spanMap[spanID] = &spanCopy
traceState.lastUpdateTime = time.Now()

if traceState.timer != nil {
traceState.timer.Stop()
}
traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
agg.processTraces(context.Background()) // Pass context
agg.processTraces(ctx)
})
}

Expand All @@ -109,8 +121,9 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) {
return
}
traces := agg.traces
agg.traces = make(map[model.TraceID]*TraceState)
agg.traces = make(map[pcommon.TraceID]*TraceState)
agg.tracesLock.Unlock()

for _, traceState := range traces {
if traceState.timer != nil {
traceState.timer.Stop()
Expand All @@ -123,8 +136,8 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) {
}
}

func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection {
var allSpans []*model.Span
func (agg *dependencyAggregator) createBeamInput(traces map[pcommon.TraceID]*TraceState) beam.PCollection {
var allSpans []*ptrace.Span
for _, traceState := range traces {
allSpans = append(allSpans, traceState.spans...)
}
Expand All @@ -135,16 +148,16 @@ func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*Trace
}

func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) {
return s.TraceID, s
keyedSpans := beam.ParDo(agg.beamScope, func(s *ptrace.Span) (pcommon.TraceID, *ptrace.Span) {
return s.TraceID(), s
}, input)

groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink {
depLinks := beam.ParDo(agg.beamScope, func(_ pcommon.TraceID, spansIter func(*ptrace.Span) bool) []*model.DependencyLink {
deps := map[string]*model.DependencyLink{}
var span *model.Span
var span *ptrace.Span
for spansIter(span) {
processSpan(deps, span, agg.traces)
processSpanOtel(deps, span, agg.traces)
}
return depMapToSlice(deps)
}, groupedSpans)
Expand All @@ -167,35 +180,45 @@ func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, inpu
}()
}

// processSpan is a copy from the memory storage plugin
func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) {
parentSpan := seekToSpan(s, traces)
if parentSpan != nil {
if parentSpan.Process.ServiceName == s.Process.ServiceName {
return
}
depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentSpan.Process.ServiceName,
Child: s.Process.ServiceName,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
}
}
func processSpanOtel(deps map[string]*model.DependencyLink, span *ptrace.Span, traces map[pcommon.TraceID]*TraceState) {
traceID := span.TraceID()
parentSpanID := span.ParentSpanID()

func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span {
traceState, ok := traces[span.TraceID]
traceState, ok := traces[traceID]
if !ok {
return nil
return
}

parentSpan := traceState.spanMap[parentSpanID]
if parentSpan == nil {
return
}

parentTraceState := traces[traceID]
if parentTraceState == nil {
return
}

parentService := parentTraceState.serviceName
currentService := traceState.serviceName

if parentService == currentService || parentService == "" || currentService == "" {
return
}

depKey := parentService + "&&&" + currentService
if _, ok := deps[depKey]; !ok {
deps[depKey] = &model.DependencyLink{
Parent: parentService,
Child: currentService,
CallCount: 1,
}
} else {
deps[depKey].CallCount++
}
return traceState.spanMap[span.ParentSpanID()]
}

// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin
// depMapToSlice converts dependency map to slice
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
retMe := make([]*model.DependencyLink, 0, len(deps))
for _, dep := range deps {
Expand Down
172 changes: 172 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
)

// MockDependencyWriter is a mock implementation of spanstore.Writer
type MockDependencyWriter struct {
mock.Mock
}

func (m *MockDependencyWriter) WriteSpan(ctx context.Context, span *model.Span) error {
args := m.Called(ctx, span)
return args.Error(0)
}

func (m *MockDependencyWriter) WriteDependencies(ctx context.Context, ts time.Time, deps []model.DependencyLink) error {
args := m.Called(ctx, ts, deps)
return args.Error(0)
}

func TestAggregator(t *testing.T) {
// Create mock writer
mockWriter := new(MockDependencyWriter)

// Create config
cfg := Config{
AggregationInterval: 100 * time.Millisecond,
InactivityTimeout: 50 * time.Millisecond,
}

// Create logger
logger := zap.NewNop()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
}

// Create aggregator
agg := newDependencyAggregator(cfg, telemetrySettings, mockWriter)

// Start aggregator
closeChan := make(chan struct{})
agg.Start(closeChan)
defer close(closeChan)

// Create test spans
traceID := createTraceID(1)
parentSpanID := createSpanID(2)
childSpanID := createSpanID(3)

// Create parent span
parentSpan := createSpan(traceID, parentSpanID, pcommon.SpanID{}, "service1")

// Create child span
childSpan := createSpan(traceID, childSpanID, parentSpanID, "service2")

// Setup mock expectations
mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.MatchedBy(func(deps []model.DependencyLink) bool {
if len(deps) != 1 {
return false
}
dep := deps[0]
return dep.Parent == "service1" && dep.Child == "service2" && dep.CallCount == 1
})).Return(nil)

// Handle spans
ctx := context.Background()
agg.HandleSpan(ctx, parentSpan, "service1")
agg.HandleSpan(ctx, childSpan, "service2")

// Wait for processing and verify
assert.Eventually(t, func() bool {
return mockWriter.AssertExpectations(t)
}, time.Second, 10*time.Millisecond, "Dependencies were not written as expected")
}

func TestAggregatorInactivityTimeout(t *testing.T) {
mockWriter := new(MockDependencyWriter)
cfg := Config{
AggregationInterval: 1 * time.Second,
InactivityTimeout: 50 * time.Millisecond,
}

agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
closeChan := make(chan struct{})
agg.Start(closeChan)
defer close(closeChan)

traceID := createTraceID(1)
spanID := createSpanID(1)
span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")

mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)

ctx := context.Background()
agg.HandleSpan(ctx, span, "service1")

assert.Eventually(t, func() bool {
agg.tracesLock.RLock()
defer agg.tracesLock.RUnlock()
return len(agg.traces) == 0
}, time.Second, 10*time.Millisecond, "Trace was not cleared after inactivity timeout")
}

func TestAggregatorClose(t *testing.T) {
mockWriter := new(MockDependencyWriter)
cfg := Config{
AggregationInterval: 1 * time.Second,
InactivityTimeout: 1 * time.Second,
}

agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
closeChan := make(chan struct{})
agg.Start(closeChan)

traceID := createTraceID(1)
spanID := createSpanID(1)
span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")

ctx := context.Background()
agg.HandleSpan(ctx, span, "service1")

mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)

close(closeChan)
err := agg.Close()
require.NoError(t, err)

assert.Eventually(t, func() bool {
agg.tracesLock.RLock()
defer agg.tracesLock.RUnlock()
return len(agg.traces) == 0
}, time.Second, 10*time.Millisecond, "Traces were not cleared after close")
}

// Helper functions

func createTraceID(id byte) pcommon.TraceID {
var traceID [16]byte
traceID[15] = id
return pcommon.TraceID(traceID)
}

func createSpanID(id byte) pcommon.SpanID {
var spanID [8]byte
spanID[7] = id
return pcommon.SpanID(spanID)
}

func createSpan(traceID pcommon.TraceID, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, serviceName string) ptrace.Span {
span := ptrace.NewSpan()
span.SetTraceID(traceID)
span.SetSpanID(spanID)
span.SetParentSpanID(parentSpanID)
// Additional span attributes could be set here if needed
return span
}
Loading

0 comments on commit 3c7ef35

Please sign in to comment.