Skip to content

Commit

Permalink
Merge pull request #3 from byerlikaya/refactor-and-improvements
Browse files Browse the repository at this point in the history
Refactor and improvements
  • Loading branch information
byerlikaya authored Jun 25, 2024
2 parents d4c1b36 + 914a335 commit ed67619
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 53 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ PM> Install-Package Basic.RabbitMQ
2. Add services.AddRabbitMQClient(Configuration);

```csharp
builder.Services.AddRabbitMQClient(Configuration);
builder.Services.AddRabbitMqClient(Configuration);
```

3. Add the necessary information to the `appsettings.json` file.
Expand Down
2 changes: 1 addition & 1 deletion sample/Consumer.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ public Startup()
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton(Configuration);
services.AddRabbitMQClient(Configuration);
services.AddRabbitMqClient(Configuration);
}
}
11 changes: 8 additions & 3 deletions sample/Producer.Sample.Api/Controllers/ProducerController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ namespace Producer.Sample.Api.Controllers;
public class ProducerController(IMessageProducer messageProducer) : ControllerBase
{
[HttpPost("/sendMessage")]
public Task<IActionResult> SendEmail(string message)
public async Task<IActionResult> SendMessage(string message)
{
messageProducer.SendMessage("Test_Queue", "Test_Routing_Key", message);
return Task.FromResult<IActionResult>(Ok());
await Parallel.ForAsync(0, 10000, (x, _) =>
{
messageProducer.SendMessage("Test_Queue", "Test_Routing_Key", $"{message}-{x}");
return default;
});

return Ok();
}
}
2 changes: 1 addition & 1 deletion sample/Producer.Sample.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

var configuration = builder.Configuration;

builder.Services.AddRabbitMQClient(configuration);
builder.Services.AddRabbitMqClient(configuration, ServiceLifetime.Scoped);

var app = builder.Build();

Expand Down
2 changes: 1 addition & 1 deletion src/Basic.RabbitMQ/Basic.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<Description>.NET Core library that simplifies RabbitMQ usage and works with the Direct Exchange type.</Description>
<PackageId>Basic.RabbitMQ</PackageId>
<Version>1.1.1</Version>
<Version>2.0.0</Version>
<Product>Basic.RabbitMQ</Product>
<LangVersion>preview</LangVersion>
</PropertyGroup>
Expand Down
60 changes: 50 additions & 10 deletions src/Basic.RabbitMQ/Extensions/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,22 +1,62 @@
namespace Basic.RabbitMQ.Extensions;

