diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs index 9400cf60c..0d5cd3073 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubClientMessageReader.cs @@ -74,29 +74,24 @@ public StreamingHubMessageType ReadMessageType() return (clientRequestMessageId, methodId, data.Slice(offset)); } - public (byte Sequence, ReadOnlyMemory Metadata) ReadServerHeartbeat() + public (byte Sequence, long ServerSentAt, ReadOnlyMemory Metadata) ReadServerHeartbeat() { //var type = reader.ReadByte(); // Type is already read by ReadMessageType var sequence = reader.ReadByte(); // Sequence - reader.Skip(); // Dummy (2) + var serverSentAt = reader.ReadInt64(); // ServerSentAt (2) reader.Skip(); // Dummy (3) - return (sequence, data.Slice((int)reader.Consumed)); + return (sequence, serverSentAt, data.Slice((int)reader.Consumed)); } - public (byte Sequence, long SentAt) ReadClientHeartbeatResponse() + public (byte Sequence, long ClientSentAt) ReadClientHeartbeatResponse() { //var type = reader.ReadByte(); // Type is already read by ReadMessageType var sequence = reader.ReadByte(); // Sequence - reader.Skip(); // Dummy (2) - reader.Skip(); // Dummy (3) - - // Extra: [SentAt(long)] - var arrayLen = reader.ReadArrayHeader(); - if (arrayLen == 0) throw new InvalidOperationException("Invalid client heartbeat response. An extra data is empty."); - var sentAt = reader.ReadInt64(); + var clientSentAt = reader.ReadInt64(); // ClientSentAt (2) + reader.Skip(); // Reserved (3) - return (sequence, sentAt); + return (sequence, clientSentAt); } } } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs index 6b40447e0..d1a82749c 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubMessageWriter.cs @@ -203,32 +203,32 @@ public static void WriteClientResultResponseMessageForError(IBufferWriter /// Writes a server heartbeat message for sending from the server. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteServerHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteServerHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence, DateTimeOffset serverSentAt) { - // Array(5)[127, Sequence(int8), Nil, Nil, ] + // Array(5)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil, ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(5); - writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) + writer.Write(sequence); // Sequence + writer.Write(serverSentAt.ToUnixTimeMilliseconds()); // ServerSentAt + writer.WriteNil(); // Dummy writer.Flush(); - // // + // // } /// /// Writes a server heartbeat message for sending response from the client. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteServerHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence) + public static void WriteServerHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence, long serverSentAt) { - // Array(4)[127, Sequence(int8), Nil, Nil] + // Array(4)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(4); - writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) + writer.Write(sequence); // Sequence + writer.Write(serverSentAt); // ServerSentAt + writer.WriteNil(); // Dummy writer.Flush(); } @@ -236,33 +236,33 @@ public static void WriteServerHeartbeatMessageResponse(IBufferWriter buffe /// Writes a client heartbeat message for sending from the client. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteClientHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteClientHeartbeatMessage(IBufferWriter bufferWriter, short sequence, DateTimeOffset clientSentAt) { - // Array(4)[0x7e(126), Sequence(int8), Nil, ] + // Array(4)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(4); - writer.Write(0x7e); // Type = 0x7e / 126 (ClientHeartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy + writer.Write(0x7e); // 0:Type = 0x7e / 126 (ClientHeartbeat) + writer.Write(sequence); // 1:Sequence + writer.Write(clientSentAt.ToUnixTimeMilliseconds()); // 2:ClientSentAt + writer.WriteNil(); // 3:Reserved writer.Flush(); - // // } /// /// Writes a client heartbeat message for sending response from the server. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteClientHeartbeatMessageResponseHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteClientHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence, long clientSentAt) { - // Array(5)[0x7e(126), Sequence(int8), Nil, Nil, ] + // Array(5)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), Nil, ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(5); - writer.Write(0x7e); // Type = 0x7e / 126 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7e); // 0:Type = 0x7e / 126 (Heartbeat) + writer.Write(sequence); // 1:Sequence + writer.Write(clientSentAt); // 2:ClientSentAt + writer.WriteNil(); // 3:Reserved + writer.WriteNil(); // 4:Reserved writer.Flush(); - // // } } diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs index 66538d5f1..deb1ff617 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/Internal.Shared/StreamingHubServerMessageReader.cs @@ -77,14 +77,14 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, statusCode, detail, message); } - public (short Sequence, ReadOnlyMemory Extra) ReadClientHeartbeat() + public (short Sequence, long ClientSentAt, ReadOnlyMemory Extra) ReadClientHeartbeat() { - // [Sequence(int8), Nil, [SentAt(long)]] + // [Sequence(int8), ClientSentAt(long), ] var sequence = reader.ReadInt16(); // Sequence - reader.Skip(); // Dummy + var clientSentAt = reader.ReadInt64(); // ClientSentAt var extra = data.Slice((int)reader.Consumed); - return (sequence, data.Slice((int)reader.Consumed)); + return (sequence, clientSentAt, data.Slice((int)reader.Consumed)); } public short ReadServerHeartbeatResponse() diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs index 613909c7a..cf18930c8 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientBase.cs @@ -22,7 +22,7 @@ public class StreamingHubClientOptions public TimeSpan? ClientHeartbeatInterval { get; } public TimeSpan? ClientHeartbeatTimeout { get; } - public Action>? OnServerHeartbeatReceived { get; } + public Action? OnServerHeartbeatReceived { get; } public Action? OnClientHeartbeatResponseReceived { get; } #if NET8_0_OR_GREATER public TimeProvider? TimeProvider { get; } @@ -38,9 +38,9 @@ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOn } #if NET8_0_OR_GREATER - public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action>? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived,TimeProvider? timeProvider) + public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived,TimeProvider? timeProvider) #else - public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action>? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived) + public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived) #endif { Host = host; @@ -120,7 +120,7 @@ public StreamingHubClientOptions WithClientHeartbeatTimeout(TimeSpan? timeout) /// /// /// - public StreamingHubClientOptions WithServerHeartbeatReceived(Action>? onServerHeartbeatReceived) + public StreamingHubClientOptions WithServerHeartbeatReceived(Action? onServerHeartbeatReceived) => new(Host, CallOptions, SerializerProvider, Logger , ClientHeartbeatInterval, ClientHeartbeatTimeout, onServerHeartbeatReceived, OnClientHeartbeatResponseReceived #if NET8_0_OR_GREATER diff --git a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs index df35b1634..809e0531f 100644 --- a/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs +++ b/src/MagicOnion.Client.Unity/Assets/Scripts/MagicOnion/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs @@ -9,6 +9,9 @@ namespace MagicOnion.Client { + /// + /// Represents a client heartbeat received event. + /// public readonly struct ClientHeartbeatEvent { /// @@ -22,13 +25,35 @@ public ClientHeartbeatEvent(long roundTripTimeMs) } } + /// + /// Represents a server heartbeat received event. + /// + public readonly struct ServerHeartbeatEvent + { + /// + /// Gets the server time at when the heartbeat was sent. + /// + public DateTimeOffset ServerTime { get; } + + /// + /// Gets the metadata data. The data is only available during event processing. + /// + public ReadOnlyMemory Metadata { get; } + + public ServerHeartbeatEvent(long serverTimeUnixMs, ReadOnlyMemory metadata) + { + ServerTime = DateTimeOffset.FromUnixTimeMilliseconds(serverTimeUnixMs); + Metadata = metadata; + } + } + internal class StreamingHubClientHeartbeatManager : IDisposable { readonly CancellationTokenSource timeoutTokenSource; readonly CancellationTokenSource shutdownTokenSource; readonly TimeSpan heartbeatInterval; readonly TimeSpan timeoutPeriod; - readonly Action>? onServerHeartbeatReceived; + readonly Action? onServerHeartbeatReceived; readonly Action? onClientHeartbeatResponseReceived; readonly SynchronizationContext? synchronizationContext; readonly ChannelWriter writer; @@ -48,7 +73,7 @@ public StreamingHubClientHeartbeatManager( ChannelWriter writer, TimeSpan heartbeatInterval, TimeSpan timeoutPeriod, - Action>? onServerHeartbeatReceived, + Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived, SynchronizationContext? synchronizationContext, CancellationToken shutdownToken @@ -142,7 +167,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action>? serverHeartbeatReceivedAction) => (state) => + SendOrPostCallback ProcessServerHeartbeatCore(Action? serverHeartbeatReceivedAction) => (state) => { var payload = (StreamingHubPayload)state!; var reader = new StreamingHubClientMessageReader(payload.Memory); _ = reader.ReadMessageType(); - var (serverSentSequence, metadata) = reader.ReadServerHeartbeat(); + var (serverSentSequence, serverSentAt, metadata) = reader.ReadServerHeartbeat(); - serverHeartbeatReceivedAction?.Invoke(metadata); + serverHeartbeatReceivedAction?.Invoke(new ServerHeartbeatEvent(serverSentAt, metadata)); // Writes a ServerHeartbeatResponse to the writer queue. - _ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence)); + _ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence, serverSentAt)); StreamingHubPayloadPool.Shared.Return(payload); }; - StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence) + StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence, long serverSentAt) { using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter(); - StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence); + StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence, serverSentAt); return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan); } StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence) { using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter(); - StreamingHubMessageWriter.WriteClientHeartbeatMessageHeader(buffer, clientSequence); var now = #if NET8_0_OR_GREATER @@ -197,11 +221,7 @@ StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence) DateTimeOffset.UtcNow; #endif - // Extra: [SentAt(long)] - var writer = new MessagePackWriter(buffer); - writer.WriteArrayHeader(1); - writer.Write(now.ToUnixTimeMilliseconds()); - writer.Flush(); + StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, now); return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan); } diff --git a/src/MagicOnion.Client/StreamingHubClientBase.cs b/src/MagicOnion.Client/StreamingHubClientBase.cs index 613909c7a..cf18930c8 100644 --- a/src/MagicOnion.Client/StreamingHubClientBase.cs +++ b/src/MagicOnion.Client/StreamingHubClientBase.cs @@ -22,7 +22,7 @@ public class StreamingHubClientOptions public TimeSpan? ClientHeartbeatInterval { get; } public TimeSpan? ClientHeartbeatTimeout { get; } - public Action>? OnServerHeartbeatReceived { get; } + public Action? OnServerHeartbeatReceived { get; } public Action? OnClientHeartbeatResponseReceived { get; } #if NET8_0_OR_GREATER public TimeProvider? TimeProvider { get; } @@ -38,9 +38,9 @@ public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOn } #if NET8_0_OR_GREATER - public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action>? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived,TimeProvider? timeProvider) + public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived,TimeProvider? timeProvider) #else - public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action>? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived) + public StreamingHubClientOptions(string? host, CallOptions callOptions, IMagicOnionSerializerProvider serializerProvider, IMagicOnionClientLogger logger, TimeSpan? clientHeartbeatInterval, TimeSpan? clientHeartbeatTimeout, Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived) #endif { Host = host; @@ -120,7 +120,7 @@ public StreamingHubClientOptions WithClientHeartbeatTimeout(TimeSpan? timeout) /// /// /// - public StreamingHubClientOptions WithServerHeartbeatReceived(Action>? onServerHeartbeatReceived) + public StreamingHubClientOptions WithServerHeartbeatReceived(Action? onServerHeartbeatReceived) => new(Host, CallOptions, SerializerProvider, Logger , ClientHeartbeatInterval, ClientHeartbeatTimeout, onServerHeartbeatReceived, OnClientHeartbeatResponseReceived #if NET8_0_OR_GREATER diff --git a/src/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs b/src/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs index df35b1634..809e0531f 100644 --- a/src/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs +++ b/src/MagicOnion.Client/StreamingHubClientHeartbeatManager.cs @@ -9,6 +9,9 @@ namespace MagicOnion.Client { + /// + /// Represents a client heartbeat received event. + /// public readonly struct ClientHeartbeatEvent { /// @@ -22,13 +25,35 @@ public ClientHeartbeatEvent(long roundTripTimeMs) } } + /// + /// Represents a server heartbeat received event. + /// + public readonly struct ServerHeartbeatEvent + { + /// + /// Gets the server time at when the heartbeat was sent. + /// + public DateTimeOffset ServerTime { get; } + + /// + /// Gets the metadata data. The data is only available during event processing. + /// + public ReadOnlyMemory Metadata { get; } + + public ServerHeartbeatEvent(long serverTimeUnixMs, ReadOnlyMemory metadata) + { + ServerTime = DateTimeOffset.FromUnixTimeMilliseconds(serverTimeUnixMs); + Metadata = metadata; + } + } + internal class StreamingHubClientHeartbeatManager : IDisposable { readonly CancellationTokenSource timeoutTokenSource; readonly CancellationTokenSource shutdownTokenSource; readonly TimeSpan heartbeatInterval; readonly TimeSpan timeoutPeriod; - readonly Action>? onServerHeartbeatReceived; + readonly Action? onServerHeartbeatReceived; readonly Action? onClientHeartbeatResponseReceived; readonly SynchronizationContext? synchronizationContext; readonly ChannelWriter writer; @@ -48,7 +73,7 @@ public StreamingHubClientHeartbeatManager( ChannelWriter writer, TimeSpan heartbeatInterval, TimeSpan timeoutPeriod, - Action>? onServerHeartbeatReceived, + Action? onServerHeartbeatReceived, Action? onClientHeartbeatResponseReceived, SynchronizationContext? synchronizationContext, CancellationToken shutdownToken @@ -142,7 +167,7 @@ SendOrPostCallback ProcessClientHeartbeatResponseCore(Action>? serverHeartbeatReceivedAction) => (state) => + SendOrPostCallback ProcessServerHeartbeatCore(Action? serverHeartbeatReceivedAction) => (state) => { var payload = (StreamingHubPayload)state!; var reader = new StreamingHubClientMessageReader(payload.Memory); _ = reader.ReadMessageType(); - var (serverSentSequence, metadata) = reader.ReadServerHeartbeat(); + var (serverSentSequence, serverSentAt, metadata) = reader.ReadServerHeartbeat(); - serverHeartbeatReceivedAction?.Invoke(metadata); + serverHeartbeatReceivedAction?.Invoke(new ServerHeartbeatEvent(serverSentAt, metadata)); // Writes a ServerHeartbeatResponse to the writer queue. - _ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence)); + _ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence, serverSentAt)); StreamingHubPayloadPool.Shared.Return(payload); }; - StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence) + StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence, long serverSentAt) { using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter(); - StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence); + StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence, serverSentAt); return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan); } StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence) { using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter(); - StreamingHubMessageWriter.WriteClientHeartbeatMessageHeader(buffer, clientSequence); var now = #if NET8_0_OR_GREATER @@ -197,11 +221,7 @@ StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence) DateTimeOffset.UtcNow; #endif - // Extra: [SentAt(long)] - var writer = new MessagePackWriter(buffer); - writer.WriteArrayHeader(1); - writer.Write(now.ToUnixTimeMilliseconds()); - writer.Flush(); + StreamingHubMessageWriter.WriteClientHeartbeatMessage(buffer, clientSequence, now); return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan); } diff --git a/src/MagicOnion.Internal/StreamingHubClientMessageReader.cs b/src/MagicOnion.Internal/StreamingHubClientMessageReader.cs index 9400cf60c..0d5cd3073 100644 --- a/src/MagicOnion.Internal/StreamingHubClientMessageReader.cs +++ b/src/MagicOnion.Internal/StreamingHubClientMessageReader.cs @@ -74,29 +74,24 @@ public StreamingHubMessageType ReadMessageType() return (clientRequestMessageId, methodId, data.Slice(offset)); } - public (byte Sequence, ReadOnlyMemory Metadata) ReadServerHeartbeat() + public (byte Sequence, long ServerSentAt, ReadOnlyMemory Metadata) ReadServerHeartbeat() { //var type = reader.ReadByte(); // Type is already read by ReadMessageType var sequence = reader.ReadByte(); // Sequence - reader.Skip(); // Dummy (2) + var serverSentAt = reader.ReadInt64(); // ServerSentAt (2) reader.Skip(); // Dummy (3) - return (sequence, data.Slice((int)reader.Consumed)); + return (sequence, serverSentAt, data.Slice((int)reader.Consumed)); } - public (byte Sequence, long SentAt) ReadClientHeartbeatResponse() + public (byte Sequence, long ClientSentAt) ReadClientHeartbeatResponse() { //var type = reader.ReadByte(); // Type is already read by ReadMessageType var sequence = reader.ReadByte(); // Sequence - reader.Skip(); // Dummy (2) - reader.Skip(); // Dummy (3) - - // Extra: [SentAt(long)] - var arrayLen = reader.ReadArrayHeader(); - if (arrayLen == 0) throw new InvalidOperationException("Invalid client heartbeat response. An extra data is empty."); - var sentAt = reader.ReadInt64(); + var clientSentAt = reader.ReadInt64(); // ClientSentAt (2) + reader.Skip(); // Reserved (3) - return (sequence, sentAt); + return (sequence, clientSentAt); } } } diff --git a/src/MagicOnion.Internal/StreamingHubMessageWriter.cs b/src/MagicOnion.Internal/StreamingHubMessageWriter.cs index 6b40447e0..d1a82749c 100644 --- a/src/MagicOnion.Internal/StreamingHubMessageWriter.cs +++ b/src/MagicOnion.Internal/StreamingHubMessageWriter.cs @@ -203,32 +203,32 @@ public static void WriteClientResultResponseMessageForError(IBufferWriter /// Writes a server heartbeat message for sending from the server. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteServerHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteServerHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence, DateTimeOffset serverSentAt) { - // Array(5)[127, Sequence(int8), Nil, Nil, ] + // Array(5)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil, ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(5); - writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) + writer.Write(sequence); // Sequence + writer.Write(serverSentAt.ToUnixTimeMilliseconds()); // ServerSentAt + writer.WriteNil(); // Dummy writer.Flush(); - // // + // // } /// /// Writes a server heartbeat message for sending response from the client. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteServerHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence) + public static void WriteServerHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence, long serverSentAt) { - // Array(4)[127, Sequence(int8), Nil, Nil] + // Array(4)[127, Sequence(int8), ServerSentAt(long; UnixTimeMs), Nil] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(4); - writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat) + writer.Write(sequence); // Sequence + writer.Write(serverSentAt); // ServerSentAt + writer.WriteNil(); // Dummy writer.Flush(); } @@ -236,33 +236,33 @@ public static void WriteServerHeartbeatMessageResponse(IBufferWriter buffe /// Writes a client heartbeat message for sending from the client. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteClientHeartbeatMessageHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteClientHeartbeatMessage(IBufferWriter bufferWriter, short sequence, DateTimeOffset clientSentAt) { - // Array(4)[0x7e(126), Sequence(int8), Nil, ] + // Array(4)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(4); - writer.Write(0x7e); // Type = 0x7e / 126 (ClientHeartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy + writer.Write(0x7e); // 0:Type = 0x7e / 126 (ClientHeartbeat) + writer.Write(sequence); // 1:Sequence + writer.Write(clientSentAt.ToUnixTimeMilliseconds()); // 2:ClientSentAt + writer.WriteNil(); // 3:Reserved writer.Flush(); - // // } /// /// Writes a client heartbeat message for sending response from the server. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void WriteClientHeartbeatMessageResponseHeader(IBufferWriter bufferWriter, short sequence) + public static void WriteClientHeartbeatMessageResponse(IBufferWriter bufferWriter, short sequence, long clientSentAt) { - // Array(5)[0x7e(126), Sequence(int8), Nil, Nil, ] + // Array(5)[0x7e(126), Sequence(int8), ClientSentAt(long; UnixTimeMs), Nil, ] var writer = new MessagePackWriter(bufferWriter); writer.WriteArrayHeader(5); - writer.Write(0x7e); // Type = 0x7e / 126 (Heartbeat) - writer.Write(sequence); // Sequence - writer.WriteNil(); // Dummy - writer.WriteNil(); // Dummy + writer.Write(0x7e); // 0:Type = 0x7e / 126 (Heartbeat) + writer.Write(sequence); // 1:Sequence + writer.Write(clientSentAt); // 2:ClientSentAt + writer.WriteNil(); // 3:Reserved + writer.WriteNil(); // 4:Reserved writer.Flush(); - // // } } diff --git a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs index 66538d5f1..deb1ff617 100644 --- a/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs +++ b/src/MagicOnion.Internal/StreamingHubServerMessageReader.cs @@ -77,14 +77,14 @@ public StreamingHubMessageType ReadMessageType() return (clientResultMessageId, clientMethodId, statusCode, detail, message); } - public (short Sequence, ReadOnlyMemory Extra) ReadClientHeartbeat() + public (short Sequence, long ClientSentAt, ReadOnlyMemory Extra) ReadClientHeartbeat() { - // [Sequence(int8), Nil, [SentAt(long)]] + // [Sequence(int8), ClientSentAt(long), ] var sequence = reader.ReadInt16(); // Sequence - reader.Skip(); // Dummy + var clientSentAt = reader.ReadInt64(); // ClientSentAt var extra = data.Slice((int)reader.Consumed); - return (sequence, data.Slice((int)reader.Consumed)); + return (sequence, clientSentAt, data.Slice((int)reader.Consumed)); } public short ReadServerHeartbeatResponse() diff --git a/src/MagicOnion.Server/Hubs/StreamingHub.cs b/src/MagicOnion.Server/Hubs/StreamingHub.cs index 923450d13..d1c624a79 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHub.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHub.cs @@ -234,10 +234,10 @@ ValueTask ProcessMessageAsync(StreamingHubPayload payload, UniqueHashDictionary< } case StreamingHubMessageType.ClientHeartbeat: { - var (seq, heartbeatBody) = reader.ReadClientHeartbeat(); + var (seq, clientSentAt, heartbeatBody) = reader.ReadClientHeartbeat(); using var bufferWriter = ArrayPoolBufferWriter.RentThreadStaticWriter(); - StreamingHubMessageWriter.WriteClientHeartbeatMessageResponseHeader(bufferWriter, seq); + StreamingHubMessageWriter.WriteClientHeartbeatMessageResponse(bufferWriter, seq, clientSentAt); bufferWriter.Write(heartbeatBody.Span); // Copy an extra body to the response message. StreamingServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(bufferWriter.WrittenSpan)); diff --git a/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs b/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs index 1303ada52..75b5a1092 100644 --- a/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs +++ b/src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs @@ -9,6 +9,8 @@ namespace MagicOnion.Server.Hubs; internal interface IStreamingHubHeartbeatManager : IDisposable { + TimeProvider TimeProvider { get; } + StreamingHubHeartbeatHandle Register(IStreamingServiceContext serviceContext); void Unregister(IStreamingServiceContext serviceContext); } @@ -21,26 +23,41 @@ internal class StreamingHubHeartbeatHandle : IDisposable bool disposed; short waitingSequence; bool timeoutTimerIsRunning; + DateTimeOffset lastSentAt; + DateTimeOffset lastReceivedAt; + + /// + /// Gets the last received time. + /// + public DateTimeOffset LastReceivedAt => lastReceivedAt; + + /// + /// Gets the latency between client and server. Returns if not sent or received. + /// + public TimeSpan Latency => (lastSentAt == default || lastReceivedAt == default) + ? TimeSpan.Zero + : lastReceivedAt - lastSentAt; public IStreamingServiceContext ServiceContext { get; } public CancellationToken TimeoutToken => timeoutToken.Token; - public StreamingHubHeartbeatHandle(IStreamingHubHeartbeatManager manager, IStreamingServiceContext serviceContext, TimeSpan timeoutDuration, TimeProvider timeProvider) + public StreamingHubHeartbeatHandle(IStreamingHubHeartbeatManager manager, IStreamingServiceContext serviceContext, TimeSpan timeoutDuration) { this.manager = manager; this.ServiceContext = serviceContext; this.timeoutDuration = timeoutDuration; this.timeoutToken = new CancellationTokenSource(Timeout.InfiniteTimeSpan #if NET8_0_OR_GREATER - , timeProvider + , this.manager.TimeProvider #endif ); } - public void RestartTimeoutTimer(short sequence) + public void RestartTimeoutTimer(short sequence, DateTimeOffset sentAt) { if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return; waitingSequence = sequence; + lastSentAt = sentAt; if (!timeoutTimerIsRunning) { @@ -54,6 +71,7 @@ public void Ack(short sequence) if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return; if (waitingSequence != sequence) return; + lastReceivedAt = manager.TimeProvider.GetUtcNow(); timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan); timeoutTimerIsRunning = false; } @@ -71,10 +89,12 @@ internal class NopStreamingHubHeartbeatManager : IStreamingHubHeartbeatManager { public static IStreamingHubHeartbeatManager Instance { get; } = new NopStreamingHubHeartbeatManager(); + public TimeProvider TimeProvider => TimeProvider.System; + NopStreamingHubHeartbeatManager() {} public StreamingHubHeartbeatHandle Register(IStreamingServiceContext serviceContext) - => new(this, serviceContext, Timeout.InfiniteTimeSpan, TimeProvider.System); + => new(this, serviceContext, Timeout.InfiniteTimeSpan); public void Unregister(IStreamingServiceContext serviceContext) { } public void Dispose() { } } @@ -87,7 +107,6 @@ internal class StreamingHubHeartbeatManager : IStreamingHubHeartbeatManager readonly IStreamingHubHeartbeatMetadataProvider? heartbeatMetadataProvider; readonly TimeSpan heartbeatInterval; readonly TimeSpan timeoutDuration; - readonly TimeProvider timeProvider; readonly ILogger logger; PeriodicTimer? timer; @@ -95,19 +114,22 @@ internal class StreamingHubHeartbeatManager : IStreamingHubHeartbeatManager ConcurrentDictionary contexts = new(); short sequence; + public TimeProvider TimeProvider { get; } + public StreamingHubHeartbeatManager(TimeSpan heartbeatInterval, TimeSpan timeoutDuration, IStreamingHubHeartbeatMetadataProvider? heartbeatMetadataProvider, TimeProvider timeProvider, ILogger logger) { this.heartbeatInterval = heartbeatInterval; this.timeoutDuration = timeoutDuration; this.heartbeatMetadataProvider = heartbeatMetadataProvider; - this.timeProvider = timeProvider; this.logger = logger; + + TimeProvider = timeProvider; } public StreamingHubHeartbeatHandle Register(IStreamingServiceContext serviceContext) { var method = serviceContext.CallContext.Method; - var handle = new StreamingHubHeartbeatHandle(this, serviceContext, timeoutDuration, timeProvider); + var handle = new StreamingHubHeartbeatHandle(this, serviceContext, timeoutDuration); if (contexts.TryAdd(serviceContext.ContextId, handle)) { if (Interlocked.Increment(ref registeredCount) == 1) @@ -118,7 +140,7 @@ public StreamingHubHeartbeatHandle Register(IStreamingServiceContext(); while (await runningTimer.WaitForNextTickAsync()) { - StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(writer, sequence); + var now = TimeProvider.GetUtcNow(); + StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(writer, sequence, now); if (!(heartbeatMetadataProvider?.TryWriteMetadata(writer) ?? false)) { writer.Write(Nil); @@ -153,7 +176,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method) { foreach (var (contextId, handle) in contexts) { - handle.RestartTimeoutTimer(sequence); + handle.RestartTimeoutTimer(sequence, now); handle.ServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan)); } } diff --git a/tests/MagicOnion.Client.Tests/StreamingHubClientHeartbeatManagerTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubClientHeartbeatManagerTest.cs index fde01aa9c..756782ee0 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubClientHeartbeatManagerTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubClientHeartbeatManagerTest.cs @@ -23,7 +23,7 @@ public async Task Interval_TimeoutDisabled() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -41,11 +41,11 @@ public async Task Interval_TimeoutDisabled() // Assert Assert.True(channel.Reader.TryRead(out var heartbeat1)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(1))], heartbeat1.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray()); Assert.True(channel.Reader.TryRead(out var heartbeat2)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(2))], heartbeat2.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddSeconds(2)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray()); Assert.True(channel.Reader.TryRead(out var heartbeat3)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(3))], heartbeat3.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddSeconds(3)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray()); Assert.False(manager.TimeoutToken.IsCancellationRequested); } @@ -65,7 +65,7 @@ public async Task Elapsed_RoundTripTime() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -78,35 +78,34 @@ public async Task Elapsed_RoundTripTime() await Task.Delay(10); timeProvider.Advance(TimeSpan.FromMilliseconds(100)); await Task.Delay(10); - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */])); timeProvider.Advance(TimeSpan.FromMilliseconds(900)); // Send await Task.Delay(10); timeProvider.Advance(TimeSpan.FromMilliseconds(100)); await Task.Delay(10); - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */])); timeProvider.Advance(TimeSpan.FromMilliseconds(900)); // Send await Task.Delay(10); timeProvider.Advance(TimeSpan.FromMilliseconds(100)); await Task.Delay(10); - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(timeProvider.GetUtcNow().AddMilliseconds(-100)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */])); await Task.Delay(10); // Assert Assert.Equal(3, clientHeartbeatResponseReceived.Count); Assert.True(channel.Reader.TryRead(out var heartbeat1)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(1))], heartbeat1.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray()); Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[0].RoundTripTime); Assert.True(channel.Reader.TryRead(out var heartbeat2)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(2))], heartbeat2.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddSeconds(2)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat2.Memory.ToArray()); Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[1].RoundTripTime); Assert.True(channel.Reader.TryRead(out var heartbeat3)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(3))], heartbeat3.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddSeconds(3)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat3.Memory.ToArray()); Assert.Equal(TimeSpan.FromMilliseconds(100), clientHeartbeatResponseReceived[2].RoundTripTime); - Assert.False(manager.TimeoutToken.IsCancellationRequested); } @@ -125,7 +124,7 @@ public async Task Timeout_Not_Responding() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -141,7 +140,7 @@ public async Task Timeout_Not_Responding() // Assert Assert.True(channel.Reader.TryRead(out var heartbeat1)); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. (ToMessagePackBytes(origin.AddSeconds(1)))], heartbeat1.Memory.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */], heartbeat1.Memory.ToArray()); Assert.False(channel.Reader.TryRead(out var heartbeat2)); Assert.True(manager.TimeoutToken.IsCancellationRequested); @@ -163,7 +162,7 @@ public async Task Timeout_Keep() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -183,7 +182,7 @@ public async Task Timeout_Keep() Assert.False(manager.TimeoutToken.IsCancellationRequested); // Received a response message from the server. - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(1))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */])); timeProvider.Advance(TimeSpan.FromMilliseconds(250)); await Task.Delay(10); @@ -212,7 +211,7 @@ public async Task Timeout_IntervalLongerThanTimeout_Not_Responding() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -238,7 +237,7 @@ public async Task Timeout_IntervalLongerThanTimeout_Not_Responding() Assert.False(manager.TimeoutToken.IsCancellationRequested); // Respond to the first message. but it does not respond to subsequent messages. - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(1))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt */, 0xc0 /* Nil */, 0xc0 /* Nil */])); timeProvider.Advance(TimeSpan.FromMilliseconds(100)); // 3s has elapsed since the first message. await Task.Delay(10); @@ -261,7 +260,7 @@ public async Task Timeout_IntervalLongerThanTimeout_Keep() channel.Writer, interval, timeout, - onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x), + onServerHeartbeatReceived: x => serverHeartbeatReceived.Add(x.Metadata.ToArray()), onClientHeartbeatResponseReceived: x => clientHeartbeatResponseReceived.Add(x), synchronizationContext: null, shutdownToken: CancellationToken.None, @@ -287,9 +286,9 @@ public async Task Timeout_IntervalLongerThanTimeout_Keep() Assert.False(manager.TimeoutToken.IsCancellationRequested); // Respond to the first message. but it does not respond to subsequent messages. - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(1))])); - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(2))])); - manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, 0xc0 /* Nil */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. ToMessagePackBytes(origin.AddSeconds(3))])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x0 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddSeconds(1)) /* ClientSentAt * /, 0xc0 /* Nil */, 0xc0 /* Nil */])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x1 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddSeconds(2)) /* ClientSentAt * /, 0xc0 /* Nil */, 0xc0 /* Nil */])); + manager.ProcessClientHeartbeatResponse(StreamingHubPayloadPool.Shared.RentOrCreate([0x95 /* Array(5) */, 0x7e /* 0x7e(127) */, 0x2 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddSeconds(3)) /* ClientSentAt * /, 0xc0 /* Nil */, 0xc0 /* Nil */])); timeProvider.Advance(TimeSpan.FromMilliseconds(100)); // 3s has elapsed since the first message. await Task.Delay(10); diff --git a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs index bbc81e71f..459a4feaa 100644 --- a/tests/MagicOnion.Client.Tests/StreamingHubTest.cs +++ b/tests/MagicOnion.Client.Tests/StreamingHubTest.cs @@ -435,7 +435,7 @@ public async Task Void_Parameter_Many() } [Fact] - public async Task Heartbeat_Interval() + public async Task ClientHeartbeat_Interval() { // Arrange var origin = new DateTimeOffset(2024, 7, 1, 0, 0, 0, TimeSpan.Zero); @@ -444,7 +444,6 @@ public async Task Heartbeat_Interval() var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); var options = StreamingHubClientOptions.CreateWithDefault().WithClientHeartbeatInterval(TimeSpan.FromMilliseconds(100)).WithTimeProvider(timeProvider); var client = await helper.ConnectAsync(options, timeout.Token); - byte[] sentAt = [0xcf, 0x00, 0x00, 0x01, 0x90, 0x6b, 0x97, 0x5c, 0x00]; // MessagePackWriter.Write(timeProvider.GetUtcNow().ToUnixTimeMilliseconds()); // Act var t = client.Parameter_One(1234); @@ -462,9 +461,9 @@ public async Task Heartbeat_Interval() var request1 = await helper.ReadRequestRawAsync(); var request2 = await helper.ReadRequestRawAsync(); var request3 = await helper.ReadRequestRawAsync(); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x00 /* Sequence(0) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. (ToMessagePackBytes(origin.AddMilliseconds(100)))], request1.ToArray()); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x01 /* Sequence(1) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. (ToMessagePackBytes(origin.AddMilliseconds(200)))], request2.ToArray()); - Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x02 /* Sequence(2) */, 0xc0 /* Nil */, 0x91 /* Array(1) */, .. (ToMessagePackBytes(origin.AddMilliseconds(300)))], request3.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x00 /* Sequence(0) */, .. ToMessagePackBytes(origin.AddMilliseconds(100)) /* ServerSentAt */, 0xc0 /* Nil */], request1.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x01 /* Sequence(1) */, .. ToMessagePackBytes(origin.AddMilliseconds(200)) /* ServerSentAt */, 0xc0 /* Nil */], request2.ToArray()); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7e /* 0x7e(127) */, 0x02 /* Sequence(2) */, .. ToMessagePackBytes(origin.AddMilliseconds(300)) /* ServerSentAt */, 0xc0 /* Nil */], request3.ToArray()); static byte[] ToMessagePackBytes(DateTimeOffset dt) { @@ -479,17 +478,17 @@ static byte[] ToMessagePackBytes(DateTimeOffset dt) } [Fact] - public async Task Heartbeat_Respond() + public async Task ServerHeartbeat_Respond() { // Arrange var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); - var options = StreamingHubClientOptions.CreateWithDefault().WithClientHeartbeatInterval(Timeout.InfiniteTimeSpan); // Disable Heartbeat timer. + var options = StreamingHubClientOptions.CreateWithDefault().WithClientHeartbeatInterval(Timeout.InfiniteTimeSpan); // Disable Client Heartbeat timer. var client = await helper.ConnectAsync(options, timeout.Token); // Act var t = client.Parameter_One(1234); - helper.WriteResponseRaw([0x95 /* Array(5) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, 0xc0, 0xc0, 0xc0 /* Extra */]); // Simulate heartbeat from the server. + helper.WriteResponseRaw([0x95 /* Array(5) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, .. (byte[])[0xcd, 0x30, 0x39] /* ServerSentAt */, 0xc0 /* Nil */, 0xc0 /* Extra */]); // Simulate heartbeat from the server. await Task.Delay(100); // Assert @@ -497,29 +496,55 @@ public async Task Heartbeat_Respond() Assert.Equal(1234, requestBody); var request1 = await helper.ReadRequestRawAsync(); - Assert.Equal([0x94, 0x7f, 0x00 /* Sequence(0) */, 0xc0, 0xc0], request1.ToArray()); // Respond to the heartbeat from the server. + Assert.Equal((byte[])[0x94, 0x7f, 0x00 /* Sequence(0) */, .. (byte[])[0xcd, 0x30, 0x39] /* ServerSentAt */, 0xc0 /* Nil */], request1.ToArray()); // Respond to the heartbeat from the server. } [Fact] - public async Task Heartbeat_Extra() + public async Task ServerHeartbeat_ServerTime() + { + // Arrange + var received = default(DateTimeOffset); + var timeProvider = new FakeTimeProvider(); + var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); + var options = StreamingHubClientOptions.CreateWithDefault() + .WithTimeProvider(timeProvider) + .WithServerHeartbeatReceived(x => received = x.ServerTime) + .WithClientHeartbeatInterval(Timeout.InfiniteTimeSpan); // Disable Heartbeat timer. + var client = await helper.ConnectAsync(options, timeout.Token); + + // Act + helper.WriteResponseRaw((byte[])[0x95 /* Array(5) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, .. (byte[])[0xcf, 0x00, 0x00, 0x01, 0x90, 0x6b, 0x97, 0x5c, 0x00] /* ServerSentAt */, 0xc0 /* Nil */, 0xc0 /* Extra(Nil) */]); // Simulate heartbeat from the server. + await Task.Delay(100); + + // Assert + var request1 = await helper.ReadRequestRawAsync(); + Assert.Equal(new DateTimeOffset(2024, 7, 1, 0, 0, 0, 0, TimeSpan.Zero), received); + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, .. (byte[])[0xcf, 0x00, 0x00, 0x01, 0x90, 0x6b, 0x97, 0x5c, 0x00] /* ServerSentAt */, 0xc0 /* Nil */], request1.ToArray()); // Respond to the heartbeat from the server. + } + + [Fact] + public async Task ServerHeartbeat_Extra() { // Arrange var received = Array.Empty(); + var timeProvider = new FakeTimeProvider(); var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var helper = new StreamingHubClientTestHelper(factoryProvider: DynamicStreamingHubClientFactoryProvider.Instance); var options = StreamingHubClientOptions.CreateWithDefault() - .WithServerHeartbeatReceived(x => received = x.ToArray()) + .WithTimeProvider(timeProvider) + .WithServerHeartbeatReceived(x => received = x.Metadata.ToArray()) .WithClientHeartbeatInterval(Timeout.InfiniteTimeSpan); // Disable Heartbeat timer. var client = await helper.ConnectAsync(options, timeout.Token); // Act - helper.WriteResponseRaw([0x95 /* Array(5) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, 0xc0, 0xc0, .."Hello World"u8 /* Extra */]); // Simulate heartbeat from the server. + helper.WriteResponseRaw((byte[])[0x95 /* Array(5) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, .. (byte[])[ 0xcd, 0x30, 0x39 ] /* ServerSentAt */, 0xc0 /* Nil */, .."Hello World"u8 /* Extra */]); // Simulate heartbeat from the server. await Task.Delay(100); // Assert Assert.Equal([.. "Hello World"u8], received); // Respond to the heartbeat from the server. var request1 = await helper.ReadRequestRawAsync(); - Assert.Equal([0x94, 0x7f, 0x00 /* Sequence(0) */, 0xc0, 0xc0], request1.ToArray()); // Respond to the heartbeat from the server. + Assert.Equal((byte[])[0x94 /* Array(4) */, 0x7f /* Type:127 */, 0x00 /* Sequence(0) */, .. (byte[])[0xcd, 0x30, 0x39] /* ServerSentAt */, 0xc0 /* Nil */], request1.ToArray()); // Respond to the heartbeat from the server. } } diff --git a/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubHeartbeatManagerTest.cs b/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubHeartbeatManagerTest.cs index 4520660fb..71f7226cd 100644 --- a/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubHeartbeatManagerTest.cs +++ b/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubHeartbeatManagerTest.cs @@ -45,6 +45,66 @@ public void Handle_Dispose() handle.Dispose(); } + [Fact] + public async Task Latency() + { + // Arrange + var collector = FakeLogCollector.Create(new FakeLogCollectorOptions()); + var logger = new FakeLogger(collector); + var timeProvider = new FakeTimeProvider(); + var manager = new StreamingHubHeartbeatManager(TimeSpan.FromMilliseconds(300), TimeSpan.FromMilliseconds(200), null, timeProvider, logger); + var context = CreateFakeStreamingServiceContext(); + + // Act + using var handle = manager.Register(context); + timeProvider.Advance(TimeSpan.FromMilliseconds(350)); + await Task.Delay(1); + + // Simulate to send heartbeat responses from clients. + timeProvider.Advance(TimeSpan.FromMilliseconds(50)); + handle.Ack(0); + await Task.Delay(1); + + // Assert + Assert.Equal(TimeSpan.FromMilliseconds(50), handle.Latency); + } + + [Fact] + public void Latency_Before_Send() + { + // Arrange + var collector = FakeLogCollector.Create(new FakeLogCollectorOptions()); + var logger = new FakeLogger(collector); + var timeProvider = new FakeTimeProvider(); + var manager = new StreamingHubHeartbeatManager(TimeSpan.FromMilliseconds(300), TimeSpan.FromMilliseconds(200), null, timeProvider, logger); + var context = CreateFakeStreamingServiceContext(); + + // Act + using var handle = manager.Register(context); + + // Assert + Assert.Equal(TimeSpan.Zero, handle.Latency); + } + + [Fact] + public async Task Latency_Before_Ack() + { + // Arrange + var collector = FakeLogCollector.Create(new FakeLogCollectorOptions()); + var logger = new FakeLogger(collector); + var timeProvider = new FakeTimeProvider(); + var manager = new StreamingHubHeartbeatManager(TimeSpan.FromMilliseconds(300), TimeSpan.FromMilliseconds(200), null, timeProvider, logger); + var context = CreateFakeStreamingServiceContext(); + + // Act + using var handle = manager.Register(context); + timeProvider.Advance(TimeSpan.FromMilliseconds(350)); + await Task.Delay(1); + + // Assert + Assert.Equal(TimeSpan.Zero, handle.Latency); + } + [Fact] public async Task Interval_Disable_Timeout() { @@ -55,9 +115,9 @@ public async Task Interval_Disable_Timeout() var context1 = CreateFakeStreamingServiceContext(); var context2 = CreateFakeStreamingServiceContext(); var context3 = CreateFakeStreamingServiceContext(); - byte[] expectedHeartbeatMessageNoExtra1 = BuildMessage(0); - byte[] expectedHeartbeatMessageNoExtra2 = BuildMessage(1); - byte[] expectedHeartbeatMessageNoExtra3 = BuildMessage(2); + byte[] expectedHeartbeatMessageNoExtra1 = BuildMessage(0, timeProvider.GetUtcNow().AddMilliseconds(100)); + byte[] expectedHeartbeatMessageNoExtra2 = BuildMessage(1, timeProvider.GetUtcNow().AddMilliseconds(200)); + byte[] expectedHeartbeatMessageNoExtra3 = BuildMessage(2, timeProvider.GetUtcNow().AddMilliseconds(300)); // Act using var handle1 = manager.Register(context1); @@ -165,9 +225,9 @@ public async Task Interval_Stop_After_HandleDisposed() var context1 = CreateFakeStreamingServiceContext(); var context2 = CreateFakeStreamingServiceContext(); var context3 = CreateFakeStreamingServiceContext(); - byte[] expectedHeartbeatMessageNoExtra1 = BuildMessage(0); - byte[] expectedHeartbeatMessageNoExtra2 = BuildMessage(1); - byte[] expectedHeartbeatMessageNoExtra3 = BuildMessage(2); + byte[] expectedHeartbeatMessageNoExtra1 = BuildMessage(0, timeProvider.GetUtcNow().AddMilliseconds(100)); + byte[] expectedHeartbeatMessageNoExtra2 = BuildMessage(1, timeProvider.GetUtcNow().AddMilliseconds(200)); + byte[] expectedHeartbeatMessageNoExtra3 = BuildMessage(2, timeProvider.GetUtcNow().AddMilliseconds(300)); // Act var handle1 = manager.Register(context1); @@ -214,9 +274,9 @@ public async Task CustomMetadataProvider() var context1 = CreateFakeStreamingServiceContext(); var context2 = CreateFakeStreamingServiceContext(); var context3 = CreateFakeStreamingServiceContext(); - byte[] expectedHeartbeatMessage1 = [.. BuildMessageHeader(0), .. "Hello"u8]; - byte[] expectedHeartbeatMessage2 = [.. BuildMessageHeader(1), .. "Hello"u8]; - byte[] expectedHeartbeatMessage3 = [.. BuildMessageHeader(2), .. "Hello"u8]; + byte[] expectedHeartbeatMessage1 = [.. BuildMessageHeader(0, timeProvider.GetUtcNow().AddMilliseconds(100)), .. "Hello"u8]; + byte[] expectedHeartbeatMessage2 = [.. BuildMessageHeader(1, timeProvider.GetUtcNow().AddMilliseconds(200)), .. "Hello"u8]; + byte[] expectedHeartbeatMessage3 = [.. BuildMessageHeader(2, timeProvider.GetUtcNow().AddMilliseconds(300)), .. "Hello"u8]; // Act using var handle1 = manager.Register(context1); @@ -348,8 +408,8 @@ public async Task Sequence() handle1.Ack(1); // Assert - Assert.Equal(BuildMessage(0), context1.Responses[0].Memory.ToArray()); - Assert.Equal(BuildMessage(1), context1.Responses[1].Memory.ToArray()); + Assert.Equal(BuildMessage(0, origin.AddMilliseconds(350)), context1.Responses[0].Memory.ToArray()); + Assert.Equal(BuildMessage(1, origin.AddMilliseconds(700)), context1.Responses[1].Memory.ToArray()); } [Fact] @@ -360,60 +420,59 @@ public void Writer_WriteServerHeartbeatMessageHeader() var bufferWriter2 = new ArrayBufferWriter(); // Act - StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(bufferWriter1, 0); - StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(bufferWriter2, 1); + StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(bufferWriter1, 0, DateTimeOffset.FromUnixTimeMilliseconds(123456)); + StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(bufferWriter2, 1, DateTimeOffset.FromUnixTimeMilliseconds(456789)); // Assert - Assert.Equal(BuildMessageHeader(0), bufferWriter1.WrittenSpan.ToArray()); - Assert.Equal(BuildMessageHeader(1), bufferWriter2.WrittenSpan.ToArray()); + Assert.Equal(BuildMessageHeader(0, DateTimeOffset.FromUnixTimeMilliseconds(123456)), bufferWriter1.WrittenSpan.ToArray()); + Assert.Equal(BuildMessageHeader(1, DateTimeOffset.FromUnixTimeMilliseconds(456789)), bufferWriter2.WrittenSpan.ToArray()); } - static byte[] BuildMessageHeader(byte sequence) + static byte[] BuildMessageHeader(byte sequence, DateTimeOffset serverSentAt) { var bufferWriter = new ArrayBufferWriter(); var messagePackWriter = new MessagePackWriter(bufferWriter); messagePackWriter.WriteArrayHeader(5); { - messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat - messagePackWriter.Write(sequence); // 1: Sequence - messagePackWriter.WriteNil(); // 2: Dummy - messagePackWriter.WriteNil(); // 3: Dummy + messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat + messagePackWriter.Write(sequence); // 1: Sequence + messagePackWriter.Write(serverSentAt.ToUnixTimeMilliseconds()); // 2: ServerSentAt + messagePackWriter.WriteNil(); // 3: Dummy } messagePackWriter.Flush(); return bufferWriter.WrittenSpan.ToArray(); } - static byte[] BuildMessage(byte sequence) + + static byte[] BuildMessage(byte sequence, long serverSentAt) { var bufferWriter = new ArrayBufferWriter(); var messagePackWriter = new MessagePackWriter(bufferWriter); messagePackWriter.WriteArrayHeader(5); { - messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat - messagePackWriter.Write(sequence); // 1: Sequence - messagePackWriter.WriteNil(); // 2: Dummy - messagePackWriter.WriteNil(); // 3: Dummy - messagePackWriter.WriteNil(); // 4: Dummy + messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat + messagePackWriter.Write(sequence); // 1: Sequence + messagePackWriter.Write(serverSentAt); // 2: ServerSentAt + messagePackWriter.WriteNil(); // 3: Dummy + messagePackWriter.WriteNil(); // 4: Dummy } messagePackWriter.Flush(); return bufferWriter.WrittenSpan.ToArray(); } - static byte[] BuildMessage(byte sequence, DateTimeOffset dt) + static byte[] BuildMessage(byte sequence, DateTimeOffset serverSentAt) { var bufferWriter = new ArrayBufferWriter(); var messagePackWriter = new MessagePackWriter(bufferWriter); messagePackWriter.WriteArrayHeader(5); { - messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat - messagePackWriter.Write(sequence); // 1: Sequence - messagePackWriter.WriteNil(); // 2: Dummy - messagePackWriter.WriteNil(); // 3: Dummy - messagePackWriter.WriteArrayHeader(1); // 4: Array(1) - { - messagePackWriter.Write(dt.ToUnixTimeMilliseconds()); - } + messagePackWriter.Write(127); // 0: 0x7f / 127: ServerHeartbeat + messagePackWriter.Write(sequence); // 1: Sequence + messagePackWriter.Write(serverSentAt.ToUnixTimeMilliseconds()); // 2: ServerSentAt + messagePackWriter.WriteNil(); // 3: Dummy + messagePackWriter.WriteNil(); // 4: Dummy + } messagePackWriter.Flush(); diff --git a/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubServerHeartbeatTest.cs b/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubServerHeartbeatTest.cs index bdbd32f38..0b47dfc4f 100644 --- a/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubServerHeartbeatTest.cs +++ b/tests/MagicOnion.Server.Tests/StreamingHubHeartbeat/StreamingHubServerHeartbeatTest.cs @@ -21,7 +21,7 @@ public async Task EnableByAttribute() // Arrange var receiver = Substitute.For(); var receivedHeartbeatMetadata = new List(); - var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.ToArray())); + var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.Metadata.ToArray())); // Act var client = await Fixture.CreateStreamingHubClientAsync(receiver, options); @@ -38,7 +38,7 @@ public async Task DisableByAttribute() // Arrange var receiver = Substitute.For(); var receivedHeartbeatMetadata = new List(); - var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.ToArray())); + var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.Metadata.ToArray())); // Act var client = await Fixture.CreateStreamingHubClientAsync(receiver, options); @@ -55,7 +55,7 @@ public async Task Override_Interval() // Arrange var receiver = Substitute.For(); var receivedHeartbeatMetadata = new List(); - var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.ToArray())); + var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.Metadata.ToArray())); // Act var client = await Fixture.CreateStreamingHubClientAsync(receiver, options); @@ -127,7 +127,7 @@ public async Task Default_Disable() // Arrange var receiver = Substitute.For(); var receivedHeartbeatMetadata = new List(); - var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.ToArray())); + var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.Metadata.ToArray())); // Act var client = await Fixture.CreateStreamingHubClientAsync(receiver, options); @@ -169,7 +169,7 @@ public async Task Default_Enable() // Arrange var receiver = Substitute.For(); var receivedHeartbeatMetadata = new List(); - var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.ToArray())); + var options = StreamingHubClientOptions.CreateWithDefault().WithServerHeartbeatReceived(x => receivedHeartbeatMetadata.Add(x.Metadata.ToArray())); // Act var client = await Fixture.CreateStreamingHubClientAsync(receiver, options);