diff --git a/Directory.Build.props b/Directory.Build.props index 0925be3..ed00d01 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -11,7 +11,7 @@ https://github.com/squidex/squidex true snupkg - 6.31.0 + 6.32.0 diff --git a/events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs b/events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs index eb8b892..72cac07 100644 --- a/events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs +++ b/events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs @@ -24,21 +24,25 @@ public async Task> QueryStreamAsync(string streamName { await using var context = await dbContextFactory.CreateDbContextAsync(ct); - var commits = await context.Set() - .ByStream(StreamFilter.Name(streamName)) - .ByOffset(afterStreamPosition) - .ToListAsync(ct); + var commits = + await context.Set() + .WhereStreamMatches(StreamFilter.Name(streamName)) + .WherePositionAfter(afterStreamPosition) + .WhereCommited() + .ToListAsync(ct); var result = Convert(commits, afterStreamPosition); if ((commits.Count == 0 || commits[0].EventStreamOffset != afterStreamPosition) && afterStreamPosition > EventsVersion.Empty) { - commits = await context.Set() - .ByStream(StreamFilter.Name(streamName)) - .ByBeforeOffset(afterStreamPosition) - .OrderByDescending(x => x.EventStreamOffset) - .Take(1) - .ToListAsync(ct); + commits = + await context.Set() + .WhereStreamMatches(StreamFilter.Name(streamName)) + .WherePositionBefore(afterStreamPosition) + .WhereCommited() + .OrderByDescending(x => x.EventStreamOffset) + .Take(1) + .ToListAsync(ct); result = Convert(commits, afterStreamPosition).ToList(); } @@ -57,12 +61,14 @@ public async IAsyncEnumerable QueryAllReverseAsync(StreamFilter fil await using var context = await dbContextFactory.CreateDbContextAsync(ct); DateTime streamTime = timestamp; - var query = await context.Set() - .ByStream(filter) - .ByTimestamp(streamTime) - .OrderByDescending(x => x.Position).ThenBy(x => x.EventStream) - .Take(take) - .ToListAsync(ct); + var query = + await context.Set() + .WhereStreamMatches(filter) + .WhereTimestampAfter(streamTime) + .WhereCommited() + .OrderByDescending(x => x.Position).ThenBy(x => x.EventStream) + .Take(take) + .ToListAsync(ct); var taken = 0; foreach (var commit in query) @@ -91,11 +97,13 @@ public async IAsyncEnumerable QueryAllAsync(StreamFilter filter = d await using var context = await dbContextFactory.CreateDbContextAsync(ct); ParsedStreamPosition streamPosition = position; - var query = context.Set() - .ByStream(filter) - .ByPosition(streamPosition) - .OrderBy(x => x.Position).ThenBy(x => x.EventStream) - .Take(take); + var query = + context.Set() + .WhereStreamMatches(filter) + .WherePositionAfter(streamPosition) + .WhereCommited() + .OrderBy(x => x.Position).ThenBy(x => x.EventStream) + .Take(take); var taken = 0; await foreach (var commit in query.AsAsyncEnumerable().WithCancellation(ct)) diff --git a/events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs b/events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs index a2e8da7..11abb64 100644 --- a/events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs +++ b/events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs @@ -103,17 +103,19 @@ public async Task DeleteAsync(StreamFilter filter, { await using var context = await dbContextFactory.CreateDbContextAsync(ct); - await context.Set().ByStream(filter) + await context.Set().WhereStreamMatches(filter) .ExecuteDeleteAsync(ct); } private static async Task GetEventStreamOffsetAsync(DbSet commitSet, string streamName) { - var record = await commitSet - .Where(x => x.EventStream == streamName) - .OrderByDescending(x => x.EventStreamOffset) - .Select(x => new { x.EventStreamOffset, x.EventsCount }) - .FirstOrDefaultAsync(); + var record = + await commitSet + .Where(x => x.Position != null) + .Where(x => x.EventStream == streamName) + .OrderByDescending(x => x.EventStreamOffset) + .Select(x => new { x.EventStreamOffset, x.EventsCount }) + .FirstOrDefaultAsync(); if (record == null) { diff --git a/events/Squidex.Events.EntityFramework/FilterBuilder.cs b/events/Squidex.Events.EntityFramework/FilterBuilder.cs index 20fcbf6..b4099ca 100644 --- a/events/Squidex.Events.EntityFramework/FilterBuilder.cs +++ b/events/Squidex.Events.EntityFramework/FilterBuilder.cs @@ -18,7 +18,12 @@ internal static class FilterBuilder private static readonly MethodInfo DbLikeMethod = typeof(DbFunctionsExtensions).GetMethod("Like", [typeof(DbFunctions), typeof(string), typeof(string)])!; private static readonly ConstantExpression DbFunctions = Expression.Constant(EF.Functions); - public static IQueryable ByTimestamp(this IQueryable q, DateTime timestamp) + public static IQueryable WhereCommited(this IQueryable q) + { + return q.Where(x => x.Position != null); + } + + public static IQueryable WhereTimestampAfter(this IQueryable q, DateTime timestamp) { if (timestamp == default) { @@ -28,7 +33,7 @@ public static IQueryable ByTimestamp(this IQueryable x.Timestamp >= timestamp); } - public static IQueryable ByBeforeOffset(this IQueryable q, long offset) + public static IQueryable WherePositionBefore(this IQueryable q, long offset) { if (offset <= EventsVersion.Empty) { @@ -38,7 +43,7 @@ public static IQueryable ByBeforeOffset(this IQueryable x.EventStreamOffset < offset); } - public static IQueryable ByOffset(this IQueryable q, long offset) + public static IQueryable WherePositionAfter(this IQueryable q, long offset) { if (offset <= EventsVersion.Empty) { @@ -48,7 +53,7 @@ public static IQueryable ByOffset(this IQueryable return q.Where(x => x.EventStreamOffset >= offset); } - public static IQueryable ByPosition(this IQueryable q, ParsedStreamPosition position) + public static IQueryable WherePositionAfter(this IQueryable q, ParsedStreamPosition position) { if (position.IsEndOfCommit) { @@ -58,7 +63,7 @@ public static IQueryable ByPosition(this IQueryable x.Position >= position.Position); } - public static IQueryable ByStream(this IQueryable q, StreamFilter filter) + public static IQueryable WhereStreamMatches(this IQueryable q, StreamFilter filter) { if (filter.Prefixes == null || filter.Prefixes.Count == 0) { @@ -85,9 +90,14 @@ public static IQueryable ByStream(this IQueryable public static IEnumerable Filtered(this EFEventCommit commit, ParsedStreamPosition position) { + if (!commit.Position.HasValue) + { + yield break; + } + var eventStreamOffset = commit.EventStreamOffset; - var commitPosition = commit.Position!.Value; + var commitPosition = commit.Position.Value; var commitOffset = 0; foreach (var @event in commit.Events) @@ -108,9 +118,14 @@ public static IEnumerable Filtered(this EFEventCommit commit, Parse public static IEnumerable Filtered(this EFEventCommit commit, long position) { + if (!commit.Position.HasValue) + { + yield break; + } + var eventStreamOffset = commit.EventStreamOffset; - var commitPosition = commit.Position!.Value; + var commitPosition = commit.Position.Value; var commitOffset = 0; foreach (var @event in commit.Events)