From 855f15ad29c3cfc5b463f70b04a81d5ceb8c70a4 Mon Sep 17 00:00:00 2001 From: Gianluca Colombo Date: Tue, 19 Nov 2024 16:33:30 +0100 Subject: [PATCH] asb worker refinement --- .../ASureBus/Core/AsbWorkerTests.cs | 222 ++++++++++++++++++ ASureBus/Core/AsbWorker.cs | 14 +- 2 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 ASureBus.Tests/ASureBus/Core/AsbWorkerTests.cs diff --git a/ASureBus.Tests/ASureBus/Core/AsbWorkerTests.cs b/ASureBus.Tests/ASureBus/Core/AsbWorkerTests.cs new file mode 100644 index 0000000..0324664 --- /dev/null +++ b/ASureBus.Tests/ASureBus/Core/AsbWorkerTests.cs @@ -0,0 +1,222 @@ +using Moq; +using ASureBus.Core; +using ASureBus.Core.Caching; +using ASureBus.Core.Messaging; +using ASureBus.Core.Sagas; +using ASureBus.Core.TypesHandling; +using ASureBus.Core.TypesHandling.Entities; +using ASureBus.Services.ServiceBus; +using Azure.Messaging.ServiceBus; +using Microsoft.Extensions.Hosting; + +namespace ASureBus.Tests.ASureBus.Core; + +[TestFixture] +public class AsbWorkerTests +{ + [Test] + public async Task StartAsync_StartsAllProcessors_ForHandlers() + { + // Arrange + var mockProcessor = new Mock(); + var mockAzureServiceBusService = new Mock(); + var mockTypesLoader = new Mock(); + + var anHandler = new HandlerType() + { + MessageType = new MessageType() + { + IsCommand = true, + Type = typeof(object) + }, + Type = typeof(object) + }; + + var handlerSet = new[] + { + anHandler + }.ToHashSet(); + + mockTypesLoader.Setup(t => t.Handlers).Returns(handlerSet); + mockTypesLoader.Setup(t => t.Sagas).Returns(Array.Empty().ToHashSet()); + + mockAzureServiceBusService + .Setup(a => a.GetProcessor(handlerSet.FirstOrDefault()!, It.IsAny())) + .ReturnsAsync(mockProcessor.Object); + + var worker = new AsbWorker( + Mock.Of(), + Mock.Of(), + mockAzureServiceBusService.Object, + Mock.Of(), + mockTypesLoader.Object, + Mock.Of(), + Mock.Of()); + + // Act + await worker.StartAsync(CancellationToken.None); + + // Assert + mockProcessor.Verify(p => p.StartProcessingAsync(It.IsAny()), + Times.Once); + } + + [Test] + public async Task StartAsync_StartsAllProcessors_ForSagas() + { + // Arrange + var mockProcessor = new Mock(); + var mockAzureServiceBusService = new Mock(); + var mockTypesLoader = new Mock(); + + var sagaHandlerType = new SagaHandlerType() + { + IsInitMessageHandler = true, + MessageType = new MessageType() + { + IsCommand = true, + Type = typeof(object) + } + }; + + var aSaga = new SagaType() + { + Type = typeof(object), + Listeners = new[] + { + sagaHandlerType + }.ToHashSet() + }; + + var sagaSet = new[] + { + aSaga + }.ToHashSet(); + + mockTypesLoader.Setup(t => t.Handlers).Returns(Array.Empty().ToHashSet()); + mockTypesLoader.Setup(t => t.Sagas).Returns(sagaSet); + + mockAzureServiceBusService + .Setup(a => a.GetProcessor(sagaHandlerType, It.IsAny())) + .ReturnsAsync(mockProcessor.Object); + + var worker = new AsbWorker( + Mock.Of(), + Mock.Of(), + mockAzureServiceBusService.Object, + Mock.Of(), + mockTypesLoader.Object, + Mock.Of(), + Mock.Of()); + + // Act + await worker.StartAsync(CancellationToken.None); + + // Assert + mockProcessor.Verify(p => p.StartProcessingAsync(It.IsAny()), + Times.Once); + } + + [Test] + public async Task StopAsync_StopsAndDisposesAllProcessors_ForHandlers() + { + // Arrange + var mockProcessor = new Mock(); + var mockAzureServiceBusService = new Mock(); + var mockTypesLoader = new Mock(); + + var anHandler = new HandlerType() + { + MessageType = new MessageType() + { + IsCommand = true, + Type = typeof(object) + }, + Type = typeof(object) + }; + + var handlerSet = new[] + { + anHandler + }.ToHashSet(); + + mockTypesLoader.Setup(t => t.Handlers).Returns(handlerSet); + mockTypesLoader.Setup(t => t.Sagas).Returns(Array.Empty().ToHashSet()); + + mockAzureServiceBusService + .Setup(a => a.GetProcessor(handlerSet.FirstOrDefault()!, It.IsAny())) + .ReturnsAsync(mockProcessor.Object); + + var worker = new AsbWorker( + Mock.Of(), + Mock.Of(), + mockAzureServiceBusService.Object, + Mock.Of(), + mockTypesLoader.Object, + Mock.Of(), + Mock.Of()); + + // Act + await worker.StartAsync(CancellationToken.None); + await worker.StopAsync(CancellationToken.None); + + // Assert + mockProcessor.Verify(p => p.StopProcessingAsync(It.IsAny()), Times.Once); + } + + [Test] + public async Task StopAsync_StopsAndDisposesAllProcessors_ForSagas() + { + // Arrange + var mockProcessor = new Mock(); + var mockAzureServiceBusService = new Mock(); + var mockTypesLoader = new Mock(); + + var sagaHandlerType = new SagaHandlerType() + { + IsInitMessageHandler = true, + MessageType = new MessageType() + { + IsCommand = true, + Type = typeof(object) + } + }; + + var aSaga = new SagaType() + { + Type = typeof(object), + Listeners = new[] + { + sagaHandlerType + }.ToHashSet() + }; + + var sagaSet = new[] + { + aSaga + }.ToHashSet(); + + mockTypesLoader.Setup(t => t.Handlers).Returns(Array.Empty().ToHashSet()); + mockTypesLoader.Setup(t => t.Sagas).Returns(sagaSet); + + mockAzureServiceBusService + .Setup(a => a.GetProcessor(sagaHandlerType, It.IsAny())) + .ReturnsAsync(mockProcessor.Object); + + var worker = new AsbWorker( + Mock.Of(), + Mock.Of(), + mockAzureServiceBusService.Object, + Mock.Of(), + mockTypesLoader.Object, + Mock.Of(), + Mock.Of()); + + // Act + await worker.StartAsync(CancellationToken.None); + await worker.StopAsync(CancellationToken.None); + + // Assert + mockProcessor.Verify(p => p.StopProcessingAsync(It.IsAny()), Times.Once); + } +} \ No newline at end of file diff --git a/ASureBus/Core/AsbWorker.cs b/ASureBus/Core/AsbWorker.cs index b82f07a..a50634c 100644 --- a/ASureBus/Core/AsbWorker.cs +++ b/ASureBus/Core/AsbWorker.cs @@ -85,9 +85,9 @@ public AsbWorker( public async Task StartAsync(CancellationToken cancellationToken) { - foreach (var processorKvp in _processors) + foreach (var processor in _processors.Values) { - await processorKvp.Value + await processor .StartProcessingAsync(cancellationToken) .ConfigureAwait(false); } @@ -95,13 +95,13 @@ await processorKvp.Value public async Task StopAsync(CancellationToken cancellationToken) { - foreach (var processorKvp in _processors) + foreach (var processor in _processors.Values) { - await processorKvp.Value + await processor .StopProcessingAsync(cancellationToken) .ConfigureAwait(false); - await processorKvp.Value + await processor .DisposeAsync() .ConfigureAwait(false); } @@ -154,7 +154,7 @@ private async Task ProcessError(SagaType sagaType, SagaHandlerType listenerType, var broker = BrokerFactory.Get(_serviceProvider, sagaType, implSaga, listenerType, correlationId); - await broker.HandleError(ex?.OriginalException, args.CancellationToken) + await broker.HandleError(ex?.OriginalException!, args.CancellationToken) .ConfigureAwait(false); } @@ -197,7 +197,7 @@ await _messageEmitter.FlushAll(broker.Collector, args.CancellationToken) /* * exception caught here is always TargetInvocationException * since every saga's handle method is called by reflection - * and the actual exception should be stored in InnerException + * the actual exception should be stored in InnerException */ if (ex.InnerException is AsbException) throw;