Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: push all actions no matter what #347

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 21 additions & 35 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,10 @@ impl ActionsBridge {

// NOTE: Don't do any blocking operations here
// TODO: Remove blocking here. Audit all blocking functions here
let error = match self.try_route_action(action.clone()) {
Ok(_) => {
let response = ActionResponse::progress(&action_id, "Received", 0);
self.streams.forward(response).await;
return;
}
Err(e) => e,
let Err(error) = self.try_route_action(action.clone()) else {
let response = ActionResponse::progress(&action_id, "Received", 0);
self.streams.forward(response).await;
return;
};

// Remove action because it couldn't be routed
Expand Down Expand Up @@ -329,10 +326,7 @@ impl ActionsBridge {

/// Save current action information in persistence
fn save_current_action(&mut self) -> Result<(), Error> {
let current_action = match self.current_action.take() {
Some(c) => c,
None => return Ok(()),
};
let Some(current_action) = self.current_action.take() else { return Ok(()) };
let mut path = self.config.persistence_path.clone();
path.push("current_action");
info!("Storing current action in persistence; path: {}", path.display());
Expand Down Expand Up @@ -378,38 +372,39 @@ impl ActionsBridge {
}

async fn forward_action_response(&mut self, mut response: ActionResponse) {
// Ignore responses to timeout action
info!("Action response = {:?}", response);

// Don't forward responses to timeout action
if response.action_id == "timeout" {
return;
}

if self.parallel_actions.contains(&response.action_id) {
self.forward_parallel_action_response(response).await;
// Forward all other responses
self.streams.forward(response.clone()).await;

// Response to parallel actions shouldn't do anything
if self.parallel_actions.contains(&response.action_id) {
if response.is_completed() || response.is_failed() {
self.parallel_actions.remove(&response.action_id);
}
return;
}

let inflight_action = match &mut self.current_action {
Some(v) => v,
None => {
error!("Action timed out already/not present, ignoring response: {:?}", response);
return;
}
let Some(inflight_action) = &mut self.current_action else {
warn!("Action id({}) timed out already/not present", response.action_id);
return;
};

if !inflight_action.is_executing(&response.action_id)
&& !inflight_action.is_cancelled_by(&response.action_id)
{
error!(
"response id({}) != active action({}); response = {:?}",
response.action_id, inflight_action.action.action_id, response
warn!(
"response id({}) != active action({})",
response.action_id, inflight_action.action.action_id
);
return;
}

info!("Action response = {:?}", response);
self.streams.forward(response.clone()).await;

if response.is_completed() || response.is_failed() {
if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) =
self.current_action.take()
Expand Down Expand Up @@ -480,15 +475,6 @@ impl ActionsBridge {
Ok(())
}

async fn forward_parallel_action_response(&mut self, response: ActionResponse) {
info!("Action response = {:?}", response);
if response.is_completed() || response.is_failed() {
self.parallel_actions.remove(&response.action_id);
}

self.streams.forward(response).await;
}

async fn forward_action_error(&mut self, action_id: &str, error: Error) {
let response = ActionResponse::failure(action_id, error.to_string());

Expand Down
Loading