Skip to content

Commit

Permalink
added more details to stream info
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jun 9, 2024
1 parent 2a328ab commit c60864d
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 229 deletions.
4 changes: 2 additions & 2 deletions src/EvenireDB.Benchmark/EventsProviderBenckmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public void GlobalSetup()
var headersRepo = new HeadersRepository();
var repo = new EventsProvider(headersRepo, dataRepo, extentInfoProvider);

var cache = new EventsCache(
new NullLogger<EventsCache>(),
var cache = new StreamsCache(
new NullLogger<StreamsCache>(),
new LRUCache<Guid, CachedEvents>(this.EventsCount),
repo);

Expand Down
39 changes: 0 additions & 39 deletions src/EvenireDB/EventsCache.cs

This file was deleted.

6 changes: 3 additions & 3 deletions src/EvenireDB/EventsReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ namespace EvenireDB
{
internal class EventsReader : IEventsReader
{
private readonly IEventsCache _cache;
private readonly IStreamsCache _cache;
private readonly EventsReaderConfig _config;

public EventsReader(
EventsReaderConfig config,
IEventsCache cache)
IStreamsCache cache)
{
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_config = config ?? throw new ArgumentNullException(nameof(config));
Expand All @@ -27,7 +27,7 @@ public async IAsyncEnumerable<Event> ReadAsync(
if (startPosition < 0)
throw new ArgumentOutOfRangeException(nameof(startPosition));

CachedEvents entry = await _cache.EnsureStreamAsync(streamId, cancellationToken).ConfigureAwait(false);
CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false);

if (entry?.Events == null || entry.Events.Count == 0)
yield break;
Expand Down
6 changes: 3 additions & 3 deletions src/EvenireDB/EventsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ namespace EvenireDB
// TODO: append to a transaction log
public class EventsWriter : IEventsWriter
{
private readonly IEventsCache _cache;
private readonly IStreamsCache _cache;
private readonly ChannelWriter<IncomingEventsGroup> _writer;
private readonly IEventIdGenerator _idGenerator;
private readonly ILogger<EventsWriter> _logger;

public EventsWriter(IEventsCache cache, ChannelWriter<IncomingEventsGroup> writer, IEventIdGenerator idGenerator, ILogger<EventsWriter> logger)
public EventsWriter(IStreamsCache cache, ChannelWriter<IncomingEventsGroup> writer, IEventIdGenerator idGenerator, ILogger<EventsWriter> logger)
{
_cache = cache;
_writer = writer;
Expand All @@ -33,7 +33,7 @@ public async ValueTask<IOperationResult> AppendAsync(
if (!incomingEvents.Any())
return new SuccessResult();

CachedEvents entry = await _cache.EnsureStreamAsync(streamId, cancellationToken).ConfigureAwait(false);
CachedEvents entry = await _cache.GetEventsAsync(streamId, cancellationToken).ConfigureAwait(false);

if (expectedVersion.HasValue && entry.Events.Count != expectedVersion)
return FailureResult.VersionMismatch(streamId, expectedVersion.Value, entry.Events.Count);
Expand Down
7 changes: 0 additions & 7 deletions src/EvenireDB/IEventsCache.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/EvenireDB/IServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static IServiceCollection AddEvenire(this IServiceCollection services, Ev
{
return new LRUCache<Guid, CachedEvents>(settings.MaxInMemoryStreamsCount);
})
.AddSingleton<IEventsCache, EventsCache>()
.AddSingleton<IStreamsCache, StreamsCache>()
.AddSingleton(ctx =>
{

Expand Down
8 changes: 8 additions & 0 deletions src/EvenireDB/IStreamsCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace EvenireDB;

public interface IStreamsCache
{
void Update(Guid streamId, CachedEvents entry);
ValueTask<CachedEvents> GetEventsAsync(Guid streamId, CancellationToken cancellationToken);
bool ContainsKey(Guid streamId);
}
1 change: 1 addition & 0 deletions src/EvenireDB/StreamInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace EvenireDB;
public readonly record struct StreamInfo(
Guid StreamId,
long EventsCount,
bool IsCached,
DateTimeOffset CreatedAt,
DateTimeOffset LastAccessedAt);
7 changes: 5 additions & 2 deletions src/EvenireDB/StreamInfoProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ internal class StreamInfoProvider : IStreamInfoProvider
{
private readonly int _headerSize = Marshal.SizeOf<RawHeader>();
private readonly IExtentInfoProvider _extentInfoProvider;

public StreamInfoProvider(IExtentInfoProvider extentInfoProvider)
private readonly IStreamsCache _cache;

public StreamInfoProvider(IExtentInfoProvider extentInfoProvider, IStreamsCache cache)
{
_extentInfoProvider = extentInfoProvider ?? throw new ArgumentNullException(nameof(extentInfoProvider));
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
}

public StreamInfo GetStreamInfo(Guid streamId)
Expand All @@ -21,6 +23,7 @@ public StreamInfo GetStreamInfo(Guid streamId)
return new StreamInfo(
streamId,
headersCount,
_cache.ContainsKey(streamId),
fileInfo.CreationTimeUtc,
fileInfo.LastWriteTimeUtc);
}
Expand Down
41 changes: 41 additions & 0 deletions src/EvenireDB/StreamsCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using EvenireDB.Persistence;
using EvenireDB.Utils;
using Microsoft.Extensions.Logging;

namespace EvenireDB;

internal class StreamsCache : IStreamsCache
{
private readonly ICache<Guid, CachedEvents> _cache;
private readonly ILogger<StreamsCache> _logger;
private readonly IEventsProvider _repo;

public StreamsCache(
ILogger<StreamsCache> logger,
ICache<Guid, CachedEvents> cache,
IEventsProvider repo)
{
_logger = logger;
_cache = cache;
_repo = repo;
}

private async ValueTask<CachedEvents> Factory(Guid streamId, CancellationToken cancellationToken)
{
_logger.ReadingStreamFromRepository(streamId);

var persistedEvents = new List<Event>();
await foreach (var @event in _repo.ReadAsync(streamId, cancellationToken: cancellationToken))
persistedEvents.Add(@event);
return new CachedEvents(persistedEvents, new SemaphoreSlim(1));
}

public ValueTask<CachedEvents> GetEventsAsync(Guid streamId, CancellationToken cancellationToken)
=> _cache.GetOrAddAsync(streamId, this.Factory, cancellationToken);

public void Update(Guid streamId, CachedEvents entry)
=> _cache.AddOrUpdate(streamId, entry);

public bool ContainsKey(Guid streamId)
=> _cache.ContainsKey(streamId);
}
Loading

0 comments on commit c60864d

Please sign in to comment.