Skip to content

Commit

Permalink
refactor: update JSON-RPC server configuration and improve request ha…
Browse files Browse the repository at this point in the history
…ndling
  • Loading branch information
🎲 committed Jan 20, 2025
1 parent 70cc948 commit 2c5b7e3
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 57 deletions.
13 changes: 6 additions & 7 deletions main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ package.cpath = package.cpath .. ";.\\target\\debug\\?.dll"
local jsonrpc = require("lua_json_rpc")

local stop = jsonrpc.start_server({
host = "0.0.0.0",
host = "127.0.0.1",
port = 1359,
workers = 2,
api_key = "super-secret-k3y"
workers = 4
})

io.write("JSON-RPC server started on port 1359\n")
io.flush()

function on_rpc(request)
io.write("Routing Request: " .. request.method .. "\n")
--io.write("Routing Request: " .. request.method .. "\n")

local response = {
id = request.id,
Expand All @@ -26,15 +25,15 @@ function on_rpc(request)
response.result = request.params[1] - request.params[2]
end

io.flush()
--io.flush()

return response
end

local started = os.clock()

---- Run for 10 seconds
while os.clock() - started < 30 do
--while os.clock() - started < 30 do -- Run for 30 seconds
while true do
jsonrpc.process_rpc(on_rpc)
end

Expand Down
28 changes: 13 additions & 15 deletions script.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ import http from 'k6/http';
let seq = 1

export default function () {
const url = 'http://127.0.0.1:1630/rpc';
const payload = JSON.stringify({
"jsonrpc": "2.0",
"id": `${seq++}`,
"method": "hello",
"params": []
});
const url = 'http://127.0.0.1:1359/rpc';
const payload = JSON.stringify({
"jsonrpc": "2.0",
"id": `${seq++}`,
"method": "hello",
"params": []
});

const params = {
headers: {
'Content-Type': 'application/json',
},
};
const params = {
headers: {
'Content-Type': 'application/json',
},
};

http.post(url, payload, params);

// http.get("http://127.0.0.1:12080/cmV0dXJuICJVUCI=?env=default")
http.post(url, payload, params);
}
Binary file added script.result.txt
Binary file not shown.
54 changes: 34 additions & 20 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServ
use actix_ws::{Message, Session};
use log::{error, info};
use serde::Deserialize;
use std::collections::{HashMap, VecDeque};
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;
use tokio::time::timeout;

pub struct AppData {
pub rpc_queue: VecDeque<JsonRpcRequest>,
pub rpc_response_listeners: HashMap<String, oneshot::Sender<JsonRpcResponse>>,
pub rpc_queue: VecDeque<AppRequest>,
pub api_key: Option<String>,
}

pub struct AppRequest {
pub request: JsonRpcRequest,
pub response_sender: Option<oneshot::Sender<JsonRpcResponse>>,
}

#[derive(Deserialize, Debug)]
pub struct AppConfig {
pub host: String,
Expand Down Expand Up @@ -51,21 +57,26 @@ async fn notify_session(
mut session: Session,
receiver: Receiver<JsonRpcResponse>,
) -> Result<(), Error> {
match receiver.await {
Ok(response) => {
info!("Received response: {:?}", response);
match serde_json::to_string(&response) {
Ok(response_message) => match session.text(response_message).await {
Ok(_) => info!("Sent response to session"),
Err(e) => error!("Failed to send response to session: {}", e),
},
Err(e) => {
error!("Failed to serialize response: {}", e);
match timeout(Duration::from_secs(5), receiver).await {
Ok(response) => match response {
Ok(response) => {
info!("Received response: {:?}", response);
match serde_json::to_string(&response) {
Ok(response_message) => match session.text(response_message).await {
Ok(_) => info!("Sent response to session"),
Err(e) => error!("Failed to send response to session: {}", e),
},
Err(e) => {
error!("Failed to serialize response: {}", e);
}
}
}
}
Err(_) => {
error!("Failed to receive response");
}
},
Err(_) => {
error!("Failed to receive response");
error!("Timed out waiting for response");
}
}

Expand Down Expand Up @@ -148,11 +159,14 @@ async fn rpc_handler(
drop(data_guard);

match receiver {
Some(receiver) => match receiver.await {
Ok(response) => serde_json::to_string(&response)
.map(|s| HttpResponse::Ok().body(s))
.map_err(|e| ErrorInternalServerError(e.to_string())),
Err(_) => Err(ErrorInternalServerError("Failed to receive response")),
Some(receiver) => match timeout(Duration::from_secs(5), receiver).await {
Ok(response) => match response {
Ok(response) => serde_json::to_string(&response)
.map(|s| HttpResponse::Ok().body(s))
.map_err(|e| ErrorInternalServerError(e.to_string())),
Err(_) => Err(ErrorInternalServerError("Failed to receive response")),
},
Err(_) => Err(ErrorInternalServerError("Timed out waiting for response")),
},
None => Ok(HttpResponse::Accepted().body("OK")),
}
Expand Down
13 changes: 9 additions & 4 deletions src/json_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::app::AppData;
use crate::app::{AppData, AppRequest};
use log::{debug, info};
use mlua::prelude::LuaFunction;
use mlua::{Error, Lua, LuaSerdeExt, Value};
Expand Down Expand Up @@ -56,13 +56,18 @@ pub fn push_rpc_request(
Some(id) => {
debug!("Adding request to queue with id: {}", id);
let (sender, receiver) = oneshot::channel::<JsonRpcResponse>();
data.rpc_response_listeners.insert(id.clone(), sender);
data.rpc_queue.push_back(request);
data.rpc_queue.push_back(AppRequest {
request,
response_sender: Some(sender),
});
Ok(Some(receiver))
}
None => {
debug!("Adding notification to queue");
data.rpc_queue.push_back(request);
data.rpc_queue.push_back(AppRequest {
request,
response_sender: None,
});
Ok(None)
}
}
Expand Down
18 changes: 7 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix_web::web::Data;
use log::{error, info, warn};
use mlua::prelude::*;
use mlua::{Function, Lua, Nil, Result};
use std::collections::{HashMap, VecDeque};
use std::collections::VecDeque;
use std::sync::Mutex;

#[mlua::lua_module]
Expand All @@ -20,7 +20,6 @@ pub fn lua_json_rpc(lua: &Lua) -> Result<LuaTable> {
// Create the shared queue
let _data: Data<Mutex<AppData>> = Data::new(Mutex::new(AppData {
rpc_queue: VecDeque::new(),
rpc_response_listeners: HashMap::new(),
api_key: None,
}));

Expand Down Expand Up @@ -66,8 +65,8 @@ pub fn lua_json_rpc(lua: &Lua) -> Result<LuaTable> {
mlua::Error::RuntimeError(format!("Error acquiring data lock: {:?}", e))
})?;

match data_guard.rpc_queue.pop_front() {
Some(request) => match process_rpc(lua, &callback, &request) {
while let Some(request) = data_guard.rpc_queue.pop_front() {
match process_rpc(lua, &callback, &request.request) {
Ok(response) => {
info!("Processed request: {:?}", response);
let response_message = serde_json::to_string(&response).map_err(|e| {
Expand All @@ -78,8 +77,7 @@ pub fn lua_json_rpc(lua: &Lua) -> Result<LuaTable> {
))
})?;

if let Some(sender) = data_guard.rpc_response_listeners.remove(&response.id)
{
if let Some(sender) = request.response_sender {
match sender.send(response) {
Ok(_) => info!("Published Response: {}", response_message),
Err(e) => info!(
Expand All @@ -88,16 +86,14 @@ pub fn lua_json_rpc(lua: &Lua) -> Result<LuaTable> {
),
}
}

Ok(())
}
Err(err) => {
error!("Error processing request: {:?}", err);
Ok(())
}
},
None => Ok(()),
}
}

Ok(())
})?,
)?;

Expand Down

0 comments on commit 2c5b7e3

Please sign in to comment.