Skip to content

Commit

Permalink
feat: refactor serializer.rs to improve readability (#90)
Browse files Browse the repository at this point in the history
* refactor: a bit of DRY with `construct_publish`

* refactor: DRY writing to disk

* fix: set pkid to pass test

* doc: improve code comments

* fix: don't update stream metrics on network crash
  • Loading branch information
Devdutt Shenoi authored Dec 31, 2022
1 parent ee37777 commit bb6f915
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 137 deletions.
210 changes: 73 additions & 137 deletions uplink/src/base/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,47 +193,19 @@ impl<C: MqttClient> Serializer<C> {
}

/// Write all data received, from here-on, to disk only.
async fn crash(&mut self, mut publish: Publish) -> Result<Status, Error> {
async fn crash(&mut self, publish: Publish) -> Result<Status, Error> {
let storage = match &mut self.storage {
Some(s) => s,
None => return Err(Error::MissingPersistence),
};
// Write failed publish to disk first
publish.pkid = 1;

if let Err(e) = publish.write(storage.writer()) {
error!("Failed to fill write buffer during bad network. Error = {:?}", e);
}

if let Err(e) = storage.flush_on_overflow() {
error!("Failed to flush write buffer to disk during bad network. Error = {:?}", e);
}
// Write failed publish to disk first, metrics don't matter
write_to_disk(publish, storage, &mut None);

loop {
// Collect next data packet to write to disk
// Collect next data packet and write to disk
let data = self.collector_rx.recv_async().await?;
let topic = data.topic();
let payload = data.serialize()?;
let stream = data.stream().as_ref().to_owned();
let point_count = data.len();
let batch_latency = data.batch_latency();
trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
if let Some(handler) = self.stream_metrics.as_mut() {
handler.update(stream, point_count, batch_latency);
}

let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
publish.pkid = 1;

if let Err(e) = publish.write(storage.writer()) {
error!("Failed to fill write buffer during bad network. Error = {:?}", e);
continue;
}

if let Err(e) = storage.flush_on_overflow() {
error!("Failed to flush write buffer to disk during bad network. Error = {:?}", e);
continue;
}
let publish = construct_publish(&mut None, data)?;
write_to_disk(publish, storage, &mut None);
}
}

Expand Down Expand Up @@ -265,41 +237,8 @@ impl<C: MqttClient> Serializer<C> {
}
}

let topic = data.topic();
let payload = data.serialize()?;
let stream = data.stream().as_ref().to_owned();
let point_count = data.len();
let batch_latency = data.batch_latency();
trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
if let Some(handler) = self.stream_metrics.as_mut() {
handler.update(stream, point_count, batch_latency);
}

let payload_size = payload.len();
let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
publish.pkid = 1;

match publish.write(storage.writer()) {
Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut(){
handler.add_total_disk_size(payload_size)
},
Err(e) => {
error!("Failed to fill disk buffer. Error = {:?}", e);
continue
}
}

match storage.flush_on_overflow() {
Ok(deleted) => if let Some(handler) = self.serializer_metrics.as_mut() {
if deleted.is_some() {
handler.increment_lost_segments();
}
},
Err(e) => {
error!("Failed to flush disk buffer. Error = {:?}", e);
continue
}
}
let publish = construct_publish(&mut self.stream_metrics, data)?;
write_to_disk(publish, storage, &mut self.serializer_metrics);
}
o = &mut publish => match o {
Ok(_) => return Ok(Status::EventLoopReady),
Expand Down Expand Up @@ -354,41 +293,8 @@ impl<C: MqttClient> Serializer<C> {
}
}

let topic = data.topic();
let payload = data.serialize()?;
let stream = data.stream().as_ref().to_owned();
let point_count = data.len();
let batch_latency = data.batch_latency();
trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
if let Some(handler) = self.stream_metrics.as_mut() {
handler.update(stream, point_count, batch_latency);
}

let payload_size = payload.len();
let mut publish = Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload);
publish.pkid = 1;

match publish.write(storage.writer()) {
Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut() {
handler.add_total_disk_size(payload_size)
},
Err(e) => {
error!("Failed to fill disk buffer. Error = {:?}", e);
continue
}
}

match storage.flush_on_overflow() {
Ok(deleted) => if let Some(handler) = self.serializer_metrics.as_mut() {
if deleted.is_some() {
handler.increment_lost_segments();
}
},
Err(e) => {
error!("Failed to flush write buffer to disk during catchup. Error = {:?}", e);
continue
}
}
let publish = construct_publish(&mut self.stream_metrics, data)?;
write_to_disk(publish, storage, &mut self.serializer_metrics);
}
o = &mut send => {
// Send failure implies eventloop crash. Switch state to
Expand Down Expand Up @@ -448,18 +354,9 @@ impl<C: MqttClient> Serializer<C> {
}
}

let topic = data.topic();
let payload = data.serialize()?;
let stream = data.stream().as_ref().to_owned();
let point_count = data.len();
let batch_latency = data.batch_latency();
trace!("Data received on stream: {stream}; message count = {point_count}; batching latency = {batch_latency}");
if let Some(handler) = self.stream_metrics.as_mut() {
handler.update(stream, point_count, batch_latency);
}

