Skip to content

Commit

Permalink
asb worker refinement
Browse files Browse the repository at this point in the history
  • Loading branch information
ggcol committed Nov 19, 2024
1 parent 53f3c79 commit 855f15a
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 7 deletions.
222 changes: 222 additions & 0 deletions ASureBus.Tests/ASureBus/Core/AsbWorkerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
using Moq;
using ASureBus.Core;
using ASureBus.Core.Caching;
using ASureBus.Core.Messaging;
using ASureBus.Core.Sagas;
using ASureBus.Core.TypesHandling;
using ASureBus.Core.TypesHandling.Entities;
using ASureBus.Services.ServiceBus;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Hosting;

namespace ASureBus.Tests.ASureBus.Core;

[TestFixture]
public class AsbWorkerTests
{
[Test]
public async Task StartAsync_StartsAllProcessors_ForHandlers()
{
// Arrange
var mockProcessor = new Mock<ServiceBusProcessor>();
var mockAzureServiceBusService = new Mock<IAzureServiceBusService>();
var mockTypesLoader = new Mock<ITypesLoader>();

var anHandler = new HandlerType()
{
MessageType = new MessageType()
{
IsCommand = true,
Type = typeof(object)
},
Type = typeof(object)
};

var handlerSet = new[]
{
anHandler
}.ToHashSet();

mockTypesLoader.Setup(t => t.Handlers).Returns(handlerSet);
mockTypesLoader.Setup(t => t.Sagas).Returns(Array.Empty<SagaType>().ToHashSet());

mockAzureServiceBusService
.Setup(a => a.GetProcessor(handlerSet.FirstOrDefault()!, It.IsAny<CancellationToken>()))
.ReturnsAsync(mockProcessor.Object);

var worker = new AsbWorker(
Mock.Of<IHostApplicationLifetime>(),
Mock.Of<IServiceProvider>(),
mockAzureServiceBusService.Object,
Mock.Of<IMessageEmitter>(),
mockTypesLoader.Object,
Mock.Of<IAsbCache>(),
Mock.Of<ISagaBehaviour>());

// Act
await worker.StartAsync(CancellationToken.None);

// Assert
mockProcessor.Verify(p => p.StartProcessingAsync(It.IsAny<CancellationToken>()),
Times.Once);
}

[Test]
public async Task StartAsync_StartsAllProcessors_ForSagas()
{
// Arrange
var mockProcessor = new Mock<ServiceBusProcessor>();
var mockAzureServiceBusService = new Mock<IAzureServiceBusService>();
var mockTypesLoader = new Mock<ITypesLoader>();

var sagaHandlerType = new SagaHandlerType()
{
IsInitMessageHandler = true,
MessageType = new MessageType()
{
IsCommand = true,
Type = typeof(object)
}
};

var aSaga = new SagaType()
{
Type = typeof(object),
Listeners = new[]
{
sagaHandlerType
}.ToHashSet()
};

var sagaSet = new[]
{
aSaga
}.ToHashSet();

mockTypesLoader.Setup(t => t.Handlers).Returns(Array.Empty<HandlerType>().ToHashSet());
mockTypesLoader.Setup(t => t.Sagas).Returns(sagaSet);

mockAzureServiceBusService
.Setup(a => a.GetProcessor(sagaHandlerType, It.IsAny<CancellationToken>()))
.ReturnsAsync(mockProcessor.Object);

var worker = new AsbWorker(
Mock.Of<IHostApplicationLifetime>(),
Mock.Of<IServiceProvider>(),
mockAzureServiceBusService.Object,
Mock.Of<IMessageEmitter>(),
mockTypesLoader.Object,
Mock.Of<IAsbCache>(),
Mock.Of<ISagaBehaviour>());

// Act
await worker.StartAsync(CancellationToken.None);

// Assert
mockProcessor.Verify(p => p.StartProcessingAsync(It.IsAny<CancellationToken>()),
Times.Once);
}

[Test]
public async Task StopAsync_StopsAndDisposesAllProcessors_ForHandlers()
{
// Arrange
var mockProcessor = new Mock<ServiceBusProcessor>();
var mockAzureServiceBusService = new Mock<IAzureServiceBusService>();
var mockTypesLoader = new Mock<ITypesLoader>();

var anHandler = new HandlerType()
{
MessageType = new MessageType()
{
IsCommand = true,
Type = typeof(object)
},
Type = typeof(object)
};

var handlerSet = new[]
{
anHandler
}.ToHashSet();

mockTypesLoader.Setup(t => t.Handlers).Returns(handlerSet);
mockTypesLoader.Setup(t => t.Sagas).Returns(Array.Empty<SagaType>().ToHashSet());

mockAzureServiceBusService
.Setup(a => a.GetProcessor(handlerSet.FirstOrDefault()!, It.IsAny<CancellationToken>()))
.ReturnsAsync(mockProcessor.Object);

var worker = new AsbWorker(
Mock.Of<IHostApplicationLifetime>(),
Mock.Of<IServiceProvider>(),
mockAzureServiceBusService.Object,
Mock.Of<IMessageEmitter>(),
mockTypesLoader.Object,
Mock.Of<IAsbCache>(),
Mock.Of<ISagaBehaviour>());

// Act
await worker.StartAsync(CancellationToken.None);
await worker.StopAsync(CancellationToken.None);

// Assert
mockProcessor.Verify(p => p.StopProcessingAsync(It.IsAny<CancellationToken>()), Times.Once);
}

[Test]
public async Task StopAsync_StopsAndDisposesAllProcessors_ForSagas()
{
// Arrange
var mockProcessor = new Mock<ServiceBusProcessor>();
var mockAzureServiceBusService = new Mock<IAzureServiceBusService>();
var mockTypesLoader = new Mock<ITypesLoader>();

var sagaHandlerType = new SagaHandlerType()
{
IsInitMessageHandler = true,
MessageType = new MessageType()
{
IsCommand = true,
Type = typeof(object)
}
};

var aSaga = new SagaType()
{
Type = typeof(object),
Listeners = new[]
{
sagaHandlerType
}.ToHashSet()
};

var sagaSet = new[]
{
aSaga
}.ToHashSet();

mockTypesLoader.Setup(t => t.Handlers).Returns(Array.Empty<HandlerType>().ToHashSet());
mockTypesLoader.Setup(t => t.Sagas).Returns(sagaSet);

mockAzureServiceBusService
.Setup(a => a.GetProcessor(sagaHandlerType, It.IsAny<CancellationToken>()))
.ReturnsAsync(mockProcessor.Object);

var worker = new AsbWorker(
Mock.Of<IHostApplicationLifetime>(),
Mock.Of<IServiceProvider>(),
mockAzureServiceBusService.Object,
Mock.Of<IMessageEmitter>(),
mockTypesLoader.Object,
Mock.Of<IAsbCache>(),
Mock.Of<ISagaBehaviour>());

// Act
await worker.StartAsync(CancellationToken.None);
await worker.StopAsync(CancellationToken.None);

// Assert
mockProcessor.Verify(p => p.StopProcessingAsync(It.IsAny<CancellationToken>()), Times.Once);
}
}
14 changes: 7 additions & 7 deletions ASureBus/Core/AsbWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,23 +85,23 @@ public AsbWorker(

public async Task StartAsync(CancellationToken cancellationToken)
{
foreach (var processorKvp in _processors)
foreach (var processor in _processors.Values)
{
await processorKvp.Value
await processor
.StartProcessingAsync(cancellationToken)
.ConfigureAwait(false);
}
}

public async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var processorKvp in _processors)
foreach (var processor in _processors.Values)
{
await processorKvp.Value
await processor
.StopProcessingAsync(cancellationToken)
.ConfigureAwait(false);

await processorKvp.Value
await processor
.DisposeAsync()
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ private async Task ProcessError(SagaType sagaType, SagaHandlerType listenerType,
var broker = BrokerFactory.Get(_serviceProvider, sagaType, implSaga, listenerType,
correlationId);

await broker.HandleError(ex?.OriginalException, args.CancellationToken)
await broker.HandleError(ex?.OriginalException!, args.CancellationToken)
.ConfigureAwait(false);
}

Expand Down Expand Up @@ -197,7 +197,7 @@ await _messageEmitter.FlushAll(broker.Collector, args.CancellationToken)
/*
* exception caught here is always TargetInvocationException
* since every saga's handle method is called by reflection
* and the actual exception should be stored in InnerException
* the actual exception should be stored in InnerException
*/

if (ex.InnerException is AsbException) throw;
Expand Down

0 comments on commit 855f15a

Please sign in to comment.