Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix child WF ID generation #1803

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
8 changes: 6 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error),
) {
if params.WorkflowID == "" {
params.WorkflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
params.WorkflowID = wc.workflowInfo.childWorkflowIDSeed + "_" + wc.GenerateSequenceID()
}
memo, err := getWorkflowMemo(params.Memo, wc.dataConverter)
if err != nil {
Expand Down Expand Up @@ -1220,7 +1220,11 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// No Operation
// update the childWorkflowIDSeed if the workflow was reset at this point.
attr := event.GetWorkflowTaskFailedEventAttributes()
if attr.GetCause() == enumspb.WORKFLOW_TASK_FAILED_CAUSE_RESET_WORKFLOW {
weh.workflowInfo.childWorkflowIDSeed = attr.GetNewRunId()
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
// No Operation
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
Expand Down
7 changes: 5 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,7 @@ OrderEvents:
break OrderEvents
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Quinn-With-Two-Ns - can you think of any ramifications for now putting workflow task failed events in taskEvents.events where we hadn't before?

enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT:
// Skip
default:
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
Expand Down Expand Up @@ -744,6 +743,10 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
RetryPolicy: convertFromPBRetryPolicy(attributes.RetryPolicy),
// Use the original execution run ID from the start event as the initial seed.
// Original execution run ID stays the same for the entire chain of workflow resets.
// This helps us keep child workflow IDs consistent up until a reset-point is encountered.
childWorkflowIDSeed: attributes.GetOriginalExecutionRunId(),
}

return newWorkflowExecutionContext(workflowInfo, wth), nil
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommands() {
createTestEventWorkflowTaskStarted(3),
{
EventId: 4,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
gow marked this conversation as resolved.
Show resolved Hide resolved
},
{
EventId: 5,
Expand Down Expand Up @@ -278,7 +278,7 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {
createTestEventWorkflowTaskStarted(3),
{
EventId: 4,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
},
createTestEventWorkflowTaskScheduled(5, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(6),
Expand Down
2 changes: 2 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,8 @@ type WorkflowInfo struct {
continueAsNewSuggested bool
currentHistorySize int
currentHistoryLength int
// childWorkflowIDSeed is used to generate unique child workflow IDs
childWorkflowIDSeed string
}

// UpdateInfo information about a currently running update
Expand Down
95 changes: 95 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,101 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecution() {
ts.Equal(originalResult, newResult)
}

// TestResetWorkflowExecutionWithChildren tests the behavior of child workflow ID generation when a workflow with children is reset.
// It repeatedly resets the workflow at different points in its execution and verifies that the child workflow IDs are generated correctly.
func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithChildren() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great test

wfID := "reset-workflow-with-children"
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

// Start a workflow with 2 children.
options := ts.startWorkflowOptions(wfID)
run, err := ts.client.ExecuteWorkflow(ctx, options, ts.workflows.WorkflowWithChildren)
ts.NoError(err)
var originalResult string
err = run.Get(ctx, &originalResult)
ts.NoError(err)

// save child init childIDs for later comparison.
childIDs := ts.getChildInitEventsFromHistory(ctx, wfID, run.GetRunID())
ts.Len(childIDs, 2)
child1IDBeforeReset := childIDs[0]
child2IDBeforeReset := childIDs[1]

resetRequest := &workflowservice.ResetWorkflowExecutionRequest{
Namespace: ts.config.Namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: wfID,
RunId: run.GetRunID(),
},
Reason: "integration test",
}
// (reset #1) - resetting the workflow execution before both child workflows are started.
resetRequest.RequestId = "reset-request-1"
resetRequest.WorkflowTaskFinishEventId = 4
resp, err := ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset1 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset1)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset1)

childIDs = ts.getChildInitEventsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDs, 2)
// both child workflow IDs should be different after reset.
ts.NotEqual(child1IDBeforeReset, childIDs[0])
ts.NotEqual(child2IDBeforeReset, childIDs[1])

// (reset #2) - resetting the workflow execution after child-1 but before child-2
resetRequest.RequestId = "reset-request-2"
resetRequest.WorkflowTaskFinishEventId = 13
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset2 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset2)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset2)

childIDs = ts.getChildInitEventsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDs, 2)
ts.Equal(child1IDBeforeReset, childIDs[0]) // child-1 should be the same as before reset.
ts.NotEqual(child2IDBeforeReset, childIDs[1]) // child-2 should be different after reset.

// (reset #3) - resetting the workflow execution after both child workflows have been started.
resetRequest.RequestId = "reset-request-3"
resetRequest.WorkflowTaskFinishEventId = 22
resp, err = ts.client.ResetWorkflowExecution(context.Background(), resetRequest)
ts.NoError(err)
// Wait for the new run to complete.
var resultAfterReset3 string
err = ts.client.GetWorkflow(context.Background(), wfID, resp.GetRunId()).Get(ctx, &resultAfterReset3)
ts.NoError(err)
ts.Equal(originalResult, resultAfterReset3)

childIDs = ts.getChildInitEventsFromHistory(ctx, wfID, resp.GetRunId())
ts.Len(childIDs, 2)
// both child workflow IDs should be the same as before reset.
ts.Equal(child1IDBeforeReset, childIDs[0])
ts.Equal(child2IDBeforeReset, childIDs[1])
}

// getChildInitEventsFromHistory walks the full history of the given workflow run and
// returns the workflow ID recorded in each StartChildWorkflowExecutionInitiated event,
// in the order the events appear in history.
//
// A failure while iterating the history is reported through the suite instead of being
// silently dropped, so a truncated history cannot masquerade as "fewer children".
func (ts *IntegrationTestSuite) getChildInitEventsFromHistory(ctx context.Context, wfID string, runID string) []string {
	iter := ts.client.GetWorkflowHistory(ctx, wfID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	var childIDs []string
	for iter.HasNext() {
		event, err := iter.Next()
		// Surface the iteration error as a test failure rather than swallowing it.
		ts.NoError(err)
		if err != nil {
			// Stop iterating; the remaining history cannot be trusted.
			break
		}
		if event.GetEventType() != enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED {
			continue
		}
		childIDs = append(childIDs, event.GetStartChildWorkflowExecutionInitiatedEventAttributes().GetWorkflowId())
	}
	return childIDs
}

func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() {
ctx := context.Background()
wfId := "reset-workflow-execution-with-update"
Expand Down
18 changes: 18 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,24 @@ func (s *replayTestSuite) TestPartialReplayNonCommandEvent() {
require.NoError(s.T(), err)
}

// TestResetWorkflowBeforeChildInit verifies that a workflow history containing a reset
// before the StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted
// events can be replayed without error.
func (s *replayTestSuite) TestResetWorkflowBeforeChildInit() {
	replayer := worker.NewWorkflowReplayer()
	replayer.RegisterWorkflow(ResetWorkflowWithChild)
	err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-before-child-init.json")
	// A single fatal assertion is enough; asserting the same error twice only duplicates the failure output.
	require.NoError(s.T(), err)
}

// TestResetWorkflowAfterChildComplete verifies that a workflow history containing a reset
// event after the StartChildWorkflowExecutionInitiated & ChildWorkflowExecutionCompleted
// events can be replayed without error.
func (s *replayTestSuite) TestResetWorkflowAfterChildComplete() {
	replayer := worker.NewWorkflowReplayer()
	replayer.RegisterWorkflow(ResetWorkflowWithChild)
	err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "reset-workflow-after-child-complete.json")
	// A single fatal assertion is enough; asserting the same error twice only duplicates the failure output.
	require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
Loading
Loading