Skip to content

Commit

Permalink
Separate scoped ConsumeContext through components
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed May 23, 2023
1 parent 1e5959d commit e679624
Show file tree
Hide file tree
Showing 183 changed files with 2,880 additions and 1,323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ protected string CompensateEndpointName
IEndpointDefinition? IActivityDefinition.CompensateEndpointDefinition => CompensateEndpointDefinition;

void IActivityDefinition<TActivity, TArguments, TLog>.Configure(IReceiveEndpointConfigurator endpointConfigurator,
ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator)
ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator, IRegistrationContext context)
{
if (ConcurrentMessageLimit.HasValue)
compensateActivityConfigurator.ConcurrentMessageLimit = ConcurrentMessageLimit;

ConfigureCompensateActivity(endpointConfigurator, compensateActivityConfigurator);
ConfigureCompensateActivity(endpointConfigurator, compensateActivityConfigurator, context);
}

string IActivityDefinition.GetCompensateEndpointName(IEndpointNameFormatter formatter)
Expand Down Expand Up @@ -67,5 +68,16 @@ protected virtual void ConfigureCompensateActivity(IReceiveEndpointConfigurator
ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator)
{
}

/// <summary>
/// Called when the compensate activity is being configured on the endpoint.
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="compensateActivityConfigurator"></param>
/// <param name="context"></param>
protected virtual void ConfigureCompensateActivity(IReceiveEndpointConfigurator endpointConfigurator,
ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator, IRegistrationContext context)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ public int? ConcurrentMessageLimit
protected set => _concurrentMessageLimit = value;
}

void IConsumerDefinition<TConsumer>.Configure(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
void IConsumerDefinition<TConsumer>.Configure(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator,
IRegistrationContext context)
{
if (_concurrentMessageLimit.HasValue)
consumerConfigurator.ConcurrentMessageLimit = _concurrentMessageLimit;

ConfigureConsumer(endpointConfigurator, consumerConfigurator);
ConfigureConsumer(endpointConfigurator, consumerConfigurator, context);
}

Type IConsumerDefinition.ConsumerType => typeof(TConsumer);
Expand Down Expand Up @@ -81,5 +83,17 @@ protected void Endpoint(Action<IEndpointRegistrationConfigurator>? configure = n
protected virtual void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator)
{
}

/// <summary>
/// Called when the consumer is being configured on the endpoint. Configuration only applies to this consumer, and does not apply to
/// the endpoint.
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="consumerConfigurator">The consumer configurator</param>
/// <param name="context"></param>
protected virtual void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator,
IRegistrationContext context)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ public int? ConcurrentMessageLimit
}

void IExecuteActivityDefinition<TActivity, TArguments>.Configure(IReceiveEndpointConfigurator endpointConfigurator,
IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator)
IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator, IRegistrationContext context)
{
if (_concurrentMessageLimit.HasValue)
executeActivityConfigurator.ConcurrentMessageLimit = _concurrentMessageLimit;

ConfigureExecuteActivity(endpointConfigurator, executeActivityConfigurator);
ConfigureExecuteActivity(endpointConfigurator, executeActivityConfigurator, context);
}

string IExecuteActivityDefinition.GetExecuteEndpointName(IEndpointNameFormatter formatter)
Expand Down Expand Up @@ -80,5 +81,16 @@ protected virtual void ConfigureExecuteActivity(IReceiveEndpointConfigurator end
IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator)
{
}

/// <summary>
/// Called when the compensate activity is being configured on the endpoint.
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="executeActivityConfigurator"></param>
/// <param name="context"></param>
protected virtual void ConfigureExecuteActivity(IReceiveEndpointConfigurator endpointConfigurator,
IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator, IRegistrationContext context)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public int? ConcurrentMessageLimit
protected set => _concurrentMessageLimit = value;
}

void IFutureDefinition<TFuture>.Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator)
void IFutureDefinition<TFuture>.Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator,
IRegistrationContext context)
{
if (_concurrentMessageLimit.HasValue)
sagaConfigurator.ConcurrentMessageLimit = _concurrentMessageLimit;

ConfigureSaga(endpointConfigurator, sagaConfigurator);
ConfigureSaga(endpointConfigurator, sagaConfigurator, context);
}

Type IFutureDefinition.FutureType => typeof(TFuture);
Expand All @@ -70,6 +72,18 @@ protected virtual void ConfigureSaga(IReceiveEndpointConfigurator endpointConfig
{
}

/// <summary>
/// Called when configuring the saga on the endpoint. Configuration only applies to this saga, and does not apply to
/// the endpoint.
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="sagaConfigurator">The saga configurator</param>
/// <param name="context"></param>
protected virtual void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator,
IRegistrationContext context)
{
}

