Skip to content

Commit

Permalink
Log warning for varchar (old) headers column
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcin Hoppe committed Jul 12, 2017
1 parent 4765488 commit eb27731
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="When_checking_schema.cs" />
<Compile Include="When_using_ttbr.cs" />
<Compile Include="When_dispatching_messages.cs" />
<Compile Include="When_message_receive_takes_long.cs" />
Expand Down
54 changes: 54 additions & 0 deletions src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace NServiceBus.SqlServer.AcceptanceTests.TransportTransaction
{
using System.Threading.Tasks;
using NUnit.Framework;
using Transport.SQLServer;
using Transport;

public class When_checking_schema
{
const string QueueTableName = "CheckingSchema";

TableBasedQueue queue;

[SetUp]
public async Task SetUp()
{
var addressParser = new QueueAddressParser("dbo", null, null);

await ResetQueue(addressParser);

queue = new TableBasedQueue(addressParser.Parse(QueueTableName));
}

[Test]
public async Task It_returns_type_for_headers_column()
{
using (var connection = await sqlConnectionFactory.OpenNewConnection())
{
var type = await queue.CheckHeadersColumnType(connection);

Assert.AreEqual("nvarchar", type);
}
}

static SqlConnectionFactory sqlConnectionFactory = SqlConnectionFactory.Default(@"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True");

static async Task ResetQueue(QueueAddressParser addressParser)
{
var queueCreator = new QueueCreator(sqlConnectionFactory, addressParser);
var queueBindings = new QueueBindings();
queueBindings.BindReceiving(QueueTableName);

using (var connection = await sqlConnectionFactory.OpenNewConnection().ConfigureAwait(false))
{
using (var comm = connection.CreateCommand())
{
comm.CommandText = $"IF OBJECT_ID('{QueueTableName}', 'U') IS NOT NULL DROP TABLE {QueueTableName}";
comm.ExecuteNonQuery();
}
}
await queueCreator.CreateQueueIfNecessary(queueBindings, "").ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive(
new QueuePurger(sqlConnectionFactory),
new ExpiredMessagesPurger(_ => sqlConnectionFactory.OpenNewConnection(), TimeSpan.MaxValue, 0),
new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()),
new SchemaInspector(_ => sqlConnectionFactory.OpenNewConnection()),
new QueueAddressParser("dbo", null, null),
TimeSpan.MaxValue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
var queuePurger = new LegacyQueuePurger(connectionFactory);
var queuePeeker = new LegacyQueuePeeker(connectionFactory, peekerOptions);

var schemaInspector = new SchemaInspector(queue => connectionFactory.OpenNewConnection(queue.TransportAddress));

var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory);

SqlScopeOptions scopeOptions;
Expand Down Expand Up @@ -66,7 +68,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
Func<QueueAddress, TableBasedQueue> queueFactory = qa => new TableBasedQueue(qa);

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker),
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker),
() => new LegacyQueueCreator(connectionFactory, addressParser),
() => Task.FromResult(StartupCheckResult.Success));
}
Expand Down
1 change: 1 addition & 0 deletions src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<Compile Include="InternalsVisibleTo.cs" />
<Compile Include="Legacy\LegacyCallbacks.cs" />
<Compile Include="Receiving\QueuePeekerOptions.cs" />
<Compile Include="Receiving\SchemaInspector.cs" />
<Compile Include="Sending\IQueueDispatcher.cs" />
<Compile Include="Legacy\MultiInstance\LegacyQueueCreator.cs" />
<Compile Include="Legacy\MultiInstance\LegacyQueuePeeker.cs" />
Expand Down
7 changes: 7 additions & 0 deletions src/NServiceBus.SqlServer/Queuing/Sql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,12 @@ [Expires] ASC
internal const string CheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('{1}.{2}')";

internal const string ExpiresIndexName = "Index_Expires";

internal const string CheckHeadersColumnType = @"
SELECT t.name
FROM sys.columns c
INNER JOIN sys.types t ON c.system_type_id = t.system_type_id
WHERE c.object_id = OBJECT_ID('{0}.{1}')
AND c.name = 'Headers'";
}
}
20 changes: 15 additions & 5 deletions src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,28 @@ public async Task<int> PurgeBatchOfExpiredMessages(SqlConnection connection, int
}
}

public async Task LogWarningWhenIndexIsMissing(SqlConnection connection)
public async Task<bool> CheckExpiresIndexPresence(SqlConnection connection)
{
var commandText = Format(Sql.CheckIfExpiresIndexIsPresent, Sql.ExpiresIndexName, schemaName, tableName);

using (var command = new SqlCommand(commandText, connection))
{
var rowsCount = (int)await command.ExecuteScalarAsync().ConfigureAwait(false);
return rowsCount > 0;
//if (rowsCount == 0)
//{
// Logger.WarnFormat(@"Table {0}.{1} does not contain index '{2}'." + Environment.NewLine + "Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.", schemaName, tableName, Sql.ExpiresIndexName);
//}
}
}

