Skip to content

Commit

Permalink
feat: adding consumer from assembly
Browse files Browse the repository at this point in the history
  • Loading branch information
stormaref committed Apr 20, 2024
1 parent 5a5ecfc commit 554f183
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 6 deletions.
14 changes: 14 additions & 0 deletions KafkaStorm.Test/AddingConsumerFromAssemblyTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using KafkaStorm.Registration;
using KafkaStorm.Test.TestConsumers;

namespace KafkaStorm.Test;

public class AddingConsumerFromAssemblyTest : TestBase
{
[Fact]
public void ConsumerShouldHaveBeenAdded()
{
var result = ConsumerRegistrationFactory.ConsumerConfigs.TryGetValue(typeof(AutomatedConsumer).FullName!, out var config);
Assert.True(result);
}
}
9 changes: 7 additions & 2 deletions KafkaStorm.Test/TestBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Reflection;
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;
using KafkaStorm.Registration;
using KafkaStorm.Test.TestConsumers;
using KafkaStorm.Test.TestEvents;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -40,9 +42,12 @@ private IServiceCollection ConfigureServices()
factory.AddConsumers(crf =>
{
crf.SetConsumingPeriod(5);

var config = new ConsumerConfig { BootstrapServers = "localhost:29092", GroupId = "TestGroup" };

crf.AddConsumer<HelloConsumer, HelloEvent>(config, "my-topic");

crf.AddConsumer<HelloConsumer, HelloEvent>(
new ConsumerConfig {BootstrapServers = "localhost:29092", GroupId = "TestGroup"}, "my-topic");
crf.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), config);
});
});
return collection;
Expand Down
13 changes: 13 additions & 0 deletions KafkaStorm.Test/TestConsumers/AutomatedConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using KafkaStorm.Interfaces;
using KafkaStorm.Test.TestEvents;

namespace KafkaStorm.Test.TestConsumers;

public class AutomatedConsumer : IConsumer<AutomatedEvent>
{
public Task Handle(AutomatedEvent message, CancellationToken cancellationToken)
{
Console.WriteLine(message.Title);
return Task.CompletedTask;
}
}
12 changes: 12 additions & 0 deletions KafkaStorm.Test/TestEvents/AutomatedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using KafkaStorm.Interfaces;

namespace KafkaStorm.Test.TestEvents;

public class AutomatedEvent : IMessage
{
public AutomatedEvent()
{
Title = "Automated";
}
public string Title { get; set; }
}
6 changes: 6 additions & 0 deletions KafkaStorm/Interfaces/IMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaStorm.Interfaces;

public interface IMessage
{

}
2 changes: 1 addition & 1 deletion KafkaStorm/KafkaStorm.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
<Authors>ArefAzizian</Authors>
<Version>8.0.6</Version>
<Version>8.1.0</Version>
<PackageId>KafKaStorm</PackageId>
<LangVersion>latest</LangVersion>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
Expand Down
1 change: 0 additions & 1 deletion KafkaStorm/Models/StoredMessage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using KafkaStorm.Exceptions;

namespace KafkaStorm.Models;
Expand Down
5 changes: 3 additions & 2 deletions KafkaStorm/Registration/ConsumerRegistrationFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace KafkaStorm.Registration;

public class ConsumerRegistrationFactory
{
internal static Dictionary<string, ConsumerConfig> ConsumerConfigs = null!;
public static Dictionary<string, ConsumerConfig> ConsumerConfigs = null!;
internal static Dictionary<string, string> ConsumerTopics = null!;
internal static int ConsumingPeriod = 10;
private readonly IServiceCollection _serviceCollection;
Expand All @@ -34,7 +34,7 @@ public void AddConsumer<TConsumer, TMessage>(ConsumerConfig config, string? topi
{
var topic = string.IsNullOrWhiteSpace(topicName) ? typeof(TMessage).Name : topicName;

var fullName = typeof(TConsumer).FullName;
var fullName = typeof(TConsumer).FullName!;
var succeeded = ConsumerConfigs.TryAdd(fullName, config) &&
ConsumerTopics.TryAdd(fullName, topic);

Expand All @@ -52,4 +52,5 @@ public void SetConsumingPeriod(int period = 10)
{
ConsumingPeriod = period;
}

}
43 changes: 43 additions & 0 deletions KafkaStorm/Registration/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Confluent.Kafka;
using KafkaStorm.Interfaces;

namespace KafkaStorm.Registration;

public static class Extensions
{
private static Type? GetConsumerType(Assembly assembly, Type messageType)
{
var iConsumerType = typeof(IConsumer<>).MakeGenericType(messageType);
return assembly.GetTypes().FirstOrDefault(t =>
t.GetInterfaces().Contains(iConsumerType) &&
t is { IsClass: true, IsVisible: true });
}

private static List<Type> GetMessageTypes(Assembly assembly)
{
return assembly.GetTypes().Where(t =>
t.IsClass &&
t.GetInterfaces().Contains(typeof(IMessage)))
.ToList();
}

public static void AddConsumersFromAssembly(this ConsumerRegistrationFactory crf, Assembly assembly,
ConsumerConfig config)
{
var messageTypes = GetMessageTypes(assembly);
var method = typeof(ConsumerRegistrationFactory).GetMethod("AddConsumer");
var methodInfos = (from messageType in messageTypes
let consumerType = GetConsumerType(assembly, messageType)
where consumerType != default
select method!.MakeGenericMethod(consumerType, messageType))
.ToList();
foreach (var generic in methodInfos)
{
generic.Invoke(crf, [config, null]);
}
}
}
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ builder.Services.AddKafkaStorm(factory =>

> It's the same ConsumerConfig as Confluent.Kafka
## New Feature 🎉
Adding consumers is even easier now:

```csharp
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
factory.AddConsumers(crf =>
{
var config = new ConsumerConfig { BootstrapServers = "localhost:29092", GroupId = "TestGroup" };

//This line can add all consumers in the assembly with their according messages automatically
crf.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), config);
});
});
```

## Consuming
```csharp
using KafkaStorm.Interfaces;
Expand Down

0 comments on commit 554f183

Please sign in to comment.