Skip to content

Commit

Permalink
[chore][tests] Clean up integration tests to remove archive reader / …
Browse files Browse the repository at this point in the history
…writer (#6625)

## Which problem is this PR solving?
- Towards #6065

## Description of the changes
- This PR cleans up the integration test suite for jaeger-v1 by removing
the archive related fields for all tests since they were being skipped
anyway, with the exception of elasticsearch.
- Since the only storage that has a difference between primary and
archive storage is ElasticSearch, this PR moves the archive trace test
to `elasticsearch_test.go`.

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 authored Jan 29, 2025
1 parent 9d3a516 commit 4ac036b
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 92 deletions.
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/integration/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ func TestBadgerStorage(t *testing.T) {
s := &E2EStorageIntegration{
ConfigFile: "../../config-badger.yaml",
StorageIntegration: integration.StorageIntegration{
SkipArchiveTest: true,
CleanUp: purge,
CleanUp: purge,

// TODO: remove this once badger supports returning spanKind from GetOperations
// Cf https://github.com/jaegertracing/jaeger/issues/1922
Expand Down
1 change: 0 additions & 1 deletion cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func TestCassandraStorage(t *testing.T) {
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,

SkipList: integration.CassandraSkippedTests,
},
Expand Down
1 change: 0 additions & 1 deletion cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func TestKafkaStorage(t *testing.T) {
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
GetDependenciesReturnsSource: true,
SkipArchiveTest: true,
},
EnvVarOverrides: envVarOverrides,
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/jaeger/internal/integration/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ func TestMemoryStorage(t *testing.T) {
s := &E2EStorageIntegration{
ConfigFile: "../../config.yaml",
StorageIntegration: integration.StorageIntegration{
SkipArchiveTest: true,
CleanUp: purge,
CleanUp: purge,
},
}
s.e2eInitialize(t, "memory")
Expand Down
2 changes: 0 additions & 2 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func TestBadgerStorage(t *testing.T) {
})
s := &BadgerIntegrationStorage{
StorageIntegration: StorageIntegration{
SkipArchiveTest: true,

// TODO: remove this badger supports returning spanKind from GetOperations
GetOperationsMissingSpanKind: true,
},
Expand Down
17 changes: 1 addition & 16 deletions plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import (

type CassandraStorageIntegration struct {
StorageIntegration
factory *cassandra.Factory
archiveFactory *cassandra.Factory
factory *cassandra.Factory
}

func newCassandraStorageIntegration() *CassandraStorageIntegration {
Expand All @@ -42,7 +41,6 @@ func newCassandraStorageIntegration() *CassandraStorageIntegration {

func (s *CassandraStorageIntegration) cleanUp(t *testing.T) {
require.NoError(t, s.factory.Purge(context.Background()))
require.NoError(t, s.archiveFactory.Purge(context.Background()))
}

func (*CassandraStorageIntegration) initializeCassandraFactory(t *testing.T, flags []string, factoryInit func() *cassandra.Factory) *cassandra.Factory {
Expand All @@ -67,27 +65,14 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
"--cassandra.username=" + username,
"--cassandra.keyspace=jaeger_v1_dc1",
}, cassandra.NewFactory)
af := s.initializeCassandraFactory(t, []string{
"--cassandra-archive.keyspace=jaeger_v1_dc1_archive",
"--cassandra-archive.enabled=true",
"--cassandra-archive.servers=127.0.0.1",
"--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator",
"--cassandra-archive.password=" + password,
"--cassandra-archive.username=" + username,
}, cassandra.NewArchiveFactory)
s.factory = f
s.archiveFactory = af
var err error
spanWriter, err := f.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = af.CreateSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = af.CreateSpanWriter()
require.NoError(t, err)
s.SamplingStore, err = f.CreateSamplingStore(0)
require.NoError(t, err)
s.initializeDependencyReaderAndWriter(t, f)
Expand Down
38 changes: 36 additions & 2 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand All @@ -52,6 +54,9 @@ type ESStorageIntegration struct {
client *elastic.Client
v8Client *elasticsearch8.Client

ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer

factory *es.Factory
archiveFactory *es.Factory
}
Expand Down Expand Up @@ -174,15 +179,15 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) {
require.NoError(t, healthCheck(c))
s := &ESStorageIntegration{
StorageIntegration: StorageIntegration{
Fixtures: LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"),
SkipArchiveTest: false,
Fixtures: LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"),
// TODO: remove this flag after ES supports returning spanKind
// Issue https://github.com/jaegertracing/jaeger/issues/1923
GetOperationsMissingSpanKind: true,
},
}
s.initializeES(t, c, allTagsAsFields)
s.RunAll(t)
t.Run("ArchiveTrace", s.testArchiveTrace)
}

func TestElasticsearchStorage(t *testing.T) {
Expand Down Expand Up @@ -249,3 +254,32 @@ func (s *ESStorageIntegration) cleanESIndexTemplates(t *testing.T, prefix string
}
return nil
}

// testArchiveTrace validates that a trace with a start time older than maxSpanAge
// can still be retrieved via the archive storage. This ensures archived traces are
// accessible even when their age exceeds the retention period for primary storage.
// This test applies only to Elasticsearch (ES) storage.
func (s *ESStorageIntegration) testArchiveTrace(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)
tID := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-maxSpanAge * 5).Truncate(time.Microsecond),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.ArchiveSpanWriter.WriteSpan(context.Background(), expected))

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: tID})
return err == nil && len(actual.Spans) == 1
})
require.True(t, found)
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}
24 changes: 3 additions & 21 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@ import (

type GRPCStorageIntegrationTestSuite struct {
StorageIntegration
flags []string
archiveFlags []string
factory *grpc.Factory
archiveFactory *grpc.Factory
remoteStorage *RemoteMemoryStorage
archiveRemoteStorage *RemoteMemoryStorage
flags []string
factory *grpc.Factory
remoteStorage *RemoteMemoryStorage
}

func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
s.remoteStorage = StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC)
s.archiveRemoteStorage = StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC+1)

