Skip to content

Commit

Permalink
chore(deps): add OTEL tracing and span logging for GRPC server (#361)
Browse files Browse the repository at this point in the history
### TL;DR

Removed `trace.WithNewRoot()` from OpenTelemetry span creation and added tracing to GRPC requests.

### What changed?

- Removed the `trace.WithNewRoot()` option from all `otel.Tracer(TracerName).Start()` calls across multiple files.
- Added OpenTelemetry tracing to GRPC requests in the `logGRPC` function.
- Updated the context in `logGRPC` so that the logger it carries includes the trace and span IDs.

### How to test?

1. Run the CSI driver with OpenTelemetry tracing enabled.
2. Perform various CSI operations (e.g., CreateVolume, DeleteVolume, NodePublishVolume).
3. Verify that the traces are correctly propagated and that GRPC requests are now included in the traces.
4. Ensure that removing `trace.WithNewRoot()` doesn't negatively impact the existing tracing functionality.

### Why make this change?

This change improves the tracing capabilities of the CSI driver:

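1. Removing `trace.WithNewRoot()` lets each operation's span attach to the span already present in the incoming context (for example, the new GRPC request span) instead of always starting a separate trace. This allows for better trace continuity across operations, potentially providing more context in distributed tracing scenarios.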
2. Adding tracing to GRPC requests provides more granular insights into the communication between components, helping with debugging and performance analysis.

These improvements will enhance observability and make it easier to diagnose issues in production environments.
  • Loading branch information
sergeyberezansky authored Oct 10, 2024
2 parents aa14ee0 + 864d588 commit 5e45be0
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
15 changes: 7 additions & 8 deletions pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -209,7 +208,7 @@ func (cs *ControllerServer) initializeSemaphore(ctx context.Context, op string)

func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
op := "CreateVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -329,7 +328,7 @@ func DeleteVolumeError(ctx context.Context, errorCode codes.Code, errorMessage s

func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
op := "DeleteVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -399,7 +398,7 @@ func ExpandVolumeError(ctx context.Context, errorCode codes.Code, errorMessage s

func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
op := "ExpandVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
volumeID := req.GetVolumeId()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).
Expand Down Expand Up @@ -491,7 +490,7 @@ func CreateSnapshotError(ctx context.Context, errorCode codes.Code, errorMessage

func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
op := "CreateSnapshot"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -566,7 +565,7 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
op := "DeleteSnapshot"
snapshotID := req.GetSnapshotId()
secrets := req.GetSecrets()
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -624,7 +623,7 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
op := "ControllerGetCapabilities"
result := "SUCCESS"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand All @@ -651,7 +650,7 @@ func ValidateVolumeCapsError(ctx context.Context, errorCode codes.Code, errorMes
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
op := "ValidateVolumeCapabilities"
volumeID := req.GetVolumeId()
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down
5 changes: 2 additions & 3 deletions pkg/wekafs/identityserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -49,7 +48,7 @@ func NewIdentityServer(name, version string, config *DriverConfig) *identityServ
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
op := "GetPluginInfo"
result := "SUCCESS"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -103,7 +102,7 @@ func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*c
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
op := "GetPluginCapabilities"
result := "SUCCESS"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down
7 changes: 3 additions & 4 deletions pkg/wekafs/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -252,7 +251,7 @@ func NodePublishVolumeError(ctx context.Context, errorCode codes.Code, errorMess
func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
op := "NodePublishVolume"
volumeID := req.GetVolumeId()
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -412,7 +411,7 @@ func NodeUnpublishVolumeError(ctx context.Context, errorCode codes.Code, errorMe
func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
op := "NodeUnpublishVolume"
result := "FAILURE"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down Expand Up @@ -504,7 +503,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
func (ns *NodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
op := "NodeGetInfo"
result := "SUCCESS"
ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot())
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

Expand Down
5 changes: 4 additions & 1 deletion pkg/wekafs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"net"
"os"
Expand Down Expand Up @@ -131,7 +132,9 @@ func parseEndpoint(ep string) (string, string, error) {
}

func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx = log.With().Logger().WithContext(ctx)
ctx, span := otel.Tracer(TracerName).Start(ctx, "GrpcRequest")
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Logger().WithContext(ctx)
if info.FullMethod != "/csi.v1.Identity/Probe" {
// suppress annoying probe messages
log.Ctx(ctx).Trace().Str("method", info.FullMethod).Str("request", protosanitizer.StripSecrets(req).String()).Msg("GRPC request")
Expand Down

0 comments on commit 5e45be0

Please sign in to comment.