Skip to content

Commit

Permalink
Add NATS option to disable dynamic subject creation for consumers. (#…
Browse files Browse the repository at this point in the history
…1556)

* Added NATS option to disable dynamic subject creation for consumers.

* Added NATS option to disable dynamic subject creation for consumers.

* Added connect error if stream does not exist to mark the consumer register as unhealthy to allow subscribe retries in case the stream has not been configured before the client is started.
  • Loading branch information
davidterins authored Jul 1, 2024
1 parent 387f4fb commit 7b6b033
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
5 changes: 5 additions & 0 deletions src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public class NATSOptions
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// Allows a nats consumer client to dynamically create a stream and configure the expected subjects on the stream. Defaults to true.
/// </summary>
public bool EnableSubscriberClientStreamAndSubjectCreation { get; set; } = true;

/// <summary>
/// Used to setup all NATs client options
/// </summary>
Expand Down
54 changes: 31 additions & 23 deletions src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,37 +43,40 @@ public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions<NATSO

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
Connect();

var jsm = _consumerClient!.CreateJetStreamManagementContext();

var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

foreach (var subjectStream in streamGroup)
if (_natsOptions.EnableSubscriberClientStreamAndSubjectCreation)
{
var builder = StreamConfiguration.Builder()
.WithName(subjectStream.Key)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(subjectStream.ToList());
Connect();

_natsOptions.StreamOptions?.Invoke(builder);
var jsm = _consumerClient!.CreateJetStreamManagementContext();

try
{
jsm.GetStreamInfo(subjectStream.Key); // this throws if the stream does not exist
var streamSubjectsGroups = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

jsm.UpdateStream(builder.Build());
}
catch (NATSJetStreamException)
foreach (var streamSubjectsGroup in streamSubjectsGroups)
{
var builder = StreamConfiguration.Builder()
.WithName(streamSubjectsGroup.Key)
.WithNoAck(false)
.WithStorageType(StorageType.Memory)
.WithSubjects(streamSubjectsGroup.ToList());

_natsOptions.StreamOptions?.Invoke(builder);

try
{
jsm.AddStream(builder.Build());
jsm.GetStreamInfo(streamSubjectsGroup.Key); // this throws if the stream does not exist

jsm.UpdateStream(builder.Build());
}
catch
catch (NATSJetStreamException)
{
// ignored
try
{
jsm.AddStream(builder.Build());
}
catch
{
// ignored
}
}
}
}
Expand Down Expand Up @@ -120,7 +123,12 @@ public void Subscribe(IEnumerable<string> topics)
}
catch (Exception e)
{
Console.WriteLine(e);
OnLogCallback!(new LogMessageEventArgs()
{
LogType = MqLogType.ConnectError,
Reason = $"An error was encountered when attempting to subscribe to subject: {subject}.{Environment.NewLine}" +
$"{e.Message}"
});
}
}
}
Expand Down

0 comments on commit 7b6b033

Please sign in to comment.