initFactory := func(f *grpc.Factory, flags []string) {
v, command := config.Viperize(f.AddFlags)
Expand All @@ -41,22 +37,15 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
}
f := grpc.NewFactory()
af := grpc.NewArchiveFactory()
initFactory(f, s.flags)
initFactory(af, s.archiveFlags)
s.factory = f
s.archiveFactory = af

spanWriter, err := f.CreateSpanWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = af.CreateSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = af.CreateSpanWriter()
require.NoError(t, err)

// TODO DependencyWriter is not implemented in grpc store

Expand All @@ -65,9 +54,7 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {

func (s *GRPCStorageIntegrationTestSuite) close(t *testing.T) {
require.NoError(t, s.factory.Close())
require.NoError(t, s.archiveFactory.Close())
s.remoteStorage.Close(t)
s.archiveRemoteStorage.Close(t)
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
Expand All @@ -85,11 +72,6 @@ func TestGRPCRemoteStorage(t *testing.T) {
"--grpc-storage.server=localhost:17271",
"--grpc-storage.tls.enabled=false",
},
archiveFlags: []string{
"--grpc-storage-archive.enabled=true",
"--grpc-storage-archive.server=localhost:17272",
"--grpc-storage-archive.tls.enabled=false",
},
}
s.initialize(t)
defer s.close(t)
Expand Down
47 changes: 6 additions & 41 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/storage/samplingstore"
samplemodel "github.com/jaegertracing/jaeger/storage/samplingstore/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
Expand All @@ -43,14 +42,12 @@ var fixtures embed.FS
// Some implementations may declare multiple tests, with different settings,
// and RunAll() under different conditions.
type StorageIntegration struct {
TraceWriter tracestore.Writer
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter depstore.Writer
DependencyReader depstore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures
TraceWriter tracestore.Writer
TraceReader tracestore.Reader
DependencyWriter depstore.Writer
DependencyReader depstore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures

// TODO: remove this after all storage backends return spanKind from GetOperations
GetOperationsMissingSpanKind bool
Expand All @@ -59,9 +56,6 @@ type StorageIntegration struct {

GetDependenciesReturnsSource bool

// Skip Archive Test if not supported by the storage backend
SkipArchiveTest bool

// List of tests which has to be skipped, it can be regex too.
SkipList []string

Expand Down Expand Up @@ -182,34 +176,6 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
}
}

func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
s.skipIfNeeded(t)
if s.SkipArchiveTest {
t.Skip("Skipping ArchiveTrace test because archive reader or writer is nil")
}
defer s.cleanUp(t)
tID := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-time.Hour * 72 * 5).Truncate(time.Microsecond),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.ArchiveSpanWriter.WriteSpan(context.Background(), expected))

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: tID})
return err == nil && len(actual.Spans) == 1
})
require.True(t, found)
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)
Expand Down Expand Up @@ -596,7 +562,6 @@ func (s *StorageIntegration) insertThroughput(t *testing.T) {
// RunAll runs all integration tests
func (s *StorageIntegration) RunAll(t *testing.T) {
s.RunSpanStoreTests(t)
t.Run("ArchiveTrace", s.testArchiveTrace)
t.Run("GetDependencies", s.testGetDependencies)
t.Run("GetThroughput", s.testGetThroughput)
t.Run("GetLatestProbability", s.testGetLatestProbability)
Expand Down
1 change: 0 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
s.CleanUp = func(_ *testing.T) {}
s.SkipArchiveTest = true
}

// The ingester consumes spans from kafka and writes them to an in-memory traceStore
Expand Down
3 changes: 0 additions & 3 deletions plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
s.logger, _ = testutils.NewLogger()

store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.TraceReader = v1adapter.NewTraceReader(store)
s.TraceWriter = v1adapter.NewTraceWriter(store)
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore

// TODO DependencyWriter is not implemented in memory store

Expand Down

0 comments on commit 4ac036b

Please sign in to comment.