Skip to content

Commit

Permalink
feat: OTEL redis support
Browse files Browse the repository at this point in the history
  • Loading branch information
kirinnee committed Dec 12, 2023
1 parent a01cfae commit 4606ef1
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 7 deletions.
2 changes: 1 addition & 1 deletion App/Config/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ AllowedHosts: "*"
Kestrel:
Endpoints:
Http:
Url: http://+:9001
Url: http://+:9002
Logging:
LogLevel:
Default: Information
Expand Down
8 changes: 8 additions & 0 deletions App/Modules/Bookings/Data/BookingCdcModel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace App.Modules.Bookings.Data;

public class BookingCdcModel
{

}


14 changes: 14 additions & 0 deletions App/Modules/Bookings/Data/BookingCdcRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace App.Modules.Bookings.Data;




public interface IBookingCdcRepository
{
Task Add(string type);
}

public class BookingCdcRepository
{

}
142 changes: 142 additions & 0 deletions App/Modules/Common/OtelRedis.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System.Diagnostics;
using System.Text.Json;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using StackExchange.Redis;
using StackExchange.Redis.Extensions.Core.Abstractions;

namespace App.Modules.Common;

public record OtelRedisMessage<T>(Dictionary<string, string> Context, T Message);

public class OtelRedisDatabase(IRedisDatabase redis)
{

private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

private static readonly JsonSerializerOptions SerializeOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};

private static readonly ActivitySource ActivitySource = new("AutoTrace.Redis.Shim");


public async Task<long> PublishAsync<T>(RedisChannel channel, T message, CommandFlags flag = CommandFlags.None)
{
var carrier = new Dictionary<string, string>();

using var activity = ActivitySource.StartActivity();

if (activity != null) Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), carrier, (c, k, v) => c[k] = v);

var otelMessage = new OtelRedisMessage<T>(carrier, message);
return await redis.PublishAsync(channel, otelMessage, flag);
}

public async Task SubscribeAsync<T>(RedisChannel channel, Func<T?, Task> handler,
CommandFlags flag = CommandFlags.None)
{
await redis.SubscribeAsync<OtelRedisMessage<T>>(channel, async m =>
{
var carrier = m?.Context ?? [];

var parentContext = Propagator.Extract(default, carrier,
(c, k) => c.TryGetValue(k, out var v) ? new[] { v } : Enumerable.Empty<string>());
Baggage.Current = parentContext.Baggage;

using var activity = ActivitySource.StartActivity();
var message = m == null ? default : m.Message;
await handler(message);
}, flag);
}

public RedisValue StreamAdd<T>(RedisKey key, T message, RedisValue? messageId = null, int? maxLength = null,
bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None)
{
var carrier = new Dictionary<string, string>();

using var activity = ActivitySource.StartActivity();
if (activity != null)
Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), carrier, (c, k, v) => c[k] = v);

var contextJson = JsonSerializer.Serialize(carrier, SerializeOptions);
var messageJson = JsonSerializer.Serialize(message, SerializeOptions);

NameValueEntry[] nameValueEntry = [
new NameValueEntry("context", contextJson),
new NameValueEntry("message", messageJson),
];

return redis.Database.StreamAdd(key, nameValueEntry,
messageId, maxLength, useApproximateMaxLength, flags);
}

public async Task StreamRead<T>(RedisKey key, RedisValue position, Func<T?, Task> handler, int? count = null,
CommandFlags flags = CommandFlags.None)
{
var entries = redis.Database.StreamRead(key, position, count, flags);
var messages = entries.Select(x => this.FromNameValueEntry<T>(x.Values));

foreach (var otelRedisMessage in messages)
{

var carrier = otelRedisMessage?.Context ?? [];
var parentContext = Propagator.Extract(default, carrier,
(c, k) => c.TryGetValue(k, out var v) ? new[] { v } : Enumerable.Empty<string>());
Baggage.Current = parentContext.Baggage;

using var activity = ActivitySource.StartActivity();

var m = otelRedisMessage == null ? default : otelRedisMessage.Message;
await handler(m);
}
}

