Skip to content

Commit

Permalink
Merge branch 'hotfix-3.0.3' into support-3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Jul 14, 2017
2 parents 89308d1 + 5836b43 commit 1ca58ba
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="When_checking_schema.cs" />
<Compile Include="When_using_ttbr.cs" />
<Compile Include="When_dispatching_messages.cs" />
<Compile Include="When_message_receive_takes_long.cs" />
Expand Down
54 changes: 54 additions & 0 deletions src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace NServiceBus.SqlServer.AcceptanceTests.TransportTransaction
{
using System.Threading.Tasks;
using NUnit.Framework;
using Transport.SQLServer;
using Transport;

public class When_checking_schema
{
const string QueueTableName = "CheckingSchema";

TableBasedQueue queue;

[SetUp]
public async Task SetUp()
{
var addressParser = new QueueAddressParser("dbo", null, null);

await ResetQueue(addressParser);

queue = new TableBasedQueue(addressParser.Parse(QueueTableName));
}

[Test]
public async Task It_returns_type_for_headers_column()
{
using (var connection = await sqlConnectionFactory.OpenNewConnection())
{
var type = await queue.CheckHeadersColumnType(connection);

Assert.AreEqual("nvarchar", type);
}
}

static SqlConnectionFactory sqlConnectionFactory = SqlConnectionFactory.Default(@"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True");

static async Task ResetQueue(QueueAddressParser addressParser)
{
var queueCreator = new QueueCreator(sqlConnectionFactory, addressParser);
var queueBindings = new QueueBindings();
queueBindings.BindReceiving(QueueTableName);

using (var connection = await sqlConnectionFactory.OpenNewConnection().ConfigureAwait(false))
{
using (var comm = connection.CreateCommand())
{
comm.CommandText = $"IF OBJECT_ID('{QueueTableName}', 'U') IS NOT NULL DROP TABLE {QueueTableName}";
comm.ExecuteNonQuery();
}
}
await queueCreator.CreateQueueIfNecessary(queueBindings, "").ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive(
new QueuePurger(sqlConnectionFactory),
new ExpiredMessagesPurger(_ => sqlConnectionFactory.OpenNewConnection(), TimeSpan.MaxValue, 0),
new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()),
new SchemaInspector(_ => sqlConnectionFactory.OpenNewConnection()),
new QueueAddressParser("dbo", null, null),
TimeSpan.MaxValue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<Compile Include="App_Packages\NSB.TransportTests.6.1.2\When_user_aborts_processing.cs" />
<Compile Include="App_Packages\NSB.TransportTests.6.1.2\When_using_non_durable_delivery.cs" />
<Compile Include="ConfigureSqlServerTransportInfrastructure.cs" />
<Compile Include="When_using_unicode_characters_in_headers.cs" />
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace NServiceBus.TransportTests
{
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Transport;

public class When_using_unicode_characters_in_headers : NServiceBusTransportTest
{
[Test]
public async Task Should_support_unicode_characters()
{
var onMessageCalled = new TaskCompletionSource<MessageContext>();

await StartPump(m =>
{
onMessageCalled.SetResult(m);
return Task.FromResult(0);
},
error => Task.FromResult(ErrorHandleResult.Handled),
TransportTransactionMode.None);

var sentHeaders = new Dictionary<string, string>
{
{ "a-B1", "a-B" },
{ "a-B2", "a-ɤϡ֎ᾣ♥-b" },
{ "a-ɤϡ֎ᾣ♥-B3", "a-B" },
{ "a-B4", "a-\U0001F60D-b" },
{ "a-\U0001F605-B5", "a-B" },
{ "a-B6", "a-😍-b" },
{ "a-😅-B7", "a-B" },
};
await SendMessage(InputQueueName, sentHeaders);

var messageContext = await onMessageCalled.Task;

Assert.IsNotEmpty(messageContext.Headers);
CollectionAssert.IsSupersetOf(messageContext.Headers, sentHeaders);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
var queuePurger = new LegacyQueuePurger(connectionFactory);
var queuePeeker = new LegacyQueuePeeker(connectionFactory, peekerOptions);

var schemaInspector = new SchemaInspector(queue => connectionFactory.OpenNewConnection(queue.TransportAddress));

var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory);

SqlScopeOptions scopeOptions;
Expand Down Expand Up @@ -66,7 +68,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
Func<QueueAddress, TableBasedQueue> queueFactory = qa => new TableBasedQueue(qa);

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker),
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker),
() => new LegacyQueueCreator(connectionFactory, addressParser),
() => Task.FromResult(StartupCheckResult.Success));
}
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<Compile Include="InternalsVisibleTo.cs" />
<Compile Include="Legacy\LegacyCallbacks.cs" />
<Compile Include="Receiving\QueuePeekerOptions.cs" />
<Compile Include="Receiving\SchemaInspector.cs" />
<Compile Include="Sending\IQueueDispatcher.cs" />
<Compile Include="Legacy\MultiInstance\LegacyQueueCreator.cs" />
<Compile Include="Legacy\MultiInstance\LegacyQueuePeeker.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.SqlServer/Queuing/MessageRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void PrepareSendCommand(SqlCommand command)
AddParameter(command, "ReplyToAddress", SqlDbType.VarChar, replyToAddress);
AddParameter(command, "Recoverable", SqlDbType.Bit, recoverable);
AddParameter(command, "TimeToBeReceivedMs", SqlDbType.Int, timeToBeReceived);
AddParameter(command, "Headers", SqlDbType.VarChar, headers);
AddParameter(command, "Headers", SqlDbType.NVarChar, headers);
AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes);
}

