Skip to content

Commit

Permalink
Initial
Browse files Browse the repository at this point in the history
  • Loading branch information
KarolGrzesiak committed Mar 4, 2024
1 parent a71f75b commit a1734a4
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 48 deletions.
17 changes: 10 additions & 7 deletions Source/Aggregates/Actors/AggregateActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ namespace Dolittle.SDK.Aggregates.Actors;

class Perform<TAggregate> where TAggregate : AggregateRoot
{
public Perform(Func<TAggregate, Task> callback, CancellationToken cancellationToken)
public Perform(Func<TAggregate, Task<object>> callback, CancellationToken cancellationToken)
{
Callback = callback;
CancellationToken = cancellationToken;
}

public Func<TAggregate, Task> Callback { get; }
public Func<TAggregate, Task<object>> Callback { get; }
public CancellationToken CancellationToken { get; }
}

Expand All @@ -44,7 +44,9 @@ class AggregateActor<TAggregate> : IActor where TAggregate : AggregateRoot
// ReSharper disable once StaticMemberInGenericType
readonly TimeSpan _idleUnloadTimeout;

internal AggregateActor(GetServiceProviderForTenant getServiceProvider, ILogger<AggregateActor<TAggregate>> logger, TimeSpan idleUnloadTimeout)
internal AggregateActor(GetServiceProviderForTenant getServiceProvider,
ILogger<AggregateActor<TAggregate>> logger,
TimeSpan idleUnloadTimeout)
{
_getServiceProvider = getServiceProvider;
_logger = logger;
Expand Down Expand Up @@ -83,7 +85,8 @@ async Task OnStarted(IContext context)

_eventSourceId = eventSourceId;
var serviceProvider = await _getServiceProvider(tenantId);
_aggregateWrapper = ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
_aggregateWrapper =
ActivatorUtilities.CreateInstance<AggregateWrapper<TAggregate>>(serviceProvider, _eventSourceId);
if (_idleUnloadTimeout > TimeSpan.Zero)
{
context.SetReceiveTimeout(_idleUnloadTimeout);
Expand All @@ -106,13 +109,13 @@ async Task OnPerform(Perform<TAggregate> perform, IContext context)
{
try
{
await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken);
context.Respond(new Try<bool>(true));
var result = await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken);
context.Respond(new Try<object>(result));
}
catch (Exception e)
{
Activity.Current?.RecordError(e);
context.Respond(new Try<bool>(e));
context.Respond(new Try<object>(e));
}
finally
{
Expand Down
31 changes: 17 additions & 14 deletions Source/Aggregates/AggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace Dolittle.SDK.Aggregates;
/// </summary>
public delegate CancellationToken DefaultAggregatePerformTimeout();


/// <summary>
/// Represents an implementation of <see cref="IAggregateRootOperations{T}"/>.
/// </summary>
Expand All @@ -39,7 +38,8 @@ public class AggregateRootOperations<TAggregate> : IAggregateRootOperations<TAgg
/// <param name="tenantId">The <see cref="TenantId"/> of the current tenant.</param>
/// <param name="context">The <see cref="IRootContext" />Root context used to communicate with actors</param>
/// <param name="defaultTimeout">The &lt;see cref="DefaultAggregatePerformTimeout" /&gt; Used if no cancellation token is passed.</param>
public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context, DefaultAggregatePerformTimeout defaultTimeout)
public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context,
DefaultAggregatePerformTimeout defaultTimeout)
{
_context = context;
_defaultTimeout = defaultTimeout;
Expand All @@ -48,37 +48,40 @@ public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, I
}

/// <inheritdoc/>
public Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(
aggregate =>
{
method(aggregate);
return Task.CompletedTask;
},
cancellationToken);
public async Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default) =>
await Perform(aggregate =>
{
method(aggregate);
return Task.FromResult(new object());
}, cancellationToken);

/// <inheritdoc/>
public async Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public async Task<object> Perform(Func<TAggregate, Task<object>> method,
CancellationToken cancellationToken = default)
{
if(cancellationToken == default)
if (cancellationToken == default)
{
cancellationToken = _defaultTimeout();
}

using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
?.Tag(_eventSourceId);

try
{
var result = await _context.System.Cluster()
.RequestAsync<Try<bool>>(_clusterIdentity, new Perform<TAggregate>(method, cancellationToken), _context,
.RequestAsync<Try<object>>(_clusterIdentity, new Perform<TAggregate>(method, cancellationToken),
_context,
cancellationToken);

if (!result.Success)
{
throw result.Exception;
}

return result.Result;
}
catch (AggregateRootOperationFailed e) when(e.InnerException is not null)
catch (AggregateRootOperationFailed e) when (e.InnerException is not null)
{
activity?.RecordError(e.InnerException);
// ReSharper disable once PossibleIntendedRethrow
Expand Down
4 changes: 2 additions & 2 deletions Source/Aggregates/IAggregateRootOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface IAggregateRootOperations<TAggregate>
/// <param name="method"><see cref="Action{T}">Method</see> to perform.</param>
/// <param name="cancellationToken">Token that can be used to cancel this operation.</param>
/// <returns>The <see cref="Task" /> representing the asynchronous operation.</returns>
Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default);
}
Task<object> Perform(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default);
}
22 changes: 15 additions & 7 deletions Source/Aggregates/Internal/AggregateWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Dolittle.SDK.Events.Store;
using Dolittle.SDK.Events.Store.Builders;
using Microsoft.Extensions.Logging;

