Skip to content

Commit

Permalink
feat: no route for unsubscribed actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Aug 8, 2024
1 parent d50957c commit af4d648
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
bytes = { workspace = true }
flume = { workspace = true }
rumqttc = { workspace = true }
rumqttd = { git = "https://github.com/bytebeamio/rumqtt", default-features = false, optional = true }
rumqttd = { git = "https://github.com/bytebeamio/rumqtt", default-features = false, optional = true, branch = "console-response" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = "3.3.0"
Expand Down Expand Up @@ -78,6 +78,7 @@ vergen = { version = "7", features = ["git", "build", "time"] }
tempdir = { workspace = true }

[features]
default = ["bus"]
bus = ["rumqttd"]

[[test]]
Expand Down
42 changes: 32 additions & 10 deletions uplink/src/collector/bus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::{collections::HashSet, net::SocketAddr};
use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
};

use flume::{bounded, Receiver};
use joins::Router;
use log::error;
use reqwest::get;
use rumqttd::{
local::{LinkRx, LinkTx},
protocol::Publish,
Expand All @@ -15,7 +19,7 @@ use tokio::select;
use crate::{
base::bridge::{BridgeTx, Payload},
config::BusConfig,
spawn_named_thread, Action,
spawn_named_thread, Action, ActionResponse,
};

mod joins;
Expand All @@ -30,6 +34,10 @@ pub enum Error {
Parse(#[from] std::net::AddrParseError),
#[error("Recv error: {0}")]
Recv(#[from] flume::RecvError),
#[error("Req error: {0}")]
Req(#[from] reqwest::Error),
#[error("Action was not expected")]
NoRoute,
}

pub struct BusRx {
Expand All @@ -50,6 +58,7 @@ impl BusRx {

pub struct BusTx {
tx: LinkTx,
console_url: String,
}

impl BusTx {
Expand All @@ -73,10 +82,19 @@ impl BusTx {
Ok(())
}

fn run_action(&mut self, action: Action) -> Result<(), Error> {
async fn run_action(&mut self, action: &Action) -> Result<(), Error> {
let topic = format!("/actions/{}", action.name);

let url = format!("http://{}/subscriptions", self.console_url);
let body = get(url).await?.bytes().await?;
let subscriptions: HashMap<String, Vec<String>> = serde_json::from_slice(&body)?;

if !subscriptions.contains_key(&topic) {
return Err(Error::NoRoute);
}

let response_topic = format!("/actions/{}/status", action.action_id);
let payload = serde_json::to_vec(&action)?;
let payload = serde_json::to_vec(action)?;

self.tx.subscribe(response_topic)?;
self.tx.publish(topic, payload)?;
Expand Down Expand Up @@ -118,7 +136,8 @@ impl Bus {
connections,
};
let mut console = ConsoleSettings::default();
console.listen = format!("127.0.0.1:{}", config.console_port);
let console_url = format!("127.0.0.1:{}", config.console_port);
console_url.clone_into(&mut console.listen);
let servers = [("service_bus".to_owned(), server)].into_iter().collect();
let mut broker = Broker::new(Config {
id: 0,
Expand All @@ -134,7 +153,7 @@ impl Bus {
}
});

Self { tx: BusTx { tx }, rx: BusRx { rx }, bridge_tx, actions_rx, config }
Self { tx: BusTx { tx, console_url }, rx: BusRx { rx }, bridge_tx, actions_rx, config }
}

#[tokio::main(flavor = "current_thread")]
Expand All @@ -157,8 +176,10 @@ impl Bus {
loop {
select! {
Ok(action) = self.actions_rx.recv_async() => {
if let Err(e) = self.tx.run_action(action) {
error!("{e}")
if let Err(e) = self.tx.run_action(&action).await {
error!("{e}");
let status = ActionResponse::failure(&action.action_id, e.to_string());
self.bridge_tx.send_action_response(status).await;
}
}

Expand All @@ -168,7 +189,7 @@ impl Bus {
error!("Couldn't parse payload as action status");
continue;
};
self.bridge_tx.send_action_response_sync(status);
self.bridge_tx.send_action_response(status).await;
continue;
}

Expand Down Expand Up @@ -219,7 +240,8 @@ mod tests {
#[test]
fn recv_action_and_respond() {
let port = 1883;
let config = BusConfig { port, joins: JoinerConfig { output_streams: vec![] } };
let config =
BusConfig { port, console_port: 3030, joins: JoinerConfig { output_streams: vec![] } };

let (data_tx, _data_rx) = bounded(1);
let (status_tx, status_rx) = bounded(1);
Expand Down
20 changes: 10 additions & 10 deletions uplink/tests/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn as_is_data_from_bus() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -91,7 +91,7 @@ fn join_two_streams_on_new_data_from_bus() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -153,7 +153,7 @@ fn join_two_streams_on_timeout_from_bus() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -207,7 +207,7 @@ fn select_from_stream_on_bus() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -266,7 +266,7 @@ fn select_from_two_streams_on_bus() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -323,7 +323,7 @@ fn null_after_flush() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -412,7 +412,7 @@ fn previous_value_after_flush() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -508,7 +508,7 @@ fn two_streams_with_similar_fields_no_rename() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -589,7 +589,7 @@ fn two_streams_with_similar_fields_renamed() {
publish_on_service_bus: false,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down Expand Up @@ -658,7 +658,7 @@ fn publish_joined_stream_back_on_bus() {
publish_on_service_bus: true,
}],
};
let config = BusConfig { port, joins };
let config = BusConfig { console_port: 3030, port, joins };

let (data_tx, data_rx) = bounded(1);
let (status_tx, _status_rx) = bounded(1);
Expand Down

0 comments on commit af4d648

Please sign in to comment.