Skip to content

Commit

Permalink
otelgrpc: Fix NewClientHandler to emit proper request/response metrics (
Browse files Browse the repository at this point in the history
#6250)

Tested by removing `metricdatatest.IgnoreValue()` from stas_handler
tests, and see that only the duration metrics differ.

---------

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Oct 16, 2024
1 parent 8a26744 commit d9c9bbe
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otellogrus`. (#6237)
- Transform nil attribute values to `log.Value` zero value instead of panicking in `go.opentelemetry.io/contrib/bridges/otelzap`. (#6237)
- Transform nil attribute values to `log.Value` zero value instead of `log.StringValue("<nil>")` in `go.opentelemetry.io/contrib/bridges/otelslog`. (#6246)
- Fix `NewClientHandler` so that `rpc.client.request.*` metrics measure requests instead of responses and `rpc.client.responses.*` metrics measure responses instead of requests in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#6250)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
52 changes: 35 additions & 17 deletions instrumentation/google.golang.org/grpc/otelgrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type config struct {
tracer trace.Tracer
meter metric.Meter

rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
rpcDuration metric.Float64Histogram
rpcInBytes metric.Int64Histogram
rpcOutBytes metric.Int64Histogram
rpcInMessages metric.Int64Histogram
rpcOutMessages metric.Int64Histogram
}

// Option applies an option value for a config.
Expand Down Expand Up @@ -96,46 +96,64 @@ func newConfig(opts []Option, role string) *config {
}
}

c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
rpcRequestSize, err := c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcRequestSize == nil {
c.rpcRequestSize = noop.Int64Histogram{}
if rpcRequestSize == nil {
rpcRequestSize = noop.Int64Histogram{}
}
}

c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
rpcResponseSize, err := c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
if c.rpcResponseSize == nil {
c.rpcResponseSize = noop.Int64Histogram{}
if rpcResponseSize == nil {
rpcResponseSize = noop.Int64Histogram{}
}
}

c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
rpcRequestsPerRPC, err := c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcRequestsPerRPC == nil {
c.rpcRequestsPerRPC = noop.Int64Histogram{}
if rpcRequestsPerRPC == nil {
rpcRequestsPerRPC = noop.Int64Histogram{}
}
}

c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
rpcResponsesPerRPC, err := c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
if c.rpcResponsesPerRPC == nil {
c.rpcResponsesPerRPC = noop.Int64Histogram{}
if rpcResponsesPerRPC == nil {
rpcResponsesPerRPC = noop.Int64Histogram{}
}
}

switch role {
case "client":
c.rpcInBytes = rpcResponseSize
c.rpcInMessages = rpcResponsesPerRPC
c.rpcOutBytes = rpcRequestSize
c.rpcOutMessages = rpcRequestsPerRPC
case "server":
c.rpcInBytes = rpcRequestSize
c.rpcInMessages = rpcRequestsPerRPC
c.rpcOutBytes = rpcResponseSize
c.rpcOutMessages = rpcResponsesPerRPC
default:
c.rpcInBytes = noop.Int64Histogram{}
c.rpcInMessages = noop.Int64Histogram{}
c.rpcOutBytes = noop.Int64Histogram{}
c.rpcOutMessages = noop.Int64Histogram{}
}

return c
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ func TestNilInstruments(t *testing.T) {

ctx := context.Background()
assert.NotPanics(t, func() { c.rpcDuration.Record(ctx, 0) }, "rpcDuration")
assert.NotPanics(t, func() { c.rpcRequestSize.Record(ctx, 0) }, "rpcRequestSize")
assert.NotPanics(t, func() { c.rpcResponseSize.Record(ctx, 0) }, "rpcResponseSize")
assert.NotPanics(t, func() { c.rpcRequestsPerRPC.Record(ctx, 0) }, "rpcRequestsPerRPC")
assert.NotPanics(t, func() { c.rpcResponsesPerRPC.Record(ctx, 0) }, "rpcResponsesPerRPC")
assert.NotPanics(t, func() { c.rpcInBytes.Record(ctx, 0) }, "rpcInBytes")
assert.NotPanics(t, func() { c.rpcOutBytes.Record(ctx, 0) }, "rpcOutBytes")
assert.NotPanics(t, func() { c.rpcInMessages.Record(ctx, 0) }, "rpcInMessages")
assert.NotPanics(t, func() { c.rpcOutMessages.Record(ctx, 0) }, "rpcOutMessages")
}

type meterProvider struct {
Expand Down
23 changes: 12 additions & 11 deletions instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@ import (
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
)

type gRPCContextKey struct{}

type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
record bool
inMessages int64
outMessages int64
metricAttrs []attribute.KeyValue
record bool
}

type serverHandler struct {
Expand Down Expand Up @@ -150,8 +151,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.inMessages, 1)
c.rpcInBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.ReceivedEvent {
Expand All @@ -166,8 +167,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
messageId = atomic.AddInt64(&gctx.outMessages, 1)
c.rpcOutBytes.Record(ctx, int64(rs.Length), metric.WithAttributeSet(attribute.NewSet(metricAttrs...)))
}

if c.SentEvent {
Expand Down Expand Up @@ -213,8 +214,8 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool

c.rpcDuration.Record(ctx, elapsedTime, recordOpts...)
if gctx != nil {
c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), recordOpts...)
c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), recordOpts...)
c.rpcInMessages.Record(ctx, atomic.LoadInt64(&gctx.inMessages), recordOpts...)
c.rpcOutMessages.Record(ctx, atomic.LoadInt64(&gctx.outMessages), recordOpts...)
}
default:
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/status"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"

"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

testpb "google.golang.org/grpc/interop/grpc_testing"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal/test"
)

var (
Expand Down Expand Up @@ -803,10 +802,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Count: 1,
Sum: 314167,
Sum: 271840,
},
{
Attributes: attribute.NewSet(
Expand All @@ -815,11 +814,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
},
{
Attributes: attribute.NewSet(
Expand All @@ -828,11 +827,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
},
{
Attributes: attribute.NewSet(
Expand All @@ -841,11 +840,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
Count: 4,
Sum: 93082,
Sum: 74969,
},
},
},
Expand Down Expand Up @@ -878,10 +877,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
Max: metricdata.NewExtrema(int64(271840)),
Min: metricdata.NewExtrema(int64(271840)),
Max: metricdata.NewExtrema(int64(314167)),
Min: metricdata.NewExtrema(int64(314167)),
Count: 1,
Sum: 271840,
Sum: 314167,
},
{
Attributes: attribute.NewSet(
Expand All @@ -890,11 +889,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45912)),
Min: metricdata.NewExtrema(int64(12)),
Count: 4,
Sum: 74948,
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -903,11 +902,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(21)),
Min: metricdata.NewExtrema(int64(21)),
Count: 1,
Sum: 21,
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 93082,
},
{
Attributes: attribute.NewSet(
Expand All @@ -916,11 +915,11 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
semconv.RPCSystemGRPC,
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2},
Max: metricdata.NewExtrema(int64(45918)),
Min: metricdata.NewExtrema(int64(16)),
BucketCounts: []uint64{0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2},
Max: metricdata.NewExtrema(int64(58987)),
Min: metricdata.NewExtrema(int64(13)),
Count: 4,
Sum: 74969,
Sum: 93082,
},
},
},
Expand Down Expand Up @@ -969,10 +968,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand All @@ -983,10 +982,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand Down Expand Up @@ -1049,10 +1048,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Count: 1,
Sum: 4,
Sum: 1,
},
{
Attributes: attribute.NewSet(
Expand All @@ -1063,10 +1062,10 @@ func checkClientMetrics(t *testing.T, reader metric.Reader) {
testMetricAttr),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Max: metricdata.NewExtrema(int64(1)),
Min: metricdata.NewExtrema(int64(1)),
Max: metricdata.NewExtrema(int64(4)),
Min: metricdata.NewExtrema(int64(4)),
Count: 1,
Sum: 1,
Sum: 4,
},
{
Attributes: attribute.NewSet(
Expand Down

0 comments on commit d9c9bbe

Please sign in to comment.