/// <summary>
/// Configure the saga endpoint
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface IActivityDefinition<TActivity, TArguments, TLog> :
/// </summary>
/// <param name="endpointConfigurator"></param>
/// <param name="compensateActivityConfigurator"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator);
/// <param name="context"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ICompensateActivityConfigurator<TActivity, TLog> compensateActivityConfigurator,
IRegistrationContext context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface IConsumerDefinition<TConsumer> :
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="consumerConfigurator">The consumer configurator</param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator);
/// <param name="context"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<TConsumer> consumerConfigurator,
IRegistrationContext context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public interface IExecuteActivityDefinition<TActivity, TArguments> :
/// </summary>
/// <param name="endpointConfigurator"></param>
/// <param name="executeActivityConfigurator"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator);
/// <param name="context"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator,
IRegistrationContext context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public interface IFutureDefinition<TFuture> :
/// <summary>Configure the future on the receive endpoint</summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="sagaConfigurator">The consumer configurator</param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator);
/// <param name="context"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator,
IRegistrationContext context);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public interface IRegistrationContext :
/// <param name="configurator"></param>
/// <param name="configure"></param>
/// <typeparam name="T">The consumer type</typeparam>
void ConfigureConsumer<T>(IReceiveEndpointConfigurator configurator, Action<IConsumerConfigurator<T>> configure = null)
void ConfigureConsumer<T>(IReceiveEndpointConfigurator configurator, Action<IConsumerConfigurator<T>>? configure = null)
where T : class, IConsumer;

/// <summary>
Expand All @@ -45,7 +45,7 @@ void ConfigureConsumer<T>(IReceiveEndpointConfigurator configurator, Action<ICon
/// <param name="configurator"></param>
/// <param name="configure"></param>
/// <typeparam name="T">The saga type</typeparam>
void ConfigureSaga<T>(IReceiveEndpointConfigurator configurator, Action<ISagaConfigurator<T>> configure = null)
void ConfigureSaga<T>(IReceiveEndpointConfigurator configurator, Action<ISagaConfigurator<T>>? configure = null)
where T : class, ISaga;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public interface ISagaDefinition<TSaga> :
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="sagaConfigurator">The consumer configurator</param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<TSaga> sagaConfigurator);
/// <param name="context"></param>
void Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<TSaga> sagaConfigurator, IRegistrationContext context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public int? ConcurrentMessageLimit
protected set => _concurrentMessageLimit = value;
}

void ISagaDefinition<TSaga>.Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<TSaga> sagaConfigurator)
void ISagaDefinition<TSaga>.Configure(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<TSaga> sagaConfigurator,
IRegistrationContext context)
{
if (_concurrentMessageLimit.HasValue)
sagaConfigurator.ConcurrentMessageLimit = _concurrentMessageLimit;

ConfigureSaga(endpointConfigurator, sagaConfigurator);
ConfigureSaga(endpointConfigurator, sagaConfigurator, context);
}

Type ISagaDefinition.SagaType => typeof(TSaga);
Expand All @@ -70,6 +72,18 @@ protected virtual void ConfigureSaga(IReceiveEndpointConfigurator endpointConfig
{
}

/// <summary>
/// Called when configuring the saga on the endpoint. Configuration only applies to this saga, and does not apply to
/// the endpoint.
/// </summary>
/// <param name="endpointConfigurator">The receive endpoint configurator for the consumer</param>
/// <param name="sagaConfigurator">The saga configurator</param>
/// <param name="context"></param>
protected virtual void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<TSaga> sagaConfigurator,
IRegistrationContext context)
{
}

/// <summary>
/// Configure the saga endpoint
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion src/MassTransit.Abstractions/ILoadSagaRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ namespace MassTransit
using System.Threading.Tasks;


