Skip to content

Commit

Permalink
Add support for PostgreSQL Transport (#4443)
Browse files Browse the repository at this point in the history
* Bring sqlserver files in line with sqlserver transport to avoid confusion in naming

* First changes for PostgreSql transport

* nullable fix

* test fix

* manifest file test fix

* Added test project

* Do some random work on postgresql support

- Update query for current sequence value
- Update addressing types
- Update query for estimating queue length

* build fix

* fix setting value key

* remove unused code

* customise queue names with schema for postgres when provisioning queues

* added todo for something that errors

* removed commented out code

* connection validator for postgres

* Fix subscription table parameter for postgresql

* fix test

* Translate transport sending queues in instances

* fix test

* Change ToTransportQualifiedQueueNameCore to use double quotes

* use postgres customisation for error queue names

* alternative to get latest identity value for table that doesn't rely on current session increment

* revert double assigning of schema

* Use new alpha of the transport with fixed delay bug

* Add connection string default

* Translate queue names when using dispatcher directly in tests

* fix query to work with postgres syntax

* fix syntax issues (functionality still broken)

* fix RunScenario test

* Translate queues for forwarding message in Audit and Error ingestion

* attempt to get postgres running on ci

* added test category for psotgres

* update postgres env variable

* reverting app.config changes

* clarify not using nolock

* update connection string name so it matches existing ones

* Perform az login for postgres

* fix error catching on failed connection

* fix sqlserver tests

* query fix

* Fix indicator update for transports that display endpointname on SP, ie slq server and postgresql

* Translate throughput queue for sending from monitoring

* fix extra character

* Add test for throughput fix

* Cleanup

* Avoid translating audit log queue multiple times

* Avoid translating multiple times when reporting throughput

* Avoid translating error log queue multiple times

* Remove message driven pubsub comment

* Fix bad merge

* More bad merge fixes

* Formatting

* translate staging queue

* fix dequeue tests

* variable name cleanup + add ability to quote subscription table

* Run container tests for postgres

* Fix docker command

* Use compose file to run postgres

* Fix container name

* Fix pwd

* Fix service deps

* Skip volume

* Add health check

* use default username and database

* Revert pool size

* Remove unneded settings

* limit postgreSQL connections for monitoring and audit to prevent connection starvation

* fix rebase

* fix null reference in tests

* more test fixing

* change name of default max concurrency level properties and apply to sql server transport

* fix CI test

* revert changes to SQL server max connections defaults

* Decrease the queue lenght read frequency for SQL Server and PostgreSQL

* Update to Postgres transport 8.1.4

* Update src/ServiceControl.Transports.PostgreSql/transport.manifest

* fix from rebase

* reapply change lost in rebase

* reapply change lost in rebase

---------

Co-authored-by: SzymonPobiega <szymon.pobiega@gmail.com>
Co-authored-by: Phil Bastian <phil.bastian@particular.net>
Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>
  • Loading branch information
4 people authored Sep 27, 2024
1 parent f36ddbc commit 3f0261f
Show file tree
Hide file tree
Showing 82 changed files with 1,331 additions and 127 deletions.
12 changes: 10 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
os: [windows-2022, ubuntu-22.04]
test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence ]
test-category: [ Default, SqlServer, AzureServiceBus, RabbitMQ, AzureStorageQueues, MSMQ, SQS, PrimaryRavenAcceptance, PrimaryRavenPersistence, PostgreSQL ]
include:
- os: windows-2022
os-name: Windows
Expand Down Expand Up @@ -65,7 +65,7 @@ jobs:
run: Import-Module ./deploy/PowerShellModules/Particular.ServiceControl.Management
- name: Azure login
uses: azure/login@v2.2.0
if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ'
if: matrix.test-category == 'AzureServiceBus' || matrix.test-category == 'AzureStorageQueues' || matrix.test-category == 'RabbitMQ' || matrix.test-category == 'PostgreSQL'
with:
creds: ${{ secrets.AZURE_ACI_CREDENTIALS }}
- name: Setup SQL Server
Expand All @@ -74,6 +74,14 @@ jobs:
with:
connection-string-env-var: ServiceControl_TransportTests_SQL_ConnectionString
catalog: nservicebus
- name: Setup PostgreSQL
uses: Particular/setup-postgres-action@v1.0.0
if: matrix.test-category == 'PostgreSQL'
with:
connection-string-name: ServiceControl_TransportTests_PostgreSQL_ConnectionString
tag: ServiceControl
registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Setup RabbitMQ
uses: Particular/setup-rabbitmq-action@v1.7.0
if: matrix.test-category == 'RabbitMQ'
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/container-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ jobs:
connection-string: 'Server=mssql;Database=master;User=sa;Password=ServiceControl1!;Encrypt=False;'
compose-cmd: docker compose -f servicecontrol.yml -f mssql.yml up -d
expected-healthy-containers: 5
- name: postgresql
transport: PostgreSQL
connection-string: 'Host=postgres;Port=5432;Database=postgres;User ID=postgres;Password=ServiceControl1!;'
compose-cmd: docker compose -f servicecontrol.yml -f postgres.yml up -d
expected-healthy-containers: 5
- name: asb
transport: NetStandardAzureServiceBus
compose-cmd: docker compose -f servicecontrol.yml up -d
Expand Down
6 changes: 4 additions & 2 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
<PackageVersion Include="NServiceBus.Transport.AzureServiceBus" Version="4.2.2" />
<PackageVersion Include="NServiceBus.Transport.AzureStorageQueues" Version="13.0.1" />
<PackageVersion Include="NServiceBus.Transport.Msmq.Sources" Version="3.0.1" />
<PackageVersion Include="NServiceBus.Transport.SqlServer" Version="8.1.3" />
<PackageVersion Include="NuGet.Versioning" Version="6.11.0" />
<PackageVersion Include="NServiceBus.Transport.SqlServer" Version="8.1.4" />
<PackageVersion Include="NServiceBus.Transport.PostgreSql" Version="8.1.4" />
<PackageVersion Include="Npgsql" Version="8.0.3" />
<PackageVersion Include="NuGet.Versioning" Version="6.11.0" />
<PackageVersion Include="NUnit" Version="4.2.2" />
<PackageVersion Include="NUnit.Analyzers" Version="4.3.0" />
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ public async Task UpdateUserIndicatorOnEndpoints(List<UpdateUserIndicator> userI
//if there are multiple sources of throughput for the endpoint, update them all
var existingEndpoints = GetAllConnectedEndpoints(e.Name);

existingEndpoints.ForEach(u => u.UserIndicator = e.UserIndicator);
existingEndpoints.ForEach(u =>
{
u.UserIndicator = e.UserIndicator;
//for ones that matched on endpoint name, update matching on sanitizedName
var sanitizedMAtchingEndpoints = GetAllConnectedEndpoints(u.SanitizedName);
sanitizedMAtchingEndpoints.ForEach(s => s.UserIndicator = e.UserIndicator);
});
});

