Skip to content

Commit

Permalink
Cherry pick cap (#3885)
Browse files Browse the repository at this point in the history
* Cap the maximum number of entries in SagaDetailsIndex to 50k

* Add test to check that index has a take

* Add a unit test for the removal of the legacy saga details index

---------

Co-authored-by: Szymon Pobiega <szymon.pobiega@gmail.com>
  • Loading branch information
WilliamBZA and SzymonPobiega authored Dec 7, 2023
1 parent a2e7a5f commit f34afdc
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/ServiceControl.Audit.Persistence.RavenDB/DatabaseSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ await documentStore.Maintenance.Server
new SagaDetailsIndex()
};

await DeleteLegacySagaDetailsIndex(documentStore, cancellationToken);

if (configuration.EnableFullTextSearch)
{
indexList.Add(new MessagesViewIndexWithFullTextSearch());
Expand All @@ -69,6 +71,22 @@ await documentStore.Maintenance
await documentStore.Maintenance.SendAsync(new ConfigureExpirationOperation(expirationConfig), cancellationToken);
}

public static async Task DeleteLegacySagaDetailsIndex(IDocumentStore documentStore, CancellationToken cancellationToken)
{
// If the SagaDetailsIndex exists but does not have a .Take(50000), then we remove the current SagaDetailsIndex and
// create a new one. If we do not remove the current one, then RavenDB will attempt to do a side-by-side migration.
// Doing a side-by-side migration results in the index never swapping if there is constant ingestion as RavenDB will wait.
// for the index to not be stale before swapping to the new index. Constant ingestion means the index will never be not-stale.
// This needs to stay in place until the next major version as the user could upgrade from an older version of the current
// Major (v5.x.x) which might still have the incorrect index.
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
var sagaDetailsIndexDefinition = await documentStore.Maintenance.SendAsync(sagaDetailsIndexOperation, cancellationToken);
if (sagaDetailsIndexDefinition != null && !sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"))
{
await documentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"), cancellationToken);
}
}

readonly DatabaseConfiguration configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ into g
SagaType = first.SagaType,
Changes = g.SelectMany(x => x.Changes)
.OrderByDescending(x => x.FinishTime)
.Take(50000)
.ToList()
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
namespace ServiceControl.Audit.Persistence.Tests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Raven.Client.Documents.Indexes;
using Raven.Client.Documents.Operations.Indexes;
using ServiceControl.SagaAudit;

[TestFixture]
class SagaDetailsIndexTests : PersistenceTestFixture
{
[Test]
public async Task Deletes_index_that_does_not_have_cap_of_50000()
{
await configuration.DocumentStore.Maintenance.SendAsync(new DeleteIndexOperation("SagaDetailsIndex"));

var indexWithout50000capDefinition = new IndexDefinition
{
Name = "SagaDetailsIndex",
Maps = new System.Collections.Generic.HashSet<string>
{
@"from doc in docs
select new
{
doc.SagaId,
Id = doc.SagaId,
doc.SagaType,
Changes = new[]
{
new
{
Endpoint = doc.Endpoint,
FinishTime = doc.FinishTime,
InitiatingMessage = doc.InitiatingMessage,
OutgoingMessages = doc.OutgoingMessages,
StartTime = doc.StartTime,
StateAfterChange = doc.StateAfterChange,
Status = doc.Status
}
}
}"
},
Reduce = @"from result in results
group result by result.SagaId
into g
let first = g.First()
select new
{
Id = first.SagaId,
SagaId = first.SagaId,
SagaType = first.SagaType,
Changes = g.SelectMany(x => x.Changes)
.OrderByDescending(x => x.FinishTime)
.ToList()
}"
};

var putIndexesOp = new PutIndexesOperation(indexWithout50000capDefinition);

await configuration.DocumentStore.Maintenance.SendAsync(putIndexesOp);

var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);

Assert.IsNotNull(sagaDetailsIndexDefinition);

await Persistence.RavenDB.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None);

sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);

Assert.IsNull(sagaDetailsIndexDefinition);
}

[Test]
public async Task Does_not_delete_index_that_does_have_cap_of_50000()
{
await Persistence.RavenDB.DatabaseSetup.DeleteLegacySagaDetailsIndex(configuration.DocumentStore, CancellationToken.None);

var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);

Assert.IsNotNull(sagaDetailsIndexDefinition);
}

[Test]
public async Task Should_only_reduce_the_last_50000_saga_state_changes()
{
var sagaType = "MySagaType";
var sagaState = "some-saga-state";

await IngestSagaAudits(new SagaSnapshot
{
SagaId = Guid.NewGuid(),
SagaType = sagaType,
Status = SagaStateChangeStatus.New,
StateAfterChange = sagaState
});

await configuration.CompleteDBOperation();

using (var session = configuration.DocumentStore.OpenAsyncSession())
{
var sagaDetailsIndexOperation = new GetIndexOperation("SagaDetailsIndex");
var sagaDetailsIndexDefinition = await configuration.DocumentStore.Maintenance.SendAsync(sagaDetailsIndexOperation);

Assert.IsTrue(sagaDetailsIndexDefinition.Reduce.Contains("Take(50000)"), "The SagaDetails index definition does not contain a .Take(50000) to limit the number of saga state changes that are reduced by the map/reduce");
}
}

async Task IngestSagaAudits(params SagaSnapshot[] snapshots)
{
var unitOfWork = StartAuditUnitOfWork(snapshots.Length);
foreach (var snapshot in snapshots)
{
await unitOfWork.RecordSagaSnapshot(snapshot);
}
await unitOfWork.DisposeAsync();
await configuration.CompleteDBOperation();
}
}
}

0 comments on commit f34afdc

Please sign in to comment.