public interface ILoadSagaRepository<TSaga>
public interface ILoadSagaRepository<TSaga> :
IProbeSite
where TSaga : class, ISaga
{
Task<TSaga> Load(Guid correlationId);
Expand Down
3 changes: 2 additions & 1 deletion src/MassTransit.Abstractions/Saga/IQuerySagaRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ namespace MassTransit
using System.Threading.Tasks;


public interface IQuerySagaRepository<TSaga>
public interface IQuerySagaRepository<TSaga> :
IProbeSite
where TSaga : class, ISaga
{
Task<IEnumerable<Guid>> Find(ISagaQuery<TSaga> query);
Expand Down
10 changes: 8 additions & 2 deletions src/MassTransit.TestFramework/InMemoryContainerTestFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,16 @@ protected IClientFactory GetClientFactory()
return ServiceProvider.GetRequiredService<IClientFactory>();
}

protected ISagaRepository<T> GetSagaRepository<T>()
protected ILoadSagaRepository<T> GetLoadSagaRepository<T>()
where T : class, ISaga
{
return ServiceProvider.GetRequiredService<ISagaRepository<T>>();
return ServiceProvider.GetRequiredService<ILoadSagaRepository<T>>();
}

protected IQuerySagaRepository<T> GetQuerySagaRepository<T>()
where T : class, ISaga
{
return ServiceProvider.GetRequiredService<IQuerySagaRepository<T>>();
}

protected async Task<Task<ConsumeContext<T>>> ConnectPublishHandler<T>()
Expand Down
18 changes: 8 additions & 10 deletions src/MassTransit/Clients/ClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@ public class ClientFactory :
IClientFactory,
IAsyncDisposable
{
readonly ClientFactoryContext _context;

public ClientFactory(ClientFactoryContext context)
{
_context = context;
Context = context;
}

public ValueTask DisposeAsync()
{
if (_context is IAsyncDisposable asyncDisposable)
if (Context is IAsyncDisposable asyncDisposable)
return asyncDisposable.DisposeAsync();

return default;
}

public ClientFactoryContext Context => _context;
public ClientFactoryContext Context { get; }

public RequestHandle<T> CreateRequest<T>(T message, CancellationToken cancellationToken, RequestTimeout timeout)
where T : class
Expand Down Expand Up @@ -98,7 +96,7 @@ public IRequestClient<T> CreateRequestClient<T>(RequestTimeout timeout)
if (EndpointConvention.TryGetDestinationAddress<T>(out var destinationAddress))
return CreateRequestClient<T>(destinationAddress, timeout);

return new RequestClient<T>(_context, _context.GetRequestEndpoint<T>(), timeout.Or(_context.DefaultTimeout));
return new RequestClient<T>(Context, Context.GetRequestEndpoint<T>(), timeout.Or(Context.DefaultTimeout));
}

public IRequestClient<T> CreateRequestClient<T>(ConsumeContext consumeContext, RequestTimeout timeout)
Expand All @@ -107,21 +105,21 @@ public IRequestClient<T> CreateRequestClient<T>(ConsumeContext consumeContext, R
if (EndpointConvention.TryGetDestinationAddress<T>(out var destinationAddress))
return CreateRequestClient<T>(consumeContext, destinationAddress, timeout);

return new RequestClient<T>(_context, _context.GetRequestEndpoint<T>(consumeContext), timeout.Or(_context.DefaultTimeout));
return new RequestClient<T>(Context, Context.GetRequestEndpoint<T>(consumeContext), timeout.Or(Context.DefaultTimeout));
}

public IRequestClient<T> CreateRequestClient<T>(Uri destinationAddress, RequestTimeout timeout)
where T : class
{
IRequestSendEndpoint<T> requestSendEndpoint = _context.GetRequestEndpoint<T>(destinationAddress);
IRequestSendEndpoint<T> requestSendEndpoint = Context.GetRequestEndpoint<T>(destinationAddress);

return new RequestClient<T>(_context, requestSendEndpoint, timeout.Or(_context.DefaultTimeout));
return new RequestClient<T>(Context, requestSendEndpoint, timeout.Or(Context.DefaultTimeout));
}

public IRequestClient<T> CreateRequestClient<T>(ConsumeContext consumeContext, Uri destinationAddress, RequestTimeout timeout)
where T : class
{
return new RequestClient<T>(_context, _context.GetRequestEndpoint<T>(destinationAddress, consumeContext), timeout.Or(_context.DefaultTimeout));
return new RequestClient<T>(Context, Context.GetRequestEndpoint<T>(destinationAddress, consumeContext), timeout.Or(Context.DefaultTimeout));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace MassTransit.Configuration
{
using System;
using System.Collections.Generic;
using Middleware;
using Middleware.InMemoryOutbox;
Expand All @@ -10,12 +11,25 @@ public class InMemoryCompensateContextOutboxSpecification<TArguments> :
IOutboxConfigurator
where TArguments : class
{
readonly ISetScopedConsumeContext _setter;

public InMemoryCompensateContextOutboxSpecification(IRegistrationContext context)
: this(context as ISetScopedConsumeContext ?? throw new ArgumentException(nameof(context)))
{
}

public InMemoryCompensateContextOutboxSpecification(ISetScopedConsumeContext setter)
{
_setter = setter;
}

public bool ConcurrentMessageDelivery { get; set; }

public void Apply(IPipeBuilder<CompensateContext<TArguments>> builder)
{
builder.AddFilter(
new InMemoryOutboxFilter<CompensateContext<TArguments>, InMemoryOutboxCompensateContext<TArguments>>(Factory, ConcurrentMessageDelivery));
new InMemoryOutboxFilter<CompensateContext<TArguments>, InMemoryOutboxCompensateContext<TArguments>>(_setter, Factory,
ConcurrentMessageDelivery));
}

public IEnumerable<ValidationResult> Validate()
Expand Down
Loading

0 comments on commit e679624

Please sign in to comment.