Skip to content

Commit

Permalink
合并
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Apr 25, 2024
2 parents 91e5375 + fb36f20 commit 6519e05
Show file tree
Hide file tree
Showing 15 changed files with 725 additions and 224 deletions.
4 changes: 2 additions & 2 deletions NewLife.Remoting/ApiAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ApiAction
/// <summary>实例化</summary>
public ApiAction(MethodInfo method, Type type)
{
if (type == null) type = method.DeclaringType;
if (type == null) type = method.DeclaringType!;
Name = GetName(type, method);

// 必须同时记录类型和方法,因为有些方法位于继承的不同层次,那样会导致实例化的对象不一致
Expand All @@ -67,7 +67,7 @@ public ApiAction(MethodInfo method, Type type)
/// <returns></returns>
public static String GetName(Type? type, MethodInfo method)
{
if (type == null) type = method.DeclaringType;
if (type == null) type = method.DeclaringType!;
//if (type == null) return null;

var typeName = type.Name.TrimEnd("Controller", "Service");
Expand Down
22 changes: 12 additions & 10 deletions NewLife.Remoting/ApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ protected virtual ICluster<String, ISocketClient> InitCluster()
{
var cluster = Cluster;
cluster ??= UsePool ?
new ClientPoolCluster { Log = Log } :
new ClientSingleCluster { Log = Log };
new ClientPoolCluster<ISocketClient> { Log = Log } :
new ClientSingleCluster<ISocketClient> { Log = Log };

if (cluster is ClientSingleCluster sc && sc.OnCreate == null) sc.OnCreate = OnCreate;
if (cluster is ClientPoolCluster pc && pc.OnCreate == null) pc.OnCreate = OnCreate;
if (cluster is ClientSingleCluster<ISocketClient> sc && sc.OnCreate == null) sc.OnCreate = OnCreate;
if (cluster is ClientPoolCluster<ISocketClient> pc && pc.OnCreate == null) pc.OnCreate = OnCreate;

cluster.GetItems ??= () => Servers ?? new String[0];
cluster.GetItems ??= () => Servers ?? [];
cluster.Open();

return cluster;
Expand Down Expand Up @@ -159,7 +159,7 @@ protected virtual ICluster<String, ISocketClient> InitCluster()
catch (ApiException ex)
{
// 这个连接没有鉴权,重新登录后再次调用
if (ex.Code == 401)
if (ex.Code == ApiCode.Unauthorized)
{
//await Cluster.InvokeAsync(client => OnLoginAsync(client, true)).ConfigureAwait(false);
await OnLoginAsync(client, true).ConfigureAwait(false);
Expand Down Expand Up @@ -283,7 +283,7 @@ public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag
var message = enc.Decode(rs) ?? throw new InvalidOperationException();

// 是否成功
if (message.Code is not ApiCode.Ok and not 200)
if (message.Code is not ApiCode.Ok and not ApiCode.Ok200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };

if (message.Data == null) return default;
Expand Down Expand Up @@ -350,7 +350,7 @@ public Int32 InvokeWithClient(ISocketClient client, String action, Object? args,
/// <param name="e"></param>
protected virtual void OnReceive(IMessage message, ApiReceivedEventArgs e) => Received?.Invoke(this, e);

private void Client_Received(Object sender, ReceivedEventArgs e)
private void Client_Received(Object? sender, ReceivedEventArgs e)
{
LastActive = DateTime.Now;

Expand Down Expand Up @@ -378,11 +378,11 @@ private void Client_Received(Object sender, ReceivedEventArgs e)
/// <summary>连接后自动登录</summary>
/// <param name="client">客户端</param>
/// <param name="force">强制登录</param>
protected virtual Task<Object> OnLoginAsync(ISocketClient client, Boolean force) => Task.FromResult<Object>(0);
protected virtual Task<Object?> OnLoginAsync(ISocketClient client, Boolean force) => Task.FromResult<Object?>(null);

/// <summary>登录</summary>
/// <returns></returns>
public virtual async Task<Object> LoginAsync()
public virtual async Task<Object?> LoginAsync()
{
if (Cluster == null) throw new ArgumentNullException(nameof(Cluster));

Expand All @@ -407,6 +407,8 @@ protected virtual ISocketClient OnCreate(String svr)
client.Opened += (s, e) => OnNewSession((s as ISocketClient)!);
client.Received += Client_Received;

client.Open();

return client;
}
#endregion
Expand Down
2 changes: 1 addition & 1 deletion NewLife.Remoting/ApiServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public virtual Int32 InvokeAll(String action, Object? args = null)
/// <summary>显示统计信息的周期。默认600秒,0表示不显示统计信息</summary>
public Int32 StatPeriod { get; set; } = 600;

private void DoStat(Object state)
private void DoStat(Object? state)
{
var sb = Pool.StringBuilder.Get();
var pf2 = StatProcess;
Expand Down
28 changes: 14 additions & 14 deletions NewLife.Remoting/ClientPoolCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
namespace NewLife.Remoting;

/// <summary>客户端连接池负载均衡集群</summary>
public class ClientPoolCluster : ICluster<String, ISocketClient>
public class ClientPoolCluster<T> : ICluster<String, T>
{
/// <summary>最后使用资源</summary>
public KeyValuePair<String, ISocketClient> Current { get; private set; }
public KeyValuePair<String, T> Current { get; private set; }

/// <summary>服务器地址列表</summary>
public Func<IEnumerable<String>>? GetItems { get; set; }

/// <summary>创建回调</summary>
public Func<String, ISocketClient>? OnCreate { get; set; }
public Func<String, T>? OnCreate { get; set; }

/// <summary>连接池</summary>
public IPool<ISocketClient> Pool { get; private set; }
public IPool<T> Pool { get; private set; }

/// <summary>实例化连接池集群</summary>
public ClientPoolCluster() => Pool = new MyPool(this);
Expand All @@ -32,11 +32,11 @@ public class ClientPoolCluster : ICluster<String, ISocketClient>

/// <summary>从集群中获取资源</summary>
/// <returns></returns>
public virtual ISocketClient Get() => Pool.Get();
public virtual T Get() => Pool.Get();

/// <summary>归还</summary>
/// <param name="value"></param>
public virtual Boolean Put(ISocketClient value)
public virtual Boolean Put(T value)
{
if (value == null) return false;

Expand All @@ -48,7 +48,7 @@ public virtual Boolean Put(ISocketClient value)

/// <summary>为连接池创建连接</summary>
/// <returns></returns>
protected virtual ISocketClient CreateClient()
protected virtual T CreateClient()
{
if (GetItems == null) throw new ArgumentNullException(nameof(GetItems));
if (OnCreate == null) throw new ArgumentNullException(nameof(OnCreate));
Expand All @@ -69,10 +69,10 @@ protected virtual ISocketClient CreateClient()
WriteLog("集群均衡:{0}", svr);

var client = OnCreate(svr);
client.Open();
//client.Open();

// 设置当前资源
Current = new KeyValuePair<String, ISocketClient>(svr, client);
Current = new KeyValuePair<String, T>(svr, client);

return client;
}
Expand All @@ -85,11 +85,11 @@ protected virtual ISocketClient CreateClient()
throw last ?? new NullReferenceException();
}

class MyPool : ObjectPool<ISocketClient>
class MyPool : ObjectPool<T>

Check warning on line 88 in NewLife.Remoting/ClientPoolCluster.cs

View workflow job for this annotation

GitHub Actions / build-publish

The type 'T' cannot be used as type parameter 'T' in the generic type or method 'ObjectPool<T>'. Nullability of type argument 'T' doesn't match 'notnull' constraint.

Check warning on line 88 in NewLife.Remoting/ClientPoolCluster.cs

View workflow job for this annotation

GitHub Actions / build-publish

The type 'T' cannot be used as type parameter 'T' in the generic type or method 'ObjectPool<T>'. Nullability of type argument 'T' doesn't match 'notnull' constraint.

Check warning on line 88 in NewLife.Remoting/ClientPoolCluster.cs

View workflow job for this annotation

GitHub Actions / test

The type 'T' cannot be used as type parameter 'T' in the generic type or method 'ObjectPool<T>'. Nullability of type argument 'T' doesn't match 'notnull' constraint.

Check warning on line 88 in NewLife.Remoting/ClientPoolCluster.cs

View workflow job for this annotation

GitHub Actions / test

The type 'T' cannot be used as type parameter 'T' in the generic type or method 'ObjectPool<T>'. Nullability of type argument 'T' doesn't match 'notnull' constraint.
{
public ClientPoolCluster Host { get; set; }
public ClientPoolCluster<T> Host { get; set; }

public MyPool(ClientPoolCluster cluster)
public MyPool(ClientPoolCluster<T> cluster)
{
// 最小值为0,连接池不再使用栈,只使用队列
Min = 0;
Expand All @@ -98,11 +98,11 @@ public MyPool(ClientPoolCluster cluster)
Host = cluster;
}

protected override ISocketClient OnCreate() => Host.CreateClient();
protected override T OnCreate() => Host.CreateClient();

/// <summary>释放时,返回是否有效。无效对象将会被抛弃</summary>
/// <param name="value"></param>
protected override Boolean OnPut(ISocketClient value) => value != null && !value.Disposed /*&& value.Client != null*/;
protected override Boolean OnPut(T value) => value != null && (value is not IDisposable2 ds || !ds.Disposed);
}

#region 日志
Expand Down
33 changes: 21 additions & 12 deletions NewLife.Remoting/ClientSingleCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,45 @@
namespace NewLife.Remoting;

/// <summary>客户端单连接故障转移集群</summary>
public class ClientSingleCluster : ICluster<String, ISocketClient>
public class ClientSingleCluster<T> : ICluster<String, T>
{
/// <summary>最后使用资源</summary>
public KeyValuePair<String, ISocketClient> Current { get; private set; }
public KeyValuePair<String, T> Current { get; private set; }

/// <summary>服务器地址列表</summary>
public Func<IEnumerable<String>>? GetItems { get; set; }

/// <summary>创建回调</summary>
public Func<String, ISocketClient>? OnCreate { get; set; }
public Func<String, T>? OnCreate { get; set; }

/// <summary>打开</summary>
public virtual Boolean Open() => true;

/// <summary>关闭</summary>
/// <param name="reason">关闭原因。便于日志分析</param>
/// <returns>是否成功</returns>
public virtual Boolean Close(String reason) => _Client == null ? false : _Client.Close(reason);
public virtual Boolean Close(String reason)
{
if (_Client == null) return false;

if (_Client is ISocketClient client) return client.Close(reason);

_Client.TryDispose();

return true;
}

private ISocketClient? _Client;
private T? _Client;
/// <summary>从集群中获取资源</summary>
/// <returns></returns>
public virtual ISocketClient Get()
public virtual T Get()
{
var tc = _Client;
if (tc != null && tc.Active && !tc.Disposed) return tc;
if (tc != null && (tc is not DisposeBase ds || !ds.Disposed)) return tc;
lock (this)
{
tc = _Client;
if (tc != null && tc.Active && !tc.Disposed) return tc;
if (tc != null && (tc is not DisposeBase ds2 || !ds2.Disposed)) return tc;

// 释放旧对象
tc.TryDispose();
Expand All @@ -44,14 +53,14 @@ public virtual ISocketClient Get()

/// <summary>归还</summary>
/// <param name="value"></param>
public virtual Boolean Put(ISocketClient value) => true;
public virtual Boolean Put(T value) => true;

/// <summary>Round-Robin 负载均衡</summary>
private Int32 _index = -1;

/// <summary>为连接池创建连接</summary>
/// <returns></returns>
protected virtual ISocketClient CreateClient()
protected virtual T CreateClient()
{
if (GetItems == null) throw new ArgumentNullException(nameof(GetItems));
if (OnCreate == null) throw new ArgumentNullException(nameof(OnCreate));
Expand All @@ -72,10 +81,10 @@ protected virtual ISocketClient CreateClient()
WriteLog("集群转移:{0}", svr);

var client = OnCreate(svr);
client.Open();
//client.Open();

// 设置当前资源
Current = new KeyValuePair<String, ISocketClient>(svr, client);
Current = new KeyValuePair<String, T>(svr, client);

return client;
}
Expand Down
Loading

0 comments on commit 6519e05

Please sign in to comment.