Skip to content

Commit

Permalink
Force optimistic concurrency checks when using Marten to avoid upsert…
Browse files Browse the repository at this point in the history
…s that overwrite changed data
  • Loading branch information
phatboyg committed May 22, 2023
1 parent 4873759 commit 2cc64eb
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public MartenSagaRepositoryStoreOptionsConfigurator(Action<MartenRegistry.Docume
public void Configure(IServiceProvider services, StoreOptions options)
{
MartenRegistry.DocumentMappingExpression<TSaga> mappingExpression =
options.Schema.For<TSaga>().Identity(x => x.CorrelationId).IdStrategy(new NoOpIdGeneration());
options.Schema.For<TSaga>().Identity(x => x.CorrelationId).IdStrategy(new NoOpIdGeneration()).UseOptimisticConcurrency(true);

_configure?.Invoke(mappingExpression);
}
Expand Down
158 changes: 158 additions & 0 deletions tests/MassTransit.MartenIntegration.Tests/Concurrency_Specs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
namespace MassTransit.MartenIntegration.Tests
{
using System;
using System.Threading.Tasks;
using ConcurrentSagaTypes;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;
using Testing;


[TestFixture]
public class When_consuming_concurrent_messages_for_the_same_saga_instance
{
[Test]
public async Task Should_properly_transition_the_state()
{
await using var provider = new ServiceCollection()
.AddMassTransitTestHarness(x =>
{
x.SetTestTimeouts(testInactivityTimeout: TimeSpan.FromSeconds(5));

x.AddHandler(async (ConsumeContext<Running> context) =>
{
await Task.WhenAll(
context.Publish(new RunningA(context.Message.CorrelationId)),
context.Publish(new RunningB(context.Message.CorrelationId)));
});

x.AddHandler(async (ConsumeContext<Completing> context) =>
{
await Task.WhenAll(
context.Publish(new CompletingA(context.Message.CorrelationId)),
context.Publish(new CompletingB(context.Message.CorrelationId)));
});

x.AddSagaStateMachine<TransactionStateMachine, TransactionState>()
.MartenRepository("server=localhost;port=5432;database=MartenTest;user id=postgres;password=Password12!;", r =>
{
r.CreateDatabasesForTenants(c =>
{
c.ForTenant()
.CheckAgainstPgDatabase()
.WithOwner("postgres")
.WithEncoding("UTF-8")
.ConnectionLimit(-1);
});
});

x.UsingInMemory((context, cfg) =>
{
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseMessageScope(context);
cfg.UseInMemoryOutbox(context);

cfg.ConfigureEndpoints(context);
});
})
.BuildServiceProvider(true);

var harness = provider.GetTestHarness();

await harness.Start();

Guid correlationId = NewId.NextGuid();

await harness.Bus.Publish(new Started(correlationId));

Assert.That(await harness.Published.Any<Completed>(x => x.Context.Message.CorrelationId == correlationId));
}
}


namespace ConcurrentSagaTypes
{
using System;


public record Started(Guid CorrelationId);


public record Running(Guid CorrelationId);


public record RunningA(Guid CorrelationId);


public record RunningB(Guid CorrelationId);


public record Completing(Guid CorrelationId);


public record CompletingA(Guid CorrelationId);


public record CompletingB(Guid CorrelationId);


public record Completed(Guid CorrelationId);


public class TransactionState :
SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public int State { get; set; }

public int RunningStatus { get; set; }
public int CompletingStatus { get; set; }
}


public class TransactionStateMachine :
MassTransitStateMachine<TransactionState>
{
public TransactionStateMachine()
{
InstanceState(x => x.State);

Initially(
When(Started)
.TransitionTo(Running)
.Publish(context => new Running(context.Message.CorrelationId))
);

During(Running,
When(RunningFinished)
.TransitionTo(Completing)
.Publish(context => new Completing(context.Saga.CorrelationId))
);

During(Completing,
When(CompletingFinished)
.TransitionTo(Completed)
.Publish(context => new Completed(context.Saga.CorrelationId))
);

CompositeEvent(() => RunningFinished, x => x.RunningStatus, RunningA, RunningB);
CompositeEvent(() => CompletingFinished, x => x.CompletingStatus, CompletingA, CompletingB);

SetCompletedWhenFinalized();
}

// ReSharper disable UnassignedGetOnlyAutoProperty
public State Running { get; }
public State Completing { get; }
public State Completed { get; }

public Event<Started> Started { get; }
public Event<RunningA> RunningA { get; }
public Event<RunningB> RunningB { get; }
public Event RunningFinished { get; }
public Event<CompletingA> CompletingA { get; }
public Event<CompletingB> CompletingB { get; }
public Event CompletingFinished { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<LangVersion>10</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down

0 comments on commit 2cc64eb

Please sign in to comment.