Skip to content

Commit

Permalink
ClientBase新增GetAsync,让Logout/Upgrade优先使用GetAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Jun 28, 2024
1 parent 1a6bf84 commit a4e13eb
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 41 deletions.
115 changes: 82 additions & 33 deletions NewLife.Remoting/Clients/ClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Reflection;
using NewLife.Caching;
Expand Down Expand Up @@ -35,7 +36,7 @@ namespace NewLife.Remoting.Clients;
public abstract class ClientBase : DisposeBase, IApiClient, ICommandClient, IEventProvider, ITracerFeature, ILogFeature
{
#region 属性
/// <summary>服务端地址。支持http/tcp/udp,多地址逗号分隔</summary>
/// <summary>服务端地址。支持http/tcp/udp,支持客户端负载均衡,多地址逗号分隔</summary>
public String? Server { get; set; }

/// <summary>编码。设备编码DeviceCode,或应用标识AppId</summary>
Expand Down Expand Up @@ -220,12 +221,12 @@ class MyApiClient : ApiClient
protected override async Task<Object?> OnLoginAsync(ISocketClient client, Boolean force) => await InvokeWithClientAsync<Object>(client, Client.Actions[Features.Login], Client.BuildLoginRequest());
}

/// <summary>异步调用</summary>
/// <summary>异步调用。HTTP默认POST,自动识别GET</summary>
/// <param name="action">动作</param>
/// <param name="args">参数</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public virtual async Task<TResult> OnInvokeAsync<TResult>(String action, Object? args, CancellationToken cancellationToken)
protected virtual async Task<TResult> OnInvokeAsync<TResult>(String action, Object? args, CancellationToken cancellationToken)
{
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]=>{1}", action, args?.ToJson());

Expand All @@ -234,9 +235,9 @@ public virtual async Task<TResult> OnInvokeAsync<TResult>(String action, Object?
TResult? rs = default;
if (_client is ApiHttpClient http)
{
var method = System.Net.Http.HttpMethod.Post;
var method = HttpMethod.Post;
if (args == null || args.GetType().IsBaseType() || action.StartsWithIgnoreCase("Get") || action.ToLower().Contains("/get"))
method = System.Net.Http.HttpMethod.Get;
method = HttpMethod.Get;

rs = await http.InvokeAsync<TResult>(method, action, args, null, cancellationToken);
}
Expand All @@ -248,6 +249,33 @@ public virtual async Task<TResult> OnInvokeAsync<TResult>(String action, Object?
return rs!;
}

/// <summary>异步Get调用(仅用于HTTP)</summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="action"></param>
/// <param name="args"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <exception cref="NotSupportedException"></exception>
protected virtual async Task<TResult> GetAsync<TResult>(String action, Object? args, CancellationToken cancellationToken = default)
{
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]=>{1}", action, args?.ToJson());

Init();

if (_client is not ApiHttpClient http) throw new NotSupportedException();

// 验证登录
var needLogin = !Actions[Features.Login].EqualIgnoreCase(action);
if (!Logined && needLogin && Features.HasFlag(Features.Login)) await Login();

// GET请求
var rs = await http.InvokeAsync<TResult>(HttpMethod.Get, action, args, null, cancellationToken);

if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]<={1}", action, rs?.ToJson());

return rs!;
}

