Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Integrated Java KV interface #49

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion scripts/deploy_cluster/1.install_remote_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ def os_system(command):
# make ../install tar.gz
os_system_sure("rm -rf install.tar.gz")
CRAC_INSTALL_DIR = "/usr/jdk_crac"
os_system_sure(f"cp -r {CRAC_INSTALL_DIR} ../install/inner/jdk_crac")

# Check whether ../install/inner/jdk_crac already exists: copy it if missing, otherwise skip the copy.
dst = "../install/inner/jdk_crac"
if not os.path.exists(dst):
os_system(f"cp -r {CRAC_INSTALL_DIR} {dst}")
else:
print(f"{dst} 已存在,跳过复制。")

# os_system_sure(f"cp -r {CRAC_INSTALL_DIR} ../install/inner/jdk_crac")
os_system_sure("tar -czvf install.tar.gz -C ../install .")

def deploy_to_nodes():
Expand Down
8 changes: 4 additions & 4 deletions scripts/deploy_cluster/node_config.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
nodes:
9:
addr: 192.168.31.9:2500
4:
addr: 192.168.31.109:2500
spec:
- meta
- master
10:
addr: 192.168.31.240:2500
11:
addr: 192.168.31.138:2500
spec:
- meta
- worker
1 change: 1 addition & 0 deletions src/general/m_appmeta_manager/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn call_app_fn(Path((app, func)): Path<(String, String)>, body: String) ->
}
}

