Skip to content

Commit

Permalink
Do not return events without position.
Browse files Browse the repository at this point in the history
  • Loading branch information
SebastianStehle committed Feb 3, 2025
1 parent 3aceb76 commit b2529e3
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageProjectUrl>https://github.com/squidex/squidex</PackageProjectUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<Version>6.31.0</Version>
<Version>6.32.0</Version>
</PropertyGroup>

<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
Expand Down
50 changes: 29 additions & 21 deletions events/Squidex.Events.EntityFramework/EFEventStore_Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,25 @@ public async Task<IReadOnlyList<StoredEvent>> QueryStreamAsync(string streamName
{
await using var context = await dbContextFactory.CreateDbContextAsync(ct);

var commits = await context.Set<EFEventCommit>()
.ByStream(StreamFilter.Name(streamName))
.ByOffset(afterStreamPosition)
.ToListAsync(ct);
var commits =
await context.Set<EFEventCommit>()
.WhereStreamMatches(StreamFilter.Name(streamName))
.WherePositionAfter(afterStreamPosition)
.WhereCommited()
.ToListAsync(ct);

var result = Convert(commits, afterStreamPosition);

if ((commits.Count == 0 || commits[0].EventStreamOffset != afterStreamPosition) && afterStreamPosition > EventsVersion.Empty)
{
commits = await context.Set<EFEventCommit>()
.ByStream(StreamFilter.Name(streamName))
.ByBeforeOffset(afterStreamPosition)
.OrderByDescending(x => x.EventStreamOffset)
.Take(1)
.ToListAsync(ct);
commits =
await context.Set<EFEventCommit>()
.WhereStreamMatches(StreamFilter.Name(streamName))
.WherePositionBefore(afterStreamPosition)
.WhereCommited()
.OrderByDescending(x => x.EventStreamOffset)
.Take(1)
.ToListAsync(ct);

result = Convert(commits, afterStreamPosition).ToList();
}
Expand All @@ -57,12 +61,14 @@ public async IAsyncEnumerable<StoredEvent> QueryAllReverseAsync(StreamFilter fil
await using var context = await dbContextFactory.CreateDbContextAsync(ct);

DateTime streamTime = timestamp;
var query = await context.Set<EFEventCommit>()
.ByStream(filter)
.ByTimestamp(streamTime)
.OrderByDescending(x => x.Position).ThenBy(x => x.EventStream)
.Take(take)
.ToListAsync(ct);
var query =
await context.Set<EFEventCommit>()
.WhereStreamMatches(filter)
.WhereTimestampAfter(streamTime)
.WhereCommited()
.OrderByDescending(x => x.Position).ThenBy(x => x.EventStream)
.Take(take)
.ToListAsync(ct);

var taken = 0;
foreach (var commit in query)
Expand Down Expand Up @@ -91,11 +97,13 @@ public async IAsyncEnumerable<StoredEvent> QueryAllAsync(StreamFilter filter = d
await using var context = await dbContextFactory.CreateDbContextAsync(ct);

ParsedStreamPosition streamPosition = position;
var query = context.Set<EFEventCommit>()
.ByStream(filter)
.ByPosition(streamPosition)
.OrderBy(x => x.Position).ThenBy(x => x.EventStream)
.Take(take);
var query =
context.Set<EFEventCommit>()
.WhereStreamMatches(filter)
.WherePositionAfter(streamPosition)
.WhereCommited()
.OrderBy(x => x.Position).ThenBy(x => x.EventStream)
.Take(take);

var taken = 0;
await foreach (var commit in query.AsAsyncEnumerable().WithCancellation(ct))
Expand Down
14 changes: 8 additions & 6 deletions events/Squidex.Events.EntityFramework/EFEventStore_Writer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,19 @@ public async Task DeleteAsync(StreamFilter filter,
{
await using var context = await dbContextFactory.CreateDbContextAsync(ct);

await context.Set<EFEventCommit>().ByStream(filter)
await context.Set<EFEventCommit>().WhereStreamMatches(filter)
.ExecuteDeleteAsync(ct);
}

private static async Task<long> GetEventStreamOffsetAsync(DbSet<EFEventCommit> commitSet, string streamName)
{
var record = await commitSet
.Where(x => x.EventStream == streamName)
.OrderByDescending(x => x.EventStreamOffset)
.Select(x => new { x.EventStreamOffset, x.EventsCount })
.FirstOrDefaultAsync();
var record =
await commitSet
.Where(x => x.Position != null)
.Where(x => x.EventStream == streamName)
.OrderByDescending(x => x.EventStreamOffset)
.Select(x => new { x.EventStreamOffset, x.EventsCount })
.FirstOrDefaultAsync();

if (record == null)
{
Expand Down
29 changes: 22 additions & 7 deletions events/Squidex.Events.EntityFramework/FilterBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ internal static class FilterBuilder
private static readonly MethodInfo DbLikeMethod = typeof(DbFunctionsExtensions).GetMethod("Like", [typeof(DbFunctions), typeof(string), typeof(string)])!;
private static readonly ConstantExpression DbFunctions = Expression.Constant(EF.Functions);

public static IQueryable<EFEventCommit> ByTimestamp(this IQueryable<EFEventCommit> q, DateTime timestamp)
public static IQueryable<EFEventCommit> WhereCommited(this IQueryable<EFEventCommit> q)
{
return q.Where(x => x.Position != null);
}

public static IQueryable<EFEventCommit> WhereTimestampAfter(this IQueryable<EFEventCommit> q, DateTime timestamp)
{
if (timestamp == default)
{
Expand All @@ -28,7 +33,7 @@ public static IQueryable<EFEventCommit> ByTimestamp(this IQueryable<EFEventCommi
return q.Where(x => x.Timestamp >= timestamp);
}

public static IQueryable<EFEventCommit> ByBeforeOffset(this IQueryable<EFEventCommit> q, long offset)
public static IQueryable<EFEventCommit> WherePositionBefore(this IQueryable<EFEventCommit> q, long offset)
{
if (offset <= EventsVersion.Empty)
{
Expand All @@ -38,7 +43,7 @@ public static IQueryable<EFEventCommit> ByBeforeOffset(this IQueryable<EFEventCo
return q.Where(x => x.EventStreamOffset < offset);
}

public static IQueryable<EFEventCommit> ByOffset(this IQueryable<EFEventCommit> q, long offset)
public static IQueryable<EFEventCommit> WherePositionAfter(this IQueryable<EFEventCommit> q, long offset)
{
if (offset <= EventsVersion.Empty)
{
Expand All @@ -48,7 +53,7 @@ public static IQueryable<EFEventCommit> ByOffset(this IQueryable<EFEventCommit>
return q.Where(x => x.EventStreamOffset >= offset);
}

public static IQueryable<EFEventCommit> ByPosition(this IQueryable<EFEventCommit> q, ParsedStreamPosition position)
public static IQueryable<EFEventCommit> WherePositionAfter(this IQueryable<EFEventCommit> q, ParsedStreamPosition position)
{
if (position.IsEndOfCommit)
{
Expand All @@ -58,7 +63,7 @@ public static IQueryable<EFEventCommit> ByPosition(this IQueryable<EFEventCommit
return q.Where(x => x.Position >= position.Position);
}

public static IQueryable<EFEventCommit> ByStream(this IQueryable<EFEventCommit> q, StreamFilter filter)
public static IQueryable<EFEventCommit> WhereStreamMatches(this IQueryable<EFEventCommit> q, StreamFilter filter)
{
if (filter.Prefixes == null || filter.Prefixes.Count == 0)
{
Expand All @@ -85,9 +90,14 @@ public static IQueryable<EFEventCommit> ByStream(this IQueryable<EFEventCommit>

public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, ParsedStreamPosition position)
{
if (!commit.Position.HasValue)
{
yield break;
}

var eventStreamOffset = commit.EventStreamOffset;

var commitPosition = commit.Position!.Value;
var commitPosition = commit.Position.Value;
var commitOffset = 0;

foreach (var @event in commit.Events)
Expand All @@ -108,9 +118,14 @@ public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, Parse

public static IEnumerable<StoredEvent> Filtered(this EFEventCommit commit, long position)
{
if (!commit.Position.HasValue)
{
yield break;
}

var eventStreamOffset = commit.EventStreamOffset;

var commitPosition = commit.Position!.Value;
var commitPosition = commit.Position.Value;
var commitOffset = 0;

foreach (var @event in commit.Events)
Expand Down

0 comments on commit b2529e3

Please sign in to comment.