Skip to content

Commit

Permalink
add tests. improve doc
Browse files Browse the repository at this point in the history
  • Loading branch information
piaodazhu committed May 2, 2023
1 parent 573f184 commit ae36fea
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 104 deletions.
155 changes: 154 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,155 @@
# gotcc
A simple package for Task Concurrency Control for Go. It manages the synchronization relationship between subtasks and supports rollback.

🤖 `gotcc` is a Golang package for Task Concurrency Control. It allows you to define tasks, their dependencies, and the controller will run the tasks concurrently while respecting the dependencies.

Features of `gotcc`
- Automatic task concurrency control based on dependency declarations.
- Support dependency logic expressions: `not`, `and`, `or`, `xor` and any combination of them.
- Many-to-many result delivery between tasks.
- Support tasks rollback in case of any error.
- Support multiple error collection.

## Installation

```bash
go get github.com/piaodazhu/gotcc
```

## Usage

A simple usage:

```go
import "github.com/piaodazhu/gotcc"

// User-defined task function
func ExampleFunc1(args map[string]interface{}) (interface{}, error) {
fmt.Println(args["BIND"].(string))
return "DONE", nil
}
func ExampleFunc2(args map[string]interface{}) (interface{}, error) {
return args["BIND"].(string), nil
}

// User-defined undo function
func ExampleUndo(args map[string]interface{}) error {
fmt.Println("Undo > ", args["BIND"].(string))
return nil
}

func main() {
// 1. Create a new controller
controller := gotcc.NewTCController()

// 2. Add tasks to the controller
// TaskA: bind arguments with ExampleFunc1
taskA := controller.AddTask("taskA", ExampleFunc1, "BindArg-A")
// TaskB: like TaskA, but set undoFunction
taskB := controller.AddTask("taskB", ExampleFunc1, "BindArg-B").SetUndoFunc(ExampleUndo, true)
// TaskC: bind arguments with ExampleFunc2
taskC := controller.AddTask("taskC", ExampleFunc2, "BindArg-C")

// 3. Define dependencies
// B depend on A
taskB.SetDependency(taskC.NewDependencyExpr(taskA))
// C depend on A and B
taskC.SetDependency(gotcc.MakeAndExpr(taskC.NewDependencyExpr(taskA), taskC.NewDependencyExpr(taskB)))

// 4. Define termination (Important)
// set TaskC's finish as termination
controller.SetTermination(controller.NewTerminationExpr(taskC))

// 5. Run the tasks
result, err := controller.RunTasks()
if err != nil {
// get taskErrors: err.(ErrAborted).TaskErrors
// get undoErrors: err.(ErrAborted).UndoErrors
}

// 6. Will Print "BindArg-C"
fmt.Println(result["taskC"].(string))
}
```
Tasks will run concurrently, but taskB will not start until taskA completes, and taskC will not start until both taskA and taskB complete. But if taskC failed (return err!=nil), `ExampleUndo("BindArg-B")` will be executed.

More detailed usage information can be found in test files, you can refer to `example_test.go` for a more complex dependency topology, `dependency_test.go` for the advanced usage of dependency logic expressions, and `tcc_test.go` for tasks rollback and error message collection.

## Specifications

### Execution
In summary, a single execution of the TCController contains multiple tasks. There may be some dependencies between tasks, and the termination of the execution depends on the completion of some of these tasks. **Therefore, `controller.SetTermination` must be called before calling `controller.RunTasks`.**

### Task Function
The task function must have this form:
```go
func (args map[string]interface{}) (interface{}, error)
```
There are some built-in keys when running the task function:
- `BIND`: the value is the third arguments when `controller.AddTask()` was called.
- `CANCEL`: the value is a context.Context, with cancel.

Other keys are the **names** of its dependent tasks, and the corresponding values are the return value of these tasks.

### Undo Function
The undo function must have this form:
```go
func (args map[string]interface{}) error
```

