Skip to content

Commit

Permalink
Stream position metadata (#251)
Browse files Browse the repository at this point in the history
* Add support for stream position in event context / metadata. This allows event handlers to see and reason about which offset each event has in the stream.

* Dependencies updated

* Added the ability to set eventhandler to / from range dynamically.
  • Loading branch information
mhelleborg authored Jan 7, 2025
1 parent 1be65dd commit 7e27362
Show file tree
Hide file tree
Showing 25 changed files with 347 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,5 @@ csharp_style_prefer_range_operator = false:none
dotnet_diagnostic.IDE0055.severity = none
# prefere brace indentation when creating objects etc
csharp_indent_braces = false

resharper_remove_redundant_braces_highlighting = none
38 changes: 19 additions & 19 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,33 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<PropertyGroup>
<ContractsVersion>7.8.0</ContractsVersion>
<ContractsVersion>7.8.1</ContractsVersion>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="Dolittle.Contracts" Version="$(ContractsVersion)" />
<PackageVersion Include="Autofac" Version="8.1.1" />
<PackageVersion Include="Autofac" Version="8.2.0" />
<PackageVersion Include="Autofac.Extensions.DependencyInjection" Version="10.0.0" />
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
<PackageVersion Include="ConsoleTables" Version="2.6.1" />
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
<PackageVersion Include="coverlet.collector" Version="6.0.3" />
<PackageVersion Include="Docker.DotNet" Version="3.125.15" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.65.0" />
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
<PackageVersion Include="Grpc.Tools" Version="2.66.0">
<PackageVersion Include="Grpc.Tools" Version="2.68.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
<PackageVersion Include="Lamar" Version="13.1.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="13.1.0" />
<PackageVersion Include="Grpc.Net.Client" Version="2.67.0" />
<PackageVersion Include="Lamar" Version="14.0.1" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="14.0.1" />
<PackageVersion Include="Machine.Specifications" Version="1.1.2" />
<PackageVersion Include="Machine.Specifications.Should" Version="1.0.0" />
<PackageVersion Include="Machine.Specifications.Runner.VisualStudio" Version="2.10.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.11.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.11.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.12.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.12.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Analyzer.Testing.XUnit" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.CodeFix.Testing.XUnit" Version="1.1.2" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
Expand All @@ -39,18 +39,18 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageVersion Include="Microsoft.Reactive.Testing" Version="6.0.1" />
<PackageVersion Include="MongoDB.Driver" Version="3.0.0" />
<PackageVersion Include="MongoDB.Driver" Version="3.1.0" />
<PackageVersion Include="MongoDB.Driver.Core.Extensions.DiagnosticSources" Version="2.0.0" />
<PackageVersion Include="Moq" Version="4.18.4" />
<PackageVersion Include="FluentAssertions" Version="6.12.2" />
<PackageVersion Include="FluentAssertions" Version="7.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="Polly" Version="8.4.1" />
<PackageVersion Include="Proto.Actor" Version="1.7.1-alpha.0.1" />
<PackageVersion Include="Proto.Remote" Version="1.7.1-alpha.0.1" />
<PackageVersion Include="Proto.OpenTelemetry" Version="1.7.1-alpha.0.1" />
<PackageVersion Include="Proto.Cluster" Version="1.7.1-alpha.0.1" />
<PackageVersion Include="Google.Protobuf" Version="3.28.3" />
<PackageVersion Include="Swashbuckle.AspNetCore" Version="7.0.0" />
<PackageVersion Include="Google.Protobuf" Version="3.29.2" />
<PackageVersion Include="Swashbuckle.AspNetCore" Version="7.2.0" />
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
<PackageVersion Include="System.Collections.Immutable" Version="9.0.0" />
<PackageVersion Include="System.Formats.Asn1" Version="9.0.0" />
Expand All @@ -60,14 +60,14 @@
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.9.0-beta.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.9.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.10.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.10.0-beta.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.10.0" />
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.0.0" />
</ItemGroup>
</Project>
15 changes: 15 additions & 0 deletions DotNET.SDK.sln
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Environments", "Environment
Samples\Environment\runtime.yml = Samples\Environment\runtime.yml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Events.Handling", "Tests\Events.Handling\Events.Handling.csproj", "{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -869,6 +871,18 @@ Global
{BF5E5AC9-C082-41E3-B2CF-2C49D2E8AE04}.Release|x64.Build.0 = Release|Any CPU
{BF5E5AC9-C082-41E3-B2CF-2C49D2E8AE04}.Release|x86.ActiveCfg = Release|Any CPU
{BF5E5AC9-C082-41E3-B2CF-2C49D2E8AE04}.Release|x86.Build.0 = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|x64.ActiveCfg = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|x64.Build.0 = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|x86.ActiveCfg = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Debug|x86.Build.0 = Debug|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|Any CPU.Build.0 = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|x64.ActiveCfg = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|x64.Build.0 = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|x86.ActiveCfg = Release|Any CPU
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{CCA92423-6099-4712-8599-46426D7895BC} = {4A67BEB1-4496-4C90-96C2-4D6E518C84C5}
Expand Down Expand Up @@ -932,5 +946,6 @@ Global
{56E57E45-CFFB-42F2-A378-E155D9534CE8} = {E1103442-E3E5-43AE-B5B5-9F1965309E24}
{BF5E5AC9-C082-41E3-B2CF-2C49D2E8AE04} = {22418224-6C53-4187-A7FE-BDB8C1763764}
{15B891E5-2688-4370-B994-972AAA08897C} = {22418224-6C53-4187-A7FE-BDB8C1763764}
{459FFA28-2669-4A5E-ACFA-889B1D1A4CCB} = {E1103442-E3E5-43AE-B5B5-9F1965309E24}
EndGlobalSection
EndGlobal
2 changes: 1 addition & 1 deletion Samples/ASP.NET/DishEater.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public DishEater2(ILogger<DishEater2> logger)

public void Handle(DishEaten @event, EventContext ctx)
{
_logger.LogInformation("{CtxEventSourceId} has eaten unpartitioned {EventDish}. Yummm!", ctx.EventSourceId, @event.Dish);
_logger.LogInformation("{CtxEventSourceId} has eaten unpartitioned {EventDish}. It was event offset {Offset} in the stream", ctx.EventSourceId, @event.Dish, ctx.StreamPosition);
}
}
2 changes: 1 addition & 1 deletion Source/Events.Filters/Internal/FilterEventProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected FilterEventProcessor(
protected override Task<TResponse> Process(FilterEventRequest request, ExecutionContext executionContext, IServiceProvider serviceProvider, CancellationToken cancellation)
{
var committedEvent = _converter.ToSDK(request.Event);
return Filter(committedEvent.Content, committedEvent.GetEventContext(executionContext), cancellation);
return Filter(committedEvent.Content, committedEvent.GetEventContext(executionContext, null), cancellation);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ bool TryAddDecoratedHandlerMethods(
buildResults.AddFailure(_identifier, $"{method} has the signature of an event handler method, but is not public. Event handler methods needs to be public");
shouldAddHandler = false;
}

switch (shouldAddHandler)
{
case true when !eventTypesToMethods.TryAdd(eventType, createUntypedHandlerMethod(method)):
Expand Down
87 changes: 87 additions & 0 deletions Source/Events.Handling/EventHandlerWithRangeSelectorAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.SDK.Common.Model;

namespace Dolittle.SDK.Events.Handling;

/// <summary>
/// Allows dynamic selection of the range to process.
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IProcessRangeSelector<T> where T : IProcessRangeSelector<T>, new()
{
/// <summary>
/// Get the range to process.
/// </summary>
/// <returns></returns>
public ProcessRange GetRange();
}

/// <summary>
/// Decorates a class to indicate the Event Handler Id of the Event Handler class.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class EventHandlerWithRangeSelectorAttribute<T> : Attribute, IDecoratedTypeDecorator<EventHandlerModelId>
where T : IProcessRangeSelector<T>, new()
{
readonly EventHandlerId _eventHandlerId;
readonly string? _alias;

/// <summary>
/// Initializes a new instance of the <see cref="EventHandlerAttribute"/> class.
/// </summary>
/// <param name="eventHandlerId">The unique identifier of the event handler.</param>
/// <param name="partitioned">Whether the event handler is partitioned.</param>
/// <param name="inScope">The scope that the event handler handles events in.</param>
/// <param name="alias">The alias for the event handler.</param>
/// <param name="concurrency">How many events can be processed simultaneously</param>
public EventHandlerWithRangeSelectorAttribute(
string eventHandlerId,
bool partitioned = true,
string? inScope = null,
string? alias = null,
int concurrency = 1
)
{
_eventHandlerId = eventHandlerId;
_alias = alias;
Concurrency = concurrency;
var selector = new T();
var range = selector.GetRange();
StartFrom = range.Mode;
StartAt = range.StartFrom;
StopAt = range.StopAt;
if (StartAt.HasValue && StopAt.HasValue && StartAt >= StopAt)
{
throw new ArgumentException("StartFromTimestamp must be before StopAtTimestamp");
}

Partitioned = partitioned;
Scope = inScope ?? ScopeId.Default;
}

public int Concurrency { get; set; }
public ProcessFrom StartFrom { get; }
public DateTimeOffset? StartAt { get; }
public DateTimeOffset? StopAt { get; }

/// <summary>
/// Gets a value indicating whether this event handler is partitioned.
/// </summary>
public bool Partitioned { get; }

/// <summary>
/// Gets the <see cref="ScopeId" />.
/// </summary>
public ScopeId Scope { get; }


/// <inheritdoc />
public EventHandlerModelId GetIdentifier(Type decoratedType)
{
return new(_eventHandlerId, Partitioned, Scope, _alias ?? decoratedType.Name, Concurrency, StartFrom,
StartAt, StopAt);
}
}
2 changes: 1 addition & 1 deletion Source/Events.Handling/Internal/EventHandlerProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ await _eventHandler
.Handle(
committedEvent.Content,
committedEvent.EventType,
committedEvent.GetEventContext(executionContext),
committedEvent.GetEventContext(executionContext, request.Event.StreamPosition),
serviceProvider,
cancellation)
.ConfigureAwait(false);
Expand Down
46 changes: 46 additions & 0 deletions Source/Events.Handling/ProcessRange.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;

namespace Dolittle.SDK.Events.Handling;

/// <summary>
/// Determine what date/time range of events to process.
/// </summary>
public record ProcessRange
{
/// <summary>
///
/// </summary>
/// <param name="Mode">Start from the start of the event log or just process new events</param>
/// <param name="StartFrom">Do not process events from before this. Can be in the future, Letting an event handler take over at a given point. (Optional)</param>
/// <param name="StopAt">Do not process events that were committed after this. (Optional)</param>
public ProcessRange(ProcessFrom Mode = ProcessFrom.Earliest,
DateTimeOffset? StartFrom = null,
DateTimeOffset? StopAt = null)
{
this.Mode = Mode;
this.StartFrom = StartFrom;
this.StopAt = StopAt;
if (StartFrom.HasValue && StopAt.HasValue && StartFrom.Value > StopAt.Value)
{
throw new ArgumentException("StartFrom cannot be after StopAt");
}
}

public ProcessFrom Mode { get; init; }

/// <summary>Mode: Start from the start of the event log or just process new events</summary>
public DateTimeOffset? StartFrom { get; init; }

/// <summary>Do not process events that were committed after this (Optional)</summary>
public DateTimeOffset? StopAt { get; init; }

public void Deconstruct(out ProcessFrom Mode, out DateTimeOffset? StartFrom, out DateTimeOffset? StopAt)
{
Mode = this.Mode;
StartFrom = this.StartFrom;
StopAt = this.StopAt;
}
}
3 changes: 2 additions & 1 deletion Source/Events.Processing/EventProcessingConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public bool TryToSDK(PbStreamEvent source, out StreamEvent @event, [NotNullWhen(
committedEvent,
source.Partitioned,
source.PartitionId,
scopeId);
scopeId,
source.StreamPosition);
return true;
}

Expand Down
11 changes: 9 additions & 2 deletions Source/Events.Processing/StreamEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ public class StreamEvent
/// <param name="partitioned">Whether the Event comes from a partitioned Stream or not.</param>
/// <param name="partition">The <see cref="PartitionId"/> assigned to the Event in the Stream.</param>
/// <param name="scope">The <see cref="ScopeId"/> of the Stream.</param>
public StreamEvent(CommittedEvent @event, bool partitioned, PartitionId partition, ScopeId scope)
/// <param name="position">The position in the stream</param>
public StreamEvent(CommittedEvent @event, bool partitioned, PartitionId partition, ScopeId scope, ulong position)
{
Event = @event;
Partitioned = partitioned;
Partition = partition;
Scope = scope;
Position = position;
}

/// <summary>
Expand All @@ -47,4 +49,9 @@ public StreamEvent(CommittedEvent @event, bool partitioned, PartitionId partitio
/// Gets the scope of the Stream the Event comes from.
/// </summary>
public ScopeId Scope { get; }
}

/// <summary>
/// Gets the position of the Event in the Stream.
/// </summary>
public ulong Position { get; }
}
5 changes: 4 additions & 1 deletion Source/Events/EventContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ namespace Dolittle.SDK.Events;
/// Represents the context in which an event occurred in.
/// </summary>
/// <param name="SequenceNumber">The <see cref="EventLogSequenceNumber">sequence number</see> that uniquely identifies the event in the event log which it was committed.</param>
/// <param name="StreamPosition">The <see cref="EventLogSequenceNumber">sequence number</see> that uniquely identifies the event in the event log which it was committed.</param>
/// <param name="EventType">The <see cref="EventType"/> of the event.</param>
/// <param name="EventSourceId">The <see cref="EventSourceId"/> that the event was committed to.</param>
/// <param name="Occurred">The <see cref="DateTimeOffset"/> when the event was committed to the <see cref="IEventStore"/>.</param>
/// <param name="CommittedExecutionContext">The <see cref="ExecutionContext"/> in which the event was committed to the <see cref="IEventStore"/>.</param>
/// <param name="CurrentExecutionContext">The <see cref="ExecutionContext"/> in which the event is currently being processed.</param>
/// <param name="StreamPosition">In event handler context, the position of the event in the stream. 0-indexed</param>
public record EventContext(
EventLogSequenceNumber SequenceNumber,
EventType EventType,
EventSourceId EventSourceId,
DateTimeOffset Occurred,
ExecutionContext CommittedExecutionContext,
ExecutionContext CurrentExecutionContext);
ExecutionContext CurrentExecutionContext,
StreamPosition? StreamPosition = null);
8 changes: 5 additions & 3 deletions Source/Events/Store/CommittedEventExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ public static class CommittedEventExtensions
/// </summary>
/// <param name="event">The <see cref="CommittedEvent"/> to get the context for.</param>
/// <param name="currentExecutionContext">The <see cref="ExecutionContext"/> in which the event is currently being processed.</param>
/// <param name="streamPosition">Optionally, the <see cref="StreamPosition"/> of the event in the stream.</param>
/// <returns>The <see cref="EventContext"/> for a <see cref="CommittedEvent"/>.</returns>
public static EventContext GetEventContext(this CommittedEvent @event, ExecutionContext currentExecutionContext)
public static EventContext GetEventContext(this CommittedEvent @event, ExecutionContext currentExecutionContext, StreamPosition? streamPosition)
=> new(
@event.EventLogSequenceNumber,
@event.EventType,
@event.EventSource,
@event.Occurred,
@event.ExecutionContext,
currentExecutionContext);
}
currentExecutionContext,
streamPosition);
}
18 changes: 18 additions & 0 deletions Source/Events/StreamPosition.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using Dolittle.SDK.Concepts;

namespace Dolittle.SDK.Events;

/// <summary>
/// Represents the position of the event in a stream. 0-indexed.
/// </summary>
public record StreamPosition(ulong Value) : ConceptAs<ulong>(Value)
{
/// <summary>
/// Implicitly convert a <see cref="uint"/> to an <see cref="StreamPosition"/>.
/// </summary>
/// <param name="number">The number.</param>
public static implicit operator StreamPosition(ulong number) => new(number);
}
Loading

0 comments on commit 7e27362

Please sign in to comment.