From a9786a7e208a5cf4588d256fdd04872bcbbbe51c Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Mon, 21 Oct 2024 13:35:27 +0300 Subject: [PATCH] fix --- internal/balancer/balancer.go | 5 ++- internal/repeater/repeater.go | 10 ++--- log/driver.go | 1 + otel/config.go | 72 ------------------------------- otel/driver.go | 12 +++--- otel/helpers.go | 79 +++++++++++++++++++++++++++++++++++ otel/noop.go | 20 ++++----- trace/driver.go | 8 ++-- trace/driver_gtrace.go | 6 ++- 9 files changed, 113 insertions(+), 100 deletions(-) create mode 100644 otel/helpers.go diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go index 08c927540..273993ded 100644 --- a/internal/balancer/balancer.go +++ b/internal/balancer/balancer.go @@ -88,8 +88,10 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context) (err error) { onDone = trace.DriverOnBalancerClusterDiscoveryAttempt( b.driverConfig.Trace(), &ctx, stack.FunctionID( - "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt"), + "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).clusterDiscoveryAttempt", + ), address, + b.driverConfig.Database(), ) endpoints []endpoint.Endpoint localDC string @@ -130,6 +132,7 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi stack.FunctionID( "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"), b.config.DetectNearestDC, + b.driverConfig.Database(), ) previous = b.connections().All() ) diff --git a/internal/repeater/repeater.go b/internal/repeater/repeater.go index 480998606..d4d6c6bb1 100644 --- a/internal/repeater/repeater.go +++ b/internal/repeater/repeater.go @@ -139,12 +139,8 @@ func (r *repeater) Force() { } } -func (r *repeater) wakeUp(ctx context.Context, e Event) (err error) { - if err = ctx.Err(); err != nil { - return err - } - - ctx = WithEvent(ctx, e) +func (r *repeater) wakeUp(e Event) (err error) { + ctx := WithEvent(context.Background(), e) onDone := trace.DriverOnRepeaterWakeUp(r.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater.(*repeater).wakeUp"), @@ -203,7 +199,7 @@ func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) { if event == EventCancel { return } - if err := r.wakeUp(ctx, event); err != nil { + if err := r.wakeUp(event); err != nil { forceIndex++ } else { forceIndex = 0 diff --git a/log/driver.go b/log/driver.go index c6848f0a3..caa201209 100644 --- a/log/driver.go +++ b/log/driver.go @@ -464,6 +464,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver { ctx := with(*info.Context, TRACE, "ydb", "driver", "balancer", "update") l.Log(ctx, "start", kv.Bool("needLocalDC", info.NeedLocalDC), + kv.String("database", info.Database), ) start := time.Now() diff --git a/otel/config.go b/otel/config.go index d153044a9..4683ebdc1 100644 --- a/otel/config.go +++ b/otel/config.go @@ -3,9 +3,7 @@ package otel import ( "context" - "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) @@ -27,73 +25,3 @@ type ( Start(ctx context.Context, operationName string, attributes ...KeyValue) (context.Context, Span) } ) - -func childSpanWithReplaceCtx( - cfg Config, - ctx *context.Context, - operationName string, - fields ...KeyValue, -) (s Span) { - *ctx, s = childSpan(cfg, *ctx, operationName, fields...) - - return s -} - -func childSpan( - cfg Config, - ctx context.Context, //nolint:revive - operationName string, - fields ...KeyValue, -) (context.Context, Span) { - return cfg.Start(ctx, - operationName, - fields..., - ) -} - -func finish( - s Span, - err error, - fields ...KeyValue, -) { - if err != nil { - s.Msg(err.Error(), kv.Error(err)) - } - s.End(fields...) -} - -func logError( - s Span, - err error, - fields ...KeyValue, -) { - var ydbErr ydb.Error - if xerrors.As(err, &ydbErr) { - fields = append(fields, - kv.Error(err), - kv.Int("error.ydb.code", int(ydbErr.Code())), - kv.String("error.ydb.name", ydbErr.Name()), - ) - } - s.Msg(err.Error(), fields...) -} - -func logToParentSpan( - cfg Config, - ctx context.Context, //nolint:revive - msg string, - fields ...KeyValue, //nolint:unparam -) { - parent := cfg.SpanFromContext(ctx) - parent.Msg(msg, fields...) -} - -func logToParentSpanError( - cfg Config, - ctx context.Context, //nolint:revive - err error, - fields ...KeyValue, //nolint:unparam -) { - parent := cfg.SpanFromContext(ctx) - logError(parent, err, fields...) -} diff --git a/otel/driver.go b/otel/driver.go index 5a55e1c7d..27c638ece 100644 --- a/otel/driver.go +++ b/otel/driver.go @@ -265,6 +265,7 @@ func driver(cfg Config) trace.Driver { //nolint:gocyclo,funlen cfg, info.Context, info.Call.FunctionID(), + kv.String("address", info.Address), ) return func(info trace.DriverBalancerClusterDiscoveryAttemptDoneInfo) { @@ -275,9 +276,11 @@ func driver(cfg Config) trace.Driver { //nolint:gocyclo,funlen if cfg.Details()&trace.DriverBalancerEvents == 0 { return nil } - needLocalDC := info.NeedLocalDC - functionID := info.Call.FunctionID() - s := cfg.SpanFromContext(*info.Context) + start := childSpanWithReplaceCtx(cfg, info.Context, + info.Call.FunctionID(), + kv.String("database", info.Database), + kv.Bool("need_local_dc", info.NeedLocalDC), + ) return func(info trace.DriverBalancerUpdateDoneInfo) { var ( @@ -294,12 +297,11 @@ func driver(cfg Config) trace.Driver { //nolint:gocyclo,funlen for i, e := range info.Dropped { dropped[i] = e.String() } - s.Msg(functionID, + start.End( kv.String("local_dc", info.LocalDC), kv.Strings("endpoints", endpoints), kv.Strings("added", added), kv.Strings("dropped", dropped), - kv.Bool("need_local_dc", needLocalDC), ) } }, diff --git a/otel/helpers.go b/otel/helpers.go new file mode 100644 index 000000000..cb8efc78f --- /dev/null +++ b/otel/helpers.go @@ -0,0 +1,79 @@ +package otel + +import ( + "context" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/kv" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" +) + +func childSpanWithReplaceCtx( + cfg Config, + ctx *context.Context, + operationName string, + fields ...KeyValue, +) (s Span) { + *ctx, s = childSpan(cfg, *ctx, operationName, fields...) + + return s +} + +func childSpan( + cfg Config, + ctx context.Context, //nolint:revive + operationName string, + fields ...KeyValue, +) (context.Context, Span) { + return cfg.Start(ctx, + operationName, + fields..., + ) +} + +func finish( + s Span, + err error, + fields ...KeyValue, +) { + if err != nil { + s.Msg(err.Error(), kv.Error(err)) + } + s.End(fields...) +} + +func logError( + s Span, + err error, + fields ...KeyValue, +) { + var ydbErr ydb.Error + if xerrors.As(err, &ydbErr) { + fields = append(fields, + kv.Error(err), + kv.Int("error.ydb.code", int(ydbErr.Code())), + kv.String("error.ydb.name", ydbErr.Name()), + ) + } + s.Msg(err.Error(), fields...) +} + +func logToParentSpan( + cfg Config, + ctx context.Context, //nolint:revive + msg string, + fields ...KeyValue, //nolint:unparam +) { + parent := cfg.SpanFromContext(ctx) + parent.Msg(msg, fields...) +} + +func logToParentSpanError( + cfg Config, + ctx context.Context, //nolint:revive + err error, + fields ...KeyValue, //nolint:unparam +) { + parent := cfg.SpanFromContext(ctx) + logError(parent, err, fields...) +} diff --git a/otel/noop.go b/otel/noop.go index cc8226019..f2ff1ade4 100644 --- a/otel/noop.go +++ b/otel/noop.go @@ -16,16 +16,6 @@ type ( noopSpan struct{} ) -func (noopSpan) Relation(Span) {} - -func (noopSpan) TraceID() string { - return "" -} - -func (n noopSpan) Msg(string, ...KeyValue) {} - -func (n noopSpan) End(...KeyValue) {} - func (noopConfig) Details() trace.Details { return 0 } @@ -37,3 +27,13 @@ func (noopConfig) SpanFromContext(context.Context) Span { func (noopConfig) Start(ctx context.Context, _ string, _ ...KeyValue) (context.Context, Span) { return ctx, noopSpan{} } + +func (noopSpan) Relation(Span) {} + +func (noopSpan) TraceID() string { + return "" +} + +func (n noopSpan) Msg(string, ...KeyValue) {} + +func (n noopSpan) End(...KeyValue) {} diff --git a/trace/driver.go b/trace/driver.go index 31a55d088..f7644b1ac 100644 --- a/trace/driver.go +++ b/trace/driver.go @@ -189,6 +189,7 @@ type ( Context *context.Context Call call NeedLocalDC bool + Database string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals DriverBalancerUpdateDoneInfo struct { @@ -203,9 +204,10 @@ type ( // Pointer to context provide replacement of context in trace callback function. // Warning: concurrent access to pointer on client side must be excluded. // Safe replacement of context are provided only inside callback function - Context *context.Context - Call call - Address string + Context *context.Context + Call call + Address string + Database string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals DriverBalancerClusterDiscoveryAttemptDoneInfo struct { diff --git a/trace/driver_gtrace.go b/trace/driver_gtrace.go index 32c008d09..4abf7ef74 100644 --- a/trace/driver_gtrace.go +++ b/trace/driver_gtrace.go @@ -1551,11 +1551,12 @@ func DriverOnBalancerChooseEndpoint(t *Driver, c *context.Context, call call) fu } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call call, address string) func(error) { +func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call call, address string, database string) func(error) { var p DriverBalancerClusterDiscoveryAttemptStartInfo p.Context = c p.Call = call p.Address = address + p.Database = database res := t.onBalancerClusterDiscoveryAttempt(p) return func(e error) { var p DriverBalancerClusterDiscoveryAttemptDoneInfo @@ -1564,11 +1565,12 @@ func DriverOnBalancerClusterDiscoveryAttempt(t *Driver, c *context.Context, call } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool) func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string) { +func DriverOnBalancerUpdate(t *Driver, c *context.Context, call call, needLocalDC bool, database string) func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string) { var p DriverBalancerUpdateStartInfo p.Context = c p.Call = call p.NeedLocalDC = needLocalDC + p.Database = database res := t.onBalancerUpdate(p) return func(endpoints []EndpointInfo, added []EndpointInfo, dropped []EndpointInfo, localDC string) { var p DriverBalancerUpdateDoneInfo