Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Enable Using Return Types From Aggregates Methods #236

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions Source/Aggregates/Actors/AggregateActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ namespace Dolittle.SDK.Aggregates.Actors;

class Perform<TAggregate> where TAggregate : AggregateRoot
{
public Perform(Func<TAggregate, Task> callback, CancellationToken cancellationToken)
public Perform(Func<TAggregate, Task<object>> callback, CancellationToken cancellationToken)
{
Callback = callback;
CancellationToken = cancellationToken;
}

public Func<TAggregate, Task> Callback { get; }
public Func<TAggregate, Task<object>> Callback { get; }
public CancellationToken CancellationToken { get; }
}

Expand All @@ -44,7 +44,9 @@ class AggregateActor<TAggregate> : IActor where TAggregate : AggregateRoot
// ReSharper disable once StaticMemberInGenericType
readonly TimeSpan _idleUnloadTimeout;

internal AggregateActor(GetServiceProviderForTenant getServiceProvider, ILogger<AggregateActor<TAggregate>> logger, TimeSpan idleUnloadTimeout)
internal AggregateActor(GetServiceProviderForTenant getServiceProvider,
ILogger<AggregateActor<TAggregate>> logger,
TimeSpan idleUnloadTimeout)
{
_getServiceProvider = getServiceProvider;
_logger = logger;
Expand Down Expand Up @@ -83,7 +85,8 @@ async Task OnStarted(IContext context)

_eventSourceId = eventSourceId;
var serviceProvider = await _getServiceProvider(tenantId);
_aggregateWrapper = ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
_aggregateWrapper =
ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
if (_idleUnloadTimeout > TimeSpan.Zero)
{
context.SetReceiveTimeout(_idleUnloadTimeout);
Expand All @@ -106,13 +109,13 @@ async Task OnPerform(Perform<TAggregate> perform, IContext context)
{
try
{
await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken);
context.Respond(new Try<bool>(true));
var result = await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken);
context.Respond(new Try<object>(result));
}
catch (Exception e)
{
Activity.Current?.RecordError(e);
context.Respond(new Try<bool>(e));
context.Respond(new Try<object>(e));
}
finally
{
Expand Down
31 changes: 17 additions & 14 deletions Source/Aggregates/AggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Dolittle.SDK.Aggregates;
/// </summary>
public delegate CancellationToken DefaultAggregatePerformTimeout();


/// <summary>
/// Represents an implementation of <see cref="IAggregateRootOperations{T}"/>.
/// </summary>
Expand All @@ -39,7 +38,8 @@ public class AggregateRootOperations<TAggregate> : IAggregateRootOperations<TAgg
/// <param name="tenantId">The <see cref="TenantId"/> of the current tenant.</param>
/// <param name="context">The <see cref="IRootContext" />Root context used to communicate with actors</param>
/// <param name="defaultTimeout">The &lt;see cref="DefaultAggregatePerformTimeout" /&gt; Used if no cancellation token is passed.</param>
public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context, DefaultAggregatePerformTimeout defaultTimeout)
public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context,
DefaultAggregatePerformTimeout defaultTimeout)
{
_context = context;
_defaultTimeout = defaultTimeout;
Expand All @@ -48,37 +48,40 @@ public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, I
}

/// <inheritdoc/>
public Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(
aggregate =>
{
method(aggregate);
return Task.CompletedTask;
},
cancellationToken);
public async Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default) =>
await Perform(aggregate =>
{
method(aggregate);
return Task.FromResult(new object());
}, cancellationToken);

/// <inheritdoc/>
public async Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public async Task<object> Perform(Func<TAggregate, Task<object>> method,
CancellationToken cancellationToken = default)
{
if(cancellationToken == default)
if (cancellationToken == default)
{
cancellationToken = _defaultTimeout();
}

using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
?.Tag(_eventSourceId);

try
{
var result = await _context.System.Cluster()
.RequestAsync<Try<bool>>(_clusterIdentity, new Perform<TAggregate>(method, cancellationToken), _context,
.RequestAsync<Try<object>>(_clusterIdentity, new Perform<TAggregate>(method, cancellationToken),
_context,
cancellationToken);

if (!result.Success)
{
throw result.Exception;
}

return result.Result;
}
catch (AggregateRootOperationFailed e) when(e.InnerException is not null)
catch (AggregateRootOperationFailed e) when (e.InnerException is not null)
{
activity?.RecordError(e.InnerException);
// ReSharper disable once PossibleIntendedRethrow
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/IAggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface IAggregateRootOperations<TAggregate>
/// <param name="method"><see cref="Action{T}">Method</see> to perform.</param>
/// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
/// <returns>The <see cref="Task" /> representing the asynchronous operation.</returns>
Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default);
}
Task<object> Perform(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default);
}
22 changes: 15 additions & 7 deletions Source/Aggregates/Internal/AggregateWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Dolittle.SDK.Events.Store;
using Dolittle.SDK.Events.Store.Builders;
using Microsoft.Extensions.Logging;