if (rowsCount == 0)
{
Logger.WarnFormat(@"Table {0}.{1} does not contain index '{2}'." + Environment.NewLine + "Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.", schemaName, tableName, Sql.ExpiresIndexName);
}
public async Task<string> CheckHeadersColumnType(SqlConnection connection)
{
var commandText = Format(Sql.CheckHeadersColumnType, schemaName, tableName);

using (var command = new SqlCommand(commandText, connection))
{
return (string) await command.ExecuteScalarAsync().ConfigureAwait(false);
}
}

Expand Down
15 changes: 0 additions & 15 deletions src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,6 @@ public async Task Purge(TableBasedQueue queue, CancellationToken cancellationTok
}
}

public async Task Initialize(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
await queue.LogWarningWhenIndexIsMissing(connection).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

Func<TableBasedQueue, Task<SqlConnection>> openConnection;
const int DefaultPurgeBatchSize = 10000;
static TimeSpan DefaultPurgeTaskDelay = TimeSpan.FromMinutes(5);
Expand Down
6 changes: 4 additions & 2 deletions src/NServiceBus.SqlServer/Receiving/MessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@

class MessagePump : IPushMessages
{
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<QueueAddress, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker)
public MessagePump(Func<TransportTransactionMode, ReceiveStrategy> receiveStrategyFactory, Func<QueueAddress, TableBasedQueue> queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, SchemaInspector schemaInspector, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker)
{
this.receiveStrategyFactory = receiveStrategyFactory;
this.queuePurger = queuePurger;
this.queueFactory = queueFactory;
this.expiredMessagesPurger = expiredMessagesPurger;
this.queuePeeker = queuePeeker;
this.schemaInspector = schemaInspector;
this.addressParser = addressParser;
this.waitTimeCircuitBreaker = waitTimeCircuitBreaker;
}
Expand Down Expand Up @@ -47,7 +48,7 @@ public async Task Init(Func<MessageContext, Task> onMessage, Func<ErrorContext,
}
}

await expiredMessagesPurger.Initialize(inputQueue).ConfigureAwait(false);
await schemaInspector.PerformInspection(inputQueue).ConfigureAwait(false);
}

public void Start(PushRuntimeSettings limitations)
Expand Down Expand Up @@ -221,6 +222,7 @@ async Task PurgeExpiredMessages()
Func<QueueAddress, TableBasedQueue> queueFactory;
ExpiredMessagesPurger expiredMessagesPurger;
IPeekMessagesInQueue queuePeeker;
SchemaInspector schemaInspector;
QueueAddressParser addressParser;
TimeSpan waitTimeCircuitBreaker;
ConcurrentDictionary<Task, Task> runningReceiveTasks;
Expand Down
62 changes: 62 additions & 0 deletions src/NServiceBus.SqlServer/Receiving/SchemaInspector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Logging;

class SchemaInspector
{
public SchemaInspector(Func<TableBasedQueue, Task<SqlConnection>> openConnection)
{
this.openConnection = openConnection;
}

public async Task PerformInspection(TableBasedQueue queue)
{
await VerifyExpiredIndex(queue).ConfigureAwait(false);
await VerifyHeadersColumnType(queue).ConfigureAwait(false);
}

async Task VerifyExpiredIndex(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false);
if (!indexExists)
{
Logger.Warn($@"Table {queue} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

async Task VerifyHeadersColumnType(TableBasedQueue queue)
{
try
{
using (var connection = await openConnection(queue).ConfigureAwait(false))
{
var columnType = await queue.CheckHeadersColumnType(connection).ConfigureAwait(false);
if (string.Equals(columnType, "varchar", StringComparison.OrdinalIgnoreCase))
{
Logger.Warn($"Table {queue} stores headers in a non Unicode-compatible column (varchar).{Environment.NewLine}This may lead to data loss when sending non-ASCII characters in headers. SQL Server transport 3.1 and newer can take advantage of the nvarchar column type for headers. Please change the column type in the database.");
}
}
}
catch (Exception ex)
{
Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex);
}
}

Func<TableBasedQueue, Task<SqlConnection>> openConnection;
static ILog Logger = LogManager.GetLogger<ExpiredMessagesPurger>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,14 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()

var queuePurger = new QueuePurger(connectionFactory);
var queuePeeker = new QueuePeeker(connectionFactory, queuePeekerOptions);
var schemaInspector = new SchemaInspector(_ => connectionFactory.OpenNewConnection());

var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory);

Func<QueueAddress, TableBasedQueue> queueFactory = qa => new TableBasedQueue(qa);

return new TransportReceiveInfrastructure(
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker),
() => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker),
() => new QueueCreator(connectionFactory, addressParser),
() => Task.FromResult(StartupCheckResult.Success));
}
Expand Down

0 comments on commit eb27731

Please sign in to comment.