// ReSharper disable once UnusedType.Global
public static class ServiceCollectionExtensions
{
public static void AddRabbitMQClient(this IServiceCollection services, IConfiguration configuration)
{
var brokerOptions = configuration.GetSection(nameof(MessageBrokerOptions)).Get<MessageBrokerOptions>();
// ReSharper disable once UnusedMember.Global
public static void AddRabbitMqClient(
this IServiceCollection services,
IConfiguration configuration,
ServiceLifetime messageConsumerServiceLifetime = ServiceLifetime.Singleton) =>
CreateServices(services, configuration.GetSection(nameof(MessageBrokerOptions)).Get<MessageBrokerOptions>(), messageConsumerServiceLifetime);

// ReSharper disable once UnusedMember.Global
public static void AddRabbitMqClient(
this IServiceCollection services,
MessageBrokerOptions messageBrokerOptions,
ServiceLifetime messageConsumerServiceLifetime = ServiceLifetime.Singleton) =>
CreateServices(services, messageBrokerOptions, messageConsumerServiceLifetime);

private static void CreateServices(
IServiceCollection services,
MessageBrokerOptions messageBrokerOptions,
ServiceLifetime messageConsumerServiceLifetime)
{
services.AddSingleton(_ => new ConnectionFactory
{
HostName = brokerOptions.HostName,
UserName = brokerOptions.Username,
Password = brokerOptions.Password,
VirtualHost = brokerOptions.VirtualHost,
DispatchConsumersAsync = true
HostName = messageBrokerOptions.HostName,
UserName = messageBrokerOptions.Username,
Password = messageBrokerOptions.Password,
VirtualHost = messageBrokerOptions.VirtualHost,
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(30)
});

services.AddSingleton<RabbitMQClientService>();
services.AddSingleton<IMessageProducer, MessageProducer>();
services.AddSingleton<RabbitMqClientService>();
services.AddSingleton<IMessageConsumer, MessageConsumer>();

CreateConsumerService(services, messageConsumerServiceLifetime);
}

private static void CreateConsumerService(
IServiceCollection services,
ServiceLifetime messageConsumerServiceLifetime)
{
switch (messageConsumerServiceLifetime)
{
case ServiceLifetime.Singleton:
services.AddSingleton<IMessageProducer, MessageProducer>();
break;
case ServiceLifetime.Scoped:
services.AddScoped<IMessageProducer, MessageProducer>();
break;
case ServiceLifetime.Transient:
services.AddTransient<IMessageProducer, MessageProducer>();
break;
default:
services.AddSingleton<IMessageProducer, MessageProducer>();
break;
}
}
}
2 changes: 1 addition & 1 deletion src/Basic.RabbitMQ/Interfaces/IMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public interface IMessageConsumer
{
IModel Channel(string queueName, string routingKey);
IModel Channel(string queueName, string routingKey, ushort prefetchCount = 1);

AsyncEventingBasicConsumer GetConsumer(IModel channel);
}
11 changes: 6 additions & 5 deletions src/Basic.RabbitMQ/MessageConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
namespace Basic.RabbitMQ;

public class MessageConsumer(RabbitMQClientService rabbitMqClientService) : IMessageConsumer
public class MessageConsumer(RabbitMqClientService rabbitMqClientService) : IMessageConsumer
{
public IModel Channel(string queueName, string routingKey)
public IModel Channel(string queueName, string routingKey, ushort prefetchCount = 1)
{
var channel = rabbitMqClientService.Connect(queueName);

channel.QueueBind(
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
queue: queueName,
routingKey: routingKey);
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
routingKey: routingKey,
arguments: null);

channel.BasicQos(0, 10, false);
channel.BasicQos(0, prefetchCount, false);

return channel;
}
Expand Down
24 changes: 17 additions & 7 deletions src/Basic.RabbitMQ/MessageProducer.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
namespace Basic.RabbitMQ;

public class MessageProducer(RabbitMQClientService rabbitMqClientService) : IMessageProducer
public class MessageProducer(RabbitMqClientService rabbitMqClientService, ConnectionFactory connectionFactory)
: IMessageProducer, IDisposable
{
private readonly IConnection _connection = connectionFactory.CreateConnection();

public void SendMessage(string queueName, string routingKey, string message)
{
using var channel = rabbitMqClientService.Connect(queueName);
using var channel = rabbitMqClientService.Connect(_connection, queueName);
channel.QueueBind(
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
queue: queueName,
routingKey: routingKey);
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
routingKey: routingKey,
arguments: null);

var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);

var properties = channel.CreateBasicProperties();

properties.Persistent = true;
properties.DeliveryMode = 2;

channel.BasicPublish(
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
Expand All @@ -25,23 +31,27 @@ public void SendMessage(string queueName, string routingKey, string message)

public void SendMessage<T>(string queueName, string routingKey, T message)
{
using var channel = rabbitMqClientService.Connect(queueName);
using var channel = rabbitMqClientService.Connect(_connection, queueName);
channel.QueueBind(
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
queue: queueName,
routingKey: routingKey);
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
routingKey: routingKey,
arguments: null);

var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);

var properties = channel.CreateBasicProperties();

properties.Persistent = true;
properties.DeliveryMode = 2;

channel.BasicPublish(
exchange: rabbitMqClientService.BrokerOptions.ExchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
}

public void Dispose() => _connection?.Dispose();
}
38 changes: 15 additions & 23 deletions src/Basic.RabbitMQ/Services/RabbitMQClientService.cs
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
namespace Basic.RabbitMQ.Services;

// ReSharper disable once InconsistentNaming
public class RabbitMQClientService(IConfiguration configuration, ConnectionFactory connectionFactory) : IDisposable
public class RabbitMqClientService(IConfiguration configuration, ConnectionFactory connectionFactory)
{
public readonly MessageBrokerOptions BrokerOptions = configuration.GetSection(nameof(MessageBrokerOptions)).Get<MessageBrokerOptions>();

private IConnection _connection;
private IModel _channel;

public IModel Connect(string queueName)
{
if (_channel is { IsOpen: true } && _channel.CurrentQueue == queueName)
return _channel;

if (_connection is not { IsOpen: true })
_connection = connectionFactory.CreateConnection();
return CreateChannel(_connection, queueName);
}

_channel = _connection.CreateModel();
public IModel Connect(
IConnection connection,
string queueName) => CreateChannel(connection, queueName);

private IModel CreateChannel(IConnection connection, string queueName)
{
var channel = connection.CreateModel();

_channel.ExchangeDeclare(
channel.ExchangeDeclare(
exchange: BrokerOptions.ExchangeName,
type: "direct",
durable: true,
autoDelete: false);
type: ExchangeType.Direct);

_channel.QueueDeclare(
channel.QueueDeclare(
queue: queueName,
durable: true,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

return _channel;
}

public void Dispose()
{
_channel?.Close();
_channel?.Dispose();

_connection?.Close();
_connection?.Dispose();
return channel;
}
}

0 comments on commit ed67619

Please sign in to comment.