From 2c5b7e3915ed0e529ce76d5c150f6a6964cffc86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8E=B2?= Date: Mon, 20 Jan 2025 19:55:38 +0000 Subject: [PATCH] refactor: update JSON-RPC server configuration and improve request handling --- main.lua | 13 ++++++----- script.js | 28 +++++++++++------------- script.result.txt | Bin 0 -> 3832 bytes src/app.rs | 54 +++++++++++++++++++++++++++++----------------- src/json_rpc.rs | 13 +++++++---- src/lib.rs | 18 ++++++---------- 6 files changed, 69 insertions(+), 57 deletions(-) create mode 100644 script.result.txt diff --git a/main.lua b/main.lua index 73704c7..66e37af 100644 --- a/main.lua +++ b/main.lua @@ -5,17 +5,16 @@ package.cpath = package.cpath .. ";.\\target\\debug\\?.dll" local jsonrpc = require("lua_json_rpc") local stop = jsonrpc.start_server({ - host = "0.0.0.0", + host = "127.0.0.1", port = 1359, - workers = 2, - api_key = "super-secret-k3y" + workers = 4 }) io.write("JSON-RPC server started on port 1359\n") io.flush() function on_rpc(request) - io.write("Routing Request: " .. request.method .. "\n") + --io.write("Routing Request: " .. request.method .. "\n") local response = { id = request.id, @@ -26,15 +25,15 @@ function on_rpc(request) response.result = request.params[1] - request.params[2] end - io.flush() + --io.flush() return response end local started = os.clock() ----- Run for 10 seconds -while os.clock() - started < 30 do +--while os.clock() - started < 30 do -- Run for 30 seconds +while true do jsonrpc.process_rpc(on_rpc) end diff --git a/script.js b/script.js index 39ea6ed..93761b4 100644 --- a/script.js +++ b/script.js @@ -3,21 +3,19 @@ import http from 'k6/http'; let seq = 1 export default function () { - const url = 'http://127.0.0.1:1630/rpc'; - const payload = JSON.stringify({ - "jsonrpc": "2.0", - "id": `${seq++}`, - "method": "hello", - "params": [] - }); + const url = 'http://127.0.0.1:1359/rpc'; + const payload = JSON.stringify({ + "jsonrpc": "2.0", + "id": `${seq++}`, + "method": "hello", + "params": [] + }); - const params = { - headers: { - 'Content-Type': 'application/json', - }, - }; + const params = { + headers: { + 'Content-Type': 'application/json', + }, + }; - http.post(url, payload, params); - - // http.get("http://127.0.0.1:12080/cmV0dXJuICJVUCI=?env=default") + http.post(url, payload, params); } diff --git a/script.result.txt b/script.result.txt new file mode 100644 index 0000000000000000000000000000000000000000..20d2823a717081c562f0b00af8e4881843544490 GIT binary patch literal 3832 zcmd5 zr-`cufmR!Py)!$nZ)PXIe?68*5)?*VF1>HBWi5RfV=ge2zAhKRMNk*_1j|=9H0~MO z?!_ayL6+i`5p?tH1UXp21Nuzq+cxifXb}OLzvg z%nz&07CW@2muSZ~JJcH2BY}+M7IPq9Wg}Br!h>g!FvHsbzMQ4HyBf#XTZ4K99#YpJ z%l%f9&sh6%lz(aDtjsStUm7Z(k|&K!Bq?KwlSo;SP#PFD^ZQijl@K#e{Q}=Qm~&`< zi|2cJ2O542@vVXPY;OoXh#YGCOr72xJ-^eTco`5S#R(1H{E@?4jETH9K85xC2>zKHyw9~!Pu5kVa{!`n& z;Q7h;bRgFyETptPd@+%8*hoB`%1?s}#%*q*a-JfE9?Mq5lyXJjnkQT<;HC?XYD(#% zL3dl}uJ5^`Ed!unfx)aKGKrDQSN@x8O9zxhcQ*dLE%^-dwf9%9ElpsLxgKI@Y|7Zu z0!Is+yW+y{#Bd9d(i7ddw6I6a>7E}U`tv&`E9EMQ3Q?ixi}8)|BJR%+uQhUg1nM6+ zjpKi{s)#$@pHt*fnHoW>vRX_JH88ch^Et+<3y;#<5vK3aqHMQD?MJ-tQR1V9y3tNE zxC~C3;Gt%_aH2OvmO}yJ%j_p^93J3oB7%P!=7r7U_xh-f=O;T0~Uqt+YRLV$hWz zjF6j^>>-+v)kXfW_VK=tux_J^&EN1-)V7i7hitdh10MYT>26J2Y_JX!>uuz1`Pv?u z`g{rvHEUgLu(oUMx7p+Svt?@4iIL{}vuM?QJo*rS-oSVFk>&fAlG#aS!{(R&)!o, - pub rpc_response_listeners: HashMap>, + pub rpc_queue: VecDeque, pub api_key: Option, } +pub struct AppRequest { + pub request: JsonRpcRequest, + pub response_sender: Option>, +} + #[derive(Deserialize, Debug)] pub struct AppConfig { pub host: String, @@ -51,21 +57,26 @@ async fn notify_session( mut session: Session, receiver: Receiver, ) -> Result<(), Error> { - match receiver.await { - Ok(response) => { - info!("Received response: {:?}", response); - match serde_json::to_string(&response) { - Ok(response_message) => match session.text(response_message).await { - Ok(_) => info!("Sent response to session"), - Err(e) => error!("Failed to send response to session: {}", e), - }, - Err(e) => { - error!("Failed to serialize response: {}", e); + match timeout(Duration::from_secs(5), receiver).await { + Ok(response) => match response { + Ok(response) => { + info!("Received response: {:?}", response); + match serde_json::to_string(&response) { + Ok(response_message) => match session.text(response_message).await { + Ok(_) => info!("Sent response to session"), + Err(e) => error!("Failed to send response to session: {}", e), + }, + Err(e) => { + error!("Failed to serialize response: {}", e); + } } } - } + Err(_) => { + error!("Failed to receive response"); + } + }, Err(_) => { - error!("Failed to receive response"); + error!("Timed out waiting for response"); } } @@ -148,11 +159,14 @@ async fn rpc_handler( drop(data_guard); match receiver { - Some(receiver) => match receiver.await { - Ok(response) => serde_json::to_string(&response) - .map(|s| HttpResponse::Ok().body(s)) - .map_err(|e| ErrorInternalServerError(e.to_string())), - Err(_) => Err(ErrorInternalServerError("Failed to receive response")), + Some(receiver) => match timeout(Duration::from_secs(5), receiver).await { + Ok(response) => match response { + Ok(response) => serde_json::to_string(&response) + .map(|s| HttpResponse::Ok().body(s)) + .map_err(|e| ErrorInternalServerError(e.to_string())), + Err(_) => Err(ErrorInternalServerError("Failed to receive response")), + }, + Err(_) => Err(ErrorInternalServerError("Timed out waiting for response")), }, None => Ok(HttpResponse::Accepted().body("OK")), } diff --git a/src/json_rpc.rs b/src/json_rpc.rs index 90f6caa..756b64b 100644 --- a/src/json_rpc.rs +++ b/src/json_rpc.rs @@ -1,4 +1,4 @@ -use crate::app::AppData; +use crate::app::{AppData, AppRequest}; use log::{debug, info}; use mlua::prelude::LuaFunction; use mlua::{Error, Lua, LuaSerdeExt, Value}; @@ -56,13 +56,18 @@ pub fn push_rpc_request( Some(id) => { debug!("Adding request to queue with id: {}", id); let (sender, receiver) = oneshot::channel::(); - data.rpc_response_listeners.insert(id.clone(), sender); - data.rpc_queue.push_back(request); + data.rpc_queue.push_back(AppRequest { + request, + response_sender: Some(sender), + }); Ok(Some(receiver)) } None => { debug!("Adding notification to queue"); - data.rpc_queue.push_back(request); + data.rpc_queue.push_back(AppRequest { + request, + response_sender: None, + }); Ok(None) } } diff --git a/src/lib.rs b/src/lib.rs index 9dfc4c7..637437f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ use actix_web::web::Data; use log::{error, info, warn}; use mlua::prelude::*; use mlua::{Function, Lua, Nil, Result}; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::sync::Mutex; #[mlua::lua_module] @@ -20,7 +20,6 @@ pub fn lua_json_rpc(lua: &Lua) -> Result { // Create the shared queue let _data: Data> = Data::new(Mutex::new(AppData { rpc_queue: VecDeque::new(), - rpc_response_listeners: HashMap::new(), api_key: None, })); @@ -66,8 +65,8 @@ pub fn lua_json_rpc(lua: &Lua) -> Result { mlua::Error::RuntimeError(format!("Error acquiring data lock: {:?}", e)) })?; - match data_guard.rpc_queue.pop_front() { - Some(request) => match process_rpc(lua, &callback, &request) { + while let Some(request) = data_guard.rpc_queue.pop_front() { + match process_rpc(lua, &callback, &request.request) { Ok(response) => { info!("Processed request: {:?}", response); let response_message = serde_json::to_string(&response).map_err(|e| { @@ -78,8 +77,7 @@ pub fn lua_json_rpc(lua: &Lua) -> Result { )) })?; - if let Some(sender) = data_guard.rpc_response_listeners.remove(&response.id) - { + if let Some(sender) = request.response_sender { match sender.send(response) { Ok(_) => info!("Published Response: {}", response_message), Err(e) => info!( @@ -88,16 +86,14 @@ pub fn lua_json_rpc(lua: &Lua) -> Result { ), } } - - Ok(()) } Err(err) => { error!("Error processing request: {:?}", err); - Ok(()) } - }, - None => Ok(()), + } } + + Ok(()) })?, )?;