Skip to content

Commit

Permalink
feat(*): add custom task handler
Browse files Browse the repository at this point in the history
Signed-off-by: Soren Yang <lsytj0413@gmail.com>
  • Loading branch information
lsytj0413 committed Jan 15, 2024
1 parent 8d47194 commit 70e8f85
Show file tree
Hide file tree
Showing 27 changed files with 1,012 additions and 68 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<!-- <p align="center"><img src="" alt="" height="100px"></p> -->
<p align="center"><img src="doc/fornext.png" alt="" height="100px"></p>

<div align="center">
<a href="https://codecov.io/gh/fornext-io/fornext" >
Expand Down
Binary file added doc/fornext.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added examples/fornext/000060.sst
Binary file not shown.
Binary file added examples/fornext/000061.sst
Binary file not shown.
1 change: 1 addition & 0 deletions examples/fornext/CURRENT
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MANIFEST-000000
Empty file added examples/fornext/LOCK
Empty file.
Binary file added examples/fornext/MANIFEST-000058
Binary file not shown.
Binary file added examples/fornext/MANIFEST-000063
Binary file not shown.
56 changes: 56 additions & 0 deletions examples/fornext/OPTIONS-000064
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[Version]
pebble_version=0.1

[Options]
bytes_per_sync=524288
cache_size=8388608
cleaner=delete
compaction_debt_concurrency=1073741824
comparer=leveldb.BytewiseComparator
disable_wal=false
flush_delay_delete_range=0s
flush_delay_range_key=0s
flush_split_bytes=67108864
format_major_version=14
l0_compaction_concurrency=10
l0_compaction_file_threshold=500
l0_compaction_threshold=4
l0_stop_writes_threshold=12
lbase_max_bytes=67108864
max_concurrent_compactions=1
max_manifest_file_size=134217728
max_open_files=1000
mem_table_size=4194304
mem_table_stop_writes_threshold=2
min_deletion_rate=0
merger=pebble.concatenate
read_compaction_rate=16000
read_sampling_multiplier=16
strict_wal_tail=true
table_cache_shards=12
table_property_collectors=[]
validate_on_ingest=false
wal_dir=
wal_bytes_per_sync=0
max_writer_concurrency=0
force_writer_parallelism=false

[Level "0"]
block_restart_interval=16
block_size=65536
block_size_threshold=90
compression=NoCompression
filter_policy=none
filter_type=table
index_block_size=65536
target_file_size=33554432

[Level "1"]
block_restart_interval=16
block_size=65536
block_size_threshold=90
compression=ZSTD
filter_policy=none
filter_type=table
index_block_size=65536
target_file_size=67108864
Empty file.
Empty file.
8 changes: 7 additions & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main
import (
"encoding/json"
"fmt"
"log/slog"
"os"
"path"

Expand All @@ -24,7 +25,12 @@ func main() {
panic(err)
}

e := executor.NewExecutor(&sm)
e := executor.NewExecutor(&sm, map[string]func(*executor.CreateTaskCommand) []byte{
"task1": func(ctc *executor.CreateTaskCommand) []byte {
slog.Info("start task", slog.Any("CreateTaskCommand", ctc))
return ctc.Input
},
})
e.Run([]byte(`{"name": "123", "data": 10}`))
ec := e.WaitExecutionDone()
fmt.Printf("id: %v\n", ec.ID)
Expand Down
20 changes: 20 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,27 @@ require (
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
github.com/cockroachdb/pebble v1.0.0 // indirect
github.com/cockroachdb/redact v1.0.8 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
615 changes: 615 additions & 0 deletions go.sum

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions pkg/executor/choice_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *choiceStateProcessor) StartState(ctx context.Context, cmd *StartStateCo
return nil
}

func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if !p.isMatchedChoiceRule(ctx, output, rule) {
return false
Expand All @@ -50,7 +50,7 @@ func (p *choiceStateProcessor) isMatchedAllChoiceRule(ctx context.Context, outpu
return true
}

func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output interface{}, rules []fsl.ChoiceRule) bool {
func (p *choiceStateProcessor) isMatchedAnyChoiceRule(ctx context.Context, output any, rules []fsl.ChoiceRule) bool {
for _, rule := range rules {
if p.isMatchedChoiceRule(ctx, output, rule) {
return true
Expand Down Expand Up @@ -293,7 +293,7 @@ func (p *choiceStateProcessor) isMatchedChoiceRule(ctx context.Context, output i
case rule.IsNull != nil:
return variableIsNull(*rule.Variable, output)
case rule.IsPresent != nil:
return variableIsPresent(*rule.Variable, output)
return variableIsPresent(*rule.Variable, output) == *rule.IsPresent
case rule.IsNumeric != nil:
return variableTypeMatch[float64](*rule.Variable, output)
case rule.IsString != nil:
Expand Down Expand Up @@ -331,8 +331,11 @@ func (p *choiceStateProcessor) CompleteState(ctx context.Context, cmd *CompleteS
slog.InfoContext(ctx, "complete choice state",
slog.String("ActivityID", cmd.ActivityID),
)
at, err := Get[ActivityContext](context.Background(), e.store, cmd.ActivityID)
if err != nil {
panic(err)
}

at := e.activityContextes[cmd.ActivityID]
// TODO: found the next state
next, err := p.findNextState(ctx, cmd.Output, s)
if err != nil {
Expand Down
Loading

0 comments on commit 70e8f85

Please sign in to comment.