Skip to content

Commit

Permalink
Upgrade guidance for Scoped ConsumeContext
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed May 23, 2023
1 parent d68bfdc commit 5a9f3d2
Show file tree
Hide file tree
Showing 20 changed files with 109 additions and 38 deletions.
19 changes: 19 additions & 0 deletions doc/content/4.support/4.upgrade.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# Upgrading

## Version 8.1

MassTransit version 8.1 is focused on improving cross-component integration between various components like the (mediator <-> bus, bus1 <-> bus2, etc). In previous versions of MassTransit, the `ConsumeContext` was used to send messages. This approach worked well for a long time, but as more components like the Mediator, MultiBus, and Riders became available, issues arose with resolving the correct `ConsumeContext`.

To address this issue, MassTransit v8.1 introduces a new capability to keep track of the owning component of the `ConsumeContext`. When the `ConsumeContext` is owned by another component, the library only copies necessary data such as headers, payloads, and source address. This change opens up the possibility of consuming message by the Mediator and sending it directly to the bus by resolving `IPublishEndpoint` or `ISendEndpointProvider`.

As this is a minor release, we have made every effort to ensure minimal impact on existing customer integrations. However, to use this capability, small changes are required. Previously, `IServiceProvider` was used as a parameter to most configuration methods, with this change `IRegistrationContext` should be used instead.

### Sagas
In MassTransit v8.1, the registration of `ISagaRepository<TSaga>` in the container has been updated. Previously, this interface was responsible for both retrieving and querying sagas from the repository. With this release, we have decided to separate these responsibilities, resulting in the registering of two additional interfaces in container:

- `ILoadSagaRepository<TSaga>` - should be used to load sagas by id.
- `IQuerySagaRepository<TSaga>` - should be used to query saga ids by expression.

Both of these interfaces are registered in the container as singletons.
::alert{type="warning"}
The registration `ISagaRepository<TSaga>` will be removed from the container, so it is recommended to start using these new interfaces instead.
::

## Version 8

