Skip to content

Commit

Permalink
style: name types
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Aug 29, 2024
1 parent a91d515 commit 225d4af
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions uplink/src/collector/bus/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ use crate::{
};

type Json = Map<String, Value>;
type InputStream = String;
type FieldName = String;

pub struct Router {
map: HashMap<String, Vec<Sender<(String, Json)>>>,
map: HashMap<InputStream, Vec<Sender<(InputStream, Json)>>>,
pub tasks: JoinSet<()>,
}

Expand All @@ -26,14 +28,14 @@ impl Router {
bridge_tx: BridgeTx,
back_tx: Sender<Payload>,
) -> Self {
let mut map: HashMap<String, Vec<Sender<(String, Json)>>> = HashMap::new();
let mut map: HashMap<InputStream, Vec<Sender<(InputStream, Json)>>> = HashMap::new();
let mut tasks = JoinSet::new();
for config in configs {
let (tx, rx) = bounded(1);
let mut fields = HashMap::new();
for stream in &config.construct_from {
if let SelectConfig::Fields(selected_fields) = &stream.select_fields {
let renames: &mut HashMap<String, Field> =
let renames: &mut HashMap<FieldName, Field> =
fields.entry(stream.input_stream.to_owned()).or_default();
for field in selected_fields {
renames.insert(field.original.to_owned(), field.to_owned());
Expand All @@ -47,7 +49,7 @@ impl Router {
}
let joiner = Joiner {
rx,
joined: Map::new(),
joined: Json::new(),
config,
tx: bridge_tx.clone(),
fields,
Expand All @@ -60,7 +62,7 @@ impl Router {
Router { map, tasks }
}

pub async fn map(&mut self, input_stream: String, json: Json) {
pub async fn map(&mut self, input_stream: InputStream, json: Json) {
let Some(iter) = self.map.get(&input_stream) else { return };
for tx in iter {
_ = tx.send_async((input_stream.clone(), json.clone())).await;
Expand All @@ -69,10 +71,10 @@ impl Router {
}

struct Joiner {
rx: Receiver<(String, Json)>,
rx: Receiver<(InputStream, Json)>,
joined: Json,
config: JoinConfig,
fields: HashMap<String, HashMap<String, Field>>,
fields: HashMap<InputStream, HashMap<FieldName, Field>>,
tx: BridgeTx,
back_tx: Sender<Payload>,
sequence: u32,
Expand All @@ -83,7 +85,7 @@ impl Joiner {
let PushInterval::OnTimeout(period) = self.config.push_interval_s else {
loop {
match self.rx.recv_async().await {
Ok((stream_name, json)) => self.update(stream_name, json),
Ok((input_stream, json)) => self.update(input_stream, json),
Err(e) => {
error!("{e}");
return;
Expand All @@ -97,7 +99,7 @@ impl Joiner {
select! {
r = self.rx.recv_async() => {
match r {
Ok((stream_name, json)) => self.update(stream_name, json),
Ok((input_stream, json)) => self.update(input_stream, json),
Err(e) => {
error!("{e}");
return;
Expand All @@ -120,8 +122,8 @@ impl Joiner {
}
}

fn update(&mut self, stream_name: String, json: Json) {
if let Some(map) = self.fields.get(&stream_name) {
fn update(&mut self, input_stream: InputStream, json: Json) {
if let Some(map) = self.fields.get(&input_stream) {
for (mut key, value) in json {
// drop unenumerated keys from json
let Some(field) = map.get(&key) else { continue };
Expand Down

0 comments on commit 225d4af

Please sign in to comment.