Skip to content

Commit

Permalink
fix: refactor router trace instrumentation (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
StarpTech authored Feb 5, 2024
1 parent 7bc4d89 commit 889d06c
Show file tree
Hide file tree
Showing 20 changed files with 816 additions and 464 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ CREATE TABLE IF NOT EXISTS traces (
StatusMessage String CODEC (ZSTD(3)),
OperationHash String CODEC (ZSTD(3)),
OperationContent String CODEC (ZSTD(3)),
OperationVariables String CODEC (ZSTD(3)),
OperationPersistedID String CODEC (ZSTD(3)),
HttpStatusCode String CODEC (ZSTD(3)),
HttpHost String CODEC (ZSTD(3)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ export class TraceRepository {
SpanAttributes['http.target'] as attrHttpTarget,
SpanAttributes['wg.subgraph.name'] as attrSubgraphName,
SpanAttributes['wg.engine.plan_cache_hit'] as attrEnginePlanCacheHit,
SpanAttributes['wg.engine.request_tracing_enabled'] as attrEngineRequestTracingEnabled
SpanAttributes['wg.engine.request_tracing_enabled'] as attrEngineRequestTracingEnabled,
SpanAttributes['wg.operation.variables'] as attrOperationVariables
FROM ${this.client.database}.otel_traces
WHERE (TraceId = trace_id) AND (Timestamp >= start) AND (Timestamp <= end) AND SpanAttributes['wg.organization.id'] = '${organizationID}'
ORDER BY Timestamp ASC
Expand Down Expand Up @@ -81,6 +82,7 @@ export class TraceRepository {
subgraphName: result.attrSubgraphName,
enginePlanCacheHit: result.attrEnginePlanCacheHit,
engineRequestTracingEnabled: result.attrEngineRequestTracingEnabled,
operationVariables: result.attrOperationVariables,
},
}));
}
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ services:
image: redis:${DC_REDIS_VERSION:-7.2.4}-alpine
ports:
- '6380:6379'
command: redis-server --slaveof redis 6379 /usr/local/etc/redis/redis.conf
command: redis-server /usr/local/etc/redis/redis.conf
depends_on:
- redis
volumes:
- ./docker/redis/redis.conf:/usr/local/etc/redis/redis.conf
- ./docker/redis/redis-slave.conf:/usr/local/etc/redis/redis.conf
- redis-slave:/data
networks:
- primary
Expand Down
2 changes: 2 additions & 0 deletions docker/redis/redis-slave.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
slaveof redis 6379
masterauth test
11 changes: 7 additions & 4 deletions router/cmd/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,13 @@ func traceConfig(cfg *config.Telemetry) *trace.Config {
}

return &trace.Config{
Enabled: cfg.Tracing.Enabled,
Name: cfg.ServiceName,
Version: core.Version,
Sampler: cfg.Tracing.SamplingRate,
Enabled: cfg.Tracing.Enabled,
Name: cfg.ServiceName,
Version: core.Version,
Sampler: cfg.Tracing.SamplingRate,
ExportGraphQLVariables: trace.ExportGraphQLVariables{
Enabled: cfg.Tracing.ExportGraphQLVariables,
},
Exporters: exporters,
Propagators: propagators,
}
Expand Down
4 changes: 2 additions & 2 deletions router/core/graphql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
requestLogger := h.log.With(logging.WithRequestID(middleware.GetReqID(r.Context())))
operationCtx := getOperationContext(r.Context())

