From 7fc5608b367fddb87dbf20e7275210d0d704a22b Mon Sep 17 00:00:00 2001 From: Austin Milt Date: Sat, 30 Nov 2024 19:52:48 -0500 Subject: [PATCH 1/3] recreate PR #3773 --- .../Api/ClientWebSocketWrapper.cs | 55 +++++++++++-------- .../Api/Interfaces/ICommApi.cs | 2 - .../Api/WebSocketServer.cs | 7 +-- .../lua/CommonLibs/CommLuaLibrary.cs | 53 ++++++++++++------ 4 files changed, 67 insertions(+), 50 deletions(-) diff --git a/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs b/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs index 26717757fe9..5b7ec39d3e4 100644 --- a/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs +++ b/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs @@ -1,5 +1,6 @@ #nullable enable +using System.Collections.Generic; using System.Net.WebSockets; using System.Text; using System.Threading; @@ -7,22 +8,18 @@ namespace BizHawk.Client.Common { - public struct ClientWebSocketWrapper + public class ClientWebSocketWrapper(Uri uri) { private ClientWebSocket? _w; + private readonly Queue _receivedMessages = new(); + + private readonly Uri _uri = uri; + /// calls getter (unless closed/disposed, then is always returned) public WebSocketState State => _w?.State ?? WebSocketState.Closed; - public ClientWebSocketWrapper( - Uri uri, - CancellationToken cancellationToken = default/* == CancellationToken.None */) - { - _w = new ClientWebSocket(); - _w.ConnectAsync(uri, cancellationToken).Wait(cancellationToken); - } - - /// calls + /// calls /// also calls (wrapper property will continue to work, method calls will throw ) public Task Close( WebSocketCloseStatus closeStatus, @@ -30,28 +27,35 @@ public Task Close( CancellationToken cancellationToken = default/* == CancellationToken.None */) { if (_w == null) throw new ObjectDisposedException(nameof(_w)); - var task = _w.CloseAsync(closeStatus, statusDescription, cancellationToken); + var task = _w.CloseOutputAsync(closeStatus, statusDescription, cancellationToken); _w.Dispose(); _w = null; return task; } /// calls - public Task Receive( - ArraySegment buffer, - CancellationToken cancellationToken = default/* == CancellationToken.None */) - => _w?.ReceiveAsync(buffer, cancellationToken) - ?? throw new ObjectDisposedException(nameof(_w)); + public async Task Receive(int bufferSize, int maxMessages) + { + var buffer = new ArraySegment(new byte[bufferSize]); + while ((_w != null) && (_w.State == WebSocketState.Open)) + { + WebSocketReceiveResult result; + result = await _w.ReceiveAsync(buffer, CancellationToken.None); + if (maxMessages == 0 || _receivedMessages.Count < maxMessages) + { + _receivedMessages.Enqueue(Encoding.UTF8.GetString(buffer.Array, 0, result.Count)); + } + } + } - /// calls - public string Receive( - int bufferCap, - CancellationToken cancellationToken = default/* == CancellationToken.None */) + public async Task Connect(int bufferSize, int maxMessages) { - if (_w == null) throw new ObjectDisposedException(nameof(_w)); - var buffer = new byte[bufferCap]; - var result = Receive(new ArraySegment(buffer), cancellationToken).Result; - return Encoding.UTF8.GetString(buffer, 0, result.Count); + _w ??= new(); + if ((_w != null) && (_w.State != WebSocketState.Open)) + { + await _w.ConnectAsync(_uri, CancellationToken.None); + await Receive(bufferSize, maxMessages); + } } /// calls @@ -77,5 +81,8 @@ public Task Send( cancellationToken ); } + + /// pops the first cached message off the message queue, otherwise returns null + public string? PopMessage() => (_receivedMessages.Count > 0) ? _receivedMessages.Dequeue() : null; } } diff --git a/src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs b/src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs index 3d40a598f75..38eb1f02641 100644 --- a/src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs +++ b/src/BizHawk.Client.Common/Api/Interfaces/ICommApi.cs @@ -10,9 +10,7 @@ public interface ICommApi : IExternalApi SocketServer? Sockets { get; } -#if ENABLE_WEBSOCKETS WebSocketServer WebSockets { get; } -#endif string? HttpTest(); diff --git a/src/BizHawk.Client.Common/Api/WebSocketServer.cs b/src/BizHawk.Client.Common/Api/WebSocketServer.cs index a887b83227a..e9eab1466c1 100644 --- a/src/BizHawk.Client.Common/Api/WebSocketServer.cs +++ b/src/BizHawk.Client.Common/Api/WebSocketServer.cs @@ -1,14 +1,9 @@ #nullable enable -using System.Threading; - namespace BizHawk.Client.Common { public sealed class WebSocketServer { - public ClientWebSocketWrapper Open( - Uri uri, - CancellationToken cancellationToken = default/* == CancellationToken.None */) - => new(uri, cancellationToken); + public ClientWebSocketWrapper Open(Uri uri) => new(uri); } } diff --git a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs index db050266a2d..0ba35df074f 100644 --- a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs +++ b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs @@ -1,8 +1,9 @@ using System.Collections.Generic; using System.ComponentModel; using System.Linq; +using System.Net.WebSockets; using System.Text; - +using System.Threading.Tasks; using NLua; namespace BizHawk.Client.Common @@ -13,7 +14,7 @@ public sealed class CommLuaLibrary : LuaLibraryBase private readonly IDictionary _websockets = new Dictionary(); public CommLuaLibrary(ILuaLibraries luaLibsImpl, ApiContainer apiContainer, Action logOutputCallback) - : base(luaLibsImpl, apiContainer, logOutputCallback) {} + : base(luaLibsImpl, apiContainer, logOutputCallback) { } public override string Name => "comm"; @@ -253,20 +254,31 @@ private void CheckHttp() } } -#if ENABLE_WEBSOCKETS - [LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later.")] + [LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later. If an id is provided, reconnects to the ")] [LuaMethodExample("local ws_id = comm.ws_open(\"wss://echo.websocket.org\");")] - public string WebSocketOpen(string uri) + public async Task WebSocketOpen(string uri, string guid = null, int bufferSize = 1024, int maxMessages = 20) { + Log($"Opening websocket server {uri}"); var wsServer = APIs.Comm.WebSockets; if (wsServer == null) { - Log("WebSocket server is somehow not available"); + Log("WebSocket server is not available"); return null; } - var guid = new Guid(); - _websockets[guid] = wsServer.Open(new Uri(uri)); - return guid.ToString(); + var localGuid = guid == null ? Guid.NewGuid() : Guid.Parse(guid); + Log($"Server ID is {localGuid}"); + if (guid == null) + { + Log($"OK here we go, connecting in pt 1"); + _websockets[localGuid] = wsServer.Open(new Uri(uri)); + await _websockets[localGuid].Connect(bufferSize, maxMessages); + } + else + { + Log($"OK here we go, connecting in pt 2"); + await _websockets[localGuid].Connect(bufferSize, maxMessages); + } + return localGuid.ToString(); } [LuaMethod("ws_send", "Send a message to a certain websocket id (boolean flag endOfMessage)")] @@ -276,14 +288,17 @@ public void WebSocketSend( string content, bool endOfMessage) { - if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) wrapper.Send(content, endOfMessage); + if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) + { + _ = wrapper.Send(content, endOfMessage); + } } - [LuaMethod("ws_receive", "Receive a message from a certain websocket id and a maximum number of bytes to read")] - [LuaMethodExample("local ws = comm.ws_receive(ws_id, str_len);")] - public string WebSocketReceive(string guid, int bufferCap) + [LuaMethod("ws_receive", "Receive a message from a certain websocket id")] + [LuaMethodExample("local ws = comm.ws_receive(ws_id);")] + public string WebSocketReceive(string guid) => _websockets.TryGetValue(Guid.Parse(guid), out var wrapper) - ? wrapper.Receive(bufferCap) + ? wrapper.PopMessage() : null; [LuaMethod("ws_get_status", "Get a websocket's status")] @@ -291,17 +306,19 @@ public string WebSocketReceive(string guid, int bufferCap) public int? WebSocketGetStatus(string guid) => _websockets.TryGetValue(Guid.Parse(guid), out var wrapper) ? (int) wrapper.State - : (int?) null; + : null; [LuaMethod("ws_close", "Close a websocket connection with a close status")] [LuaMethodExample("local ws_status = comm.ws_close(ws_id, close_status);")] public void WebSocketClose( string guid, - WebSocketCloseStatus status, + int status, string closeMessage) { - if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) wrapper.Close(status, closeMessage); + if (_websockets.TryGetValue(Guid.Parse(guid), out var wrapper)) + { + _ = wrapper.Close((WebSocketCloseStatus) status, closeMessage); + } } -#endif } } \ No newline at end of file From eac754a9ba5ba1af26fc8dce430d068b57a0ed5a Mon Sep 17 00:00:00 2001 From: Austin Milt Date: Sat, 30 Nov 2024 19:54:08 -0500 Subject: [PATCH 2/3] remove debug logs --- src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs index 0ba35df074f..4e7a931f56b 100644 --- a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs +++ b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs @@ -258,24 +258,19 @@ private void CheckHttp() [LuaMethodExample("local ws_id = comm.ws_open(\"wss://echo.websocket.org\");")] public async Task WebSocketOpen(string uri, string guid = null, int bufferSize = 1024, int maxMessages = 20) { - Log($"Opening websocket server {uri}"); var wsServer = APIs.Comm.WebSockets; if (wsServer == null) { - Log("WebSocket server is not available"); return null; } var localGuid = guid == null ? Guid.NewGuid() : Guid.Parse(guid); - Log($"Server ID is {localGuid}"); if (guid == null) { - Log($"OK here we go, connecting in pt 1"); _websockets[localGuid] = wsServer.Open(new Uri(uri)); await _websockets[localGuid].Connect(bufferSize, maxMessages); } else { - Log($"OK here we go, connecting in pt 2"); await _websockets[localGuid].Connect(bufferSize, maxMessages); } return localGuid.ToString(); From 54df3c0dd19f09741124dd2723b917dc2e874a83 Mon Sep 17 00:00:00 2001 From: Austin Milt Date: Sun, 1 Dec 2024 11:44:34 -0500 Subject: [PATCH 3/3] make WebSocketOpen non-async --- .../Api/ClientWebSocketWrapper.cs | 54 +++++++++++++++---- .../lua/CommonLibs/CommLuaLibrary.cs | 21 ++++---- 2 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs b/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs index 5b7ec39d3e4..d239a8e47a2 100644 --- a/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs +++ b/src/BizHawk.Client.Common/Api/ClientWebSocketWrapper.cs @@ -33,28 +33,62 @@ public Task Close( return task; } - /// calls - public async Task Receive(int bufferSize, int maxMessages) + public async Task Connect(int bufferSize, int maxMessages) { + _w ??= new(); + if ((_w != null) && (_w.State != WebSocketState.Open)) + { + await _w.ConnectAsync(_uri, CancellationToken.None); + await Receive(bufferSize, maxMessages); + } + } + + /// opens a connection to the configured server and passes messages to [consumer] + public async Task Connect(Action consumer, int bufferSize = 1024) + { + _w ??= new(); + if ((_w != null) && (_w.State != WebSocketState.Open)) + { + await _w.ConnectAsync(_uri, CancellationToken.None); + } + var buffer = new ArraySegment(new byte[bufferSize]); while ((_w != null) && (_w.State == WebSocketState.Open)) { - WebSocketReceiveResult result; - result = await _w.ReceiveAsync(buffer, CancellationToken.None); - if (maxMessages == 0 || _receivedMessages.Count < maxMessages) - { - _receivedMessages.Enqueue(Encoding.UTF8.GetString(buffer.Array, 0, result.Count)); - } + var result = await _w.ReceiveAsync(buffer, CancellationToken.None); + string message = Encoding.UTF8.GetString(buffer.Array, 0, result.Count); + consumer(message); } } - public async Task Connect(int bufferSize, int maxMessages) + /// opens a connection to the configured server and passes messages to [consumer] + public async Task Connect(Action consumer, int bufferSize = 2048) { _w ??= new(); if ((_w != null) && (_w.State != WebSocketState.Open)) { await _w.ConnectAsync(_uri, CancellationToken.None); - await Receive(bufferSize, maxMessages); + } + + var buffer = new ArraySegment(new byte[bufferSize]); + while ((_w != null) && (_w.State == WebSocketState.Open)) + { + _ = await _w.ReceiveAsync(buffer, CancellationToken.None); + consumer(buffer.Array); + } + } + + /// calls + public async Task Receive(int bufferSize, int maxMessages) + { + var buffer = new ArraySegment(new byte[bufferSize]); + while ((_w != null) && (_w.State == WebSocketState.Open)) + { + var result = await _w.ReceiveAsync(buffer, CancellationToken.None); + if (maxMessages == 0 || _receivedMessages.Count < maxMessages) + { + _receivedMessages.Enqueue(Encoding.UTF8.GetString(buffer.Array, 0, result.Count)); + } } } diff --git a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs index 4e7a931f56b..839dc4c58af 100644 --- a/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs +++ b/src/BizHawk.Client.Common/lua/CommonLibs/CommLuaLibrary.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Net.WebSockets; using System.Text; -using System.Threading.Tasks; using NLua; namespace BizHawk.Client.Common @@ -254,9 +253,13 @@ private void CheckHttp() } } - [LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later. If an id is provided, reconnects to the ")] + [LuaMethod("ws_open", "Opens a websocket and returns the id so that it can be retrieved later. If an id is provided, reconnects to the server")] [LuaMethodExample("local ws_id = comm.ws_open(\"wss://echo.websocket.org\");")] - public async Task WebSocketOpen(string uri, string guid = null, int bufferSize = 1024, int maxMessages = 20) + public string WebSocketOpen( + string uri, + string guid = null, + int bufferSize = 1024, + int maxMessages = 20) { var wsServer = APIs.Comm.WebSockets; if (wsServer == null) @@ -264,15 +267,9 @@ public async Task WebSocketOpen(string uri, string guid = null, int buff return null; } var localGuid = guid == null ? Guid.NewGuid() : Guid.Parse(guid); - if (guid == null) - { - _websockets[localGuid] = wsServer.Open(new Uri(uri)); - await _websockets[localGuid].Connect(bufferSize, maxMessages); - } - else - { - await _websockets[localGuid].Connect(bufferSize, maxMessages); - } + + _websockets[localGuid] ??= wsServer.Open(new Uri(uri)); + _websockets[localGuid].Connect(bufferSize, maxMessages).Wait(500); return localGuid.ToString(); }