Skip to content

Commit

Permalink
Ensure CustomDataAdapter.cs works in cluster with multiple silos
Browse files Browse the repository at this point in the history
  • Loading branch information
egil authored Jan 18, 2025
1 parent 2adfa49 commit 78f31ad
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions orleans/Streaming/CustomDataAdapter/Silo/CustomDataAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,33 @@ protected override IBatchContainer GetBatchContainer(EventHubMessage eventHubMes
=> new CustomBatchContainer(eventHubMessage);
}

public class CustomBatchContainer : IBatchContainer
[GenerateSerializer, Immutable]
public sealed class CustomBatchContainer : IBatchContainer
{
public StreamId StreamId { get; }
[Id(0)]
private readonly EventHubMessage _eventHubMessage;

[Id(1)]
public StreamSequenceToken SequenceToken { get; }

private readonly byte[] _payload;
public StreamId StreamId => _eventHubMessage.StreamId;

public CustomBatchContainer(EventHubMessage eventHubMessage)
{
StreamId = eventHubMessage.StreamId;
SequenceToken = new EventHubSequenceTokenV2(eventHubMessage.Offset, eventHubMessage.SequenceNumber, 0);
_payload = eventHubMessage.Payload;
_eventHubMessage = eventHubMessage;
SequenceToken = new EventHubSequenceTokenV2(_eventHubMessage.Offset, _eventHubMessage.SequenceNumber, 0);
}

public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
try
{
var evt = JsonSerializer.Deserialize<T>(_payload)!;
var evt = JsonSerializer.Deserialize<T>(_eventHubMessage.Payload)!;
return new[] { Tuple.Create(evt, SequenceToken) };
}
catch (Exception)
{
return new List<Tuple<T, StreamSequenceToken>>();
return Array.Empty<Tuple<T, StreamSequenceToken>>();
}
}

Expand Down

0 comments on commit 78f31ad

Please sign in to comment.