Skip to content

Commit

Permalink
Additional backward changes
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed May 23, 2023
1 parent a04955c commit d68bfdc
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ public class OutboxConsumePipeSpecificationObserver<TContext> :
where TContext : class
{
readonly IReceiveEndpointConfigurator _configurator;
readonly IRegistrationContext _context;
readonly IServiceProvider _serviceProvider;
readonly ISetScopedConsumeContext _setter;

public OutboxConsumePipeSpecificationObserver(IReceiveEndpointConfigurator configurator, IRegistrationContext context)
: this(configurator, context, context as ISetScopedConsumeContext ?? throw new ArgumentException(nameof(context)))
{
}

public OutboxConsumePipeSpecificationObserver(IReceiveEndpointConfigurator configurator, IServiceProvider serviceProvider,
ISetScopedConsumeContext setter)
{
_configurator = configurator;
_context = context;
_serviceProvider = serviceProvider;
_setter = setter;

MessageDeliveryLimit = 1;
MessageDeliveryTimeout = TimeSpan.FromSeconds(30);
Expand Down Expand Up @@ -68,7 +76,7 @@ void AddScopedFilter<T, TMessage>(IPipeConfigurator<ConsumeContext<TMessage>> me
where T : class
where TMessage : class
{
var scopeProvider = new ConsumeScopeProvider(_context);
var scopeProvider = new ConsumeScopeProvider(_serviceProvider, _setter);

var options = new OutboxConsumeOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,25 @@ public static void UseInMemoryInboxOutbox(this IReceiveEndpointConfigurator conf
configurator.ConnectConsumerConfigurationObserver(observer);
configurator.ConnectSagaConfigurationObserver(observer);
}

/// <summary>
/// Includes a combination inbox/outbox in the consume pipeline, which stores outgoing messages in memory until
/// the message consumer completes.
/// </summary>
/// <param name="configurator"></param>
/// <param name="provider">Configuration service provider</param>
public static void UseInMemoryInboxOutbox(this IReceiveEndpointConfigurator configurator, IServiceProvider provider)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
if (provider == null)
throw new ArgumentNullException(nameof(provider));

var observer = new OutboxConsumePipeSpecificationObserver<InMemoryOutboxMessageRepository>(configurator, provider,
LegacySetScopedConsumeContext.Instance);

configurator.ConnectConsumerConfigurationObserver(observer);
configurator.ConnectSagaConfigurationObserver(observer);
}
}
}
45 changes: 45 additions & 0 deletions src/MassTransit/Configuration/JobServiceConfigurationExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ namespace MassTransit

public static class JobServiceConfigurationExtensions
{
/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
/// instances will use the competing consumer pattern, so a shared saga repository should be configured.
/// </summary>
/// <typeparam name="T">The transport receive endpoint configurator type</typeparam>
/// <param name="configurator">The service instance</param>
/// <param name="configure"></param>
/// <param name="context"></param>
public static IServiceInstanceConfigurator<T> ConfigureJobServiceEndpoints<T>(this IServiceInstanceConfigurator<T> configurator,
IRegistrationContext context, Action<IJobServiceConfigurator> configure = default)
where T : IReceiveEndpointConfigurator
{
var jobServiceConfigurator = new JobServiceConfigurator<T>(configurator);

configure?.Invoke(jobServiceConfigurator);

jobServiceConfigurator.ConfigureJobServiceEndpoints(context);

return configurator;
}

/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
Expand All @@ -27,6 +49,29 @@ public static IServiceInstanceConfigurator<T> ConfigureJobServiceEndpoints<T>(th
return configurator;
}

/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
/// instances will use the competing consumer pattern, so a shared saga repository should be configured.
/// </summary>
/// <typeparam name="T">The transport receive endpoint configurator type</typeparam>
/// <param name="configurator">The service instance</param>
/// <param name="options"></param>
/// <param name="context"></param>
/// <param name="configure"></param>
public static IServiceInstanceConfigurator<T> ConfigureJobServiceEndpoints<T>(this IServiceInstanceConfigurator<T> configurator,
JobServiceOptions options, IRegistrationContext context, Action<IJobServiceConfigurator> configure = default)
where T : IReceiveEndpointConfigurator
{
var jobServiceConfigurator = new JobServiceConfigurator<T>(configurator, options);

configure?.Invoke(jobServiceConfigurator);

jobServiceConfigurator.ConfigureJobServiceEndpoints(context);

return configurator;
}

/// <summary>
/// Configures support for job consumers on the service instance, which supports executing long-running jobs without blocking the consumer pipeline.
/// Job consumers use multiple state machines to track jobs, each of which runs on its own dedicated receive endpoint. Multiple service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static void ConfigureServiceEndpoints<T>(this IBusFactoryConfigurator<T>
configurator.ServiceInstance(options, instanceConfigurator =>
{
if (options.TryGetOptions(out JobServiceOptions jobServiceOptions))
instanceConfigurator.ConfigureJobServiceEndpoints(jobServiceOptions);
instanceConfigurator.ConfigureJobServiceEndpoints(jobServiceOptions, registration);

registration.ConfigureEndpoints(instanceConfigurator, instanceConfigurator.EndpointNameFormatter, configureFilter);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ public class DefaultFutureDefinition<TFuture> :
FutureDefinition<TFuture>
where TFuture : class, SagaStateMachine<FutureState>
{
protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator)
protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseDelayedRedelivery(r => r.Intervals(5000, 30000, 120000));
endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500));
endpointConfigurator.UseInMemoryOutbox();
endpointConfigurator.UseInMemoryOutbox(context);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ public RequestConsumerFutureDefinition(IConsumerDefinition<TConsumer> consumerDe
_requestDefinition?.RequestAddress ??
throw new ConfigurationException($"The consumer definition was not a FutureConsumerDefinition: {TypeCache<TConsumer>.ShortName}");

protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator)
protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator, ISagaConfigurator<FutureState> sagaConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(100, 200, 500, 1000, 5000, 10000));
endpointConfigurator.UseInMemoryOutbox();
endpointConfigurator.UseInMemoryOutbox(context);
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/MassTransit/JobService/Configuration/JobServiceConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,24 @@ public IEnumerable<ValidationResult> Validate()
return turnoutOptions.Validate();
}

