Skip to content

Commit

Permalink
Merge pull request #365 from cschleiden/improve-tracing
Browse files Browse the repository at this point in the history
Improve tracing
  • Loading branch information
cschleiden authored Oct 27, 2024
2 parents 8dee177 + 8bfa7a6 commit fcc98ff
Show file tree
Hide file tree
Showing 61 changed files with 1,123 additions and 490 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: 1.21
go-version: 1.22
check-latest: true
cache: true

Expand Down
6 changes: 6 additions & 0 deletions backend/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const (

// Recorded result of a side-efect
EventType_SideEffectResult

// Distributed tracing span has been started
EventType_TraceStarted
)

func (et EventType) String() string {
Expand Down Expand Up @@ -102,6 +105,9 @@ func (et EventType) String() string {
case EventType_SideEffectResult:
return "SideEffectResult"

case EventType_TraceStarted:
return "TraceStarted"

default:
return "Unknown"
}
Expand Down
3 changes: 3 additions & 0 deletions backend/history/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func DeserializeAttributes(eventType EventType, attributes []byte) (attr interfa
case EventType_SideEffectResult:
attr = &SideEffectResultAttributes{}

case EventType_TraceStarted:
attr = &TraceStartedAttributes{}

case EventType_TimerScheduled:
attr = &TimerScheduledAttributes{}
case EventType_TimerFired:
Expand Down
7 changes: 7 additions & 0 deletions backend/history/span_started.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package history

import "github.com/cschleiden/go-workflows/backend/payload"

type TraceStartedAttributes struct {
SpanID payload.Payload `json:"spanID"`
}
2 changes: 2 additions & 0 deletions backend/history/subworkflow_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ type SubWorkflowScheduledAttributes struct {
Inputs []payload.Payload `json:"inputs,omitempty"`

Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`

WorkflowSpanID [8]byte `json:"workflow_span_id,omitempty"`
}
11 changes: 9 additions & 2 deletions backend/history/timer_fired.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package history

import "time"
import (
"time"

"github.com/cschleiden/go-workflows/internal/tracing"
)

type TimerFiredAttributes struct {
At time.Time `json:"at,omitempty"`
ScheduledAt time.Time `json:"scheduled_at,omitempty"`
At time.Time `json:"at,omitempty"`
Name string `json:"name,omitempty"`
TraceContext tracing.Context `json:"span_metadata,omitempty"`
}
3 changes: 2 additions & 1 deletion backend/history/timer_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ package history
import "time"

type TimerScheduledAttributes struct {
At time.Time `json:"at,omitempty"`
At time.Time `json:"at,omitempty"`
Name string `json:"name,omitempty"`
}
2 changes: 2 additions & 0 deletions backend/history/workflow_started.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ type ExecutionStartedAttributes struct {
Metadata *metadata.WorkflowMetadata `json:"metadata,omitempty"`

Inputs []payload.Payload `json:"inputs,omitempty"`

WorkflowSpanID [8]byte `json:"workflowSpanID,omitempty"`
}
7 changes: 4 additions & 3 deletions backend/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"github.com/cschleiden/go-workflows/backend/converter"
"github.com/cschleiden/go-workflows/backend/metrics"
mi "github.com/cschleiden/go-workflows/internal/metrics"
"github.com/cschleiden/go-workflows/internal/tracing"
"github.com/cschleiden/go-workflows/internal/propagators"
"github.com/cschleiden/go-workflows/workflow"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

type Options struct {
Expand Down Expand Up @@ -51,10 +52,10 @@ var DefaultOptions Options = Options{

Logger: slog.Default(),
Metrics: mi.NewNoopMetricsClient(),
TracerProvider: trace.NewNoopTracerProvider(),
TracerProvider: noop.NewTracerProvider(),
Converter: converter.DefaultConverter,

ContextPropagators: []workflow.ContextPropagator{&tracing.TracingContextPropagator{}},
ContextPropagators: []workflow.ContextPropagator{&propagators.TracingContextPropagator{}},

RemoveContinuedAsNewInstances: false,
}
Expand Down
4 changes: 2 additions & 2 deletions backend/redis/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (rb *redisBackend) CancelWorkflowInstance(ctx context.Context, instance *co
}

// Cancel instance
if cmds, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
if _, err := rb.rdb.Pipelined(ctx, func(p redis.Pipeliner) error {
return rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instance, event)
}); err != nil {
fmt.Println(cmds)
// fmt.Println(cmds)
return fmt.Errorf("adding cancellation event to workflow instance: %w", err)
}

Expand Down
16 changes: 0 additions & 16 deletions backend/redis/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ import (

"github.com/cschleiden/go-workflows/backend"
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/internal/log"
"github.com/cschleiden/go-workflows/internal/tracing"
"github.com/cschleiden/go-workflows/workflow"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, event *history.Event) error {
Expand All @@ -30,18 +26,6 @@ func (rb *redisBackend) SignalWorkflow(ctx context.Context, instanceID string, e
return err
}

ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, instanceState.Metadata)
if err != nil {
rb.Options().Logger.Error("extracting tracing context", log.ErrorKey, err)
}

a := event.Attributes.(*history.SignalReceivedAttributes)
ctx, span := rb.Tracer().Start(ctx, fmt.Sprintf("SignalWorkflow: %s", a.Name), trace.WithAttributes(
attribute.String(log.InstanceIDKey, instanceID),
attribute.String(log.SignalNameKey, event.Attributes.(*history.SignalReceivedAttributes).Name),
))
defer span.End()

if _, err = rb.rdb.TxPipelined(ctx, func(p redis.Pipeliner) error {
if err := rb.addWorkflowInstanceEventP(ctx, p, workflow.Queue(instanceState.Queue), instanceState.Instance, event); err != nil {
return fmt.Errorf("adding event to stream: %w", err)
Expand Down
12 changes: 2 additions & 10 deletions backend/redis/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ import (
"github.com/cschleiden/go-workflows/backend/history"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/internal/log"
"github.com/cschleiden/go-workflows/internal/tracing"
"github.com/cschleiden/go-workflows/internal/propagators"
"github.com/cschleiden/go-workflows/internal/workflowerrors"
"github.com/cschleiden/go-workflows/workflow"
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func (rb *redisBackend) PrepareWorkflowQueues(ctx context.Context, queues []workflow.Queue) error {
Expand Down Expand Up @@ -301,17 +299,11 @@ func (rb *redisBackend) CompleteWorkflowTask(

if state == core.WorkflowInstanceStateFinished || state == core.WorkflowInstanceStateContinuedAsNew {
// Trace workflow completion
ctx, err = (&tracing.TracingContextPropagator{}).Extract(ctx, task.Metadata)
ctx, err = (&propagators.TracingContextPropagator{}).Extract(ctx, task.Metadata)
if err != nil {
rb.options.Logger.Error("extracting tracing context", log.ErrorKey, err)
}

_, span := rb.Tracer().Start(ctx, "WorkflowComplete",
trace.WithAttributes(
attribute.String(log.NamespaceKey+log.InstanceIDKey, task.WorkflowInstance.InstanceID),
))
span.End()

// Auto expiration
expiration := rb.options.AutoExpiration
if state == core.WorkflowInstanceStateContinuedAsNew && rb.options.AutoExpirationContinueAsNew > 0 {
Expand Down
1 change: 1 addition & 0 deletions backend/test/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
tests = append(tests, e2eQueueTests...)
tests = append(tests, e2eRemovalTests...)
tests = append(tests, e2eContinueAsNewTests...)
tests = append(tests, e2eTracingTests...)

run := func(suffix string, workerOptions worker.Options) {
for _, tt := range tests {
Expand Down
4 changes: 1 addition & 3 deletions backend/test/e2e_removal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ var e2eRemovalTests = []backendTest{
_, err := client.GetWorkflowResult[bool](ctx, c, workflowA, time.Second*10)
require.NoError(t, err)

now := time.Now()

for i := 0; i < 10; i++ {
time.Sleep(300 * time.Millisecond)

err = b.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(now))
err = b.RemoveWorkflowInstances(ctx, backend.RemoveFinishedBefore(time.Now()))
if errors.As(err, &backend.ErrNotSupported{}) {
t.Skip()
return
Expand Down
Loading

0 comments on commit fcc98ff

Please sign in to comment.