The undo functions will be run in the reverse order of the task function completion. And the second arguments of `SetUndoFunc` means whether to skip this error if the undo function errors out.

The undo function will be executed when:
1. Some task return `err!=nil` when the controller execute the tasks.
2. The corresponding task has been completed.
3. The predecessor undo functions have been completed or skipped.

When the undo function run, the arguments `args` is exactly the same as its corresponding task.

### Errors

During the execution of TCController, multiple tasks may fail and after failure, multiple tasks may be cancelled. During rollback, multiple rollback functions may also encounter errors. Therefore, the error definitions in the return value of `RunTasks` are as follows:
```go
type ErrAborted struct {
TaskErrors *ErrorList
UndoErrors *ErrorList
}

type ErrorList struct {
Lock sync.Mutex
Items []*ErrorMessage
}

type ErrorMessage struct {
TaskName string
Error error
}
```

### Dependency Expression

Supported dependency logic expressions are `not`, `and`, `or`, `xor` and any combination of them.

For taskB, create a dependency expression about taskA:
```go
ExprAB := taskB.NewDependencyExpr(taskA)
```

Combine existing dependency expressions to generate dependency expressions:
```go
Expr3 := gotcc.MakeOrExpr(Expr1, Expr2)
```

Get the current dependency expression of taskA.
```go
Expr := taskA.DependencyExpr()
```

Set the dependency expression for taskA.
```go
taskA.SetDependencyExpr(Expr)
```

And termination setup has the same logic as above.

## License
`gotcc` is released under the MIT license. See [LICENSE](https://github.com/piaodazhu/gotcc/blob/master/LICENSE) for details.
72 changes: 36 additions & 36 deletions dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,75 +35,75 @@ func TestDependency(t *testing.T) {
A.SetDependency(DefaultFalseExpr)
B.SetDependency(DefaultFalseExpr)

C.MarkDependency(A.Id, A.CalcDependency())
C.MarkDependency(B.Id, B.CalcDependency())
C.MarkDependency(A.Id, A.DependencyExpr()())
C.MarkDependency(B.Id, B.DependencyExpr()())

D.MarkDependency(A.Id, A.CalcDependency())
D.MarkDependency(B.Id, B.CalcDependency())
D.MarkDependency(A.Id, A.DependencyExpr()())
D.MarkDependency(B.Id, B.DependencyExpr()())

E.MarkDependency(D.Id, D.CalcDependency())
E.MarkDependency(D.Id, D.DependencyExpr()())

F.MarkDependency(B.Id, B.CalcDependency())
F.MarkDependency(D.Id, D.CalcDependency())
F.MarkDependency(B.Id, B.DependencyExpr()())
F.MarkDependency(D.Id, D.DependencyExpr()())

if A.CalcDependency() != false || B.CalcDependency() != false || C.CalcDependency() != false || D.CalcDependency() != false || E.CalcDependency() != true || F.CalcDependency() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.CalcDependency(), B.CalcDependency(), C.CalcDependency(), D.CalcDependency(), E.CalcDependency(), F.CalcDependency())
if A.DependencyExpr()() != false || B.DependencyExpr()() != false || C.DependencyExpr()() != false || D.DependencyExpr()() != false || E.DependencyExpr()() != true || F.DependencyExpr()() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.DependencyExpr(), B.DependencyExpr(), C.DependencyExpr(), D.DependencyExpr(), E.DependencyExpr(), F.DependencyExpr())
}

// 1 0 0 1 0 1
A.SetDependency(DefaultTrueExpr)
B.SetDependency(DefaultFalseExpr)

C.MarkDependency(A.Id, A.CalcDependency())
C.MarkDependency(B.Id, B.CalcDependency())
C.MarkDependency(A.Id, A.DependencyExpr()())
C.MarkDependency(B.Id, B.DependencyExpr()())

