diff --git a/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj b/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj index aaab0fcf0..415efd324 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj +++ b/src/NServiceBus.SqlServer.IntegrationTests/NServiceBus.SqlServer.IntegrationTests.csproj @@ -71,6 +71,7 @@ + diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs new file mode 100644 index 000000000..6b434f59b --- /dev/null +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_checking_schema.cs @@ -0,0 +1,54 @@ +namespace NServiceBus.SqlServer.AcceptanceTests.TransportTransaction +{ + using System.Threading.Tasks; + using NUnit.Framework; + using Transport.SQLServer; + using Transport; + + public class When_checking_schema + { + const string QueueTableName = "CheckingSchema"; + + TableBasedQueue queue; + + [SetUp] + public async Task SetUp() + { + var addressParser = new QueueAddressParser("dbo", null, null); + + await ResetQueue(addressParser); + + queue = new TableBasedQueue(addressParser.Parse(QueueTableName)); + } + + [Test] + public async Task It_returns_type_for_headers_column() + { + using (var connection = await sqlConnectionFactory.OpenNewConnection()) + { + var type = await queue.CheckHeadersColumnType(connection); + + Assert.AreEqual("nvarchar", type); + } + } + + static SqlConnectionFactory sqlConnectionFactory = SqlConnectionFactory.Default(@"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True"); + + static async Task ResetQueue(QueueAddressParser addressParser) + { + var queueCreator = new QueueCreator(sqlConnectionFactory, addressParser); + var queueBindings = new QueueBindings(); + queueBindings.BindReceiving(QueueTableName); + + using (var connection = await sqlConnectionFactory.OpenNewConnection().ConfigureAwait(false)) + { + using (var comm = connection.CreateCommand()) + { + comm.CommandText = $"IF OBJECT_ID('{QueueTableName}', 'U') IS NOT NULL DROP TABLE {QueueTableName}"; + comm.ExecuteNonQuery(); + } + } + await queueCreator.CreateQueueIfNecessary(queueBindings, "").ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs b/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs index f8fa64b28..124738659 100644 --- a/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs +++ b/src/NServiceBus.SqlServer.IntegrationTests/When_receiving_messages.cs @@ -25,6 +25,7 @@ public async Task Should_stop_pumping_messages_after_first_unsuccessful_receive( new QueuePurger(sqlConnectionFactory), new ExpiredMessagesPurger(_ => sqlConnectionFactory.OpenNewConnection(), TimeSpan.MaxValue, 0), new QueuePeeker(sqlConnectionFactory, new QueuePeekerOptions()), + new SchemaInspector(_ => sqlConnectionFactory.OpenNewConnection()), new QueueAddressParser("dbo", null, null), TimeSpan.MaxValue); diff --git a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj index 9a53bbe2b..b565ffe3f 100644 --- a/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj +++ b/src/NServiceBus.SqlServer.TransportTests/NServiceBus.SqlServer.TransportTests.csproj @@ -78,6 +78,7 @@ + diff --git a/src/NServiceBus.SqlServer.TransportTests/When_using_unicode_characters_in_headers.cs b/src/NServiceBus.SqlServer.TransportTests/When_using_unicode_characters_in_headers.cs new file mode 100644 index 000000000..d91ddbaf8 --- /dev/null +++ b/src/NServiceBus.SqlServer.TransportTests/When_using_unicode_characters_in_headers.cs @@ -0,0 +1,41 @@ +namespace NServiceBus.TransportTests +{ + using System.Collections.Generic; + using System.Threading.Tasks; + using NUnit.Framework; + using Transport; + + public class When_using_unicode_characters_in_headers : NServiceBusTransportTest + { + [Test] + public async Task Should_support_unicode_characters() + { + var onMessageCalled = new TaskCompletionSource(); + + await StartPump(m => + { + onMessageCalled.SetResult(m); + return Task.FromResult(0); + }, + error => Task.FromResult(ErrorHandleResult.Handled), + TransportTransactionMode.None); + + var sentHeaders = new Dictionary + { + { "a-B1", "a-B" }, + { "a-B2", "a-ɤϡ֎ᾣ♥-b" }, + { "a-ɤϡ֎ᾣ♥-B3", "a-B" }, + { "a-B4", "a-\U0001F60D-b" }, + { "a-\U0001F605-B5", "a-B" }, + { "a-B6", "a-😍-b" }, + { "a-😅-B7", "a-B" }, + }; + await SendMessage(InputQueueName, sentHeaders); + + var messageContext = await onMessageCalled.Task; + + Assert.IsNotEmpty(messageContext.Headers); + CollectionAssert.IsSupersetOf(messageContext.Headers, sentHeaders); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Legacy/MultiInstance/LegacySqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/Legacy/MultiInstance/LegacySqlServerTransportInfrastructure.cs index c346c76cc..7e6d01adc 100644 --- a/src/NServiceBus.SqlServer/Legacy/MultiInstance/LegacySqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/Legacy/MultiInstance/LegacySqlServerTransportInfrastructure.cs @@ -38,6 +38,8 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() var queuePurger = new LegacyQueuePurger(connectionFactory); var queuePeeker = new LegacyQueuePeeker(connectionFactory, peekerOptions); + var schemaInspector = new SchemaInspector(queue => connectionFactory.OpenNewConnection(queue.TransportAddress)); + var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory); SqlScopeOptions scopeOptions; @@ -66,7 +68,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() Func queueFactory = qa => new TableBasedQueue(qa); return new TransportReceiveInfrastructure( - () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker), + () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker), () => new LegacyQueueCreator(connectionFactory, addressParser), () => Task.FromResult(StartupCheckResult.Success)); } diff --git a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj index 4684d9870..6e79247f5 100644 --- a/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj +++ b/src/NServiceBus.SqlServer/NServiceBus.SqlServer.csproj @@ -86,6 +86,7 @@ + diff --git a/src/NServiceBus.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.SqlServer/Queuing/MessageRow.cs index be5fc03c9..1d2b7d594 100644 --- a/src/NServiceBus.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.SqlServer/Queuing/MessageRow.cs @@ -47,7 +47,7 @@ public void PrepareSendCommand(SqlCommand command) AddParameter(command, "ReplyToAddress", SqlDbType.VarChar, replyToAddress); AddParameter(command, "Recoverable", SqlDbType.Bit, recoverable); AddParameter(command, "TimeToBeReceivedMs", SqlDbType.Int, timeToBeReceived); - AddParameter(command, "Headers", SqlDbType.VarChar, headers); + AddParameter(command, "Headers", SqlDbType.NVarChar, headers); AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes); } diff --git a/src/NServiceBus.SqlServer/Queuing/Sql.cs b/src/NServiceBus.SqlServer/Queuing/Sql.cs index 53f3d7ac3..73ef3c3c6 100644 --- a/src/NServiceBus.SqlServer/Queuing/Sql.cs +++ b/src/NServiceBus.SqlServer/Queuing/Sql.cs @@ -41,7 +41,7 @@ [CorrelationId] [varchar](255) NULL, [ReplyToAddress] [varchar](255) NULL, [Recoverable] [bit] NOT NULL, [Expires] [datetime] NULL, - [Headers] [varchar](max) NOT NULL, + [Headers] [nvarchar](max) NOT NULL, [Body] [varbinary](max) NULL, [RowVersion] [bigint] IDENTITY(1,1) NOT NULL ) ON [PRIMARY]; @@ -71,5 +71,12 @@ [Expires] ASC internal const string CheckIfExpiresIndexIsPresent = @"SELECT COUNT(*) FROM [sys].[indexes] WHERE [name] = '{0}' AND [object_id] = OBJECT_ID('{1}.{2}')"; internal const string ExpiresIndexName = "Index_Expires"; + + internal const string CheckHeadersColumnType = @" +SELECT t.name +FROM sys.columns c +INNER JOIN sys.types t ON c.system_type_id = t.system_type_id +WHERE c.object_id = OBJECT_ID('{0}.{1}') + AND c.name = 'Headers'"; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs index bea9acb5c..969d4cdca 100644 --- a/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs +++ b/src/NServiceBus.SqlServer/Queuing/TableBasedQueue.cs @@ -147,18 +147,24 @@ public async Task PurgeBatchOfExpiredMessages(SqlConnection connection, int } } - public async Task LogWarningWhenIndexIsMissing(SqlConnection connection) + public async Task CheckExpiresIndexPresence(SqlConnection connection) { var commandText = Format(Sql.CheckIfExpiresIndexIsPresent, Sql.ExpiresIndexName, schemaName, tableName); using (var command = new SqlCommand(commandText, connection)) { var rowsCount = (int)await command.ExecuteScalarAsync().ConfigureAwait(false); + return rowsCount > 0; + } + } - if (rowsCount == 0) - { - Logger.WarnFormat(@"Table {0}.{1} does not contain index '{2}'." + Environment.NewLine + "Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information.", schemaName, tableName, Sql.ExpiresIndexName); - } + public async Task CheckHeadersColumnType(SqlConnection connection) + { + var commandText = Format(Sql.CheckHeadersColumnType, schemaName, tableName); + + using (var command = new SqlCommand(commandText, connection)) + { + return (string) await command.ExecuteScalarAsync().ConfigureAwait(false); } } @@ -172,4 +178,4 @@ public override string ToString() static ILog Logger = LogManager.GetLogger(typeof(TableBasedQueue)); } -} \ No newline at end of file +} diff --git a/src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs b/src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs index dc06e1805..8ecebd395 100644 --- a/src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs +++ b/src/NServiceBus.SqlServer/Receiving/ExpiredMessagesPurger.cs @@ -51,21 +51,6 @@ public async Task Purge(TableBasedQueue queue, CancellationToken cancellationTok } } - public async Task Initialize(TableBasedQueue queue) - { - try - { - using (var connection = await openConnection(queue).ConfigureAwait(false)) - { - await queue.LogWarningWhenIndexIsMissing(connection).ConfigureAwait(false); - } - } - catch (Exception ex) - { - Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); - } - } - Func> openConnection; const int DefaultPurgeBatchSize = 10000; static TimeSpan DefaultPurgeTaskDelay = TimeSpan.FromMinutes(5); diff --git a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs index c198cfabd..3e2061d1f 100644 --- a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs +++ b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs @@ -10,13 +10,14 @@ class MessagePump : IPushMessages { - public MessagePump(Func receiveStrategyFactory, Func queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker) + public MessagePump(Func receiveStrategyFactory, Func queueFactory, IPurgeQueues queuePurger, ExpiredMessagesPurger expiredMessagesPurger, IPeekMessagesInQueue queuePeeker, SchemaInspector schemaInspector, QueueAddressParser addressParser, TimeSpan waitTimeCircuitBreaker) { this.receiveStrategyFactory = receiveStrategyFactory; this.queuePurger = queuePurger; this.queueFactory = queueFactory; this.expiredMessagesPurger = expiredMessagesPurger; this.queuePeeker = queuePeeker; + this.schemaInspector = schemaInspector; this.addressParser = addressParser; this.waitTimeCircuitBreaker = waitTimeCircuitBreaker; } @@ -47,7 +48,7 @@ public async Task Init(Func onMessage, Func queueFactory; ExpiredMessagesPurger expiredMessagesPurger; IPeekMessagesInQueue queuePeeker; + SchemaInspector schemaInspector; QueueAddressParser addressParser; TimeSpan waitTimeCircuitBreaker; ConcurrentDictionary runningReceiveTasks; diff --git a/src/NServiceBus.SqlServer/Receiving/SchemaInspector.cs b/src/NServiceBus.SqlServer/Receiving/SchemaInspector.cs new file mode 100644 index 000000000..5720fc4c3 --- /dev/null +++ b/src/NServiceBus.SqlServer/Receiving/SchemaInspector.cs @@ -0,0 +1,62 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System; + using System.Data.SqlClient; + using System.Threading.Tasks; + using Logging; + + class SchemaInspector + { + public SchemaInspector(Func> openConnection) + { + this.openConnection = openConnection; + } + + public async Task PerformInspection(TableBasedQueue queue) + { + await VerifyExpiredIndex(queue).ConfigureAwait(false); + await VerifyHeadersColumnType(queue).ConfigureAwait(false); + } + + async Task VerifyExpiredIndex(TableBasedQueue queue) + { + try + { + using (var connection = await openConnection(queue).ConfigureAwait(false)) + { + var indexExists = await queue.CheckExpiresIndexPresence(connection).ConfigureAwait(false); + if (!indexExists) + { + Logger.Warn($@"Table {queue} does not contain index 'Index_Expires'.{Environment.NewLine}Adding this index will speed up the process of purging expired messages from the queue. Please consult the documentation for further information."); + } + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); + } + } + + async Task VerifyHeadersColumnType(TableBasedQueue queue) + { + try + { + using (var connection = await openConnection(queue).ConfigureAwait(false)) + { + var columnType = await queue.CheckHeadersColumnType(connection).ConfigureAwait(false); + if (string.Equals(columnType, "varchar", StringComparison.OrdinalIgnoreCase)) + { + Logger.Warn($"Table {queue} stores headers in a non Unicode-compatible column (varchar).{Environment.NewLine}This may lead to data loss when sending non-ASCII characters in headers. SQL Server transport 3.1 and newer can take advantage of the nvarchar column type for headers. Please change the column type in the database."); + } + } + } + catch (Exception ex) + { + Logger.WarnFormat("Checking indexes on table {0} failed. Exception: {1}", queue, ex); + } + } + + Func> openConnection; + static ILog Logger = LogManager.GetLogger(); + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 217609845..34b1400e8 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -73,13 +73,14 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() var queuePurger = new QueuePurger(connectionFactory); var queuePeeker = new QueuePeeker(connectionFactory, queuePeekerOptions); + var schemaInspector = new SchemaInspector(_ => connectionFactory.OpenNewConnection()); var expiredMessagesPurger = CreateExpiredMessagesPurger(connectionFactory); Func queueFactory = qa => new TableBasedQueue(qa); return new TransportReceiveInfrastructure( - () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, addressParser, waitTimeCircuitBreaker), + () => new MessagePump(receiveStrategyFactory, queueFactory, queuePurger, expiredMessagesPurger, queuePeeker, schemaInspector, addressParser, waitTimeCircuitBreaker), () => new QueueCreator(connectionFactory, addressParser), () => Task.FromResult(StartupCheckResult.Success)); }