Expand Down
9 changes: 8 additions & 1 deletion src/NServiceBus.SqlServer/Queuing/Sql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ [CorrelationId] [varchar](255) NULL,
[ReplyToAddress] [varchar](255) NULL,
[Recoverable] [bit] NOT NULL,
[Expires] [datetime] NULL,
[Headers] [varchar](max) NOT NULL,
[Headers] [nvarchar](max) NOT NULL,
[Body] [varbinary](max) NULL,
[RowVersion] [bigint] IDENTITY(1,1) NOT NULL
) ON [PRIMARY];
Expand Down Expand Up @@ -71,5 +71,12 @@ [Expires] ASC
internal const string CheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('{1}.{2}')";

internal const string ExpiresIndexName = "Index_Expires";

internal const string CheckHeadersColumnType = @"
SELECT t.name
FROM sys.columns c
INNER JOIN sys.types t ON c.system_type_id = t.system_type_id
WHERE c.object_id = OBJECT_ID('{0}.{1}')
AND c.name = 'Headers'";
}
}
18 changes: 12 additions & 6 deletions src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,24 @@ public async Task<int> PurgeBatchOfExpiredMessages(SqlConnection connection, int
}
}

public async Task LogWarningWhenIndexIsMissing(SqlConnection connection)
public async Task<bool> CheckExpiresIndexPresence(SqlConnection connection)
{
var commandText = Format(Sql.CheckIfExpiresIndexIsPresent, Sql.ExpiresIndexName, schemaName, tableName);

using (var command = new SqlCommand(commandText, connection))
{
var rowsCount = (int)await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
}
}

if (rowsCount == 0)
{
Logger.WarnFormat(@"Table {0}.{1} does not contain index '{2}'." + Environment.NewLine + "Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.", schemaName, tableName, Sql.ExpiresIndexName);
}
public async Task<string> CheckHeadersColumnType(SqlConnection connection)
{
var commandText = Format(Sql.CheckHeadersColumnType, schemaName, tableName);

using (var command = new SqlCommand(commandText, connection))
{
return (string) await command.ExecuteScalarAsync().ConfigureAwait(false);
}
}

Expand All @@ -172,4 +178,4 @@ public override string ToString()

