Skip to content

Commit

Permalink
Restore For Merge Conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
  • Loading branch information
mahadzaryab1 committed Jan 25, 2025
1 parent 558b1fc commit 48ff461
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 1 deletion.
35 changes: 34 additions & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"errors"
"time"

"github.com/jaegertracing/jaeger/model"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
Expand Down Expand Up @@ -161,6 +164,36 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
return false
}
reader, err := archiveFactory.CreateArchiveSpanReader()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage reader", zap.Error(err))
return false
}
writer, err := archiveFactory.CreateArchiveSpanWriter()
if errors.Is(err, storage.ErrArchiveStorageNotConfigured) || errors.Is(err, storage.ErrArchiveStorageNotSupported) {
logger.Info("Archive storage not created", zap.String("reason", err.Error()))
return false
}
if err != nil {
logger.Error("Cannot init archive storage writer", zap.Error(err))
return false
}
opts.ArchiveSpanReader = reader
opts.ArchiveSpanWriter = writer
return true
}

// hasArchiveStorage returns true if archive storage reader/writer are initialized.
func (opts *QueryServiceOptions) hasArchiveStorage() bool {
return opts.ArchiveSpanReader != nil && opts.ArchiveSpanWriter != nil
Expand Down
81 changes: 81 additions & 0 deletions plugin/storage/grpc/shared/archive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package shared

import (
"context"
"errors"
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

var (
_ spanstore.Reader = (*archiveReader)(nil)
_ spanstore.Writer = (*archiveWriter)(nil)
)

// archiveReader wraps storage_v1.ArchiveSpanReaderPluginClient into spanstore.Reader
type archiveReader struct {
client storage_v1.ArchiveSpanReaderPluginClient
}

// ArchiveWriter wraps storage_v1.ArchiveSpanWriterPluginClient into spanstore.Writer
type archiveWriter struct {
client storage_v1.ArchiveSpanWriterPluginClient
}

// GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage
func (r *archiveReader) GetTrace(ctx context.Context, q spanstore.GetTraceParameters) (*model.Trace, error) {
stream, err := r.client.GetArchiveTrace(ctx, &storage_v1.GetTraceRequest{
TraceID: q.TraceID,
StartTime: q.StartTime,
EndTime: q.EndTime,
})
if status.Code(err) == codes.NotFound {
return nil, spanstore.ErrTraceNotFound
}
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

return readTrace(stream)
}

// GetServices not used in archiveReader
func (*archiveReader) GetServices(context.Context) ([]string, error) {
return nil, errors.New("GetServices not implemented")
}

// GetOperations not used in archiveReader
func (*archiveReader) GetOperations(context.Context, spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
return nil, errors.New("GetOperations not implemented")
}

// FindTraces not used in archiveReader
func (*archiveReader) FindTraces(context.Context, *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
return nil, errors.New("FindTraces not implemented")
}

// FindTraceIDs not used in archiveReader
func (*archiveReader) FindTraceIDs(context.Context, *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
return nil, errors.New("FindTraceIDs not implemented")
}

// WriteSpan saves the span into Archive Storage
func (w *archiveWriter) WriteSpan(ctx context.Context, span *model.Span) error {
_, err := w.client.WriteArchiveSpan(ctx, &storage_v1.WriteSpanRequest{
Span: span,
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

return nil
}
106 changes: 106 additions & 0 deletions plugin/storage/grpc/shared/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package shared

import (
"context"
"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger-idl/model/v1"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func TestArchiveWriter_WriteSpan(t *testing.T) {
mockSpan := &model.Span{
TraceID: mockTraceID,
SpanID: model.NewSpanID(1),
Process: &model.Process{},
}

archiveSpanWriter := new(mocks.ArchiveSpanWriterPluginClient)
archiveSpanWriter.On("WriteArchiveSpan", mock.Anything, &storage_v1.WriteSpanRequest{Span: mockSpan}).
Return(&storage_v1.WriteSpanResponse{}, nil)
writer := &archiveWriter{client: archiveSpanWriter}

err := writer.WriteSpan(context.Background(), mockSpan)
require.NoError(t, err)
}

func TestArchiveReader_GetTrace(t *testing.T) {
mockTraceID := model.NewTraceID(0, 123456)
mockSpan := model.Span{
TraceID: mockTraceID,
SpanID: model.NewSpanID(1),
Process: &model.Process{},
}
expected := &model.Trace{
Spans: []*model.Span{&mockSpan},
}

traceClient := new(mocks.ArchiveSpanReaderPlugin_GetArchiveTraceClient)
traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{
Spans: []model.Span{mockSpan},
}, nil).Once()
traceClient.On("Recv").Return(nil, io.EOF)

archiveSpanReader := new(mocks.ArchiveSpanReaderPluginClient)
archiveSpanReader.On("GetArchiveTrace", mock.Anything, &storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}).Return(traceClient, nil)
reader := &archiveReader{client: archiveSpanReader}

trace, err := reader.GetTrace(context.Background(), spanstore.GetTraceParameters{
TraceID: mockTraceID,
})
require.NoError(t, err)
assert.Equal(t, expected, trace)
}

func TestArchiveReaderGetTrace_NoTrace(t *testing.T) {
mockTraceID := model.NewTraceID(0, 123456)

archiveSpanReader := new(mocks.ArchiveSpanReaderPluginClient)
archiveSpanReader.On("GetArchiveTrace", mock.Anything, &storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}).Return(nil, status.Errorf(codes.NotFound, ""))
reader := &archiveReader{client: archiveSpanReader}

_, err := reader.GetTrace(context.Background(), spanstore.GetTraceParameters{
TraceID: mockTraceID,
})
assert.Equal(t, spanstore.ErrTraceNotFound, err)
}

func TestArchiveReader_FindTraceIDs(t *testing.T) {
reader := archiveReader{client: &mocks.ArchiveSpanReaderPluginClient{}}
_, err := reader.FindTraceIDs(context.Background(), nil)
require.Error(t, err)
}

func TestArchiveReader_FindTraces(t *testing.T) {
reader := archiveReader{client: &mocks.ArchiveSpanReaderPluginClient{}}
_, err := reader.FindTraces(context.Background(), nil)
require.Error(t, err)
}

func TestArchiveReader_GetOperations(t *testing.T) {
reader := archiveReader{client: &mocks.ArchiveSpanReaderPluginClient{}}
_, err := reader.GetOperations(context.Background(), spanstore.OperationQueryParameters{})
require.Error(t, err)
}

func TestArchiveReader_GetServices(t *testing.T) {
reader := archiveReader{client: &mocks.ArchiveSpanReaderPluginClient{}}
_, err := reader.GetServices(context.Background())
require.Error(t, err)
}

0 comments on commit 48ff461

Please sign in to comment.