Skip to content

Commit

Permalink
WIP SDK projections, using eventhandler protocol instead of runtime p…
Browse files Browse the repository at this point in the history
…rojections
  • Loading branch information
mhelleborg committed Mar 6, 2024
1 parent 47f717f commit 2a6eae1
Show file tree
Hide file tree
Showing 54 changed files with 663 additions and 1,078 deletions.
5 changes: 2 additions & 3 deletions Samples/ASP.NET/Customers/DishesEaten.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@

using System.Linq;
using Dolittle.SDK.Projections;
using Dolittle.SDK.Projections.Copies.MongoDB;

namespace Customers;

[Projection("185107c2-f897-40c8-bb06-643b3642f229")]
public class DishesEaten
[Projection("185107c2-f897-40c8-bb06-643b3642f230")]
public class DishesEaten: ProjectionBase
{
public string[] Dishes { get; set; } = {};

Expand Down
2 changes: 1 addition & 1 deletion Samples/ASP.NET/Kitchen/Kitchen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Kitchen;
[AggregateRoot("01ad9a9f-711f-47a8-8549-43320f782a1e")]
public class Kitchen : AggregateRoot
{
int _ingredients = 2;
int _ingredients = 100;

public Kitchen(EventSourceId eventSource)
: base(eventSource)
Expand Down
1 change: 1 addition & 0 deletions Samples/Tutorials/Projections/Chef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using Dolittle.SDK.Projections;

[Projection("e518986f-7e72-45f8-b4ac-2e34b26082ac")]
public class Chef: ProjectionBase
{
public string Name = "";
Expand Down
2 changes: 1 addition & 1 deletion Samples/Tutorials/Projections/DishCounter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Dolittle.SDK.Projections;

[Projection("98f9db66-b6ca-4e5f-9fc3-638626c9ecfa")]
public class DishCounter
public class DishCounter: ProjectionBase
{
public int NumberOfTimesPrepared = 0;
public string Name = "";
Expand Down
6 changes: 4 additions & 2 deletions Samples/Tutorials/Projections/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Sample code for the tutorial at https://dolittle.io/tutorials/projections/csharp/

using System;
using System.Linq;
using System.Threading.Tasks;
using Dolittle.SDK;
using Dolittle.SDK.Tenancy;
Expand Down Expand Up @@ -33,9 +34,10 @@

await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

var dishes = await client.Projections
var dishes = client.Projections
.ForTenant(TenantId.Development)
.GetAll<DishCounter>().ConfigureAwait(false);
.AsQueryable<DishCounter>()
.ToList();

foreach (var dish in dishes)
{
Expand Down
2 changes: 2 additions & 0 deletions Source/Common/Model/ModelBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public void UnbindIdentifierToProcessorBuilder<TBuilder>(IIdentifier identifier,
}
}



/// <summary>
/// Builds a valid Dolittle application model from the bindings.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void Register(
eventHandler,
processingConverter,
loggerFactory.CreateLogger<EventHandlerProcessor>()),
new EventHandlerProtocol(),
EventHandlerProtocol.Instance,
cancellationToken);
}
}
Expand Down
6 changes: 6 additions & 0 deletions Source/Events.Handling/Internal/EventHandlerProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ namespace Dolittle.SDK.Events.Handling.Internal;
public class EventHandlerProtocol : IAmAReverseCallProtocol<EventHandlerClientToRuntimeMessage, EventHandlerRuntimeToClientMessage,
EventHandlerRegistrationRequest, EventHandlerRegistrationResponse, HandleEventRequest, EventHandlerResponse>
{
EventHandlerProtocol()
{
}

public static EventHandlerProtocol Instance { get; } = new();

/// <inheritdoc/>
public AsyncDuplexStreamingCall<EventHandlerClientToRuntimeMessage, EventHandlerRuntimeToClientMessage> Call(ChannelBase channel, CallOptions callOptions)
=> new EventHandlersClient(channel).Connect(callOptions);
Expand Down
30 changes: 30 additions & 0 deletions Source/Projections/Actors/ClusterIdentityMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.SDK.Tenancy;
using Proto.Cluster;

namespace Dolittle.SDK.Projections.Actors;

static class ClusterIdentityMapper
{
public static ClusterIdentity GetClusterIdentity(TenantId tenantId, Key projectionKey, string kind) =>
ClusterIdentity.Create($"{tenantId}:{projectionKey}", kind);

public static (TenantId, Key) GetTenantAndKey(ClusterIdentity clusterIdentity)
{
ArgumentNullException.ThrowIfNull(clusterIdentity);
var separator = clusterIdentity.Identity.IndexOf(':');

if(separator == -1)
{
throw new ArgumentException("ClusterIdentity is not in the correct format", nameof(clusterIdentity));
}

TenantId tenantId = clusterIdentity.Identity[..separator];
Key projectionKey = clusterIdentity.Identity[(separator + 1)..];

return (tenantId, projectionKey);
}
}
106 changes: 87 additions & 19 deletions Source/Projections/Actors/ProjectionActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,118 @@

using System;
using System.Threading.Tasks;
using Dolittle.SDK.Projections.Internal;
using Dolittle.SDK.Async;
using Dolittle.SDK.Events;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Proto;
using Proto.Cluster;

namespace Dolittle.SDK.Projections.Actors;

public class ProjectionActor<TProjection> : IActor
where TProjection : ProjectionBase, new()
{
public static string Kind => $"proj_{ProjectionType<TProjection>.ProjectionModelId!.Id.Value:N}";
record ProjectedEvent(Key Key, object Event, EventType EventType, EventContext Context);

readonly GetServiceProviderForTenant _providerForTenant;
readonly ILogger<ProjectionActor<TProjection>> _logger;
public class ProjectionActor<TProjection>(
GetServiceProviderForTenant getServiceProvider,
IProjection<TProjection> projectionType,
ILogger<ProjectionActor<TProjection>> logger,
TimeSpan idleUnloadTimeout) : IActor where TProjection : ProjectionBase, new()
{
/// <summary>
/// The cluster kind for the projection actor.
/// </summary>
public static string GetKind(IProjection<TProjection> projection) => $"proj_{projection.Identifier.Value:N}";

public ProjectionActor(GetServiceProviderForTenant providerForTenant, ILogger<ProjectionActor<TProjection>> logger, TimeSpan idleUnloadTimeout)
{
_providerForTenant = providerForTenant;
_logger = logger;
}
IMongoCollection<TProjection>? _collection;
string? _id;
TProjection? _projection;
bool _initialized;

public Task ReceiveAsync(IContext context)
public async Task ReceiveAsync(IContext context)
{
try
{
switch (context.Message)
{
case Started:
return Init(context.ClusterIdentity()!.Identity);
await Init(context.ClusterIdentity()!, context);
return;
case ReceiveTimeout:
// ReSharper disable once MethodHasAsyncOverload
context.Stop(context.Self);
return;
case ProjectedEvent projectedEvent:
await On(projectedEvent, context);
return;
default:
return Task.CompletedTask;
return;
}
}
catch (Exception e)
{
_logger.LogError(e, "Error processing {Message}", context.Message);
logger.LogError(e, "Error processing {Message}", context.Message);
// ReSharper disable once MethodHasAsyncOverload
context.Stop(context.Self);
return Task.CompletedTask;
}
}

Task Init(string id)
async Task On(ProjectedEvent projectedEvent, IContext context)
{
return Task.CompletedTask;
try
{
if (!_initialized)
{
await Init(context.ClusterIdentity()!, context);
}

var firstEvent = _projection is null;
if (firstEvent)
{
_projection = new TProjection
{
Id = _id!
};
}

var projectionContext = new ProjectionContext(firstEvent, projectedEvent.Key, projectedEvent.Context);
var result = projectionType.On(_projection!, projectedEvent.Event, projectedEvent.EventType, projectionContext);
switch (result.Type)
{
case ProjectionResultType.Replace:
_projection = result.ReadModel;
_projection!.LastUpdated = projectionContext.EventContext.Occurred;
await _collection!.ReplaceOneAsync(p => p.Id == _projection!.Id, _projection, new ReplaceOptions { IsUpsert = true });
break;
case ProjectionResultType.Delete:
await _collection!.DeleteOneAsync(p => p.Id == _projection!.Id);
_projection = null;
// ReSharper disable once MethodHasAsyncOverload - Would deadlock the actor
context.Stop(context.Self);
break;
case ProjectionResultType.Keep:
default:
// No change
break;
}
context.Respond(new Try<ProjectionResultType>(result.Type));
}
catch (Exception e)
{
context.Respond(new Try<ProjectionResultType>(e));
}
}

async Task Init(ClusterIdentity id, IContext context)
{
var (tenantId, key) = ClusterIdentityMapper.GetTenantAndKey(id);
_id = key.Value;

var sp = await getServiceProvider(tenantId);
_collection = sp.GetRequiredService<IMongoCollection<TProjection>>();
_projection = await _collection.Find(p => p.Id == _id).SingleOrDefaultAsync();
context.SetReceiveTimeout(idleUnloadTimeout);
_initialized = true;
}
}
54 changes: 54 additions & 0 deletions Source/Projections/Actors/ProjectionClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.SDK.Async;
using Dolittle.SDK.Events;
using Proto.Cluster;

namespace Dolittle.SDK.Projections.Actors;

public interface IProjectionClient<TProjection>
{
/// <summary>
/// Project an event to the projection.
/// </summary>
/// <param name="event"></param>
/// <param name="eventType"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<ProjectionResultType> On(object @event, EventType eventType, EventContext context, CancellationToken cancellationToken);
}

public class ProjectionClient<TProjection>(IProjection<TProjection> projection, Cluster cluster) : IProjectionClient<TProjection> where TProjection : ProjectionBase, new()
{
readonly string _kind = ProjectionActor<TProjection>.GetKind(projection);

public async Task<ProjectionResultType> On(object @event, EventType eventType, EventContext context, CancellationToken cancellationToken)
{
if (!projection.Events.TryGetValue(eventType, out var keySelector))
{
throw new UnhandledEventType($"Projection {projection.Identifier} does not handle event type {eventType}.", eventType);
}

var key = keySelector.GetKey(@event, context);
var message = new ProjectedEvent(key, @event, eventType, context);

var clusterIdentity = GetIdentity(context, key);
var response = await cluster.RequestAsync<Try<ProjectionResultType>>(clusterIdentity, message, cancellationToken);
response.ThrowIfFailed();

return response.Result;
}

ClusterIdentity GetIdentity(EventContext context, Key key) =>
ClusterIdentityMapper.GetClusterIdentity(context.CurrentExecutionContext.Tenant, key, _kind);
}

public class UnhandledEventType(string message, EventType eventType) : Exception(message)
{
public EventType EventType => eventType;
}
18 changes: 6 additions & 12 deletions Source/Projections/Actors/ProjectionClusterKindFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.SDK.Projections.Internal;
using Dolittle.SDK.Tenancy;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -13,29 +12,24 @@ namespace Dolittle.SDK.Projections.Actors;

static class ProjectionClusterKindFactory
{
public static ClusterKind CreateKind(IServiceProvider serviceProvider, Type projectionType)
public static ClusterKind CreateKind(IServiceProvider serviceProvider, IProjection projection)
{
var createKind = typeof(ProjectionClusterKindFactory<>).MakeGenericType(projectionType).GetMethod(nameof(CreateKind));
return (ClusterKind)createKind!.Invoke(null, [serviceProvider])!;
var createKind = typeof(ProjectionClusterKindFactory<>).MakeGenericType(projection.ProjectionType).GetMethod(nameof(CreateKind));
return (ClusterKind)createKind!.Invoke(null, [serviceProvider, projection])!;
}
}

static class ProjectionClusterKindFactory<TProjection> where TProjection : ProjectionBase, new()
{
// ReSharper disable once UnusedMember.Global - Called by reflection
public static ClusterKind CreateKind(IServiceProvider serviceProvider)
public static ClusterKind CreateKind(IServiceProvider serviceProvider, IProjection<TProjection> projection)
{
if (ProjectionType<TProjection>.ProjectionModelId is null)
{
throw new ArgumentException($"Projection type {typeof(TProjection).FullName} is missing the Projection attribute");
}

var providerForTenant = serviceProvider.GetRequiredService<GetServiceProviderForTenant>();
var logger = serviceProvider.GetRequiredService<ILogger<ProjectionActor<TProjection>>>();
var idleUnloadTimeout = TimeSpan.FromSeconds(20); // TODO: make timeouts configurable

return new ClusterKind(ProjectionActor<TProjection>.Kind,
Props.FromProducer(() => new ProjectionActor<TProjection>(providerForTenant, logger, idleUnloadTimeout))
return new ClusterKind(ProjectionActor<TProjection>.GetKind(projection),
Props.FromProducer(() => new ProjectionActor<TProjection>(providerForTenant, projection, logger, idleUnloadTimeout))
.WithClusterRequestDeduplication(TimeSpan.FromMinutes(5)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IProjectionCopyToMongoDBBuilder<TReadModel>
/// </summary>
/// <param name="name">The <see cref="ProjectionCopyToMongoDB"/>.</param>
/// <returns>The builder for continuation</returns>
IProjectionCopyToMongoDBBuilder<TReadModel> ToCollection(MongoDBCopyCollectionName name);
IProjectionCopyToMongoDBBuilder<TReadModel> ToCollection(MongoDBProjectionCollectionName name);

/// <summary>
/// Sets the conversion from a field to a <see cref="BsonType"/>.
Expand Down
3 changes: 0 additions & 3 deletions Source/Projections/Builder/IUnregisteredProjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Dolittle.SDK.DependencyInversion;
using Dolittle.SDK.Events.Processing;
using Dolittle.SDK.Projections.Store;
using Dolittle.SDK.Projections.Store.Converters;
using Microsoft.Extensions.Logging;

namespace Dolittle.SDK.Projections.Builder;
Expand All @@ -27,13 +26,11 @@ public interface IUnregisteredProjections : IUniqueBindings<ProjectionModelId, I
/// </summary>
/// <param name="eventProcessors">The <see cref="IEventProcessors" />.</param>
/// <param name="processingConverter">The <see cref="IEventProcessingConverter" />.</param>
/// <param name="projectionsConverter">The <see cref="IConvertProjectionsToSDK"/>.</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory" />.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken" />.</param>
void Register(
IEventProcessors eventProcessors,
IEventProcessingConverter processingConverter,
IConvertProjectionsToSDK projectionsConverter,
ILoggerFactory loggerFactory,
CancellationToken cancellationToken);

Expand Down
Loading

0 comments on commit 2a6eae1

Please sign in to comment.