Skip to content

Commit

Permalink
Add support CustomHeadersBuilder option for NATS. (#1519)
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed May 15, 2024
1 parent f9091f2 commit b0ee9ac
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
6 changes: 6 additions & 0 deletions src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using NATS.Client;
using NATS.Client.JetStream;

Expand Down Expand Up @@ -33,6 +34,11 @@ public class NATSOptions

public Action<ConsumerConfiguration.ConsumerConfigurationBuilder>? ConsumerOptions { get; set; }

/// <summary>
/// If you need to get additional native delivery args, you can use this function to write into <see cref="CapHeader" />.
/// </summary>
public Func<MsgHandlerEventArgs, IServiceProvider, List<KeyValuePair<string, string>>>? CustomHeadersBuilder { get; set; }

public Func<string, string> NormalizeStreamName { get; set; } = origin => origin.Split('.')[0];
}
}
13 changes: 12 additions & 1 deletion src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ internal sealed class NATSConsumerClient : IConsumerClient
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly IServiceProvider _serviceProvider;
private readonly NATSOptions _natsOptions;

private IConnection? _consumerClient;

public NATSConsumerClient(string groupId, IOptions<NATSOptions> options)
public NATSConsumerClient(string groupId, IOptions<NATSOptions> options, IServiceProvider serviceProvider)
{
_groupId = groupId;
_serviceProvider = serviceProvider;
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}

Expand Down Expand Up @@ -142,6 +144,15 @@ private void SubscriptionMessageHandler(object? sender, MsgHandlerEventArgs e)

headers.Add(Headers.Group, _groupId);

if (_natsOptions.CustomHeadersBuilder != null)
{
var customHeaders = _natsOptions.CustomHeadersBuilder(e, _serviceProvider);
foreach (var customHeader in customHeaders)
{
headers[customHeader.Key] = customHeader.Value;
}
}

OnMessageCallback!(new TransportMessage(headers, e.Message.Data), e.Message);
}

Expand Down
7 changes: 5 additions & 2 deletions src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;

Expand All @@ -9,17 +10,19 @@ namespace DotNetCore.CAP.NATS
internal sealed class NATSConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<NATSOptions> _natsOptions;
private readonly IServiceProvider _serviceProvider;

public NATSConsumerClientFactory(IOptions<NATSOptions> natsOptions)
public NATSConsumerClientFactory(IOptions<NATSOptions> natsOptions, IServiceProvider serviceProvider)
{
_natsOptions = natsOptions;
_serviceProvider = serviceProvider;
}

public IConsumerClient Create(string groupId)
{
try
{
var client = new NATSConsumerClient(groupId, _natsOptions);
var client = new NATSConsumerClient(groupId, _natsOptions, _serviceProvider);
client.Connect();
return client;
}
Expand Down

0 comments on commit b0ee9ac

Please sign in to comment.