Skip to content

Commit

Permalink
Merge branch 'master' into issue-1072
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
  • Loading branch information
bobo10001 committed Nov 11, 2022
2 parents 60cfe3d + 97974b2 commit 0ef8be8
Show file tree
Hide file tree
Showing 32 changed files with 606 additions and 123 deletions.
3 changes: 3 additions & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# These are supported funding model platforms

github: [danielgerlag]
8 changes: 4 additions & 4 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
<Version>3.6.4</Version>
<AssemblyVersion>3.6.4.0</AssemblyVersion>
<FileVersion>3.6.4.0</FileVersion>
<Version>3.7.0</Version>
<AssemblyVersion>3.7.0.0</AssemblyVersion>
<FileVersion>3.7.0.0</FileVersion>
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
<PackageVersion>3.6.4</PackageVersion>
<PackageVersion>3.7.0</PackageVersion>
</PropertyGroup>
</Project>
5 changes: 3 additions & 2 deletions src/WorkflowCore.DSL/Models/v1/StepSourceV1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace WorkflowCore.Models.DefinitionStorage.v1
public class StepSourceV1
{
public string StepType { get; set; }

public string Id { get; set; }

public string Name { get; set; }
Expand All @@ -29,8 +29,9 @@ public class StepSourceV1
public ExpandoObject Inputs { get; set; } = new ExpandoObject();

public Dictionary<string, string> Outputs { get; set; } = new Dictionary<string, string>();

public Dictionary<string, string> SelectNextStep { get; set; } = new Dictionary<string, string>();

public bool ProceedOnCancel { get; set; } = false;
}
}
1 change: 1 addition & 0 deletions src/WorkflowCore.DSL/Services/DefinitionLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty
targetStep.ErrorBehavior = nextStep.ErrorBehavior;
targetStep.RetryInterval = nextStep.RetryInterval;
targetStep.ExternalId = $"{nextStep.Id}";
targetStep.ProceedOnCancel = nextStep.ProceedOnCancel;

AttachInputs(nextStep, dataType, stepType, targetStep);
AttachOutputs(nextStep, dataType, stepType, targetStep);
Expand Down
10 changes: 10 additions & 0 deletions src/WorkflowCore/Interface/IWorkflowModifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,15 @@ IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, Time
IStepBuilder<TData, Activity> Activity(string activityName, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

/// <summary>
/// Wait here until an external activity is complete
/// </summary>
/// <param name="activityName">The name used to identify the activity to wait for</param>
/// <param name="parameters">The data to pass the external activity worker</param>
/// <param name="effectiveDate">Listen for events as of this effective date</param>
/// <param name="cancelCondition">A conditon that when true will cancel this WaitFor</param>
/// <returns></returns>
IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
}
}
2 changes: 2 additions & 0 deletions src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface IWorkflowRepository

Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);

Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default);

Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);

[Obsolete]
Expand Down
7 changes: 5 additions & 2 deletions src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ public virtual void Start()
public virtual void Stop()
{
_cancellationTokenSource.Cancel();
DispatchTask.Wait();
DispatchTask = null;
if (DispatchTask != null)
{
DispatchTask.Wait();
DispatchTask = null;
}
}

private async Task Execute()
Expand Down
10 changes: 3 additions & 7 deletions src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
finally
{
WorkflowActivity.Enrich(result);
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
await QueueProvider.QueueWork(itemId, QueueType.Index);
_greylist.Remove($"wf:{itemId}");
}
Expand All @@ -68,7 +68,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
{
foreach (var sub in result.Subscriptions)
{
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
}

await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
Expand Down Expand Up @@ -98,12 +98,8 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()

}

private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
{
//TODO: move to own class
Logger.LogDebug("Subscribing to event {EventName} {EventKey} for workflow {WorkflowId} step {StepId}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);

await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
if (subscription.EventName != Event.EventTypeActivity)
{
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _
}
}

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
lock (_instances)
{
var existing = _instances.First(x => x.Id == workflow.Id);
_instances.Remove(existing);
_instances.Add(workflow);

lock (_subscriptions)
{
foreach (var subscription in subscriptions)
{
subscription.Id = Guid.NewGuid().ToString();
_subscriptions.Add(subscription);
}
}
}
}

public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken _ = default)
{
lock (_instances)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)

public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
await PersistWorkflow(workflow, cancellationToken);

foreach(var subscription in subscriptions)
{
await CreateEventSubscription(subscription, cancellationToken);
}
}

public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);

Expand Down
19 changes: 19 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -522,5 +522,24 @@ public IStepBuilder<TData, Activity> Activity(string activityName, Expression<Fu
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}

