Skip to content

Commit

Permalink
modify FlowNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Lack30 committed Dec 11, 2024
1 parent ee4cb61 commit 474df4a
Show file tree
Hide file tree
Showing 20 changed files with 210 additions and 231 deletions.
16 changes: 8 additions & 8 deletions activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m nextHarnessActionMessage) message() {}

type Harness struct {
*Wiring
runnerChannel chan imessage
mch chan imessage
activity Activity
active int32
cancellation sync.Once
Expand Down Expand Up @@ -128,9 +128,9 @@ func NewHarness(ctx context.Context, wiring *Wiring, idGenerator id.IGenerator,
}

node = &Harness{
Wiring: wiring,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
activity: activity,
Wiring: wiring,
mch: make(chan imessage, len(wiring.Incoming)*2+1),
activity: activity,
}

err = node.EventEgress.RegisterEventConsumer(node)
Expand Down Expand Up @@ -167,16 +167,16 @@ func NewHarness(ctx context.Context, wiring *Wiring, idGenerator id.IGenerator,
flowable.Start(ctx)
}
sender := node.Tracer.RegisterSender()
go node.runner(ctx, sender)
go node.run(ctx, sender)
return
}

func (node *Harness) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (node *Harness) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-node.runnerChannel:
case msg := <-node.mch:
switch m := msg.(type) {
case nextHarnessActionMessage:
atomic.StoreInt32(&node.active, 1)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (node *Harness) runner(ctx context.Context, sender tracing.ISenderHandle) {

func (node *Harness) NextAction(flow T) chan IAction {
response := make(chan chan IAction, 1)
node.runnerChannel <- nextHarnessActionMessage{flow: flow, response: response}
node.mch <- nextHarnessActionMessage{flow: flow, response: response}
return <-response
}

Expand Down
14 changes: 7 additions & 7 deletions event_catch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (m processEventMessage) message() {}
type CatchEvent struct {
*Wiring
element *schema.CatchEvent
runnerChannel chan imessage
mch chan imessage
activated bool
awaitingActions []chan IAction
satisfier *logic.CatchEventSatisfier
Expand All @@ -45,26 +45,26 @@ func NewCatchEvent(ctx context.Context, wiring *Wiring, catchEvent *schema.Catch
evt = &CatchEvent{
Wiring: wiring,
element: catchEvent,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
mch: make(chan imessage, len(wiring.Incoming)*2+1),
activated: false,
awaitingActions: make([]chan IAction, 0),
satisfier: logic.NewCatchEventSatisfier(catchEvent, wiring.EventDefinitionInstanceBuilder),
}
sender := evt.Tracer.RegisterSender()
go evt.runner(ctx, sender)
go evt.run(ctx, sender)
err = evt.EventEgress.RegisterEventConsumer(evt)
if err != nil {
return
}
return
}

func (evt *CatchEvent) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (evt *CatchEvent) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-evt.runnerChannel:
case msg := <-evt.mch:
switch m := msg.(type) {
case processEventMessage:
if evt.activated {
Expand Down Expand Up @@ -94,14 +94,14 @@ func (evt *CatchEvent) runner(ctx context.Context, sender tracing.ISenderHandle)
}

func (evt *CatchEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error) {
evt.runnerChannel <- processEventMessage{event: ev}
evt.mch <- processEventMessage{event: ev}
result = event.Consumed
return
}

func (evt *CatchEvent) NextAction(flow T) chan IAction {
response := make(chan IAction)
evt.runnerChannel <- nextActionMessage{response: response, flow: flow}
evt.mch <- nextActionMessage{response: response, flow: flow}
return response
}

Expand Down
12 changes: 6 additions & 6 deletions event_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type EndEvent struct {
element *schema.EndEvent
activated bool
completed bool
runnerChannel chan imessage
mch chan imessage
startEventsActivated []*schema.StartEvent
}

Expand All @@ -40,20 +40,20 @@ func NewEndEvent(ctx context.Context, wiring *Wiring, endEvent *schema.EndEvent)
element: endEvent,
activated: false,
completed: false,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
mch: make(chan imessage, len(wiring.Incoming)*2+1),
startEventsActivated: make([]*schema.StartEvent, 0),
}
sender := evt.Tracer.RegisterSender()
go evt.runner(ctx, sender)
go evt.run(ctx, sender)
return
}

func (evt *EndEvent) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (evt *EndEvent) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-evt.runnerChannel:
case msg := <-evt.mch:
switch m := msg.(type) {
case nextActionMessage:
if !evt.activated {
Expand Down Expand Up @@ -82,7 +82,7 @@ func (evt *EndEvent) runner(ctx context.Context, sender tracing.ISenderHandle) {

func (evt *EndEvent) NextAction(T) chan IAction {
response := make(chan IAction, 1)
evt.runnerChannel <- nextActionMessage{response: response}
evt.mch <- nextActionMessage{response: response}
return response
}

Expand Down
44 changes: 17 additions & 27 deletions event_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ import (
"github.com/olive-io/bpmn/v2/pkg/tracing"
)

//type imessage interface {
// message()
//}
//
//type nextActionMessage struct {
// response chan IAction
//}

//func (m nextActionMessage) message() {}

type startMessage struct{}

func (m startMessage) message() {}
Expand All @@ -49,11 +39,11 @@ func (m eventMessage) message() {}

type StartEvent struct {
*Wiring
element *schema.StartEvent
runnerChannel chan imessage
activated bool
idGenerator id.IGenerator
satisfier *logic.CatchEventSatisfier
element *schema.StartEvent
mch chan imessage
activated bool
idGenerator id.IGenerator
satisfier *logic.CatchEventSatisfier
}

func NewStartEvent(ctx context.Context, wiring *Wiring,
Expand All @@ -72,28 +62,28 @@ func NewStartEvent(ctx context.Context, wiring *Wiring,
}

evt = &StartEvent{
Wiring: wiring,
element: startEvent,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
activated: false,
idGenerator: idGenerator,
satisfier: logic.NewCatchEventSatisfier(startEvent, wiring.EventDefinitionInstanceBuilder),
Wiring: wiring,
element: startEvent,
mch: make(chan imessage, len(wiring.Incoming)*2+1),
activated: false,
idGenerator: idGenerator,
satisfier: logic.NewCatchEventSatisfier(startEvent, wiring.EventDefinitionInstanceBuilder),
}
sender := evt.Tracer.RegisterSender()
go evt.runner(ctx, sender)
go evt.run(ctx, sender)
err = evt.EventEgress.RegisterEventConsumer(evt)
if err != nil {
return
}
return
}

func (evt *StartEvent) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (evt *StartEvent) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-evt.runnerChannel:
case msg := <-evt.mch:
switch m := msg.(type) {
case nextActionMessage:
if !evt.activated {
Expand Down Expand Up @@ -126,18 +116,18 @@ func (evt *StartEvent) flow(ctx context.Context) {
}

func (evt *StartEvent) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error) {
evt.runnerChannel <- eventMessage{event: ev}
evt.mch <- eventMessage{event: ev}
result = event.Consumed
return
}

func (evt *StartEvent) Trigger() {
evt.runnerChannel <- startMessage{}
evt.mch <- startMessage{}
}

func (evt *StartEvent) NextAction(flow T) chan IAction {
response := make(chan IAction)
evt.runnerChannel <- nextActionMessage{response: response, flow: flow}
evt.mch <- nextActionMessage{response: response, flow: flow}
return response
}

Expand Down
22 changes: 11 additions & 11 deletions gateway_event_based.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,29 @@ import (

type EventBasedGateway struct {
*Wiring
element *schema.EventBasedGateway
runnerChannel chan imessage
activated bool
element *schema.EventBasedGateway
mch chan imessage
activated bool
}

func NewEventBasedGateway(ctx context.Context, wiring *Wiring, eventBasedGateway *schema.EventBasedGateway) (gw *EventBasedGateway, err error) {
gw = &EventBasedGateway{
Wiring: wiring,
element: eventBasedGateway,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
activated: false,
Wiring: wiring,
element: eventBasedGateway,
mch: make(chan imessage, len(wiring.Incoming)*2+1),
activated: false,
}
sender := gw.Tracer.RegisterSender()
go gw.runner(ctx, sender)
go gw.run(ctx, sender)
return
}

func (gw *EventBasedGateway) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (gw *EventBasedGateway) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-gw.runnerChannel:
case msg := <-gw.mch:
switch m := msg.(type) {
case nextActionMessage:
var first int32 = 0
Expand Down Expand Up @@ -103,7 +103,7 @@ func (gw *EventBasedGateway) runner(ctx context.Context, sender tracing.ISenderH

func (gw *EventBasedGateway) NextAction(flow T) chan IAction {
response := make(chan IAction)
gw.runnerChannel <- nextActionMessage{response: response, flow: flow}
gw.mch <- nextActionMessage{response: response, flow: flow}
return response
}

Expand Down
16 changes: 8 additions & 8 deletions gateway_exclusive.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (e ExclusiveNoEffectiveSequenceFlows) Error() string {
type ExclusiveGateway struct {
*Wiring
element *schema.ExclusiveGateway
runnerChannel chan imessage
mch chan imessage
defaultSequenceFlow *SequenceFlow
nonDefaultSequenceFlows []*SequenceFlow
probing map[id.Id]*chan IAction
Expand Down Expand Up @@ -79,29 +79,29 @@ func NewExclusiveGateway(ctx context.Context, wiring *Wiring, exclusiveGateway *
gw = &ExclusiveGateway{
Wiring: wiring,
element: exclusiveGateway,
runnerChannel: make(chan imessage, len(wiring.Incoming)*2+1),
mch: make(chan imessage, len(wiring.Incoming)*2+1),
nonDefaultSequenceFlows: nonDefaultSequenceFlows,
defaultSequenceFlow: defaultSequenceFlow,
probing: make(map[id.Id]*chan IAction),
}
sender := gw.Tracer.RegisterSender()
go gw.runner(ctx, sender)
go gw.run(ctx, sender)
return
}

func (gw *ExclusiveGateway) runner(ctx context.Context, sender tracing.ISenderHandle) {
func (gw *ExclusiveGateway) run(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()

for {
select {
case msg := <-gw.runnerChannel:
case msg := <-gw.mch:
switch m := msg.(type) {
case probingReport:
if response, ok := gw.probing[m.flowId]; ok {
if response == nil {
// Reschedule, there's no next action yet
go func() {
gw.runnerChannel <- m
gw.mch <- m
}()
continue
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (gw *ExclusiveGateway) runner(ctx context.Context, sender tracing.ISenderHa
m.response <- ProbeAction{
SequenceFlows: gw.nonDefaultSequenceFlows,
ProbeReport: func(indices []int) {
gw.runnerChannel <- probingReport{
gw.mch <- probingReport{
result: indices,
flowId: m.flow.Id(),
}
Expand All @@ -177,7 +177,7 @@ func (gw *ExclusiveGateway) runner(ctx context.Context, sender tracing.ISenderHa

func (gw *ExclusiveGateway) NextAction(flow T) chan IAction {
response := make(chan IAction)
gw.runnerChannel <- nextActionMessage{response: response, flow: flow}
gw.mch <- nextActionMessage{response: response, flow: flow}
return response
}

Expand Down
Loading

0 comments on commit 474df4a

Please sign in to comment.