D.MarkDependency(A.Id, A.CalcDependency())
D.MarkDependency(B.Id, B.CalcDependency())
D.MarkDependency(A.Id, A.DependencyExpr()())
D.MarkDependency(B.Id, B.DependencyExpr()())

E.MarkDependency(D.Id, D.CalcDependency())
E.MarkDependency(D.Id, D.DependencyExpr()())

F.MarkDependency(B.Id, B.CalcDependency())
F.MarkDependency(D.Id, D.CalcDependency())
F.MarkDependency(B.Id, B.DependencyExpr()())
F.MarkDependency(D.Id, D.DependencyExpr()())

if A.CalcDependency() != true || B.CalcDependency() != false || C.CalcDependency() != false || D.CalcDependency() != true || E.CalcDependency() != false || F.CalcDependency() != true {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.CalcDependency(), B.CalcDependency(), C.CalcDependency(), D.CalcDependency(), E.CalcDependency(), F.CalcDependency())
if A.DependencyExpr()() != true || B.DependencyExpr()() != false || C.DependencyExpr()() != false || D.DependencyExpr()() != true || E.DependencyExpr()() != false || F.DependencyExpr()() != true {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.DependencyExpr(), B.DependencyExpr(), C.DependencyExpr(), D.DependencyExpr(), E.DependencyExpr(), F.DependencyExpr())
}

// 0 1 0 1 0 0
A.SetDependency(DefaultFalseExpr)
B.SetDependency(DefaultTrueExpr)

C.MarkDependency(A.Id, A.CalcDependency())
C.MarkDependency(B.Id, B.CalcDependency())
C.MarkDependency(A.Id, A.DependencyExpr()())
C.MarkDependency(B.Id, B.DependencyExpr()())

D.MarkDependency(A.Id, A.CalcDependency())
D.MarkDependency(B.Id, B.CalcDependency())
D.MarkDependency(A.Id, A.DependencyExpr()())
D.MarkDependency(B.Id, B.DependencyExpr()())

E.MarkDependency(D.Id, D.CalcDependency())
E.MarkDependency(D.Id, D.DependencyExpr()())

F.MarkDependency(B.Id, B.CalcDependency())
F.MarkDependency(D.Id, D.CalcDependency())
F.MarkDependency(B.Id, B.DependencyExpr()())
F.MarkDependency(D.Id, D.DependencyExpr()())

if A.CalcDependency() != false || B.CalcDependency() != true || C.CalcDependency() != false || D.CalcDependency() != true || E.CalcDependency() != false || F.CalcDependency() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.CalcDependency(), B.CalcDependency(), C.CalcDependency(), D.CalcDependency(), E.CalcDependency(), F.CalcDependency())
if A.DependencyExpr()() != false || B.DependencyExpr()() != true || C.DependencyExpr()() != false || D.DependencyExpr()() != true || E.DependencyExpr()() != false || F.DependencyExpr()() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.DependencyExpr(), B.DependencyExpr(), C.DependencyExpr(), D.DependencyExpr(), E.DependencyExpr(), F.DependencyExpr())
}

// 1 1 1 1 0 0
A.SetDependency(DefaultTrueExpr)
B.SetDependency(DefaultTrueExpr)

C.MarkDependency(A.Id, A.CalcDependency())
C.MarkDependency(B.Id, B.CalcDependency())
C.MarkDependency(A.Id, A.DependencyExpr()())
C.MarkDependency(B.Id, B.DependencyExpr()())

D.MarkDependency(A.Id, A.CalcDependency())
D.MarkDependency(B.Id, B.CalcDependency())
D.MarkDependency(A.Id, A.DependencyExpr()())
D.MarkDependency(B.Id, B.DependencyExpr()())

E.MarkDependency(D.Id, D.CalcDependency())
E.MarkDependency(D.Id, D.DependencyExpr()())

F.MarkDependency(B.Id, B.CalcDependency())
F.MarkDependency(D.Id, D.CalcDependency())
F.MarkDependency(B.Id, B.DependencyExpr()())
F.MarkDependency(D.Id, D.DependencyExpr()())

