Skip to content

Commit

Permalink
Merge pull request #625 from Particular/port-customize-isolation-level
Browse files Browse the repository at this point in the history
Port customize isolation level
  • Loading branch information
HEskandari authored Feb 23, 2021
2 parents 11fc210 + 5058f19 commit e61540b
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 17 deletions.
3 changes: 1 addition & 2 deletions src/ScriptBuilder.Tests/APIApprovals.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using NServiceBus.Persistence.Sql.ScriptBuilder;
using NUnit.Framework;
using NUnit.Framework;
using Particular.Approvals;
using PublicApiGenerator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ namespace NServiceBus
public static void DisableCleanup(this NServiceBus.Outbox.OutboxSettings configuration) { }
public static void KeepDeduplicationDataFor(this NServiceBus.Outbox.OutboxSettings configuration, System.TimeSpan timeToKeepDeduplicationData) { }
public static void RunDeduplicationDataCleanupEvery(this NServiceBus.Outbox.OutboxSettings configuration, System.TimeSpan frequencyToRunDeduplicationDataCleanup) { }
public static void TransactionIsolationLevel(this NServiceBus.Outbox.OutboxSettings outboxSettings, System.Data.IsolationLevel isolationLevel) { }
public static void UsePessimisticConcurrencyControl(this NServiceBus.Outbox.OutboxSettings outboxSettings) { }
public static void UseTransactionScope(this NServiceBus.Outbox.OutboxSettings outboxSettings) { }
public static void UseTransactionScope(this NServiceBus.Outbox.OutboxSettings outboxSettings, System.Transactions.IsolationLevel isolationLevel) { }
}
public class static SqlPersistenceStorageSessionExtensions
{
Expand Down
7 changes: 3 additions & 4 deletions src/SqlPersistence.Tests/Outbox/OutboxPersisterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using System.Transactions;
using NServiceBus.Extensibility;
using NServiceBus.Outbox;
Expand Down Expand Up @@ -55,8 +54,8 @@ OutboxPersister Setup(string theSchema)
}

