From 858879c81df37a7b2f54cb6066f0e822fd9b3217 Mon Sep 17 00:00:00 2001 From: David Guida <1432872+mizrael@users.noreply.github.com> Date: Sat, 8 Jun 2024 18:35:23 -0400 Subject: [PATCH] added streams info --- .../EventsProviderBenckmarks.cs | 2 +- src/EvenireDB.Server/Routes/StreamsRoutes.cs | 115 ++++++++++-------- src/EvenireDB/ExtentInfoProviderConfig.cs | 5 - src/EvenireDB/Extents/ExtentInfo.cs | 13 +- src/EvenireDB/Extents/ExtentInfoProvider.cs | 26 ---- src/EvenireDB/Extents/IExtentInfoProvider.cs | 7 -- src/EvenireDB/Extents/IStreamInfoProvider.cs | 8 ++ src/EvenireDB/Extents/StreamInfo.cs | 7 ++ src/EvenireDB/Extents/StreamInfoProvider.cs | 53 ++++++++ src/EvenireDB/IServiceCollectionExtensions.cs | 4 +- src/EvenireDB/Persistence/EventsProvider.cs | 8 +- .../Persistence/HeadersRepository.cs | 16 ++- tests/EvenireDB.Tests/EventsProviderTests.cs | 8 +- 13 files changed, 157 insertions(+), 115 deletions(-) delete mode 100644 src/EvenireDB/ExtentInfoProviderConfig.cs delete mode 100644 src/EvenireDB/Extents/ExtentInfoProvider.cs delete mode 100644 src/EvenireDB/Extents/IExtentInfoProvider.cs create mode 100644 src/EvenireDB/Extents/IStreamInfoProvider.cs create mode 100644 src/EvenireDB/Extents/StreamInfo.cs create mode 100644 src/EvenireDB/Extents/StreamInfoProvider.cs diff --git a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs b/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs index 59f3d22..9e8d116 100644 --- a/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs +++ b/src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs @@ -29,7 +29,7 @@ public void GlobalSetup() Directory.CreateDirectory(dataPath); var repoConfig = new FileEventsRepositoryConfig(); - var extentInfoProvider = new ExtentInfoProvider(new ExtentInfoProviderConfig(dataPath)); + var extentInfoProvider = new StreamInfoProvider(new StreamInfoProviderConfig(dataPath)); var dataRepo = new DataRepository(); var headersRepo = new HeadersRepository(); var repo = new EventsProvider(headersRepo, dataRepo, extentInfoProvider); diff --git a/src/EvenireDB.Server/Routes/StreamsRoutes.cs b/src/EvenireDB.Server/Routes/StreamsRoutes.cs index 3d52cc3..c27e69c 100644 --- a/src/EvenireDB.Server/Routes/StreamsRoutes.cs +++ b/src/EvenireDB.Server/Routes/StreamsRoutes.cs @@ -1,64 +1,79 @@ using EvenireDB.Common; +using EvenireDB.Extents; using EvenireDB.Server.DTO; using Microsoft.AspNetCore.Mvc; -namespace EvenireDB.Server.Routes +namespace EvenireDB.Server.Routes; +public static class StreamsRoutes { - //TODO: add endpoint to get all streams - public static class StreamsRoutes + public static WebApplication MapEventsRoutes(this WebApplication app) { - public static WebApplication MapEventsRoutes(this WebApplication app) - { - var api = app.NewVersionedApi(); - var v1 = api.MapGroup("/api/v{version:apiVersion}/streams") - .HasApiVersion(1.0); - v1.MapGet("/{streamId:guid}/events", GetEvents).WithName(nameof(GetEvents)); - v1.MapPost("/{streamId:guid}/events", SaveEvents).WithName(nameof(SaveEvents)); + var api = app.NewVersionedApi(); + var v1 = api.MapGroup("/api/v{version:apiVersion}/streams") + .HasApiVersion(1.0); + v1.MapGet("", GetStreams).WithName(nameof(GetStreams)); + v1.MapGet("/{streamId:guid}", GetStreamInfo).WithName(nameof(GetStreamInfo)); + v1.MapGet("/{streamId:guid}/events", GetEvents).WithName(nameof(GetEvents)); + v1.MapPost("/{streamId:guid}/events", SaveEvents).WithName(nameof(SaveEvents)); - return app; - } + return app; + } - private static async IAsyncEnumerable GetEvents( - [FromServices] IEventsReader reader, - Guid streamId, - [FromQuery(Name = "pos")] uint startPosition = 0, - [FromQuery(Name = "dir")] Direction direction = Direction.Forward) - { - await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false)) - yield return EventDTO.FromModel(@event); - } + private static IResult GetStreams( + [FromServices] IStreamInfoProvider provider) + { + var streams = provider.GetStreamsInfo(); + return Results.Ok(streams); + } - private static async ValueTask SaveEvents( - [FromServices] EventMapper mapper, - [FromServices] IEventsWriter writer, - Guid streamId, - [FromQuery(Name = "version")] int? expectedVersion, - [FromBody] EventDataDTO[]? dtos) - { - if(dtos is null) - return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, "No events provided")); + private static IResult GetStreamInfo( + [FromServices] IStreamInfoProvider provider, + Guid streamId) + { + var result = provider.GetStreamInfo(streamId); + return Results.Ok(result); + } - EventData[] events; + private static async IAsyncEnumerable GetEvents( + [FromServices] IEventsReader reader, + Guid streamId, + [FromQuery(Name = "pos")] uint startPosition = 0, + [FromQuery(Name = "dir")] Direction direction = Direction.Forward) + { + await foreach (var @event in reader.ReadAsync(streamId, direction: direction, startPosition: startPosition).ConfigureAwait(false)) + yield return EventDTO.FromModel(@event); + } - try - { - events = mapper.ToModels(dtos); - } - catch(Exception ex) - { - return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message)); - } + private static async ValueTask SaveEvents( + [FromServices] EventMapper mapper, + [FromServices] IEventsWriter writer, + Guid streamId, + [FromQuery(Name = "version")] int? expectedVersion, + [FromBody] EventDataDTO[]? dtos) + { + if (dtos is null) + return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, "No events provided")); + + EventData[] events; - var result = await writer.AppendAsync(streamId, events, expectedVersion) - .ConfigureAwait(false); - return result switch - { - FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(new ApiError(ErrorCodes.DuplicateEvent, d.Message)), - FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)), - FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)), - FailureResult => Results.StatusCode(500), - _ => Results.AcceptedAtRoute(nameof(GetEvents), new { streamId }) - }; + try + { + events = mapper.ToModels(dtos); } - } + catch (Exception ex) + { + return Results.BadRequest(new ApiError(ErrorCodes.BadRequest, ex.Message)); + } + + var result = await writer.AppendAsync(streamId, events, expectedVersion) + .ConfigureAwait(false); + return result switch + { + FailureResult { Code: ErrorCodes.DuplicateEvent } d => Results.Conflict(new ApiError(ErrorCodes.DuplicateEvent, d.Message)), + FailureResult { Code: ErrorCodes.VersionMismatch } d => Results.BadRequest(new ApiError(ErrorCodes.VersionMismatch, d.Message)), + FailureResult { Code: ErrorCodes.BadRequest } d => Results.BadRequest(new ApiError(ErrorCodes.BadRequest, d.Message)), + FailureResult => Results.StatusCode(500), + _ => Results.AcceptedAtRoute(nameof(GetEvents), new { streamId }) + }; + } } \ No newline at end of file diff --git a/src/EvenireDB/ExtentInfoProviderConfig.cs b/src/EvenireDB/ExtentInfoProviderConfig.cs deleted file mode 100644 index 308cb0c..0000000 --- a/src/EvenireDB/ExtentInfoProviderConfig.cs +++ /dev/null @@ -1,5 +0,0 @@ - -namespace EvenireDB -{ - internal record ExtentInfoProviderConfig(string BasePath); -} \ No newline at end of file diff --git a/src/EvenireDB/Extents/ExtentInfo.cs b/src/EvenireDB/Extents/ExtentInfo.cs index 7781005..3dbc06d 100644 --- a/src/EvenireDB/Extents/ExtentInfo.cs +++ b/src/EvenireDB/Extents/ExtentInfo.cs @@ -1,8 +1,7 @@ -namespace EvenireDB.Extents +namespace EvenireDB.Extents; + +public readonly struct ExtentInfo { - public readonly struct ExtentInfo - { - public readonly required string DataPath { get; init; } - public readonly required string HeadersPath { get; init; } - } -} \ No newline at end of file + public readonly required string DataPath { get; init; } + public readonly required string HeadersPath { get; init; } +} diff --git a/src/EvenireDB/Extents/ExtentInfoProvider.cs b/src/EvenireDB/Extents/ExtentInfoProvider.cs deleted file mode 100644 index 9752a5f..0000000 --- a/src/EvenireDB/Extents/ExtentInfoProvider.cs +++ /dev/null @@ -1,26 +0,0 @@ -namespace EvenireDB.Extents -{ - internal class ExtentInfoProvider : IExtentInfoProvider - { - private readonly ExtentInfoProviderConfig _config; - - public ExtentInfoProvider(ExtentInfoProviderConfig config) - { - _config = config ?? throw new ArgumentNullException(nameof(config)); - if (!Directory.Exists(_config.BasePath)) - Directory.CreateDirectory(config.BasePath); - } - - public ExtentInfo Get(Guid streamId) - { - // TODO: tests - var key = streamId.ToString("N"); - int extentNumber = 0; // TODO: calculate - return new ExtentInfo - { - DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"), - HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"), - }; - } - } -} \ No newline at end of file diff --git a/src/EvenireDB/Extents/IExtentInfoProvider.cs b/src/EvenireDB/Extents/IExtentInfoProvider.cs deleted file mode 100644 index 6654ef3..0000000 --- a/src/EvenireDB/Extents/IExtentInfoProvider.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace EvenireDB.Extents -{ - public interface IExtentInfoProvider - { - ExtentInfo Get(Guid streamId); - } -} \ No newline at end of file diff --git a/src/EvenireDB/Extents/IStreamInfoProvider.cs b/src/EvenireDB/Extents/IStreamInfoProvider.cs new file mode 100644 index 0000000..ccac4cd --- /dev/null +++ b/src/EvenireDB/Extents/IStreamInfoProvider.cs @@ -0,0 +1,8 @@ +namespace EvenireDB.Extents; + +public interface IStreamInfoProvider +{ + ExtentInfo GetExtentInfo(Guid streamId); + IEnumerable GetStreamsInfo(); + StreamInfo GetStreamInfo(Guid streamId); +} \ No newline at end of file diff --git a/src/EvenireDB/Extents/StreamInfo.cs b/src/EvenireDB/Extents/StreamInfo.cs new file mode 100644 index 0000000..aa64efb --- /dev/null +++ b/src/EvenireDB/Extents/StreamInfo.cs @@ -0,0 +1,7 @@ +namespace EvenireDB.Extents; + +public readonly record struct StreamInfo( + Guid StreamId, + long EventsCount, + DateTimeOffset CreatedAt, + DateTimeOffset LastAccessedAt); \ No newline at end of file diff --git a/src/EvenireDB/Extents/StreamInfoProvider.cs b/src/EvenireDB/Extents/StreamInfoProvider.cs new file mode 100644 index 0000000..4eb59c6 --- /dev/null +++ b/src/EvenireDB/Extents/StreamInfoProvider.cs @@ -0,0 +1,53 @@ +using System.Runtime.InteropServices; + +namespace EvenireDB.Extents; + +internal record StreamInfoProviderConfig(string BasePath); + +internal class StreamInfoProvider : IStreamInfoProvider +{ + private readonly StreamInfoProviderConfig _config; + private readonly int _headerSize = Marshal.SizeOf(); + + public StreamInfoProvider(StreamInfoProviderConfig config) + { + _config = config ?? throw new ArgumentNullException(nameof(config)); + if (!Directory.Exists(_config.BasePath)) + Directory.CreateDirectory(config.BasePath); + } + + public ExtentInfo GetExtentInfo(Guid streamId) + { + // TODO: tests + var key = streamId.ToString("N"); + int extentNumber = 0; // TODO: calculate + return new ExtentInfo + { + DataPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_data.dat"), + HeadersPath = Path.Combine(_config.BasePath, $"{key}_{extentNumber}_headers.dat"), + }; + } + + public StreamInfo GetStreamInfo(Guid streamId) + { + var extent = this.GetExtentInfo(streamId); + + var fileInfo = new FileInfo(extent.HeadersPath); + var headersCount = fileInfo.Length / _headerSize; + return new StreamInfo( + streamId, + headersCount, + fileInfo.CreationTimeUtc, + fileInfo.LastWriteTimeUtc); + } + + public IEnumerable GetStreamsInfo() + { + var headersFiles = Directory.GetFiles(_config.BasePath, "*_headers.dat"); + foreach(var headerFile in headersFiles) + { + var key = Path.GetFileNameWithoutExtension(headerFile).Split('_')[0]; + yield return GetStreamInfo(Guid.Parse(key)); + } + } +} \ No newline at end of file diff --git a/src/EvenireDB/IServiceCollectionExtensions.cs b/src/EvenireDB/IServiceCollectionExtensions.cs index 2bffd4a..1210b39 100644 --- a/src/EvenireDB/IServiceCollectionExtensions.cs +++ b/src/EvenireDB/IServiceCollectionExtensions.cs @@ -52,9 +52,9 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev dataPath = Path.Combine(AppContext.BaseDirectory, dataPath); } - return new ExtentInfoProviderConfig(dataPath); + return new StreamInfoProviderConfig(dataPath); }) - .AddSingleton() + .AddSingleton() .AddSingleton() .AddSingleton() .AddSingleton() diff --git a/src/EvenireDB/Persistence/EventsProvider.cs b/src/EvenireDB/Persistence/EventsProvider.cs index 906b083..56db950 100644 --- a/src/EvenireDB/Persistence/EventsProvider.cs +++ b/src/EvenireDB/Persistence/EventsProvider.cs @@ -6,12 +6,12 @@ namespace EvenireDB.Persistence; internal class EventsProvider : IEventsProvider { - private readonly IExtentInfoProvider _extentInfoProvider; + private readonly IStreamInfoProvider _extentInfoProvider; private readonly IHeadersRepository _headersRepo; private readonly IDataRepository _dataRepo; private readonly ConcurrentDictionary _streamLocks = new(); - public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IExtentInfoProvider extentInfoProvider) + public EventsProvider(IHeadersRepository headersRepo, IDataRepository dataRepo, IStreamInfoProvider extentInfoProvider) { _headersRepo = headersRepo; _dataRepo = dataRepo; @@ -24,7 +24,7 @@ public async IAsyncEnumerable ReadAsync( int? take = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var extentInfo = _extentInfoProvider.Get(streamId); + var extentInfo = _extentInfoProvider.GetExtentInfo(streamId); if (!File.Exists(extentInfo.DataPath) || !File.Exists(extentInfo.HeadersPath)) yield break; @@ -39,7 +39,7 @@ public async ValueTask AppendAsync( Guid streamId, IEnumerable events, CancellationToken cancellationToken = default) { - var extentInfo = _extentInfoProvider.Get(streamId); + var extentInfo = _extentInfoProvider.GetExtentInfo(streamId); var semaphore = _streamLocks.GetOrAdd(streamId, _ => new SemaphoreSlim(1, 1)); await semaphore.WaitAsync(cancellationToken); diff --git a/src/EvenireDB/Persistence/HeadersRepository.cs b/src/EvenireDB/Persistence/HeadersRepository.cs index 27fab57..130b08e 100644 --- a/src/EvenireDB/Persistence/HeadersRepository.cs +++ b/src/EvenireDB/Persistence/HeadersRepository.cs @@ -7,6 +7,8 @@ namespace EvenireDB.Persistence; internal class HeadersRepository : IHeadersRepository { + private readonly int _headerSize = Marshal.SizeOf(); + public async ValueTask AppendAsync(ExtentInfo extentInfo, IAsyncEnumerable headers, CancellationToken cancellationToken = default) { var headerSize = Marshal.SizeOf(); @@ -38,24 +40,20 @@ public async IAsyncEnumerable ReadAsync( int? take = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) { - var headerSize = Marshal.SizeOf(); - - var streamBufferSize = headerSize * take.GetValueOrDefault(100); + var streamBufferSize = _headerSize * take.GetValueOrDefault(100); using var stream = new FileStream(extentInfo.HeadersPath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: streamBufferSize, useAsync: true); - byte[] buffer = ArrayPool.Shared.Rent(headerSize); + byte[] buffer = ArrayPool.Shared.Rent(_headerSize); try { - - if (skip.HasValue) - stream.Seek(skip.Value * headerSize, SeekOrigin.Begin); + stream.Seek(skip.Value * _headerSize, SeekOrigin.Begin); while (!cancellationToken.IsCancellationRequested) { - var bytesRead = await stream.ReadAsync(buffer, 0, headerSize, cancellationToken); + var bytesRead = await stream.ReadAsync(buffer, 0, _headerSize, cancellationToken); if (bytesRead == 0) yield break; @@ -66,7 +64,7 @@ public async IAsyncEnumerable ReadAsync( yield break; } - var header = MemoryMarshal.Read(buffer.AsSpan(0, headerSize)); + var header = MemoryMarshal.Read(buffer.AsSpan(0, _headerSize)); yield return header; } } diff --git a/tests/EvenireDB.Tests/EventsProviderTests.cs b/tests/EvenireDB.Tests/EventsProviderTests.cs index 10915d2..87ce300 100644 --- a/tests/EvenireDB.Tests/EventsProviderTests.cs +++ b/tests/EvenireDB.Tests/EventsProviderTests.cs @@ -12,10 +12,10 @@ public EventsProviderTests(DataFixture fixture) _fixture = fixture; } - private EventsProvider CreateSut(Guid streamId, out IExtentInfoProvider extentInfoProvider) + private EventsProvider CreateSut(Guid streamId, out IStreamInfoProvider extentInfoProvider) { var config = _fixture.CreateExtentsConfig(streamId); - extentInfoProvider = new ExtentInfoProvider(config); + extentInfoProvider = new StreamInfoProvider(config); var dataRepo = new DataRepository(); var headersRepo = new HeadersRepository(); return new EventsProvider(headersRepo, dataRepo, extentInfoProvider); @@ -34,7 +34,7 @@ public async Task AppendAsync_should_write_events(int eventsCount, int expectedF await sut.AppendAsync(streamId, events); - var extentInfo = extentInfoProvider.Get(streamId); + var extentInfo = extentInfoProvider.GetExtentInfo(streamId); var bytes = File.ReadAllBytes(extentInfo.DataPath); Assert.Equal(expectedFileSize, bytes.Length); } @@ -54,7 +54,7 @@ public async Task AppendAsync_should_append_events(int batchesCount, int eventsP foreach (var events in batches) await sut.AppendAsync(streamId, events); - var extentInfo = extentInfoProvider.Get(streamId); + var extentInfo = extentInfoProvider.GetExtentInfo(streamId); var bytes = File.ReadAllBytes(extentInfo.DataPath); Assert.Equal(expectedFileSize, bytes.Length); }