Skip to content

Commit

Permalink
[fix] Refactor archive storage initialization and remove error log (#…
Browse files Browse the repository at this point in the history
…6636)

## Which problem is this PR solving?
- Fixes #6634 

## Description of the changes
- Remove the error logs for archive storage as it is not an error if
archive storage cannot be configured
- Remove the duplicate logs by only initializing the archive storage
once for both query service and query service v2

## How was this change tested?
```
make run-all-in-one
{"level":"info","ts":1738292734.78397,"caller":"app/flags.go:168","msg":"Archive storage not initialized"}
```

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 authored Jan 31, 2025
1 parent 619a9f7 commit 378cbc9
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 140 deletions.
4 changes: 2 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ by default uses only in-memory database.`,
// query
queryTelset := baseTelset // copy
queryTelset.Metrics = queryMetricsFactory
querySvcOpts, v2querySvcOpts := qOpts.BuildQueryServiceOptions(storageFactory.InitArchiveStorage, logger)
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory.InitArchiveStorage, logger),
qOpts.BuildQueryServiceOptionsV2(storageFactory.InitArchiveStorage, logger),
svc, qOpts, querySvcOpts, v2querySvcOpts,
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)
Expand Down
36 changes: 15 additions & 21 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

Expand Down Expand Up @@ -140,41 +140,35 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
return qOpts, nil
}

type InitArchiveStorageFn func(logger *zap.Logger) (spanstore.Reader, spanstore.Writer)
type InitArchiveStorageFn func() (*storage.ArchiveStorage, error)

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(
initArchiveStorageFn InitArchiveStorageFn,
logger *zap.Logger,
) *querysvc.QueryServiceOptions {
) (*querysvc.QueryServiceOptions, *v2querysvc.QueryServiceOptions) {
opts := &querysvc.QueryServiceOptions{
MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust,
}
ar, aw := initArchiveStorageFn(logger)
if ar != nil && aw != nil {
opts.ArchiveSpanReader = ar
opts.ArchiveSpanWriter = aw
} else {
logger.Info("Archive storage not initialized")
}

return opts
}

func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(initArchiveStorageFn InitArchiveStorageFn, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
opts := &v2querysvc.QueryServiceOptions{
v2Opts := &v2querysvc.QueryServiceOptions{
MaxClockSkewAdjust: qOpts.MaxClockSkewAdjust,
}
as, err := initArchiveStorageFn()
if err != nil {
logger.Error("Received an error when trying to initialize archive storage", zap.Error(err))
return opts, v2Opts
}

ar, aw := initArchiveStorageFn(logger)
if ar != nil && aw != nil {
opts.ArchiveTraceReader = v1adapter.NewTraceReader(ar)
opts.ArchiveTraceWriter = v1adapter.NewTraceWriter(aw)
if as != nil && as.Reader != nil && as.Writer != nil {
opts.ArchiveSpanReader = as.Reader
opts.ArchiveSpanWriter = as.Writer
v2Opts.ArchiveTraceReader = v1adapter.NewTraceReader(as.Reader)
v2Opts.ArchiveTraceWriter = v1adapter.NewTraceWriter(as.Writer)
} else {
logger.Info("Archive storage not initialized")
}

return opts
return opts, v2Opts
}

// stringSliceAsHeader parses a slice of strings and returns a http.Header.
Expand Down
122 changes: 60 additions & 62 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

Expand Down Expand Up @@ -86,69 +86,67 @@ func TestStringSliceAsHeader(t *testing.T) {
require.NoError(t, err)
}

func initializedFn(*zap.Logger) (spanstore.Reader, spanstore.Writer) {
return &spanstoremocks.Reader{}, &spanstoremocks.Writer{}
}

func uninitializedFn(*zap.Logger) (spanstore.Reader, spanstore.Writer) {
return nil, nil
}

func TestBuildQueryServiceOptions(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptions(initializedFn, zap.NewNop())
assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.ArchiveSpanReader)
assert.NotNil(t, qSvcOpts.ArchiveSpanWriter)
assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust)
}

func TestBuildQueryServiceOptions_NoArchiveStorage(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

logger, logBuf := testutils.NewLogger()
qSvcOpts := qOpts.BuildQueryServiceOptions(uninitializedFn, logger)
assert.NotNil(t, qSvcOpts)
assert.Nil(t, qSvcOpts.ArchiveSpanReader)
assert.Nil(t, qSvcOpts.ArchiveSpanWriter)
assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust)

require.Contains(t, logBuf.String(), "Archive storage not initialized")
}

func TestBuildQueryServiceOptionsV2(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

qSvcOpts := qOpts.BuildQueryServiceOptionsV2(initializedFn, zap.NewNop())

assert.NotNil(t, qSvcOpts)
assert.NotNil(t, qSvcOpts.ArchiveTraceReader)
assert.NotNil(t, qSvcOpts.ArchiveTraceWriter)
assert.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust)
}

func TestBuildQueryServiceOptionsV2_NoArchiveStorage(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
assert.NotNil(t, qOpts)

logger, logBuf := testutils.NewLogger()
qSvcOpts := qOpts.BuildQueryServiceOptionsV2(uninitializedFn, logger)
assert.Nil(t, qSvcOpts.ArchiveTraceReader)
assert.Nil(t, qSvcOpts.ArchiveTraceWriter)
tests := []struct {
name string
initFn func() (*storage.ArchiveStorage, error)
expectNilStorage bool
expectedLogEntry string
}{
{
name: "successful initialization",
initFn: func() (*storage.ArchiveStorage, error) {
return &storage.ArchiveStorage{
Reader: &spanstoremocks.Reader{},
Writer: &spanstoremocks.Writer{},
}, nil
},
expectNilStorage: false,
},
{
name: "error initializing archive storage",
initFn: func() (*storage.ArchiveStorage, error) {
return nil, assert.AnError
},
expectNilStorage: true,
expectedLogEntry: "Received an error when trying to initialize archive storage",
},
{
name: "no archive storage",
initFn: func() (*storage.ArchiveStorage, error) {
return nil, nil
},
expectNilStorage: true,
expectedLogEntry: "Archive storage not initialized",
},
}

require.Contains(t, logBuf.String(), "Archive storage not initialized")
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
v, _ := config.Viperize(AddFlags)
qOpts, err := new(QueryOptions).InitFromViper(v, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, qOpts)

logger, logBuf := testutils.NewLogger()
qSvcOpts, v2qSvcOpts := qOpts.BuildQueryServiceOptions(test.initFn, logger)
require.Equal(t, defaultMaxClockSkewAdjust, qSvcOpts.MaxClockSkewAdjust)

if test.expectNilStorage {
require.Nil(t, qSvcOpts.ArchiveSpanReader)
require.Nil(t, qSvcOpts.ArchiveSpanWriter)
require.Nil(t, v2qSvcOpts.ArchiveTraceReader)
require.Nil(t, v2qSvcOpts.ArchiveTraceWriter)
} else {
require.NotNil(t, qSvcOpts.ArchiveSpanReader)
require.NotNil(t, qSvcOpts.ArchiveSpanWriter)
require.NotNil(t, v2qSvcOpts.ArchiveTraceReader)
require.NotNil(t, v2qSvcOpts.ArchiveTraceWriter)
}

require.Contains(t, logBuf.String(), test.expectedLogEntry)
})
}
}

func TestQueryOptionsPortAllocationFromFlags(t *testing.T) {
Expand Down
13 changes: 3 additions & 10 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,16 @@ func main() {
if err != nil {
logger.Fatal("Failed to create metrics query service", zap.Error(err))
}
queryServiceOptions := queryOpts.BuildQueryServiceOptions(
storageFactory.InitArchiveStorage,
logger,
)
querySvcOpts, v2querySvcOpts := queryOpts.BuildQueryServiceOptions(storageFactory.InitArchiveStorage, logger)
queryService := querysvc.NewQueryService(
traceReader,
dependencyReader,
*queryServiceOptions)
*querySvcOpts)

queryServiceOptionsV2 := queryOpts.BuildQueryServiceOptionsV2(
storageFactory.InitArchiveStorage,
logger,
)
queryServiceV2 := querysvcv2.NewQueryService(
traceReader,
dependencyReader,
*queryServiceOptionsV2)
*v2querySvcOpts)

tm := tenancy.NewManager(&queryOpts.Tenancy)
telset := baseTelset // copy
Expand Down
39 changes: 18 additions & 21 deletions plugin/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,37 +338,34 @@ func (f *Factory) initDownsamplingFromViper(v *viper.Viper) {
f.FactoryConfig.DownsamplingHashSalt = v.GetString(downsamplingHashSalt)
}

func (f *Factory) createArchiveSpanReader() (spanstore.Reader, error) {
factory, ok := f.archiveFactories[f.SpanReaderType]
if !ok {
return nil, fmt.Errorf("no %s backend registered for span store", f.SpanReaderType)
}
return factory.CreateSpanReader()
type ArchiveStorage struct {
Reader spanstore.Reader
Writer spanstore.Writer
}

func (f *Factory) createArchiveSpanWriter() (spanstore.Writer, error) {
factory, ok := f.archiveFactories[f.SpanWriterTypes[0]]
func (f *Factory) InitArchiveStorage() (*ArchiveStorage, error) {
factory, ok := f.archiveFactories[f.SpanReaderType]
if !ok {
return nil, fmt.Errorf("no %s backend registered for span store", f.SpanWriterTypes[0])
return nil, nil
}
return factory.CreateSpanWriter()
}

func (f *Factory) InitArchiveStorage(
logger *zap.Logger,
) (spanstore.Reader, spanstore.Writer) {
reader, err := f.createArchiveSpanReader()
reader, err := factory.CreateSpanReader()
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return nil, err
}

factory, ok = f.archiveFactories[f.SpanWriterTypes[0]]
if !ok {
return nil, nil
}
writer, err := f.createArchiveSpanWriter()
writer, err := factory.CreateSpanWriter()
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return nil, nil
return nil, err
}

return reader, writer
return &ArchiveStorage{
Reader: reader,
Writer: writer,
}, nil
}

var _ io.Closer = (*Factory)(nil)
Expand Down
Loading

0 comments on commit 378cbc9

Please sign in to comment.