Skip to content

Commit

Permalink
[storage] Add helper to storage extension for retrieving sampling sto…
Browse files Browse the repository at this point in the history
…re factory (jaegertracing#6689)
  • Loading branch information
mahadzaryab1 authored Feb 8, 2025
1 parent bc2a643 commit 24dfa4f
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 18 deletions.
14 changes: 14 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ func GetTraceStoreFactory(name string, host component.Host) (tracestore.Factory,
return v1adapter.NewFactory(f), nil
}

func GetSamplingStoreFactory(name string, host component.Host) (storage.SamplingStoreFactory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
return nil, err
}

ssf, ok := f.(storage.SamplingStoreFactory)
if !ok {
return nil, fmt.Errorf("storage '%s' does not support sampling store", name)
}

return ssf, nil
}

func findExtension(host component.Host) (Extension, error) {
var id component.ID
var comp component.Component
Expand Down
93 changes: 84 additions & 9 deletions cmd/jaeger/internal/extension/jaegerstorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,83 @@ func TestGetFactory(t *testing.T) {
require.NotNil(t, f3)
}

func TestGetSamplingStoreFactory(t *testing.T) {
tests := []struct {
name string
storageName string
expectedError string
setupFunc func(t *testing.T) component.Component
}{
{
name: "Supported",
storageName: "foo",
setupFunc: func(t *testing.T) component.Component {
traceStoreFactory := "foo"
return startStorageExtension(t, traceStoreFactory, "bar")
},
},
{
name: "NotFound",
storageName: "nonexistingstorage",
expectedError: "cannot find definition of storage",
setupFunc: func(t *testing.T) component.Component {
traceStoreFactory := "foo"
return startStorageExtension(t, traceStoreFactory, "bar")
},
},
{
name: "NotSupported",
storageName: "foo",
expectedError: "storage 'foo' does not support sampling store",
setupFunc: func(t *testing.T) component.Component {
versionResponse, err := json.Marshal(map[string]any{
"Version": map[string]any{
"Number": "7",
},
})
require.NoError(t, err)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Write(versionResponse)
}))
t.Cleanup(func() { server.Close() })

ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"foo": {
Elasticsearch: &esCfg.Configuration{
Servers: []string{server.URL},
LogLevel: "error",
},
},
},
})
require.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, ext.Shutdown(context.Background()))
})
return ext
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ext := test.setupFunc(t)
host := storagetest.NewStorageHost().WithExtension(ID, ext)

ssf, err := GetSamplingStoreFactory(test.storageName, host)
if test.expectedError != "" {
require.ErrorContains(t, err, test.expectedError)
require.Nil(t, ssf)
} else {
require.NotNil(t, ssf)
}
})
}
}

func TestBadger(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"foo": {
Badger: &badger.Config{
Expand All @@ -138,7 +213,7 @@ func TestBadger(t *testing.T) {
}

func TestGRPC(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"foo": {
GRPC: &grpc.Config{
Expand All @@ -156,7 +231,7 @@ func TestGRPC(t *testing.T) {
}

func TestPrometheus(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
MetricBackends: map[string]MetricBackend{
"foo": {
Prometheus: &promCfg.Configuration{
Expand All @@ -172,7 +247,7 @@ func TestPrometheus(t *testing.T) {
}

func TestStartError(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"foo": {},
},
Expand All @@ -183,7 +258,7 @@ func TestStartError(t *testing.T) {
}

func TestMetricsStorageStartError(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
MetricBackends: map[string]MetricBackend{
"foo": {
Prometheus: &promCfg.Configuration{},
Expand All @@ -195,7 +270,7 @@ func TestMetricsStorageStartError(t *testing.T) {
}

func testElasticsearchOrOpensearch(t *testing.T, cfg TraceBackend) {
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"foo": cfg,
},
Expand Down Expand Up @@ -238,7 +313,7 @@ func TestXYZsearch(t *testing.T) {
func TestCassandraError(t *testing.T) {
// since we cannot successfully create storage factory for Cassandra
// without running a Cassandra server, we only test the error case.
ext := makeStorageExtenion(t, &Config{
ext := makeStorageExtension(t, &Config{
TraceBackends: map[string]TraceBackend{
"cassandra": {
Cassandra: &cassandra.Options{},
Expand All @@ -258,7 +333,7 @@ func noopTelemetrySettings() component.TelemetrySettings {
}
}

func makeStorageExtenion(t *testing.T, config *Config) component.Component {
func makeStorageExtension(t *testing.T, config *Config) component.Component {
extensionFactory := NewFactory()
ctx := context.Background()
ext, err := extensionFactory.Create(ctx,
Expand Down Expand Up @@ -292,7 +367,7 @@ func startStorageExtension(t *testing.T, memstoreName string, promstoreName stri
}
require.NoError(t, config.Validate())

ext := makeStorageExtenion(t, config)
ext := makeStorageExtension(t, config)
err := ext.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
Expand Down
11 changes: 3 additions & 8 deletions cmd/jaeger/internal/extension/remotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/adaptive"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy/file"
"github.com/jaegertracing/jaeger/internal/storage/v1"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/samplingstore"
"github.com/jaegertracing/jaeger/pkg/metrics"
)
Expand Down Expand Up @@ -184,14 +183,10 @@ func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error

func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error {
storageName := ext.cfg.Adaptive.SamplingStore
f, err := jaegerstorage.GetStorageFactory(storageName, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

storeFactory, ok := f.(storage.SamplingStoreFactory)
if !ok {
return fmt.Errorf("storage '%s' does not support sampling store", storageName)
storeFactory, err := jaegerstorage.GetSamplingStoreFactory(storageName, host)
if err != nil {
return fmt.Errorf("failed to obtain sampling store factory: %w", err)
}

store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestStartAdaptiveStrategyProviderErrors(t *testing.T) {
},
}
err := ext.startAdaptiveStrategyProvider(host)
require.ErrorContains(t, err, "cannot find storage factory")
require.ErrorContains(t, err, "failed to obtain sampling store factory")
}

func TestGetAdaptiveSamplingComponents(t *testing.T) {
Expand Down

0 comments on commit 24dfa4f

Please sign in to comment.