#pragma warning disable CS0618 // Refers to EventSourceId which is marked obsolete for clients. Should still be used internally

namespace Dolittle.SDK.Aggregates.Internal;
Expand All @@ -38,7 +39,8 @@ class AggregateWrapper<TAggregate> where TAggregate : AggregateRoot
/// <param name="eventTypes">The <see cref="IEventTypes"/>.</param>
/// <param name="serviceProvider">The tenant scoped <see cref="IServiceProvider"/>.</param>
/// <param name="logger">The <see cref="ILogger" />.</param>
public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes,
IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
{
_eventSourceId = eventSourceId;
_eventTypes = eventTypes;
Expand All @@ -47,7 +49,8 @@ public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEv
_logger = logger;
}

public async Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public async Task<TResult> Perform<TResult>(Func<TAggregate, Task<TResult>> method,
CancellationToken cancellationToken = default)
{
using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
?.Tag(_eventSourceId);
Expand All @@ -58,12 +61,14 @@ public async Task Perform(Func<TAggregate, Task> method, CancellationToken cance
var aggregateRootId = _instance.AggregateRootId;
activity?.Tag(aggregateRootId);
_logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId);
await method(_instance);
var result = await method(_instance);
if (_instance.AppliedEvents.Any())
{
await CommitAppliedEvents(_instance, aggregateRootId).ConfigureAwait(false);
_instance.ClearAppliedEvents();
}

return result;
}
catch (Exception e)
{
Expand Down Expand Up @@ -99,8 +104,10 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel
var eventSourceId = aggregateRoot.EventSourceId;
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
var eventTypesToFetch = GetEventTypes(_eventTypes);
var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.RehydrateInternal(committedEventsBatches, AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
var committedEventsBatches =
_eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.RehydrateInternal(committedEventsBatches,
AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
}

static bool IsStateLess => AggregateRootMetadata<TAggregate>.IsStateLess;
Expand All @@ -113,11 +120,12 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel
static IEnumerable<EventType> GetEventTypes(IEventTypes eventTypes)
=> IsStateLess
? Enumerable.Empty<EventType>()
: AggregateRootMetadata<TAggregate>.MethodsPerEventType.Keys.Select(eventTypes.GetFor);
: AggregateRootMetadata<TAggregate>.MethodsPerEventType.Keys.Select(eventTypes.GetFor);

Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
{
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId);
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(),
aggregateRoot.EventSourceId);
return _eventStore
.ForAggregate(aggregateRootId)
.WithEventSource(aggregateRoot.EventSourceId)
Expand Down
26 changes: 8 additions & 18 deletions Source/Testing/Aggregates/AggregateRootOperationsMock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,44 +52,34 @@ public AggregateRootOperationsMock(
}

/// <inheritdoc />
public Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(_ =>
{
method(_);
return Task.CompletedTask;
}, cancellationToken);
public async Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default) => await Perform(_ => { method(_); }, cancellationToken);

/// <summary>
/// Performs operation on aggregate synchronously.
/// </summary>
/// <param name="method">The method to perform.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public void PerformSync(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(_ =>
{
method(_);
return Task.CompletedTask;
}, cancellationToken).GetAwaiter().GetResult();



/// <summary>
/// Performs operation on aggregate synchronously.
/// </summary>
/// <param name="method">The method to perform.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public void PerformSync(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public void PerformSync(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default)
=> Perform(method, cancellationToken).GetAwaiter().GetResult();

/// <inheritdoc />
public Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public Task<object> Perform(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default)
{
lock (_concurrencyLock)
{
var previousAppliedEvents = new ReadOnlyCollection<AppliedEvent>(_aggregateRoot.AppliedEvents.ToList());
try
{
_persistNumEventsBeforeLastOperation(previousAppliedEvents.Count);
method(_aggregateRoot).GetAwaiter().GetResult();
return Task.CompletedTask;
var result = method(_aggregateRoot).GetAwaiter().GetResult();
return Task.FromResult(result);
}
catch (Exception ex)
{
Expand Down
Loading