// KV DEBUG
async fn upload_app(mut multipart: Multipart) -> Response {
tracing::debug!("upload_app called");
// only worker can upload app
Expand Down
6 changes: 6 additions & 0 deletions src/general/m_appmeta_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ impl<'de> Deserialize<'de> for FnMetaYaml {
.ok_or_else(|| serde::de::Error::custom("not a map"))?;
// let calls = map.remove("calls").ok_or_else(|| serde::de::Error::missing_field("calls"))?;
let mut calls = vec![];

// KV DEBUG
fn parse_http_call<'de, D: Deserializer<'de>>(
map: &serde_yaml::Value,
) -> Result<HttpCall, D::Error> {
Expand Down Expand Up @@ -695,6 +697,7 @@ impl AppMetaManager {
// let appdir = self.fs_layer.concat_app_dir(app);
let appmeta = self.fs_layer.read_app_meta(tmpapp).await?;

// KV DEBUG
// TODO: 2.check project dir
// 3. if java, take snapshot
if let AppType::Jar = appmeta.app_type {
Expand Down Expand Up @@ -736,6 +739,8 @@ impl AppMetaManager {
.await
.is_some())
}

// KV DEBUG
pub async fn app_uploaded(&self, appname: String, data: Bytes) -> WSResult<()> {
// 1. tmpapp name & dir
// TODO: forbid public access to tmpapp
Expand Down Expand Up @@ -777,6 +782,7 @@ impl AppMetaManager {
};

// 3. check meta
tracing::debug!("begin check meta");
let res = self.construct_tmp_app(&tmpapp).await;
let appmeta = match res {
Err(e) => {
Expand Down
2 changes: 2 additions & 0 deletions src/general/m_os/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ impl OperatingSystem {
pub fn app_path(&self, app: &str) -> PathBuf {
self.view.appmeta_manager().fs_layer.concat_app_dir(app)
}

// KV DEBUG
pub fn start_process(&self, p: OsProcessType) -> process::Child {
let (mut binding, log_file) = match p {
OsProcessType::JavaApp(app) => {
Expand Down
10 changes: 5 additions & 5 deletions src/general/network/proto_src/kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ message KvPairs{

message KvResponse{
message KvResponse{
repeated KvPair kvs=1;
repeated KvPair kvs=1;
}
oneof resp {
KvResponse common_resp=1;
uint32 lock_id=2;
}
}

message KvResponses{
repeated KvResponse responses=1;
}

message KvRequests{
string app=1;
string func=2;
repeated KvRequest requests=3;
int64 prev_kv_opeid=4;
}

message KvResponses{
repeated KvResponse responses=1;
}

// message MetaKvRequest{
// KvRequest request=1;
// }
Expand Down
39 changes: 32 additions & 7 deletions src/general/network/rpc_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ pub async fn call<Req: ReqMsg>(
// register the call back
let (wait_tx, wait_rx) = oneshot::channel();
let next_task = NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst);
tracing::debug!("insert into CALL_MAP next_task:{:?}", next_task.clone());
let _ = CALL_MAP.write().insert(next_task, wait_tx);
tracing::debug!("insert after CALL_MAP.write(): {:?}", CALL_MAP.write());

// send the request
let mut buf = BytesMut::with_capacity(req.encoded_len() + 8);
buf.put_i32(req.encoded_len() as i32);
buf.put_i32(next_task as i32);
buf.put_i32(2);
req.encode(&mut buf).unwrap();

tracing::debug!("send request: {:?} with len: {}", req, buf.len() - 8);
Expand Down Expand Up @@ -122,15 +125,16 @@ pub async fn call<Req: ReqMsg>(
// Disconnected,
// }

struct ConnState {
#[derive(Debug)]
pub struct ConnState {
/// record the waiters
// Connecting(Vec<oneshot::Sender<tokio::sync::mpsc::Sender<Vec<u8>>>>),
tx: tokio::sync::mpsc::Sender<Vec<u8>>,
pub tx: tokio::sync::mpsc::Sender<Vec<u8>>,
}

lazy_static! {
/// Registry of established connections, keyed by the id returned from remote verification.
static ref CONN_MAP: RwLock<HashMap<HashValue,ConnState>> = RwLock::new(HashMap::new());
pub static ref CONN_MAP: RwLock<HashMap<HashValue,ConnState>> = RwLock::new(HashMap::new());

static ref CALL_MAP: RwLock<HashMap<u32,oneshot::Sender<Vec<u8>>>> = RwLock::new(HashMap::new());

Expand All @@ -147,13 +151,19 @@ async fn listen_task<R: RpcCustom>(socket: tokio::net::UnixStream) {
let Some((conn, rx)) =
listen_task_ext::verify_remote::<R>(&mut sockrx, &mut len, &mut buf).await
else {
tracing::debug!("verify failed");
tracing::warn!("verify failed");
return;
};

tracing::debug!("verify_remote 结束");

listen_task_ext::spawn_send_loop(rx, socktx);

tracing::debug!("spawn_send_loop 结束");

listen_task_ext::read_loop::<R>(conn, &mut sockrx, &mut len, &mut buf).await;

tracing::debug!("read_loop 结束");
}

pub(super) mod listen_task_ext {
Expand All @@ -172,8 +182,8 @@ pub(super) mod listen_task_ext {

pub(super) async fn verify_remote<R: RpcCustom>(
sockrx: &mut OwnedReadHalf,
len: &mut usize,
buf: &mut [u8],
len: &mut usize, // 0
buf: &mut [u8], // 0
) -> Option<(HashValue, Receiver<Vec<u8>>)> {
async fn verify_remote_inner<R: RpcCustom>(
sockrx: &mut OwnedReadHalf,
Expand All @@ -188,25 +198,33 @@ pub(super) mod listen_task_ext {

let verify_msg_len = consume_i32(0, buf, len);

tracing::debug!("len: {}, verify_msg_len: {}", len, verify_msg_len);

// println!("waiting for verify msg {}", verify_msg_len);
if !wait_for_len(sockrx, len, verify_msg_len, buf).await {
tracing::warn!("failed to read verify msg");
return None;
}
// println!("wait done");

tracing::debug!("wait_for_len 完成");

let Some(id) = R::verify(&buf[4..4 + verify_msg_len]).await else {
tracing::warn!("verify failed");
tracing::warn!("verify failed in verify_remote_inner");
return None;
};
let (tx, rx) = tokio::sync::mpsc::channel(10);

// TODO: find out why conn_map still contains the id of the previous connection; need to track down every place conn_map is used.
let mut write_conn_map = CONN_MAP.write();
tracing::debug!("write_conn_map: {:?}", write_conn_map);
if write_conn_map.contains_key(&id) {
tracing::warn!("conflict conn id: {:?}", id);
return None;
}
let _ = write_conn_map.insert(id.clone(), ConnState { tx });
tracing::debug!("insert into CALL_MAP id:{:?}", id.clone());
tracing::debug!("insert after CALL_MAP.write(): {:?}", write_conn_map);

// println!("verify success");
Some((id, rx))
Expand All @@ -230,6 +248,7 @@ pub(super) mod listen_task_ext {
*len = 0;
let mut offset = 0;
loop {

let (msg_len, msg_id, taskid) = {
let buf = &mut buf[offset..];
if !wait_for_len(socket, len, 9, buf).await {
Expand All @@ -243,6 +262,8 @@ pub(super) mod listen_task_ext {
consume_i32(5, buf, len) as u32,
)
};

tracing::debug!("2 len: {}, msg_len: {}, msg_id: {}, taskid: {}", len, msg_len, msg_id, taskid);

{
if buf.len() < offset + msg_len {
Expand All @@ -269,12 +290,15 @@ pub(super) mod listen_task_ext {
};

let msg = buf[..msg_len].to_vec();
tracing::debug!("msg: {:?}", msg);
cb.send(msg).unwrap();
}

// update the buf meta
offset += msg_len;
*len -= msg_len;

tracing::debug!("1 len: {}, msg_len: {}, msg_id: {}, taskid: {}", len, msg_len, msg_id, taskid);
}

// match socket.read(buf).await {
Expand Down Expand Up @@ -331,6 +355,7 @@ pub(super) mod listen_task_ext {
return false;
}
// println!("recv: {:?}", buf[..n]);
tracing::debug!("len += {}", n);
*len += n;
}
Err(e) => {
Expand Down
1 change: 1 addition & 0 deletions src/worker/func/shared/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{

use super::process::PID;

// KV DEBUG
pub(super) fn cold_start(app: &str, os: &OperatingSystem) -> WSResult<process::Child> {
tracing::debug!("java cold start {}", app);
let p = os.start_process(OsProcessType::JavaApp(app.to_owned()));
Expand Down
1 change: 1 addition & 0 deletions src/worker/func/shared/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl ProcessInstance {
true
}

// KV DEBUG
pub async fn wait_for_verify(&self) -> proc_proto::AppStarted {
if let Some(v) = self.state.0.read().0.as_connected() {
return v.clone();
Expand Down
13 changes: 12 additions & 1 deletion src/worker/func/shared/process_instance_man_related.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::time::Duration;
use tokio::process::Command;

use crate::{
general::m_appmeta_manager::AppType,
general::{
m_appmeta_manager::{AppType},
network::rpc_model::{self, HashValue},
},
result::{WSResult, WsFuncError},
worker::func::{
m_instance_manager::{EachAppCache, InstanceManager},
Expand All @@ -15,6 +18,7 @@ use super::{process::ProcessInstance, SharedInstance};

impl InstanceManager {
pub async fn update_checkpoint(&self, app_name: &str, restart: bool) -> WSResult<()> {
tracing::debug!("start update_checkpoint");
async fn debug_port_left() {
tracing::debug!("debug port left");
// only for test
Expand All @@ -40,6 +44,7 @@ impl InstanceManager {
.into());
};
// state 2 connecting, make others wait
tracing::debug!("state 2 connecting, make others wait");
{
proc_ins.before_checkpoint();
tokio::time::sleep(Duration::from_secs(3)).await;
Expand All @@ -51,6 +56,11 @@ impl InstanceManager {
AppType::Jar => java::take_snapshot(app_name, self.view.os()).await,
AppType::Wasm => unreachable!(),
}

// Manually close the connection once the snapshot has been taken.
tracing::debug!("打完快照手动 close 一下, 移除CONN_MAP中的残余 app");
rpc_model::close_conn(&HashValue::Str(app_name.to_string().clone()));

}
// recover by criu
tokio::time::sleep(Duration::from_secs(3)).await;
Expand Down Expand Up @@ -80,6 +90,7 @@ impl InstanceManager {
Ok(())
}

// KV DEBUG
pub async fn make_checkpoint_for_app(&self, app: &str) -> WSResult<()> {
tracing::debug!("make checkpoint for app: {}", app);
let p = self.get_process_instance(&AppType::Jar, app);
Expand Down
Loading
Loading