diff --git a/uplink/src/collector/bus/joins.rs b/uplink/src/collector/bus/joins.rs index 4786876c..ede2331d 100644 --- a/uplink/src/collector/bus/joins.rs +++ b/uplink/src/collector/bus/joins.rs @@ -14,9 +14,11 @@ use crate::{ }; type Json = Map; +type InputStream = String; +type FieldName = String; pub struct Router { - map: HashMap>>, + map: HashMap>>, pub tasks: JoinSet<()>, } @@ -26,14 +28,14 @@ impl Router { bridge_tx: BridgeTx, back_tx: Sender, ) -> Self { - let mut map: HashMap>> = HashMap::new(); + let mut map: HashMap>> = HashMap::new(); let mut tasks = JoinSet::new(); for config in configs { let (tx, rx) = bounded(1); let mut fields = HashMap::new(); for stream in &config.construct_from { if let SelectConfig::Fields(selected_fields) = &stream.select_fields { - let renames: &mut HashMap = + let renames: &mut HashMap = fields.entry(stream.input_stream.to_owned()).or_default(); for field in selected_fields { renames.insert(field.original.to_owned(), field.to_owned()); @@ -47,7 +49,7 @@ impl Router { } let joiner = Joiner { rx, - joined: Map::new(), + joined: Json::new(), config, tx: bridge_tx.clone(), fields, @@ -60,7 +62,7 @@ impl Router { Router { map, tasks } } - pub async fn map(&mut self, input_stream: String, json: Json) { + pub async fn map(&mut self, input_stream: InputStream, json: Json) { let Some(iter) = self.map.get(&input_stream) else { return }; for tx in iter { _ = tx.send_async((input_stream.clone(), json.clone())).await; @@ -69,10 +71,10 @@ impl Router { } struct Joiner { - rx: Receiver<(String, Json)>, + rx: Receiver<(InputStream, Json)>, joined: Json, config: JoinConfig, - fields: HashMap>, + fields: HashMap>, tx: BridgeTx, back_tx: Sender, sequence: u32, @@ -83,7 +85,7 @@ impl Joiner { let PushInterval::OnTimeout(period) = self.config.push_interval_s else { loop { match self.rx.recv_async().await { - Ok((stream_name, json)) => self.update(stream_name, json), + Ok((input_stream, json)) => self.update(input_stream, json), Err(e) => { error!("{e}"); return; @@ -97,7 +99,7 @@ impl Joiner { select! { r = self.rx.recv_async() => { match r { - Ok((stream_name, json)) => self.update(stream_name, json), + Ok((input_stream, json)) => self.update(input_stream, json), Err(e) => { error!("{e}"); return; @@ -120,8 +122,8 @@ impl Joiner { } } - fn update(&mut self, stream_name: String, json: Json) { - if let Some(map) = self.fields.get(&stream_name) { + fn update(&mut self, input_stream: InputStream, json: Json) { + if let Some(map) = self.fields.get(&input_stream) { for (mut key, value) in json { // drop unenumerated keys from json let Some(field) = map.get(&key) else { continue };