Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and Improve Streaming Rpc #47

Merged
merged 2 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion SharedBuildProperties.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Product>Solana.Unity</Product>
<Version>2.6.1.2</Version>
<Version>2.6.1.3</Version>
<Copyright>Copyright 2022 &#169; Magicblock Labs</Copyright>
<Authors>Magicblock Labs</Authors>
<PublisherName>Magicblock Labs</PublisherName>
Expand Down
5 changes: 4 additions & 1 deletion src/Solana.Unity.Rpc/Core/Sockets/IWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ internal interface IWebSocket : IDisposable
Task CloseAsync(CancellationToken cancellationToken);
Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken);
Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken);
Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken);

public abstract event WebSocketMessageEventHandler OnMessage;
public delegate void WebSocketMessageEventHandler(byte[] data);
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;
}
}
86 changes: 13 additions & 73 deletions src/Solana.Unity.Rpc/Core/Sockets/StreamingRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected StreamingRpcClient(string url, object logger, IWebSocket socket = defa
_logger = logger;
_sem = new SemaphoreSlim(1, 1);
_connectionStats = new ConnectionStats();
ClientSocket.ConnectionStateChangedEvent += (sender, state) => ConnectionStateChangedEvent?.Invoke(sender, state);
}

/// <summary>
Expand All @@ -70,7 +71,7 @@ public async Task ConnectAsync()
if (ClientSocket.State != WebSocketState.Open)
{
await ClientSocket.ConnectAsync(NodeAddress, CancellationToken.None);
_ = StartListening();
ClientSocket.OnMessage += DispatchMessage;
ConnectionStateChangedEvent?.Invoke(this, State);
}
}
Expand All @@ -80,6 +81,16 @@ public async Task ConnectAsync()
}
}

private void DispatchMessage(byte[] message)
{
HandleNewMessage(new Memory<byte>(message));
_connectionStats.AddReceived((uint)message.Length);
if (ClientSocket.State != WebSocketState.Open && ClientSocket.State != WebSocketState.Connecting)
{
ConnectionStateChangedEvent?.Invoke(this, State);
}
}

/// <inheritdoc cref="IStreamingRpcClient.DisconnectAsync"/>
public async Task DisconnectAsync()
{
Expand All @@ -94,6 +105,7 @@ public async Task DisconnectAsync()
//and will also notify when there is a non-user triggered disconnection event

// handle disconnection cleanup
ClientSocket.OnMessage -= DispatchMessage;
ClientSocket.Dispose();
ClientSocket = new WebSocketWrapper();
CleanupSubscriptions();
Expand All @@ -105,78 +117,6 @@ public async Task DisconnectAsync()
}
}

/// <summary>
/// Starts listeing to new messages.
/// </summary>
/// <returns>Returns the task representing the asynchronous task.</returns>
private async Task StartListening()
{
while (ClientSocket.State is WebSocketState.Open or WebSocketState.Connecting)
{
try
{
await ReadNextMessage();
}
catch (Exception e)
{
if (_logger != null)
{
Console.WriteLine($"Exception trying to read next message: {e.Message}");
}
}
}

if (_logger != null)
{
Console.WriteLine($"Stopped reading messages. ClientSocket.State changed to {ClientSocket.State}");
}
ConnectionStateChangedEvent?.Invoke(this, State);
}

/// <summary>
/// Reads the next message from the socket.
/// </summary>
/// <param name="cancellationToken">The cancelation token.</param>
/// <returns>Returns the task representing the asynchronous task.</returns>
private async Task ReadNextMessage(CancellationToken cancellationToken = default)
{
var buffer = new byte[32768];
Memory<byte> mem = new(buffer);
WebSocketReceiveResult result = await ClientSocket.ReceiveAsync(mem, cancellationToken);
int count = result.Count;

if (result.MessageType == WebSocketMessageType.Close)
{
await ClientSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken);
}
else
{
if (!result.EndOfMessage)
{
MemoryStream ms = new MemoryStream();
ms.Write(mem.Span.ToArray(), 0, mem.Span.Length);


while (!result.EndOfMessage)
{
result = await ClientSocket.ReceiveAsync(mem, cancellationToken).ConfigureAwait(false);

var memSlice = mem.Slice(0, result.Count).Span.ToArray();
ms.Write(memSlice, 0, memSlice.Length);
count += result.Count;
}

mem = new Memory<byte>(ms.ToArray());
}
else
{
mem = mem.Slice(0, count);
}
_connectionStats.AddReceived((uint)count);
HandleNewMessage(mem);
}
}