await Task.CompletedTask;
Expand All @@ -138,7 +144,7 @@ public async Task<bool> IsThereThroughputForLastXDaysForSource(int days, Through
endpointThroughput.Value.Any(t => t.Key >= DateOnly.FromDateTime(DateTime.UtcNow).AddDays(-days) && t.Key <= endDate)));
}

List<Endpoint> GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name).ToList();
List<Endpoint> GetAllConnectedEndpoints(string name) => endpoints.Where(w => w.SanitizedName == name || w.Id.Name == name).ToList();

public Task<BrokerMetadata> GetBrokerMetadata(CancellationToken cancellationToken) => Task.FromResult(brokerMetadata);

Expand Down
1 change: 1 addition & 0 deletions src/ProjectReferences.Transports.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<ProjectReference Include="..\ServiceControl.Transports.Msmq\ServiceControl.Transports.Msmq.csproj" ReferenceOutputAssembly="false" Private="false" SkipGetTargetFrameworkProperties="true" UndefineProperties="TargetFramework" />
<ProjectReference Include="..\ServiceControl.Transports.RabbitMQ\ServiceControl.Transports.RabbitMQ.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Transports.SqlServer\ServiceControl.Transports.SqlServer.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Transports.PostgreSql\ServiceControl.Transports.PostgreSql.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Transports.SQS\ServiceControl.Transports.SQS.csproj" ReferenceOutputAssembly="false" Private="false" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public async Task InitializeSettings()
var transportIntegration = new ConfigureEndpointLearningTransport();

