Skip to content

Commit

Permalink
feat: Integrated Java KV interface
Browse files Browse the repository at this point in the history
  • Loading branch information
YouMeiYouMaoTai committed Oct 17, 2024
1 parent 1778957 commit 597d7ed
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 13 deletions.
4 changes: 2 additions & 2 deletions scripts/deploy_cluster/node_config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
nodes:
9:
addr: 192.168.31.9:2500
4:
addr: 192.168.31.109:2500
spec:
- meta
- master
Expand Down
1 change: 1 addition & 0 deletions src/general/m_appmeta_manager/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn call_app_fn(Path((app, func)): Path<(String, String)>, body: String) ->
}
}

// KV DEBUG
async fn upload_app(mut multipart: Multipart) -> Response {
tracing::debug!("upload_app called");
// only worker can upload app
Expand Down
5 changes: 5 additions & 0 deletions src/general/m_appmeta_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ impl<'de> Deserialize<'de> for FnMetaYaml {
.ok_or_else(|| serde::de::Error::custom("not a map"))?;
// let calls = map.remove("calls").ok_or_else(|| serde::de::Error::missing_field("calls"))?;
let mut calls = vec![];

// KV DEBUG
fn parse_http_call<'de, D: Deserializer<'de>>(
map: &serde_yaml::Value,
) -> Result<HttpCall, D::Error> {
Expand Down Expand Up @@ -695,6 +697,7 @@ impl AppMetaManager {
// let appdir = self.fs_layer.concat_app_dir(app);
let appmeta = self.fs_layer.read_app_meta(tmpapp).await?;

// KV DEBUG
// TODO: 2.check project dir
// 3. if java, take snapshot
if let AppType::Jar = appmeta.app_type {
Expand Down Expand Up @@ -736,6 +739,8 @@ impl AppMetaManager {
.await
.is_some())
}

// KV DEBUG
pub async fn app_uploaded(&self, appname: String, data: Bytes) -> WSResult<()> {
// 1. tmpapp name & dir
// TODO: fobidden tmpapp public access
Expand Down
2 changes: 2 additions & 0 deletions src/general/m_os/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ impl OperatingSystem {
pub fn app_path(&self, app: &str) -> PathBuf {
self.view.appmeta_manager().fs_layer.concat_app_dir(app)
}

// KV DEBUG
pub fn start_process(&self, p: OsProcessType) -> process::Child {
let (mut binding, log_file) = match p {
OsProcessType::JavaApp(app) => {
Expand Down
10 changes: 5 additions & 5 deletions src/general/network/proto_src/kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ message KvPairs{

message KvResponse{
message KvResponse{
repeated KvPair kvs=1;
repeated KvPair kvs=1;
}
oneof resp {
KvResponse common_resp=1;
uint32 lock_id=2;
}
}

message KvResponses{
repeated KvResponse responses=1;
}

message KvRequests{
string app=1;
string func=2;
repeated KvRequest requests=3;
int64 prev_kv_opeid=4;
}

message KvResponses{
repeated KvResponse responses=1;
}

// message MetaKvRequest{
// KvRequest request=1;
// }
Expand Down
1 change: 1 addition & 0 deletions src/worker/func/shared/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{

use super::process::PID;

// KV DEBUG
pub(super) fn cold_start(app: &str, os: &OperatingSystem) -> WSResult<process::Child> {
tracing::debug!("java cold start {}", app);
let p = os.start_process(OsProcessType::JavaApp(app.to_owned()));
Expand Down
1 change: 1 addition & 0 deletions src/worker/func/shared/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl ProcessInstance {
true
}

// KV DEBUG
pub async fn wait_for_verify(&self) -> proc_proto::AppStarted {
if let Some(v) = self.state.0.read().0.as_connected() {
return v.clone();
Expand Down
1 change: 1 addition & 0 deletions src/worker/func/shared/process_instance_man_related.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl InstanceManager {
Ok(())
}

// KV DEBUG
pub async fn make_checkpoint_for_app(&self, app: &str) -> WSResult<()> {
tracing::debug!("make checkpoint for app: {}", app);
let p = self.get_process_instance(&AppType::Jar, app);
Expand Down
153 changes: 148 additions & 5 deletions src/worker/func/shared/process_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use crate::{
modules_global_bridge::process_func::{
ModulesGlobalBrigeAppMetaManager, ModulesGlobalBrigeInstanceManager,
},
result::WSResult,
result::{WSResult, WsRpcErr},
sys::LogicalModulesRef,
worker::func::shared::process_rpc::proc_proto::AppStarted,
};
use async_trait::async_trait;
use parking_lot::Mutex;
use prost::Message;
use parking_lot::{Mutex, RwLock};
use proc_proto::{kv_request::Op, kv_response::{self, CommonKvResponse, Resp}, KvPair, KvResponse, KvResponses};
use prost::{
bytes::{BufMut, BytesMut},
Message,
};
use std::{collections::HashMap, path::Path, time::Duration};
use tokio::sync::oneshot;

Expand All @@ -37,6 +41,8 @@ lazy_static::lazy_static! {
#[async_trait]
impl RpcCustom for ProcessRpc {
type SpawnArgs = String;

// 创建了一个 Unix socket 监听器,用于接收客户端的连接。当有连接时,会通过这个 socket 接收数据。
fn bind(a: String) -> tokio::net::UnixListener {
clean_sock_file(&a);
tokio::net::UnixListener::bind(&a).unwrap()
Expand All @@ -53,15 +59,21 @@ impl RpcCustom for ProcessRpc {
// };
// }


// 一旦有数据到达,后端会先进行数据验证
async fn verify(buf: &[u8]) -> Option<HashValue> {

// 首先尝试将其解码为 proc_proto::AppStarted 结构体
let res = proc_proto::AppStarted::decode(buf);
let res: proc_proto::AppStarted = match res {
Ok(res) => res,

// 如果解码失败,则请求被忽略
Err(_) => {
return None;
}
};

// 然后再根据 appid 查找对应的实例,并设置其验证状态
unsafe {
let appman = ProcessRpc::global_m_app_meta_manager();
let ishttp = {
Expand Down Expand Up @@ -98,15 +110,18 @@ impl RpcCustom for ProcessRpc {
Some(HashValue::Str(res.appid))
}

fn handle_remote_call(_conn: &HashValue, id: u8, buf: &[u8]) -> bool {
// 处理远程调用的主要逻辑。它根据消息的 id 来识别不同类型的请求,并根据请求的类型来处理。
fn handle_remote_call(conn: &HashValue, id: u8, buf: &[u8]) -> bool {
tracing::debug!("handle_remote_call: id: {}", id);
let _ = match id {
4 => (),
5 => (),
id => {
tracing::warn!("handle_remote_call: unsupported id: {}", id);
return false;
}
};
// TODO 再加一类处理新消息类型的
let err = match id {
4 => match proc_proto::UpdateCheckpoint::decode(buf) {
Ok(_req) => {
Expand All @@ -122,6 +137,124 @@ impl RpcCustom for ProcessRpc {
}
Err(e) => e,
},
5 => match proc_proto::KvRequests::decode(buf) {
Ok(req) => {
tracing::debug!("test java kv");
// TODO 要不要有处理逻辑,类似 4 下面的 spawn 这块

// 先接收到KvRequests,根据内容转换为 KvResponses,再传出去
let mut kv_responses: Vec<KvResponse> = Vec::new();

// 遍历 KvRequests 中的每一个请求
for request in req.requests.clone().iter() {
match request.op.clone() {
Some(op)=>{
match op {
Op::Set(kv_put) => {
// 将 kv_put.kv.key、kv_put.kv.value 解码为 String
let key_str = String::from_utf8(kv_put.kv.key.clone()).unwrap();
let value_str = String::from_utf8(kv_put.kv.value.clone()).unwrap();
tracing::debug!("kv_put, key={:?}, value={:?}", key_str, value_str);

let key = (key_str + "_response").into_bytes();
let value = (value_str + "_response").into_bytes();

// 构造 KvResponse
let kv_response = KvResponse {
resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
};
kv_responses.push(kv_response);
},
Op::Get(kv_get) => {
let start_str = String::from_utf8(kv_get.range.start.clone()).unwrap();
let end_str = String::from_utf8(kv_get.range.end.clone()).unwrap();
tracing::debug!("kv_get, start={:?}, end={:?}", start_str, end_str);

let key = (start_str + "_response").into_bytes();
let value = (end_str + "_response").into_bytes();

// 构造 KvResponse
let kv_response = KvResponse {
resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
};
kv_responses.push(kv_response);
},
Op::Delete(kv_delete) => {
let start_str = String::from_utf8(kv_delete.range.start.clone()).unwrap();
let end_str = String::from_utf8(kv_delete.range.end.clone()).unwrap();
tracing::debug!("kv_delete, start={:?}, end={:?}", start_str, end_str);

let key = (start_str + "_response").into_bytes();
let value = (end_str + "_response").into_bytes();

// 构造 KvResponse
let kv_response = KvResponse {
resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
};
kv_responses.push(kv_response);
},
Op::Lock(kv_lock) => {
let read_or_write = kv_lock.read_or_write;
let release_ids = kv_lock.release_id;
let start_str = String::from_utf8(kv_lock.range.start.clone()).unwrap();
let end_str = String::from_utf8(kv_lock.range.end.clone()).unwrap();
tracing::debug!("kv_lock, read_or_write={:?}, release_id={:?}, start={:?}, end={:?}", read_or_write, release_ids, start_str, end_str);

let key = (start_str + "_response").into_bytes();
let value = (end_str + "_response").into_bytes();

// 构造 KvResponse
let kv_response = KvResponse {
resp: Some(Resp::CommonResp(CommonKvResponse{kvs: vec![KvPair{key, value}]}))
};
kv_responses.push(kv_response);
},
}
},
None => {},
}
}

let kv_responses = KvResponses { responses: kv_responses };
struct ConnState {
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
}

let conn_map: RwLock<HashMap<HashValue,ConnState>> = RwLock::new(HashMap::new());

let conn = conn.clone();
let _: tokio::task::JoinHandle<Result<(), WsRpcErr>> = tokio::spawn(async move {
let tx = {
let mut conn_map = conn_map.write(); // 确保使用 await
match conn_map.get_mut(&conn) {
Some(state) => {
state.tx.clone()
},
None => {
// 返回一个错误结果
return Err(WsRpcErr::ConnectionNotEstablished(conn).into());
}
}
};

// 其他逻辑
let mut buf = BytesMut::with_capacity(kv_responses.encoded_len() + 8);
// 长度
buf.put_i32(kv_responses.encoded_len() as i32);
// taskid
buf.put_i32(9999);
// 区别类型 reqType
buf.put_i32(1);

tx.send(buf.into()).await.unwrap();

Ok(())
});

return true;
}
Err(e) => e,
},
_ => unreachable!(),
};
tracing::warn!("handle_remote_call error: {:?}", err);
Expand All @@ -147,11 +280,21 @@ impl MsgIdBind for proc_proto::FuncCallResp {
}
}

impl MsgIdBind for proc_proto::KvRequests {
fn id() -> u16 {
5
}
}

impl ReqMsg for FuncCallReq {
type Resp = FuncCallResp;
}

// TODO 看一下是不是根据这个 app或者func 来区分是什么类型的请求
pub async fn call_func(app: &str, func: &str, arg: String) -> WSResult<FuncCallResp> {

tracing::debug!("CALL_FUNC: app:{}, func:{}, arg:{}", app, func, arg);

rpc_model::call(
FuncCallReq {
func: func.to_owned(),
Expand Down
Loading

0 comments on commit 597d7ed

Please sign in to comment.