From a1734a4ce5cd886ab3bfc932c5e153da587df492 Mon Sep 17 00:00:00 2001 From: Karol Grzesiak Date: Mon, 4 Mar 2024 17:32:58 +0100 Subject: [PATCH] Initial --- Source/Aggregates/Actors/AggregateActor.cs | 17 +++++----- Source/Aggregates/AggregateRootOperations.cs | 31 ++++++++++--------- Source/Aggregates/IAggregateRootOperations.cs | 4 +-- .../Aggregates/Internal/AggregateWrapper.cs | 22 ++++++++----- .../Aggregates/AggregateRootOperationsMock.cs | 26 +++++----------- 5 files changed, 52 insertions(+), 48 deletions(-) diff --git a/Source/Aggregates/Actors/AggregateActor.cs b/Source/Aggregates/Actors/AggregateActor.cs index fd4d93cf..25840cd7 100644 --- a/Source/Aggregates/Actors/AggregateActor.cs +++ b/Source/Aggregates/Actors/AggregateActor.cs @@ -23,13 +23,13 @@ namespace Dolittle.SDK.Aggregates.Actors; class Perform where TAggregate : AggregateRoot { - public Perform(Func callback, CancellationToken cancellationToken) + public Perform(Func> callback, CancellationToken cancellationToken) { Callback = callback; CancellationToken = cancellationToken; } - public Func Callback { get; } + public Func> Callback { get; } public CancellationToken CancellationToken { get; } } @@ -44,7 +44,9 @@ class AggregateActor : IActor where TAggregate : AggregateRoot // ReSharper disable once StaticMemberInGenericType readonly TimeSpan _idleUnloadTimeout; - internal AggregateActor(GetServiceProviderForTenant getServiceProvider, ILogger> logger, TimeSpan idleUnloadTimeout) + internal AggregateActor(GetServiceProviderForTenant getServiceProvider, + ILogger> logger, + TimeSpan idleUnloadTimeout) { _getServiceProvider = getServiceProvider; _logger = logger; @@ -83,7 +85,8 @@ async Task OnStarted(IContext context) _eventSourceId = eventSourceId; var serviceProvider = await _getServiceProvider(tenantId); - _aggregateWrapper = ActivatorUtilities.CreateInstance>(serviceProvider, _eventSourceId); + _aggregateWrapper = + ActivatorUtilities.CreateInstance>(serviceProvider, _eventSourceId); if (_idleUnloadTimeout > TimeSpan.Zero) { context.SetReceiveTimeout(_idleUnloadTimeout); @@ -106,13 +109,13 @@ async Task OnPerform(Perform perform, IContext context) { try { - await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken); - context.Respond(new Try(true)); + var result = await _aggregateWrapper!.Perform(perform.Callback, perform.CancellationToken); + context.Respond(new Try(result)); } catch (Exception e) { Activity.Current?.RecordError(e); - context.Respond(new Try(e)); + context.Respond(new Try(e)); } finally { diff --git a/Source/Aggregates/AggregateRootOperations.cs b/Source/Aggregates/AggregateRootOperations.cs index 952f1b4d..1bd11a4d 100644 --- a/Source/Aggregates/AggregateRootOperations.cs +++ b/Source/Aggregates/AggregateRootOperations.cs @@ -19,7 +19,6 @@ namespace Dolittle.SDK.Aggregates; /// public delegate CancellationToken DefaultAggregatePerformTimeout(); - /// /// Represents an implementation of . /// @@ -39,7 +38,8 @@ public class AggregateRootOperations : IAggregateRootOperationsThe of the current tenant. /// The Root context used to communicate with actors /// The <see cref="DefaultAggregatePerformTimeout" /> Used if no cancellation token is passed. - public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context, DefaultAggregatePerformTimeout defaultTimeout) + public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, IRootContext context, + DefaultAggregatePerformTimeout defaultTimeout) { _context = context; _defaultTimeout = defaultTimeout; @@ -48,37 +48,40 @@ public AggregateRootOperations(EventSourceId eventSourceId, TenantId tenantId, I } /// - public Task Perform(Action method, CancellationToken cancellationToken = default) - => Perform( - aggregate => - { - method(aggregate); - return Task.CompletedTask; - }, - cancellationToken); + public async Task Perform(Action method, CancellationToken cancellationToken = default) => + await Perform(aggregate => + { + method(aggregate); + return Task.FromResult(new object()); + }, cancellationToken); /// - public async Task Perform(Func method, CancellationToken cancellationToken = default) + public async Task Perform(Func> method, + CancellationToken cancellationToken = default) { - if(cancellationToken == default) + if (cancellationToken == default) { cancellationToken = _defaultTimeout(); } + using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform") ?.Tag(_eventSourceId); try { var result = await _context.System.Cluster() - .RequestAsync>(_clusterIdentity, new Perform(method, cancellationToken), _context, + .RequestAsync>(_clusterIdentity, new Perform(method, cancellationToken), + _context, cancellationToken); if (!result.Success) { throw result.Exception; } + + return result.Result; } - catch (AggregateRootOperationFailed e) when(e.InnerException is not null) + catch (AggregateRootOperationFailed e) when (e.InnerException is not null) { activity?.RecordError(e.InnerException); // ReSharper disable once PossibleIntendedRethrow diff --git a/Source/Aggregates/IAggregateRootOperations.cs b/Source/Aggregates/IAggregateRootOperations.cs index 55969baa..efe3aa67 100644 --- a/Source/Aggregates/IAggregateRootOperations.cs +++ b/Source/Aggregates/IAggregateRootOperations.cs @@ -29,5 +29,5 @@ public interface IAggregateRootOperations /// Method to perform. /// Token that can be used to cancel this operation. /// The representing the asynchronous operation. - Task Perform(Func method, CancellationToken cancellationToken = default); -} \ No newline at end of file + Task Perform(Func> method, CancellationToken cancellationToken = default); +} diff --git a/Source/Aggregates/Internal/AggregateWrapper.cs b/Source/Aggregates/Internal/AggregateWrapper.cs index 65dc430a..16b34746 100644 --- a/Source/Aggregates/Internal/AggregateWrapper.cs +++ b/Source/Aggregates/Internal/AggregateWrapper.cs @@ -12,6 +12,7 @@ using Dolittle.SDK.Events.Store; using Dolittle.SDK.Events.Store.Builders; using Microsoft.Extensions.Logging; + #pragma warning disable CS0618 // Refers to EventSourceId which is marked obsolete for clients. Should still be used internally namespace Dolittle.SDK.Aggregates.Internal; @@ -38,7 +39,8 @@ class AggregateWrapper where TAggregate : AggregateRoot /// The . /// The tenant scoped . /// The . - public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, IServiceProvider serviceProvider, ILogger> logger) + public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEventTypes eventTypes, + IServiceProvider serviceProvider, ILogger> logger) { _eventSourceId = eventSourceId; _eventTypes = eventTypes; @@ -47,7 +49,8 @@ public AggregateWrapper(EventSourceId eventSourceId, IEventStore eventStore, IEv _logger = logger; } - public async Task Perform(Func method, CancellationToken cancellationToken = default) + public async Task Perform(Func> method, + CancellationToken cancellationToken = default) { using var activity = Tracing.ActivitySource.StartActivity($"{typeof(TAggregate).Name}.Perform") ?.Tag(_eventSourceId); @@ -58,12 +61,14 @@ public async Task Perform(Func method, CancellationToken cance var aggregateRootId = _instance.AggregateRootId; activity?.Tag(aggregateRootId); _logger.PerformingOn(typeof(TAggregate), aggregateRootId, _instance.EventSourceId); - await method(_instance); + var result = await method(_instance); if (_instance.AppliedEvents.Any()) { await CommitAppliedEvents(_instance, aggregateRootId).ConfigureAwait(false); _instance.ClearAppliedEvents(); } + + return result; } catch (Exception e) { @@ -99,8 +104,10 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel var eventSourceId = aggregateRoot.EventSourceId; _logger.RehydratingAggregateRoot(typeof(TAggregate), aggregateRootId, eventSourceId); var eventTypesToFetch = GetEventTypes(_eventTypes); - var committedEventsBatches = _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken); - return aggregateRoot.RehydrateInternal(committedEventsBatches, AggregateRootMetadata.MethodsPerEventType, cancellationToken); + var committedEventsBatches = + _eventStore.FetchStreamForAggregate(aggregateRootId, eventSourceId, eventTypesToFetch, cancellationToken); + return aggregateRoot.RehydrateInternal(committedEventsBatches, + AggregateRootMetadata.MethodsPerEventType, cancellationToken); } static bool IsStateLess => AggregateRootMetadata.IsStateLess; @@ -113,11 +120,12 @@ Task Rehydrate(TAggregate aggregateRoot, AggregateRootId aggregateRootId, Cancel static IEnumerable GetEventTypes(IEventTypes eventTypes) => IsStateLess ? Enumerable.Empty() - : AggregateRootMetadata.MethodsPerEventType.Keys.Select(eventTypes.GetFor); + : AggregateRootMetadata.MethodsPerEventType.Keys.Select(eventTypes.GetFor); Task CommitAppliedEvents(TAggregate aggregateRoot, AggregateRootId aggregateRootId) { - _logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), aggregateRoot.EventSourceId); + _logger.CommittingEvents(aggregateRoot.GetType(), aggregateRootId, aggregateRoot.AppliedEvents.Count(), + aggregateRoot.EventSourceId); return _eventStore .ForAggregate(aggregateRootId) .WithEventSource(aggregateRoot.EventSourceId) diff --git a/Source/Testing/Aggregates/AggregateRootOperationsMock.cs b/Source/Testing/Aggregates/AggregateRootOperationsMock.cs index 4a3d6249..d1277ba5 100644 --- a/Source/Testing/Aggregates/AggregateRootOperationsMock.cs +++ b/Source/Testing/Aggregates/AggregateRootOperationsMock.cs @@ -52,35 +52,25 @@ public AggregateRootOperationsMock( } /// - public Task Perform(Action method, CancellationToken cancellationToken = default) - => Perform(_ => - { - method(_); - return Task.CompletedTask; - }, cancellationToken); + public async Task Perform(Action method, CancellationToken cancellationToken = default) => await Perform(_ => { method(_); }, cancellationToken); /// /// Performs operation on aggregate synchronously. /// /// The method to perform. /// The cancellation token. - public void PerformSync(Action method, CancellationToken cancellationToken = default) - => Perform(_ => - { - method(_); - return Task.CompletedTask; - }, cancellationToken).GetAwaiter().GetResult(); - + + /// /// Performs operation on aggregate synchronously. /// /// The method to perform. /// The cancellation token. - public void PerformSync(Func method, CancellationToken cancellationToken = default) + public void PerformSync(Func> method, CancellationToken cancellationToken = default) => Perform(method, cancellationToken).GetAwaiter().GetResult(); - + /// - public Task Perform(Func method, CancellationToken cancellationToken = default) + public Task Perform(Func> method, CancellationToken cancellationToken = default) { lock (_concurrencyLock) { @@ -88,8 +78,8 @@ public Task Perform(Func method, CancellationToken cancellatio try { _persistNumEventsBeforeLastOperation(previousAppliedEvents.Count); - method(_aggregateRoot).GetAwaiter().GetResult(); - return Task.CompletedTask; + var result = method(_aggregateRoot).GetAwaiter().GetResult(); + return Task.FromResult(result); } catch (Exception ex) {