Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(*): add custom task handler #6

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!-- <p align="center"><img src="" alt="" height="100px"></p> -->
<p align="center"><img src="doc/fornext.png" alt="" height="100px"></p>

<div align="center">
<a href="https://codecov.io/gh/fornext-io/fornext" >
Expand Down
Binary file added doc/fornext.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 7 additions & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"path"

Expand All @@ -24,7 +25,12 @@ func main() {
panic(err)
}

e := executor.NewExecutor(&sm)
e := executor.NewExecutor(&sm, map[string]func(*executor.CreateTaskCommand) []byte{
"task1": func(ctc *executor.CreateTaskCommand) []byte {
slog.Info("start task", slog.Any("CreateTaskCommand", ctc))
return ctc.Input
},
})
e.Run([]byte(`{"name": "123", "data": 10}`))
ec := e.WaitExecutionDone()
fmt.Printf("id: %v\n", ec.ID)
Expand Down
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,27 @@ require (
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/pebble v1.0.0 // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
615 changes: 615 additions & 0 deletions go.sum

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions pkg/executor/choice_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *choiceStateProcessor) StartState(ctx context.Context, cmd *StartStateCo
return nil
}

func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if !p.isMatchedChoiceRule(ctx, output, rule) {
return false
Expand All @@ -50,7 +50,7 @@ func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, outpu
return true
}

