Skip to content

Commit

Permalink
feat(*): add custom task handler
Browse files Browse the repository at this point in the history
Signed-off-by: Soren Yang <lsytj0413@gmail.com>
  • Loading branch information
lsytj0413 committed Jan 16, 2024
1 parent 8d47194 commit 133a457
Show file tree
Hide file tree
Showing 18 changed files with 955 additions and 68 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!-- <p align="center"><img src="" alt="" height="100px"></p> -->
<p align="center"><img src="doc/fornext.png" alt="" height="100px"></p>

<div align="center">
<a href="https://codecov.io/gh/fornext-io/fornext" >
Expand Down
Binary file added doc/fornext.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 7 additions & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"path"

Expand All @@ -24,7 +25,12 @@ func main() {
panic(err)
}

e := executor.NewExecutor(&sm)
e := executor.NewExecutor(&sm, map[string]func(*executor.CreateTaskCommand) []byte{
"task1": func(ctc *executor.CreateTaskCommand) []byte {
slog.Info("start task", slog.Any("CreateTaskCommand", ctc))
return ctc.Input
},
})
e.Run([]byte(`{"name": "123", "data": 10}`))
ec := e.WaitExecutionDone()
fmt.Printf("id: %v\n", ec.ID)
Expand Down
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,27 @@ require (
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/pebble v1.0.0 // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
615 changes: 615 additions & 0 deletions go.sum

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions pkg/executor/choice_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *choiceStateProcessor) StartState(ctx context.Context, cmd *StartStateCo
return nil
}

func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if !p.isMatchedChoiceRule(ctx, output, rule) {
return false
Expand All @@ -50,7 +50,7 @@ func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, outpu
return true
}

