Skip to content

Commit

Permalink
Merge pull request #527 from Particular/native-pub-sub-2
Browse files Browse the repository at this point in the history
Native pub sub 2
  • Loading branch information
SzymonPobiega authored Nov 14, 2019
2 parents 79622e6 + e34af9c commit ca1d003
Show file tree
Hide file tree
Showing 73 changed files with 2,201 additions and 807 deletions.
2 changes: 1 addition & 1 deletion GitVersion.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
assembly-versioning-scheme: Major
next-version: 4.2.1
next-version: 5.0.0
branches:
master:
mode: ContinuousDeployment
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,66 @@
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Settings;
using NServiceBus.Transport;
using NServiceBus.Transport.SQLServer;

public class ConfigureEndpointSqlServerTransport : IConfigureEndpointTestExecution
{
public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings settings, PublisherMetadata publisherMetadata)
public Task Configure(string endpointName, EndpointConfiguration configuration, RunSettings runSettings, PublisherMetadata publisherMetadata)
{
queueBindings = configuration.GetSettings().Get<QueueBindings>();

settings = configuration.GetSettings();
doNotCleanNativeSubscriptions = runSettings.TryGet<bool>("DoNotCleanNativeSubscriptions", out _);
connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString");

if (string.IsNullOrEmpty(connectionString))
{
throw new Exception("The 'SqlServerTransportConnectionString' environment variable is not set.");
}

var transportConfig = configuration.UseTransport<SqlServerTransport>();
transportConfig.ConnectionString(connectionString);
transportConfig.SubscriptionSettings().DisableSubscriptionCache();

#if !NET452
transportConfig.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
#endif

var routingConfig = transportConfig.Routing();

foreach (var publisher in publisherMetadata.Publishers)
{
foreach (var eventType in publisher.Events)
{
routingConfig.RegisterPublisher(eventType, publisher.PublisherName);
}
}

return Task.FromResult(0);
}

public Task Cleanup()
{
var subscriptionSettings = settings.GetOrDefault<SubscriptionSettings>() ?? new SubscriptionSettings();
settings.TryGet(SettingsKeys.DefaultSchemaSettingsKey, out string defaultSchemaOverride);
var subscriptionTable = subscriptionSettings.SubscriptionTable.Qualify(defaultSchemaOverride ?? "dbo", "nservicebus");

using (var conn = new SqlConnection(connectionString))
{
conn.Open();

var queueAddresses = queueBindings.ReceivingAddresses.Select(QueueAddress.Parse).ToList();
foreach (var address in queueAddresses)
{
TryDeleteTable(conn, address);
TryDeleteTable(conn, new QueueAddress(address.Table + ".Delayed", address.Schema, address.Catalog));
TryDeleteTable(conn, address.QualifiedTableName);
TryDeleteTable(conn, new QueueAddress(address.Table + ".Delayed", address.Schema, address.Catalog).QualifiedTableName);
}

if (!doNotCleanNativeSubscriptions)
{
TryDeleteTable(conn, subscriptionTable.QuotedQualifiedName);
}
}
return Task.FromResult(0);
}

static void TryDeleteTable(SqlConnection conn, QueueAddress address)
static void TryDeleteTable(SqlConnection conn, string address)
{
try
{
using (var comm = conn.CreateCommand())
{
comm.CommandText = $"IF OBJECT_ID('{address.QualifiedTableName}', 'U') IS NOT NULL DROP TABLE {address.QualifiedTableName}";
comm.CommandText = $"IF OBJECT_ID('{address}', 'U') IS NOT NULL DROP TABLE {address}";
comm.ExecuteNonQuery();
}
}
Expand All @@ -74,8 +77,10 @@ static void TryDeleteTable(SqlConnection conn, QueueAddress address)
}
}

bool doNotCleanNativeSubscriptions;
string connectionString;
QueueBindings queueBindings;
SettingsHolder settings;

class QueueAddress
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using NServiceBus;
using NServiceBus.Configuration.AdvancedExtensibility;

public static class EndpointConfigurationExtensions
{
public static TransportExtensions<SqlServerTransport> ConfigureSqlServerTransport(this EndpointConfiguration endpointConfiguration)
{
return new TransportExtensions<SqlServerTransport>(endpointConfiguration.GetSettings());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
namespace NServiceBus.SqlServer.AcceptanceTests.MultiCatalog
{
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Configuration.AdvancedExtensibility;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Transport.SQLServer;

public class When_custom_catalog_configured_for_legacy_publisher : MultiCatalogAcceptanceTest
{
static string PublisherConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus1");
static string SubscriberConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus2");
static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(LegacyPublisher));

[Test]
public Task Should_receive_event()
{
return Scenario.Define<Context>()
.WithEndpoint<LegacyPublisher>(b => b.When(c => c.Subscribed, session => session.Publish(new Event())))
.WithEndpoint<Subscriber>(b => b.When(c => c.EndpointsStarted, s => s.Subscribe(typeof(Event))))
.Done(c => c.EventReceived)
.Run();
}

class Context : ScenarioContext
{
public bool EventReceived { get; set; }
public bool Subscribed { get; set; }
}

class LegacyPublisher : EndpointConfigurationBuilder
{
public LegacyPublisher()
{
EndpointSetup<DefaultPublisher>(c =>
{
var transport = c.UseTransport<SqlServerTransport>();
transport.ConnectionString(PublisherConnectionString);

transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus");
transport.SubscriptionSettings().DisableSubscriptionCache();

c.GetSettings().Set("SqlServer.DisableNativePubSub", true);
c.OnEndpointSubscribed<Context>((s, context) =>
{
if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber))))
{
context.Subscribed = true;
}
});
});
}
}

