diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index 4f800f92a..504d6dae9 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -54,6 +54,35 @@ public object DeductProductQty(JsonElement param) } ``` +### Controlling callback response + +You can inject the `CapHeader` parameter in the subscription method using the `[FromCap]` attribute and utilize its methods to add extra headers to the callback context or terminate the callback. + +Example: + +```cs +[CapSubscribe("place.order.qty.deducted")] +public object DeductProductQty(JsonElement param, [FromCap] CapHeader header) +{ + var orderId = param.GetProperty("OrderId").GetInt32(); + var productId = param.GetProperty("ProductId").GetInt32(); + var qty = param.GetProperty("Qty").GetInt32(); + + // Add additional headers to the response message + header.AddResponseHeader("some-message-info", "this is the test"); + // Or add a callback to the response + header.AddResponseHeader(DotNetCore.CAP.Messages.Headers.CallbackName, "place.order.qty.deducted-callback"); + + // If you no longer want to follow the sender's specified callback and want to modify it, use the RewriteCallback method. + header.RewriteCallback("new-callback-name"); + + // If you want to terminate/stop, or no longer respond to the sender, call RemoveCallback to remove the callback. + header.RemoveCallback(); + + return new { OrderId = orderId, IsSuccess = true }; +} +``` + ## Heterogeneous system integration In version 3.0+, we reconstructed the message structure. We used the Header in the message protocol in the message queue to transmit some additional information, so that we can do it in the Body without modifying or packaging the user’s original The message data format and content are sent. diff --git a/docs/content/user-guide/zh/cap/messaging.md b/docs/content/user-guide/zh/cap/messaging.md index a54f0c707..b3ab4df41 100644 --- a/docs/content/user-guide/zh/cap/messaging.md +++ b/docs/content/user-guide/zh/cap/messaging.md @@ -57,6 +57,35 @@ public object DeductProductQty(JsonElement param) } ``` +### 控制回调响应 + +你可以通过 `[FromCap]` 标记在订阅方法中注入 `CapHeader` 参数,并利用其提供的方法来向回调上下文中添加额外的头信息或者终止回调。 + +示例如下: + +```cs +[CapSubscribe("place.order.qty.deducted")] +public object DeductProductQty(JsonElement param, [FromCap] CapHeader header) +{ + var orderId = param.GetProperty("OrderId").GetInt32(); + var productId = param.GetProperty("ProductId").GetInt32(); + var qty = param.GetProperty("Qty").GetInt32(); + + // 添加额外的头信息到响应消息中 + header.AddResponseHeader("some-message-info", "this is the test"); + // 或再次添加回调的回调 + header.AddResponseHeader(DotNetCore.CAP.Messages.Headers.CallbackName, "place.order.qty.deducted-callback"); + + // 如果你不再遵从发送着指定的回调,想修改回调,可通过 RewriteCallback 方法修改。 + header.RewriteCallback("new-callback-name"); + + // 如果你想终止/停止,或不再给发送方响应,调用 RemoveCallback 来移除回调。 + header.RemoveCallback(); + + return new { OrderId = orderId, IsSuccess = true }; +} +``` + ## 异构系统集成 在 3.0+ 版本中,我们对消息结构进行了重构,我们利用了消息队列中消息协议中的 Header 来传输一些额外信息,以便于在 Body 中我们可以做到不需要修改或包装使用者的原始消息数据格式和内容进行发送。 diff --git a/src/DotNetCore.CAP/CAP.Attribute.cs b/src/DotNetCore.CAP/CAP.Attribute.cs index d3347d244..ed0f06143 100644 --- a/src/DotNetCore.CAP/CAP.Attribute.cs +++ b/src/DotNetCore.CAP/CAP.Attribute.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Collections.ObjectModel; using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; // ReSharper disable once CheckNamespace namespace DotNetCore.CAP; @@ -29,7 +30,37 @@ public class FromCapAttribute : Attribute public class CapHeader : ReadOnlyDictionary { + internal IDictionary? ResponseHeader { get; set; } + public CapHeader(IDictionary dictionary) : base(dictionary) { } + + /// + /// When a callbackName is specified from publish message, use this method to add an additional header. + /// + /// The response header key. + /// The response header value. + public void AddResponseHeader(string key, string? value) + { + ResponseHeader ??= new Dictionary(); + ResponseHeader[key] = value; + } + + /// + /// When a callbackName is specified from publish message, use this method to abort the callback. + /// + public void RemoveCallback() + { + Dictionary.Remove(Headers.CallbackName); + } + + /// + /// When a callbackName is specified from Publish message, use this method to rewrite the callback name. + /// + /// + public void RewriteCallback(string callbackName) + { + Dictionary[Headers.CallbackName] = callbackName; + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs b/src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs index 987a41b5e..8fa4e141a 100644 --- a/src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs +++ b/src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs @@ -1,15 +1,18 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System.Collections.Generic; + namespace DotNetCore.CAP.Internal; public class ConsumerExecutedResult { - public ConsumerExecutedResult(object? result, string msgId, string? callbackName) + public ConsumerExecutedResult(object? result, string msgId, string? callbackName, IDictionary? callbackHeader) { Result = result; MessageId = msgId; CallbackName = callbackName; + CallbackHeader = callbackHeader; } public object? Result { get; set; } @@ -17,4 +20,6 @@ public ConsumerExecutedResult(object? result, string msgId, string? callbackName public string MessageId { get; set; } public string? CallbackName { get; set; } + + public IDictionary? CallbackHeader { get; set; } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs index 8169f0c58..32c30901e 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs @@ -185,17 +185,15 @@ private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExec if (!string.IsNullOrEmpty(ret.CallbackName)) { - var header = new Dictionary - { - [Headers.CorrelationId] = message.Origin.GetId(), - [Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString() - }; + ret.CallbackHeader ??= new Dictionary(); + ret.CallbackHeader[Headers.CorrelationId] = message.Origin.GetId(); + ret.CallbackHeader[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString(); - if(message.Origin.Headers.TryGetValue(Headers.TraceParent, out var traceparent)) - header[Headers.TraceParent] = traceparent; + if (message.Origin.Headers.TryGetValue(Headers.TraceParent, out var traceparent)) + ret.CallbackHeader[Headers.TraceParent] = traceparent; await _provider.GetRequiredService() - .PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken).ConfigureAwait(false); + .PublishAsync(ret.CallbackName, ret.Result, ret.CallbackHeader, cancellationToken).ConfigureAwait(false); } } catch (OperationCanceledException) diff --git a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs index 299fc7dca..e7d91dfc2 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs @@ -50,12 +50,14 @@ public async Task InvokeAsync(ConsumerContext context, var message = context.DeliverMessage; var parameterDescriptors = context.ConsumerDescriptor.Parameters; var executeParameters = new object?[parameterDescriptors.Count]; + var headerIndex = 0; for (var i = 0; i < parameterDescriptors.Count; i++) { var parameterDescriptor = parameterDescriptors[i]; if (parameterDescriptor.IsFromCap) { executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken); + headerIndex = i; } else { @@ -123,7 +125,7 @@ public async Task InvokeAsync(ConsumerContext context, } } - return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); + return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName(), (executeParameters[headerIndex] as CapHeader)?.ResponseHeader); } private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,