Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection DI #246

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 51 additions & 8 deletions Source/Projections/Actors/ProjectionActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,48 @@ public record GetProjectionRequest(ulong WaitForOffset)
public static readonly GetProjectionRequest GetCurrentValue = new(0);
}

/// <summary>
/// Message to subscribe to updates for the current projection id.
/// </summary>
/// <param name="Subscriber"></param>
public record SubscriptionRequest(PID Subscriber);

/// <summary>
/// Message to unsubscribe from projection updates
/// </summary>
/// <param name="Subscriber"></param>
public record Unsubscribe(PID Subscriber);

/// <summary>
/// Message to indicate that the actor has been unsubscribed.
/// </summary>
/// <param name="Exception"></param>
public record Unsubscribed(Exception? Exception)
{
public static readonly Unsubscribed Normally = new((Exception?)null);
}

/// <summary>
/// This actor is responsible for managing a projection.
/// In addition to holding the current state of the projection, it also manages subscriptions, and allows
/// clients to subscribe to updates for the specific ID it manages.
/// Actors with active subscriptions will not be unloaded from memory.
/// </summary>
/// <param name="getServiceProvider"></param>
/// <param name="projectionType"></param>
/// <param name="logger"></param>
/// <param name="idleUnloadTimeout">The projection will unload from memory if it has been idle for this duration</param>
/// <typeparam name="TProjection"></typeparam>
public class ProjectionActor<TProjection>(
GetServiceProviderForTenant getServiceProvider,
IProjection<TProjection> projectionType,
ILogger<ProjectionActor<TProjection>> logger,
TimeSpan idleUnloadTimeout) : IActor where TProjection : ReadModel, new()
{
Dictionary<ulong, TaskCompletionSource>? _waitingForUpdate;
readonly TimeSpan _idleUnloadTimeout = idleUnloadTimeout > TimeSpan.Zero ? idleUnloadTimeout : TimeSpan.FromMilliseconds(100);

readonly TimeSpan _idleUnloadTimeout =
idleUnloadTimeout > TimeSpan.Zero ? idleUnloadTimeout : TimeSpan.FromMilliseconds(100);

/// <summary>
/// The cluster kind for the projection actor.
Expand All @@ -54,7 +79,12 @@ public class ProjectionActor<TProjection>(
TProjection? _projection;
bool _initialized;
HashSet<PID>? _subscribers;
IServiceProvider? _serviceProvider;

/// <summary>
/// Process the incoming message.
/// </summary>
/// <param name="context"></param>
public async Task ReceiveAsync(IContext context)
{
try
Expand Down Expand Up @@ -116,7 +146,7 @@ void OnSubscriptionRequest(SubscriptionRequest request, IContext context)
context.CancelReceiveTimeout(); // Keep the actor alive as long as there are subscribers
}

public void OnUnsubscribe(Unsubscribe request, IContext context)
void OnUnsubscribe(Unsubscribe request, IContext context)
{
RemoveSubscriber(context, request.Subscriber);
context.Respond(Unsubscribed.Normally);
Expand Down Expand Up @@ -211,6 +241,7 @@ async Task On(ProjectedEvent projectedEvent, IContext context)
{
Id = _id!,
};
InitDependencies(_projection, _serviceProvider);
}
else
{
Expand All @@ -227,9 +258,11 @@ async Task On(ProjectedEvent projectedEvent, IContext context)
{
case ProjectionResultType.Replace:
_projection = result.ReadModel;
_projection!.SetLastUpdated(projectionContext.EventContext.SequenceNumber.Value, projectionContext.EventContext.Occurred);
_projection!.SetLastUpdated(projectionContext.EventContext.SequenceNumber.Value,
projectionContext.EventContext.Occurred);
OnReplace(_projection, context);
await _collection!.ReplaceOneAsync(p => p.Id == _projection!.Id, _projection, new ReplaceOptions { IsUpsert = true });
await _collection!.ReplaceOneAsync(p => p.Id == _projection!.Id, _projection,
new ReplaceOptions { IsUpsert = true });
break;
case ProjectionResultType.Delete:
OnDeleted(context);
Expand Down Expand Up @@ -308,14 +341,24 @@ async ValueTask Init(IContext context)
return;
}

var id = context.ClusterIdentity();

var id = context.ClusterIdentity() ?? throw new InvalidOperationException("No cluster identity");
var (tenantId, key) = ClusterIdentityMapper.GetTenantAndKey(id);
_id = key.Value;

var sp = await getServiceProvider(tenantId);
_collection = sp.GetRequiredService<IMongoCollection<TProjection>>();
_serviceProvider = await getServiceProvider(tenantId);
_collection = _serviceProvider.GetRequiredService<IMongoCollection<TProjection>>();
_projection = await _collection.Find(p => p.Id == _id).SingleOrDefaultAsync();
InitDependencies(_projection, _serviceProvider);

_initialized = true;
}

static void InitDependencies(TProjection? projection, IServiceProvider? sp)
{
// ReSharper disable once SuspiciousTypeConversion.Global
if (projection is IRequireDependencies<TProjection> requiresDependencies && sp is not null)
{
requiresDependencies.Resolve(sp);
}
}
}
20 changes: 20 additions & 0 deletions Source/Projections/IRequireDependencies.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Dolittle.SDK.Projections;

/// <summary>
/// Marks a read model that requires external dependencies
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IRequireDependencies<T> where T : ReadModel
{
/// <summary>
/// Initialize the read model with the required dependencies
/// </summary>
/// <param name="serviceProvider"></param>
/// <returns></returns>
public void Resolve(IServiceProvider serviceProvider);
}
14 changes: 11 additions & 3 deletions Source/Testing/Projections/ProjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public abstract class ProjectionTests<TProjection>
EventLogSequenceNumber _sequenceNumber = EventLogSequenceNumber.Initial;
readonly Dictionary<Key, TProjection> _projections = new();
readonly IProjection<TProjection> _projection = ProjectionFixture<TProjection>.Projection;
readonly ServiceProvider _serviceProvider;

/// <summary>
/// Gets the <see cref="IAggregates"/> for the test.
Expand All @@ -40,8 +41,8 @@ protected ProjectionTests(Action<IServiceCollection>? configureServices = defaul
{
var serviceCollection = new ServiceCollection();
configureServices?.Invoke(serviceCollection);
var serviceProvider = serviceCollection.BuildServiceProvider();
Aggregates = new AggregatesMock(serviceProvider, OnAggregateEvents);
_serviceProvider = serviceCollection.BuildServiceProvider();
Aggregates = new AggregatesMock(_serviceProvider, OnAggregateEvents);
}

/// <summary>
Expand Down Expand Up @@ -138,10 +139,17 @@ void On(CommittedEvent evt)
var existed = _projections.TryGetValue(key, out var projection);
if (!existed)
{
_projections[key] = projection = new TProjection
var readModel = new TProjection
{
Id = key.Value,
};
// ReSharper disable once SuspiciousTypeConversion.Global
if(readModel is IRequireDependencies<TProjection> requireDependencies)
{
requireDependencies.Resolve(_serviceProvider);
}
_projections[key] = projection = readModel;

}

var projectionContext = new ProjectionContext(!existed, key, context);
Expand Down
81 changes: 81 additions & 0 deletions Tests/ProjectionsTests/ProjectionWithDependencyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Dolittle.SDK.Events;
using Dolittle.SDK.Projections.Types;
using Dolittle.SDK.Testing.Projections;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace Dolittle.SDK.Projections;

public class ProjectionWithDependencyTests : ProjectionTests<ReadModelWithDependency>
{
static readonly List<NameChanged> _nameChangedEvents = new();

public ProjectionWithDependencyTests() : base(ConfigureServices)
{
_nameChangedEvents.Clear();
}

static void ConfigureServices(IServiceCollection services)
{
// This configures the TestDependency that is resolved by the projection DI
services.AddSingleton<TestDependency>(evt =>
{
_nameChangedEvents.Add(evt);
});
}

static readonly EventSourceId _eventSourceId = "foo";

[Fact]
public void ShouldUpdateProjectionOnAggregateChanges()
{
WhenAggregateMutated<TestAggregate>(_eventSourceId, agg => agg.Rename("Bob"));

AssertThat.HasReadModel(_eventSourceId.Value)
.AndThat(
it => it.Name.Should().Be("Bob"),
it => it.TimesChanged.Should().Be(1));

_nameChangedEvents.Should().HaveCount(1);
}

[Fact]
public void ShouldUpdateProjectionOnAggregateChangesAgain()
{
WhenAggregateMutated<TestAggregate>(_eventSourceId, agg =>
{
agg.Rename("Bob");
agg.Rename("Bobby");
});

var projection = AssertThat.ReadModel(_eventSourceId.Value);
projection.Name.Should().Be("Bobby");
projection.TimesChanged.Should().Be(2);
_nameChangedEvents.Should().HaveCount(2);
}

[Fact]
public void ShouldUpdateProjectionOnAggregateChangesAgainAndAgain()
{
WhenAggregateMutated<TestAggregate>(_eventSourceId, agg =>
{
agg.Rename("Bob");
agg.Rename("Bobby");
});

WhenAggregateMutated<TestAggregate>(_eventSourceId, agg =>
{
agg.Rename("Bobby");
agg.Rename("Bob");
});

var projection = AssertThat.ReadModel(_eventSourceId.Value);
projection.Name.Should().Be("Bob");
projection.TimesChanged.Should().Be(3);
_nameChangedEvents.Should().HaveCount(3);
}
}
27 changes: 27 additions & 0 deletions Tests/ProjectionsTests/Types/ReadModelWithDependency.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Microsoft.Extensions.DependencyInjection;

namespace Dolittle.SDK.Projections.Types;

public delegate void TestDependency(NameChanged evt);

[Projection("bad319f1-6af8-48a0-b190-323e21ba6cde")]
public class ReadModelWithDependency : ReadModel, IRequireDependencies<ReadModelWithDependency>
{
TestDependency? _dep;

public string Name { get; set; } = string.Empty;
public int TimesChanged { get; set; }


public void Resolve(IServiceProvider serviceProvider) => _dep = serviceProvider.GetRequiredService<TestDependency>();

public void On(NameChanged evt)
{
Name = evt.Name;
TimesChanged++;
_dep?.Invoke(evt);
}
}
Loading