Skip to content

Commit

Permalink
Merge pull request #52 from CoLearn-Dev/support-call-forwarding
Browse files Browse the repository at this point in the history
Call forwarding
  • Loading branch information
stneng authored Feb 17, 2023
2 parents 6e9b58e + 915258b commit d4b9852
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "colink-server"
version = "0.3.2"
version = "0.3.3"
edition = "2021"

[dependencies]
Expand Down
2 changes: 1 addition & 1 deletion proto
102 changes: 91 additions & 11 deletions src/service/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,19 @@ impl crate::server::MyService {
self.check_privilege_in(request.metadata(), &["user", "guest"])
.await?;
let user_id = Self::get_key_from_metadata(request.metadata(), "user_id");
if request.metadata().get("forwarding-target").is_some() {
let forwarding_target = request
.metadata()
.get("forwarding-target")
.unwrap()
.to_str()
.unwrap()
.to_string();
let task = request.into_inner();
self.forward_inter_core_sync_task(&user_id, &forwarding_target, &task)
.await?;
return Ok(Response::new(Empty::default()));
}
if !self
._internal_storage_contains(&user_id, &format!("tasks:{}", request.get_ref().task_id))
.await?
Expand Down Expand Up @@ -342,14 +355,36 @@ impl crate::server::MyService {
Err(e) => return Err(Status::internal(format!("{}", e))),
}
} else {
let (core_addr, guest_jwt) = self.query_user_record(user_id, target_user_id).await?;
let mut client = match self._grpc_connect(&core_addr).await {
Ok(client) => client,
Err(e) => return Err(Status::internal(format!("{}", e))),
};
client
.inter_core_sync_task(generate_request(&guest_jwt, task.clone()))
.await?;
let (core_addr, guest_jwt, forwarding_user_id) =
self.query_user_record(user_id, target_user_id).await?;
if let Some(forwarding_user_id) = forwarding_user_id {
let (core_addr, guest_jwt, double_forwarding_user_id) =
self.query_user_record(user_id, &forwarding_user_id).await?;
if double_forwarding_user_id.is_some() {
return Err(Status::failed_precondition(format!(
"Double forwarding is not allowed (checking user {}).",
forwarding_user_id
)));
}
let mut client = match self._grpc_connect(&core_addr).await {
Ok(client) => client,
Err(e) => return Err(Status::internal(format!("{}", e))),
};
let mut request = generate_request(&guest_jwt, task.clone());
request.metadata_mut().insert(
"forwarding-target",
tonic::metadata::MetadataValue::try_from(target_user_id).unwrap(),
);
client.inter_core_sync_task(request).await?;
} else {
let mut client = match self._grpc_connect(&core_addr).await {
Ok(client) => client,
Err(e) => return Err(Status::internal(format!("{}", e))),
};
client
.inter_core_sync_task(generate_request(&guest_jwt, task.clone()))
.await?;
}
}
Ok(())
}
Expand All @@ -361,7 +396,13 @@ impl crate::server::MyService {
task: &Task,
service: Arc<MyService>,
) -> Result<(), Status> {
let (core_addr, guest_jwt) = self.query_user_record(user_id, target_user_id).await?;
let (core_addr, guest_jwt, forwarding_user_id) =
self.query_user_record(user_id, target_user_id).await?;
if forwarding_user_id.is_some() {
return self
.send_inter_core_sync_task(user_id, target_user_id, task)
.await;
}
let mut client = match self._grpc_connect(&core_addr).await {
Ok(client) => client,
Err(e) => return Err(Status::internal(format!("{}", e))),
Expand Down Expand Up @@ -409,6 +450,35 @@ impl crate::server::MyService {
Ok(())
}

async fn forward_inter_core_sync_task(
&self,
user_id: &str,
target_user_id: &str,
task: &Task,
) -> Result<(), Status> {
if self
.inter_core_reverse_senders
.lock()
.await
.contains_key(&(user_id.to_string(), target_user_id.to_string()))
{
let inter_core_reverse_senders = self.inter_core_reverse_senders.lock().await;
let tx = inter_core_reverse_senders
.get(&(user_id.to_string(), target_user_id.to_string()))
.unwrap();
match tx.send(Ok(task.clone())).await {
Ok(_) => {}
Err(e) => return Err(Status::internal(format!("{}", e))),
}
Ok(())
} else {
Err(Status::failed_precondition(format!(
"Unable to locate target {}.",
target_user_id
)))
}
}

async fn remove_task_from_list_in_storage(
&self,
user_id: &str,
Expand Down Expand Up @@ -518,7 +588,7 @@ impl crate::server::MyService {
&self,
user_id: &str,
query_user_id: &str,
) -> Result<(String, String), Status> {
) -> Result<(String, String, Option<String>), Status> {
let mut counter = 0;
while self
._internal_storage_read(
Expand Down Expand Up @@ -594,7 +664,17 @@ impl crate::server::MyService {
)
.await?;
let guest_jwt = String::from_utf8(guest_jwt).unwrap();
Ok((core_addr, guest_jwt))
let forwarding_user_id = match self
._internal_storage_read(
user_id,
&format!("known_users:{}:forwarding_user_id", &query_user_id),
)
.await
{
Ok(forwarding_user_id) => Some(String::from_utf8(forwarding_user_id).unwrap()),
Err(_) => None,
};
Ok((core_addr, guest_jwt, forwarding_user_id))
}

/**
Expand Down

0 comments on commit d4b9852

Please sign in to comment.