Skip to content

Commit

Permalink
Merge pull request #11 from mizrael/event-id
Browse files Browse the repository at this point in the history
updated event id generation
  • Loading branch information
mizrael authored Dec 18, 2023
2 parents 730849d + 97e6731 commit 0c10542
Show file tree
Hide file tree
Showing 62 changed files with 884 additions and 688 deletions.
18 changes: 17 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,29 @@
"request": "launch",
"preLaunchTask": "build",
// If you have changed target frameworks, make sure to update the program path.
"program": "${workspaceFolder}/src/EvenireDB.Server/bin/Debug/net7.0/EvenireDB.Server.dll",
"program": "${workspaceFolder}/src/EvenireDB.Server/bin/Debug/net8.0/EvenireDB.Server.dll",
"args": [],
"cwd": "${workspaceFolder}/src/EvenireDB.Server",
// For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
"console": "internalConsole",
"stopAtEntry": false
},
{
// Use IntelliSense to find out which attributes exist for C# debugging
// Use hover for the description of the existing attributes
// For further information visit https://github.com/dotnet/vscode-csharp/blob/main/debugger-launchjson.md
"name": "launch sample 1",
"type": "coreclr",
"request": "launch",
"preLaunchTask": "build",
// If you have changed target frameworks, make sure to update the program path.
"program": "${workspaceFolder}/samples/EvenireDB.Samples.TemperatureSensors/bin/Debug/net8.0/EvenireDB.Samples.TemperatureSensors.dll",
"args": [],
"cwd": "${workspaceFolder}/samples/EvenireDB.Samples.TemperatureSensors",
// For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
"console": "internalConsole",
"stopAtEntry": false
},
{
"name": ".NET Core Attach",
"type": "coreclr",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var reading = new ReadingReceived(Random.Shared.NextDouble() * 100, DateTimeOffset.UtcNow);
await _eventsClient.AppendAsync(sensorId, new[]
{
Event.Create(reading),
EventData.Create(reading),
}, stoppingToken);
}
await Task.Delay(_delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
}
},
"ConnectionStrings": {
"evenire": "http://localhost:5001"
"evenire": "http://localhost:5243"
}
}
18 changes: 10 additions & 8 deletions src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class EventsProviderBenckmarks
private readonly static Guid _streamId = Guid.NewGuid();
private readonly Consumer _consumer = new Consumer();

private EventsProvider _sut;
private EventsReader _sut;

[Params(10, 100, 1000)]
public uint EventsCount;
Expand All @@ -26,19 +26,21 @@ public void GlobalSetup()
if (!Directory.Exists(dataPath))
Directory.CreateDirectory(dataPath);

var factory = new EventFactory(500_000);
var factory = new EventDataValidator(500_000);
var repoConfig = new FileEventsRepositoryConfig(dataPath);
var repo = new FileEventsRepository(repoConfig, factory);

var cache = new LRUCache<Guid, CachedEvents>(this.EventsCount);
var logger = new NullLogger<EventsProvider>();
var cache = new EventsCache(
new NullLogger<EventsCache>(),
new LRUCache<Guid, CachedEvents>(this.EventsCount),
repo);

var channel = Channel.CreateUnbounded<IncomingEventsGroup>();
var logger = new NullLogger<EventsReader>();

_sut = new EventsProvider(EventsProviderConfig.Default, repo, cache, channel.Writer, logger);
_sut = new EventsReader(EventsReaderConfig.Default, repo, cache, logger);

var events = Enumerable.Range(0, (int)this.EventsCount).Select(i => factory.Create(Guid.NewGuid(), "lorem", _data)).ToArray();
Task.WaitAll(_sut.AppendAsync(_streamId, events).AsTask());
var events = Enumerable.Range(0, (int)this.EventsCount).Select(i => new Event(new EventId(i, 0), "lorem", _data)).ToArray();
Task.WaitAll(repo.AppendAsync(_streamId, events).AsTask());
}