executionContext, graphqlExecutionSpan := h.tracer.Start(r.Context(), "Operation - Execution",
trace.WithSpanKind(trace.SpanKindServer),
executionContext, graphqlExecutionSpan := h.tracer.Start(r.Context(), "Operation - Execute",
trace.WithSpanKind(trace.SpanKindInternal),
)
defer graphqlExecutionSpan.End()

Expand Down
206 changes: 154 additions & 52 deletions router/core/graphql_prehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"crypto/ecdsa"
"errors"
"fmt"
"github.com/golang-jwt/jwt/v5"
"github.com/wundergraph/cosmo/router/pkg/logging"
"github.com/wundergraph/cosmo/router/pkg/otel"
rtrace "github.com/wundergraph/cosmo/router/pkg/trace"
"go.opentelemetry.io/otel/attribute"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"net/http"
"strconv"
"sync"
"time"

"github.com/go-chi/chi/middleware"
"github.com/golang-jwt/jwt/v5"
"go.uber.org/zap"

"github.com/wundergraph/cosmo/router/internal/cdn"
Expand All @@ -29,21 +31,22 @@ type PreHandlerOptions struct {
Logger *zap.Logger
Executor *Executor
Metrics RouterMetrics
Parser *OperationParser
OperationProcessor *OperationProcessor
Planner *OperationPlanner
AccessController *AccessController
DevelopmentMode bool
RouterPublicKey *ecdsa.PublicKey
EnableRequestTracing bool
TracerProvider *sdktrace.TracerProvider
FlushTelemetryAfterResponse bool
TraceExportVariables bool
}

type PreHandler struct {
log *zap.Logger
executor *Executor
metrics RouterMetrics
parser *OperationParser
operationProcessor *OperationProcessor
planner *OperationPlanner
accessController *AccessController
developmentMode bool
Expand All @@ -52,21 +55,24 @@ type PreHandler struct {
tracerProvider *sdktrace.TracerProvider
flushTelemetryAfterResponse bool
tracer trace.Tracer
traceExportVariables bool
}

func NewPreHandler(opts *PreHandlerOptions) *PreHandler {
return &PreHandler{
log: opts.Logger,
executor: opts.Executor,
metrics: opts.Metrics,
parser: opts.Parser,
operationProcessor: opts.OperationProcessor,
planner: opts.Planner,
accessController: opts.AccessController,
routerPublicKey: opts.RouterPublicKey,
developmentMode: opts.DevelopmentMode,
enableRequestTracing: opts.EnableRequestTracing,
flushTelemetryAfterResponse: opts.FlushTelemetryAfterResponse,
tracerProvider: opts.TracerProvider,
traceExportVariables: opts.TraceExportVariables,

tracer: opts.TracerProvider.Tracer(
"wundergraph/cosmo/router/pre_handler",
trace.WithInstrumentationVersion("0.0.1"),
Expand Down Expand Up @@ -98,9 +104,20 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
tracePlanStart int64
)

routerSpan := trace.SpanFromContext(r.Context())

clientInfo := NewClientInfoFromRequest(r)
baseAttributeValues := []attribute.KeyValue{
otel.WgClientName.String(clientInfo.Name),
otel.WgClientVersion.String(clientInfo.Version),
otel.WgOperationProtocol.String(OperationProtocolHTTP.String()),
}

metrics := h.metrics.StartOperation(clientInfo, requestLogger, r.ContentLength)

routerSpan.SetAttributes(baseAttributeValues...)
metrics.AddAttributes(baseAttributeValues...)

if h.flushTelemetryAfterResponse {
defer h.flushMetrics(r.Context(), requestLogger)
}
Expand All @@ -114,14 +131,116 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
buf := pool.GetBytesBuffer()
defer pool.PutBytesBuffer(buf)

body, err := h.parser.ReadBody(r.Context(), buf, r.Body)
body, err := h.operationProcessor.ReadBody(buf, r.Body)
if err != nil {
finalErr = err
requestLogger.Error(err.Error())
writeRequestErrors(r.Context(), http.StatusBadRequest, graphql.RequestErrorsFromError(err), w, requestLogger)
return
}

/**
* Parse the operation
*/

engineParseCtx, engineParseSpan := h.tracer.Start(r.Context(), "Operation - Parse",
trace.WithSpanKind(trace.SpanKindInternal),
)

operationKit, err := h.operationProcessor.NewKit(body)
defer operationKit.Free()

if err != nil {
finalErr = err

rtrace.AttachErrToSpan(engineParseSpan, err)
engineParseSpan.End()

h.writeOperationError(engineParseCtx, w, requestLogger, err)
return
}

err = operationKit.Parse(r.Context(), clientInfo, requestLogger)
if err != nil {
finalErr = err

rtrace.AttachErrToSpan(engineParseSpan, err)
engineParseSpan.End()

h.writeOperationError(engineParseCtx, w, requestLogger, err)
return
}

engineParseSpan.End()

// Set the router span name after we have the operation name
routerSpan.SetName(GetSpanName(operationKit.parsedOperation.Name, operationKit.parsedOperation.Type))

baseAttributeValues = []attribute.KeyValue{
otel.WgOperationName.String(operationKit.parsedOperation.Name),
otel.WgOperationType.String(operationKit.parsedOperation.Type),
}
if operationKit.parsedOperation.PersistedID != "" {
baseAttributeValues = append(baseAttributeValues, otel.WgOperationPersistedID.String(operationKit.parsedOperation.PersistedID))
}

routerSpan.SetAttributes(baseAttributeValues...)
metrics.AddAttributes(baseAttributeValues...)

/**
* Normalize the operation
*/

engineNormalizeCtx, engineNormalizeSpan := h.tracer.Start(r.Context(), "Operation - Normalize",
trace.WithSpanKind(trace.SpanKindInternal),
)

err = operationKit.Normalize()
if err != nil {
finalErr = err

rtrace.AttachErrToSpan(engineNormalizeSpan, err)
engineNormalizeSpan.End()

h.writeOperationError(engineNormalizeCtx, w, requestLogger, err)
return
}

engineNormalizeSpan.End()

if h.traceExportVariables {
// At this stage the variables are normalized
routerSpan.SetAttributes(otel.WgOperationVariables.String(string(operationKit.parsedOperation.Variables)))
}

baseAttributeValues = []attribute.KeyValue{
otel.WgOperationHash.String(strconv.FormatUint(operationKit.parsedOperation.ID, 10)),
}

// Set the normalized operation as soon as we have it
routerSpan.SetAttributes(otel.WgOperationContent.String(operationKit.parsedOperation.NormalizedRepresentation))
routerSpan.SetAttributes(baseAttributeValues...)

metrics.AddAttributes(baseAttributeValues...)

/**
* Validate the operation
*/

engineValidateCtx, engineValidateSpan := h.tracer.Start(r.Context(), "Operation - Validate",
trace.WithSpanKind(trace.SpanKindInternal),
)
err = operationKit.Validate()
if err != nil {
finalErr = err

rtrace.AttachErrToSpan(engineValidateSpan, err)
engineValidateSpan.End()

h.writeOperationError(engineValidateCtx, w, requestLogger, err)
return
}

if h.enableRequestTracing {
if clientInfo.WGRequestToken != "" && h.routerPublicKey != nil {
_, err = jwt.Parse(clientInfo.WGRequestToken, func(token *jwt.Token) (interface{}, error) {
Expand Down Expand Up @@ -150,6 +269,34 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
r = r.WithContext(resolve.SetTraceStart(r.Context(), traceOptions.EnablePredictableDebugTimings))
}

engineValidateSpan.End()

/**
* Plan the operation
*/

enginePlanSpanCtx, enginePlanSpan := h.tracer.Start(r.Context(), "Operation - Plan",
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(otel.WgEngineRequestTracingEnabled.Bool(traceOptions.Enable)),
)

opContext, err := h.planner.Plan(operationKit.parsedOperation, clientInfo, OperationProtocolHTTP, traceOptions)

if err != nil {
finalErr = err

rtrace.AttachErrToSpan(enginePlanSpan, err)
enginePlanSpan.End()

requestLogger.Error("failed to plan operation", zap.Error(err))
h.writeOperationError(enginePlanSpanCtx, w, requestLogger, err)
return
}

enginePlanSpan.SetAttributes(otel.WgEnginePlanCacheHit.Bool(opContext.planCacheHit))

enginePlanSpan.End()

// If we have authenticators, we try to authenticate the request
if len(h.accessController.authenticators) > 0 {
authenticateSpanCtx, authenticateSpan := h.tracer.Start(r.Context(), "Authenticate",
Expand All @@ -173,58 +320,13 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
r = validatedReq
}

engineParseValidateCtx, engineParseValidateSpan := h.tracer.Start(r.Context(), "Operation - Parse and Validate",
trace.WithSpanKind(trace.SpanKindServer),
)

operation, err := h.parser.Parse(r.Context(), clientInfo, body, requestLogger)
if err != nil {
finalErr = err

rtrace.AttachErrToSpan(engineParseValidateSpan, err)
engineParseValidateSpan.End()

h.writeOperationError(engineParseValidateCtx, w, requestLogger, err)
return
}

engineParseValidateSpan.End()

// If the request has a query parameter wg_trace=true we skip the cache
// and always plan the operation
// this allows us to "write" to the plan
if !traceOptions.ExcludePlannerStats {
tracePlanStart = resolve.GetDurationNanoSinceTraceStart(r.Context())
}

enginePlanSpanCtx, enginePlanSpan := h.tracer.Start(r.Context(), "Operation - Planning",
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(otel.WgEngineRequestTracingEnabled.Bool(traceOptions.Enable)),
)

opContext, err := h.planner.Plan(operation, clientInfo, OperationProtocolHTTP, traceOptions)

commonAttributeValues := commonMetricAttributes(opContext)
metrics.AddAttributes(commonAttributeValues...)

// The attributes have to be added on the root router span
initializeSpan(r.Context(), operation, commonAttributeValues)

if err != nil {
finalErr = err

rtrace.AttachErrToSpan(enginePlanSpan, err)
enginePlanSpan.End()

requestLogger.Error("failed to plan operation", zap.Error(err))
h.writeOperationError(enginePlanSpanCtx, w, requestLogger, err)
return
}

enginePlanSpan.SetAttributes(otel.WgEnginePlanCacheHit.Bool(opContext.planCacheHit))

enginePlanSpan.End()

if !traceOptions.ExcludePlannerStats {
planningTime := resolve.GetDurationNanoSinceTraceStart(r.Context()) - tracePlanStart
resolve.SetPlannerStats(r.Context(), resolve.PlannerStats{
Expand Down Expand Up @@ -255,9 +357,9 @@ func (h *PreHandler) Handler(next http.Handler) http.Handler {
finalErr = requestContext.error

// Mark the root span of the router as failed, so we can easily identify failed requests
span := trace.SpanFromContext(newReq.Context())
routerSpan = trace.SpanFromContext(newReq.Context())
if finalErr != nil {
rtrace.AttachErrToSpan(span, finalErr)
rtrace.AttachErrToSpan(routerSpan, finalErr)
}
})
}
Expand Down
Loading

0 comments on commit 889d06c

Please sign in to comment.