Skip to content

Commit

Permalink
added implementation in grpc client
Browse files Browse the repository at this point in the history
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
  • Loading branch information
akstron committed Dec 5, 2024
1 parent 0d66a50 commit 50b8079
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 150 deletions.
10 changes: 8 additions & 2 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,14 @@ message InsertProbabilitiesAndQPSResponse {
}

message GetThroughputRequest {
google.protobuf.Timestamp start_time = 1;
google.protobuf.Timestamp end_time = 2;
google.protobuf.Timestamp start_time = 1[
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end_time = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
}

message GetThroughputResponse {
Expand Down
113 changes: 113 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"errors"
"fmt"
"io"
"strconv"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

samplingStoreModel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
Expand All @@ -39,6 +41,7 @@ type GRPCClient struct {
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
samplingStoreClient storage_v1.SamplingStorePluginClient
}

func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *GRPCClient {
Expand Down Expand Up @@ -266,3 +269,113 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace,

return &trace, nil
}

func (c *GRPCClient) InsertThroughput(ctx context.Context, throughputs []*samplingStoreModel.Throughput) error {
storageV1Throughput := []*storage_v1.Throughput{}
for _, throughput := range throughputs {
probsAsArray := []float64{}
for prob := range throughput.Probabilities {
probInFloat, err := strconv.ParseFloat(prob, 64)
if err != nil {
return err
}
probsAsArray = append(probsAsArray, probInFloat)
}

storageV1Throughput = append(storageV1Throughput, &storage_v1.Throughput{
Service: throughput.Service,
Operation: throughput.Operation,
Count: throughput.Count,
Probabilities: probsAsArray,
})
}

_, err := c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{
Throughput: storageV1Throughput,
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

return nil
}

func (c *GRPCClient) InsertProbabilitiesAndQPS(ctx context.Context, hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
stringFloatMapToV1StringFloatMap := func(in map[string]float64) *storage_v1.StringFloatMap {
return &storage_v1.StringFloatMap{
StringFloatMap: in,
}
}

convertToV1Map := func(in map[string]map[string]float64) map[string]*storage_v1.StringFloatMap {
res := make(map[string]*storage_v1.StringFloatMap)
for k, v := range in {
res[k] = stringFloatMapToV1StringFloatMap(v)
}
return res
}

_, err := c.samplingStoreClient.InsertProbabilitiesAndQPS(ctx, &storage_v1.InsertProbabilitiesAndQPSRequest{
Hostname: hostname,
Probabilities: &storage_v1.ServiceOperationProbabilities{
ServiceOperationProbabilities: convertToV1Map(probabilities),
},
Qps: &storage_v1.ServiceOperationQPS{
ServiceOperationQPS: convertToV1Map(qps),
},
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

return nil
}

func (c *GRPCClient) GetThroughput(ctx context.Context, start, end time.Time) ([]*samplingStoreModel.Throughput, error) {
resp, err := c.samplingStoreClient.GetThroughput(ctx, &storage_v1.GetThroughputRequest{
StartTime: start,
EndTime: end,
})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

resThroughput := []*samplingStoreModel.Throughput{}

for _, throughput := range resp.Throughput {
probsAsSet := make(map[string]struct{})
for _, prob := range throughput.Probabilities {
probsAsSet[strconv.FormatFloat(prob, 'E', -1, 64)] = struct{}{}
}

resThroughput = append(resThroughput, &samplingStoreModel.Throughput{
Service: throughput.Service,
Operation: throughput.Operation,
Count: throughput.Count,
Probabilities: probsAsSet,
})
}

return resThroughput, nil
}

func (c *GRPCClient) GetLatestProbabilities(ctx context.Context) (samplingStoreModel.ServiceOperationProbabilities, error) {
resp, err := c.samplingStoreClient.GetLatestProbabilities(ctx, &storage_v1.GetLatestProbabilitiesRequest{})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

v1StringFloatMapToStringFloatMap := func(in *storage_v1.StringFloatMap) map[string]float64 {
return in.StringFloatMap
}

convertToMap := func(in map[string]*storage_v1.StringFloatMap) map[string]map[string]float64 {
res := make(map[string]map[string]float64)
for k, v := range in {
res[k] = v1StringFloatMapToStringFloatMap(v)
}
return res
}

return convertToMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil
}
Loading

0 comments on commit 50b8079

Please sign in to comment.