settings = new Settings(
transportType: transportIntegration.TypeName,
forwardErrorMessages: false,
errorRetentionPeriod: TimeSpan.FromDays(1),
persisterType: "RavenDB")
{
TransportType = transportIntegration.TypeName,
TransportConnectionString = transportIntegration.ConnectionString,
AssemblyLoadContextResolver = static _ => AssemblyLoadContext.Default
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ public Task<TransportInfrastructure> CreateTransportInfrastructure(string name,
OnError onError = null, Func<string, Exception, Task> onCriticalError = null,
TransportTransactionMode preferredTransactionMode = TransportTransactionMode.ReceiveOnly) =>
throw new NotImplementedException();
public string ToTransportQualifiedQueueName(string queueName) => queueName;
}
}
4 changes: 4 additions & 0 deletions src/ServiceControl.Audit/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ These settings are only here so that we can debug ServiceControl while developin
<!--<add key="ServiceControl.Audit/TransportType" value="NetStandardAzureServiceBus" />-->
<!--<add key="ServiceControl.Audit/TransportType" value="RabbitMQ.QuorumConventionalRouting" />-->
<!--<add key="ServiceControl.Audit/TransportType" value="SQLServer" />-->
<!--<add key="ServiceControl.Audit/TransportType" value="PostgreSQL" />-->

<!-- DEVS - Pick a persistence to run Auditing instance on. -->
<add key="ServiceControl.Audit/PersistenceType" value="InMemory" />
Expand Down Expand Up @@ -46,5 +47,8 @@ These settings are only here so that we can debug ServiceControl while developin

<!--SQLServer -->
<!--<add name="NServiceBus/Transport" connectionString="Data Source=;Initial Catalog=nservicebus;Integrated Security=True;Queue Schema=myschema;Subscriptions Table=tablename@schema@catalog" />-->

<!--PostgreSQL -->
<!--<add name="NServiceBus/Transport" connectionString="Server=;Database=nservicebus;Port=5432;User Id=;Password=;" />-->
</connectionStrings>
</configuration>
32 changes: 18 additions & 14 deletions src/ServiceControl.Audit/Auditing/AuditIngestor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@
using Recoverability;
using SagaAudit;
using ServiceControl.Infrastructure.Metrics;
using ServiceControl.Transports;

public class AuditIngestor
{
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;

public AuditIngestor(
Metrics metrics,
Settings settings,
IAuditIngestionUnitOfWorkFactory unitOfWorkFactory,
EndpointInstanceMonitoring endpointInstanceMonitoring,
IEnumerable<IEnrichImportedAuditMessages> auditEnrichers, // allows extending message enrichers with custom enrichers registered in the DI container
IMessageSession messageSession,
Lazy<IMessageDispatcher> messageDispatcher
Lazy<IMessageDispatcher> messageDispatcher,
ITransportCustomization transportCustomization
)
{
this.settings = settings;
Expand All @@ -49,14 +49,16 @@ Lazy<IMessageDispatcher> messageDispatcher
new SagaRelationshipsEnricher()
}.Concat(auditEnrichers).ToArray();

logQueueAddress = transportCustomization.ToTransportQualifiedQueueName(settings.AuditLogQueue);

auditPersister = new AuditPersister(unitOfWorkFactory, enrichers, ingestedAuditMeter, ingestedSagaAuditMeter, auditBulkInsertDurationMeter, sagaAuditBulkInsertDurationMeter, bulkInsertCommitDurationMeter, messageSession, messageDispatcher);
}