#pragma warning disable CS0618 // Refers to EventSourceId which is marked obsolete for clients. Should still be used internally

namespace Dolittle.SDK.Aggregates.Internal;
Expand All @@ -38,7 +39,8 @@ class AggregateWrapper<TAggregate> where TAggregate : AggregateRoot
/// <param name="eventTypes">The <see cref="IEventTypes"/>.</param>
/// <param name="serviceProvider">The tenant scoped <see cref="IServiceProvider"/>.</param>
/// <param name="logger">The <see cref="ILogger" />.</param>
public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes,
IServiceProvider serviceProvider, ILogger<AggregateWrapper<TAggregate>> logger)
{
_eventSourceId = eventSourceId;
_eventTypes = eventTypes;
Expand All @@ -47,7 +49,8 @@ public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEv
_logger = logger;
}

public async Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public async Task<TResult> Perform<TResult>(Func<TAggregate, Task<TResult>> method,
CancellationToken cancellationToken = default)
{
using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform")
?.Tag(_eventSourceId);
Expand All @@ -58,12 +61,14 @@ public async Task Perform(Func<TAggregate, Task> method, CancellationToken cance
var aggregateRootId = _instance.AggregateRootId;
activity?.Tag(aggregateRootId);
_logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId);
await method(_instance);
var result = await method(_instance);
if (_instance.AppliedEvents.Any())
{
await CommitAppliedEvents(_instance, aggregateRootId).ConfigureAwait(false);
_instance.ClearAppliedEvents();
}

return result;
}
catch (Exception e)
{
Expand Down Expand Up @@ -99,8 +104,10 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel
var eventSourceId = aggregateRoot.EventSourceId;
_logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId);
var eventTypesToFetch = GetEventTypes(_eventTypes);
var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.RehydrateInternal(committedEventsBatches, AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
var committedEventsBatches =
_eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken);
return aggregateRoot.RehydrateInternal(committedEventsBatches,
AggregateRootMetadata<TAggregate>.MethodsPerEventType, cancellationToken);
}

static bool IsStateLess => AggregateRootMetadata<TAggregate>.IsStateLess;
Expand All @@ -113,11 +120,12 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel
static IEnumerable<EventType> GetEventTypes(IEventTypes eventTypes)
=> IsStateLess
? Enumerable.Empty<EventType>()
: AggregateRootMetadata<TAggregate>.MethodsPerEventType.Keys.Select(eventTypes.GetFor);
: AggregateRootMetadata<TAggregate>.MethodsPerEventType.Keys.Select(eventTypes.GetFor);

Task<CommittedAggregateEvents> CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId)
{
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId);
_logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(),
aggregateRoot.EventSourceId);
return _eventStore
.ForAggregate(aggregateRootId)
.WithEventSource(aggregateRoot.EventSourceId)
Expand Down
26 changes: 8 additions & 18 deletions Source/Testing/Aggregates/AggregateRootOperationsMock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,44 +52,34 @@ public AggregateRootOperationsMock(
}

/// <inheritdoc />
public Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(_ =>
{
method(_);
return Task.CompletedTask;
}, cancellationToken);
public async Task Perform(Action<TAggregate> method, CancellationToken cancellationToken = default) => await Perform(_ => { method(_); }, cancellationToken);

/// <summary>
/// Performs operation on aggregate synchronously.
/// </summary>
/// <param name="method">The method to perform.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public void PerformSync(Action<TAggregate> method, CancellationToken cancellationToken = default)
=> Perform(_ =>
{
method(_);
return Task.CompletedTask;
}, cancellationToken).GetAwaiter().GetResult();



/// <summary>
/// Performs operation on aggregate synchronously.
/// </summary>
/// <param name="method">The method to perform.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public void PerformSync(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public void PerformSync(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default)
=> Perform(method, cancellationToken).GetAwaiter().GetResult();

/// <inheritdoc />
public Task Perform(Func<TAggregate, Task> method, CancellationToken cancellationToken = default)
public Task<object> Perform(Func<TAggregate, Task<object>> method, CancellationToken cancellationToken = default)
{
lock (_concurrencyLock)
{
var previousAppliedEvents = new ReadOnlyCollection<AppliedEvent>(_aggregateRoot.AppliedEvents.ToList());
try
{
_persistNumEventsBeforeLastOperation(previousAppliedEvents.Count);
method(_aggregateRoot).GetAwaiter().GetResult();
return Task.CompletedTask;
var result = method(_aggregateRoot).GetAwaiter().GetResult();
return Task.FromResult(result);
}
catch (Exception ex)
{
Expand Down

0 comments on commit a1734a4

Please sign in to comment.