Skip to content

Commit

Permalink
process
Browse files Browse the repository at this point in the history
  • Loading branch information
Lack30 committed Nov 27, 2024
1 parent 6fbddbf commit 92b6731
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 46 deletions.
8 changes: 2 additions & 6 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ func NewWorkflow(reader io.Reader, opts ...bpmn.Option) (*Workflow, error) {
ctx, cancel := context.WithCancel(context.Background())
tracer := tracing.NewTracer(ctx)

processes := make([]*bpmn.Process, 0)
instances := make([]*bpmn.Instance, 0)

opts = append(opts, bpmn.WithContext(ctx), bpmn.WithTracer(tracer))
options := bpmn.NewOptions(opts...)

for _, element := range *definitions.Processes() {
able, ok := element.IsExecutable()
Expand All @@ -72,10 +72,7 @@ func NewWorkflow(reader io.Reader, opts ...bpmn.Option) (*Workflow, error) {
continue
}

pr := bpmn.NewProcess(&element, &definitions, opts...)
processes = append(processes, pr)

instance, err := pr.Instantiate()
instance, err := bpmn.NewInstance(&element, &definitions, options)
if err != nil {
cancel()
return nil, err
Expand All @@ -88,7 +85,6 @@ func NewWorkflow(reader io.Reader, opts ...bpmn.Option) (*Workflow, error) {
cancel: cancel,
definitions: &definitions,
tracer: tracer,
processes: processes,
instances: instances,
}

Expand Down
18 changes: 9 additions & 9 deletions process.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,24 @@ func NewProcess(element *schema.Process, definitions *schema.Definitions, opts .
return &process
}

func (process *Process) Process() *schema.Process { return process.Element }
func (p *Process) Process() *schema.Process { return p.Element }

func (process *Process) Tracer() tracing.ITracer { return process.tracer }
func (p *Process) Tracer() tracing.ITracer { return p.tracer }

func (process *Process) Instantiate(opts ...Option) (inst *Instance, err error) {
subTracer := process.subTracerMaker()
func (p *Process) Instantiate(opts ...Option) (inst *Instance, err error) {
subTracer := p.subTracerMaker()

opts = append([]Option{
WithIdGenerator(process.idGeneratorBuilder),
WithEventDefinitionInstanceBuilder(process.eventDefinitionInstanceBuilder),
WithEventEgress(process.eventEgress),
WithEventIngress(process.eventIngress),
WithIdGenerator(p.idGeneratorBuilder),
WithEventDefinitionInstanceBuilder(p.eventDefinitionInstanceBuilder),
WithEventEgress(p.eventEgress),
WithEventIngress(p.eventIngress),
WithTracer(subTracer),
}, opts...)

options := NewOptions(opts...)

inst, err = NewInstance(process.Element, process.Definitions, options)
inst, err = NewInstance(p.Element, p.Definitions, options)
if err != nil {
return
}
Expand Down
62 changes: 31 additions & 31 deletions process_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,35 @@ type Instance struct {
eventConsumers []event.IConsumer
}

func (instance *Instance) Id() id.Id {
return instance.id
func (ins *Instance) Id() id.Id {
return ins.id
}

func (instance *Instance) Process() *schema.Process { return instance.process }
func (ins *Instance) Process() *schema.Process { return ins.process }

func (instance *Instance) Tracer() tracing.ITracer { return instance.tracer }
func (ins *Instance) Tracer() tracing.ITracer { return ins.tracer }

func (instance *Instance) Locator() data.IFlowDataLocator { return instance.locator }
func (ins *Instance) Locator() data.IFlowDataLocator { return ins.locator }

func (instance *Instance) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error) {
instance.eventConsumersLock.RLock()
func (ins *Instance) ConsumeEvent(ev event.IEvent) (result event.ConsumptionResult, err error) {
ins.eventConsumersLock.RLock()
// We're copying the list of consumers here to ensure that
// new consumers can subscribe during event forwarding
eventConsumers := instance.eventConsumers
instance.eventConsumersLock.RUnlock()
eventConsumers := ins.eventConsumers
ins.eventConsumersLock.RUnlock()
result, err = event.ForwardEvent(ev, &eventConsumers)
return
}

func (instance *Instance) RegisterEventConsumer(ev event.IConsumer) (err error) {
instance.eventConsumersLock.Lock()
defer instance.eventConsumersLock.Unlock()
instance.eventConsumers = append(instance.eventConsumers, ev)
func (ins *Instance) RegisterEventConsumer(ev event.IConsumer) (err error) {
ins.eventConsumersLock.Lock()
defer ins.eventConsumersLock.Unlock()
ins.eventConsumers = append(ins.eventConsumers, ev)
return
}

func (instance *Instance) FlowNodeMapping() *FlowNodeMapping {
return instance.flowNodeMapping
func (ins *Instance) FlowNodeMapping() *FlowNodeMapping {
return ins.flowNodeMapping
}

func NewInstance(element *schema.Process, definitions *schema.Definitions, options *Options) (instance *Instance, err error) {
Expand Down Expand Up @@ -459,14 +459,14 @@ func NewInstance(element *schema.Process, definitions *schema.Definitions, optio
}

// StartWith explicitly starts the instance by triggering a given start event
func (instance *Instance) StartWith(ctx context.Context, startEvent schema.StartEventInterface) (err error) {
flowNode, found := instance.flowNodeMapping.ResolveElementToFlowNode(startEvent)
func (ins *Instance) StartWith(ctx context.Context, startEvent schema.StartEventInterface) (err error) {
flowNode, found := ins.flowNodeMapping.ResolveElementToFlowNode(startEvent)
elementId := "<unnamed>"
if idPtr, present := startEvent.Id(); present {
elementId = *idPtr
}
processId := "<unnamed>"
if idPtr, present := instance.process.Id(); present {
if idPtr, present := ins.process.Id(); present {
processId = *idPtr
}
if !found {
Expand All @@ -486,26 +486,26 @@ func (instance *Instance) StartWith(ctx context.Context, startEvent schema.Start
}

// StartAll explicitly starts the instance by triggering all start events, if any
func (instance *Instance) StartAll() (err error) {
ctx := instance.ctx
for i := range *instance.process.StartEvents() {
err = instance.StartWith(ctx, &(*instance.process.StartEvents())[i])
func (ins *Instance) StartAll() (err error) {
ctx := ins.ctx
for i := range *ins.process.StartEvents() {
err = ins.StartWith(ctx, &(*ins.process.StartEvents())[i])
if err != nil {
return
}
}
return
}

func (instance *Instance) ceaseFlowMonitor(tracer tracing.ITracer) func(ctx context.Context, sender tracing.ISenderHandle) {
func (ins *Instance) ceaseFlowMonitor(tracer tracing.ITracer) func(ctx context.Context, sender tracing.ISenderHandle) {
// Subscribing to traces early as otherwise events produced
// after the goroutine below is started are not going to be
// sent to it.
traces := tracer.Subscribe()
instance.complete.Lock()
ins.complete.Lock()
return func(ctx context.Context, sender tracing.ISenderHandle) {
defer sender.Done()
defer instance.complete.Unlock()
defer ins.complete.Unlock()

/* 13.4.6 End Events:
Expand All @@ -528,7 +528,7 @@ func (instance *Instance) ceaseFlowMonitor(tracer tracing.ITracer) func(ctx cont
// [(1.2) will be addded when we actually support them]

for {
if len(startEventsActivated) == len(*instance.process.StartEvents()) {
if len(startEventsActivated) == len(*ins.process.StartEvents()) {
break
}

Expand Down Expand Up @@ -561,25 +561,25 @@ func (instance *Instance) ceaseFlowMonitor(tracer tracing.ITracer) func(ctx cont
// Then, we're waiting for (2) to occur
waitIsOver := make(chan struct{})
go func() {
instance.flowWaitGroup.Wait()
ins.flowWaitGroup.Wait()
close(waitIsOver)
}()
select {
case <-waitIsOver:
// Send out a cease flow trace
tracer.Trace(CeaseFlowTrace{Process: instance.process})
tracer.Trace(CeaseFlowTrace{Process: ins.process})
case <-ctx.Done():
}
}
}

// WaitUntilComplete waits until the instance is complete.
// Returns true if the instance was complete, false if the context signalled `Done`
func (instance *Instance) WaitUntilComplete(ctx context.Context) (complete bool) {
func (ins *Instance) WaitUntilComplete(ctx context.Context) (complete bool) {
signal := make(chan bool)
go func() {
instance.complete.Lock()
defer instance.complete.Unlock()
ins.complete.Lock()
defer ins.complete.Unlock()
signal <- true
}()
select {
Expand Down

0 comments on commit 92b6731

Please sign in to comment.