Skip to content

Commit

Permalink
activity
Browse files Browse the repository at this point in the history
  • Loading branch information
Lack30 committed Nov 29, 2024
1 parent 705eafa commit 1a6723b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 36 deletions.
41 changes: 23 additions & 18 deletions activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ type TaskTrace struct {
dataObjects map[string]any
headers map[string]any
properties map[string]any
forward chan DoResponse
response chan DoResponse
done chan struct{}
}
Expand All @@ -356,6 +357,7 @@ func newTaskTrace() *TaskTrace {
dataObjects: make(map[string]any),
headers: make(map[string]any),
properties: make(map[string]any),
forward: make(chan DoResponse, 1),
response: make(chan DoResponse, 1),
done: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -396,9 +398,7 @@ func (t *TaskTrace) Do(options ...DoOption) {
}

response := newDoOption(options...)

t.response <- *response
close(t.done)
t.forward <- *response
}

func (t *TaskTrace) process() {
Expand All @@ -407,22 +407,27 @@ func (t *TaskTrace) process() {
duration = time.Second
}

for {
select {
case <-t.done:
return
case <-t.ctx.Done():
rsp := newDoOption(WithErr(t.ctx.Err()))
t.response <- *rsp
case <-time.After(duration):
var tid string
if v, ok := t.activity.Element().Id(); ok {
tid = *v
}

rsp := newDoOption(WithErr(errors.TaskExecError{Id: tid, Reason: "timed out"}))
t.response <- *rsp
select {
case <-t.done:
case <-t.ctx.Done():
rsp := newDoOption(WithErr(t.ctx.Err()))
t.response <- *rsp
case <-time.After(duration):
var tid string
if v, ok := t.activity.Element().Id(); ok {
tid = *v
}

rsp := newDoOption(WithErr(errors.TaskExecError{Id: tid, Reason: "timed out"}))
t.response <- *rsp
case rsp := <-t.forward:
t.response <- rsp
}

select {
case <-t.done:
default:
close(t.done)
}
}

Expand Down
5 changes: 3 additions & 2 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/xml"
"io"
"log"
"time"

"github.com/olive-io/bpmn/schema"
"github.com/olive-io/bpmn/v2"
Expand Down Expand Up @@ -146,8 +147,8 @@ func main() {
switch tr := trace.(type) {
case *bpmn.TaskTrace:
log.Printf("%#v\n", trace)
_ = tr
//tr.Do()
time.Sleep(time.Second * 1)
tr.Do()
case bpmn.ErrorTrace:
log.Fatalf("%#v", trace)
default:
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/task.bpmn
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<olive:taskDefinition type="grpc" retries="2" />
<olive:taskHeaders>
<olive:header name="ov:content_type" value="application/grpc+json" />
<olive:header name="timeout" value="1s" />
<olive:header name="timeout" value="3s" />
<olive:header name="ov:handler" value="rpc" />
<olive:header name="ov:method" value="/gatewaypb.TestService/Hello" />
<olive:header name="ov:url" value="/gatewaypb.TestService/Hello" />
Expand Down
8 changes: 2 additions & 6 deletions flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,9 @@ type Snapshot struct {
sequenceFlow *SequenceFlow
}

func (s *Snapshot) Id() id.Id {
return s.flowId
}
func (s *Snapshot) Id() id.Id { return s.flowId }

func (s *Snapshot) SequenceFlow() *SequenceFlow {
return s.sequenceFlow
}
func (s *Snapshot) SequenceFlow() *SequenceFlow { return s.sequenceFlow }

// Flow Represents a flow
type Flow struct {
Expand Down
11 changes: 5 additions & 6 deletions flow_wiring.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func NewWiring(
if err != nil {
return
}
var ownId string
if ownIdPtr, present := flowNode.Id(); !present {
ownIdPtr, present := flowNode.Id()
if !present {
err = errors.NotFoundError{
Expected: fmt.Sprintf("flow node %#v to have an ID", flowNode),
}
return
} else {
ownId = *ownIdPtr
}

ownId := *ownIdPtr
node = &Wiring{
ProcessInstanceId: processInstanceId,
FlowNodeId: ownId,
Expand Down Expand Up @@ -127,7 +127,6 @@ func (wiring *Wiring) CloneFor(flowNode *schema.FlowNode) (result *Wiring, err e
if err != nil {
return
}
var ownId string

ownIdPtr, present := flowNode.Id()
if !present {
Expand All @@ -137,7 +136,7 @@ func (wiring *Wiring) CloneFor(flowNode *schema.FlowNode) (result *Wiring, err e
return
}

ownId = *ownIdPtr
ownId := *ownIdPtr
result = &Wiring{
ProcessInstanceId: wiring.ProcessInstanceId,
FlowNodeId: ownId,
Expand Down
4 changes: 2 additions & 2 deletions subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (p *SubProcess) runner(ctx context.Context, out tracing.ITracer) {
}

trace = tracing.Unwrap(trace)
switch trace := trace.(type) {
switch tr := trace.(type) {
case CeaseFlowTrace:
out.Trace(ProcessLandMarkTrace{Node: p.element})
break loop
Expand All @@ -564,7 +564,7 @@ func (p *SubProcess) runner(ctx context.Context, out tracing.ITracer) {
case TerminationTrace:
// ignore end event of sub process
default:
out.Trace(trace)
out.Trace(tr)
}
}
p.tracer.Unsubscribe(traces)
Expand Down
1 change: 0 additions & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (task *Task) runner(ctx context.Context) {
task.Tracer.Trace(CancellationFlowNodeTrace{Node: task.element})
return
case <-at.out():

}

m.response <- action
Expand Down

0 comments on commit 1a6723b

Please sign in to comment.