public async Task Ingest(List<MessageContext> contexts)
{
if (log.IsDebugEnabled)
if (Log.IsDebugEnabled)
{
log.Debug($"Ingesting {contexts.Count} message contexts");
Log.Debug($"Ingesting {contexts.Count} message contexts");
}

var stored = await auditPersister.Persist(contexts);
Expand All @@ -65,14 +67,14 @@ public async Task Ingest(List<MessageContext> contexts)
{
if (settings.ForwardAuditMessages)
{
if (log.IsDebugEnabled)
if (Log.IsDebugEnabled)
{
log.Debug($"Forwarding {stored.Count} messages");
Log.Debug($"Forwarding {stored.Count} messages");
}
await Forward(stored, settings.AuditLogQueue);
if (log.IsDebugEnabled)
await Forward(stored, logQueueAddress);
if (Log.IsDebugEnabled)
{
log.Debug("Forwarded messages");
Log.Debug("Forwarded messages");
}
}

Expand All @@ -83,9 +85,9 @@ public async Task Ingest(List<MessageContext> contexts)
}
catch (Exception e)
{
if (log.IsWarnEnabled)
if (Log.IsWarnEnabled)
{
log.Warn("Forwarding messages failed", e);
Log.Warn("Forwarding messages failed", e);
}

// making sure to rethrow so that all messages get marked as failed
Expand Down Expand Up @@ -140,7 +142,7 @@ public async Task VerifyCanReachForwardingAddress()
new TransportOperation(
new OutgoingMessage(Guid.Empty.ToString("N"),
[], Array.Empty<byte>()),
new UnicastAddressTag(settings.AuditLogQueue)
new UnicastAddressTag(logQueueAddress)
)
);

Expand All @@ -155,7 +157,9 @@ public async Task VerifyCanReachForwardingAddress()
readonly AuditPersister auditPersister;
readonly Settings settings;
readonly Lazy<IMessageDispatcher> messageDispatcher;
readonly string logQueueAddress;

static readonly ILog log = LogManager.GetLogger<AuditIngestor>();
static readonly long FrequencyInMilliseconds = Stopwatch.Frequency / 1000;
static readonly ILog Log = LogManager.GetLogger<AuditIngestor>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void Configure(Settings.Settings settings, ITransportCustomization
routing.RouteToEndpoint(typeof(RegisterNewEndpoint), serviceControlLogicalQueue);
routing.RouteToEndpoint(typeof(MarkMessageFailureResolvedByRetry), serviceControlLogicalQueue);

configuration.ReportCustomChecksTo(settings.ServiceControlQueueAddress);
configuration.ReportCustomChecksTo(transportCustomization.ToTransportQualifiedQueueName(settings.ServiceControlQueueAddress));
}

configuration.GetSettings().Set(settings.LoggingSettings);
Expand Down
4 changes: 2 additions & 2 deletions src/ServiceControl.Audit/Infrastructure/Settings/Settings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public Settings(string transportType = null, string persisterType = null, Loggin
{
Hostname = SettingsReader.Read(SettingsRootNamespace, "Hostname", "localhost");
Port = SettingsReader.Read(SettingsRootNamespace, "Port", 44444);
}
};

MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", 32);
MaximumConcurrencyLevel = SettingsReader.Read(SettingsRootNamespace, "MaximumConcurrencyLevel", TransportManifestLibrary.Find(TransportType)?.DefaultAuditMaximumConcurrencyLevel ?? 32);
ServiceControlQueueAddress = SettingsReader.Read<string>(SettingsRootNamespace, "ServiceControlQueueAddress");
TimeToRestartAuditIngestionAfterFailure = GetTimeToRestartAuditIngestionAfterFailure();
EnableFullTextSearchOnBodies = SettingsReader.Read(SettingsRootNamespace, "EnableFullTextSearchOnBodies", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});

if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});

if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});

if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});

if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void Non_MSMQ_transport_is_selected(string transportInfoName)
Assert.That(viewModel.SampleConnectionString, Is.Not.Empty);
});

if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue")
if (transportInfoName is "SQLServer" or "AmazonSQS" or "AzureStorageQueue" or "PostgreSQL")
{
Assert.That(viewModel.TransportWarning, Is.Not.Null);
Assert.That(viewModel.TransportWarning, Is.Not.Empty);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void Transport_cannot_be_empty_when_adding_audit_instance()

}

[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_audit_instance(string transportInfoName)
{
var viewModel = new ServiceControlAddViewModel
Expand All @@ -126,7 +126,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}

[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void
Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_audit_instance(
string transportInfoName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void Transport_cannot_be_empty_when_adding_error_instance()
Assert.That(errors, Is.Not.Empty);
}

[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_error_instance(
string transportInfoName)
{
Expand All @@ -127,7 +127,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}

[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_error_instance(
string transportInfoName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void Transport_cannot_be_empty_when_adding_monitoring_instance()
}


[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_empty_if_sample_connection_string_is_present_when_adding_monitoring_instance(string transportInfoName)
{

Expand All @@ -384,7 +384,7 @@ public void Transport_connection_string_cannot_be_empty_if_sample_connection_str
Assert.That(errors, Is.Not.Empty);
}

[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ")]
[TestTheseTransports("AmazonSQS", "AzureServiceBus", "SQLServer", "RabbitMQ", "PostgreSQL")]
public void Transport_connection_string_cannot_be_null_if_sample_connection_string_is_present_when_adding_monitoring_instance(
string transportInfoName)
{
Expand Down
Loading

0 comments on commit 3f0261f

Please sign in to comment.