public async Task StreamReadGroup<T>(RedisKey key, RedisValue groupName, RedisValue consumerName,
Func<T?, Task> handler, RedisValue? position = null, int? count = null, bool noAck = false,
CommandFlags flags = CommandFlags.None)
{
var entries = redis.Database.StreamReadGroup(key, groupName, consumerName, position, count, flags);
var messages = entries.Select(x => this.FromNameValueEntry<T>(x.Values));

foreach (var otelRedisMessage in messages)
{
var carrier = otelRedisMessage?.Context ?? [];
var parentContext = Propagator.Extract(default, carrier,
(c, k) => c.TryGetValue(k, out var v) ? new[] { v } : Enumerable.Empty<string>());
Baggage.Current = parentContext.Baggage;

using var activity = ActivitySource.StartActivity();
var m = otelRedisMessage == null ? default : otelRedisMessage.Message;
await handler(m);
}
}


private NameValueEntry[] ToNameValueEntry<T>(T message)
{
var carrier = new Dictionary<string, string>();
var contextJson = JsonSerializer.Serialize(carrier, SerializeOptions);
var messageJson = JsonSerializer.Serialize(message, SerializeOptions);
NameValueEntry[] nameValueEntry =
[
new NameValueEntry("context", contextJson),
new NameValueEntry("message", messageJson),
];
return nameValueEntry;
}

private OtelRedisMessage<T>? FromNameValueEntry<T>(NameValueEntry[] nameValueEntry)
{
string? contextJson = nameValueEntry.FirstOrDefault(x => x.Name == "context").Value;
string? messageJson = nameValueEntry.FirstOrDefault(x => x.Name == "message").Value;
if (contextJson == null || messageJson == null) return null;

var context = JsonSerializer.Deserialize<Dictionary<string, string>>(contextJson, SerializeOptions);
var message = JsonSerializer.Deserialize<T>(messageJson, SerializeOptions);
if (context == null || message == null) return null;

return new OtelRedisMessage<T>(context, message);
}
}
4 changes: 3 additions & 1 deletion App/Modules/System/SystemController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
using Asp.Versioning;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using StackExchange.Redis.Extensions.Core.Abstractions;

namespace App.Modules.System;

[ApiVersionNeutral]
[ApiController]
[Route("/")]
public class SystemController(IOptionsSnapshot<AppOption> app, IAuthHelper h) : AtomiControllerBase(h)
public class SystemController(IRedisClientFactory factory, IOptionsSnapshot<AppOption> app, IAuthHelper h)
: AtomiControllerBase(h)
{
[HttpGet]
public ActionResult<object> SystemInfo()
Expand Down
3 changes: 1 addition & 2 deletions App/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
"launchUrl": "swagger",
"hotReloadEnabled": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"LANDSCAPE": "lapras"
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ tasks:
LANDSCAPE: lapras
cmds:
- dotnet ef migrations --project ./App add {{.CLI_ARGS}}
run:
desc: Runs local .NET Server
env:
LANDSCAPE: corsola
cmds:
- dotnet run --project App
setup:
desc: Setups dotnet
cmds:
Expand Down
2 changes: 1 addition & 1 deletion infra/api_chart/app/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ AllowedHosts: "*"
Kestrel:
Endpoints:
Http:
Url: http://+:9001
Url: http://+:9002
Logging:
LogLevel:
Default: Information
Expand Down
2 changes: 1 addition & 1 deletion infra/migrate.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ WORKDIR /app
RUN dotnet tool install --global dotnet-ef
ENV PATH="$PATH:/home/dotnet/.dotnet/tools"
ENV LANDSCAPE=lapras
RUN dotnet-ef migrations bundle --project ./App
RUN dotnet-ef migrations bundle --project ./App
CMD [ "./efbundle" ]
2 changes: 1 addition & 1 deletion infra/migration_chart/app/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ AllowedHosts: "*"
Kestrel:
Endpoints:
Http:
Url: http://+:9001
Url: http://+:9002
Logging:
LogLevel:
Default: Information
Expand Down

0 comments on commit 4606ef1

Please sign in to comment.