From 3c7ef3569b67775f10279bfe953ee6b8dc029fde Mon Sep 17 00:00:00 2001
From: yunmaoQu <2643354262@qq.com>
Date: Thu, 30 Jan 2025 06:46:09 +0000
Subject: [PATCH] fix

Signed-off-by: yunmaoQu <2643354262@qq.com>
---
 .../dependencyprocessor/aggregator.go         | 123 ++++++++-----
 .../dependencyprocessor/aggregator_test.go    | 172 ++++++++++++++++++
 .../processors/dependencyprocessor/config.go  |  29 ++-
 .../processors/dependencyprocessor/factory.go |   8 +-
 .../dependencyprocessor/processor.go          |  60 ++++--
 .../{e2e_test.go => processor_test.go}        |   0
 plugin/storage/memory/memory.go               |  71 ++------
 7 files changed, 330 insertions(+), 133 deletions(-)
 create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
 rename cmd/jaeger/internal/processors/dependencyprocessor/{e2e_test.go => processor_test.go} (100%)

diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
index 02ed6d149be..0f03d2afc41 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
@@ -11,17 +11,21 @@ import (
     "github.com/apache/beam/sdks/v2/go/pkg/beam"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
     "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/pdata/pcommon"
+    "go.opentelemetry.io/collector/pdata/ptrace"
     "go.uber.org/zap"

     "github.com/jaegertracing/jaeger/model"
-    "github.com/jaegertracing/jaeger/plugin/storage/memory"
+    "github.com/jaegertracing/jaeger/storage/spanstore"
 )

+var beamInitOnce sync.Once
+
 type dependencyAggregator struct {
     config           *Config
     telset           component.TelemetrySettings
-    dependencyWriter *memory.Store
-    traces           map[model.TraceID]*TraceState
+    dependencyWriter spanstore.Writer
+    traces           map[pcommon.TraceID]*TraceState
     tracesLock       sync.RWMutex
     closeChan        chan struct{}
     beamPipeline     *beam.Pipeline
@@ -29,20 +33,23 @@ type dependencyAggregator struct {
 }

 type TraceState struct {
-    spans          []*model.Span
-    spanMap        map[model.SpanID]*model.Span
+    spans          []*ptrace.Span
+    spanMap        map[pcommon.SpanID]*ptrace.Span
     lastUpdateTime time.Time
     timer          *time.Timer
+    serviceForSpan map[pcommon.SpanID]string
 }

-func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
-    beam.Init()
+func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
+    beamInitOnce.Do(func() {
+        beam.Init()
+    })
     p, s := beam.NewPipelineWithRoot()
     return &dependencyAggregator{
         config:           &cfg,
         telset:           telset,
         dependencyWriter: dependencyWriter,
-        traces:           make(map[model.TraceID]*TraceState),
+        traces:           make(map[pcommon.TraceID]*TraceState),
         beamPipeline:     p,
         beamScope:        s,
     }
@@ -56,9 +63,9 @@
         for {
             select {
             case <-ticker.C:
-                agg.processTraces(context.Background()) // Pass context
+                agg.processTraces(context.Background())
             case <-agg.closeChan:
-                agg.processTraces(context.Background()) // Pass context
+                agg.processTraces(context.Background())
                 return
             }
         }
@@ -76,29 +83,34 @@ func (agg *dependencyAggregator) Close() error {
     return nil
 }

-func (agg *dependencyAggregator) HandleSpan(span *model.Span) {
+func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
     agg.tracesLock.Lock()
     defer agg.tracesLock.Unlock()
-    traceState, ok := agg.traces[span.TraceID]
+
+    traceID := span.TraceID()
+    spanID := span.SpanID()
+
+    traceState, ok := agg.traces[traceID]
     if !ok {
         traceState = &TraceState{
-            spans:          []*model.Span{},
-            spanMap:        make(map[model.SpanID]*model.Span),
+            spans:          []*ptrace.Span{},
+            spanMap:        make(map[pcommon.SpanID]*ptrace.Span),
             lastUpdateTime: time.Now(),
+            serviceForSpan: make(map[pcommon.SpanID]string),
         }
-        agg.traces[span.TraceID] = traceState
+        agg.traces[traceID] = traceState
     }
-    traceState.spans = append(traceState.spans, span)
-    traceState.spanMap[span.SpanID] = span
+    spanCopy := span
+    traceState.spans = append(traceState.spans, &spanCopy)
+    traceState.spanMap[spanID] = &spanCopy
+    traceState.serviceForSpan[spanID] = serviceName
     traceState.lastUpdateTime = time.Now()
     if traceState.timer != nil {
         traceState.timer.Stop()
     }
     traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
-        agg.processTraces(context.Background()) // Pass context
+        agg.processTraces(ctx)
     })
 }

@@ -109,8 +121,9 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) {
         return
     }
     traces := agg.traces
-    agg.traces = make(map[model.TraceID]*TraceState)
+    agg.traces = make(map[pcommon.TraceID]*TraceState)
     agg.tracesLock.Unlock()
+
     for _, traceState := range traces {
         if traceState.timer != nil {
             traceState.timer.Stop()
@@ -123,8 +136,8 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) {
     }
 }

-func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection {
-    var allSpans []*model.Span
+func (agg *dependencyAggregator) createBeamInput(traces map[pcommon.TraceID]*TraceState) beam.PCollection {
+    var allSpans []*ptrace.Span
     for _, traceState := range traces {
         allSpans = append(allSpans, traceState.spans...)
     }
@@ -135,16 +148,16 @@
 func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
-    keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) {
-        return s.TraceID, s
+    keyedSpans := beam.ParDo(agg.beamScope, func(s *ptrace.Span) (pcommon.TraceID, *ptrace.Span) {
+        return s.TraceID(), s
     }, input)

     groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
-    depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink {
+    depLinks := beam.ParDo(agg.beamScope, func(_ pcommon.TraceID, spansIter func(**ptrace.Span) bool) []*model.DependencyLink {
         deps := map[string]*model.DependencyLink{}
-        var span *model.Span
-        for spansIter(span) {
-            processSpan(deps, span, agg.traces)
+        var span *ptrace.Span
+        for spansIter(&span) {
+            processSpanOtel(deps, span, agg.traces)
         }
         return depMapToSlice(deps)
     }, groupedSpans)
@@ -167,35 +180,45 @@ func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
     }()
 }

-// processSpan is a copy from the memory storage plugin
-func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) {
-    parentSpan := seekToSpan(s, traces)
-    if parentSpan != nil {
-        if parentSpan.Process.ServiceName == s.Process.ServiceName {
-            return
-        }
-        depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName
-        if _, ok := deps[depKey]; !ok {
-            deps[depKey] = &model.DependencyLink{
-                Parent:    parentSpan.Process.ServiceName,
-                Child:     s.Process.ServiceName,
-                CallCount: 1,
-            }
-        } else {
-            deps[depKey].CallCount++
-        }
-    }
-}
+func processSpanOtel(deps map[string]*model.DependencyLink, span *ptrace.Span, traces map[pcommon.TraceID]*TraceState) {
+    traceID := span.TraceID()
+    parentSpanID := span.ParentSpanID()

-func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span {
-    traceState, ok := traces[span.TraceID]
+    traceState, ok := traces[traceID]
     if !ok {
-        return nil
+        return
+    }
+
+    parentSpan := traceState.spanMap[parentSpanID]
+    if parentSpan == nil {
+        return
+    }
+
+    parentService := traceState.serviceForSpan[parentSpanID]
+    currentService := traceState.serviceForSpan[span.SpanID()]
+
+    if parentService == currentService || parentService == "" || currentService == "" {
+        return
+    }
+
+    depKey := parentService + "&&&" + currentService
+    if _, ok := deps[depKey]; !ok {
+        deps[depKey] = &model.DependencyLink{
+            Parent:    parentService,
+            Child:     currentService,
+            CallCount: 1,
+        }
+    } else {
+        deps[depKey].CallCount++
     }
-    return traceState.spanMap[span.ParentSpanID()]
 }

-// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin
+// depMapToSlice converts a dependency map to a slice of DependencyLinks.
 func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
     retMe := make([]*model.DependencyLink, 0, len(deps))
     for _, dep := range deps {
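The parent/child bookkeeping above is easier to see outside of Beam and pdata. The following standalone sketch is not part of the patch; spanRec, buildDeps, and the sample service names are invented for illustration. It replays the same rule as processSpanOtel: map each span ID to its service, then count one parent->child call for every span whose parent lives in a different, non-empty service.

    package main

    import "fmt"

    // DependencyLink mirrors model.DependencyLink for this sketch.
    type DependencyLink struct {
        Parent    string
        Child     string
        CallCount uint64
    }

    // spanRec is a bare-bones stand-in for a span: an ID, an optional
    // parent ID, and the service that emitted it.
    type spanRec struct {
        id       string
        parentID string
        service  string
    }

    // buildDeps counts one parent->child call per cross-service span.
    func buildDeps(spans []spanRec) map[string]*DependencyLink {
        serviceOf := make(map[string]string, len(spans)) // span ID -> service
        for _, s := range spans {
            serviceOf[s.id] = s.service
        }
        deps := map[string]*DependencyLink{}
        for _, s := range spans {
            parent, child := serviceOf[s.parentID], s.service
            if parent == "" || child == "" || parent == child {
                continue // root span, unknown service, or in-process call
            }
            key := parent + "&&&" + child
            if d, ok := deps[key]; ok {
                d.CallCount++
            } else {
                deps[key] = &DependencyLink{Parent: parent, Child: child, CallCount: 1}
            }
        }
        return deps
    }

    func main() {
        spans := []spanRec{
            {id: "a", service: "frontend"},
            {id: "b", parentID: "a", service: "checkout"},
            {id: "c", parentID: "a", service: "checkout"},
        }
        for _, d := range buildDeps(spans) {
            fmt.Printf("%s -> %s (%d calls)\n", d.Parent, d.Child, d.CallCount)
        }
        // Output: frontend -> checkout (2 calls)
    }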
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
new file mode 100644
index 00000000000..95b4876aa12
--- /dev/null
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
@@ -0,0 +1,172 @@
+// Copyright (c) 2025 The Jaeger Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package dependencyprocessor
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"
+    "go.opentelemetry.io/collector/component"
+    "go.opentelemetry.io/collector/pdata/pcommon"
+    "go.opentelemetry.io/collector/pdata/ptrace"
+    "go.uber.org/zap"
+
+    "github.com/jaegertracing/jaeger/model"
+)
+
+// MockDependencyWriter is a mock implementation of spanstore.Writer.
+type MockDependencyWriter struct {
+    mock.Mock
+}
+
+func (m *MockDependencyWriter) WriteSpan(ctx context.Context, span *model.Span) error {
+    args := m.Called(ctx, span)
+    return args.Error(0)
+}
+
+func (m *MockDependencyWriter) WriteDependencies(ctx context.Context, ts time.Time, deps []model.DependencyLink) error {
+    args := m.Called(ctx, ts, deps)
+    return args.Error(0)
+}
+
+func TestAggregator(t *testing.T) {
+    // Create mock writer
+    mockWriter := new(MockDependencyWriter)
+
+    // Create config
+    cfg := Config{
+        AggregationInterval: 100 * time.Millisecond,
+        InactivityTimeout:   50 * time.Millisecond,
+    }
+
+    // Create logger
+    logger := zap.NewNop()
+    telemetrySettings := component.TelemetrySettings{
+        Logger: logger,
+    }
+
+    // Create aggregator
+    agg := newDependencyAggregator(cfg, telemetrySettings, mockWriter)
+
+    // Start aggregator
+    closeChan := make(chan struct{})
+    agg.Start(closeChan)
+    defer close(closeChan)
+
+    // Create test spans
+    traceID := createTraceID(1)
+    parentSpanID := createSpanID(2)
+    childSpanID := createSpanID(3)
+
+    // Create parent span
+    parentSpan := createSpan(traceID, parentSpanID, pcommon.SpanID{}, "service1")
+
+    // Create child span
+    childSpan := createSpan(traceID, childSpanID, parentSpanID, "service2")
+
+    // Setup mock expectations
+    mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.MatchedBy(func(deps []model.DependencyLink) bool {
+        if len(deps) != 1 {
+            return false
+        }
+        dep := deps[0]
+        return dep.Parent == "service1" && dep.Child == "service2" && dep.CallCount == 1
+    })).Return(nil)
+
+    // Handle spans
+    ctx := context.Background()
+    agg.HandleSpan(ctx, parentSpan, "service1")
+    agg.HandleSpan(ctx, childSpan, "service2")
+
+    // Wait for the write to happen, then verify. Polling AssertExpectations
+    // inside Eventually would fail the test on the first unmet check, so
+    // poll the recorded calls instead and assert once at the end.
+    assert.Eventually(t, func() bool {
+        return len(mockWriter.Calls) > 0
+    }, time.Second, 10*time.Millisecond, "Dependencies were not written as expected")
+    mockWriter.AssertExpectations(t)
+}
+
+func TestAggregatorInactivityTimeout(t *testing.T) {
+    mockWriter := new(MockDependencyWriter)
+    cfg := Config{
+        AggregationInterval: 1 * time.Second,
+        InactivityTimeout:   50 * time.Millisecond,
+    }
+
+    agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
+    closeChan := make(chan struct{})
+    agg.Start(closeChan)
+    defer close(closeChan)
+
+    traceID := createTraceID(1)
+    spanID := createSpanID(1)
+    span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")
+
+    mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+    ctx := context.Background()
+    agg.HandleSpan(ctx, span, "service1")
+
+    assert.Eventually(t, func() bool {
+        agg.tracesLock.RLock()
+        defer agg.tracesLock.RUnlock()
+        return len(agg.traces) == 0
+    }, time.Second, 10*time.Millisecond, "Trace was not cleared after inactivity timeout")
+}
+
+func TestAggregatorClose(t *testing.T) {
+    mockWriter := new(MockDependencyWriter)
+    cfg := Config{
+        AggregationInterval: 1 * time.Second,
+        InactivityTimeout:   1 * time.Second,
+    }
+
+    agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
+    closeChan := make(chan struct{})
+    agg.Start(closeChan)
+
+    traceID := createTraceID(1)
+    spanID := createSpanID(1)
+    span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")
+
+    ctx := context.Background()
+    agg.HandleSpan(ctx, span, "service1")
+
+    mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)
+
+    close(closeChan)
+    err := agg.Close()
+    require.NoError(t, err)
+
+    assert.Eventually(t, func() bool {
+        agg.tracesLock.RLock()
+        defer agg.tracesLock.RUnlock()
+        return len(agg.traces) == 0
+    }, time.Second, 10*time.Millisecond, "Traces were not cleared after close")
+}
+
+// Helper functions
+
+func createTraceID(id byte) pcommon.TraceID {
+    var traceID [16]byte
+    traceID[15] = id
+    return pcommon.TraceID(traceID)
+}
+
+func createSpanID(id byte) pcommon.SpanID {
+    var spanID [8]byte
+    spanID[7] = id
+    return pcommon.SpanID(spanID)
+}
+
+func createSpan(traceID pcommon.TraceID, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, serviceName string) ptrace.Span {
+    span := ptrace.NewSpan()
+    span.SetTraceID(traceID)
+    span.SetSpanID(spanID)
+    span.SetParentSpanID(parentSpanID)
+    // serviceName is passed to HandleSpan directly; additional span
+    // attributes could be set here if needed.
+    return span
+}
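The helpers above build bare spans and hand the service name to HandleSpan directly. In a real pipeline the service name travels on the resource, so an end-to-end test would construct input roughly like the hypothetical helper below (buildTestTraces is not part of the patch; it reuses createTraceID and createSpanID from this test file, and the span name is an arbitrary example):

    // buildTestTraces builds a one-span ptrace.Traces value with the
    // service name carried on the resource, the way ConsumeTraces
    // expects to find it.
    func buildTestTraces() ptrace.Traces {
        td := ptrace.NewTraces()
        rs := td.ResourceSpans().AppendEmpty()
        rs.Resource().Attributes().PutStr("service.name", "service1")
        span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
        span.SetTraceID(createTraceID(1))
        span.SetSpanID(createSpanID(2))
        span.SetName("GET /api")
        return td
    }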
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go
index ed6750cded1..22bf357c3af 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/config.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go
@@ -4,18 +4,31 @@
 package dependencyprocessor

 import (
+    "errors"
     "time"
-
-    "github.com/jaegertracing/jaeger/plugin/storage/memory"
 )

 type Config struct {
-    // AggregationInterval defines how often the processor aggregates dependencies.
-    // This controls the frequency of flushing dependency data to storage.
-    // Default dependency aggregation interval: 10 seconds
-    AggregationInterval time.Duration `yaml:"aggregation_interval"`
+    // AggregationInterval defines the length of the aggregation window after
+    // which the accumulated dependencies are flushed into storage.
+    AggregationInterval time.Duration `yaml:"aggregation_interval" valid:"gt=0"`
     // InactivityTimeout specifies the duration of inactivity after which a trace
     // is considered complete and ready for dependency aggregation.
-    // Default trace completion timeout: 2 seconds of inactivity
-    InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
+    InactivityTimeout time.Duration `yaml:"inactivity_timeout" valid:"gt=0"`
+    // StorageName specifies the storage backend to use for dependency data.
+    StorageName string `yaml:"storage_name" valid:"required"`
+}
+
+// Validate checks the configuration fields for validity.
+func (c *Config) Validate() error {
+    if c.AggregationInterval <= 0 {
+        return errors.New("aggregation_interval must be greater than 0")
+    }
+    if c.InactivityTimeout <= 0 {
+        return errors.New("inactivity_timeout must be greater than 0")
+    }
+    if c.StorageName == "" {
+        return errors.New("storage_name must be provided and cannot be empty")
+    }
+    return nil
 }
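Constructing and checking this config programmatically looks like the following minimal sketch (not part of the patch; the literal values are examples and the log import is assumed):

    cfg := Config{
        AggregationInterval: 10 * time.Minute, // matches the factory defaults below
        InactivityTimeout:   2 * time.Minute,
        StorageName:         "memory",
    }
    if err := cfg.Validate(); err != nil {
        log.Fatalf("invalid dependencyprocessor config: %v", err)
    }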
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go
index ee063431f83..96b24abe516 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go
@@ -10,8 +10,6 @@ import (
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer"
     "go.opentelemetry.io/collector/processor"
-
-    "github.com/jaegertracing/jaeger/plugin/storage/memory"
 )

 // componentType is the name of this processor in configuration.
@@ -28,8 +26,8 @@ func NewFactory() processor.Factory {

 func createDefaultConfig() component.Config {
     return &Config{
-        AggregationInterval: 10 * time.Second,
-        InactivityTimeout:   2 * time.Second,
+        AggregationInterval: 10 * time.Minute,
+        InactivityTimeout:   2 * time.Minute,
     }
 }

@@ -42,7 +40,7 @@ func createTracesProcessor(
 ) (processor.Traces, error) {
     oCfg := cfg.(*Config)

-    dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, oCfg.Store, nextConsumer)
+    dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, nextConsumer)

     return dp, nil
 }
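Note that createDefaultConfig does not set StorageName, so the defaults alone would fail Config.Validate; the processor's Start (below) falls back to "memory" when the field is empty. A caller overriding the default might look like this sketch (not part of the patch):

    factory := NewFactory()
    cfg := factory.CreateDefaultConfig().(*Config)
    cfg.StorageName = "memory" // must be set explicitly; the default is empty
    if err := cfg.Validate(); err != nil {
        // handle the configuration error
    }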
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go
index 62a59cf4fc5..56fd8f24454 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go
@@ -9,32 +9,49 @@ import (

     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer"
+    "go.opentelemetry.io/collector/pdata/pcommon"
     "go.opentelemetry.io/collector/pdata/ptrace"

-    "github.com/jaegertracing/jaeger/plugin/storage/memory"
-    "github.com/jaegertracing/jaeger/storage_v2/v1adapter"
+    "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
+    "github.com/jaegertracing/jaeger/storage/spanstore"
 )

 type dependencyProcessor struct {
     config           *Config
     aggregator       *dependencyAggregator // Define the aggregator below.
     telset           component.TelemetrySettings
-    dependencyWriter *memory.Store
+    dependencyWriter spanstore.Writer
     closeChan        chan struct{}
     nextConsumer     consumer.Traces
 }

-func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store, nextConsumer consumer.Traces) *dependencyProcessor {
+func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, nextConsumer consumer.Traces) *dependencyProcessor {
     return &dependencyProcessor{
-        config:           &cfg,
-        telset:           telset,
-        dependencyWriter: dependencyWriter,
-        closeChan:        make(chan struct{}),
-        nextConsumer:     nextConsumer,
+        config:       &cfg,
+        telset:       telset,
+        closeChan:    make(chan struct{}),
+        nextConsumer: nextConsumer,
     }
 }

 func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) error {
+    storageName := tp.config.StorageName
+    if storageName == "" {
+        storageName = "memory"
+    }
+
+    f, err := jaegerstorage.GetStorageFactory(storageName, host)
+    if err != nil {
+        return fmt.Errorf("failed to get storage factory: %w", err)
+    }
+
+    writer, err := f.CreateSpanWriter()
+    if err != nil {
+        return fmt.Errorf("failed to create dependency writer: %w", err)
+    }
+
+    tp.dependencyWriter = writer
+
     tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter)
     tp.aggregator.Start(tp.closeChan)
     return nil
@@ -42,7 +59,6 @@ func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) error {

 // Shutdown implements processor.Traces
 func (tp *dependencyProcessor) Shutdown(ctx context.Context) error {
-    close(tp.closeChan)
     if tp.aggregator != nil {
         if err := tp.aggregator.Close(); err != nil {
             return fmt.Errorf("failed to stop the dependency aggregator : %w", err)
@@ -57,14 +73,26 @@ func (p *dependencyProcessor) Capabilities() consumer.Capabilities {
 }

 func (dp *dependencyProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
-    batches := v1adapter.ProtoFromTraces(td)
-    for _, batch := range batches {
-        for _, span := range batch.Spans {
-            if span.Process == nil {
-                span.Process = batch.Process
+    for i := 0; i < td.ResourceSpans().Len(); i++ {
+        rs := td.ResourceSpans().At(i)
+        serviceName := getServiceName(rs.Resource())
+
+        for j := 0; j < rs.ScopeSpans().Len(); j++ {
+            ss := rs.ScopeSpans().At(j)
+            for k := 0; k < ss.Spans().Len(); k++ {
+                span := ss.Spans().At(k)
+                dp.aggregator.HandleSpan(ctx, span, serviceName)
             }
-            dp.aggregator.HandleSpan(span)
         }
     }
+
     return dp.nextConsumer.ConsumeTraces(ctx, td)
 }
+
+func getServiceName(resource pcommon.Resource) string {
+    serviceNameAttr, found := resource.Attributes().Get("service.name")
+    if !found {
+        return ""
+    }
+    return serviceNameAttr.Str()
+}
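getServiceName reads the standard service.name resource attribute. A small sketch of its behavior (not part of the patch; assumes fmt is imported):

    res := pcommon.NewResource()
    res.Attributes().PutStr("service.name", "checkout")
    fmt.Println(getServiceName(res)) // "checkout"

    // A resource without service.name yields "", and such spans are later
    // skipped by processSpanOtel's empty-service check.
    fmt.Println(getServiceName(pcommon.NewResource()) == "") // true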
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go
similarity index 100%
rename from cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go
rename to cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go
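The remaining hunks revert the in-memory store to deriving dependency links purely by scanning stored spans at read time. The read path, sketched under the assumption that the plugin/storage/memory package is imported and spans have already been written through the regular span-writer path (not part of the patch):

    store := memory.NewStore()
    // ... spans are written through the normal span-writer path ...
    links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
    if err != nil {
        log.Fatal(err)
    }
    for _, l := range links {
        fmt.Printf("%s -> %s (%d)\n", l.Parent, l.Child, l.CallCount)
    }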
diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go
index 43c0478c60b..02dff26a584 100644
--- a/plugin/storage/memory/memory.go
+++ b/plugin/storage/memory/memory.go
@@ -24,22 +24,20 @@ type Store struct {
     sync.RWMutex
     // Each tenant gets a copy of default config.
     // In the future this can be extended to contain per-tenant configuration.
-    defaultConfig      Configuration
-    perTenant          map[string]*Tenant
-    useNewDependencies bool
+    defaultConfig Configuration
+    perTenant     map[string]*Tenant
 }

 // Tenant is an in-memory store of traces for a single tenant
 type Tenant struct {
     sync.RWMutex
-    ids             []*model.TraceID
-    traces          map[model.TraceID]*model.Trace
-    services        map[string]struct{}
-    operations      map[string]map[spanstore.Operation]struct{}
-    deduper         adjuster.Adjuster
-    config          Configuration
-    index           int
-    dependencyLinks map[string]*model.DependencyLink
+    ids        []*model.TraceID
+    traces     map[model.TraceID]*model.Trace
+    services   map[string]struct{}
+    operations map[string]map[spanstore.Operation]struct{}
+    deduper    adjuster.Adjuster
+    config     Configuration
+    index      int
 }

 // NewStore creates an unbounded in-memory store
@@ -50,21 +48,19 @@ func NewStore() *Store {
 // WithConfiguration creates a new in memory storage based on the given configuration
 func WithConfiguration(cfg Configuration) *Store {
     return &Store{
-        defaultConfig:      cfg,
-        perTenant:          make(map[string]*Tenant),
-        useNewDependencies: false,
+        defaultConfig: cfg,
+        perTenant:     make(map[string]*Tenant),
     }
 }

 func newTenant(cfg Configuration) *Tenant {
     return &Tenant{
-        ids:             make([]*model.TraceID, cfg.MaxTraces),
-        traces:          map[model.TraceID]*model.Trace{},
-        services:        map[string]struct{}{},
-        operations:      map[string]map[spanstore.Operation]struct{}{},
-        deduper:         adjuster.ZipkinSpanIDUniquifier(),
-        config:          cfg,
-        dependencyLinks: make(map[string]*model.DependencyLink),
+        ids:        make([]*model.TraceID, cfg.MaxTraces),
+        traces:     map[model.TraceID]*model.Trace{},
+        services:   map[string]struct{}{},
+        operations: map[string]map[spanstore.Operation]struct{}{},
+        deduper:    adjuster.ZipkinSpanIDUniquifier(),
+        config:     cfg,
     }
 }

@@ -87,9 +83,6 @@ func (st *Store) getTenant(tenantID string) *Tenant {

 // GetDependencies returns dependencies between services
 func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
-    if st.useNewDependencies {
-        return st.getDependenciesNew(ctx)
-    }
     m := st.getTenant(tenancy.GetTenant(ctx))
     // deduper used below can modify the spans, so we take an exclusive lock
     m.Lock()
@@ -126,36 +119,6 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
     return retMe, nil
 }

-func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) {
-    m := st.getTenant(tenancy.GetTenant(ctx))
-    m.RLock()
-    defer m.RUnlock()
-    retMe := make([]model.DependencyLink, 0, len(m.dependencyLinks))
-    for _, dep := range m.dependencyLinks {
-        retMe = append(retMe, *dep)
-    }
-    return retMe, nil
-}
-
-func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error {
-    m := st.getTenant(tenancy.GetTenant(ctx))
-    m.Lock()
-    defer m.Unlock()
-    for _, dep := range dependencies {
-        key := dep.Parent + "&&&" + dep.Child
-        if _, ok := m.dependencyLinks[key]; !ok {
-            m.dependencyLinks[key] = &model.DependencyLink{
-                Parent:    dep.Parent,
-                Child:     dep.Child,
-                CallCount: dep.CallCount,
-            }
-        } else {
-            m.dependencyLinks[key].CallCount += dep.CallCount
-        }
-    }
-    return nil
-}
-
 func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
     for _, s := range trace.Spans {
         if s.SpanID == spanID {