diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs b/src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs index 59675cc062..6010e227fd 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs @@ -46,6 +46,8 @@ await documentStore.Maintenance.Server new SagaDetailsIndex() }; + await DeleteLegacySagaDetailsIndex(documentStore, cancellationToken); + if (configuration.EnableFullTextSearch) { indexList.Add(new MessagesViewIndexWithFullTextSearch()); @@ -69,6 +71,22 @@ await documentStore.Maintenance await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expirationConfig), cancellationToken); } + public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentStore, CancellationToken cancellationToken) + { + // If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and + // create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration. + // Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait. + // for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale. + // This needs to stay in place until the next major version as the user could upgrade from an older version of the current + // Major (v5.x.x) which might still have the incorrect index. + var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex"); + var sagaDetailsIndexDefinition = await documentStore.Maintenance.SendAsync(sagaDetailsIndexOperation, cancellationToken); + if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)")) + { + await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"), cancellationToken); + } + } + readonly DatabaseConfiguration configuration; } } diff --git a/src/ServiceControl.Audit.Persistence.RavenDB/Indexes/SagaDetailsIndex.cs b/src/ServiceControl.Audit.Persistence.RavenDB/Indexes/SagaDetailsIndex.cs index 05222d3f11..b6dec541a0 100644 --- a/src/ServiceControl.Audit.Persistence.RavenDB/Indexes/SagaDetailsIndex.cs +++ b/src/ServiceControl.Audit.Persistence.RavenDB/Indexes/SagaDetailsIndex.cs @@ -53,6 +53,7 @@ into g SagaType = first.SagaType, Changes = g.SelectMany(x => x.Changes) .OrderByDescending(x => x.FinishTime) + .Take(50000) .ToList() }; diff --git a/src/ServiceControl.Audit.Persistence.Tests.RavenDB/SagaDetailsIndexTests.cs b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/SagaDetailsIndexTests.cs new file mode 100644 index 0000000000..ffa30919ed --- /dev/null +++ b/src/ServiceControl.Audit.Persistence.Tests.RavenDB/SagaDetailsIndexTests.cs @@ -0,0 +1,123 @@ +namespace ServiceControl.Audit.Persistence.Tests +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using NUnit.Framework; + using Raven.Client.Documents.Indexes; + using Raven.Client.Documents.Operations.Indexes; + using ServiceControl.SagaAudit; + + [TestFixture] + class SagaDetailsIndexTests : PersistenceTestFixture + { + [Test] + public async Task Deletes_index_that_does_not_have_cap_of_50000() + { + await configuration.DocumentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex")); + + var indexWithout50000capDefinition = new IndexDefinition + { + Name = "SagaDetailsIndex", + Maps = new System.Collections.Generic.HashSet + { + @"from doc in docs + select new + { + doc.SagaId, + Id = doc.SagaId, + doc.SagaType, + Changes = new[] + { + new + { + Endpoint = doc.Endpoint, + FinishTime = doc.FinishTime, + InitiatingMessage = doc.InitiatingMessage, + OutgoingMessages = doc.OutgoingMessages, + StartTime = doc.StartTime, + StateAfterChange = doc.StateAfterChange, + Status = doc.Status + } + } + }" + }, + Reduce = @"from result in results + group result by result.SagaId + into g + let first = g.First() + select new + { + Id = first.SagaId, + SagaId = first.SagaId, + SagaType = first.SagaType, + Changes = g.SelectMany(x => x.Changes) + .OrderByDescending(x => x.FinishTime) + .ToList() + }" + }; + + var putIndexesOp = new PutIndexesOperation(indexWithout50000capDefinition); + + await configuration.DocumentStore.Maintenance.SendAsync(putIndexesOp); + + var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex"); + var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation); + + Assert.IsNotNull(sagaDetailsIndexDefinition); + + await Persistence.RavenDB.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None); + + sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation); + + Assert.IsNull(sagaDetailsIndexDefinition); + } + + [Test] + public async Task Does_not_delete_index_that_does_have_cap_of_50000() + { + await Persistence.RavenDB.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None); + + var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex"); + var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation); + + Assert.IsNotNull(sagaDetailsIndexDefinition); + } + + [Test] + public async Task Should_only_reduce_the_last_50000_saga_state_changes() + { + var sagaType = "MySagaType"; + var sagaState = "some-saga-state"; + + await IngestSagaAudits(new SagaSnapshot + { + SagaId = Guid.NewGuid(), + SagaType = sagaType, + Status = SagaStateChangeStatus.New, + StateAfterChange = sagaState + }); + + await configuration.CompleteDBOperation(); + + using (var session = configuration.DocumentStore.OpenAsyncSession()) + { + var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex"); + var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation); + + Assert.IsTrue(sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"), "The SagaDetails index definition does not contain a .Take(50000) to limit the number of saga state changes that are reduced by the map/reduce"); + } + } + + async Task IngestSagaAudits(params SagaSnapshot[] snapshots) + { + var unitOfWork = StartAuditUnitOfWork(snapshots.Length); + foreach (var snapshot in snapshots) + { + await unitOfWork.RecordSagaSnapshot(snapshot); + } + await unitOfWork.DisposeAsync(); + await configuration.CompleteDBOperation(); + } + } +} \ No newline at end of file