Skip to content

Commit

Permalink
Fix Inbox Starvation (#10)
Browse files Browse the repository at this point in the history
* fix inbox starvation by continuous postponing next message handling
  • Loading branch information
skrasekmichael authored May 6, 2024
1 parent 5e094a1 commit 490f8a6
Show file tree
Hide file tree
Showing 13 changed files with 819 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public InboxConsumer(IServiceProvider serviceProvider, IDateTimeProvider dateTim
}
}

internal async Task<Type?> DispatchEventAsync(InboxMessage message, CancellationToken ct = default)
private async Task<Type?> TryDispatchEventAsync(InboxMessage message, CancellationToken ct = default)
{
var integrationEventHandlerType = ResolveType(message);
if (integrationEventHandlerType is null)
Expand Down Expand Up @@ -102,11 +102,32 @@ public InboxConsumer(IServiceProvider serviceProvider, IDateTimeProvider dateTim
}
}

internal static Task<List<InboxMessage>> GetInboxAsync(DbContext dbContext, CancellationToken ct)
internal async Task<Type?> DispatchEventAsync(InboxMessage message, CancellationToken ct = default)
{
var result = await TryDispatchEventAsync(message, ct);

if (result is null)
{
message.FailCount++;
message.NextProcessingUtc = message.FailCount switch
{
< 5 => _dateTimeProvider.UtcNow,
< 10 => _dateTimeProvider.UtcNow.AddMinutes(message.FailCount - 10),
_ => _dateTimeProvider.UtcNow.AddMinutes(message.FailCount)
};
}

return result;
}

internal Task<List<InboxMessage>> GetInboxAsync(DbContext dbContext, CancellationToken ct)
{
return dbContext
.Set<InboxMessage>()
.Where(msg => msg.ProcessedUtc == null)
.Where(msg =>
msg.ProcessedUtc == null && //unprocessed
msg.FailCount != -1 && //not marked for skipping
msg.NextProcessingUtc < _dateTimeProvider.UtcNow) //is scheduled for processing
.OrderBy(msg => msg.CreatedUtc)
.Take(20)
.ToListAsync(ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ internal sealed record InboxMessage
public required string Data { get; init; }
public DateTime? ProcessedUtc { get; set; } = null;
public string? Error { get; set; } = null;
public int FailCount { get; set; } = 0;
public DateTime? NextProcessingUtc { get; set; } = null;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public Task ProduceEventAsync<TIntegrationEventHandler, TEvent>(TEvent integrati
CreatedUtc = _dateTimeProvider.UtcNow,
Assembly = type.Assembly.GetName().Name!,
Type = type.FullName!,
Data = JsonSerializer.Serialize(integrationEvent, JsonSerializerOptions)
Data = JsonSerializer.Serialize(integrationEvent, JsonSerializerOptions),
NextProcessingUtc = _dateTimeProvider.UtcNow
};

_dbContext.Set<InboxMessage>().Add(message);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace TeamUp.Notifications.Infrastructure.Persistence.Migrations
{
/// <inheritdoc />
public partial class InboxStarvation : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AddColumn<int>(
name: "FailCount",
schema: "Notifications",
table: "InboxMessages",
type: "integer",
nullable: false,
defaultValue: 0);

migrationBuilder.AddColumn<DateTime>(
name: "NextProcessingUtc",
schema: "Notifications",
table: "InboxMessages",
type: "timestamp with time zone",
nullable: true);
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropColumn(
name: "FailCount",
schema: "Notifications",
table: "InboxMessages");

migrationBuilder.DropColumn(
name: "NextProcessingUtc",
schema: "Notifications",
table: "InboxMessages");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.Property<string>("Error")
.HasColumnType("text");

b.Property<int>("FailCount")
.HasColumnType("integer");

b.Property<DateTime?>("NextProcessingUtc")
.HasColumnType("timestamp with time zone");

b.Property<DateTime?>("ProcessedUtc")
.HasColumnType("timestamp with time zone");

Expand Down
Loading

0 comments on commit 490f8a6

Please sign in to comment.