if A.CalcDependency() != true || B.CalcDependency() != true || C.CalcDependency() != true || D.CalcDependency() != true || E.CalcDependency() != false || F.CalcDependency() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.CalcDependency(), B.CalcDependency(), C.CalcDependency(), D.CalcDependency(), E.CalcDependency(), F.CalcDependency())
if A.DependencyExpr()() != true || B.DependencyExpr()() != true || C.DependencyExpr()() != true || D.DependencyExpr()() != true || E.DependencyExpr()() != false || F.DependencyExpr()() != false {
t.Errorf("Error: A=%v, B=%v, C=%v, D=%v, E=%v, F=%v\n", A.DependencyExpr(), B.DependencyExpr(), C.DependencyExpr(), D.DependencyExpr(), E.DependencyExpr(), F.DependencyExpr())
}
}
78 changes: 78 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package gotcc

import (
"fmt"
"time"
)

func hello(args map[string]interface{}) (interface{}, error) {
time.Sleep(10 * time.Millisecond)
fmt.Println("hello")
return 1, nil
}

func world(args map[string]interface{}) (interface{}, error) {
time.Sleep(20 * time.Millisecond)
fmt.Println("world")
return 2, nil
}

func helloworld(args map[string]interface{}) (interface{}, error) {
fmt.Println("helloworld")
return 3, nil
}

func foo(args map[string]interface{}) (interface{}, error) {
time.Sleep(30 * time.Millisecond)
fmt.Println("foo")
return 4, nil
}

func bar(args map[string]interface{}) (interface{}, error) {
time.Sleep(40 * time.Millisecond)
fmt.Println("bar")
return 5, nil
}

func foobar(args map[string]interface{}) (interface{}, error) {
fmt.Println("foobar")
return 5, nil
}

func ExampleNewTCController() {
// in this example:
// hello -+
// +-(&&)-> helloworld +
// world -+ +
// foo -+ +-(&&)-> [termination]
// +-(||)-> foobar +
// bar -+

controller := NewTCController()

hello := controller.AddTask("hello", hello, 0)
world := controller.AddTask("world", world, 1)
helloworld := controller.AddTask("helloworld", helloworld, 2)
foo := controller.AddTask("foo", foo, 3)
bar := controller.AddTask("bar", bar, 4)
foobar := controller.AddTask("foobar", foobar, 5)

helloworld.SetDependency(MakeAndExpr(helloworld.NewDependencyExpr(hello), helloworld.NewDependencyExpr(world)))

foobar.SetDependency(MakeOrExpr(foobar.NewDependencyExpr(foo), foobar.NewDependencyExpr(bar)))

controller.SetTermination(MakeAndExpr(controller.NewTerminationExpr(foobar), controller.NewTerminationExpr(helloworld)))

_, err := controller.RunTask()
if err != nil {
panic(err)
}

// Output:
// hello
// world
// helloworld
// foo
// foobar
// bar
}
8 changes: 4 additions & 4 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func (e *Executor) NewDependencyExpr(d *Executor) DependencyExpression {
return newDependencyExpr(e.dependency, d.Id)
}

func (e *Executor) DependencyExpr() DependencyExpression {
return e.dependencyExpr
}

func (e *Executor) SetDependency(Expr DependencyExpression) *Executor {
e.dependencyExpr = Expr
return e
Expand All @@ -55,10 +59,6 @@ func (e *Executor) MarkDependency(id uint32, finished bool) {
e.dependency[id] = finished
}

func (e *Executor) CalcDependency() bool {
return e.dependencyExpr()
}

func (e *Executor) SetUndoFunc(undo func(args map[string]interface{}) error, SkipError bool) *Executor {
e.Undo = undo
e.UndoSkipError = SkipError
Expand Down
Loading

0 comments on commit ae36fea

Please sign in to comment.