Skip to content

Commit

Permalink
fix: clear stream_metrics on write to network (#91)
Browse files Browse the repository at this point in the history
* fix: don't send when `stream_metrics` is empty

* fix: clear metrics only on successful publish
  • Loading branch information
Devdutt Shenoi authored Dec 30, 2022
1 parent 8bab9bd commit ee37777
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
19 changes: 13 additions & 6 deletions uplink/src/base/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,21 +471,28 @@ impl<C: MqttClient> Serializer<C> {
}
_ = interval.tick(), if metrics_enabled => {
if let Some(handler) = self.serializer_metrics.as_mut() {
info!("Publishing serializer metrics to broker");
let data = handler.update();
let payload = serde_json::to_vec(&vec![data])?;
handler.clear();
if let Err(e) = self.client.try_publish(&handler.topic, QoS::AtLeastOnce, false, payload) {
error!("Couldn't publish serializer metrics to broker: {}", e)

info!("Publishing serializer metrics to broker");
match self.client.try_publish(&handler.topic, QoS::AtLeastOnce, false, payload) {
Err(e) => error!("Couldn't publish serializer metrics to broker: {e}"),
_ => handler.clear(),
}
}

if let Some(handler) = self.stream_metrics.as_mut() {
info!("Publishing stream metrics to broker");
let data: Vec<&mut StreamMetrics> = handler.streams().collect();
if data.is_empty() {
continue;
}
let payload = serde_json::to_vec(&data)?;
if let Err(e) = self.client.try_publish(&handler.topic, QoS::AtLeastOnce, false, payload) {
error!("Couldn't publish stream metrics to broker: {}", e)

info!("Publishing stream metrics to broker");
match self.client.try_publish(&handler.topic, QoS::AtLeastOnce, false, payload) {
Err(e) => error!("Couldn't publish stream metrics to broker: {e}"),
_ => handler.clear(),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions uplink/src/base/serializer/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ impl StreamMetricsHandler {
pub fn streams(&mut self) -> Streams {
Streams { values: self.map.values_mut() }
}

pub fn clear(&mut self) {
self.map.clear();
}
}

pub struct Streams<'a> {
Expand Down

0 comments on commit ee37777

Please sign in to comment.