MassTransit v8 is the first major release since the availability of .NET 6. MassTransit v8 works a significant portion of the underlying components into a more manageable solution structure. Focused on the developer experience, while maintaining compatibility with previous versions, this release brings together the entire MassTransit stack.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public static void UseServiceScope(this IConsumePipeConfigurator configurator, I
configurator.AddPrePipeSpecification(specification);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Creates a single scope for the receive endpoint that is used by all consumers, sagas, messages, etc.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public static void Consumer<T>(this IReceiveEndpointConfigurator configurator, I
configurator.Consumer(consumerFactory, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Registers a consumer given the lifetime scope specified
/// </summary>
Expand Down Expand Up @@ -67,6 +69,8 @@ public static void Consumer<TConsumer, TMessage>(this IBatchConfigurator<TMessag
configurator.Consumer(consumerFactory, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Connect a consumer with a consumer factory method
/// </summary>
Expand Down Expand Up @@ -110,6 +114,8 @@ public static ConnectHandle ConnectConsumer<TConsumer>(this IConsumePipeConnecto
return connector.ConnectConsumer(consumerFactory, pipeSpecifications);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Connect a consumer to the bus/mediator
/// </summary>
Expand Down Expand Up @@ -149,6 +155,8 @@ public static void Saga<T>(this IReceiveEndpointConfigurator configurator, IRegi
configurator.Saga(repository, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Registers a saga using the container that has the repository resolved from the container
/// </summary>
Expand Down Expand Up @@ -183,6 +191,8 @@ public static void StateMachineSaga<TInstance>(this IReceiveEndpointConfigurator
configurator.StateMachineSaga(stateMachine, repository, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Subscribe a state machine saga to the endpoint
/// </summary>
Expand Down Expand Up @@ -219,6 +229,8 @@ public static void StateMachineSaga<TInstance>(this IReceiveEndpointConfigurator
configurator.StateMachineSaga(stateMachine, repository, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Subscribe a state machine saga to the endpoint
/// </summary>
Expand Down Expand Up @@ -250,6 +262,8 @@ public static void ExecuteActivityHost<TActivity, TArguments>(this IReceiveEndpo
configurator.ExecuteActivityHost(compensateAddress, factory, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
public static void ExecuteActivityHost<TActivity, TArguments>(this IReceiveEndpointConfigurator configurator, Uri compensateAddress,
IServiceProvider provider, Action<IExecuteActivityConfigurator<TActivity, TArguments>> configure = null)
where TActivity : class, IExecuteActivity<TArguments>
Expand All @@ -274,6 +288,8 @@ public static void ExecuteActivityHost<TActivity, TArguments>(this IReceiveEndpo
configurator.ExecuteActivityHost(factory, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
public static void ExecuteActivityHost<TActivity, TArguments>(this IReceiveEndpointConfigurator configurator, IServiceProvider provider,
Action<IExecuteActivityConfigurator<TActivity, TArguments>> configure = null)
where TActivity : class, IExecuteActivity<TArguments>
Expand All @@ -298,6 +314,8 @@ public static void CompensateActivityHost<TActivity, TLog>(this IReceiveEndpoint
configurator.CompensateActivityHost(factory, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
public static void CompensateActivityHost<TActivity, TLog>(this IReceiveEndpointConfigurator configurator, IServiceProvider provider,
Action<ICompensateActivityConfigurator<TActivity, TLog>> configure = null)
where TActivity : class, ICompensateActivity<TLog>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static void UseInMemoryOutbox<T>(this IPipeConfigurator<ConsumeContext<T>
configurator.AddPipeSpecification(specification);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
Expand Down Expand Up @@ -70,6 +72,8 @@ public static void UseInMemoryOutbox(this IConsumePipeConfigurator configurator,
var observer = new InMemoryOutboxConfigurationObserver(context, configurator, configure);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
Expand Down Expand Up @@ -104,6 +108,8 @@ public static void UseInMemoryOutbox<TConsumer>(this IConsumerConfigurator<TCons
configurator.ConnectConsumerConfigurationObserver(observer);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
Expand Down Expand Up @@ -140,6 +146,8 @@ public static void UseInMemoryOutbox<TSaga>(this ISagaConfigurator<TSaga> config
configurator.ConnectSagaConfigurationObserver(observer);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
Expand Down Expand Up @@ -176,6 +184,8 @@ public static void UseInMemoryOutbox<TMessage>(this IHandlerConfigurator<TMessag
configurator.ConnectHandlerConfigurationObserver(observer);
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Includes an outbox in the consume filter path, which delays outgoing messages until the return path
/// of the pipeline returns to the outbox filter. At this point, the message execution pipeline should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static IServiceInstanceConfigurator<T> ConfigureJobServiceEndpoints<T>(th
return configurator;
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
Expand Down Expand Up @@ -72,6 +74,8 @@ public static IServiceInstanceConfigurator<T> ConfigureJobServiceEndpoints<T>(th
return configurator;
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public static IJobServiceConfigurator ConfigureSagaRepositories(this IJobService
return configurator;
}

[Obsolete(
"Use the IRegistrationContext overload to ensure message scope is properly handled. For more information, visit https://masstransit.io/support/upgrade#version-8.1")]
/// <summary>
/// Configure the job server saga repositories to resolve from the container.
/// </summary>
Expand Down
8 changes: 4 additions & 4 deletions src/MassTransit/Testing/ExtensionMethodsForSagas.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static class ExtensionMethodsForSagas
return default;
}

static async Task<Guid?> ShouldContainSaga<TSaga>(this IQuerySagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
public static async Task<Guid?> ShouldContainSaga<TSaga>(this IQuerySagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
where TSaga : class, ISaga
{
var giveUpAt = DateTime.Now + timeout;
Expand Down Expand Up @@ -69,7 +69,7 @@ public static class ExtensionMethodsForSagas
return TaskUtil.Faulted<Guid?>(new ArgumentException("Does not support IQuerySagaRepository", nameof(repository)));
}

static async Task<Guid?> ShouldContainSaga<TSaga>(this ILoadSagaRepository<TSaga> repository, Guid correlationId, Func<TSaga, bool> condition,
public static async Task<Guid?> ShouldContainSaga<TSaga>(this ILoadSagaRepository<TSaga> repository, Guid correlationId, Func<TSaga, bool> condition,
TimeSpan timeout)
where TSaga : class, ISaga
{
Expand Down Expand Up @@ -99,7 +99,7 @@ public static class ExtensionMethodsForSagas
return TaskUtil.Faulted<Guid?>(new ArgumentException("Does not support IQuerySagaRepository", nameof(repository)));
}

static async Task<Guid?> ShouldNotContainSaga<TSaga>(this ILoadSagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
public static async Task<Guid?> ShouldNotContainSaga<TSaga>(this ILoadSagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
where TSaga : class, ISaga
{
var giveUpAt = DateTime.Now + timeout;
Expand All @@ -119,7 +119,7 @@ public static class ExtensionMethodsForSagas
return instance.CorrelationId;
}

static async Task<Guid?> ShouldNotContainSaga<TSaga>(this IQuerySagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
public static async Task<Guid?> ShouldNotContainSaga<TSaga>(this IQuerySagaRepository<TSaga> repository, Guid correlationId, TimeSpan timeout)
where TSaga : class, ISaga
{
var giveUpAt = DateTime.Now + timeout;
Expand Down
32 changes: 15 additions & 17 deletions src/MassTransit/Testing/Implementations/BaseSagaTestHarness.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ namespace MassTransit.Testing.Implementations
public abstract class BaseSagaTestHarness<TSaga>
where TSaga : class, ISaga
{
protected BaseSagaTestHarness(ISagaRepository<TSaga> repository, TimeSpan testTimeout)
protected BaseSagaTestHarness(IQuerySagaRepository<TSaga> querySagaRepository, ILoadSagaRepository<TSaga> loadSagaRepository, TimeSpan testTimeout)
{
QuerySagaRepository = repository as IQuerySagaRepository<TSaga>;
QuerySagaRepository = querySagaRepository;
LoadSagaRepository = loadSagaRepository;

TestTimeout = testTimeout;
}

protected TimeSpan TestTimeout { get; }

protected IQuerySagaRepository<TSaga> QuerySagaRepository { get; }
protected ILoadSagaRepository<TSaga> LoadSagaRepository { get; }

/// <summary>
/// Waits until a saga exists with the specified correlationId
Expand All @@ -30,18 +32,16 @@ protected BaseSagaTestHarness(ISagaRepository<TSaga> repository, TimeSpan testTi
/// <returns></returns>
public async Task<Guid?> Exists(Guid correlationId, TimeSpan? timeout = default)
{
if (QuerySagaRepository == null)
throw new InvalidOperationException("The repository does not support Query operations");
if (LoadSagaRepository == null)
throw new InvalidOperationException("The repository does not support Load operations");

var giveUpAt = DateTime.Now + (timeout ?? TestTimeout);

var query = new SagaQuery<TSaga>(x => x.CorrelationId == correlationId);

while (DateTime.Now < giveUpAt)
{
var saga = (await QuerySagaRepository.Find(query).ConfigureAwait(false)).FirstOrDefault();
if (saga != Guid.Empty)
return saga;
var saga = await LoadSagaRepository.Load(correlationId).ConfigureAwait(false);
if (saga != null)
return saga.CorrelationId;

await Task.Delay(10).ConfigureAwait(false);
}
Expand Down Expand Up @@ -84,24 +84,22 @@ public async Task<IList<Guid>> Match(Expression<Func<TSaga, bool>> filter, TimeS
/// <returns></returns>
public async Task<Guid?> NotExists(Guid correlationId, TimeSpan? timeout = default)
{
if (QuerySagaRepository == null)
throw new InvalidOperationException("The repository does not support Query operations");
if (LoadSagaRepository == null)
throw new InvalidOperationException("The repository does not support Load operations");

var giveUpAt = DateTime.Now + (timeout ?? TestTimeout);

var query = new SagaQuery<TSaga>(x => x.CorrelationId == correlationId);

Guid? saga = default;
TSaga saga = default;
while (DateTime.Now < giveUpAt)
{
saga = (await QuerySagaRepository.Find(query).ConfigureAwait(false)).FirstOrDefault();
if (saga == Guid.Empty)
saga = await LoadSagaRepository.Load(correlationId).ConfigureAwait(false);
if (saga == null)
return default;

await Task.Delay(10).ConfigureAwait(false);
}

return saga;
return saga.CorrelationId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ public class RegistrationSagaStateMachineTestHarness<TStateMachine, TInstance> :
where TInstance : class, SagaStateMachineInstance
where TStateMachine : SagaStateMachine<TInstance>
{
public RegistrationSagaStateMachineTestHarness(ISagaRepositoryDecoratorRegistration<TInstance> registration, ISagaRepository<TInstance> repository,
TStateMachine stateMachine)
: base(repository, registration.TestTimeout)
public RegistrationSagaStateMachineTestHarness(ISagaRepositoryDecoratorRegistration<TInstance> registration,
IQuerySagaRepository<TInstance> querySagaRepository, ILoadSagaRepository<TInstance> loadSagaRepository, TStateMachine stateMachine)
: base(querySagaRepository, loadSagaRepository, registration.TestTimeout)
{
StateMachine = stateMachine;
Consumed = registration.Consumed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ public class RegistrationSagaTestHarness<TSaga> :
ISagaTestHarness<TSaga>
where TSaga : class, ISaga
{
public RegistrationSagaTestHarness(ISagaRepositoryDecoratorRegistration<TSaga> registration, ISagaRepository<TSaga> repository)
: base(repository, registration.TestTimeout)
public RegistrationSagaTestHarness(ISagaRepositoryDecoratorRegistration<TSaga> registration, ISagaRepository<TSaga> repository,
ILoadSagaRepository<TSaga> loadRepository, IQuerySagaRepository<TSaga> queryRepository)
: base(queryRepository, loadRepository, registration.TestTimeout)
{
Consumed = registration.Consumed;
Created = registration.Created;
Expand Down
Loading

0 comments on commit 5a9f3d2

Please sign in to comment.