/// <summary>远程调用拦截,支持重新登录</summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="action"></param>
Expand Down Expand Up @@ -307,7 +335,7 @@ protected virtual void SetToken(String? token)
#region 登录注销
/// <summary>登录</summary>
/// <returns></returns>
public virtual async Task<ILoginResponse?> Login()
public virtual async Task<ILoginResponse?> Login(CancellationToken cancellationToken = default)
{
Init();

Expand All @@ -321,7 +349,7 @@ protected virtual void SetToken(String? token)
SetToken(null);
Logined = false;

var rs = await LoginAsync(request);
var rs = await LoginAsync(request, cancellationToken);
if (rs == null) return null;

if (!rs.Code.IsNullOrEmpty() && !rs.Secret.IsNullOrEmpty())
Expand Down Expand Up @@ -404,8 +432,9 @@ public virtual ILoginRequest BuildLoginRequest()

/// <summary>注销</summary>
/// <param name="reason"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public virtual async Task<ILogoutResponse?> Logout(String? reason)
public virtual async Task<ILogoutResponse?> Logout(String? reason, CancellationToken cancellationToken = default)
{
if (!Logined) return null;

Expand All @@ -414,7 +443,7 @@ public virtual ILoginRequest BuildLoginRequest()

try
{
var rs = await LogoutAsync(reason);
var rs = await LogoutAsync(reason, cancellationToken);

// 更新令牌
SetToken(rs?.Token);
Expand All @@ -436,18 +465,25 @@ public virtual ILoginRequest BuildLoginRequest()

/// <summary>登录</summary>
/// <param name="request">登录信息</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected virtual Task<ILoginResponse?> LoginAsync(ILoginRequest request) => InvokeAsync<ILoginResponse>(Actions[Features.Login], request);
protected virtual Task<ILoginResponse?> LoginAsync(ILoginRequest request, CancellationToken cancellationToken) => InvokeAsync<ILoginResponse>(Actions[Features.Login], request, cancellationToken);

/// <summary>注销</summary>
/// <returns></returns>
protected virtual Task<ILogoutResponse?> LogoutAsync(String reason) => InvokeAsync<ILogoutResponse>(Actions[Features.Logout], new { reason });
protected virtual async Task<ILogoutResponse?> LogoutAsync(String? reason, CancellationToken cancellationToken)
{
if (_client is ApiHttpClient)
return await GetAsync<ILogoutResponse>(Actions[Features.Logout], new { reason }, cancellationToken);

return await InvokeAsync<ILogoutResponse>(Actions[Features.Logout], new { reason }, cancellationToken);
}
#endregion

#region 心跳
#region 心跳保活
/// <summary>心跳</summary>
/// <returns></returns>
public virtual async Task<IPingResponse?> Ping()
public virtual async Task<IPingResponse?> Ping(CancellationToken cancellationToken = default)
{
Init();

Expand All @@ -467,7 +503,7 @@ public virtual ILoginRequest BuildLoginRequest()
IPingResponse? rs = null;
try
{
rs = await PingAsync(request);
rs = await PingAsync(request, cancellationToken);
if (rs != null)
{
// 由服务器改变采样频率
Expand All @@ -483,7 +519,7 @@ public virtual ILoginRequest BuildLoginRequest()
{
foreach (var model in rs.Commands)
{
await ReceiveCommand(model, "Pong");
await ReceiveCommand(model, "Pong", cancellationToken);
}
}
}
Expand All @@ -498,7 +534,7 @@ public virtual ILoginRequest BuildLoginRequest()
// 上报正常,处理历史,失败则丢弃
while (_fails.TryDequeue(out var info))
{
await PingAsync(info);
await PingAsync(info, cancellationToken);
}

return rs;
Expand All @@ -511,7 +547,7 @@ public virtual ILoginRequest BuildLoginRequest()
if (ex2 is ApiException aex && (aex.Code == ApiCode.Unauthorized || aex.Code == ApiCode.Forbidden) && Features.HasFlag(Features.Login))
{
WriteLog("重新登录");
await Login();
await Login(cancellationToken);

return null;
}
Expand Down Expand Up @@ -564,8 +600,9 @@ public virtual IPingRequest BuildPingRequest()

/// <summary>心跳</summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected virtual Task<IPingResponse?> PingAsync(IPingRequest request) => InvokeAsync<IPingResponse>(Actions[Features.Ping], request);
protected virtual Task<IPingResponse?> PingAsync(IPingRequest request, CancellationToken cancellationToken) => InvokeAsync<IPingResponse>(Actions[Features.Ping], request, cancellationToken);
#endregion

#region 下行通知
Expand Down Expand Up @@ -636,8 +673,9 @@ protected virtual async Task OnPing(Object state)
/// <summary>收到命令</summary>
/// <param name="model"></param>
/// <param name="source"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task ReceiveCommand(CommandModel model, String source)
public async Task ReceiveCommand(CommandModel model, String source, CancellationToken cancellationToken = default)
{
if (model == null) return;

Expand All @@ -662,7 +700,7 @@ public async Task ReceiveCommand(CommandModel model, String source)
{
TimerX.Delay(s =>
{
_ = OnReceiveCommand(model);
_ = OnReceiveCommand(model, CancellationToken.None);
}, (Int32)ts.TotalMilliseconds);

var reply = new CommandReplyModel
Expand All @@ -671,15 +709,15 @@ public async Task ReceiveCommand(CommandModel model, String source)
Status = CommandStatus.处理中,
Data = $"已安排计划执行 {startTime.ToFullString()}"
};
await CommandReply(reply);
await CommandReply(reply, cancellationToken);
}
else
await OnReceiveCommand(model);
await OnReceiveCommand(model, cancellationToken);
}
else
{
var reply = new CommandReplyModel { Id = model.Id, Status = CommandStatus.取消 };
await CommandReply(reply);
await CommandReply(reply, cancellationToken);
}
}
catch (Exception ex)
Expand All @@ -690,30 +728,35 @@ public async Task ReceiveCommand(CommandModel model, String source)

/// <summary>触发收到命令的动作</summary>
/// <param name="model"></param>
protected virtual async Task OnReceiveCommand(CommandModel model)
/// <param name="cancellationToken"></param>
protected virtual async Task<CommandReplyModel?> OnReceiveCommand(CommandModel model, CancellationToken cancellationToken)
{
var e = new CommandEventArgs { Model = model };
Received?.Invoke(this, e);

var rs = await this.ExecuteCommand(model);
var rs = await this.ExecuteCommand(model, cancellationToken);
e.Reply ??= rs;

if (e.Reply != null && e.Reply.Id > 0) await CommandReply(e.Reply);
if (e.Reply != null && e.Reply.Id > 0) await CommandReply(e.Reply, cancellationToken);

return e.Reply;
}

/// <summary>向命令引擎发送命令,触发指定已注册动作</summary>
/// <param name="command"></param>
/// <param name="argument"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task SendCommand(String command, String argument) => await OnReceiveCommand(new CommandModel { Command = command, Argument = argument });
public virtual async Task<CommandReplyModel?> SendCommand(String command, String argument, CancellationToken cancellationToken = default) => await OnReceiveCommand(new CommandModel { Command = command, Argument = argument }, cancellationToken);

/// <summary>上报命令调用结果</summary>
/// <param name="model"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public virtual Task<Object?> CommandReply(CommandReplyModel model) => InvokeAsync<Object>(Actions[Features.CommandReply], model);
public virtual Task<Object?> CommandReply(CommandReplyModel model, CancellationToken cancellationToken = default) => InvokeAsync<Object>(Actions[Features.CommandReply], model, cancellationToken);
#endregion

#region 上报
#region 事件上报
private readonly ConcurrentQueue<EventModel> _events = new();
private readonly ConcurrentQueue<EventModel> _failEvents = new();
private TimerX? _eventTimer;
Expand Down Expand Up @@ -800,7 +843,7 @@ public virtual Boolean WriteEvent(String type, String name, String? remark)
private String? _lastVersion;
/// <summary>获取更新信息</summary>
/// <returns></returns>
public async Task<IUpgradeInfo?> Upgrade(String? channel)
public virtual async Task<IUpgradeInfo?> Upgrade(String? channel, CancellationToken cancellationToken = default)
{
using var span = Tracer?.NewSpan(nameof(Upgrade));
WriteLog("检查更新");
Expand All @@ -809,13 +852,13 @@ public virtual Boolean WriteEvent(String type, String name, String? remark)
var ug = new Upgrade { Log = XTrace.Log };
ug.DeleteBackup(".");

var info = await UpgradeAsync(channel);
var info = await UpgradeAsync(channel, cancellationToken);
if (info != null && info.Version != _lastVersion)
{
WriteLog("发现更新:{0}", info.ToJson(true));

ug.Url = info.Source;
await ug.Download();
await ug.Download(cancellationToken);

// 检查文件完整性
if (info.FileHash.IsNullOrEmpty() || ug.CheckFileHash(info.FileHash))
Expand Down Expand Up @@ -847,7 +890,13 @@ protected virtual void Restart(Upgrade upgrade)

/// <summary>更新</summary>
/// <returns></returns>
protected virtual Task<IUpgradeInfo?> UpgradeAsync(String? channel) => InvokeAsync<IUpgradeInfo>(Actions[Features.Upgrade], new { channel });
protected virtual async Task<IUpgradeInfo?> UpgradeAsync(String? channel, CancellationToken cancellationToken)
{
if (_client is ApiHttpClient)
return await GetAsync<IUpgradeInfo>(Actions[Features.Upgrade], new { channel }, cancellationToken);

return await InvokeAsync<IUpgradeInfo>(Actions[Features.Upgrade], new { channel }, cancellationToken);
}
#endregion

#region 辅助
Expand Down
23 changes: 20 additions & 3 deletions NewLife.Remoting/Clients/ICommandClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ public static void RegisterCommand(this ICommandClient client, String command, F
client.Commands[command] = method;
}

/// <summary>
/// 注册服务。收到平台下发的服务调用时,执行注册的方法
/// </summary>
/// <param name="client">命令客户端</param>
/// <param name="command"></param>
/// <param name="method"></param>
/// <exception cref="ArgumentNullException"></exception>
public static void RegisterCommand(this ICommandClient client, String command, Func<CommandModel, CancellationToken, Task<CommandReplyModel>> method)
{
if (command.IsNullOrEmpty()) command = method.Method.Name;

client.Commands[command] = method;
}

/// <summary>
/// 注册服务。收到平台下发的服务调用时,执行注册的方法
/// </summary>
Expand All @@ -90,14 +104,15 @@ public static void RegisterCommand(this ICommandClient client, String command, A
/// <summary>执行命令</summary>
/// <param name="client">命令客户端</param>
/// <param name="model"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public static async Task<CommandReplyModel> ExecuteCommand(this ICommandClient client, CommandModel model)
public static async Task<CommandReplyModel> ExecuteCommand(this ICommandClient client, CommandModel model, CancellationToken cancellationToken = default)
{
using var span = DefaultTracer.Instance?.NewSpan("ExecuteCommand", $"{model.Command}({model.Argument})");
var rs = new CommandReplyModel { Id = model.Id, Status = CommandStatus.已完成 };
try
{
var result = await OnCommand(client, model);
var result = await OnCommand(client, model, cancellationToken);
if (result is CommandReplyModel reply)
{
reply.Id = model.Id;
Expand Down Expand Up @@ -129,7 +144,8 @@ public static async Task<CommandReplyModel> ExecuteCommand(this ICommandClient c
/// <summary>分发执行服务</summary>
/// <param name="client">命令客户端</param>
/// <param name="model"></param>
private static async Task<Object?> OnCommand(ICommandClient client, CommandModel model)
/// <param name="cancellationToken"></param>
private static async Task<Object?> OnCommand(ICommandClient client, CommandModel model, CancellationToken cancellationToken)
{
//WriteLog("OnCommand {0}", model.ToJson());

Expand All @@ -139,6 +155,7 @@ public static async Task<CommandReplyModel> ExecuteCommand(this ICommandClient c
if (d is Func<String?, Task<String?>> func1) return await func1(model.Argument);
//if (d is Func<String, Task<Object>> func2) return await func2(model.Argument);
if (d is Func<CommandModel, Task<CommandReplyModel>> func3) return await func3(model);
if (d is Func<CommandModel, CancellationToken, Task<CommandReplyModel>> func4) return await func4(model, cancellationToken);

if (d is Action<CommandModel> func21) func21(model);

Expand Down
Loading

0 comments on commit a4e13eb

Please sign in to comment.