Skip to content

Commit

Permalink
MT-88: Use Compiled queries where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
NooNameR authored and phatboyg committed May 23, 2023
1 parent e679624 commit 164e146
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 84 deletions.
11 changes: 11 additions & 0 deletions src/MassTransit/Internals/Extensions/AsyncEnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace MassTransit.Internals
{
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;


Expand All @@ -15,5 +16,15 @@ public static async Task<IList<TElement>> ToListAsync<TElement>(this IAsyncEnume

return elementsList;
}

public static async Task<IList<TElement>> ToListAsync<TElement>(this IAsyncEnumerable<TElement> elements, CancellationToken cancellationToken)
where TElement : class
{
var elementsList = new List<TElement>();
await foreach (var element in elements.WithCancellation(cancellationToken).ConfigureAwait(false))
elementsList.Add(element);

return elementsList;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,9 @@ static void CheckContextConstructors<TContext>()

ISagaRepositoryLockStrategy<TSaga> CreateOptimisticLockStrategy()
{
ILoadQueryProvider<TSaga> queryProvider = new DefaultSagaLoadQueryProvider<TSaga>();
if (_queryCustomization != null)
queryProvider = new CustomSagaLoadQueryProvider<TSaga>(queryProvider, _queryCustomization);
var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(_queryCustomization);

var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(queryProvider);

return new OptimisticSagaRepositoryLockStrategy<TSaga>(queryProvider, queryExecutor, _isolationLevel);
return new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, _queryCustomization, _isolationLevel);
}

ISagaRepositoryLockStrategy<TSaga> CreatePessimisticLockStrategy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace MassTransit.EntityFrameworkCoreIntegration
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Internals;
using Logging;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
Expand All @@ -29,7 +30,9 @@ public class BusOutboxDeliveryService<TDbContext> :
readonly ILogger _logger;
readonly IBusOutboxNotification _notification;
readonly OutboxDeliveryServiceOptions _options;
readonly Func<TDbContext, Guid, long, int, IAsyncEnumerable<OutboxMessage>> _outboxMessagesQuery;
readonly IServiceProvider _provider;

string _getOutboxIdStatement;

public BusOutboxDeliveryService(IBusControl busControl, IOptions<OutboxDeliveryServiceOptions> options,
Expand All @@ -45,6 +48,13 @@ public BusOutboxDeliveryService(IBusControl busControl, IOptions<OutboxDeliveryS

_lockStatementProvider = outboxOptions.Value.LockStatementProvider;
_isolationLevel = outboxOptions.Value.IsolationLevel;

_outboxMessagesQuery = EF.CompileAsyncQuery((TDbContext context, Guid outboxId, long lastSequenceNumber, int limit) =>
context.Set<OutboxMessage>()
.Where(x => x.OutboxId == outboxId && x.SequenceNumber > lastSequenceNumber)
.OrderBy(x => x.SequenceNumber)
.Take(limit)
.AsNoTracking());
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand Down Expand Up @@ -213,12 +223,9 @@ async Task<int> DeliverOutboxMessages(TDbContext dbContext, OutboxState outboxSt

var lastSequenceNumber = outboxState.LastSequenceNumber ?? 0;

List<OutboxMessage> messages = await dbContext.Set<OutboxMessage>()
.Where(x => x.OutboxId == outboxState.OutboxId && x.SequenceNumber > lastSequenceNumber)
.OrderBy(x => x.SequenceNumber)
.Take(messageLimit)
.AsNoTracking()
.ToListAsync(cancellationToken).ConfigureAwait(false);
IList<OutboxMessage> messages = await _outboxMessagesQuery(dbContext, outboxState.OutboxId, lastSequenceNumber, messageLimit)
.ToListAsync(cancellationToken)
.ConfigureAwait(false);

var sentSequenceNumber = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ public static class EntityFrameworkSagaRepository<TSaga>
public static ISagaRepository<TSaga> CreateOptimistic(ISagaDbContextFactory<TSaga> dbContextFactory,
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null)
{
ILoadQueryProvider<TSaga> queryProvider = new DefaultSagaLoadQueryProvider<TSaga>();
if (queryCustomization != null)
queryProvider = new CustomSagaLoadQueryProvider<TSaga>(queryProvider, queryCustomization);

var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(queryProvider);
var lockStrategy = new OptimisticSagaRepositoryLockStrategy<TSaga>(queryProvider, queryExecutor, IsolationLevel.ReadCommitted);
var queryExecutor = new OptimisticLoadQueryExecutor<TSaga>(queryCustomization);
var lockStrategy = new OptimisticSagaRepositoryLockStrategy<TSaga>(queryExecutor, queryCustomization, IsolationLevel.ReadCommitted);

return CreateRepository(dbContextFactory, lockStrategy);
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace MassTransit.EntityFrameworkCoreIntegration.Saga
{
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
Expand All @@ -10,16 +11,28 @@ public class OptimisticLoadQueryExecutor<TSaga> :
ILoadQueryExecutor<TSaga>
where TSaga : class, ISaga
{
readonly ILoadQueryProvider<TSaga> _provider;
readonly Func<DbContext, Guid, Task<TSaga>> _compiledQuery;
readonly Func<IQueryable<TSaga>, IQueryable<TSaga>> _queryCustomization;

public OptimisticLoadQueryExecutor(ILoadQueryProvider<TSaga> provider)
public OptimisticLoadQueryExecutor(Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization = null)
{
_provider = provider;
_queryCustomization = queryCustomization;

if (queryCustomization == null)
{
_compiledQuery = EF.CompileAsyncQuery((DbContext context, Guid id) =>
context.Set<TSaga>().AsTracking().SingleOrDefault(x => x.CorrelationId == id));
}
}

public Task<TSaga> Load(DbContext dbContext, Guid correlationId, CancellationToken cancellationToken)
{
return _provider.GetQueryable(dbContext).AsTracking().SingleOrDefaultAsync(x => x.CorrelationId == correlationId, cancellationToken);
if (_compiledQuery != null)
return _compiledQuery(dbContext, correlationId);

IQueryable<TSaga> queryable = _queryCustomization(dbContext.Set<TSaga>());

return queryable.AsTracking().SingleOrDefaultAsync(x => x.CorrelationId == correlationId, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace MassTransit.EntityFrameworkCoreIntegration.Saga
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -17,20 +18,25 @@ public class OptimisticSagaLockContext<TSaga> :
{
readonly CancellationToken _cancellationToken;
readonly DbContext _context;
readonly ILoadQueryProvider<TSaga> _provider;
readonly ISagaQuery<TSaga> _query;
readonly Func<IQueryable<TSaga>, IQueryable<TSaga>> _queryCustomization;

public OptimisticSagaLockContext(DbContext context, ISagaQuery<TSaga> query, CancellationToken cancellationToken, ILoadQueryProvider<TSaga> provider)
public OptimisticSagaLockContext(DbContext context, ISagaQuery<TSaga> query, CancellationToken cancellationToken,
Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization)
{
_context = context;
_query = query;
_cancellationToken = cancellationToken;
_provider = provider;
_queryCustomization = queryCustomization;
}

public async Task<IList<TSaga>> Load()
{
List<TSaga> instances = await _provider.GetQueryable(_context)
IQueryable<TSaga> queryable = _context.Set<TSaga>();
if (_queryCustomization != null)
queryable = _queryCustomization(queryable);

List<TSaga> instances = await queryable.AsTracking()
.Where(_query.FilterExpression)
.ToListAsync(_cancellationToken)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace MassTransit.EntityFrameworkCoreIntegration.Saga
{
using System;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
Expand All @@ -12,12 +13,13 @@ public class OptimisticSagaRepositoryLockStrategy<TSaga> :
where TSaga : class, ISaga
{
readonly ILoadQueryExecutor<TSaga> _executor;
readonly ILoadQueryProvider<TSaga> _provider;
readonly Func<IQueryable<TSaga>, IQueryable<TSaga>> _queryCustomization;

public OptimisticSagaRepositoryLockStrategy(ILoadQueryProvider<TSaga> provider, ILoadQueryExecutor<TSaga> executor, IsolationLevel isolationLevel)
public OptimisticSagaRepositoryLockStrategy(ILoadQueryExecutor<TSaga> executor, Func<IQueryable<TSaga>, IQueryable<TSaga>> queryCustomization,
IsolationLevel isolationLevel)
{
_provider = provider;
_executor = executor;
_queryCustomization = queryCustomization;

IsolationLevel = isolationLevel;
}
Expand All @@ -31,7 +33,7 @@ public Task<TSaga> Load(DbContext context, Guid correlationId, CancellationToken

public async Task<SagaLockContext<TSaga>> CreateLockContext(DbContext context, ISagaQuery<TSaga> query, CancellationToken cancellationToken)
{
return new OptimisticSagaLockContext<TSaga>(context, query, cancellationToken, _provider);
return new OptimisticSagaLockContext<TSaga>(context, query, cancellationToken, _queryCustomization);
}
}
}

0 comments on commit 164e146

Please sign in to comment.