return transactionScope
? (ISqlOutboxTransaction)new TransactionScopeSqlOutboxTransaction(behavior, connectionManager)
: new AdoNetSqlOutboxTransaction(behavior, connectionManager);
? (ISqlOutboxTransaction)new TransactionScopeSqlOutboxTransaction(behavior, connectionManager, IsolationLevel.ReadCommitted)
: new AdoNetSqlOutboxTransaction(behavior, connectionManager, System.Data.IsolationLevel.ReadCommitted);
},
cleanupBatchSize: 5);
using (var connection = GetConnection()(theSchema))
Expand Down Expand Up @@ -176,7 +175,7 @@ public async Task TransactionScope()
var contextBag = CreateContextBag(messageId);
using (var transaction = await persister.BeginTransaction(contextBag))
{
var ambientTransaction = System.Transactions.Transaction.Current;
var ambientTransaction = Transaction.Current;
Assert.IsNotNull(ambientTransaction);

await transaction.Commit();
Expand Down
10 changes: 7 additions & 3 deletions src/SqlPersistence/Outbox/AdoNetSqlOutboxTransaction.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Data.Common;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;
using NServiceBus.Extensibility;
using NServiceBus.Logging;
Expand All @@ -9,11 +10,14 @@ class AdoNetSqlOutboxTransaction : ISqlOutboxTransaction
static ILog Log = LogManager.GetLogger<AdoNetSqlOutboxTransaction>();

IConnectionManager connectionManager;
IsolationLevel isolationLevel;
ConcurrencyControlStrategy concurrencyControlStrategy;

public AdoNetSqlOutboxTransaction(ConcurrencyControlStrategy concurrencyControlStrategy, IConnectionManager connectionManager)
public AdoNetSqlOutboxTransaction(ConcurrencyControlStrategy concurrencyControlStrategy,
IConnectionManager connectionManager, IsolationLevel isolationLevel)
{
this.connectionManager = connectionManager;
this.isolationLevel = isolationLevel;
this.concurrencyControlStrategy = concurrencyControlStrategy;
}

Expand All @@ -29,7 +33,7 @@ public async Task Begin(ContextBag context)
{
var incomingMessage = context.GetIncomingMessage();
Connection = await connectionManager.OpenConnection(incomingMessage).ConfigureAwait(false);
Transaction = Connection.BeginTransaction();
Transaction = Connection.BeginTransaction(isolationLevel);
await concurrencyControlStrategy.Begin(incomingMessage.MessageId, Connection, Transaction).ConfigureAwait(false);
}

Expand Down
18 changes: 14 additions & 4 deletions src/SqlPersistence/Outbox/SqlOutboxFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ protected override void Setup(FeatureConfigurationContext context)
var sqlDialect = settings.GetSqlDialect();

var pessimisticMode = context.Settings.GetOrDefault<bool>(ConcurrencyMode);
var transactionScopeMode = context.Settings.GetOrDefault<bool>(TransactionMode);
var transactionScopeMode = context.Settings.GetOrDefault<bool>(UseTransactionScope);

var adoTransactionIsolationLevel = context.Settings.GetOrDefault<System.Data.IsolationLevel>(AdoTransactionIsolationLevel);
if (adoTransactionIsolationLevel == default)
{
//Default to Read Committed
adoTransactionIsolationLevel = System.Data.IsolationLevel.ReadCommitted;
}
var transactionScopeIsolationLevel = context.Settings.GetOrDefault<System.Transactions.IsolationLevel>(TransactionScopeIsolationLevel);

var outboxCommands = OutboxCommandBuilder.Build(sqlDialect, tablePrefix);

Expand All @@ -34,8 +42,8 @@ protected override void Setup(FeatureConfigurationContext context)
ISqlOutboxTransaction transactionFactory()
{
return transactionScopeMode
? (ISqlOutboxTransaction)new TransactionScopeSqlOutboxTransaction(concurrencyControlStrategy, connectionManager)
: new AdoNetSqlOutboxTransaction(concurrencyControlStrategy, connectionManager);
? (ISqlOutboxTransaction)new TransactionScopeSqlOutboxTransaction(concurrencyControlStrategy, connectionManager, transactionScopeIsolationLevel)
: new AdoNetSqlOutboxTransaction(concurrencyControlStrategy, connectionManager, adoTransactionIsolationLevel);
}

var outboxPersister = new OutboxPersister(connectionManager, sqlDialect, outboxCommands, transactionFactory);
Expand Down Expand Up @@ -74,5 +82,7 @@ ISqlOutboxTransaction transactionFactory()
internal const string FrequencyToRunDeduplicationDataCleanup = "Persistence.Sql.Outbox.FrequencyToRunDeduplicationDataCleanup";
internal const string DisableCleanup = "Persistence.Sql.Outbox.DisableCleanup";
internal const string ConcurrencyMode = "Persistence.Sql.Outbox.PessimisticMode";
internal const string TransactionMode = "Persistence.Sql.Outbox.TransactionScopeMode";
internal const string UseTransactionScope = "Persistence.Sql.Outbox.TransactionScopeMode";
internal const string AdoTransactionIsolationLevel = "Persistence.Sql.Outbox.AdoTransactionIsolationLevel";
internal const string TransactionScopeIsolationLevel = "Persistence.Sql.Outbox.TransactionScopeIsolationLevel";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace NServiceBus

namespace NServiceBus
{
using System.Transactions;
using Configuration.AdvancedExtensibility;
using System;
using Outbox;
Expand Down Expand Up @@ -61,14 +63,52 @@ public static void UsePessimisticConcurrencyControl(this OutboxSettings outboxSe
outboxSettings.GetSettings().Set(SqlOutboxFeature.ConcurrencyMode, true);
}

/// <summary>
/// Configures the outbox to use specific transaction level.
/// Only levels Read Committed, Repeatable Read and Serializable are supported.
/// </summary>
public static void TransactionIsolationLevel(this OutboxSettings outboxSettings, System.Data.IsolationLevel isolationLevel)
{
if (isolationLevel == System.Data.IsolationLevel.Chaos
|| isolationLevel == System.Data.IsolationLevel.ReadUncommitted
|| isolationLevel == System.Data.IsolationLevel.Snapshot
|| isolationLevel == System.Data.IsolationLevel.Unspecified)
{
throw new Exception($"Isolation level {isolationLevel} is not supported.");
}
outboxSettings.GetSettings().Set(SqlOutboxFeature.AdoTransactionIsolationLevel, isolationLevel);
}

/// <summary>
/// Configures the outbox to use TransactionScope instead of SqlTransaction. This allows wrapping the
/// the outbox transaction (and synchronized storage session it manages) and other database transactions in a single scope - provided that
/// Distributed Transaction Coordinator (DTC) infrastructure is configured.
///
/// Uses the default isolation level (Serializable).
/// </summary>
public static void UseTransactionScope(this OutboxSettings outboxSettings)
{
outboxSettings.GetSettings().Set(SqlOutboxFeature.TransactionMode, true);
UseTransactionScope(outboxSettings, IsolationLevel.Serializable);
}

/// <summary>
/// Configures the outbox to use TransactionScope instead of SqlTransaction. This allows wrapping the
/// the outbox transaction (and synchronized storage session it manages) and other database transactions in a single scope - provided that
/// Distributed Transaction Coordinator (DTC) infrastructure is configured.
/// </summary>
/// <param name="outboxSettings">Outbox settings.</param>
/// <param name="isolationLevel">Isolation level to use. Only levels Read Committed, Repeatable Read and Serializable are supported.</param>
public static void UseTransactionScope(this OutboxSettings outboxSettings, IsolationLevel isolationLevel)
{
if (isolationLevel == IsolationLevel.Chaos
|| isolationLevel == IsolationLevel.ReadUncommitted
|| isolationLevel == IsolationLevel.Snapshot
|| isolationLevel == IsolationLevel.Unspecified)
{
throw new Exception($"Isolation level {isolationLevel} is not supported.");
}
outboxSettings.GetSettings().Set(SqlOutboxFeature.UseTransactionScope, true);
outboxSettings.GetSettings().Set(SqlOutboxFeature.TransactionScopeIsolationLevel, isolationLevel);
}
}
}
12 changes: 10 additions & 2 deletions src/SqlPersistence/Outbox/TransactionScopeSqlOutboxTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ class TransactionScopeSqlOutboxTransaction : ISqlOutboxTransaction
static ILog Log = LogManager.GetLogger<TransactionScopeSqlOutboxTransaction>();