public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null)
{
var newStep = new WorkflowStep<Activity>();
newStep.CancelCondition = cancelCondition;

WorkflowBuilder.AddStep(newStep);
var stepBuilder = new StepBuilder<TData, Activity>(WorkflowBuilder, newStep);
stepBuilder.Input((step) => step.ActivityName, activityName);

if (parameters != null)
stepBuilder.Input((step) => step.Parameters, parameters);

if (effectiveDate != null)
stepBuilder.Input((step) => step.EffectiveDate, effectiveDate);

Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}
}
}
14 changes: 14 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public void AttachBranch(IWorkflowBuilder branch)
if (step2.Children[i] == oldId)
step2.Children[i] = step.Id;
}

if (step2.CompensationStepId == oldId)
{
step2.CompensationStepId = step.Id;
}
}
}

Expand All @@ -104,6 +109,11 @@ public void AttachBranch(IWorkflowBuilder branch)
if (step2.Children[i] == oldId)
step2.Children[i] = step.Id;
}

if (step2.CompensationStepId == oldId)
{
step2.CompensationStepId = step.Id;
}
}
}

Expand Down Expand Up @@ -271,6 +281,10 @@ public IStepBuilder<TData, Activity> Activity(string activityName, Expression<Fu
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}
public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null)
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}

private IStepBuilder<TData, InlineStepBody> Start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,32 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
await db.SaveChangesAsync(cancellationToken);
}
}

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
using (var db = ConstructDbContext())
{
var uid = new Guid(workflow.Id);
var existingEntity = await db.Set<PersistedWorkflow>()
.Where(x => x.InstanceId == uid)
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsTracking()
.FirstAsync(cancellationToken);

var workflowPersistable = workflow.ToPersistable(existingEntity);

foreach (var subscription in subscriptions)
{
subscription.Id = Guid.NewGuid().ToString();
var subscriptionPersistable = subscription.ToPersistable();
db.Set<PersistedSubscription>().Add(subscriptionPersistable);
}

await db.SaveChangesAsync(cancellationToken);
}
}

public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
}

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
using (var session = await _database.Client.StartSessionAsync())
{
session.StartTransaction();
await PersistWorkflow(workflow, cancellationToken);
await EventSubscriptions.InsertManyAsync(subscriptions, cancellationToken: cancellationToken);
await session.CommitTransactionAsync();
}
}

public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
{
var now = asAt.ToUniversalTime().Ticks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
base.OnConfiguring(optionsBuilder);
#if NETSTANDARD2_0
optionsBuilder.UseMySql(_connectionString, _mysqlOptionsAction);
#elif NETSTANDARD2_1_OR_GREATER
#elif NETSTANDARD2_1_OR_GREATER || NET6_0_OR_GREATER
optionsBuilder.UseMySql(_connectionString, ServerVersion.AutoDetect(_connectionString), _mysqlOptionsAction);
#endif
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.3" />
<PackageReference Include="Npgsql" Version="6.0.6" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.6" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="6.0.3">
<PrivateAssets>All</PrivateAssets>
</PackageReference>
Expand All @@ -34,7 +35,7 @@
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
<PackageReference Include="Npgsql" Version="5.0.1.1" />
<PackageReference Include="Npgsql" Version="5.0.14" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="5.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="5.0.1">
<PrivateAssets>All</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Raven.Client.Documents;
using Raven.Client.Documents.Linq;
using Raven.Client.Documents.Operations;
using Raven.Client.Documents.Session;
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -56,21 +57,41 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
{
using (var session = _database.OpenAsyncSession())
{
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
PatchSession(session, workflow);
await session.SaveChangesAsync(cancellationToken);
}
}

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
using (var session = _database.OpenAsyncSession())
{
PatchSession(session, workflow);

foreach (var subscription in subscriptions)
{
await session.StoreAsync(subscription, cancellationToken);
}

await session.SaveChangesAsync(cancellationToken);
}
}

private void PatchSession(IAsyncDocumentSession session, WorkflowInstance workflow)
{
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId);
session.Advanced.Patch<WorkflowInstance, int>(workflow.Id, x => x.Version, workflow.Version);
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Description, workflow.Description);
session.Advanced.Patch<WorkflowInstance, string>(workflow.Id, x => x.Reference, workflow.Reference);
session.Advanced.Patch<WorkflowInstance, ExecutionPointerCollection>(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers);
session.Advanced.Patch<WorkflowInstance, long?>(workflow.Id, x => x.NextExecution, workflow.NextExecution);
session.Advanced.Patch<WorkflowInstance, WorkflowStatus>(workflow.Id, x => x.Status, workflow.Status);
session.Advanced.Patch<WorkflowInstance, object>(workflow.Id, x => x.Data, workflow.Data);
session.Advanced.Patch<WorkflowInstance, DateTime>(workflow.Id, x => x.CreateTime, workflow.CreateTime);
session.Advanced.Patch<WorkflowInstance, DateTime?>(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);

}

public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
{
var now = asAt.ToUniversalTime().Ticks;
Expand Down
Loading

0 comments on commit 0ef8be8

Please sign in to comment.