From 3f0261f8e3a65d3b35cad13a0a73364dcb299f31 Mon Sep 17 00:00:00 2001 From: Jo Palac Date: Fri, 27 Sep 2024 12:01:53 +1000 Subject: [PATCH] Add support for PostgreSQL Transport (#4443) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Bring sqlserver files in line with sqlserver transport to avoid confusion in naming * First changes for PostgreSql transport * nullable fix * test fix * manifest file test fix * Added test project * Do some random work on postgresql support - Update query for current sequence value - Update addressing types - Update query for estimating queue length * build fix * fix setting value key * remove unused code * customise queue names with schema for postgres when provisioning queues * added todo for something that errors * removed commented out code * connection validator for postgres * Fix subscription table parameter for postgresql * fix test * Translate transport sending queues in instances * fix test * Change ToTransportQualifiedQueueNameCore to use double quotes * use postgres customisation for error queue names * alternative to get latest identity value for table that doesn't rely on current session increment * revert double assigning of schema * Use new alpha of the transport with fixed delay bug * Add connection string default * Translate queue names when using dispatcher directly in tests * fix query to work with postgres syntax * fix syntax issues (functionality still broken) * fix RunScenario test * Translate queues for forwarding message in Audit and Error ingestion * attempt to get postgres running on ci * added test category for psotgres * update postgres env variable * reverting app.config changes * clarify not using nolock * update connection string name so it matches existing ones * Perform az login for postgres * fix error catching on failed connection * fix sqlserver tests * query fix * Fix indicator update for transports that display endpointname on SP, ie slq server and postgresql * Translate throughput queue for sending from monitoring * fix extra character * Add test for throughput fix * Cleanup * Avoid translating audit log queue multiple times * Avoid translating multiple times when reporting throughput * Avoid translating error log queue multiple times * Remove message driven pubsub comment * Fix bad merge * More bad merge fixes * Formatting * translate staging queue * fix dequeue tests * variable name cleanup + add ability to quote subscription table * Run container tests for postgres * Fix docker command * Use compose file to run postgres * Fix container name * Fix pwd * Fix service deps * Skip volume * Add health check * use default username and database * Revert pool size * Remove unneded settings * limit postgreSQL connections for monitoring and audit to prevent connection starvation * fix rebase * fix null reference in tests * more test fixing * change name of default max concurrency level properties and apply to sql server transport * fix CI test * revert changes to SQL server max connections defaults * Decrease the queue lenght read frequency for SQL Server and PostgreSQL * Update to Postgres transport 8.1.4 * Update src/ServiceControl.Transports.PostgreSql/transport.manifest * fix from rebase * reapply change lost in rebase * reapply change lost in rebase --------- Co-authored-by: SzymonPobiega Co-authored-by: Phil Bastian Co-authored-by: Andreas Öhlund --- .github/workflows/ci.yml | 12 +- .../workflows/container-integration-test.yml | 5 + src/Directory.Packages.props | 6 +- .../InMemoryLicensingDataStore.cs | 10 +- src/ProjectReferences.Transports.props | 1 + .../StartupModeTests.cs | 2 +- .../Infrastructure/When_instance_is_setup.cs | 1 + src/ServiceControl.Audit/App.config | 4 + .../Auditing/AuditIngestor.cs | 32 ++-- .../Infrastructure/NServiceBusFactory.cs | 2 +- .../Infrastructure/Settings/Settings.cs | 4 +- .../AddMonitoringInstance/ConnectionString.cs | 2 +- .../AddInstance/ConnectionString.cs | 2 +- .../EditAuditInstance/ConnectionString.cs | 2 +- .../EditErrorInstance/ConnectionString.cs | 2 +- .../ConnectionString.cs | 2 +- .../AddAuditInstanceValidationTests.cs | 4 +- .../AddErrorInstanceValidationTests.cs | 4 +- .../AddMonitoringInstanceValidationTests.cs | 4 +- .../EditAuditInstanceValidationTests.cs | 4 +- .../EditErrorInstanceValidationTests.cs | 4 +- .../PerformanceTests.cs | 2 +- .../ServiceControlComponentRunner.cs | 3 +- src/ServiceControl.Monitoring/App.config | 5 + .../HostApplicationBuilderExtensions.cs | 2 +- .../ReportThroughputHostedService.cs | 19 ++- src/ServiceControl.Monitoring/Settings.cs | 6 +- .../Throughput/LicensingDataStore.cs | 19 ++- .../RetryStateTests.cs | 18 ++- .../Throughput/EndpointsTests.cs | 28 +++- .../.editorconfig | 4 + ...validConnectionStringSettings.approved.txt | 5 + ...stConnectionWithValidSettings.approved.txt | 4 + .../ConnectionStringExtensionsTests.cs | 48 ++++++ .../PostgreSqlQueryTests.cs | 119 +++++++++++++++ ...Control.Transports.PostgreSql.Tests.csproj | 32 ++++ .../TestsFilter.cs | 1 + .../TransportTestsConfiguration.cs | 31 ++++ .../.editorconfig | 4 + .../ConnectionStringExtensions.cs | 36 +++++ .../DatabaseDetails.cs | 142 ++++++++++++++++++ .../PostgreSqlNameHelper.cs | 37 +++++ .../PostgreSqlQuery.cs | 116 ++++++++++++++ .../PostgreSqlTable.cs | 51 +++++++ .../PostgreSqlTransportCustomization.cs | 86 +++++++++++ .../QueueAddress.cs | 50 ++++++ .../QueueLengthProvider.cs | 138 +++++++++++++++++ .../QueueTableName.cs | 16 ++ .../QueueTableSnapshot.cs | 6 + ...erviceControl.Transports.PostgreSql.csproj | 25 +++ .../transport.manifest | 16 ++ ...ionWithInvalidCatalogSettings.approved.txt | 3 +- .../SqlServerQueryTests.cs | 2 +- .../DatabaseDetails.cs | 4 +- .../NameHelper.cs | 15 +- .../QueueAddress.cs | 4 +- .../QueueLengthProvider.cs | 2 +- ...qlNameHelper.cs => SqlServerNameHelper.cs} | 21 ++- .../SqlServerQuery.cs | 3 - .../SqlTable.cs | 10 +- ...ovals.ServiceControlTransport.approved.txt | 6 + ...s_exist_in_specified_assembly.approved.txt | 1 + .../QueueIngestionTests.cs | 4 +- .../QueueLengthMonitoringTests.cs | 2 +- .../QueueProvisioningTests.cs | 8 +- .../TestDispatcherExtensions.cs | 5 +- .../TransportTestFixture.cs | 2 +- .../DevelopmentTransportLocations.cs | 1 + .../TransportCustomization.cs | 9 +- .../TransportManifest.cs | 4 + src/ServiceControl.sln | 66 +++++--- src/ServiceControl/App.config | 4 + .../Infrastructure/NServiceBusFactory.cs | 2 +- .../Infrastructure/Settings/Settings.cs | 2 +- .../Operations/ErrorIngestor.cs | 9 +- .../Infrastructure/ReturnToSenderDequeuer.cs | 2 +- .../APIApprovals.TransportNames.approved.txt | 5 + .../ServiceControlInstaller.Engine.csproj | 1 + .../Validation/ConnectionStringValidator.cs | 48 ++++++ .../DeploymentPackageTests.cs | 3 +- .../IncludeInPostgreSqlTestsAttribute.cs | 4 + src/container-integration-test/postgres.yml | 28 ++++ 82 files changed, 1331 insertions(+), 127 deletions(-) create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs create mode 100644 src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/.editorconfig create mode 100644 src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/QueueAddress.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/QueueTableName.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs create mode 100644 src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj create mode 100644 src/ServiceControl.Transports.PostgreSql/transport.manifest rename src/ServiceControl.Transports.SqlServer/{SqlNameHelper.cs => SqlServerNameHelper.cs} (54%) create mode 100644 src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs create mode 100644 src/container-integration-test/postgres.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bd27872e27..871957cf5e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: os: [windows-2022, ubuntu-22.04] - test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence ] + test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL ] include: - os: windows-2022 os-name: Windows @@ -65,7 +65,7 @@ jobs: run: Import-Module ./deploy/PowerShellModules/Particular.ServiceControl.Management - name: Azure login uses: azure/login@v2.2.0 - if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ' + if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ' || matrix.test-category == 'PostgreSQL' with: creds: ${{ secrets.AZURE_ACI_CREDENTIALS }} - name: Setup SQL Server @@ -74,6 +74,14 @@ jobs: with: connection-string-env-var: ServiceControl_TransportTests_SQL_ConnectionString catalog: nservicebus + - name: Setup PostgreSQL + uses: Particular/setup-postgres-action@v1.0.0 + if: matrix.test-category == 'PostgreSQL' + with: + connection-string-name: ServiceControl_TransportTests_PostgreSQL_ConnectionString + tag: ServiceControl + registry-username: ${{ secrets.DOCKERHUB_USERNAME }} + registry-password: ${{ secrets.DOCKERHUB_TOKEN }} - name: Setup RabbitMQ uses: Particular/setup-rabbitmq-action@v1.7.0 if: matrix.test-category == 'RabbitMQ' diff --git a/.github/workflows/container-integration-test.yml b/.github/workflows/container-integration-test.yml index fe8584bda7..fb9a48fc30 100644 --- a/.github/workflows/container-integration-test.yml +++ b/.github/workflows/container-integration-test.yml @@ -22,6 +22,11 @@ jobs: connection-string: 'Server=mssql;Database=master;User=sa;Password=ServiceControl1!;Encrypt=False;' compose-cmd: docker compose -f servicecontrol.yml -f mssql.yml up -d expected-healthy-containers: 5 + - name: postgresql + transport: PostgreSQL + connection-string: 'Host=postgres;Port=5432;Database=postgres;User ID=postgres;Password=ServiceControl1!;' + compose-cmd: docker compose -f servicecontrol.yml -f postgres.yml up -d + expected-healthy-containers: 5 - name: asb transport: NetStandardAzureServiceBus compose-cmd: docker compose -f servicecontrol.yml up -d diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index fbd763ee3e..afd2d16d78 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -46,8 +46,10 @@ - - + + + + diff --git a/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs b/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs index b883fbac49..816dd605b2 100644 --- a/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs +++ b/src/Particular.LicensingComponent.Persistence.InMemory/InMemoryLicensingDataStore.cs @@ -117,7 +117,13 @@ public async Task UpdateUserIndicatorOnEndpoints(List userI //if there are multiple sources of throughput for the endpoint, update them all var existingEndpoints = GetAllConnectedEndpoints(e.Name); - existingEndpoints.ForEach(u => u.UserIndicator = e.UserIndicator); + existingEndpoints.ForEach(u => + { + u.UserIndicator = e.UserIndicator; + //for ones that matched on endpoint name, update matching on sanitizedName + var sanitizedMAtchingEndpoints = GetAllConnectedEndpoints(u.SanitizedName); + sanitizedMAtchingEndpoints.ForEach(s => s.UserIndicator = e.UserIndicator); + }); }); await Task.CompletedTask; @@ -138,7 +144,7 @@ public async Task IsThereThroughputForLastXDaysForSource(int days, Through endpointThroughput.Value.Any(t => t.Key >= DateOnly.FromDateTime(DateTime.UtcNow).AddDays(-days) && t.Key <= endDate))); } - List GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name).ToList(); + List GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name || w.Id.Name == name).ToList(); public Task GetBrokerMetadata(CancellationToken cancellationToken) => Task.FromResult(brokerMetadata); diff --git a/src/ProjectReferences.Transports.props b/src/ProjectReferences.Transports.props index 71ce167f9d..c82221e628 100644 --- a/src/ProjectReferences.Transports.props +++ b/src/ProjectReferences.Transports.props @@ -7,6 +7,7 @@ + diff --git a/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs b/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs index 872dce0634..6aef55b25e 100644 --- a/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs +++ b/src/ServiceControl.AcceptanceTests.RavenDB/StartupModeTests.cs @@ -24,11 +24,11 @@ public async Task InitializeSettings() var transportIntegration = new ConfigureEndpointLearningTransport(); settings = new Settings( + transportType: transportIntegration.TypeName, forwardErrorMessages: false, errorRetentionPeriod: TimeSpan.FromDays(1), persisterType: "RavenDB") { - TransportType = transportIntegration.TypeName, TransportConnectionString = transportIntegration.ConnectionString, AssemblyLoadContextResolver = static _ => AssemblyLoadContext.Default }; diff --git a/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs b/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs index 27aa531580..d77d5758a9 100644 --- a/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs +++ b/src/ServiceControl.Audit.UnitTests/Infrastructure/When_instance_is_setup.cs @@ -88,5 +88,6 @@ public Task CreateTransportInfrastructure(string name, OnError onError = null, Func onCriticalError = null, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException(); + public string ToTransportQualifiedQueueName(string queueName) => queueName; } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/App.config b/src/ServiceControl.Audit/App.config index 7497b67bcd..1c6276815e 100644 --- a/src/ServiceControl.Audit/App.config +++ b/src/ServiceControl.Audit/App.config @@ -17,6 +17,7 @@ These settings are only here so that we can debug ServiceControl while developin + @@ -46,5 +47,8 @@ These settings are only here so that we can debug ServiceControl while developin + + + \ No newline at end of file diff --git a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs index 6474f28e84..84d110a1f2 100644 --- a/src/ServiceControl.Audit/Auditing/AuditIngestor.cs +++ b/src/ServiceControl.Audit/Auditing/AuditIngestor.cs @@ -15,11 +15,10 @@ using Recoverability; using SagaAudit; using ServiceControl.Infrastructure.Metrics; + using ServiceControl.Transports; public class AuditIngestor { - static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; - public AuditIngestor( Metrics metrics, Settings settings, @@ -27,7 +26,8 @@ public AuditIngestor( EndpointInstanceMonitoring endpointInstanceMonitoring, IEnumerable auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container IMessageSession messageSession, - Lazy messageDispatcher + Lazy messageDispatcher, + ITransportCustomization transportCustomization ) { this.settings = settings; @@ -49,14 +49,16 @@ Lazy messageDispatcher new SagaRelationshipsEnricher() }.Concat(auditEnrichers).ToArray(); + logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue); + auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher); } public async Task Ingest(List contexts) { - if (log.IsDebugEnabled) + if (Log.IsDebugEnabled) { - log.Debug($"Ingesting {contexts.Count} message contexts"); + Log.Debug($"Ingesting {contexts.Count} message contexts"); } var stored = await auditPersister.Persist(contexts); @@ -65,14 +67,14 @@ public async Task Ingest(List contexts) { if (settings.ForwardAuditMessages) { - if (log.IsDebugEnabled) + if (Log.IsDebugEnabled) { - log.Debug($"Forwarding {stored.Count} messages"); + Log.Debug($"Forwarding {stored.Count} messages"); } - await Forward(stored, settings.AuditLogQueue); - if (log.IsDebugEnabled) + await Forward(stored, logQueueAddress); + if (Log.IsDebugEnabled) { - log.Debug("Forwarded messages"); + Log.Debug("Forwarded messages"); } } @@ -83,9 +85,9 @@ public async Task Ingest(List contexts) } catch (Exception e) { - if (log.IsWarnEnabled) + if (Log.IsWarnEnabled) { - log.Warn("Forwarding messages failed", e); + Log.Warn("Forwarding messages failed", e); } // making sure to rethrow so that all messages get marked as failed @@ -140,7 +142,7 @@ public async Task VerifyCanReachForwardingAddress() new TransportOperation( new OutgoingMessage(Guid.Empty.ToString("N"), [], Array.Empty()), - new UnicastAddressTag(settings.AuditLogQueue) + new UnicastAddressTag(logQueueAddress) ) ); @@ -155,7 +157,9 @@ public async Task VerifyCanReachForwardingAddress() readonly AuditPersister auditPersister; readonly Settings settings; readonly Lazy messageDispatcher; + readonly string logQueueAddress; - static readonly ILog log = LogManager.GetLogger(); + static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000; + static readonly ILog Log = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs index 513ac8486f..869fc03cda 100644 --- a/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl.Audit/Infrastructure/NServiceBusFactory.cs @@ -44,7 +44,7 @@ public static void Configure(Settings.Settings settings, ITransportCustomization routing.RouteToEndpoint(typeof(RegisterNewEndpoint), serviceControlLogicalQueue); routing.RouteToEndpoint(typeof(MarkMessageFailureResolvedByRetry), serviceControlLogicalQueue); - configuration.ReportCustomChecksTo(settings.ServiceControlQueueAddress); + configuration.ReportCustomChecksTo(transportCustomization.ToTransportQualifiedQueueName(settings.ServiceControlQueueAddress)); } configuration.GetSettings().Set(settings.LoggingSettings); diff --git a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs index be61d7b028..c14bc525b5 100644 --- a/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs @@ -43,9 +43,9 @@ public Settings(string transportType = null, string persisterType = null, Loggin { Hostname = SettingsReader.Read(SettingsRootNamespace, "Hostname", "localhost"); Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444); - } + }; - MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 32); + MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultAuditMaximumConcurrencyLevel ?? 32); ServiceControlQueueAddress = SettingsReader.Read(SettingsRootNamespace, "ServiceControlQueueAddress"); TimeToRestartAuditIngestionAfterFailure = GetTimeToRestartAuditIngestionAfterFailure(); EnableFullTextSearchOnBodies = SettingsReader.Read(SettingsRootNamespace, "EnableFullTextSearchOnBodies", true); diff --git a/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs index 72a5b27129..fcb0e36b9c 100644 --- a/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs +++ b/src/ServiceControl.Config.Tests/AddInstance/AddMonitoringInstance/ConnectionString.cs @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName) Assert.That(viewModel.SampleConnectionString, Is.Not.Empty); }); - if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue") + if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL") { Assert.That(viewModel.TransportWarning, Is.Not.Null); Assert.That(viewModel.TransportWarning, Is.Not.Empty); diff --git a/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs index 7045fcc9d5..5acd682d02 100644 --- a/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs +++ b/src/ServiceControl.Config.Tests/AddInstance/ConnectionString.cs @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName) Assert.That(viewModel.SampleConnectionString, Is.Not.Empty); }); - if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue") + if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL") { Assert.That(viewModel.TransportWarning, Is.Not.Null); Assert.That(viewModel.TransportWarning, Is.Not.Empty); diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs index 1c5e337f01..ff6e2e98cf 100644 --- a/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs +++ b/src/ServiceControl.Config.Tests/EditInstance/EditAuditInstance/ConnectionString.cs @@ -68,7 +68,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName) Assert.That(viewModel.SampleConnectionString, Is.Not.Empty); }); - if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue") + if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL") { Assert.That(viewModel.TransportWarning, Is.Not.Null); Assert.That(viewModel.TransportWarning, Is.Not.Empty); diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs index 137aba3348..12830edf11 100644 --- a/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs +++ b/src/ServiceControl.Config.Tests/EditInstance/EditErrorInstance/ConnectionString.cs @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName) Assert.That(viewModel.SampleConnectionString, Is.Not.Empty); }); - if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue") + if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL") { Assert.That(viewModel.TransportWarning, Is.Not.Null); Assert.That(viewModel.TransportWarning, Is.Not.Empty); diff --git a/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs b/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs index 629a0d2c15..af2e19a9ef 100644 --- a/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs +++ b/src/ServiceControl.Config.Tests/EditInstance/EditMonitoringInstance/ConnectionString.cs @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName) Assert.That(viewModel.SampleConnectionString, Is.Not.Empty); }); - if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue") + if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL") { Assert.That(viewModel.TransportWarning, Is.Not.Null); Assert.That(viewModel.TransportWarning, Is.Not.Empty); diff --git a/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs index d60c32a5f0..d76f3400fc 100644 --- a/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs +++ b/src/ServiceControl.Config.Tests/Validation/AddAuditInstanceValidationTests.cs @@ -108,7 +108,7 @@ public void Transport_cannot_be_empty_when_adding_audit_instance() } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_audit_instance(string transportInfoName) { var viewModel = new ServiceControlAddViewModel @@ -126,7 +126,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_audit_instance( string transportInfoName) diff --git a/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs index 362ac50802..534ba7f705 100644 --- a/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs +++ b/src/ServiceControl.Config.Tests/Validation/AddErrorInstanceValidationTests.cs @@ -107,7 +107,7 @@ public void Transport_cannot_be_empty_when_adding_error_instance() Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_error_instance( string transportInfoName) { @@ -127,7 +127,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_error_instance( string transportInfoName) { diff --git a/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs index afc6761b0d..42ccf0e6dd 100644 --- a/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs +++ b/src/ServiceControl.Config.Tests/Validation/AddMonitoringInstanceValidationTests.cs @@ -366,7 +366,7 @@ public void Transport_cannot_be_empty_when_adding_monitoring_instance() } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_monitoring_instance(string transportInfoName) { @@ -384,7 +384,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_monitoring_instance( string transportInfoName) { diff --git a/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs index 3e5a55ede7..73abc4d6d2 100644 --- a/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs +++ b/src/ServiceControl.Config.Tests/Validation/EditAuditInstanceValidationTests.cs @@ -10,7 +10,7 @@ public class EditAuditInstanceValidationTests { #region transport - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_editing_audit_instance( string transportInfoName) { @@ -28,7 +28,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_editing_audit_instance( string transportInfoName) { diff --git a/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs b/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs index 63bfb30202..a1e744191a 100644 --- a/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs +++ b/src/ServiceControl.Config.Tests/Validation/EditErrorInstanceValidationTests.cs @@ -10,7 +10,7 @@ public class EditErrorInstanceValidationTests { #region transport - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_editing_error_instance( string transportInfoName) { @@ -28,7 +28,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str Assert.That(errors, Is.Not.Empty); } - [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")] + [TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")] public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_editing_error_instance( string transportInfoName) { diff --git a/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs b/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs index 6ab42fd97e..59ca54e8bd 100644 --- a/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs +++ b/src/ServiceControl.Monitoring.AcceptanceTests/PerformanceTests.cs @@ -26,7 +26,7 @@ class PerformanceTests : AcceptanceTest retriesStore = new RetriesStore(); queueLengthStore = new QueueLengthStore(); - var settings = new Settings { EndpointUptimeGracePeriod = TimeSpan.FromMinutes(5) }; + var settings = new Settings(transportType: "Unknown") { EndpointUptimeGracePeriod = TimeSpan.FromMinutes(5) }; activityTracker = new EndpointInstanceActivityTracker(settings, TimeProvider.System); messageTypeRegistry = new MessageTypeRegistry(); diff --git a/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs b/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs index 2537e08268..debc3ed629 100644 --- a/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs +++ b/src/ServiceControl.Monitoring.AcceptanceTests/TestSupport/ServiceControlComponentRunner.cs @@ -33,9 +33,8 @@ class ServiceControlComponentRunner( async Task InitializeServiceControl(ScenarioContext context) { - settings = new Settings + settings = new Settings(transportType: transportToUse.TypeName) { - TransportType = transportToUse.TypeName, ConnectionString = transportToUse.ConnectionString, HttpHostName = "localhost", OnMessage = (id, headers, body, @continue) => diff --git a/src/ServiceControl.Monitoring/App.config b/src/ServiceControl.Monitoring/App.config index fad7edd774..577b757748 100644 --- a/src/ServiceControl.Monitoring/App.config +++ b/src/ServiceControl.Monitoring/App.config @@ -17,6 +17,8 @@ These settings are only here so that we can debug ServiceControl while developin + + @@ -42,5 +44,8 @@ These settings are only here so that we can debug ServiceControl while developin + + + \ No newline at end of file diff --git a/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs b/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs index b338a90e03..dc14169d15 100644 --- a/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs +++ b/src/ServiceControl.Monitoring/HostApplicationBuilderExtensions.cs @@ -98,7 +98,7 @@ static void ConfigureEndpoint(EndpointConfiguration config, Func c.NumberOfRetries(3)); recoverability.Delayed(c => c.NumberOfRetries(0)); - config.SendFailedMessagesTo(settings.ErrorQueue); + config.SendFailedMessagesTo(transportCustomization.ToTransportQualifiedQueueName(settings.ErrorQueue)); config.DisableFeature(); diff --git a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs index fe15116ac9..70606ed4c0 100644 --- a/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs +++ b/src/ServiceControl.Monitoring/Infrastructure/ReportThroughputHostedService.cs @@ -9,13 +9,16 @@ using NServiceBus.Metrics; using NServiceBus.Unicast.Queuing; using ServiceControl.Monitoring.Infrastructure.Api; + using ServiceControl.Transports; - class ReportThroughputHostedService(ILogger logger, IMessageSession session, IEndpointMetricsApi endpointMetricsApi, Settings settings, TimeProvider timeProvider) : BackgroundService + class ReportThroughputHostedService(ILogger logger, IMessageSession session, IEndpointMetricsApi endpointMetricsApi, Settings settings, TimeProvider timeProvider, ITransportCustomization transportCustomization) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken cancellationToken) { logger.LogInformation($"Starting {nameof(ReportThroughputHostedService)}"); + var serviceControlThroughputDataQueue = transportCustomization.ToTransportQualifiedQueueName(settings.ServiceControlThroughputDataQueue); + try { using PeriodicTimer timer = new(TimeSpan.FromMinutes(ReportSendingIntervalInMinutes), timeProvider); @@ -24,7 +27,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) { try { - await ReportOnThroughput(cancellationToken); + await ReportOnThroughput(serviceControlThroughputDataQueue, cancellationToken); } catch (Exception ex) when (ex is not OperationCanceledException) { @@ -45,7 +48,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken) } } - async Task ReportOnThroughput(CancellationToken cancellationToken) + async Task ReportOnThroughput(string serviceControlThroughputDataQueue, CancellationToken cancellationToken) { var endpointData = endpointMetricsApi.GetAllEndpointsMetrics(ReportSendingIntervalInMinutes); @@ -61,13 +64,17 @@ async Task ReportOnThroughput(CancellationToken cancellationToken) for (int i = 0; i < endpointData.Length; i++) { var average = endpointData[i].Metrics["Throughput"]?.Average ?? 0; - throughputData.EndpointThroughputData[i] = new EndpointThroughputData { Name = endpointData[i].Name, Throughput = Convert.ToInt64(average * ReportSendingIntervalInMinutes * 60) }; + throughputData.EndpointThroughputData[i] = new EndpointThroughputData + { + Name = endpointData[i].Name, + Throughput = Convert.ToInt64(average * ReportSendingIntervalInMinutes * 60) + }; } - await session.Send(settings.ServiceControlThroughputDataQueue, throughputData, cancellationToken); + await session.Send(serviceControlThroughputDataQueue, throughputData, cancellationToken); } } - static int ReportSendingIntervalInMinutes = 5; + const int ReportSendingIntervalInMinutes = 5; } } \ No newline at end of file diff --git a/src/ServiceControl.Monitoring/Settings.cs b/src/ServiceControl.Monitoring/Settings.cs index 8084e43d55..a9fe9a626c 100644 --- a/src/ServiceControl.Monitoring/Settings.cs +++ b/src/ServiceControl.Monitoring/Settings.cs @@ -12,14 +12,14 @@ namespace ServiceControl.Monitoring public class Settings { - public Settings(LoggingSettings loggingSettings = null) + public Settings(LoggingSettings loggingSettings = null, string transportType = null) { LoggingSettings = loggingSettings ?? new(SettingsRootNamespace); // Overwrite the instance name if it is specified in ENVVAR, reg, or config file InstanceName = SettingsReader.Read(SettingsRootNamespace, "InstanceName", InstanceName); - TransportType = SettingsReader.Read(SettingsRootNamespace, "TransportType"); + TransportType = SettingsReader.Read(SettingsRootNamespace, "TransportType", transportType); ConnectionString = GetConnectionString(); ErrorQueue = SettingsReader.Read(SettingsRootNamespace, "ErrorQueue", "error"); @@ -37,7 +37,7 @@ public Settings(LoggingSettings loggingSettings = null) } EndpointUptimeGracePeriod = TimeSpan.Parse(SettingsReader.Read(SettingsRootNamespace, "EndpointUptimeGracePeriod", "00:00:40")); - MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 32); + MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultMonitoringMaximumConcurrencyLevel ?? 32); ServiceControlThroughputDataQueue = SettingsReader.Read(SettingsRootNamespace, "ServiceControlThroughputDataQueue", "ServiceControl.ThroughputData"); AssemblyLoadContextResolver = static assemblyPath => new PluginAssemblyLoadContext(assemblyPath); diff --git a/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs b/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs index d39893fabd..005c9f1c09 100644 --- a/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs +++ b/src/ServiceControl.Persistence.RavenDB/Throughput/LicensingDataStore.cs @@ -186,14 +186,27 @@ public async Task UpdateUserIndicatorOnEndpoints(List userI using IAsyncDocumentSession session = store.OpenAsyncSession(databaseConfiguration.Name); var query = session.Query() - .Where(document => document.SanitizedName.In(updates.Keys)); + .Where(document => document.SanitizedName.In(updates.Keys) || document.EndpointId.Name.In(updates.Keys)); var documents = await query.ToListAsync(cancellationToken); foreach (var document in documents) { - if (updates.TryGetValue(document.SanitizedName, out var newValue)) + if (updates.TryGetValue(document.SanitizedName, out var newValueFromSanitizedName)) { - document.UserIndicator = newValue; + document.UserIndicator = newValueFromSanitizedName; + } + else if (updates.TryGetValue(document.EndpointId.Name, out var newValueFromEndpoint)) + { + document.UserIndicator = newValueFromEndpoint; + //update all that match this sanitized name + var sanitizedMatchingQuery = session.Query() + .Where(sanitizedDocument => sanitizedDocument.SanitizedName == document.SanitizedName && sanitizedDocument.EndpointId.Name != document.EndpointId.Name); + var sanitizedMatchingDocuments = await sanitizedMatchingQuery.ToListAsync(cancellationToken); + + foreach (var matchingDocumentOnSanitizedName in sanitizedMatchingDocuments) + { + matchingDocumentOnSanitizedName.UserIndicator = newValueFromEndpoint; + } } } diff --git a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs index 664e19c33c..dcfec5888b 100644 --- a/src/ServiceControl.Persistence.Tests/RetryStateTests.cs +++ b/src/ServiceControl.Persistence.Tests/RetryStateTests.cs @@ -1,9 +1,11 @@ namespace ServiceControl.Persistence.Tests { using System; + using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NServiceBus.Transport; using NUnit.Framework; @@ -14,6 +16,7 @@ using ServiceControl.MessageFailures; using ServiceControl.Persistence; using ServiceControl.Recoverability; + using ServiceControl.Transports; [NonParallelizable] class RetryStateTests : PersistenceTestBase @@ -279,7 +282,7 @@ class FakeApplicationLifetime : IHostApplicationLifetime class TestReturnToSenderDequeuer : ReturnToSenderDequeuer { public TestReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore store, IDomainEvents domainEvents, string endpointName) - : base(returnToSender, store, domainEvents, null, null, new Settings { InstanceName = endpointName }) + : base(returnToSender, store, domainEvents, new TestTransportCustomization(), null, new Settings { InstanceName = endpointName }) { } @@ -289,6 +292,19 @@ public override Task Run(string forwardingBatchId, Predicate fil } } + public class TestTransportCustomization : ITransportCustomization + { + public void AddTransportForAudit(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); + public void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); + public void AddTransportForPrimary(IServiceCollection services, TransportSettings transportSettings) => throw new NotImplementedException(); + public Task CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func onCriticalError = null, NServiceBus.TransportTransactionMode preferredTransactionMode = NServiceBus.TransportTransactionMode.ReceiveOnly) => throw new NotImplementedException(); + public void CustomizeAuditEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); + public void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); + public void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TransportSettings transportSettings) => throw new NotImplementedException(); + public Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues) => throw new NotImplementedException(); + public string ToTransportQualifiedQueueName(string queueName) => queueName; + } + public class TestSender : IMessageDispatcher { public Action Callback { get; set; } = m => { }; diff --git a/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs b/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs index 98168c3095..f677b03e8c 100644 --- a/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs +++ b/src/ServiceControl.Persistence.Tests/Throughput/EndpointsTests.cs @@ -148,7 +148,7 @@ public async Task Should_not_add_endpoint_when_updating_user_indication() } [Test] - public async Task Should_update_indicators_on_all_endpoint_sources() + public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_based_on_sanitized_name() { // Arrange var userIndicator = "someIndicator"; @@ -173,6 +173,32 @@ public async Task Should_update_indicators_on_all_endpoint_sources() Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator)); } + [Test] + public async Task Should_update_indicators_on_all_endpoint_sources_when_updated_based_on_endpoint_name() + { + // Arrange + var userIndicator = "someIndicator"; + + var endpointAudit = new Endpoint("Endpoint1", ThroughputSource.Audit) { SanitizedName = "Endpoint1" }; + var endpointMonitoring = new Endpoint("\"public\".\"Endpoint1\"", ThroughputSource.Monitoring) { SanitizedName = "Endpoint1" }; + + await LicensingDataStore.SaveEndpoint(endpointAudit, default); + await LicensingDataStore.SaveEndpoint(endpointMonitoring, default); + + // Act + await LicensingDataStore.UpdateUserIndicatorOnEndpoints([new UpdateUserIndicator { Name = "\"public\".\"Endpoint1\"", UserIndicator = userIndicator }], default); + + // Assert + var foundEndpointAudit = await LicensingDataStore.GetEndpoint("Endpoint1", ThroughputSource.Audit, default); + var foundEndpointMonitoring = await LicensingDataStore.GetEndpoint("\"public\".\"Endpoint1\"", ThroughputSource.Monitoring, default); + + Assert.That(foundEndpointAudit, Is.Not.Null); + Assert.That(foundEndpointAudit.UserIndicator, Is.EqualTo(userIndicator)); + + Assert.That(foundEndpointMonitoring, Is.Not.Null); + Assert.That(foundEndpointMonitoring.UserIndicator, Is.EqualTo(userIndicator)); + } + [TestCase(10, 5, false)] [TestCase(10, 20, true)] public async Task Should_correctly_report_throughput_existence_for_X_days(int daysSinceLastThroughputEntry, int timeFrameToCheck, bool expectedValue) diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig b/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig new file mode 100644 index 0000000000..0279bdc2db --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/.editorconfig @@ -0,0 +1,4 @@ +[*.cs] + +# Justification: Test project +dotnet_diagnostic.CA2007.severity = none \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt new file mode 100644 index 0000000000..ef7ee5cd4e --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithInvalidConnectionStringSettings.approved.txt @@ -0,0 +1,5 @@ +Connection settings to PostgreSql have some errors: +PostgreSQL Connection String could not be parsed. + +Connection attempted with the following settings: +ConnectionString set diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt new file mode 100644 index 0000000000..88acd38c33 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/ApprovalFiles/PostgreSqlQueryTests.TestConnectionWithValidSettings.approved.txt @@ -0,0 +1,4 @@ +Connection test to PostgreSql was successful + +Connection settings used: +ConnectionString set diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs b/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs new file mode 100644 index 0000000000..f736f184bf --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/ConnectionStringExtensionsTests.cs @@ -0,0 +1,48 @@ +namespace ServiceControl.Transport.Tests; + +using Transports.PostgreSql; +using NUnit.Framework; + +[TestFixture] +class ConnectionStringExtensionsTests : TransportTestFixture +{ + [TestCase("table")] + [TestCase("schema.table")] + [TestCase("schema.my.table")] + public void ShouldParseSchemaFromSubscriptionTable(string customSubscriptionTableContainingSchema) + { + string connectionString = $"{configuration.ConnectionString};Subscriptions Table={customSubscriptionTableContainingSchema}"; + + _ = connectionString.RemoveCustomConnectionStringParts(out var _, out var subscriptionsTableSetting); + var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting); + + Assert.That(subscriptionsAddress.Table, Is.Not.Null); + + if (customSubscriptionTableContainingSchema.Contains(".")) + { + Assert.That(subscriptionsAddress.Schema, Is.Not.Null); + Assert.That(subscriptionsAddress.Table, Is.EqualTo(customSubscriptionTableContainingSchema.Substring(customSubscriptionTableContainingSchema.IndexOf(".") + 1))); + Assert.That(subscriptionsAddress.Schema, Is.EqualTo(customSubscriptionTableContainingSchema.Substring(0, customSubscriptionTableContainingSchema.IndexOf(".")))); + } + else + { + Assert.That(subscriptionsAddress.Schema, Is.Null); + Assert.That(subscriptionsAddress.Table, Is.EqualTo(customSubscriptionTableContainingSchema)); + } + } + + [TestCase("\"table\"")] + [TestCase("\"schema.table\"")] + [TestCase("\"schema.my.table\"")] + public void ShouldParseOnlyTableFromSubscriptionTableWhenEnclosedInQuotes(string customSubscriptionTableWithoutSchema) + { + string connectionString = $"{configuration.ConnectionString};Subscriptions Table={customSubscriptionTableWithoutSchema}"; + + _ = connectionString.RemoveCustomConnectionStringParts(out var _, out var subscriptionsTableSetting); + var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting); + + Assert.That(subscriptionsAddress.Schema, Is.Null); + Assert.That(subscriptionsAddress.Table, Is.Not.Null); + Assert.That(subscriptionsAddress.Table, Is.EqualTo(PostgreSqlNameHelper.Unquote(customSubscriptionTableWithoutSchema))); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs new file mode 100644 index 0000000000..bee96d7921 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/PostgreSqlQueryTests.cs @@ -0,0 +1,119 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; +using NUnit.Framework; +using Particular.Approvals; +using Transports; +using Transports.PostgreSql; +using Transports.BrokerThroughput; + +[TestFixture] +class PostgreSqlQueryTests : TransportTestFixture +{ + FakeTimeProvider provider; + TransportSettings transportSettings; + PostgreSqlQuery query; + + [SetUp] + public void Initialise() + { + provider = new(); + provider.SetUtcNow(DateTimeOffset.UtcNow); + transportSettings = new TransportSettings + { + ConnectionString = configuration.ConnectionString, + MaxConcurrency = 1, + EndpointName = Guid.NewGuid().ToString("N") + }; + query = new PostgreSqlQuery(NullLogger.Instance, provider, transportSettings); + } + + [Test] + public async Task TestConnectionWithInvalidConnectionStringSettings() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var dictionary = new Dictionary + { + { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, "not valid" } + }; + query.Initialize(new ReadOnlyDictionary(dictionary)); + (bool success, List errors, string diagnostics) = + await query.TestConnection(cancellationTokenSource.Token); + + Assert.That(success, Is.False); + Assert.That(errors.Single(), Is.EqualTo("PostgreSQL Connection String could not be parsed.")); + Approver.Verify(diagnostics); + } + + [Test] + public async Task TestConnectionWithValidSettings() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + var dictionary = new Dictionary + { + { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString } + }; + query.Initialize(new ReadOnlyDictionary(dictionary)); + (bool success, _, string diagnostics) = await query.TestConnection(cancellationTokenSource.Token); + + Assert.That(success, Is.True); + Approver.Verify(diagnostics); + } + + [Test] + public async Task RunScenario() + { + // We need to wait a bit of time, because the scenario running takes on average 1 sec per run. + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(50)); + CancellationToken token = cancellationTokenSource.Token; + var dictionary = new Dictionary + { + { PostgreSqlQuery.PostgreSqlSettings.ConnectionString, configuration.ConnectionString } + }; + + await CreateTestQueue(transportSettings.EndpointName); + + query.Initialize(new ReadOnlyDictionary(dictionary)); + + var queueNames = new List(); + await foreach (IBrokerQueue queueName in query.GetQueueNames(token)) + { + queueNames.Add(queueName); + } + + IBrokerQueue queue = queueNames.Find(name => ((BrokerQueueTable)name).SanitizedName == transportSettings.EndpointName); + Assert.That(queue, Is.Not.Null); + + long total = 0L; + using var reset = new ManualResetEventSlim(); + + var runScenarioAndAdvanceTime = Task.Run(async () => + { + while (!reset.IsSet) + { + await SendAndReceiveMessages(transportSettings.EndpointName, 1); + provider.Advance(TimeSpan.FromHours(1)); + } + }, token); + + await foreach (QueueThroughput queueThroughput in query.GetThroughputPerDay(queue, new DateOnly(), token)) + { + total += queueThroughput.TotalThroughput; + } + + reset.Set(); + await runScenarioAndAdvanceTime.WaitAsync(token); + + // Asserting that we have one message per hour during 24 hours, the first snapshot is not counted hence the 23 assertion. + Assert.That(total, Is.EqualTo(23)); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj b/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj new file mode 100644 index 0000000000..7a3acb266d --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/ServiceControl.Transports.PostgreSql.Tests.csproj @@ -0,0 +1,32 @@ + + + + net8.0 + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs b/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs new file mode 100644 index 0000000000..aaa9ec7659 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/TestsFilter.cs @@ -0,0 +1 @@ +[assembly: IncludeInPostgreSqlTests()] \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs b/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs new file mode 100644 index 0000000000..819248c612 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql.Tests/TransportTestsConfiguration.cs @@ -0,0 +1,31 @@ +namespace ServiceControl.Transport.Tests +{ + using System; + using System.Threading.Tasks; + using ServiceControl.Transports.PostgreSql; + using Transports; + + partial class TransportTestsConfiguration + { + public string ConnectionString { get; private set; } + + public ITransportCustomization TransportCustomization { get; private set; } + + public Task Configure() + { + TransportCustomization = new PostgreSqlTransportCustomization(); + ConnectionString = Environment.GetEnvironmentVariable(ConnectionStringKey); + + if (string.IsNullOrEmpty(ConnectionString)) + { + throw new Exception($"Environment variable {ConnectionStringKey} is required for PostgreSQL transport tests to run"); + } + + return Task.CompletedTask; + } + + public Task Cleanup() => Task.CompletedTask; + + const string ConnectionStringKey = "ServiceControl_TransportTests_PostgreSQL_ConnectionString"; + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/.editorconfig b/src/ServiceControl.Transports.PostgreSql/.editorconfig new file mode 100644 index 0000000000..ff993b49bb --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/.editorconfig @@ -0,0 +1,4 @@ +[*.cs] + +# Justification: ServiceControl app has no synchronization context +dotnet_diagnostic.CA2007.severity = none diff --git a/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs b/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs new file mode 100644 index 0000000000..07df2285ae --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/ConnectionStringExtensions.cs @@ -0,0 +1,36 @@ +namespace ServiceControl.Transports.PostgreSql; + +using System.Data.Common; + +public static class ConnectionStringExtensions +{ + public static string RemoveCustomConnectionStringParts(this string connectionString, out string schema, out string subscriptionTable) => + connectionString + .RemoveCustomConnectionStringPart(SubscriptionsTableName, out subscriptionTable) + .RemoveCustomConnectionStringPart(QueueSchemaName, out schema); + + static string RemoveCustomConnectionStringPart(this string connectionString, string partName, out string partValue) + { + var builder = new DbConnectionStringBuilder + { + ConnectionString = connectionString + }; + + if (builder.TryGetValue(partName, out var customPartValue)) + { + builder.Remove(partName); + } + + partValue = (string)customPartValue; + + if (partValue != null && connectionString.Contains(PostgreSqlNameHelper.Quote(partValue))) + { + partValue = PostgreSqlNameHelper.Quote(partValue); + } + + return builder.ConnectionString; + } + + const string QueueSchemaName = "Queue Schema"; + const string SubscriptionsTableName = "Subscriptions Table"; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs new file mode 100644 index 0000000000..556aba5a43 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/DatabaseDetails.cs @@ -0,0 +1,142 @@ +namespace ServiceControl.Transports.PostgreSql; + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Npgsql; + +public class DatabaseDetails +{ + readonly string connectionString; + + public string DatabaseName { get; } + + public DatabaseDetails(string connectionString) + { + try + { + var builder = new NpgsqlConnectionStringBuilder + { + ConnectionString = connectionString + }; + DatabaseName = builder.Database; + this.connectionString = builder.ToString(); + } + catch (Exception ex) when (ex is FormatException or ArgumentException) + { + throw new Exception("PostgreSQL Connection String could not be parsed.", ex); + } + } + + public async Task TestConnection(CancellationToken cancellationToken) + { + try + { + return await GetPostgreSqlVersion(cancellationToken); + } + catch (NpgsqlException ex) when (IsConnectionOrLoginIssue(ex)) + { + throw new Exception($"Could not connect to '{DatabaseName}' PostgreSQL database.", ex); + } + } + + static bool IsConnectionOrLoginIssue(NpgsqlException x) + { + // Reference is here: https://www.postgresql.org/docs/current/errcodes-appendix.html + + return x.SqlState switch + { + //28000 invalid_authorization_specification + //28P01 invalid_password + "28000" or "28P01" => true, + + //08000 connection_exception + //08003 connection_does_not_exist + //08006 connection_failure + //08001 sqlclient_unable_to_establish_sqlconnection + //08004 sqlserver_rejected_establishment_of_sqlconnection + //08007 transaction_resolution_unknown + //08P01 protocol_violation + "08000" or "08003" or "08006" or "08001" or "08004" or "08007" or "08P01" => true, + + // Everything else + _ => false + }; + } + + async Task GetPostgreSqlVersion(CancellationToken cancellationToken) + { + await using var conn = await OpenConnectionAsync(cancellationToken); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT version()"; + + return (string)await cmd.ExecuteScalarAsync(cancellationToken); + } + + public async Task> GetTables(CancellationToken cancellationToken = default) + { + List tables = []; + + await using var conn = await OpenConnectionAsync(cancellationToken); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = GetQueueListCommandText; + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var schema = reader.GetString(0); + var name = reader.GetString(1); + tables.Add(new BrokerQueueTable(this, new QueueAddress(name, schema))); + } + + return tables; + } + + public async Task GetSnapshot(BrokerQueueTable brokerQueueTable, + CancellationToken cancellationToken = default) + { + var table = new BrokerQueueTableSnapshot(brokerQueueTable); + + await using var conn = await OpenConnectionAsync(cancellationToken); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = $"select last_value from \"{table.SequenceName}\";"; + var value = await cmd.ExecuteScalarAsync(cancellationToken); + + if (value is long longValue) + { + table.RowVersion = longValue; + } + + return table; + } + + async Task OpenConnectionAsync(CancellationToken cancellationToken = default) + { + var conn = new NpgsqlConnection(connectionString); + await conn.OpenAsync(cancellationToken); + return conn; + } + + + /// + /// Query works by finidng all the columns in any table that *could* be from an NServiceBus + /// queue table, grouping by schema+name, and then using the HAVING COUNT(*) = 5 clause + /// to ensure that all 5 columns are represented. Delay tables, for example, will match + /// on 3 of the columns (Headers, Body, RowVersion) and many user tables might have an + /// Id column, but the HAVING clause filters these out. + /// /// + const string GetQueueListCommandText = @" +SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; + +SELECT C.TABLE_SCHEMA as TableSchema, C.TABLE_NAME as TableName +FROM information_schema.columns C +WHERE + (C.COLUMN_NAME = 'id' AND C.DATA_TYPE = 'uuid') OR + (C.COLUMN_NAME = 'expires' AND C.DATA_TYPE = 'timestamp without time zone') OR + (C.COLUMN_NAME = 'headers' AND C.DATA_TYPE = 'text') OR + (C.COLUMN_NAME = 'body' AND C.DATA_TYPE = 'bytea') OR + (C.COLUMN_NAME = 'seq' AND C.DATA_TYPE = 'integer') +GROUP BY C.TABLE_SCHEMA, C.TABLE_NAME +HAVING COUNT(*) = 5 +"; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs new file mode 100644 index 0000000000..3d1ca0023a --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlNameHelper.cs @@ -0,0 +1,37 @@ +namespace ServiceControl.Transports.PostgreSql; + +// NOTE: Copied from the SQL Transport + +public static class PostgreSqlNameHelper +{ + const string Delimiter = "\""; + static readonly string EscapedDelimiter = Delimiter + Delimiter; + + public static string Quote(string unquotedName) + { + if (unquotedName == null) + { + return null; + } + //Quotes are escaped by using double quotes + return Delimiter + unquotedName.Replace(Delimiter, EscapedDelimiter) + Delimiter; + } + + public static string Unquote(string quotedString) + { + if (quotedString == null) + { + return null; + } + + if (!quotedString.StartsWith(Delimiter) || !quotedString.EndsWith(Delimiter)) + { + //Already unquoted + return quotedString; + } + + return quotedString + .Substring(Delimiter.Length, quotedString.Length - (2 * Delimiter.Length)) + .Replace(EscapedDelimiter, Delimiter); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs new file mode 100644 index 0000000000..12ed41546b --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlQuery.cs @@ -0,0 +1,116 @@ +#nullable enable +namespace ServiceControl.Transports.PostgreSql; + +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using BrokerThroughput; +using Microsoft.Extensions.Logging; + +public class PostgreSqlQuery( + ILogger logger, + TimeProvider timeProvider, + TransportSettings transportSettings) : BrokerThroughputQuery(logger, "PostgreSql") +{ + readonly List databases = []; + + protected override void InitializeCore(ReadOnlyDictionary settings) + { + if (!settings.TryGetValue(PostgreSqlSettings.ConnectionString, out string? connectionString)) + { + logger.LogInformation("Using ConnectionString used by instance"); + + connectionString = transportSettings.ConnectionString.RemoveCustomConnectionStringParts(out string _, out string _); + + Diagnostics.AppendLine("ConnectionString not set, defaulted to using ConnectionString used by instance"); + } + else + { + Diagnostics.AppendLine("ConnectionString set"); + } + + databases.Add(new DatabaseDetails(connectionString)); + } + + public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, + DateOnly startDate, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var queueTableName = (BrokerQueueTable)brokerQueue; + var startData = + await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken); + + // looping for 24 hours + for (var i = 0; i < 24; i++) + { + await Task.Delay(TimeSpan.FromHours(1), timeProvider, cancellationToken); + var endData = + await queueTableName.DatabaseDetails.GetSnapshot(queueTableName, cancellationToken); + + yield return new QueueThroughput + { + DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), + TotalThroughput = endData.RowVersion - startData.RowVersion + }; + + startData = endData; + } + } + + public override async IAsyncEnumerable GetQueueNames( + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var tables = new List(); + + foreach (var db in databases) + { + var version = await db.TestConnection(cancellationToken); + Data["SqlVersion"] = version; + tables.AddRange(await db.GetTables(cancellationToken)); + } + + ScopeType = "Catalog & Schema"; + + foreach (var tableName in tables) + { + yield return tableName; + } + } + + public override KeyDescriptionPair[] Settings => + [ + new KeyDescriptionPair(PostgreSqlSettings.ConnectionString, PostgreSqlSettings.ConnectionStringDescription) + ]; + + protected override async Task<(bool Success, List Errors)> TestConnectionCore( + CancellationToken cancellationToken) + { + List errors = []; + + foreach (DatabaseDetails db in databases) + { + try + { + await db.TestConnection(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Test connection failed"); + errors.Add(ex.Message); + } + } + + return (errors.Count == 0, errors); + } + + public static class PostgreSqlSettings + { + public static readonly string ConnectionString = "PostgreSQL/ConnectionString"; + + public static readonly string ConnectionStringDescription = + "Database connection string that will provide at least read access to all queue tables."; + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs new file mode 100644 index 0000000000..0e7ed6c5f0 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTable.cs @@ -0,0 +1,51 @@ +#nullable enable +namespace ServiceControl.Transports.PostgreSql; + +class PostgreSqlTable +{ + public PostgreSqlTable(string name, string schema) + { + //HINT: The query approximates queue length value based on max and min of the table sequence. + fullTableName = $"\"{schema}\".\"{name}\""; + //NOTE: Postgres doesn't have NOLOCK since it utilises snapshot isolation by default + LengthQuery = $$""" + SELECT CASE WHEN (EXISTS (SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{{schema}}' AND TABLE_NAME = '{{name}}')) THEN + COALESCE(cast(max(seq) - min(seq) + 1 AS int), 0) + ELSE + -1 + END AS Id FROM {{fullTableName}}; + """; + } + + readonly string fullTableName; + public string LengthQuery { get; } + + public override string ToString() => + fullTableName; + + bool Equals(PostgreSqlTable other) => + string.Equals(fullTableName, other.fullTableName); + + public override bool Equals(object? obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((PostgreSqlTable)obj); + } + + public override int GetHashCode() => + fullTableName.GetHashCode(); +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs new file mode 100644 index 0000000000..9322f67f33 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/PostgreSqlTransportCustomization.cs @@ -0,0 +1,86 @@ +namespace ServiceControl.Transports.PostgreSql; + +using System.Linq; +using System.Runtime.CompilerServices; +using BrokerThroughput; +using Microsoft.Extensions.DependencyInjection; +using NServiceBus; +using NServiceBus.Logging; +using NServiceBus.Transport.PostgreSql; + +public class PostgreSqlTransportCustomization : TransportCustomization +{ + protected override void CustomizeTransportForPrimaryEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive; + + protected override void CustomizeTransportForAuditEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + + protected override void CustomizeTransportForMonitoringEndpoint(EndpointConfiguration endpointConfiguration, PostgreSqlTransport transportDefinition, TransportSettings transportSettings) => + transportDefinition.TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + + protected override void AddTransportForPrimaryCore(IServiceCollection services, + TransportSettings transportSettings) => + services.AddSingleton(); + + protected override void AddTransportForMonitoringCore(IServiceCollection services, TransportSettings transportSettings) + { + services.AddSingleton(); + services.AddHostedService(provider => provider.GetRequiredService()); + } + + protected override PostgreSqlTransport CreateTransport(TransportSettings transportSettings, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) + { + var connectionString = transportSettings.ConnectionString.RemoveCustomConnectionStringParts(out var customSchema, out var subscriptionsTableSetting); + + var transport = new PostgreSqlTransport(connectionString); + + var subscriptions = transport.Subscriptions; + + if (customSchema != null) + { + transport.DefaultSchema = customSchema; + subscriptions.SubscriptionTableName = new SubscriptionTableName(DefaultSubscriptionTableName, customSchema); + } + + if (subscriptionsTableSetting != null) + { + var subscriptionsAddress = QueueAddress.Parse(subscriptionsTableSetting); + + subscriptions.SubscriptionTableName = + new SubscriptionTableName(subscriptionsAddress.Table, + subscriptionsAddress.Schema ?? customSchema); + } + + if (transportSettings.GetOrDefault("TransportSettings.EnableDtc")) + { + Logger.Error("The EnableDtc setting is no longer supported natively within ServiceControl. If you require distributed transactions, you will have to use a Transport Adapter (https://docs.particular.net/servicecontrol/transport-adapter/)"); + } + + DisableDelayedDelivery(transport) = true; + + transport.TransportTransactionMode = transport.GetSupportedTransactionModes().Contains(preferredTransactionMode) ? preferredTransactionMode : TransportTransactionMode.ReceiveOnly; + + return transport; + } + + protected override string ToTransportQualifiedQueueNameCore(string queueName) + { + const string delimiter = "\""; + const string escapedDelimiter = delimiter + delimiter; + + if (queueName.StartsWith(delimiter) || queueName.EndsWith(delimiter)) + { + return queueName; + } + + return delimiter + queueName.Replace(delimiter, escapedDelimiter) + delimiter; + } + + [UnsafeAccessor(UnsafeAccessorKind.Field, Name = "k__BackingField")] + static extern ref bool DisableDelayedDelivery(PostgreSqlTransport transport); + + const string DefaultSubscriptionTableName = "SubscriptionRouting"; + + static readonly ILog Logger = LogManager.GetLogger(typeof(PostgreSqlTransportCustomization)); +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs b/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs new file mode 100644 index 0000000000..03060d8d17 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/QueueAddress.cs @@ -0,0 +1,50 @@ +namespace ServiceControl.Transports.PostgreSql; + +using System; + +// NOTE: Copied from the SQL Transport +public class QueueAddress +{ + public QueueAddress(string table, string schemaName) + { + ArgumentException.ThrowIfNullOrWhiteSpace(table); + Table = SafeUnquote(table); + Schema = SafeUnquote(schemaName); + QualifiedTableName = $"{PostgreSqlNameHelper.Quote(Schema)}.{PostgreSqlNameHelper.Quote(Table)}"; + } + + public string Table { get; } + public string Schema { get; } + public string QualifiedTableName { get; } + + public static QueueAddress Parse(string address) + { + var index = 0; + var quoteCount = 0; + while (index < address.Length) + { + if (address[index] == '"') + { + quoteCount++; + } + else if (address[index] == '.' && quoteCount % 2 == 0) + { + var schema = address.Substring(0, index); + var table = address.Substring(index + 1); + + return new QueueAddress(table, schema); + } + index++; + } + + return new QueueAddress(address, null); + } + + static string SafeUnquote(string name) + { + var result = PostgreSqlNameHelper.Unquote(name); + return string.IsNullOrWhiteSpace(result) + ? null + : result; + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs new file mode 100644 index 0000000000..28b4fdea4b --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/QueueLengthProvider.cs @@ -0,0 +1,138 @@ +namespace ServiceControl.Transports.PostgreSql; + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Npgsql; +using NServiceBus.Logging; + +class QueueLengthProvider : AbstractQueueLengthProvider +{ + public QueueLengthProvider(TransportSettings settings, Action store) : base(settings, store) + { + connectionString = ConnectionString + .RemoveCustomConnectionStringParts(out var customSchema, out _); + + defaultSchema = customSchema ?? "public"; + } + public override void TrackEndpointInputQueue(EndpointToQueueMapping queueToTrack) + { + var parsedAddress = QueueAddress.Parse(queueToTrack.InputQueue); + + var sqlTable = new PostgreSqlTable(parsedAddress.Table, parsedAddress.Schema ?? defaultSchema); + + tableNames.AddOrUpdate(queueToTrack, _ => sqlTable, (_, currentSqlTable) => + { + if (!currentSqlTable.Equals(sqlTable)) + { + tableSizes.TryRemove(currentSqlTable, out var _); + } + + return sqlTable; + }); + + tableSizes.TryAdd(sqlTable, 0); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await Task.Delay(QueryDelayInterval, stoppingToken); + + await QueryTableSizes(stoppingToken); + + UpdateQueueLengthStore(); + } + catch (OperationCanceledException) + { + // no-op + } + catch (Exception e) + { + Logger.Error("Error querying sql queue sizes.", e); + } + } + } + + void UpdateQueueLengthStore() + { + var nowTicks = DateTime.UtcNow.Ticks; + + foreach (var tableNamePair in tableNames) + { + Store( + [ + new QueueLengthEntry + { + DateTicks = nowTicks, + Value = tableSizes.GetValueOrDefault(tableNamePair.Value, 0) + } + ], + tableNamePair.Key); + } + } + + async Task QueryTableSizes(CancellationToken cancellationToken) + { + var chunks = tableSizes + .Select((i, index) => new + { + i, + index + }) + .GroupBy(p => p.index / QueryChunkSize) + .Select(grp => grp.Select(g => g.i).ToArray()) + .ToList(); + + await using var connection = new NpgsqlConnection(connectionString); + await connection.OpenAsync(cancellationToken); + + foreach (var chunk in chunks) + { + await UpdateChunk(connection, chunk, cancellationToken); + } + } + + async Task UpdateChunk(NpgsqlConnection connection, KeyValuePair[] chunk, CancellationToken cancellationToken) + { + var query = string.Join(Environment.NewLine, chunk.Select(c => c.Key.LengthQuery)); + + await using var command = new NpgsqlCommand(query, connection); + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + foreach (var chunkPair in chunk) + { + await reader.ReadAsync(cancellationToken); + + var queueLength = reader.GetInt32(0); + + if (queueLength == -1) + { + Logger.Warn($"Table {chunkPair.Key} does not exist."); + } + else + { + tableSizes.TryUpdate(chunkPair.Key, queueLength, chunkPair.Value); + } + + await reader.NextResultAsync(cancellationToken); + } + } + + readonly ConcurrentDictionary tableNames = new ConcurrentDictionary(); + readonly ConcurrentDictionary tableSizes = new ConcurrentDictionary(); + + readonly string connectionString; + readonly string defaultSchema; + + static readonly ILog Logger = LogManager.GetLogger(); + + static readonly TimeSpan QueryDelayInterval = TimeSpan.FromSeconds(1); + + const int QueryChunkSize = 10; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs b/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs new file mode 100644 index 0000000000..976d4ec29c --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/QueueTableName.cs @@ -0,0 +1,16 @@ +#nullable enable +namespace ServiceControl.Transports.PostgreSql; + +using System.Collections.Generic; +using ServiceControl.Transports.BrokerThroughput; +public class BrokerQueueTable(DatabaseDetails databaseDetails, QueueAddress queueAddress) + : IBrokerQueue +{ + public DatabaseDetails DatabaseDetails { get; } = databaseDetails; + public QueueAddress QueueAddress { get; } = queueAddress; + public string SequenceName => $"{QueueAddress.Table}_seq_seq"; + public string QueueName => QueueAddress.QualifiedTableName; + public string SanitizedName => QueueAddress.Table; + public string? Scope => $"[{DatabaseDetails.DatabaseName}].[{QueueAddress.Schema}]"; + public List EndpointIndicators { get; } = []; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs new file mode 100644 index 0000000000..ba4fc7180b --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/QueueTableSnapshot.cs @@ -0,0 +1,6 @@ +namespace ServiceControl.Transports.PostgreSql; + +public class BrokerQueueTableSnapshot(BrokerQueueTable details) : BrokerQueueTable(details.DatabaseDetails, details.QueueAddress) +{ + public long RowVersion { get; set; } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj b/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj new file mode 100644 index 0000000000..1fa0bde372 --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/ServiceControl.Transports.PostgreSql.csproj @@ -0,0 +1,25 @@ + + + + net8.0 + true + + + + + + + + + + + + + + + + + + + + diff --git a/src/ServiceControl.Transports.PostgreSql/transport.manifest b/src/ServiceControl.Transports.PostgreSql/transport.manifest new file mode 100644 index 0000000000..ac3130f8de --- /dev/null +++ b/src/ServiceControl.Transports.PostgreSql/transport.manifest @@ -0,0 +1,16 @@ +{ + "Definitions": [ + { + "Name": "PostgreSQL", + "DisplayName": "PostgreSQL", + "AssemblyName": "ServiceControl.Transports.PostgreSql", + "TypeName": "ServiceControl.Transports.PostgreSql.PostgreSqlTransportCustomization, ServiceControl.Transports.PostgreSql", + "DefaultPrimaryMaximumConcurrencyLevel": 10, + "DefaultAuditMaximumConcurrencyLevel": 10, + "DefaultMonitoringMaximumConcurrencyLevel": 10, + "SampleConnectionString": "Server=;Database=nservicebus;Port=5432;User Id=;Password=;Queue Schema=myschema;Subscriptions Table=schema.tablename", + "AvailableInSCMU": true, + "Help": "Specify optional 'Queue Schema' to override the default schema. Specify optional 'Subscriptions Table' to override the default subscriptions table location." + } + ] +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt b/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt index 0288ca6324..242979cbf3 100644 --- a/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt +++ b/src/ServiceControl.Transports.SqlServer.Tests/ApprovalFiles/SqlServerQueryTests.TestConnectionWithInvalidCatalogSettings.approved.txt @@ -1,6 +1,5 @@ Connection test to SqlTransport failed: -Cannot open database "not_here" requested by the login. The login failed. -Login failed for user. +Could not connect to 'not_here' SQL Server database. Connection attempted with the following settings: ConnectionString set diff --git a/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs b/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs index 2c0bddfd9b..ab8b40bce3 100644 --- a/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs +++ b/src/ServiceControl.Transports.SqlServer.Tests/SqlServerQueryTests.cs @@ -69,7 +69,7 @@ public async Task TestConnectionWithInvalidCatalogSettings() await query.TestConnection(cancellationTokenSource.Token); Assert.That(success, Is.False); - Assert.That(errors.Single(), Does.StartWith("Cannot open database \"not_here\"")); + Assert.That(errors.Single(), Does.StartWith("Could not connect to 'not_here'")); Approver.Verify(diagnostics, s => Regex.Replace(s, "^Login failed for user .*$", "Login failed for user.", RegexOptions.Multiline)); } diff --git a/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs b/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs index 0e0b2573e3..54bb7151af 100644 --- a/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs +++ b/src/ServiceControl.Transports.SqlServer/DatabaseDetails.cs @@ -30,11 +30,11 @@ public DatabaseDetails(string connectionString) } } - public Task TestConnection(CancellationToken cancellationToken) + public async Task TestConnection(CancellationToken cancellationToken) { try { - return GetSqlVersion(cancellationToken); + return await GetSqlVersion(cancellationToken); } catch (SqlException ex) when (IsConnectionOrLoginIssue(ex)) { diff --git a/src/ServiceControl.Transports.SqlServer/NameHelper.cs b/src/ServiceControl.Transports.SqlServer/NameHelper.cs index cdc332a660..b21f097f67 100644 --- a/src/ServiceControl.Transports.SqlServer/NameHelper.cs +++ b/src/ServiceControl.Transports.SqlServer/NameHelper.cs @@ -5,27 +5,22 @@ class NameHelper const string prefix = "["; const string suffix = "]"; - public static string Quote(string unquotedName) + public static string Quote(string name) { - if (unquotedName == null) + if (name.StartsWith(prefix) && name.EndsWith(suffix)) { - return null; + return name; } - return prefix + unquotedName.Replace(suffix, suffix + suffix) + suffix; + + return prefix + name.Replace(suffix, suffix + suffix) + suffix; } public static string Unquote(string quotedString) { - if (quotedString == null) - { - return null; - } - if (!quotedString.StartsWith(prefix) || !quotedString.EndsWith(suffix)) { return quotedString; } - return quotedString .Substring(prefix.Length, quotedString.Length - prefix.Length - suffix.Length).Replace(suffix + suffix, suffix); } diff --git a/src/ServiceControl.Transports.SqlServer/QueueAddress.cs b/src/ServiceControl.Transports.SqlServer/QueueAddress.cs index 69d81a9a93..98501755be 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueAddress.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueAddress.cs @@ -101,12 +101,12 @@ static string ExtractNextPart(string address, out string part) static string Quote(string name) { - return NameHelper.Quote(name); + return SqlServerNameHelper.Quote(name); } static string SafeUnquote(string name) { - var result = NameHelper.Unquote(name); + var result = SqlServerNameHelper.Unquote(name); return string.IsNullOrWhiteSpace(result) ? null : result; diff --git a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs index bfcd15062e..fe06db3da6 100644 --- a/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.SqlServer/QueueLengthProvider.cs @@ -130,7 +130,7 @@ async Task UpdateChunk(SqlConnection connection, KeyValuePair[] c static readonly ILog Logger = LogManager.GetLogger(); - static readonly TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); + static readonly TimeSpan QueryDelayInterval = TimeSpan.FromSeconds(1); const int QueryChunkSize = 10; } diff --git a/src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs b/src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs similarity index 54% rename from src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs rename to src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs index 9552a7b68c..24d3869041 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlNameHelper.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlServerNameHelper.cs @@ -1,27 +1,32 @@ -#nullable enable -namespace ServiceControl.Transports.SqlServer +namespace ServiceControl.Transports.SqlServer { - static class SqlNameHelper + // NOTE: Copied from the SQL Transport + static class SqlServerNameHelper { const string prefix = "["; const string suffix = "]"; - public static string Quote(string name) + public static string Quote(string unquotedName) { - if (name.StartsWith(prefix) && name.EndsWith(suffix)) + if (unquotedName == null) { - return name; + return null; } - - return prefix + name.Replace(suffix, suffix + suffix) + suffix; + return prefix + unquotedName.Replace(suffix, suffix + suffix) + suffix; } public static string Unquote(string quotedString) { + if (quotedString == null) + { + return null; + } + if (!quotedString.StartsWith(prefix) || !quotedString.EndsWith(suffix)) { return quotedString; } + return quotedString .Substring(prefix.Length, quotedString.Length - prefix.Length - suffix.Length).Replace(suffix + suffix, suffix); } diff --git a/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs b/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs index 86b66146b0..e84e58b8c4 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlServerQuery.cs @@ -92,9 +92,6 @@ public override async IAsyncEnumerable GetQueueNames( tables.AddRange(await db.GetTables(cancellationToken)); } - var catalogCount = tables.Select(t => t.DatabaseDetails.DatabaseName).Distinct().Count(); - var schemaCount = tables.Select(t => $"{t.DatabaseDetails.DatabaseName}/{t.Schema}").Distinct().Count(); - ScopeType = "Catalog & Schema"; foreach (var tableName in tables) diff --git a/src/ServiceControl.Transports.SqlServer/SqlTable.cs b/src/ServiceControl.Transports.SqlServer/SqlTable.cs index 5eeae69701..0e629b1762 100644 --- a/src/ServiceControl.Transports.SqlServer/SqlTable.cs +++ b/src/ServiceControl.Transports.SqlServer/SqlTable.cs @@ -6,10 +6,10 @@ class SqlTable { SqlTable(string name, string schema, string? catalog) { - var unquotedSchema = SqlNameHelper.Unquote(schema); - var unquotedName = SqlNameHelper.Unquote(name); - var quotedName = SqlNameHelper.Quote(name); - var quotedSchema = SqlNameHelper.Quote(schema); + var unquotedSchema = NameHelper.Unquote(schema); + var unquotedName = NameHelper.Unquote(name); + var quotedName = NameHelper.Quote(name); + var quotedSchema = NameHelper.Quote(schema); //HINT: The query approximates queue length value based on max and min // of RowVersion IDENTITY(1,1) column. There are couple of scenarios // that might lead to the approximation being off. More details here: @@ -29,7 +29,7 @@ SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) FROM {_ } else { - var quotedCatalog = SqlNameHelper.Quote(catalog); + var quotedCatalog = NameHelper.Quote(catalog); _fullTableName = $"{quotedCatalog}.{quotedSchema}.{quotedName}"; LengthQuery = $""" diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt index 9856c18f66..d7cc93804f 100644 --- a/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt +++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt @@ -32,6 +32,7 @@ namespace ServiceControl.Transports void CustomizeMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, ServiceControl.Transports.TransportSettings transportSettings); void CustomizePrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, ServiceControl.Transports.TransportSettings transportSettings); System.Threading.Tasks.Task ProvisionQueues(ServiceControl.Transports.TransportSettings transportSettings, System.Collections.Generic.IEnumerable additionalQueues); + string ToTransportQualifiedQueueName(string queueName); } public class QueueLengthEntry { @@ -59,6 +60,8 @@ namespace ServiceControl.Transports protected abstract void CustomizeTransportForMonitoringEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TTransport transportDefinition, ServiceControl.Transports.TransportSettings transportSettings); protected abstract void CustomizeTransportForPrimaryEndpoint(NServiceBus.EndpointConfiguration endpointConfiguration, TTransport transportDefinition, ServiceControl.Transports.TransportSettings transportSettings); public virtual System.Threading.Tasks.Task ProvisionQueues(ServiceControl.Transports.TransportSettings transportSettings, System.Collections.Generic.IEnumerable additionalQueues) { } + public string ToTransportQualifiedQueueName(string queueName) { } + protected virtual string ToTransportQualifiedQueueNameCore(string queueName) { } } public static class TransportFactory { @@ -75,6 +78,9 @@ namespace ServiceControl.Transports public TransportManifestDefinition() { } public string[] Aliases { get; set; } public string AssemblyName { get; set; } + public int? DefaultAuditMaximumConcurrencyLevel { get; set; } + public int? DefaultMonitoringMaximumConcurrencyLevel { get; set; } + public int? DefaultPrimaryMaximumConcurrencyLevel { get; set; } public string DisplayName { get; set; } public string Location { get; set; } public string Name { get; set; } diff --git a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt index 6318beb0ad..6d201c38cc 100644 --- a/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt +++ b/src/ServiceControl.Transports.Tests/ApprovalFiles/TransportManifestLibraryTests.All_types_defined_in_manifest_files_exist_in_specified_assembly.approved.txt @@ -5,6 +5,7 @@ "LearningTransport", "MSMQ", "NetStandardAzureServiceBus", + "PostgreSQL", "RabbitMQ.ClassicConventionalRouting", "RabbitMQ.ClassicDirectRouting", "RabbitMQ.ConventionalRouting", diff --git a/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs b/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs index 35860cffe9..dc45c3ffb0 100644 --- a/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs +++ b/src/ServiceControl.Transports.Tests/QueueIngestionTests.cs @@ -41,7 +41,7 @@ await StartQueueIngestor( for (int i = 0; i < numMessagesToIngest; i++) { - await Dispatcher.SendTestMessage(queueName, $"message{i}"); + await Dispatcher.SendTestMessage(queueName, $"message{i}", configuration.TransportCustomization); } var allMessagesProcessed = await onMessagesProcessed.Task; @@ -67,7 +67,7 @@ await StartQueueIngestor( return Task.FromResult(ErrorHandleResult.Handled); }); - await Dispatcher.SendTestMessage(queueName, $"some failing message"); + await Dispatcher.SendTestMessage(queueName, $"some failing message", configuration.TransportCustomization); var onErrorWasCalled = await onErrorCalled.Task; diff --git a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs index 9a6aeaa7c9..5504962847 100644 --- a/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs +++ b/src/ServiceControl.Transports.Tests/QueueLengthMonitoringTests.cs @@ -24,7 +24,7 @@ public async Task Should_report_queue_length() } }); - await Dispatcher.SendTestMessage(queueName, "some content"); + await Dispatcher.SendTestMessage(queueName, "some content", configuration.TransportCustomization); var queueLengthEntry = await onQueueLengthEntryReceived.Task; diff --git a/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs b/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs index 98e5942431..518e5242ee 100644 --- a/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs +++ b/src/ServiceControl.Transports.Tests/QueueProvisioningTests.cs @@ -16,10 +16,10 @@ public async Task Should_provision_queues() await ProvisionQueues(queueName, errorQueue, [additionalQueue1, additionalQueue2]); - Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(queueName, "some content")); - Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(errorQueue, "some content")); - Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue1, "some content")); - Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue2, "some content")); + Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(queueName, "some content", configuration.TransportCustomization)); + Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(errorQueue, "some content", configuration.TransportCustomization)); + Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue1, "some content", configuration.TransportCustomization)); + Assert.DoesNotThrowAsync(async () => await Dispatcher.SendTestMessage(additionalQueue2, "some content", configuration.TransportCustomization)); } } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs b/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs index 785b87551b..23608ffead 100644 --- a/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs +++ b/src/ServiceControl.Transports.Tests/TestDispatcherExtensions.cs @@ -6,15 +6,16 @@ using System.Threading.Tasks; using NServiceBus.Routing; using NServiceBus.Transport; + using ServiceControl.Transports; static class TestDispatcherExtensions { - public static Task SendTestMessage(this IMessageDispatcher dispatcher, string queue, string content) + public static Task SendTestMessage(this IMessageDispatcher dispatcher, string queue, string content, ITransportCustomization transportCustomization) { var transportOperation = new TransportOperation( new OutgoingMessage(Guid.NewGuid().ToString(), [], Encoding.UTF8.GetBytes(content)), - new UnicastAddressTag(queue), []); + new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(queue)), []); return dispatcher.Dispatch(new TransportOperations(transportOperation), new TransportTransaction(), CancellationToken.None); } diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs index 87caa7be55..ced3129705 100644 --- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs +++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs @@ -194,7 +194,7 @@ await StartQueueIngestor( for (int i = 0; i < numMessagesToIngest; i++) { - await Dispatcher.SendTestMessage(queueName, $"message{i}"); + await Dispatcher.SendTestMessage(queueName, $"message{i}", configuration.TransportCustomization); } bool allMessagesProcessed = await onMessagesProcessed.Task; diff --git a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs index cf6d111c10..f58bcff9bd 100644 --- a/src/ServiceControl.Transports/DevelopmentTransportLocations.cs +++ b/src/ServiceControl.Transports/DevelopmentTransportLocations.cs @@ -24,6 +24,7 @@ static DevelopmentTransportLocations() ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.RabbitMQ")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.SqlServer")); ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.SQS")); + ManifestFiles.Add(BuildManifestPath(srcFolder, "ServiceControl.Transports.PostgreSql")); } } diff --git a/src/ServiceControl.Transports/TransportCustomization.cs b/src/ServiceControl.Transports/TransportCustomization.cs index c6b6cd717a..025189a09e 100644 --- a/src/ServiceControl.Transports/TransportCustomization.cs +++ b/src/ServiceControl.Transports/TransportCustomization.cs @@ -23,6 +23,7 @@ public interface ITransportCustomization void AddTransportForMonitoring(IServiceCollection services, TransportSettings transportSettings); Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues); + string ToTransportQualifiedQueueName(string queueName); Task CreateTransportInfrastructure(string name, TransportSettings transportSettings, OnMessage onMessage = null, OnError onError = null, Func onCriticalError = null, TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly); } @@ -109,6 +110,10 @@ protected void ConfigureDefaultEndpointSettings(EndpointConfiguration endpointCo endpointConfiguration.SendFailedMessagesTo(transportSettings.ErrorQueue); } + public string ToTransportQualifiedQueueName(string queueName) => ToTransportQualifiedQueueNameCore(queueName); + + protected virtual string ToTransportQualifiedQueueNameCore(string queueName) => queueName; + public virtual async Task ProvisionQueues(TransportSettings transportSettings, IEnumerable additionalQueues) { var transport = CreateTransport(transportSettings); @@ -129,7 +134,7 @@ public virtual async Task ProvisionQueues(TransportSettings transportSettings, I false, transportSettings.ErrorQueue)}; - var transportInfrastructure = await transport.Initialize(hostSettings, receivers, additionalQueues.Union(new[] { transportSettings.ErrorQueue }).ToArray()); + var transportInfrastructure = await transport.Initialize(hostSettings, receivers, additionalQueues.Union([transportSettings.ErrorQueue]).Select(ToTransportQualifiedQueueNameCore).ToArray()); await transportInfrastructure.Shutdown(); } @@ -160,7 +165,7 @@ public async Task CreateTransportInfrastructure(string receivers = []; } - var transportInfrastructure = await transport.Initialize(hostSettings, receivers, new[] { transportSettings.ErrorQueue }); + var transportInfrastructure = await transport.Initialize(hostSettings, receivers, new[] { ToTransportQualifiedQueueNameCore(transportSettings.ErrorQueue) }); if (createReceiver) { diff --git a/src/ServiceControl.Transports/TransportManifest.cs b/src/ServiceControl.Transports/TransportManifest.cs index c37538867a..78d6324ae2 100644 --- a/src/ServiceControl.Transports/TransportManifest.cs +++ b/src/ServiceControl.Transports/TransportManifest.cs @@ -28,6 +28,10 @@ public class TransportManifestDefinition public string[] Aliases { get; set; } = []; + public int? DefaultPrimaryMaximumConcurrencyLevel { get; set; } + public int? DefaultAuditMaximumConcurrencyLevel { get; set; } + public int? DefaultMonitoringMaximumConcurrencyLevel { get; set; } + internal bool IsMatch(string transportType) => string.Equals(TypeName, transportType, StringComparison.Ordinal) // Type names are case-sensitive || string.Equals(Name, transportType, StringComparison.OrdinalIgnoreCase) diff --git a/src/ServiceControl.sln b/src/ServiceControl.sln index 43223c31a3..04eb5829f0 100644 --- a/src/ServiceControl.sln +++ b/src/ServiceControl.sln @@ -157,13 +157,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.M EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.Persistence", "Particular.LicensingComponent.Persistence\Particular.LicensingComponent.Persistence.csproj", "{0EBBFE57-5760-43FC-A646-FE4C49F4BF59}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.RavenDB", "ServiceControl.RavenDB\ServiceControl.RavenDB.csproj", "{D8610AB6-EB24-4617-8089-F11732D3E033}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.RavenDB", "ServiceControl.RavenDB\ServiceControl.RavenDB.csproj", "{D8610AB6-EB24-4617-8089-F11732D3E033}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Particular.LicensingComponent", "Particular.LicensingComponent", "{2D732554-9C3D-4509-8D0B-5595266A4D92}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Persistence", "Persistence", "{C22BFC89-851E-4D42-A878-C13B3F206908}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceControl.Api", "ServiceControl.Api\ServiceControl.Api.csproj", "{9682F7C5-C631-4924-A698-4C355BB14540}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Api", "ServiceControl.Api\ServiceControl.Api.csproj", "{9682F7C5-C631-4924-A698-4C355BB14540}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.Persistence.InMemory", "Particular.LicensingComponent.Persistence.InMemory\Particular.LicensingComponent.Persistence.InMemory.csproj", "{9D134CF7-F1A3-4935-BCBD-458406F8AA97}" EndProject @@ -173,12 +173,16 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Persistence. EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Testing", "Testing", "{80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Particular.LicensingComponent", "Particular.LicensingComponent\Particular.LicensingComponent.csproj", "{E9407280-401E-4D23-ADF5-D146C5ACBE4D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent", "Particular.LicensingComponent\Particular.LicensingComponent.csproj", "{E9407280-401E-4D23-ADF5-D146C5ACBE4D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Particular.LicensingComponent.UnitTests", "Particular.LicensingComponent.UnitTests\Particular.LicensingComponent.UnitTests.csproj", "{51F5504E-E915-40EC-B96E-CA700A57982C}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Particular.LicensingComponent.UnitTests", "Particular.LicensingComponent.UnitTests\Particular.LicensingComponent.UnitTests.csproj", "{51F5504E-E915-40EC-B96E-CA700A57982C}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HealthCheckApp", "HealthCheckApp\HealthCheckApp.csproj", "{D523E575-6975-4974-A173-B47996A73914}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.PostgreSql", "ServiceControl.Transports.PostgreSql\ServiceControl.Transports.PostgreSql.csproj", "{448CBDCF-718D-4BC7-8F7C-099C9A362B59}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ServiceControl.Transports.PostgreSql.Tests", "ServiceControl.Transports.PostgreSql.Tests\ServiceControl.Transports.PostgreSql.Tests.csproj", "{18DBEEF5-42EE-4C1D-A05B-87B21C067D53}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -861,6 +865,18 @@ Global {5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x64.Build.0 = Release|Any CPU {5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x86.ActiveCfg = Release|Any CPU {5F8E6C64-B505-4FF7-81CC-9161FBC198A8}.Release|x86.Build.0 = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.ActiveCfg = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.Build.0 = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.ActiveCfg = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.Build.0 = Debug|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.Build.0 = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.ActiveCfg = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.Build.0 = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.ActiveCfg = Release|Any CPU + {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.Build.0 = Release|Any CPU {D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|Any CPU.Build.0 = Debug|Any CPU {D8610AB6-EB24-4617-8089-F11732D3E033}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -885,18 +901,6 @@ Global {9682F7C5-C631-4924-A698-4C355BB14540}.Release|x64.Build.0 = Release|Any CPU {9682F7C5-C631-4924-A698-4C355BB14540}.Release|x86.ActiveCfg = Release|Any CPU {9682F7C5-C631-4924-A698-4C355BB14540}.Release|x86.Build.0 = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.ActiveCfg = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x64.Build.0 = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.ActiveCfg = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Debug|x86.Build.0 = Debug|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|Any CPU.Build.0 = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.ActiveCfg = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x64.Build.0 = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.ActiveCfg = Release|Any CPU - {0EBBFE57-5760-43FC-A646-FE4C49F4BF59}.Release|x86.Build.0 = Release|Any CPU {9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|Any CPU.Build.0 = Debug|Any CPU {9D134CF7-F1A3-4935-BCBD-458406F8AA97}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -969,6 +973,30 @@ Global {D523E575-6975-4974-A173-B47996A73914}.Release|x64.Build.0 = Release|Any CPU {D523E575-6975-4974-A173-B47996A73914}.Release|x86.ActiveCfg = Release|Any CPU {D523E575-6975-4974-A173-B47996A73914}.Release|x86.Build.0 = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|Any CPU.Build.0 = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x64.ActiveCfg = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x64.Build.0 = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x86.ActiveCfg = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Debug|x86.Build.0 = Debug|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|Any CPU.ActiveCfg = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|Any CPU.Build.0 = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x64.ActiveCfg = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x64.Build.0 = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x86.ActiveCfg = Release|Any CPU + {448CBDCF-718D-4BC7-8F7C-099C9A362B59}.Release|x86.Build.0 = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|Any CPU.Build.0 = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x64.ActiveCfg = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x64.Build.0 = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x86.ActiveCfg = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Debug|x86.Build.0 = Debug|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|Any CPU.ActiveCfg = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|Any CPU.Build.0 = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x64.ActiveCfg = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x64.Build.0 = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x86.ActiveCfg = Release|Any CPU + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1040,16 +1068,18 @@ Global {E067C14F-867B-4479-BC85-39F2AFAF25D0} = {E2249BAA-D9E9-4369-9C70-0E21C69A3E56} {F04B9D2C-7E31-4697-BAE3-D3FFC5FBBDFE} = {A21A1A89-0B07-4E87-8E3C-41D9C280DCB8} {5F8E6C64-B505-4FF7-81CC-9161FBC198A8} = {E0E45F22-35E3-4AD8-B09E-EFEA5A2F18EE} - {D8610AB6-EB24-4617-8089-F11732D3E033} = {9AF9D3C7-E859-451B-BA4D-B954D289213A} - {9682F7C5-C631-4924-A698-4C355BB14540} = {9AF9D3C7-E859-451B-BA4D-B954D289213A} {0EBBFE57-5760-43FC-A646-FE4C49F4BF59} = {C22BFC89-851E-4D42-A878-C13B3F206908} + {D8610AB6-EB24-4617-8089-F11732D3E033} = {9AF9D3C7-E859-451B-BA4D-B954D289213A} {C22BFC89-851E-4D42-A878-C13B3F206908} = {2D732554-9C3D-4509-8D0B-5595266A4D92} + {9682F7C5-C631-4924-A698-4C355BB14540} = {9AF9D3C7-E859-451B-BA4D-B954D289213A} {9D134CF7-F1A3-4935-BCBD-458406F8AA97} = {C22BFC89-851E-4D42-A878-C13B3F206908} {6D07AC20-25B3-423F-A88E-64DED435DA06} = {2D732554-9C3D-4509-8D0B-5595266A4D92} {71F18E74-944B-49B9-9AA9-EA2B253CAF0C} = {350F72AB-142D-4AAD-9EF1-1A83DC991D87} {80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D} = {2D732554-9C3D-4509-8D0B-5595266A4D92} {E9407280-401E-4D23-ADF5-D146C5ACBE4D} = {2D732554-9C3D-4509-8D0B-5595266A4D92} {51F5504E-E915-40EC-B96E-CA700A57982C} = {80C55E70-4B7A-4EF2-BB9E-C42F8DB0495D} + {448CBDCF-718D-4BC7-8F7C-099C9A362B59} = {A21A1A89-0B07-4E87-8E3C-41D9C280DCB8} + {18DBEEF5-42EE-4C1D-A05B-87B21C067D53} = {E0E45F22-35E3-4AD8-B09E-EFEA5A2F18EE} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3B9E5B72-F580-465A-A22C-2D2148AF4EB4} diff --git a/src/ServiceControl/App.config b/src/ServiceControl/App.config index 430444537b..654ab4b8d9 100644 --- a/src/ServiceControl/App.config +++ b/src/ServiceControl/App.config @@ -20,6 +20,7 @@ These settings are only here so that we can debug ServiceControl while developin + @@ -47,5 +48,8 @@ These settings are only here so that we can debug ServiceControl while developin + + + \ No newline at end of file diff --git a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs index 7dfaf90afb..6b15a86936 100644 --- a/src/ServiceControl/Infrastructure/NServiceBusFactory.cs +++ b/src/ServiceControl/Infrastructure/NServiceBusFactory.cs @@ -45,7 +45,7 @@ public static void Configure(Settings.Settings settings, ITransportCustomization recoverability.Delayed(c => c.NumberOfRetries(0)); recoverability.AddUnrecoverableException(); - configuration.SendFailedMessagesTo(transportSettings.ErrorQueue); + configuration.SendFailedMessagesTo(transportCustomization.ToTransportQualifiedQueueName(transportSettings.ErrorQueue)); recoverability.CustomPolicy(SendEmailNotificationHandler.RecoverabilityPolicy); diff --git a/src/ServiceControl/Infrastructure/Settings/Settings.cs b/src/ServiceControl/Infrastructure/Settings/Settings.cs index 399fbd344f..d6433a29f1 100644 --- a/src/ServiceControl/Infrastructure/Settings/Settings.cs +++ b/src/ServiceControl/Infrastructure/Settings/Settings.cs @@ -57,7 +57,7 @@ public Settings( } ProcessRetryBatchesFrequency = TimeSpan.FromSeconds(30); - MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 10); + MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultPrimaryMaximumConcurrencyLevel ?? 10); RetryHistoryDepth = SettingsReader.Read(SettingsRootNamespace, "RetryHistoryDepth", 10); AllowMessageEditing = SettingsReader.Read(SettingsRootNamespace, "AllowMessageEditing"); NotificationsFilter = SettingsReader.Read(SettingsRootNamespace, "NotificationsFilter"); diff --git a/src/ServiceControl/Operations/ErrorIngestor.cs b/src/ServiceControl/Operations/ErrorIngestor.cs index 9edfc492a7..a2f13028cf 100644 --- a/src/ServiceControl/Operations/ErrorIngestor.cs +++ b/src/ServiceControl/Operations/ErrorIngestor.cs @@ -14,6 +14,7 @@ using Recoverability; using ServiceBus.Management.Infrastructure.Settings; using ServiceControl.Persistence.UnitOfWork; + using ServiceControl.Transports; public class ErrorIngestor { @@ -25,6 +26,7 @@ public ErrorIngestor(Metrics metrics, IDomainEvents domainEvents, IIngestionUnitOfWorkFactory unitOfWorkFactory, Lazy messageDispatcher, + ITransportCustomization transportCustomization, Settings settings) { this.unitOfWorkFactory = unitOfWorkFactory; @@ -44,6 +46,7 @@ public ErrorIngestor(Metrics metrics, errorProcessor = new ErrorProcessor(enrichers, failedMessageEnrichers.ToArray(), domainEvents, ingestedMeter); retryConfirmationProcessor = new RetryConfirmationProcessor(domainEvents); + logQueueAddress = new UnicastAddressTag(transportCustomization.ToTransportQualifiedQueueName(this.settings.ErrorLogQueue)); } public async Task Ingest(List contexts) @@ -167,7 +170,7 @@ Task Forward(IReadOnlyCollection messageContexts) // Forwarded messages should last as long as possible outgoingMessage.Headers.Remove(NServiceBus.Headers.TimeToBeReceived); - transportOperations[index] = new TransportOperation(outgoingMessage, new UnicastAddressTag(settings.ErrorLogQueue)); + transportOperations[index] = new TransportOperation(outgoingMessage, logQueueAddress); index++; } @@ -186,7 +189,7 @@ public async Task VerifyCanReachForwardingAddress() new TransportOperation( new OutgoingMessage(Guid.Empty.ToString("N"), [], Array.Empty()), - new UnicastAddressTag(settings.ErrorLogQueue) + logQueueAddress ) ); @@ -204,6 +207,8 @@ public async Task VerifyCanReachForwardingAddress() readonly ErrorProcessor errorProcessor; readonly Lazy messageDispatcher; readonly RetryConfirmationProcessor retryConfirmationProcessor; + readonly UnicastAddressTag logQueueAddress; + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs index 11ab232ef0..edd08e220d 100644 --- a/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs +++ b/src/ServiceControl/Recoverability/Retrying/Infrastructure/ReturnToSenderDequeuer.cs @@ -16,7 +16,7 @@ class ReturnToSenderDequeuer : IHostedService { public ReturnToSenderDequeuer(ReturnToSender returnToSender, IErrorMessageDataStore dataStore, IDomainEvents domainEvents, ITransportCustomization transportCustomization, TransportSettings transportSettings, Settings settings) { - InputAddress = settings.StagingQueue; + InputAddress = transportCustomization.ToTransportQualifiedQueueName(settings.StagingQueue); this.returnToSender = returnToSender; errorQueue = settings.ErrorQueue; this.transportCustomization = transportCustomization; diff --git a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt index d172846297..bb2e392a4e 100644 --- a/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt +++ b/src/ServiceControlInstaller.Engine.UnitTests/ApprovalFiles/APIApprovals.TransportNames.approved.txt @@ -55,6 +55,11 @@ "ServiceControl.Transports.AzureServiceBus.AzureServiceBusTransport, ServiceControl.Transports.AzureServiceBus" ] }, + { + "Name": "PostgreSQL", + "DisplayName": "PostgreSQL", + "Aliases": [] + }, { "Name": "RabbitMQ.ClassicConventionalRouting", "DisplayName": "RabbitMQ - Conventional routing topology (classic queues)", diff --git a/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj b/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj index 254c3ac192..79dab8b17a 100644 --- a/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj +++ b/src/ServiceControlInstaller.Engine/ServiceControlInstaller.Engine.csproj @@ -17,6 +17,7 @@ + diff --git a/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs b/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs index 1136f47255..b0f91b7fe6 100644 --- a/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs +++ b/src/ServiceControlInstaller.Engine/Validation/ConnectionStringValidator.cs @@ -5,6 +5,7 @@ using System.Linq; using Accounts; using Microsoft.Data.SqlClient; + using Npgsql; class ConnectionStringValidator { @@ -21,6 +22,10 @@ public static void Validate(IServiceControlAuditInstance instance) { validator.CheckMsSqlConnectionString(); } + else if (instance.TransportPackage.Name == "PostgreSQL") + { + validator.CheckPostgreSqlConnectString(); + } } public static void Validate(IServiceControlInstance instance) @@ -30,6 +35,10 @@ public static void Validate(IServiceControlInstance instance) { validator.CheckMsSqlConnectionString(); } + else if (instance.TransportPackage.Name == "PostgreSQL") + { + validator.CheckPostgreSqlConnectString(); + } } public static void Validate(IMonitoringInstance instance) @@ -39,6 +48,10 @@ public static void Validate(IMonitoringInstance instance) { validator.CheckMsSqlConnectionString(); } + else if (instance.TransportPackage.Name == "PostgreSQL") + { + validator.CheckPostgreSqlConnectString(); + } } void CheckMsSqlConnectionString() @@ -96,6 +109,41 @@ void CheckMsSqlConnectionString() } } + void CheckPostgreSqlConnectString() + { + string[] customKeys = { "Queue Schema", "Subscriptions Table" }; + + try + { + //Check validity of connection string. This will throw if invalid + var builder = new DbConnectionStringBuilder { ConnectionString = connectionString }; + + //The NSB PostgreSQL Transport can have custom key/value pairs in the connection string + // that won't make sense to PostgreSQL. Remove these from the string we want to validate. + foreach (var customKey in customKeys) + { + if (builder.ContainsKey(customKey)) + { + builder.Remove(customKey); + } + } + + //Attempt to connect to DB + using (var s = new NpgsqlConnection(builder.ConnectionString)) + { + s.Open(); + } + } + catch (ArgumentException argumentException) + { + throw new EngineValidationException($"Connection String is invalid - {argumentException.Message}"); + } + catch (SqlException sqlEx) + { + throw new EngineValidationException($"PostgreSQL connection failed - {sqlEx.Message}"); + } + } + string connectionString; string serviceAccount; } diff --git a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs index 6d4b44dc99..d2111f730f 100644 --- a/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs +++ b/src/ServiceControlInstaller.Packaging.UnitTests/DeploymentPackageTests.cs @@ -87,7 +87,8 @@ public void Should_package_all_transports() "RabbitMQ", "MSMQ", "AmazonSQS", - "LearningTransport"}; + "LearningTransport", + "PostgreSql"}; var bundledTransports = deploymentPackage.DeploymentUnits .Where(u => u.Category == "Transports") diff --git a/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs b/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs new file mode 100644 index 0000000000..4691a26412 --- /dev/null +++ b/src/TestHelper/IncludeInPostgreSqlTestsAttribute.cs @@ -0,0 +1,4 @@ +public class IncludeInPostgreSqlTestsAttribute : IncludeInTestsAttribute +{ + protected override string Filter => "PostgreSql"; +} diff --git a/src/container-integration-test/postgres.yml b/src/container-integration-test/postgres.yml new file mode 100644 index 0000000000..63807d29f1 --- /dev/null +++ b/src/container-integration-test/postgres.yml @@ -0,0 +1,28 @@ +services: + + postgres: + image: postgres + restart: unless-stopped + ports: + - "5432:5432" + environment: + POSTGRES_PASSWORD: ServiceControl1! + healthcheck: + test: ["CMD-SHELL", "psql -U postgres -d postgres -c 'SELECT 1' || exit 1"] + interval: 10s + timeout: 3s + retries: 3 + + # Add service health dependencies to ServiceControl instances + servicecontrol: + depends_on: + postgres: + condition: service_healthy + servicecontrol-audit: + depends_on: + postgres: + condition: service_healthy + servicecontrol-monitoring: + depends_on: + postgres: + condition: service_healthy \ No newline at end of file