Skip to content

Commit

Permalink
Merge pull request #11 from sceneskope/dev
Browse files Browse the repository at this point in the history
Added base reading receiver
  • Loading branch information
nrandell authored Aug 3, 2018
2 parents 8f63383 + ff67ea8 commit c063fec
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 55 deletions.
2 changes: 1 addition & 1 deletion directory.build.props
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageLicenseUrl>https://opensource.org/licenses/MIT</PackageLicenseUrl>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<LangVersion>latest</LangVersion>
<VersionPrefix>5.1.1</VersionPrefix>
<VersionPrefix>5.2.0</VersionPrefix>
<DebugType>embedded</DebugType>
</PropertyGroup>
<PropertyGroup Condition="'$(IsTestProject)' != 'true' ">
Expand Down
52 changes: 52 additions & 0 deletions src/SceneSkope.ServiceFabric.EventHubs/BaseReadingReceiver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;
using Serilog;
using ServiceFabric.Utilities;

namespace SceneSkope.ServiceFabric.EventHubs
{
public abstract class BaseReadingReceiver : IReadingReceiver
{
public Func<Exception, bool> TransientExceptionChecker { get; }
public ServiceFabricRetryHandler RetryHandler { get; }
public ILogger Log { get; }
public IReliableStateManager StateManager { get; }

protected readonly IReliableDictionary<string, string> _offsets;
protected readonly string _partition;

public PartitionReceiver Receiver { get; }

protected BaseReadingReceiver(ILogger log,
IReliableStateManager stateManager,
PartitionReceiver receiver,
IReliableDictionary<string, string> offsets, string partition,
ServiceFabricRetryHandler retryHandler,
Func<Exception, bool> transientExceptionChecker = null)
{
RetryHandler = retryHandler;
Log = log;
StateManager = stateManager;
Receiver = receiver;
_offsets = offsets;
_partition = partition;
TransientExceptionChecker = transientExceptionChecker;
}

public virtual Task InitialiseAsync() => Task.CompletedTask;

protected Task SaveOffsetAsync(ITransaction tx, string latestOffset) => _offsets.SetAsync(tx, _partition, latestOffset);

public virtual void Dispose()
{
}

public Task ProcessEventsAsync(IEnumerable<EventData> events) => ProcessEventsAsync((IReadOnlyList<EventData>)events);

protected abstract Task ProcessEventsAsync(IReadOnlyList<EventData> events);
}
}
31 changes: 6 additions & 25 deletions src/SceneSkope.ServiceFabric.EventHubs/BatchedReadingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,23 @@

namespace SceneSkope.ServiceFabric.EventHubs
{
public abstract class BatchedReadingReceiver : IReadingReceiver
public abstract class BatchedReadingReceiver : BaseReadingReceiver
{
public Func<Exception, bool> TransientExceptionChecker { get; }
public ServiceFabricRetryHandler RetryHandler { get; }
public ILogger Log { get; }
public IReliableStateManager StateManager { get; }

private readonly IReliableDictionary<string, string> _offsets;
protected readonly string _partition;

public PartitionReceiver Receiver { get; }

protected BatchedReadingReceiver(ILogger log,
IReliableStateManager stateManager,
PartitionReceiver receiver,
IReliableDictionary<string, string> offsets, string partition,
IReliableDictionary<string, string> offsets,
string partition,
ServiceFabricRetryHandler retryHandler,
Func<Exception, bool> transientExceptionChecker = null)
Func<Exception, bool> transientExceptionChecker = null) :
base(log, stateManager, receiver, offsets, partition, retryHandler, transientExceptionChecker)
{
RetryHandler = retryHandler;
Log = log;
StateManager = stateManager;
Receiver = receiver;
_offsets = offsets;
_partition = partition;
TransientExceptionChecker = transientExceptionChecker;
}

public virtual Task InitialiseAsync() => Task.CompletedTask;

protected abstract Task ProcessEventAsync(ITransaction tx, EventData @event, CancellationToken serviceCancellationToken);

public async Task ProcessEventsAsync(IEnumerable<EventData> events)
protected override async Task ProcessEventsAsync(IReadOnlyList<EventData> events)
{
await BeforeProcessEventsAsync(RetryHandler.ServiceCancellationToken).ConfigureAwait(false);
await RetryHandler.HandleAsync(async cancel =>
Expand All @@ -69,8 +53,5 @@ await RetryHandler.HandleAsync(async cancel =>
public virtual Task BeforeProcessEventsAsync(CancellationToken serviceCancellationToken) => Task.CompletedTask;
public virtual Task AfterProcessEventsAsync(CancellationToken serviceCancellationToken) => Task.CompletedTask;

public virtual void Dispose()
{
}
}
}
35 changes: 6 additions & 29 deletions src/SceneSkope.ServiceFabric.EventHubs/SimpleReadingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,22 @@

namespace SceneSkope.ServiceFabric.EventHubs
{
public abstract class SimpleReadingReceiver : IReadingReceiver
public abstract class SimpleReadingReceiver : BaseReadingReceiver
{
public Func<Exception, bool> TransientExceptionChecker { get; }
public ServiceFabricRetryHandler RetryHandler { get; }
public ILogger Log { get; }
public IReliableStateManager StateManager { get; }

private readonly IReliableDictionary<string, string> _offsets;
protected readonly string _partition;

public PartitionReceiver Receiver { get; }

protected SimpleReadingReceiver(ILogger log,
IReliableStateManager stateManager,
PartitionReceiver receiver,
IReliableDictionary<string, string> offsets, string partition,
IReliableDictionary<string, string> offsets,
string partition,
ServiceFabricRetryHandler retryHandler,
Func<Exception, bool> transientExceptionChecker = null)
Func<Exception, bool> transientExceptionChecker = null) :
base(log, stateManager, receiver, offsets, partition, retryHandler, transientExceptionChecker)
{
RetryHandler = retryHandler;
Log = log;
StateManager = stateManager;
Receiver = receiver;
_offsets = offsets;
_partition = partition;
TransientExceptionChecker = transientExceptionChecker;
}

public virtual Task InitialiseAsync() => Task.CompletedTask;

protected abstract Task ProcessEventAsync(EventData @event);

public async Task ProcessEventsAsync(IEnumerable<EventData> events)
protected override async Task ProcessEventsAsync(IReadOnlyList<EventData> events)
{
string latestOffset = null;
foreach (var @event in events)
Expand All @@ -53,11 +36,5 @@ public async Task ProcessEventsAsync(IEnumerable<EventData> events)
}

protected virtual Task OnAllEventsProcessedAsync(string latestOffset) => Task.CompletedTask;

protected Task SaveOffsetAsync(ITransaction tx, string latestOffset) => _offsets.SetAsync(tx, _partition, latestOffset);

public virtual void Dispose()
{
}
}
}

0 comments on commit c063fec

Please sign in to comment.