/// <summary>
/// Handless a new message payload.
/// </summary>
Expand Down
17 changes: 16 additions & 1 deletion src/Solana.Unity.Rpc/Core/Sockets/SubscriptionState.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Solana.Unity.Rpc.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -29,6 +30,11 @@ public abstract class SubscriptionState
/// The current state of the subscription.
/// </summary>
public SubscriptionStatus State { get; protected set; }

/// <summary>
/// The JsonRpcRequest for this subscription.
/// </summary>
internal JsonRpcRequest Request;

/// <summary>
/// The last error message.
Expand Down Expand Up @@ -95,6 +101,15 @@ internal void ChangeState(SubscriptionStatus newState, string error = null, stri

/// <inheritdoc cref="Unsubscribe"/>
public async Task UnsubscribeAsync() => await _rpcClient.UnsubscribeAsync(this).ConfigureAwait(false);

/// <summary>
/// Set the request for this subscription.
/// </summary>
/// <param name="request"></param>
public void SetRequest(JsonRpcRequest request)
{
Request = request;
}
}

/// <summary>
Expand Down
63 changes: 42 additions & 21 deletions src/Solana.Unity.Rpc/Core/Sockets/WebSocketWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using NativeWebSocket;
using System;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -13,8 +12,10 @@ internal class WebSocketWrapper : IWebSocket
private NativeWebSocket.IWebSocket webSocket;

public WebSocketCloseStatus? CloseStatus => WebSocketCloseStatus.NormalClosure;

public string CloseStatusDescription => "Not implemented";

private TaskCompletionSource<bool> _webSocketConnectionTask = new();

public WebSocketState State
{
Expand All @@ -33,37 +34,57 @@ public WebSocketState State
}
}

public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
=> webSocket.Close();
public Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription,
CancellationToken cancellationToken)
{
return webSocket.Close();
}

public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
{
webSocket = WebSocket.Create(uri.AbsoluteUri);
webSocket.OnOpen += () =>
{
_webSocketConnectionTask.TrySetResult(true);
webSocket.OnMessage += MessageReceived;
ConnectionStateChangedEvent?.Invoke(this, State);
};
webSocket.OnClose += _ =>
{
webSocket.OnMessage -= MessageReceived;
ConnectionStateChangedEvent?.Invoke(this, State);
};
return webSocket.Connect();
}

private void MessageReceived(byte[] message)
{
OnMessage?.Invoke(message);
}

public Task CloseAsync(CancellationToken cancellationToken)
=> webSocket.Close();

public Task<WebSocketReceiveResult> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<WebSocketReceiveResult> receiveMessageTask = new();
public event IWebSocket.WebSocketMessageEventHandler OnMessage;
public event EventHandler<WebSocketState> ConnectionStateChangedEvent;

void WebSocketOnOnMessage(byte[] bytes)
public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
{
if (webSocket.State == NativeWebSocket.WebSocketState.Connecting)
{
bytes.CopyTo(buffer);
WebSocketReceiveResult webSocketReceiveResult = new(bytes.Length, WebSocketMessageType.Text, true);
MainThreadUtil.Run(() => receiveMessageTask.SetResult(webSocketReceiveResult));
webSocket.OnMessage -= WebSocketOnOnMessage;
Console.WriteLine("Message received");
return _webSocketConnectionTask.Task.ContinueWith(_ =>
{
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
{
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
}
return webSocket.Send(buffer.ToArray());
}, cancellationToken).Unwrap();
}
if (webSocket.State != NativeWebSocket.WebSocketState.Open)
{
throw new WebSocketException(WebSocketError.InvalidState, "WebSocket is not connected.");
}
webSocket.OnMessage += WebSocketOnOnMessage;
return receiveMessageTask.Task;
}

public Task SendAsync(ReadOnlyMemory<byte> buffer, WebSocketMessageType messageType, bool endOfMessage,
CancellationToken cancellationToken)
{
return webSocket.Send(buffer.ToArray());
}

Expand Down
Loading
Loading