func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if p.isMatchedChoiceRule(ctx, output, rule) {
return true
Expand Down Expand Up @@ -293,7 +293,7 @@ func (p *choiceStateProcessor) isMatchedChoiceRule(ctx context.Context, output i
case rule.IsNull != nil:
return variableIsNull(*rule.Variable, output)
case rule.IsPresent != nil:
return variableIsPresent(*rule.Variable, output)
return variableIsPresent(*rule.Variable, output) == *rule.IsPresent
case rule.IsNumeric != nil:
return variableTypeMatch[float64](*rule.Variable, output)
case rule.IsString != nil:
Expand Down Expand Up @@ -331,8 +331,11 @@ func (p *choiceStateProcessor) CompleteState(ctx context.Context, cmd *CompleteS
slog.InfoContext(ctx, "complete choice state",
slog.String("ActivityID", cmd.ActivityID),
)
at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

at := e.activityContextes[cmd.ActivityID]
// TODO: found the next state
next, err := p.findNextState(ctx, cmd.Output, s)
if err != nil {
Expand Down
157 changes: 110 additions & 47 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,38 @@ type Executor struct {
sm *fsl.StateMachine
ctx *ExecutionContext

activityContextes map[string]*ActivityContext
branchContextes map[string]*BranchContext
iterationContextes map[string]*IterationContext
taskContextes map[string]*TaskContext
// iterationContextes map[string]*IterationContext
store *Storage

done chan interface{}
ev chan interface{}

c *hybridLogicalClock
c *hybridLogicalClock
taskHandlers map[string]func(cmd *CreateTaskCommand) []byte
}

// NewExecutor ...
func NewExecutor(sm *fsl.StateMachine) *Executor {
func NewExecutor(sm *fsl.StateMachine, handlers map[string]func(*CreateTaskCommand) []byte) *Executor {
store, err := NewStorage("./fornext")
if err != nil {
panic(err)
}

v := &Executor{
sm: sm,
done: make(chan interface{}),
ev: make(chan interface{}, 100),
activityContextes: map[string]*ActivityContext{},
branchContextes: map[string]*BranchContext{},
iterationContextes: map[string]*IterationContext{},
taskContextes: map[string]*TaskContext{},
c: newHybridLogicalClock(),
sm: sm,
done: make(chan interface{}),
ev: make(chan interface{}, 100),
// iterationContextes: map[string]*IterationContext{},
store: store,
c: newHybridLogicalClock(),
taskHandlers: map[string]func(*CreateTaskCommand) []byte{},
}
v.taskHandlers["__sleep"] = v.executeSleepTask

for k, vv := range handlers {
v.taskHandlers[k] = vv
}

return v
}

Expand Down Expand Up @@ -199,13 +208,17 @@ func (e *Executor) processStartStateCommand(cmd *StartStateCommand) {
ActivityID: cmd.ActivityID,
}

e.activityContextes[cmd.ActivityID] = &ActivityContext{
err := Set(context.Background(), e.store, cmd.ActivityID, &ActivityContext{
ID: cmd.ActivityID,
StateName: cmd.StateName,
ParentBranchID: cmd.ParentBranchID,
ParentIterationID: cmd.ParentIterationID,
Input: cmd.Input,
})
if err != nil {
panic(err)
}

state := findState(e.sm, cmd.StateName)
switch sss := state.(type) {
case *fsl.TaskState:
Expand Down Expand Up @@ -254,28 +267,38 @@ func (e *Executor) processStartStateCommand(cmd *StartStateCommand) {
}
}

func (e *Executor) executeTask(cmd *CreateTaskCommand) {
go func() {
h := e.taskHandlers[cmd.Resource]
if h == nil {
panic(fmt.Errorf("cannot find handler for task %v", cmd.Resource))
}

output := h(cmd)
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: output,
}
}()
}

func (e *Executor) executeSleepTask(cmd *CreateTaskCommand) []byte {
time.Sleep(5 * time.Second)
return cmd.Input
}

func (e *Executor) processCreateTaskCommand(cmd *CreateTaskCommand) {
e.ev <- &TaskCreatedEvent{
ID: cmd.ID,
}
e.taskContextes[cmd.ID] = &TaskContext{
err := Set(context.Background(), e.store, cmd.ID, &TaskContext{
ActivityID: cmd.ActivityID,
})
if err != nil {
panic(err)
}

go func() {
if cmd.Resource == "__sleep" {
time.Sleep(5 * time.Second)
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: cmd.Input,
}
} else {
e.ev <- &CompleteTaskCommand{
TaskID: cmd.ID,
Output: cmd.Input,
}
}
}()
e.executeTask(cmd)
}

func (e *Executor) processActivateTaskCommand(_ *ActivateTaskCommand) {
Expand All @@ -285,7 +308,10 @@ func (e *Executor) processActivateTaskCommand(_ *ActivateTaskCommand) {
func (e *Executor) processCompleteTaskCommand(cmd *CompleteTaskCommand) {
fmt.Printf("complete task command: %+v\n", cmd)

taskCtx := e.taskContextes[cmd.TaskID]
taskCtx, err := Get[TaskContext](context.Background(), e.store, cmd.TaskID)
if err != nil {
panic(err)
}

e.ev <- &TaskCompletedEvent{
TaskID: cmd.TaskID,
Expand All @@ -296,21 +322,18 @@ func (e *Executor) processCompleteTaskCommand(cmd *CompleteTaskCommand) {
}
}

// 本地状态在什么时候修改?
// 1. 发送消息之前,那么可能出现消息没有发送成功,需要回滚(其实不会出现这个问题,因为 command 是 determin 的)
// 2. 关键是什么时候回复给客户?如果是 determin 的,那么无所谓
// 3. 收到一个 cmd,然后记录到本地存储中(标记为还没有处理),然后发送给 scheduler + processor,调度(不同的 execution 可以并行)
// 然后执行并记录本地状态,发送新的 command,删除旧的 command,处理下一个(不用等待发送成功?应该无需,可以直接开始处理下一个,不然要等待发送完成才能进行下一个的处理,延迟会很高;而且这里不等待是没有问题的,因为 cmd 是 determin 的,假设有新的 leader 启动,得到的状态也会是一致的),但是这里删除、修改是需要 transaction 的,避免修改成功,删除没成功,导致重启时 cmd 的重复处理
// 另外,可能出现本地应用成功,然后重启重新成为 leader,但是 cmd & event 没有发送成功的情况,则需要将要发送的 cmd 保存到 kv,然后重新成为 leader 的时候重发
// 对于 follower,则在收到 event 之后删除旧的 command
func (e *Executor) processCompleteStateCommand(cmd *CompleteStateCommand) {
slog.InfoContext(context.Background(), "complete state command", slog.Any("Command", cmd), slog.String("Output", string(cmd.Output)))

e.ev <- &StateCompletedEvent{
ActivityID: cmd.ActivityID,
}
state := findState(e.sm, e.activityContextes[cmd.ActivityID].StateName)
act, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

state := findState(e.sm, act.StateName)
switch sss := state.(type) {
case *fsl.TaskState:
err := (&taskStateProcessor{}).CompleteState(context.Background(), cmd, e, sss)
Expand Down Expand Up @@ -361,13 +384,20 @@ func (e *Executor) processStartBranchCommand(cmd *StartBranchCommand) {
e.ev <- &BranchStartedEvent{
BranchID: cmd.BranchID,
}
e.branchContextes[cmd.BranchID] = &BranchContext{
err := Set(context.Background(), e.store, cmd.BranchID, &BranchContext{
BranchID: cmd.BranchID,
Index: cmd.Index,
ActivityID: cmd.ActivityID,
Input: cmd.Input,
})
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}
at := e.activityContextes[cmd.ActivityID]

state := findState(e.sm, at.StateName).(*fsl.ParallelState)
e.ev <- &StartStateCommand{
Expand All @@ -383,10 +413,23 @@ func (e *Executor) processCompleteBranchCommand(cmd *CompleteBranchCommand) {
e.ev <- &BranchCompletedEvent{
BranchID: cmd.BranchID,
}
at := e.activityContextes[e.branchContextes[cmd.BranchID].ActivityID]
branchCtx, err := Get[BranchContext](context.Background(), e.store, cmd.BranchID)
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, branchCtx.ActivityID)
if err != nil {
panic(err)
}

at.BranchStatus.Done++
// 此处还需要考虑顺序问题
at.BranchStatus.Output = append(at.BranchStatus.Output, cmd.Output)
err = Set(context.Background(), e.store, branchCtx.ActivityID, at)
if err != nil {
panic(err)
}

if at.BranchStatus.Done == at.BranchStatus.Max {
// 拼接所有的 output 为列表
Expand All @@ -400,7 +443,7 @@ func (e *Executor) processCompleteBranchCommand(cmd *CompleteBranchCommand) {
}

e.ev <- &CompleteStateCommand{
ActivityID: e.branchContextes[cmd.BranchID].ActivityID,
ActivityID: branchCtx.ActivityID,
Output: outputBytes,
}
}
Expand All @@ -410,12 +453,19 @@ func (e *Executor) processStartIterationCommand(cmd *StartIterationCommand) {
e.ev <- &IterationStartedEvent{
IterationID: cmd.IterationID,
}
e.iterationContextes[cmd.IterationID] = &IterationContext{
err := Set(context.Background(), e.store, cmd.IterationID, &IterationContext{
IterationID: cmd.IterationID,
Index: cmd.Index,
ActivityID: cmd.ActivityID,
})
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}
at := e.activityContextes[cmd.ActivityID]

state := findState(e.sm, at.StateName).(*fsl.MapState)
e.ev <- &StartStateCommand{
Expand All @@ -431,9 +481,22 @@ func (e *Executor) processCompleteIterationCommand(cmd *CompleteIterationCommand
e.ev <- &IterationCompletedEvent{
IterationID: cmd.IterationID,
}
at := e.activityContextes[e.iterationContextes[cmd.IterationID].ActivityID]
iterCtx, err := Get[IterationContext](context.Background(), e.store, cmd.IterationID)
if err != nil {
panic(err)
}

at, err := Get[ActivityContext](context.Background(), e.store, iterCtx.ActivityID)
if err != nil {
panic(err)
}

at.IterationStatus.Done++
at.IterationStatus.Output = append(at.IterationStatus.Output, cmd.Output)
err = Set(context.Background(), e.store, iterCtx.ActivityID, at)
if err != nil {
panic(err)
}

if at.IterationStatus.Done == at.IterationStatus.Max {
// 拼接所有的 output 为列表
Expand All @@ -447,7 +510,7 @@ func (e *Executor) processCompleteIterationCommand(cmd *CompleteIterationCommand
}

e.ev <- &CompleteStateCommand{
ActivityID: e.iterationContextes[cmd.IterationID].ActivityID,
ActivityID: iterCtx.ActivityID,
Output: outputBytes,
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/fail_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func (p *failStateProcessor) CompleteState(ctx context.Context, cmd *CompleteSta
)

// 1. find the next state
at := e.activityContextes[cmd.ActivityID]
at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

switch {
case at.ParentBranchID != nil:
Expand Down
Loading

0 comments on commit 133a457

Please sign in to comment.