IConnectionManager connectionManager;
IsolationLevel isolationLevel;
ConcurrencyControlStrategy concurrencyControlStrategy;
TransactionScope transactionScope;
Transaction ambientTransaction;
bool commit;

public TransactionScopeSqlOutboxTransaction(ConcurrencyControlStrategy concurrencyControlStrategy, IConnectionManager connectionManager)
public TransactionScopeSqlOutboxTransaction(ConcurrencyControlStrategy concurrencyControlStrategy,
IConnectionManager connectionManager, IsolationLevel isolationLevel)
{
this.connectionManager = connectionManager;
this.isolationLevel = isolationLevel;
this.concurrencyControlStrategy = concurrencyControlStrategy;
}

Expand All @@ -27,7 +30,12 @@ public TransactionScopeSqlOutboxTransaction(ConcurrencyControlStrategy concurren
// Prepare is deliberately kept sync to allow floating of TxScope where needed
public void Prepare(ContextBag context)
{
transactionScope = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);
var options = new TransactionOptions
{
IsolationLevel = isolationLevel
};

transactionScope = new TransactionScope(TransactionScopeOption.RequiresNew, options, TransactionScopeAsyncFlowOption.Enabled);
ambientTransaction = System.Transactions.Transaction.Current;
}

Expand Down

0 comments on commit e61540b

Please sign in to comment.