class Subscriber : EndpointConfigurationBuilder
{
public Subscriber()
{
EndpointSetup<DefaultServer>(b =>
{
var transport = b.UseTransport<SqlServerTransport>();
transport.ConnectionString(SubscriberConnectionString);

transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus");

transport.UseCatalogForEndpoint(PublisherEndpoint, "nservicebus1");
transport.EnableMessageDrivenPubSubCompatibilityMode().RegisterPublisher(typeof(Event), PublisherEndpoint);
});
}

class EventHandler : IHandleMessages<Event>
{
public Context Context { get; set; }

public Task Handle(Event message, IMessageHandlerContext context)
{
Context.EventReceived = true;
return Task.FromResult(0);
}
}
}

public class Event : IEvent
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace NServiceBus.SqlServer.AcceptanceTests.MultiCatalog
{
using System.Threading.Tasks;
using AcceptanceTesting;
using Features;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Transport.SQLServer;

public class When_custom_catalog_configured_for_publisher_and_subscriber : MultiCatalogAcceptanceTest
{
static string PublisherConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus1");
static string SubscriberConnectionString => WithCustomCatalog(GetDefaultConnectionString(), "nservicebus2");

[Test]
public Task Should_receive_event()
{
return Scenario.Define<Context>()
.WithEndpoint<Publisher>(b => b.When(c => c.Subscribed, session => session.Publish(new Event())))
.WithEndpoint<Subscriber>(b => b.When(c => c.EndpointsStarted, async (s, ctx) =>
{
await s.Subscribe(typeof(Event)).ConfigureAwait(false);
ctx.Subscribed = true;
}))
.Done(c => c.EventReceived)
.Run();
}

class Context : ScenarioContext
{
public bool EventReceived { get; set; }
public bool Subscribed { get; set; }
}

class Publisher : EndpointConfigurationBuilder
{
public Publisher()
{
EndpointSetup<DefaultPublisher>(b =>
{
var transport = b.UseTransport<SqlServerTransport>();
transport.ConnectionString(PublisherConnectionString);

transport.SubscriptionSettings().DisableSubscriptionCache();
transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus");
});
}
}

class Subscriber : EndpointConfigurationBuilder
{
public Subscriber()
{
EndpointSetup<DefaultServer>(c =>
{
var transport = c.UseTransport<SqlServerTransport>();

transport.ConnectionString(SubscriberConnectionString);
transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo", "nservicebus");

c.DisableFeature<AutoSubscribe>();
});
}

class EventHandler : IHandleMessages<Event>
{
public Context Context { get; set; }

public Task Handle(Event message, IMessageHandlerContext context)
{
Context.EventReceived = true;
return Task.FromResult(0);
}
}
}

public class Event : IEvent
{
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
namespace NServiceBus.SqlServer.AcceptanceTests.MultiSchema
{
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Configuration.AdvancedExtensibility;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Transport.SQLServer;

public class When_custom_schema_configured_for_legacy_publisher : NServiceBusAcceptanceTest
{
[Test]
public Task Should_receive_event()
{
return Scenario.Define<Context>()
.WithEndpoint<LegacyPublisher>(b => b.When(c => c.Subscribed, session => session.Publish(new Event())))
.WithEndpoint<Subscriber>(b => b.When(c => c.EndpointsStarted, s => s.Subscribe(typeof(Event))))
.Done(c => c.EventReceived)
.Run();
}

class Context : ScenarioContext
{
public bool EventReceived { get; set; }
public bool Subscribed { get; set; }
}

class LegacyPublisher : EndpointConfigurationBuilder
{
public LegacyPublisher()
{
EndpointSetup<DefaultPublisher>(c =>
{
var transport = c.UseTransport<SqlServerTransport>();
transport.DefaultSchema("sender");
transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo");
transport.SubscriptionSettings().DisableSubscriptionCache();

c.GetSettings().Set("SqlServer.DisableNativePubSub", true);
c.OnEndpointSubscribed<Context>((s, context) =>
{
if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(Subscriber))))
{
context.Subscribed = true;
}
});
});
}
}

class Subscriber : EndpointConfigurationBuilder
{
public Subscriber()
{
EndpointSetup<DefaultServer>(b =>
{
var publisherEndpoint = Conventions.EndpointNamingConvention(typeof(LegacyPublisher));

var transport = b.UseTransport<SqlServerTransport>();
transport.DefaultSchema("receiver").UseSchemaForEndpoint(publisherEndpoint, "sender");
transport.SubscriptionSettings().SubscriptionTableName("SubscriptionRouting", "dbo");

transport.EnableMessageDrivenPubSubCompatibilityMode().RegisterPublisher(typeof(Event), Conventions.EndpointNamingConvention(typeof(LegacyPublisher)));
});
}

class EventHandler : IHandleMessages<Event>
{
public Context Context { get; set; }

public Task Handle(Event message, IMessageHandlerContext context)
{
Context.EventReceived = true;
return Task.FromResult(0);
}
}
}

public class Event : IEvent
{
}
}
}
Loading

0 comments on commit ca1d003

Please sign in to comment.