let payload_size = payload.len();
match self.client.try_publish(topic.as_ref(), QoS::AtLeastOnce, false, payload) {
let publish = construct_publish(&mut self.stream_metrics, data)?;
let payload_size = publish.payload.len();
match self.client.try_publish(publish.topic, QoS::AtLeastOnce, false, publish.payload) {
Ok(_) => if let Some(handler) = self.serializer_metrics.as_mut() {
handler.add_total_sent_size(payload_size);
continue;
Expand All @@ -469,6 +366,7 @@ impl<C: MqttClient> Serializer<C> {
}

}
// On a regular interval, forwards metrics information to network
_ = interval.tick(), if metrics_enabled => {
if let Some(handler) = self.serializer_metrics.as_mut() {
let data = handler.update();
Expand Down Expand Up @@ -538,6 +436,58 @@ async fn send_publish<C: MqttClient>(
Ok(client)
}

/// Turns a batched [`Package`] into a [`Publish`] packet ready for the network
/// or disk, recording per-stream metrics (point count, batching latency) when
/// a metrics handler has been configured.
fn construct_publish(
    stream_metrics: &mut Option<StreamMetricsHandler>,
    data: Box<dyn Package>,
) -> Result<Publish, Error> {
    let stream_name = data.stream().as_ref().to_owned();
    let count = data.len();
    let latency = data.batch_latency();
    trace!(
        "Data received on stream: {}; message count = {}; batching latency = {}",
        stream_name,
        count,
        latency
    );

    // Metrics are optional; skip the update entirely when disabled.
    if let Some(handler) = stream_metrics.as_mut() {
        handler.update(stream_name, count, latency);
    }

    let topic = data.topic();
    let payload = data.serialize()?;
    Ok(Publish::new(topic.as_ref(), QoS::AtLeastOnce, payload))
}

// Writes the provided publish packet to disk with [Storage], after setting its pkid to 1.
// Updates serializer metrics with appropriate values on success, if asked to do so.
fn write_to_disk(
mut publish: Publish,
storage: &mut Storage,
serializer_metrics: &mut Option<SerializerMetricsHandler>,
) {
publish.pkid = 1;
let payload_size = publish.payload.len();
if let Err(e) = publish.write(storage.writer()) {
error!("Failed to fill disk buffer. Error = {:?}", e);
return;
}

if let Some(handler) = serializer_metrics {
handler.add_total_disk_size(payload_size)
}

let deleted = match storage.flush_on_overflow() {
Ok(d) => d,
Err(e) => {
error!("Failed to flush disk buffer. Error = {:?}", e);
return;
}
};

if let Some(handler) = serializer_metrics {
if deleted.is_some() {
handler.increment_lost_segments();
}
}
}

#[cfg(test)]
mod test {
use serde_json::Value;
Expand Down Expand Up @@ -609,16 +559,6 @@ mod test {
}
}

/// Test helper: force a publish packet into storage, panicking on any failure
/// so the test fails loudly instead of continuing with missing data.
fn write_to_storage(storage: &mut Storage, publish: &Publish) {
    match publish.write(storage.writer()) {
        Ok(_) => {}
        Err(e) => panic!("Failed to fill write buffer. Error = {:?}", e),
    }

    match storage.flush_on_overflow() {
        Ok(_) => {}
        Err(e) => panic!("Failed to flush write buffer to disk. Error = {:?}", e),
    }
}

fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish {
if storage.reload_on_eof().unwrap() {
panic!("No publishes found in storage");
Expand Down Expand Up @@ -740,12 +680,12 @@ mod test {
QoS::AtLeastOnce,
"[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
);
publish.pkid = 1;

write_to_storage(&mut storage, &publish);
write_to_disk(publish.clone(), &mut storage, &mut None);

let stored_publish = read_from_storage(&mut storage, serializer.config.max_packet_size);

// Ensure publish.pkid is 1, as written to disk
publish.pkid = 1;
assert_eq!(publish, stored_publish);
}

Expand Down Expand Up @@ -862,14 +802,12 @@ mod test {
});

// Force write a publish into storage
let mut publish = Publish::new(
let publish = Publish::new(
"hello/world",
QoS::AtLeastOnce,
"[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
);
publish.pkid = 1;

write_to_storage(&mut storage, &publish);
write_to_disk(publish.clone(), &mut storage, &mut None);

// Replace storage into serializer
serializer.storage = Some(storage);
Expand Down Expand Up @@ -898,14 +836,12 @@ mod test {
});

// Force write a publish into storage
let mut publish = Publish::new(
let publish = Publish::new(
"hello/world",
QoS::AtLeastOnce,
"[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(),
);
publish.pkid = 1;

write_to_storage(&mut storage, &publish);
write_to_disk(publish.clone(), &mut storage, &mut None);

// Replace storage into serializer
serializer.storage = Some(storage);
Expand Down
4 changes: 4 additions & 0 deletions uplink/src/base/serializer/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl SerializerMetricsHandler {
self.metrics.errors.push_str(" | ");
}

// Retrieve metrics to send on network
pub fn update(&mut self) -> &SerializerMetrics {
let timestamp =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0));
Expand Down Expand Up @@ -128,6 +129,8 @@ impl StreamMetricsHandler {
Some(Self { topic, map: Default::default() })
}

/// Updates the metrics for a stream, given the count of points in the batch and the
/// batch latency (the difference between the first and last elements' timestamps).
pub fn update(&mut self, stream: String, point_count: usize, batch_latency: u64) {
// Init stream metrics max/min values with opposite extreme values to ensure first latency value is accepted
let metrics = self.map.entry(stream.clone()).or_insert(StreamMetrics {
Expand All @@ -139,6 +142,7 @@ impl StreamMetricsHandler {

metrics.max_latency = metrics.max_latency.max(batch_latency);
metrics.min_latency = metrics.min_latency.min(batch_latency);
// NOTE: The average latency is calculated in a slightly lossy fashion.
let total_latency = (metrics.average_latency * metrics.batch_count) + batch_latency;

metrics.batch_count += 1;
Expand Down

0 comments on commit bb6f915

Please sign in to comment.