Skip to content

Commit

Permalink
Event handlers are now idempotent. (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
Utar94 authored Dec 28, 2024
1 parent 894212c commit 6d807b8
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 169 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

Nothing yet.
### Fixed

- Event handlers are now idempotent.

## [3.0.4] - 2024-12-27

Expand Down Expand Up @@ -67,7 +69,8 @@ Nothing yet.
- Relational storage (PostgreSQL and Microsoft SQL Server) for Identity entities.
- Unit and Integration tests.

[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.4...HEAD
[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.5...HEAD
[3.0.5]: https://github.com/Logitar/Identity/compare/v3.0.4...v3.0.5
[3.0.4]: https://github.com/Logitar/Identity/compare/v3.0.3...v3.0.4
[3.0.3]: https://github.com/Logitar/Identity/compare/v3.0.2...v3.0.3
[3.0.2]: https://github.com/Logitar/Identity/compare/v3.0.1...v3.0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public void Enable(UserEnabled @event)

public void RemoveCustomIdentifier(UserIdentifierRemoved @event)
{
Update(@event);

UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
if (identifier != null)
{
Expand Down Expand Up @@ -241,6 +243,8 @@ public void SetAddress(UserAddressChanged @event)

public void SetCustomIdentifier(UserIdentifierChanged @event)
{
Update(@event);

UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
if (identifier == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,39 @@ public sealed class ApiKeyEvents : INotificationHandler<ApiKeyAuthenticated>,
{
private readonly IdentityContext _context;
private readonly ICustomAttributeService _customAttributes;
private readonly IMediator _mediator;

public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes)
public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
{
_context = context;
_customAttributes = customAttributes;
_mediator = mediator;
}

public async Task Handle(ApiKeyAuthenticated @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

apiKey.Authenticate(@event);
if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.Authenticate(@event);

await _context.SaveChangesAsync(cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationToken)
{
ApiKeyEntity? apiKey = await _context.ApiKeys.AsNoTracking()
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null)
{
apiKey = new(@event);
Expand All @@ -44,61 +55,98 @@ public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationTok

await SaveActorAsync(apiKey, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
else
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
}

public async Task Handle(ApiKeyDeleted @event, CancellationToken cancellationToken)
{
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
if (apiKey != null)

if (apiKey == null)
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
_context.ApiKeys.Remove(apiKey);

await DeleteActorAsync(apiKey, cancellationToken);
await _customAttributes.RemoveAsync(EntityType.ApiKey, apiKey.ApiKeyId, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyRoleAdded @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
RoleEntity role = await _context.Roles
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");

RoleEntity role = await _context.Roles
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");
apiKey.AddRole(role, @event);

apiKey.AddRole(role, @event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyRoleRemoved @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
ApiKeyEntity? apiKey = await _context.ApiKeys
.Include(x => x.Roles)
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

apiKey.RemoveRole(@event);
if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.RemoveRole(@event);

await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyUpdated @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.Update(@event);

apiKey.Update(@event);
await SaveActorAsync(apiKey, cancellationToken);
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await SaveActorAsync(apiKey, cancellationToken);
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

private async Task DeleteActorAsync(ApiKeyEntity apiKey, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Logitar.EventSourcing;
using MediatR;

namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;

public record EventHandled(DomainEvent Event) : INotification;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Logitar.EventSourcing;
using Logitar.Identity.EntityFrameworkCore.Relational.Entities;
using MediatR;

namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;

public record EventNotHandled : INotification
{
public long ExpectedVersion { get; }
public long ActualVersion { get; }

public EventNotHandled(long expectedVersion, long actualVersion)
{
ArgumentOutOfRangeException.ThrowIfNegative(expectedVersion);
ArgumentOutOfRangeException.ThrowIfNegative(actualVersion);

ExpectedVersion = expectedVersion;
ActualVersion = actualVersion;
}

public EventNotHandled(DomainEvent @event, AggregateEntity? aggregate)
{
ExpectedVersion = @event.Version - 1;
ActualVersion = aggregate?.Version ?? 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,111 @@ public sealed class OneTimePasswordEvents : INotificationHandler<OneTimePassword
{
private readonly IdentityContext _context;
private readonly ICustomAttributeService _customAttributes;
private readonly IMediator _mediator;

public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes)
public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
{
_context = context;
_customAttributes = customAttributes;
_mediator = mediator;
}

public async Task Handle(OneTimePasswordCreated @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords.AsNoTracking()
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null)
{
oneTimePassword = new(@event);

_context.OneTimePasswords.Add(oneTimePassword);

await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
else
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
}

public async Task Handle(OneTimePasswordDeleted @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
if (oneTimePassword != null)

if (oneTimePassword == null)
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
_context.OneTimePasswords.Remove(oneTimePassword);

await _customAttributes.RemoveAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordUpdated @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Update(@event);

oneTimePassword.Update(@event);
await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordValidationFailed @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Fail(@event);

oneTimePassword.Fail(@event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordValidationSucceeded @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Succeed(@event);

oneTimePassword.Succeed(@event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}
}
Loading

0 comments on commit 6d807b8

Please sign in to comment.