public void ConfigureJobServiceEndpoints()
public void ConfigureJobServiceEndpoints(IRegistrationContext context = null)
{
if (_endpointsConfigured)
return;

void UseInMemoryOutbox(IReceiveEndpointConfigurator configurator)
{
if (context == null)
configurator.UseInMemoryOutbox();
else
configurator.UseInMemoryOutbox(context);
}

_busConfigurator.ReceiveEndpoint(_options.JobStateSagaEndpointName, e =>
{
e.UseMessageRetry(r => r.Intervals(100, 1000, 2000, 5000));
e.UseInMemoryOutbox();

UseInMemoryOutbox(e);

if (_options.SagaPartitionCount.HasValue)
{
Expand Down Expand Up @@ -186,7 +195,8 @@ public void ConfigureJobServiceEndpoints()
_busConfigurator.ReceiveEndpoint(_options.JobAttemptSagaEndpointName, e =>
{
e.UseMessageRetry(r => r.Intervals(100, 1000, 2000, 5000));
e.UseInMemoryOutbox();

UseInMemoryOutbox(e);

if (_options.SagaPartitionCount.HasValue)
{
Expand Down Expand Up @@ -217,7 +227,8 @@ public void ConfigureJobServiceEndpoints()
_busConfigurator.ReceiveEndpoint(_options.JobTypeSagaEndpointName, e =>
{
e.UseMessageRetry(r => r.Intervals(100, 200, 300, 500, 1000, 2000, 5000));
e.UseInMemoryOutbox();

UseInMemoryOutbox(e);

if (_options.SagaPartitionCount.HasValue)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace MassTransit
{
using System;
using Configuration;
using DependencyInjection;
using EntityFrameworkCoreIntegration;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
Expand Down Expand Up @@ -50,6 +51,29 @@ public static void UseEntityFrameworkOutbox<TDbContext>(this IReceiveEndpointCon
configurator.ConnectSagaConfigurationObserver(observer);
}

/// <summary>
/// Configure the Entity Framework outbox on the receive endpoint
/// </summary>
/// <param name="configurator"></param>
/// <param name="provider">Configuration service provider</param>
/// <param name="configure"></param>
public static void UseEntityFrameworkOutbox<TDbContext>(this IReceiveEndpointConfigurator configurator, IServiceProvider provider,
Action<IOutboxOptionsConfigurator>? configure = null)
where TDbContext : DbContext
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
if (provider == null)
throw new ArgumentNullException(nameof(provider));

var observer = new OutboxConsumePipeSpecificationObserver<TDbContext>(configurator, provider, LegacySetScopedConsumeContext.Instance);

configure?.Invoke(observer);

configurator.ConnectConsumerConfigurationObserver(observer);
configurator.ConnectSagaConfigurationObserver(observer);
}

/// <summary>
/// Configure the outbox for use with SQL Server
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace MassTransit
{
using System;
using Configuration;
using DependencyInjection;
using MongoDbIntegration;


Expand Down Expand Up @@ -44,5 +45,27 @@ public static void UseMongoDbOutbox(this IReceiveEndpointConfigurator configurat
configurator.ConnectConsumerConfigurationObserver(observer);
configurator.ConnectSagaConfigurationObserver(observer);
}

/// <summary>
/// Configure the Entity Framework outbox on the receive endpoint
/// </summary>
/// <param name="configurator"></param>
/// <param name="provider">Configuration service provider</param>
/// <param name="configure"></param>
public static void UseMongoDbOutbox(this IReceiveEndpointConfigurator configurator, IServiceProvider provider,
Action<IOutboxOptionsConfigurator>? configure = null)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
if (provider == null)
throw new ArgumentNullException(nameof(provider));

var observer = new OutboxConsumePipeSpecificationObserver<MongoDbContext>(configurator, provider, LegacySetScopedConsumeContext.Instance);

configure?.Invoke(observer);

configurator.ConnectConsumerConfigurationObserver(observer);
configurator.ConnectSagaConfigurationObserver(observer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public PauseScheduledRecurringMessageConsumerDefinition(HangfireEndpointDefiniti
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<PauseScheduledRecurringMessageConsumer> consumerConfigurator)
IConsumerConfigurator<PauseScheduledRecurringMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<PauseScheduledRecurringMessage>(m =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ResumeScheduledRecurringMessageConsumerDefinition(HangfireEndpointDefinit
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ResumeScheduledRecurringMessageConsumer> consumerConfigurator)
IConsumerConfigurator<ResumeScheduledRecurringMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<ResumeScheduledRecurringMessage>(m =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ScheduleMessageConsumerDefinition(HangfireEndpointDefinition endpointDefi
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ScheduleMessageConsumer> consumerConfigurator)
IConsumerConfigurator<ScheduleMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r.Interval(5, 250));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ScheduleRecurringMessageConsumerDefinition(HangfireEndpointDefinition end
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ScheduleRecurringMessageConsumer> consumerConfigurator)
IConsumerConfigurator<ScheduleRecurringMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<ScheduleRecurringMessage>(m =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public CancelScheduledMessageConsumerDefinition(QuartzEndpointDefinition endpoin
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CancelScheduledMessageConsumer> consumerConfigurator)
IConsumerConfigurator<CancelScheduledMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<CancelScheduledMessage>(m => m.UsePartitioner(_endpointDefinition.Partition, p => p.Message.TokenId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public PauseScheduledMessageConsumerDefinition(QuartzEndpointDefinition endpoint
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<PauseScheduledMessageConsumer> consumerConfigurator)
IConsumerConfigurator<PauseScheduledMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<PauseScheduledRecurringMessage>(m =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ResumeScheduledMessageConsumerDefinition(QuartzEndpointDefinition endpoin
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ResumeScheduledMessageConsumer> consumerConfigurator)
IConsumerConfigurator<ResumeScheduledMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Message<ResumeScheduledRecurringMessage>(m =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ScheduleMessageConsumerDefinition(QuartzEndpointDefinition endpointDefini
}

protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ScheduleMessageConsumer> consumerConfigurator)
IConsumerConfigurator<ScheduleMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
endpointConfigurator.UseMessageRetry(r => r.Interval(5, 250));

Expand Down

0 comments on commit d68bfdc

Please sign in to comment.