[Benchmark(Baseline = true)]
Expand Down
12 changes: 6 additions & 6 deletions src/EvenireDB.Benchmark/FileEventsRepositoryWriteBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ public class FileEventsRepositoryWriteBenckmarks
private readonly static byte[] _data = Enumerable.Repeat((byte)42, 100).ToArray();

private FileEventsRepositoryConfig _repoConfig;
private IEventFactory _factory;
private IEventDataValidator _factory;

private IEvent[] BuildEvents(int count)
=> Enumerable.Range(0, count).Select(i => _factory.Create(Guid.NewGuid(), "lorem", _data)).ToArray();
private Event[] BuildEvents(int count)
=> Enumerable.Range(0, count).Select(i => new Event(new EventId(i, 0), "lorem", _data)).ToArray();

[GlobalSetup]
public void Setup()
Expand All @@ -23,19 +23,19 @@ public void Setup()
if(!Directory.Exists(dataPath))
Directory.CreateDirectory(dataPath);

_factory = new EventFactory(500_000);
_factory = new EventDataValidator(500_000);
_repoConfig = new FileEventsRepositoryConfig(dataPath);
}

[Benchmark(Baseline = true)]
[ArgumentsSource(nameof(Data))]
public async Task WriteAsync_Baseline(IEvent[] events)
public async Task WriteAsync_Baseline(Event[] events)
{
var sut = new FileEventsRepository(_repoConfig, _factory);
await sut.AppendAsync(Guid.NewGuid(), events);
}