func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if p.isMatchedChoiceRule(ctx, output, rule) {
return true
Expand Down Expand Up @@ -293,7 +293,7 @@ func (p *choiceStateProcessor) isMatchedChoiceRule(ctx context.Context, output i
case rule.IsNull != nil:
return variableIsNull(*rule.Variable, output)
case rule.IsPresent != nil:
return variableIsPresent(*rule.Variable, output)
return variableIsPresent(*rule.Variable, output) == *rule.IsPresent
case rule.IsNumeric != nil:
return variableTypeMatch[float64](*rule.Variable, output)
case rule.IsString != nil:
Expand Down Expand Up @@ -331,8 +331,11 @@ func (p *choiceStateProcessor) CompleteState(ctx context.Context, cmd *CompleteS
slog.InfoContext(ctx, "complete choice state",
slog.String("ActivityID", cmd.ActivityID),
)
at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

at := e.activityContextes[cmd.ActivityID]
// TODO: found the next state
next, err := p.findNextState(ctx, cmd.Output, s)
if err != nil {
Expand Down
157 changes: 110 additions & 47 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,38 @@ type Executor struct {
sm *fsl.StateMachine
ctx *ExecutionContext

activityContextes map[string]*ActivityContext
branchContextes map[string]*BranchContext
iterationContextes map[string]*IterationContext
taskContextes map[string]*TaskContext
// iterationContextes map[string]*IterationContext
store *Storage

done chan interface{}
ev chan interface{}

c *hybridLogicalClock
c *hybridLogicalClock
taskHandlers map[string]func(cmd *CreateTaskCommand) []byte
}

// NewExecutor ...
func NewExecutor(sm *fsl.StateMachine) *Executor {
func NewExecutor(sm *fsl.StateMachine, handlers map[string]func(*CreateTaskCommand) []byte) *Executor {
store, err := NewStorage("./fornext")
if err != nil {
panic(err)
}

v := &Executor{
sm: sm,
done: make(chan interface{}),
ev: make(chan interface{}, 100),
activityContextes: map[string]*ActivityContext{},
branchContextes: map[string]*BranchContext{},
iterationContextes: map[string]*IterationContext{},
taskContextes: map[string]*TaskContext{},
c: newHybridLogicalClock(),
sm: sm,
done: make(chan interface{}),
ev: make(chan interface{}, 100),
// iterationContextes: map[string]*IterationContext{},
store: store,
c: newHybridLogicalClock(),
taskHandlers: map[string]func(*CreateTaskCommand) []byte{},
}
v.taskHandlers["__sleep"] = v.executeSleepTask

for k, vv := range handlers {
v.taskHandlers[k] = vv
}

return v
}

Expand Down Expand Up @@ -199,13 +208,17 @@ func (e *Executor) processStartStateCommand(cmd *StartStateCommand) {
ActivityID: cmd.ActivityID,
}

e.activityContextes[cmd.ActivityID] = &ActivityContext{
err := Set(context.Background(), e.store, cmd.ActivityID, &ActivityContext{
ID: cmd.ActivityID,
StateName: cmd.StateName,
ParentBranchID: cmd.ParentBranchID,
ParentIterationID: cmd.ParentIterationID,
Input: cmd.Input,
})
if err != nil {
panic(err)
}

state := findState(e.sm, cmd.StateName)
switch sss := state.(type) {
case *fsl.TaskState:
Expand Down Expand Up @@ -254,28 +267,38 @@ func (e *Executor) processStartStateCommand(cmd *StartStateCommand) {
}
}

func (e *Executor) executeTask(cmd *CreateTaskCommand) {
go func() {
h := e.taskHandlers[cmd.Resource]
if h == nil {
panic(fmt.Errorf("cannot find handler for task %v", cmd.Resource))
}

output := h(cmd)
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: output,
}
}()
}

func (e *Executor) executeSleepTask(cmd *CreateTaskCommand) []byte {
time.Sleep(5 * time.Second)
return cmd.Input
}

func (e *Executor) processCreateTaskCommand(cmd *CreateTaskCommand) {
e.ev <- &TaskCreatedEvent{
ID: cmd.ID,
}
e.taskContextes[cmd.ID] = &TaskContext{
err := Set(context.Background(), e.store, cmd.ID, &TaskContext{
ActivityID: cmd.ActivityID,
})
if err != nil {
panic(err)
}

go func() {
if cmd.Resource == "__sleep" {
time.Sleep(5 * time.Second)
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: cmd.Input,
}
} else {
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: cmd.Input,
}
}
}()
e.executeTask(cmd)
}

func (e *Executor) processActivateTaskCommand(_ *ActivateTaskCommand) {
Expand All @@ -285,7 +308,10 @@ func (e *Executor) processActivateTaskCommand(_ *ActivateTaskCommand) {
func (e *Executor) processCompleteTaskCommand(cmd *CompleteTaskCommand) {
fmt.Printf("complete task command: %+v\n", cmd)

taskCtx := e.taskContextes[cmd.TaskID]
taskCtx, err := Get[TaskContext](context.Background(), e.store, cmd.TaskID)
if err != nil {
panic(err)
}

e.ev <- &TaskCompletedEvent{
TaskID: cmd.TaskID,
Expand All @@ -296,21 +322,18 @@ func (e *Executor) processCompleteTaskCommand(cmd *CompleteTaskCommand) {
}
}

// 本地状态在什么时候修改?
// 1. 发送消息之前,那么可能出现消息没有发送成功,需要回滚(其实不会出现这个问题,因为 command 是 determin 的)
// 2. 关键是什么时候回复给客户?如果是 determin 的,那么无所谓
// 3. 收到一个 cmd,然后记录到本地存储中(标记为还没有处理),然后发送给 scheduler + processor,调度(不同的 execution 可以并行)
// 然后执行并记录本地状态,发送新的 command,删除旧的 command,处理下一个(不用等待发送成功?应该无需,可以直接开始处理下一个,不然要等待发送完成才能进行下一个的处理,延迟会很高;而且这里不等待是没有问题的,因为 cmd 是 determin 的,假设有新的 leader 启动,得到的状态也会是一致的),但是这里删除、修改是需要 transaction 的,避免修改成功,删除没成功,导致重启时 cmd 的重复处理
// 另外,可能出现本地应用成功,然后重启重新成为 leader,但是 cmd & event 没有发送成功的情况,则需要将要发送的 cmd 保存到 kv,然后重新成为 leader 的时候重发
// 对于 follower,则在收到 event 之后删除旧的 command
func (e *Executor) processCompleteStateCommand(cmd *CompleteStateCommand) {
slog.InfoContext(context.Background(), "complete state command", slog.Any("Command", cmd), slog.String("Output", string(cmd.Output)))

e.ev <- &StateCompletedEvent{
ActivityID: cmd.ActivityID,
}
state := findState(e.sm, e.activityContextes[cmd.ActivityID].StateName)
act, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

state := findState(e.sm, act.StateName)
switch sss := state.(type) {
case *fsl.TaskState:
err := (&taskStateProcessor{}).CompleteState(context.Background(), cmd, e, sss)
Expand Down Expand Up @@ -361,13 +384,20 @@ func (e *Executor) processStartBranchCommand(cmd *StartBranchCommand) {
e.ev <- &BranchStartedEvent{
BranchID: cmd.BranchID,
}
e.branchContextes[cmd.BranchID] = &BranchContext{
err := Set(context.Background(), e.store, cmd.BranchID, &BranchContext{
BranchID: cmd.BranchID,
Index: cmd.Index,
ActivityID: cmd.ActivityID,
Input: cmd.Input,
})
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}
at := e.activityContextes[cmd.ActivityID]

state := findState(e.sm, at.StateName).(*fsl.ParallelState)
e.ev <- &StartStateCommand{
Expand All @@ -383,10 +413,23 @@ func (e *Executor) processCompleteBranchCommand(cmd *CompleteBranchCommand) {
e.ev <- &BranchCompletedEvent{
BranchID: cmd.BranchID,
}
at := e.activityContextes[e.branchContextes[cmd.BranchID].ActivityID]
branchCtx, err := Get[BranchContext](context.Background(), e.store, cmd.BranchID)
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, branchCtx.ActivityID)
if err != nil {
panic(err)
}

at.BranchStatus.Done++
// 此处还需要考虑顺序问题
at.BranchStatus.Output = append(at.BranchStatus.Output, cmd.Output)
err = Set(context.Background(), e.store, branchCtx.ActivityID, at)
if err != nil {
panic(err)
}

if at.BranchStatus.Done == at.BranchStatus.Max {
// 拼接所有的 output 为列表
Expand All @@ -400,7 +443,7 @@ func (e *Executor) processCompleteBranchCommand(cmd *CompleteBranchCommand) {
}

e.ev <- &CompleteStateCommand{
ActivityID: e.branchContextes[cmd.BranchID].ActivityID,
ActivityID: branchCtx.ActivityID,
Output: outputBytes,
}
}
Expand All @@ -410,12 +453,19 @@ func (e *Executor) processStartIterationCommand(cmd *StartIterationCommand) {
e.ev <- &IterationStartedEvent{
IterationID: cmd.IterationID,
}
e.iterationContextes[cmd.IterationID] = &IterationContext{
err := Set(context.Background(), e.store, cmd.IterationID, &IterationContext{
IterationID: cmd.IterationID,
Index: cmd.Index,
ActivityID: cmd.ActivityID,
})
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}
at := e.activityContextes[cmd.ActivityID]

state := findState(e.sm, at.StateName).(*fsl.MapState)
e.ev <- &StartStateCommand{
Expand All @@ -431,9 +481,22 @@ func (e *Executor) processCompleteIterationCommand(cmd *CompleteIterationCommand
e.ev <- &IterationCompletedEvent{
IterationID: cmd.IterationID,
}
at := e.activityContextes[e.iterationContextes[cmd.IterationID].ActivityID]
iterCtx, err := Get[IterationContext](context.Background(), e.store, cmd.IterationID)
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, iterCtx.ActivityID)
if err != nil {
panic(err)
}

at.IterationStatus.Done++
at.IterationStatus.Output = append(at.IterationStatus.Output, cmd.Output)
err = Set(context.Background(), e.store, iterCtx.ActivityID, at)
if err != nil {
panic(err)
}

if at.IterationStatus.Done == at.IterationStatus.Max {
// 拼接所有的 output 为列表
Expand All @@ -447,7 +510,7 @@ func (e *Executor) processCompleteIterationCommand(cmd *CompleteIterationCommand
}

e.ev <- &CompleteStateCommand{
ActivityID: e.iterationContextes[cmd.IterationID].ActivityID,
ActivityID: iterCtx.ActivityID,
Output: outputBytes,
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/fail_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func (p *failStateProcessor) CompleteState(ctx context.Context, cmd *CompleteSta
)

// 1. find the next state
at := e.activityContextes[cmd.ActivityID]
at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

switch {
case at.ParentBranchID != nil:
Expand Down
Loading
Loading