static ILog Logger = LogManager.GetLogger(typeof(TableBasedQueue));
}
}
}
15 changes: 0 additions & 15 deletions src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,6 @@ public async Task Purge(TableBasedQueue queue, CancellationToken cancellationTok
}
}

public async Task Initialize(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
await queue.LogWarningWhenIndexIsMissing(connection).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

Func<TableBasedQueue, Task<SqlConnection>> openConnection;
const int DefaultPurgeBatchSize = 10000;
static TimeSpan DefaultPurgeTaskDelay = TimeSpan.FromMinutes(5);
Expand Down
6 changes: 4 additions & 2 deletions src/NServiceBus.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@

class MessagePump : IPushMessages
{
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<QueueAddress, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker)
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<QueueAddress, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, SchemaInspector schemaInspector, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker)
{
this.receiveStrategyFactory = receiveStrategyFactory;
this.queuePurger = queuePurger;
this.queueFactory = queueFactory;
this.expiredMessagesPurger = expiredMessagesPurger;
this.queuePeeker = queuePeeker;
this.schemaInspector = schemaInspector;
this.addressParser = addressParser;
this.waitTimeCircuitBreaker = waitTimeCircuitBreaker;
}
Expand Down Expand Up @@ -47,7 +48,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,
}
}

await expiredMessagesPurger.Initialize(inputQueue).ConfigureAwait(false);
await schemaInspector.PerformInspection(inputQueue).ConfigureAwait(false);
}

public void Start(PushRuntimeSettings limitations)
Expand Down Expand Up @@ -221,6 +222,7 @@ async Task PurgeExpiredMessages()
Func<QueueAddress, TableBasedQueue> queueFactory;
ExpiredMessagesPurger expiredMessagesPurger;
IPeekMessagesInQueue queuePeeker;
SchemaInspector schemaInspector;
QueueAddressParser addressParser;
TimeSpan waitTimeCircuitBreaker;
ConcurrentDictionary<Task, Task> runningReceiveTasks;
Expand Down
62 changes: 62 additions & 0 deletions src/NServiceBus.SqlServer/Receiving/SchemaInspector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Logging;

class SchemaInspector
{
public SchemaInspector(Func<TableBasedQueue, Task<SqlConnection>> openConnection)
{
this.openConnection = openConnection;
}

public async Task PerformInspection(TableBasedQueue queue)
{
await VerifyExpiredIndex(queue).ConfigureAwait(false);
await VerifyHeadersColumnType(queue).ConfigureAwait(false);
}

async Task VerifyExpiredIndex(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false);
if (!indexExists)
{
Logger.Warn($@"Table {queue} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

async Task VerifyHeadersColumnType(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var columnType = await queue.CheckHeadersColumnType(connection).ConfigureAwait(false);
if (string.Equals(columnType, "varchar", StringComparison.OrdinalIgnoreCase))
{
Logger.Warn($"Table {queue} stores headers in a non Unicode-compatible column (varchar).{Environment.NewLine}This may lead to data loss when sending non-ASCII characters in headers. SQL Server transport 3.1 and newer can take advantage of the nvarchar column type for headers. Please change the column type in the database.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

Func<TableBasedQueue, Task<SqlConnection>> openConnection;
static ILog Logger = LogManager.GetLogger<ExpiredMessagesPurger>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()

var queuePurger = new QueuePurger(connectionFactory);
var queuePeeker = new QueuePeeker(connectionFactory, queuePeekerOptions);
var schemaInspector = new SchemaInspector(_ => connectionFactory.OpenNewConnection());

var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory);

Func<QueueAddress, TableBasedQueue> queueFactory = qa => new TableBasedQueue(qa);

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker),
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker),
() => new QueueCreator(connectionFactory, addressParser),
() => Task.FromResult(StartupCheckResult.Success));
}
Expand Down

0 comments on commit 1ca58ba

Please sign in to comment.