public IEnumerable<IEvent[]> Data()
public IEnumerable<Event[]> Data()
{
yield return BuildEvents(1_000);
yield return BuildEvents(10_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void Setup()
Encoding.UTF8.GetBytes(eventType, eventTypeBuff);

_headers[i] = new RawEventHeader(
eventId: Guid.NewGuid(),
eventId: new EventId(42, 71),
eventType: eventTypeBuff,
dataPosition: 42,
eventDataLength: 71,
Expand All @@ -41,7 +41,8 @@ public void BitConverter_Copy()
{
var header = _headers[i];

Array.Copy(header.EventId.ToByteArray(), 0, _buffer, 0, TypeSizes.GUID);
Array.Copy(BitConverter.GetBytes(header.EventIdTimestamp), 0, _buffer, 0, sizeof(ulong));
Array.Copy(BitConverter.GetBytes(header.EventIdSequence), 0, _buffer, sizeof(ulong), sizeof(ushort));
Array.Copy(BitConverter.GetBytes(header.DataPosition), 0, _buffer, 16, sizeof(long));
Array.Copy(header.EventType, 0, _buffer, 24, Constants.MAX_EVENT_TYPE_LENGTH);
Array.Copy(BitConverter.GetBytes(header.EventTypeLength), 0, _buffer, 74, sizeof(short));
Expand All @@ -56,7 +57,8 @@ public void Unsafe_Copy()
{
var header = _headers[i];

Array.Copy(header.EventId.ToByteArray(), 0, _buffer, 0, TypeSizes.GUID);
Unsafe.As<byte, long>(ref _buffer[0]) = header.EventIdTimestamp;
Unsafe.As<byte, int>(ref _buffer[sizeof(long)]) = header.EventIdSequence;
Unsafe.As<byte, long>(ref _buffer[16]) = header.DataPosition;
Array.Copy(header.EventType, 0, _buffer, 24, Constants.MAX_EVENT_TYPE_LENGTH);
Unsafe.As<byte, short>(ref _buffer[74]) = header.EventTypeLength;
Expand Down
36 changes: 5 additions & 31 deletions src/EvenireDB.Client/Event.cs
Original file line number Diff line number Diff line change
@@ -1,38 +1,12 @@
using EvenireDB.Common;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace EvenireDB.Client
namespace EvenireDB.Client
{
public record Event
public record Event : EventData
{
public Event(Guid id, string type, ReadOnlyMemory<byte> data)
public Event(EventId id, string type, ReadOnlyMemory<byte> data) : base(type, data)
{
if (string.IsNullOrWhiteSpace(type))
throw new ArgumentException($"'{nameof(type)}' cannot be null or whitespace.", nameof(type));

if (type.Length > Constants.MAX_EVENT_TYPE_LENGTH)
throw new ArgumentOutOfRangeException($"event type cannot be longer than {Constants.MAX_EVENT_TYPE_LENGTH} characters.", nameof(type));

Id = id;
Type = type;

if (data.Length == 0)
throw new ArgumentNullException(nameof(data));
Data = data;
Id = id ?? throw new ArgumentNullException(nameof(id));
}

public Guid Id { get; }
public string Type { get; }

public ReadOnlyMemory<byte> Data { get; }

public static Event Create<T>(T payload, string type = "")
{
if (string.IsNullOrWhiteSpace(type))
type = typeof(T).Name;
var bytes = JsonSerializer.SerializeToUtf8Bytes<T>(payload);
return new Event(Guid.NewGuid(), type, bytes);
}
public EventId Id { get; }
}
}
35 changes: 35 additions & 0 deletions src/EvenireDB.Client/EventData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using EvenireDB.Common;
using System.Text.Json;

namespace EvenireDB.Client
{
public record EventData
{
public EventData(string type, ReadOnlyMemory<byte> data)
{
if (string.IsNullOrWhiteSpace(type))
throw new ArgumentException($"'{nameof(type)}' cannot be null or whitespace.", nameof(type));

if (type.Length > Constants.MAX_EVENT_TYPE_LENGTH)
throw new ArgumentOutOfRangeException($"event type cannot be longer than {Constants.MAX_EVENT_TYPE_LENGTH} characters.", nameof(type));

Type = type;

if (data.Length == 0)
throw new ArgumentNullException(nameof(data));
Data = data;
}

public string Type { get; }

public ReadOnlyMemory<byte> Data { get; }

public static EventData Create<T>(T payload, string type = "")
{
if (string.IsNullOrWhiteSpace(type))
type = typeof(T).Name;
var bytes = JsonSerializer.SerializeToUtf8Bytes<T>(payload);
return new EventData(type, bytes);
}
}
}
4 changes: 4 additions & 0 deletions src/EvenireDB.Client/EventId.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace EvenireDB.Client
{
public record EventId(long Timestamp, int Sequence);
}
14 changes: 7 additions & 7 deletions src/EvenireDB.Client/GrpcEventsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ public GrpcEventsClient(EventsGrpcService.EventsGrpcServiceClient client)
_client = client ?? throw new ArgumentNullException(nameof(client));
}

public async ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default)
public async ValueTask AppendAsync(Guid streamId, IEnumerable<EventData> events, CancellationToken cancellationToken = default)
{
var request = new AppendRequest()
{
StreamId = streamId.ToString()
};

foreach (var @event in events)
{
request.Events.Add(new GrpcEvents.Event()
{
Id = @event.Id.ToString(),
request.Events.Add(new GrpcEvents.EventData()
{
Type = @event.Type,
Data = Google.Protobuf.UnsafeByteOperations.UnsafeWrap(@event.Data)
});
Expand Down Expand Up @@ -66,9 +66,9 @@ public async IAsyncEnumerable<Event> ReadAsync(
using var response = _client.Read(request, cancellationToken: cancellationToken);
await foreach(var item in response.ResponseStream.ReadAllAsync().ConfigureAwait(false))
{
var eventId = Guid.Parse(item.Id);
yield return new Event(eventId, item.Type, item.Data.Memory);
var eventId = new EventId(item.Id.Timestamp, (ushort)item.Id.Sequence);

yield return new Event(eventId, item.Type, item.Data.Memory);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/EvenireDB.Client/HttpEventsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async IAsyncEnumerable<Event> ReadAsync(
Direction direction = Direction.Forward,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var endpoint = $"/api/v1/events/{streamId}?pos={position}&dir={(int)direction}";
var endpoint = $"/api/v1/streams/{streamId}/events?pos={position}&dir={(int)direction}";
var response = await _httpClient.GetAsync(endpoint, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
.ConfigureAwait(false);
response.EnsureSuccessStatusCode();
Expand All @@ -32,9 +32,9 @@ public async IAsyncEnumerable<Event> ReadAsync(
yield return item;
}

public async ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default)
public async ValueTask AppendAsync(Guid streamId, IEnumerable<EventData> events, CancellationToken cancellationToken = default)
{
var response = await _httpClient.PostAsJsonAsync($"/api/v1/events/{streamId}", events, cancellationToken)
var response = await _httpClient.PostAsJsonAsync($"/api/v1/streams/{streamId}/events", events, cancellationToken)
.ConfigureAwait(false);
if (response.IsSuccessStatusCode)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/EvenireDB.Client/IEventsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace EvenireDB.Client
public interface IEventsClient
{
//TODO: add response
ValueTask AppendAsync(Guid streamId, IEnumerable<Event> events, CancellationToken cancellationToken = default);
ValueTask AppendAsync(Guid streamId, IEnumerable<EventData> events, CancellationToken cancellationToken = default);

IAsyncEnumerable<Event> ReadAsync(Guid streamId, StreamPosition position, Direction direction = Direction.Forward, CancellationToken cancellationToken = default);
}
Expand Down
6 changes: 3 additions & 3 deletions src/EvenireDB.Server/DTO/EventDTO.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace EvenireDB.Server.DTO
{
public record EventDTO(Guid Id, string Type, ReadOnlyMemory<byte> Data)
public record EventDTO(EventIdDTO Id, string Type, ReadOnlyMemory<byte> Data)
{
public static EventDTO FromModel(IEvent @event)
=> new EventDTO(@event.Id, @event.Type, @event.Data);
public static EventDTO FromModel(Event @event)
=> new EventDTO(EventIdDTO.FromModel(@event.Id), @event.Type, @event.Data);
}
}
4 changes: 4 additions & 0 deletions src/EvenireDB.Server/DTO/EventDataDTO.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace EvenireDB.Server.DTO
{
public record EventDataDTO(string Type, ReadOnlyMemory<byte> Data);
}
11 changes: 11 additions & 0 deletions src/EvenireDB.Server/DTO/EventIdDTO.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace EvenireDB.Server.DTO
{
public record EventIdDTO(long Timestamp, int Sequence)
{
public static EventIdDTO FromModel(EventId eventId)
=> new EventIdDTO(eventId.Timestamp, eventId.Sequence);

public EventId ToModel()
=> new EventId(this.Timestamp, this.Sequence);
}
}
19 changes: 9 additions & 10 deletions src/EvenireDB.Server/EventMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,31 @@

public class EventMapper
{
private readonly IEventFactory _factory;
private readonly IEventDataValidator _validator;

public EventMapper(IEventFactory factory)
public EventMapper(IEventDataValidator validator)
{
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
_validator = validator;
}

public IEvent[] ToModels(EventDTO[] dtos)
public EventData[] ToModels(EventDataDTO[] dtos)
{
if (dtos is null)
throw new ArgumentNullException(nameof(dtos));
if (dtos.Length == 0)
return Array.Empty<IEvent>();
return Array.Empty<Event>();

var events = new IEvent[dtos.Length];
var events = new EventData[dtos.Length];
for (int i = 0; i < dtos.Length; i++)
{
events[i] = ToModel(dtos[i]);
}
return events;
}

public IEvent ToModel(EventDTO dto)
public EventData ToModel(EventDataDTO dto)
{
ArgumentNullException.ThrowIfNull(dto, nameof(dto));

return _factory.Create(dto.Id, dto.Type, dto.Data);
_validator.Validate(dto.Type, dto.Data);
return new EventData(dto.Type, dto.Data);
}
}
Loading

0 comments on commit 0c10542

Please sign in to comment.