Skip to content

Commit

Permalink
Merge pull request #2589 from Particular/handle-retry-succeeded-notif…
Browse files Browse the repository at this point in the history
…ications

Handle retry succeeded notifications
  • Loading branch information
SzymonPobiega authored Jul 14, 2021
2 parents 2c11e58 + 9498d04 commit 747bf6c
Show file tree
Hide file tree
Showing 59 changed files with 1,209 additions and 443 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public FailingEndpoint()
EndpointSetup<DefaultServer>(c =>
{
c.EnableFeature<Outbox>();

c.DisableFeature<PlatformRetryNotifications>();
var recoverability = c.Recoverability();
recoverability.Immediate(s => s.NumberOfRetries(1));
recoverability.Delayed(s => s.NumberOfRetries(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Features;
using NServiceBus.Settings;
using NUnit.Framework;
using ServiceControl.MessageFailures;
Expand Down Expand Up @@ -63,6 +64,8 @@ public Failing()
{
EndpointSetup<DefaultServer>(c =>
{
//Do not inform SC that the message has been already successfully handled
c.DisableFeature<PlatformRetryNotifications>();
c.NoRetries();
c.NoOutbox();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Features;
using NServiceBus.Settings;
using NUnit.Framework;
using ServiceControl.MessageFailures;
Expand Down Expand Up @@ -55,6 +56,8 @@ public FailingEndpoint()
{
EndpointSetup<DefaultServer>(c =>
{
//Do not inform SC that the message has been already successfully handled
c.DisableFeature<PlatformRetryNotifications>();
c.NoRetries();
c.NoOutbox();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Features;
using NServiceBus.Settings;
using NUnit.Framework;
using ServiceControl.MessageFailures;
Expand Down Expand Up @@ -51,6 +52,8 @@ public FailingEndpoint()
{
EndpointSetup<DefaultServer>(c =>
{
//Do not inform SC that the message has been already successfully handled
c.DisableFeature<PlatformRetryNotifications>();
c.NoRetries();
c.NoOutbox();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Infrastructure;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Features;
using NServiceBus.Settings;
using NUnit.Framework;
using ServiceControl.MessageFailures;
Expand Down Expand Up @@ -54,6 +55,8 @@ public Failing()
{
EndpointSetup<DefaultServer>(c =>
{
//Do not inform SC that the message has been already successfully handled
c.DisableFeature<PlatformRetryNotifications>();
c.NoRetries();
c.NoOutbox();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,14 @@ public FakeReturnToSender(IBodyStorage bodyStorage, IDocumentStore documentStore
this.myContext = myContext;
}

public override Task HandleMessage(MessageContext message, IDispatchMessages sender)
public override Task HandleMessage(MessageContext message, IDispatchMessages sender, string errorQueueTransportAddress)
{
if (message.Headers[Headers.MessageId] == myContext.DecommissionedEndpointMessageId)
{
throw new Exception("This endpoint is unreachable");
}

return base.HandleMessage(message, sender);
return base.HandleMessage(message, sender, "error");
}

MyContext myContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
namespace ServiceControl.AcceptanceTests.Recoverability
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;
using ServiceControl.MessageFailures;
using ServiceControl.MessageFailures.Api;
using TestSupport.EndpointTemplates;

[RunOnAllTransports]
class When_retry_is_confirmed : AcceptanceTest
{
[Test]
public async Task Should_mark_message_as_successfully_resolved()
{
var context = await Define<Context>()
.WithEndpoint<RetryingEndpoint>(b => b
.DoNotFailOnErrorMessages()
.When(s => s.SendLocal(new RetryMessage())))
.Do("Wait for failed message", async c =>
{
// wait till the failed message has been ingested
var tryGetMany = await this.TryGetMany<FailedMessageView>("/api/errors");
return tryGetMany;
})
.Do("Retry message", async c =>
{
//trigger retry
await this.Post<object>("/api/errors/retry/all");
})
.Do("Wait for retry confirmation", async c =>
{
if (!c.ReceivedRetry)
{
// wait till endpoint processed the retried message
return false;
}

var failedMessages = await this.TryGetMany<FailedMessageView>("/api/errors");
if (failedMessages.Items.Any(i => i.Status == FailedMessageStatus.Resolved))
{
c.MessagesView = failedMessages.Items;
return true;
}

return false;
})
.Done(c => true)
.Run();

Assert.AreEqual(1, context.MessagesView.Count);
var failedMessage = context.MessagesView.Single();
Assert.AreEqual(FailedMessageStatus.Resolved, failedMessage.Status);
Assert.AreEqual(1, failedMessage.NumberOfProcessingAttempts);
}

class Context : ScenarioContext, ISequenceContext
{
public bool ThrowOnHandler { get; set; } = true;
public bool ReceivedRetry { get; set; }
public int Step { get; set; }
public List<FailedMessageView> MessagesView { get; set; }
}

class RetryingEndpoint : EndpointConfigurationBuilder
{
public RetryingEndpoint()
{
EndpointSetup<DefaultServer>(c => c.NoRetries());
}

class RetryMessageHandler : IHandleMessages<RetryMessage>
{
Context testContext;

public RetryMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(RetryMessage message, IMessageHandlerContext context)
{
if (testContext.ThrowOnHandler)
{
testContext.ThrowOnHandler = false;
throw new Exception("boom");
}

testContext.ReceivedRetry = true;
return Task.CompletedTask;
}
}
}

class RetryMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,34 +94,36 @@ async Task InitializeServiceControl(ScenarioContext context)
RunInMemory = true,
DisableHealthChecks = true,
ExposeApi = true,
OnMessage = (id, headers, body, @continue) =>
MessageFilter = messageContext =>
{
var headers = messageContext.Headers;
var id = messageContext.MessageId;
var log = LogManager.GetLogger<ServiceControlComponentRunner>();
headers.TryGetValue(Headers.MessageId, out var originalMessageId);
log.Debug($"OnMessage for message '{id}'({originalMessageId ?? string.Empty}).");

//Do not filter out CC, SA and HB messages as they can't be stamped
if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes)
&& messageTypes.StartsWith("ServiceControl."))
&& (messageTypes.StartsWith("ServiceControl.Contracts") || messageTypes.StartsWith("ServiceControl.EndpointPlugin")))
{
return @continue();
return false;
}

//Do not filter out subscribe messages as they can't be stamped
if (headers.TryGetValue(Headers.MessageIntent, out var intent)
&& intent == MessageIntentEnum.Subscribe.ToString())
{
return @continue();
return false;
}

var currentSession = context.TestRunId.ToString();
if (!headers.TryGetValue("SC.SessionID", out var session) || session != currentSession)
{
log.Debug($"Discarding message '{id}'({originalMessageId ?? string.Empty}) because it's session id is '{session}' instead of '{currentSession}'.");
return Task.FromResult(0);
return true;
}

return @continue();
return false;
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
namespace ServiceControl.Audit.AcceptanceTests.Recoverability
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Features;
using NServiceBus.Pipeline;
using NUnit.Framework;
using TestSupport.EndpointTemplates;
using Conventions = NServiceBus.AcceptanceTesting.Customization.Conventions;

[RunOnAllTransports]
class When_a_successful_retry_at_old_endpoint_is_detected : AcceptanceTest
{
[Test]
public async Task Should_send_acknowledgement()
{
var failedMessageId = Guid.NewGuid().ToString();
var context = await Define<Context>()
.WithEndpoint<AcknowledgementSpy>()
.WithEndpoint<Receiver>(b => b.When(s =>
{
var options = new SendOptions();

options.SetHeader("ServiceControl.Retry.UniqueMessageId", failedMessageId);
options.SetHeader("ServiceControl.Retry.AcknowledgementQueue", Conventions.EndpointNamingConvention(typeof(AcknowledgementSpy)));
options.RouteToThisEndpoint();
return s.Send(new MyMessage(), options);
}))
.Done(c => c.AcknowledgementSent)
.Run();

Assert.IsTrue(context.AcknowledgementSent);
}

public class Context : ScenarioContext
{
public bool AcknowledgementSent { get; set; }
}

public class AcknowledgementSpy : EndpointConfigurationBuilder
{
public AcknowledgementSpy()
{
EndpointSetup<DefaultServerWithoutAudit>(cfg =>
{
cfg.Pipeline.Register(b => new SpyBehavior(b.Build<Context>()), "Spy behavior");
cfg.Pipeline.Remove("DiscardMessagesBehavior"); //Otherwise the confirmation generated by the audit instance will be discarded
});
}

public class SpyBehavior : Behavior<ITransportReceiveContext>
{
Context scenarioContext;

public SpyBehavior(Context scenarioContext)
{
this.scenarioContext = scenarioContext;
}

public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
if (context.Message.Headers.ContainsKey("ServiceControl.Retry.Successful"))
{
scenarioContext.AcknowledgementSent = true;
}
return Task.CompletedTask;
}
}
}

public class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServerWithAudit>(cfg =>
{
// disable retry notifications feature to emulate an NServiceBus endpoint older than version 7.5
cfg.DisableFeature<PlatformRetryNotifications>();
});
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
return Task.FromResult(0);
}
}
}

public class MyMessage : ICommand
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using Audit.Auditing.MessagesView;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;
using TestSupport;
using TestSupport.EndpointTemplates;

class When_a_successful_retry_is_detected : AcceptanceTest
[RunOnAllTransports]
class When_a_successful_retry_from_old_SC_is_detected : AcceptanceTest
{
[Test]
public async Task Should_raise_integration_event()
Expand All @@ -28,18 +28,7 @@ public async Task Should_raise_integration_event()
options.RouteToThisEndpoint();
return s.Send(new MyMessage(), options);
}))
.Done(async c =>
{
var result = await this.TryGetSingle<MessagesView>("/api/messages", m =>
{
var storedMessageId = m.Headers.Select(kv => kv.Value.ToString())
.FirstOrDefault(v => v == failedMessageId);

return storedMessageId == failedMessageId;
});

return result.HasResult;
})
.Done(c => c.SentMarkMessageFailureResolvedByRetriesCommands.Any())
.Run();

var command = context.SentMarkMessageFailureResolvedByRetriesCommands.Single();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ public class MyContext : ScenarioContext
public bool Saga2Complete { get; set; }
public Guid Saga1Id { get; set; }
public Guid Saga2Id { get; set; }
public string Messages { get; set; }
}
}
}
}
Loading

0 comments on commit 747bf6c

Please sign in to comment.