From 85288988cc47ae9fb2e4a41ca47cb49d773facfe Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 9 Oct 2024 17:54:58 +0530 Subject: [PATCH 1/9] refactor: simulator parts of the repo (#362) * style: rename simulator service * feat: add runit finish script --- Dockerfile | 3 +-- runit/simulator/finish | 6 ++++++ runit/{uplink => simulator}/run | 0 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100755 runit/simulator/finish rename runit/{uplink => simulator}/run (100%) diff --git a/Dockerfile b/Dockerfile index 9808209cd..9ab476eb0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:alpine as builder +FROM rust:alpine AS builder RUN apk add build-base openssl-dev WORKDIR "/usr/share/bytebeam/uplink" @@ -28,4 +28,3 @@ CMD ["/usr/bin/runsvdir", "/etc/runit"] COPY paths/ paths COPY simulator.sh . - diff --git a/runit/simulator/finish b/runit/simulator/finish new file mode 100755 index 000000000..c54781b94 --- /dev/null +++ b/runit/simulator/finish @@ -0,0 +1,6 @@ +#!/bin/sh + +cd /usr/share/bytebeam/uplink + +exec ./simulator.sh kill_devices +echo "Killed all simulated devices" diff --git a/runit/uplink/run b/runit/simulator/run similarity index 100% rename from runit/uplink/run rename to runit/simulator/run From 2498e28c5b0f76a1f6e5bb7adb59ce480442342e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 9 Oct 2024 23:05:28 +0530 Subject: [PATCH 2/9] refactor: DRY serializer --- uplink/src/base/serializer/mod.rs | 141 ++++++++++++++---------------- 1 file changed, 68 insertions(+), 73 deletions(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 6c3b914c5..55752ebce 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -381,23 +381,7 @@ impl Serializer { select! { data = self.collector_rx.recv_async() => { let data = data?; - let stream = data.stream_config(); - let publish = construct_publish(data, &mut self.stream_metrics)?; - let storage = self.storage_handler.select(&stream); - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => { - debug!("Lost segment = {deleted}"); - self.metrics.increment_lost_segments(); - } - Ok(_) => {}, - Err(e) => { - error!("Storage write error = {e}"); - self.metrics.increment_errors(); - } - }; - - // Update metrics - self.metrics.add_batch(); + store_received_data(data, &mut self.storage_handler,&mut self.stream_metrics, &mut self.metrics)?; } o = &mut publish => match o { Ok(_) => break Ok(Status::EventLoopReady), @@ -438,56 +422,21 @@ impl Serializer { self.metrics.set_mode("catchup"); let max_packet_size = self.config.mqtt.max_packet_size; - let client = self.client.clone(); - - let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else { + let Some((mut last_publish_stream, publish)) = + next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)? + else { return Ok(Status::Normal); }; - // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. - // This leads to force switching to normal mode. Increasing max_payload_size to bypass this - let publish = match Packet::read(storage.reader(), max_packet_size) { - Ok(Packet::Publish(publish)) => publish, - Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), - Err(e) => { - self.metrics.increment_errors(); - error!("Failed to read from storage. Forcing into Normal mode. 
Error = {e}"); - save_and_prepare_next_metrics( - &mut self.pending_metrics, - &mut self.metrics, - &mut self.stream_metrics, - &self.storage_handler, - ); - return Ok(Status::Normal); - } - }; - let mut last_publish_payload_size = publish.payload.len(); - let mut last_publish_stream = stream.clone(); - let send = send_publish(client, publish.topic, publish.payload); + let send = send_publish(self.client.clone(), publish.topic, publish.payload); tokio::pin!(send); let v: Result = loop { select! { data = self.collector_rx.recv_async() => { let data = data?; - let stream = data.stream_config(); - let publish = construct_publish(data, &mut self.stream_metrics)?; - let storage = self.storage_handler.select(&stream); - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => { - debug!("Lost segment = {deleted}"); - self.metrics.increment_lost_segments(); - } - Ok(_) => {}, - Err(e) => { - error!("Storage write error = {e}"); - self.metrics.increment_errors(); - } - }; - - // Update metrics - self.metrics.add_batch(); + store_received_data(data, &mut self.storage_handler,&mut self.stream_metrics, &mut self.metrics)?; } o = &mut send => { self.metrics.add_sent_size(last_publish_payload_size); @@ -495,25 +444,16 @@ impl Serializer { // indefinitely write to disk to not loose data let client = match o { Ok(c) => c, - Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone())), - Err(e) => unreachable!("Unexpected error: {e}"), - }; - - let Some((stream, storage)) = self.storage_handler.next(&mut self.metrics) else { - return Ok(Status::Normal); - }; - - let publish = match Packet::read(storage.reader(), max_packet_size) { - Ok(Packet::Publish(publish)) => publish, - Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), - Err(e) => { - error!("Failed to read from storage. Forcing into Normal mode. Error = {e}"); - break Ok(Status::Normal) + Err(MqttError::Send(Request::Publish(publish))) => { + break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone())) } + Err(e) => unreachable!("Unexpected error: {e}"), }; - self.metrics.add_batch(); - + let Some((stream, publish)) = next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)? + else { + return Ok(Status::Normal); + }; let payload = publish.payload; last_publish_payload_size = payload.len(); last_publish_stream = stream.clone(); @@ -606,6 +546,61 @@ impl Serializer { } } +// Selects the right read buffer for storage and serializes received data as a Publish packet into it. +// Updates metrics regarding the serializer as well. +fn store_received_data( + data: Box, + storage_handler: &mut StorageHandler, + stream_metrics: &mut HashMap, + metrics: &mut Metrics, +) -> Result<(), Error> { + let stream = data.stream_config(); + let publish = construct_publish(data, stream_metrics)?; + let storage = storage_handler.select(&stream); + match write_to_storage(publish, storage) { + Ok(Some(deleted)) => { + debug!("Lost segment = {deleted}"); + metrics.increment_lost_segments(); + } + Ok(_) => {} + Err(e) => { + error!("Storage write error = {e}"); + metrics.increment_errors(); + } + }; + + // Update metrics + metrics.add_batch(); + + Ok(()) +} + +// Deserializes a Publish packet from the storage read buffer and updates metrics regarding the serializer. 
+fn next_publish( + storage_handler: &mut StorageHandler, + metrics: &mut Metrics, + max_packet_size: usize, +) -> Result, Publish)>, Error> { + let Some((stream, storage)) = storage_handler.next(metrics) else { + return Ok(None); + }; + + // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. + // This leads to force switching to normal mode. Increasing max_payload_size to bypass this + let publish = match Packet::read(storage.reader(), max_packet_size) { + Ok(Packet::Publish(publish)) => publish, + Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), + Err(e) => { + error!("Failed to read from storage. Forcing into Normal mode. Error = {e}"); + return Ok(None); + } + }; + + metrics.add_batch(); + + Ok(Some((stream.clone(), publish))) +} + async fn send_publish( client: C, topic: String, From 587a22e17045b0b519809e138a7bae4000e4d61d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 10 Oct 2024 12:42:28 +0530 Subject: [PATCH 3/9] fix: ignore corrupted `current_action` file (#363) --- uplink/src/base/bridge/actions_lane.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index cad5a081b..e910a7047 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -153,7 +153,9 @@ impl ActionsBridge { pub async fn start(&mut self) -> Result<(), Error> { let mut metrics_timeout = interval(self.config.stream_metrics.timeout); let mut end: Pin> = Box::pin(time::sleep(Duration::from_secs(u64::MAX))); - self.load_saved_action()?; + if let Err(e) = self.load_saved_action() { + error!("Couldn't load saved action: {e}"); + } loop { select! { From 8866ea6218d03084f4fc7af254d1709b6212e579 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 11 Oct 2024 21:08:14 +0530 Subject: [PATCH 4/9] refactor: use `let Some else` to de-nest --- storage/src/lib.rs | 48 +++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index eef1abfe9..412f29191 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -136,35 +136,35 @@ impl Storage { return Ok(false); } - if let Some(persistence) = &mut self.persistence { - // Remove read file on completion in destructive-read mode - let read_is_destructive = !persistence.non_destructive_read; - let read_file_id = persistence.current_read_file_id.take(); - if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() { - let deleted_file = persistence.remove(id)?; - debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name); - } - - // Swap read buffer with write buffer to read data in inmemory write - // buffer when all the backlog disk files are done - if persistence.backlog_files.is_empty() { - mem::swap(&mut self.current_read_file, &mut self.current_write_file); - // If read buffer is 0 after swapping, all the data is caught up - return Ok(self.current_read_file.is_empty()); - } + let Some(persistence) = &mut self.persistence else { + mem::swap(&mut self.current_read_file, &mut self.current_write_file); + // If read buffer is 0 after swapping, all the data is caught up + return Ok(self.current_read_file.is_empty()); + }; - if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) { - self.current_read_file.clear(); - persistence.current_read_file_id.take(); - return Err(e); - } + // Remove read file on 
completion in destructive-read mode + let read_is_destructive = !persistence.non_destructive_read; + let read_file_id = persistence.current_read_file_id.take(); + if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() { + let deleted_file = persistence.remove(id)?; + debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name); + } - Ok(false) - } else { + // Swap read buffer with write buffer to read data in inmemory write + // buffer when all the backlog disk files are done + if persistence.backlog_files.is_empty() { mem::swap(&mut self.current_read_file, &mut self.current_write_file); // If read buffer is 0 after swapping, all the data is caught up - Ok(self.current_read_file.is_empty()) + return Ok(self.current_read_file.is_empty()); } + + if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) { + self.current_read_file.clear(); + persistence.current_read_file_id.take(); + return Err(e); + } + + Ok(false) } } From 54ea96d035ff05ec03f3c13d2941d88211f60ad5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Oct 2024 13:24:28 +0530 Subject: [PATCH 5/9] refactor: use a wrapper to simplify interfacing with `storage` (#364) * refactor: wrapper on storage to improve readability * test: ensure serializer handles data in FIFO order * refactor: simplify code with lesser match branches * doc: update comments * refactor: DRY storage metrics collection * refactor: next can't err * refactor: store can't err * refactor: `send_publish` takes a `publish` * doc: comment explaining functions * refactor: `clone` when required only --- uplink/src/base/serializer/mod.rs | 397 ++++++++++++++---------------- uplink/src/lib.rs | 2 +- uplink/tests/serializer.rs | 107 +++++--- 3 files changed, 258 insertions(+), 248 deletions(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 55752ebce..1853b5d4c 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -2,6 +2,7 @@ mod metrics; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{self, Write}; +use std::path::PathBuf; use std::time::Instant; use std::{sync::Arc, time::Duration}; @@ -10,7 +11,6 @@ use flume::{bounded, Receiver, RecvError, Sender, TrySendError}; use log::{debug, error, info, trace}; use lz4_flex::frame::FrameEncoder; use rumqttc::*; -use storage::Storage; use thiserror::Error; use tokio::{select, time::interval}; @@ -131,6 +131,63 @@ impl MqttClient for AsyncClient { } } +pub struct Storage { + inner: storage::Storage, +} + +impl Storage { + pub fn new(name: impl Into, max_file_size: usize) -> Self { + Self { inner: storage::Storage::new(name, max_file_size) } + } + + pub fn set_persistence( + &mut self, + backup_path: impl Into, + max_file_count: usize, + ) -> Result<(), storage::Error> { + self.inner.set_persistence(backup_path.into(), max_file_count) + } + + // Stores the provided publish packet by serializing it into storage, after setting its pkid to 1. + // If the write buffer is full, it is flushed/written onto disk based on config. + pub fn write(&mut self, mut publish: Publish) -> Result, storage::Error> { + publish.pkid = 1; + if let Err(e) = publish.write(self.inner.writer()) { + error!("Failed to fill disk buffer. Error = {e}"); + return Ok(None); + } + + self.inner.flush_on_overflow() + } + + // Ensures read-buffer is ready to be read from, exchanges buffers if required, returns true if empty. 
+ pub fn reload_on_eof(&mut self) -> Result { + self.inner.reload_on_eof() + } + + // Deserializes publish packets from storage, returns None when it fails, i.e read buffer is empty. + // + // ## Panic + // When any packet other than a publish is deserialized. + pub fn read(&mut self, max_packet_size: usize) -> Option { + // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. + // This leads to force switching to normal mode. Increasing max_payload_size to bypass this + match Packet::read(self.inner.reader(), max_packet_size) { + Ok(Packet::Publish(publish)) => Some(publish), + Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), + Err(e) => { + error!("Failed to read from storage. Forcing into Normal mode. Error = {e}"); + None + } + } + } + + // Ensures all data is written into persistence, when configured. + pub fn flush(&mut self) -> Result, storage::Error> { + self.inner.flush() + } +} + struct StorageHandler { config: Arc, map: BTreeMap, Storage>, @@ -161,48 +218,64 @@ impl StorageHandler { path.display() ); } - map.insert(Arc::new(stream_config.clone()), storage); + map.insert(Arc::new(stream_config), storage); } Ok(Self { config, map, read_stream: None }) } - fn select(&mut self, stream: &Arc) -> &mut Storage { - self.map + // Selects the right storage for to write into and serializes received data as a Publish packet into it. + fn store(&mut self, stream: Arc, publish: Publish, metrics: &mut Metrics) { + match self + .map .entry(stream.to_owned()) .or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size)) + .write(publish) + { + Ok(Some(deleted)) => { + debug!("Lost segment = {deleted}"); + metrics.increment_lost_segments(); + } + Err(e) => { + error!("Crash loop: write error = {e}"); + metrics.increment_errors(); + } + _ => {} + } } - fn next(&mut self, metrics: &mut Metrics) -> Option<(&Arc, &mut Storage)> { + // Extracts a publish packet from storage if any exist, else returns None + fn next(&mut self, metrics: &mut Metrics) -> Option<(Arc, Publish)> { let storages = self.map.iter_mut(); for (stream, storage) in storages { - match (storage.reload_on_eof(), &mut self.read_stream) { - // Done reading all pending files for a persisted stream - (Ok(true), Some(curr_stream)) => { - if curr_stream == stream { - self.read_stream.take(); - debug!("Completed reading from: {}", stream.topic); + match storage.reload_on_eof() { + Ok(true) => { + if self.read_stream.take_if(|s| s == stream).is_some() { + debug!("Done reading from: {}", stream.topic); } - - continue; } - // Persisted stream is empty - (Ok(true), _) => continue, - // Reading from a newly loaded non-empty persisted stream - (Ok(false), None) => { - debug!("Reading from: {}", stream.topic); - self.read_stream = Some(stream.to_owned()); - return Some((stream, storage)); + // Reading from a non-empty persisted stream + Ok(false) => { + if self.read_stream.is_none() { + self.read_stream.replace(stream.to_owned()); + debug!("Started reading from: {}", stream.topic); + } else { + trace!("Reading from: {}", stream.topic); + } + + let Some(publish) = storage.read(self.config.mqtt.max_packet_size) else { + continue; + }; + metrics.add_batch(); + + return Some((stream.to_owned(), publish)); } - // Continuing to read from persisted stream loaded earlier - (Ok(false), _) => return Some((stream, storage)), // Reload again on encountering a corrupted file - (Err(e), _) => { + Err(e) => { metrics.increment_errors(); metrics.increment_lost_segments(); error!("Failed to 
reload from storage. Error = {e}"); - continue; } } } @@ -210,6 +283,8 @@ impl StorageHandler { None } + // Force flushes all in-memory buffers to ensure zero packet loss during uplink restart. + // TODO: Ensure packets in read-buffer but not on disk are not lost. fn flush_all(&mut self) { for (stream_config, storage) in self.map.iter_mut() { match storage.flush() { @@ -222,6 +297,26 @@ impl StorageHandler { } } } + + // Update Metrics about all storages being handled + fn update_metrics(&self, metrics: &mut Metrics) { + let mut inmemory_write_size = 0; + let mut inmemory_read_size = 0; + let mut file_count = 0; + let mut disk_utilized = 0; + + for storage in self.map.values() { + inmemory_write_size += storage.inner.inmemory_write_size(); + inmemory_read_size += storage.inner.inmemory_read_size(); + file_count += storage.inner.file_count(); + disk_utilized += storage.inner.disk_utilized(); + } + + metrics.set_write_memory(inmemory_write_size); + metrics.set_read_memory(inmemory_read_size); + metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); + } } /// The uplink Serializer is the component that deals with serializing, compressing and writing data onto disk or Network. @@ -269,7 +364,6 @@ impl StorageHandler { /// [`try_publish()`]: AsyncClient::try_publish /// [`publish()`]: AsyncClient::publish pub struct Serializer { - config: Arc, collector_rx: Receiver>, client: C, storage_handler: StorageHandler, @@ -291,11 +385,10 @@ impl Serializer { client: C, metrics_tx: Sender, ) -> Result, Error> { - let storage_handler = StorageHandler::new(config.clone())?; + let storage_handler = StorageHandler::new(config)?; let (ctrl_tx, ctrl_rx) = bounded(1); Ok(Serializer { - config, collector_rx, client, storage_handler, @@ -325,14 +418,10 @@ impl Serializer { self.storage_handler.flush_all(); return Ok(()); }; - let stream_config = data.stream_config(); + + let stream = data.stream_config(); let publish = construct_publish(data, &mut self.stream_metrics)?; - let storage = self.storage_handler.select(&stream_config); - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), - Ok(_) => {} - Err(e) => error!("Shutdown: write error = {e}"), - } + self.storage_handler.store(stream, publish, &mut self.metrics); } } @@ -342,24 +431,15 @@ impl Serializer { publish: Publish, stream: Arc, ) -> Result { - let storage = self.storage_handler.select(&stream); // Write failed publish to disk first, metrics don't matter - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), - Ok(_) => {} - Err(e) => error!("Crash loop: write error = {e}"), - } + self.storage_handler.store(stream, publish, &mut self.metrics); loop { // Collect next data packet and write to disk let data = self.collector_rx.recv_async().await?; + let stream = data.stream_config(); let publish = construct_publish(data, &mut self.stream_metrics)?; - let storage = self.storage_handler.select(&stream); - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => debug!("Lost segment = {deleted}"), - Ok(_) => {} - Err(e) => error!("Crash loop: write error = {e}"), - } + self.storage_handler.store(stream, publish, &mut self.metrics); } } @@ -373,15 +453,16 @@ impl Serializer { // Note: self.client.publish() is executing code before await point // in publish method every time. 
Verify this behaviour later - let payload = Bytes::copy_from_slice(&publish.payload[..]); - let publish = send_publish(self.client.clone(), publish.topic, payload); + let publish = send_publish(self.client.clone(), publish); tokio::pin!(publish); let v: Result = loop { select! { data = self.collector_rx.recv_async() => { let data = data?; - store_received_data(data, &mut self.storage_handler,&mut self.stream_metrics, &mut self.metrics)?; + let stream = data.stream_config(); + let publish = construct_publish(data, &mut self.stream_metrics)?; + self.storage_handler.store(stream, publish, &mut self.metrics); } o = &mut publish => match o { Ok(_) => break Ok(Status::EventLoopReady), @@ -412,7 +493,7 @@ impl Serializer { /// Write new collector data to disk while sending existing data on /// disk to mqtt eventloop. Collector rx is selected with blocking - /// `publish` instead of `try publish` to ensure that transient back + /// `publish` instead of `try_publish` to ensure that transient back /// pressure due to a lot of data on disk doesn't switch state to /// `Status::SlowEventLoop` async fn catchup(&mut self) -> Result { @@ -421,22 +502,21 @@ impl Serializer { let mut interval = interval(METRICS_INTERVAL); self.metrics.set_mode("catchup"); - let max_packet_size = self.config.mqtt.max_packet_size; - let Some((mut last_publish_stream, publish)) = - next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)? + let Some((mut last_publish_stream, publish)) = self.storage_handler.next(&mut self.metrics) else { return Ok(Status::Normal); }; - let mut last_publish_payload_size = publish.payload.len(); - let send = send_publish(self.client.clone(), publish.topic, publish.payload); + let send = send_publish(self.client.clone(), publish); tokio::pin!(send); let v: Result = loop { select! { data = self.collector_rx.recv_async() => { let data = data?; - store_received_data(data, &mut self.storage_handler,&mut self.stream_metrics, &mut self.metrics)?; + let stream = data.stream_config(); + let publish = construct_publish(data, &mut self.stream_metrics)?; + self.storage_handler.store(stream, publish, &mut self.metrics); } o = &mut send => { self.metrics.add_sent_size(last_publish_payload_size); @@ -445,27 +525,27 @@ impl Serializer { let client = match o { Ok(c) => c, Err(MqttError::Send(Request::Publish(publish))) => { - break Ok(Status::EventLoopCrash(publish, last_publish_stream.clone())) + break Ok(Status::EventLoopCrash(publish, last_publish_stream)) } Err(e) => unreachable!("Unexpected error: {e}"), }; - let Some((stream, publish)) = next_publish(&mut self.storage_handler, &mut self.metrics, max_packet_size)? 
+ let Some((stream, publish)) = self.storage_handler.next(&mut self.metrics) else { - return Ok(Status::Normal); + break Ok(Status::Normal); }; - let payload = publish.payload; - last_publish_payload_size = payload.len(); - last_publish_stream = stream.clone(); - send.set(send_publish(client, publish.topic, payload)); + + last_publish_payload_size = publish.payload.len(); + last_publish_stream = stream; + send.set(send_publish(client, publish)); } // On a regular interval, forwards metrics information to network _ = interval.tick() => { let _ = check_and_flush_metrics(&mut self.pending_metrics, &mut self.metrics, &self.metrics_tx, &self.storage_handler); } - // Transition into crash mode when uplink is shutting down + // Transition into shutdown mode when uplink is shutting down Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { - return Ok(Status::Shutdown) + break Ok(Status::Shutdown) } } }; @@ -480,6 +560,10 @@ impl Serializer { v } + /// Tries to serialize and directly write all incoming data packets + /// to network, will transition to 'slow' mode if `try_publish` fails. + /// Every few seconds pushes serializer metrics. Transitions to + /// `shutdown` mode if commanded to do so by control signals. async fn normal(&mut self) -> Result { let mut interval = interval(METRICS_INTERVAL); self.metrics.set_mode("normal"); @@ -503,18 +587,16 @@ impl Serializer { Err(MqttError::TrySend(Request::Publish(publish))) => return Ok(Status::SlowEventloop(publish, stream)), Err(e) => unreachable!("Unexpected error: {e}"), } - } // On a regular interval, forwards metrics information to network _ = interval.tick() => { // Check in storage stats every tick. TODO: Make storage object always // available. It can be inmemory storage - if let Err(e) = check_and_flush_metrics(&mut self.pending_metrics, &mut self.metrics, &self.metrics_tx, &self.storage_handler) { debug!("Failed to flush serializer metrics (normal). Error = {e}"); } } - // Transition into crash mode when uplink is shutting down + // Transition into shutdown mode when uplink is shutting down Ok(SerializerShutdown) = self.ctrl_rx.recv_async() => { return Ok(Status::Shutdown) } @@ -546,71 +628,16 @@ impl Serializer { } } -// Selects the right read buffer for storage and serializes received data as a Publish packet into it. -// Updates metrics regarding the serializer as well. -fn store_received_data( - data: Box, - storage_handler: &mut StorageHandler, - stream_metrics: &mut HashMap, - metrics: &mut Metrics, -) -> Result<(), Error> { - let stream = data.stream_config(); - let publish = construct_publish(data, stream_metrics)?; - let storage = storage_handler.select(&stream); - match write_to_storage(publish, storage) { - Ok(Some(deleted)) => { - debug!("Lost segment = {deleted}"); - metrics.increment_lost_segments(); - } - Ok(_) => {} - Err(e) => { - error!("Storage write error = {e}"); - metrics.increment_errors(); - } - }; - - // Update metrics - metrics.add_batch(); - - Ok(()) -} - -// Deserializes a Publish packet from the storage read buffer and updates metrics regarding the serializer. -fn next_publish( - storage_handler: &mut StorageHandler, - metrics: &mut Metrics, - max_packet_size: usize, -) -> Result, Publish)>, Error> { - let Some((stream, storage)) = storage_handler.next(metrics) else { - return Ok(None); - }; - - // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. - // This leads to force switching to normal mode. 
Increasing max_payload_size to bypass this - let publish = match Packet::read(storage.reader(), max_packet_size) { - Ok(Packet::Publish(publish)) => publish, - Ok(packet) => unreachable!("Unexpected packet: {:?}", packet), - Err(e) => { - error!("Failed to read from storage. Forcing into Normal mode. Error = {e}"); - return Ok(None); - } - }; - - metrics.add_batch(); - - Ok(Some((stream.clone(), publish))) -} - -async fn send_publish( - client: C, - topic: String, - payload: Bytes, -) -> Result { +// Used to construct a future that resolves if request is sent to the MQTT handler or errors +async fn send_publish(client: C, publish: Publish) -> Result { + let payload = Bytes::copy_from_slice(&publish.payload[..]); + let topic = publish.topic; debug!("publishing on {topic} with size = {}", payload.len()); client.publish(topic, QoS::AtLeastOnce, false, payload).await?; Ok(client) } +// Updates payload with compressed content fn lz4_compress(payload: &mut Vec) -> Result<(), Error> { let mut compressor = FrameEncoder::new(vec![]); compressor.write_all(payload)?; @@ -630,10 +657,8 @@ fn construct_publish( let batch_latency = data.latency(); trace!("Data received on stream: {stream_name}; message count = {point_count}; batching latency = {batch_latency}"); - let topic = stream_config.topic.clone(); - let metrics = stream_metrics - .entry(stream_name.clone()) + .entry(stream_name.to_owned()) .or_insert_with(|| StreamMetrics::new(&stream_name)); let serialization_start = Instant::now(); @@ -655,48 +680,18 @@ fn construct_publish( metrics.add_serialized_sizes(data_size, compressed_data_size); - Ok(Publish::new(topic, QoS::AtLeastOnce, payload)) -} - -// Writes the provided publish packet to [Storage], after setting its pkid to 1. -// If the write buffer is full, it is flushed/written onto disk based on config. -pub fn write_to_storage( - mut publish: Publish, - storage: &mut Storage, -) -> Result, storage::Error> { - publish.pkid = 1; - if let Err(e) = publish.write(storage.writer()) { - error!("Failed to fill disk buffer. 
Error = {e}"); - return Ok(None); - } - - let deleted = storage.flush_on_overflow()?; - Ok(deleted) + Ok(Publish::new(&stream_config.topic, QoS::AtLeastOnce, payload)) } +// Updates serializer metrics and logs it, but doesn't push to network fn check_metrics( metrics: &mut Metrics, stream_metrics: &mut HashMap, storage_handler: &StorageHandler, ) { use pretty_bytes::converter::convert; - let mut inmemory_write_size = 0; - let mut inmemory_read_size = 0; - let mut file_count = 0; - let mut disk_utilized = 0; - - for storage in storage_handler.map.values() { - inmemory_read_size += storage.inmemory_read_size(); - inmemory_write_size += storage.inmemory_write_size(); - file_count += storage.file_count(); - disk_utilized += storage.disk_utilized(); - } - - metrics.set_write_memory(inmemory_write_size); - metrics.set_read_memory(inmemory_read_size); - metrics.set_disk_files(file_count); - metrics.set_disk_utilized(disk_utilized); + storage_handler.update_metrics(metrics); info!( "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, @@ -722,29 +717,14 @@ fn check_metrics( } } +// Updates serializer metrics and adds it into a queue to push later fn save_and_prepare_next_metrics( pending: &mut VecDeque, metrics: &mut Metrics, stream_metrics: &mut HashMap, storage_handler: &StorageHandler, ) { - let mut inmemory_write_size = 0; - let mut inmemory_read_size = 0; - let mut file_count = 0; - let mut disk_utilized = 0; - - for storage in storage_handler.map.values() { - inmemory_write_size += storage.inmemory_write_size(); - inmemory_read_size += storage.inmemory_read_size(); - file_count += storage.file_count(); - disk_utilized += storage.disk_utilized(); - } - - metrics.set_write_memory(inmemory_write_size); - metrics.set_read_memory(inmemory_read_size); - metrics.set_disk_files(file_count); - metrics.set_disk_utilized(disk_utilized); - + storage_handler.update_metrics(metrics); let m = Box::new(metrics.clone()); pending.push_back(SerializerMetrics::Main(m)); metrics.prepare_next(); @@ -757,7 +737,7 @@ fn save_and_prepare_next_metrics( } } -// // Enable actual metrics timers when there is data. This method is called every minute by the bridge +// Updates serializer metrics and pushes it directly to network fn check_and_flush_metrics( pending: &mut VecDeque, metrics: &mut Metrics, @@ -766,23 +746,7 @@ fn check_and_flush_metrics( ) -> Result<(), TrySendError> { use pretty_bytes::converter::convert; - let mut inmemory_write_size = 0; - let mut inmemory_read_size = 0; - let mut file_count = 0; - let mut disk_utilized = 0; - - for storage in storage_handler.map.values() { - inmemory_write_size += storage.inmemory_write_size(); - inmemory_read_size += storage.inmemory_read_size(); - file_count += storage.file_count(); - disk_utilized += storage.disk_utilized(); - } - - metrics.set_write_memory(inmemory_write_size); - metrics.set_read_memory(inmemory_read_size); - metrics.set_disk_files(file_count); - metrics.set_disk_utilized(disk_utilized); - + storage_handler.update_metrics(metrics); // Send pending metrics. This signifies state change while let Some(metrics) = pending.front() { match metrics { @@ -874,7 +838,7 @@ pub mod tests { panic!("No publishes found in storage"); } - match Packet::read(storage.reader(), max_packet_size) { + match Packet::read(storage.inner.reader(), max_packet_size) { Ok(Packet::Publish(publish)) => return publish, v => { panic!("Failed to read publish from storage. 
read: {:?}", v); @@ -942,18 +906,15 @@ pub mod tests { fn read_write_storage() { let config = Arc::new(default_config()); - let (serializer, _, _) = defaults(config); let mut storage = Storage::new("hello/world", 1024); - let mut publish = Publish::new( "hello/world", QoS::AtLeastOnce, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_storage(publish.clone(), &mut storage).unwrap(); + storage.write(publish.clone()).unwrap(); - let stored_publish = - read_from_storage(&mut storage, serializer.config.mqtt.max_packet_size); + let stored_publish = read_from_storage(&mut storage, config.mqtt.max_packet_size); // Ensure publish.pkid is 1, as written to disk publish.pkid = 1; @@ -1058,7 +1019,7 @@ pub mod tests { let (mut serializer, data_tx, net_rx) = defaults(config); - let mut storage = serializer + let storage = serializer .storage_handler .map .entry(Arc::new(Default::default())) @@ -1100,7 +1061,7 @@ pub mod tests { QoS::AtLeastOnce, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_storage(publish.clone(), &mut storage).unwrap(); + storage.write(publish.clone()).unwrap(); let status = serializer.catchup().await.unwrap(); assert_eq!(status, Status::Normal); @@ -1113,7 +1074,7 @@ pub mod tests { let (mut serializer, data_tx, _) = defaults(config); - let mut storage = serializer + let storage = serializer .storage_handler .map .entry(Arc::new(StreamConfig { @@ -1141,7 +1102,7 @@ pub mod tests { QoS::AtLeastOnce, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]".as_bytes(), ); - write_to_storage(publish.clone(), &mut storage).unwrap(); + storage.write(publish.clone()).unwrap(); match serializer.catchup().await.unwrap() { Status::EventLoopCrash(Publish { topic, payload, .. 
}, _) => { @@ -1189,7 +1150,7 @@ pub mod tests { payload: Bytes::from(i.to_string()), }; - let mut one = serializer + let one = serializer .storage_handler .map .entry(Arc::new(StreamConfig { @@ -1198,8 +1159,8 @@ pub mod tests { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_storage(publish("topic/one".to_string(), 1), &mut one).unwrap(); - write_to_storage(publish("topic/one".to_string(), 10), &mut one).unwrap(); + one.write(publish("topic/one".to_string(), 1)).unwrap(); + one.write(publish("topic/one".to_string(), 10)).unwrap(); let top = serializer .storage_handler @@ -1210,8 +1171,8 @@ pub mod tests { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_storage(publish("topic/top".to_string(), 100), top).unwrap(); - write_to_storage(publish("topic/top".to_string(), 1000), top).unwrap(); + top.write(publish("topic/top".to_string(), 100)).unwrap(); + top.write(publish("topic/top".to_string(), 1000)).unwrap(); let two = serializer .storage_handler @@ -1222,9 +1183,9 @@ pub mod tests { ..Default::default() })) .or_insert_with(|| unreachable!()); - write_to_storage(publish("topic/two".to_string(), 3), two).unwrap(); + two.write(publish("topic/two".to_string(), 3)).unwrap(); - let mut default = serializer + let default = serializer .storage_handler .map .entry(Arc::new(StreamConfig { @@ -1233,8 +1194,8 @@ pub mod tests { ..Default::default() })) .or_insert(Storage::new("topic/default", 1024)); - write_to_storage(publish("topic/default".to_string(), 0), &mut default).unwrap(); - write_to_storage(publish("topic/default".to_string(), 2), &mut default).unwrap(); + default.write(publish("topic/default".to_string(), 0)).unwrap(); + default.write(publish("topic/default".to_string(), 2)).unwrap(); // run serializer in the background spawn(async { serializer.start().await.unwrap() }); diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index e8838ba94..6e3c4e509 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -58,6 +58,7 @@ pub use base::actions::{Action, ActionResponse}; use base::bridge::{stream::Stream, Bridge, Package, Payload, Point, StreamMetrics}; use base::monitor::Monitor; use base::mqtt::Mqtt; +pub use base::serializer::Storage; use base::serializer::{Serializer, SerializerMetrics}; use base::CtrlTx; use collector::device_shadow::DeviceShadow; @@ -73,7 +74,6 @@ use collector::script_runner::ScriptRunner; use collector::systemstats::StatCollector; use collector::tunshell::TunshellClient; pub use collector::{simulator, tcpjson::TcpJson}; -pub use storage::Storage; /// Spawn a named thread to run the function f on pub fn spawn_named_thread(name: &str, f: F) diff --git a/uplink/tests/serializer.rs b/uplink/tests/serializer.rs index c55143fb7..0a171110d 100644 --- a/uplink/tests/serializer.rs +++ b/uplink/tests/serializer.rs @@ -1,21 +1,39 @@ -use std::{fs::create_dir_all, path::PathBuf, sync::Arc, time::Duration}; +use std::{fs::create_dir_all, path::PathBuf, sync::Arc, thread, time::Duration}; use bytes::Bytes; use flume::bounded; use rumqttc::{Publish, QoS, Request}; use tempdir::TempDir; -use tokio::spawn; +use tokio::{runtime::Runtime, spawn}; use uplink::{ - base::{ - bridge::Payload, - serializer::{write_to_storage, Serializer}, - }, + base::{bridge::Payload, serializer::Serializer}, config::{Config, Persistence, StreamConfig}, mock::{MockClient, MockCollector}, Storage, }; +fn publish(topic: String, i: u32) -> Publish { + Publish { + dup: false, + qos: QoS::AtMostOnce, + retain: false, + topic, + pkid: 0, + payload: { + let serialized 
= serde_json::to_vec(&vec![Payload { + stream: Default::default(), + sequence: i, + timestamp: 0, + payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}").unwrap(), + }]) + .unwrap(); + + Bytes::from(serialized) + }, + } +} + #[tokio::test] // Ensures that the data of streams are removed based on preference async fn preferential_send_on_network() { @@ -56,24 +74,6 @@ async fn preferential_send_on_network() { ), ]); - let publish = |topic: String, i: u32| Publish { - dup: false, - qos: QoS::AtMostOnce, - retain: false, - topic, - pkid: 0, - payload: { - let serialized = serde_json::to_vec(&vec![Payload { - stream: Default::default(), - sequence: i, - timestamp: 0, - payload: serde_json::from_str("{\"msg\": \"Hello, World!\"}").unwrap(), - }]) - .unwrap(); - - Bytes::from(serialized) - }, - }; let persistence_path = |path: &PathBuf, stream_name: &str| { let mut path = path.to_owned(); path.push(stream_name); @@ -85,19 +85,19 @@ async fn preferential_send_on_network() { // write packets for one, two and top onto disk let mut one = Storage::new("topic/one", 1024 * 1024); one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap(); - write_to_storage(publish("topic/one".to_string(), 4), &mut one).unwrap(); - write_to_storage(publish("topic/one".to_string(), 5), &mut one).unwrap(); + one.write(publish("topic/one".to_string(), 4)).unwrap(); + one.write(publish("topic/one".to_string(), 5)).unwrap(); one.flush().unwrap(); let mut two = Storage::new("topic/two", 1024 * 1024); two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap(); - write_to_storage(publish("topic/two".to_string(), 3), &mut two).unwrap(); + two.write(publish("topic/two".to_string(), 3)).unwrap(); two.flush().unwrap(); let mut top = Storage::new("topic/top", 1024 * 1024); top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap(); - write_to_storage(publish("topic/top".to_string(), 1), &mut top).unwrap(); - write_to_storage(publish("topic/top".to_string(), 2), &mut top).unwrap(); + top.write(publish("topic/top".to_string(), 1)).unwrap(); + top.write(publish("topic/top".to_string(), 2)).unwrap(); top.flush().unwrap(); let config = Arc::new(config); @@ -167,3 +167,52 @@ async fn preferential_send_on_network() { assert_eq!(topic, "topic/default"); assert_eq!(payload, "[{\"sequence\":7,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); } + +#[tokio::test] +// Ensures that data pushed maintains FIFO order +async fn fifo_data_push() { + let mut config = Config::default(); + config.default_buf_size = 1024 * 1024; + config.mqtt.max_packet_size = 1024 * 1024; + let config = Arc::new(config); + let (data_tx, data_rx) = bounded(1); + let (net_tx, req_rx) = bounded(1); + let (metrics_tx, _metrics_rx) = bounded(1); + let client = MockClient { net_tx }; + let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap(); + + // start serializer in the background + thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start())); + + spawn(async { + let mut default = MockCollector::new( + "default", + StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() }, + data_tx, + ); + for i in 0.. { + default.send(i).await.unwrap(); + } + }); + + let Request::Publish(Publish { topic, payload, .. 
}) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); + + let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); + + let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); +} From e657210be27d13b68289bdd26a5beba3a2067e17 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 14 Oct 2024 00:07:41 +0530 Subject: [PATCH 6/9] refactor: `Error::Done` demarcates end of storage --- storage/src/lib.rs | 34 +++++++++++++++++++++---------- uplink/src/base/serializer/mod.rs | 17 ++++++++-------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 412f29191..9272d78c4 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -18,6 +18,8 @@ pub enum Error { CorruptedFile, #[error("Empty write buffer")] NoWrites, + #[error("All backups have been consumed")] + Done, } pub struct Storage { @@ -129,17 +131,21 @@ impl Storage { /// the file after loading. If all the disk data is caught up, /// swaps current write buffer to current read buffer if there /// is pending data in memory write buffer. - /// Returns true if all the messages are caught up - pub fn reload_on_eof(&mut self) -> Result { + /// Returns Error::Done if all the messages are caught up + pub fn reload_on_eof(&mut self) -> Result<(), Error> { // Don't reload if there is data in current read file if self.current_read_file.has_remaining() { - return Ok(false); + return Ok(()); } let Some(persistence) = &mut self.persistence else { mem::swap(&mut self.current_read_file, &mut self.current_write_file); // If read buffer is 0 after swapping, all the data is caught up - return Ok(self.current_read_file.is_empty()); + if self.current_read_file.is_empty() { + return Err(Error::Done); + } + + return Ok(()); }; // Remove read file on completion in destructive-read mode @@ -155,7 +161,11 @@ impl Storage { if persistence.backlog_files.is_empty() { mem::swap(&mut self.current_read_file, &mut self.current_write_file); // If read buffer is 0 after swapping, all the data is caught up - return Ok(self.current_read_file.is_empty()); + if self.current_read_file.is_empty() { + return Err(Error::Done); + } + + return Ok(()); } if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) { @@ -164,7 +174,7 @@ impl Storage { return Err(e); } - Ok(false) + Ok(()) } } @@ -379,16 +389,18 @@ impl Persistence { Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted }) } - /// Load the next persistence file to be read into memory + /// Load the next persistence file to be read into memory, returns Error::Done if there is none left. fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> { - // Len always > 0 because of above if. 
Doesn't panic - let id = self.backlog_files.pop_front().unwrap(); + let Some(id) = self.backlog_files.pop_front() else { + self.current_read_file_id.take(); + return Err(Error::Done); + }; let file_name = format!("backup@{id}"); let mut file = PersistenceFile::new(&self.path, file_name)?; // Load file into memory and store its id for deleting in the future file.read(current_read_file)?; - self.current_read_file_id = Some(id); + self.current_read_file_id.replace(id); Ok(()) } @@ -423,7 +435,7 @@ mod test { let mut publishes = vec![]; for _ in 0..n { // Done reading all the pending files - if storage.reload_on_eof().unwrap() { + if let Err(super::Error::Done) = storage.reload_on_eof() { break; } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 1853b5d4c..798bd862c 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -161,7 +161,7 @@ impl Storage { } // Ensures read-buffer is ready to be read from, exchanges buffers if required, returns true if empty. - pub fn reload_on_eof(&mut self) -> Result { + pub fn reload_on_eof(&mut self) -> Result<(), storage::Error> { self.inner.reload_on_eof() } @@ -250,13 +250,8 @@ impl StorageHandler { for (stream, storage) in storages { match storage.reload_on_eof() { - Ok(true) => { - if self.read_stream.take_if(|s| s == stream).is_some() { - debug!("Done reading from: {}", stream.topic); - } - } // Reading from a non-empty persisted stream - Ok(false) => { + Ok(_) => { if self.read_stream.is_none() { self.read_stream.replace(stream.to_owned()); debug!("Started reading from: {}", stream.topic); @@ -271,6 +266,12 @@ impl StorageHandler { return Some((stream.to_owned(), publish)); } + // All packets read from storage + Err(storage::Error::Done) => { + if self.read_stream.take_if(|s| s == stream).is_some() { + debug!("Done reading from: {}", stream.topic); + } + } // Reload again on encountering a corrupted file Err(e) => { metrics.increment_errors(); @@ -834,7 +835,7 @@ pub mod tests { use super::*; fn read_from_storage(storage: &mut Storage, max_packet_size: usize) -> Publish { - if storage.reload_on_eof().unwrap() { + if let Err(storage::Error::Done) = storage.reload_on_eof() { panic!("No publishes found in storage"); } From ab2ce8d54312cc9ee6f882d6bd7bfda315aecceb Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Oct 2024 14:46:40 +0530 Subject: [PATCH 7/9] perf: avoid mem alloc --- storage/src/lib.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 9272d78c4..ecf0f7d30 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -194,8 +194,8 @@ fn id(path: &Path) -> Result { /// Gets list of file ids in the disk. Id of file backup@10 is 10. 
/// Storing ids instead of full paths enables efficient indexing -fn get_file_ids(path: &Path) -> Result, Error> { - let mut file_ids = Vec::new(); +fn get_file_ids(path: &Path, max_file_count: usize) -> Result, Error> { + let mut file_ids = Vec::with_capacity(max_file_count); let files = fs::read_dir(path)?; for file in files { let path = file?.path(); @@ -322,7 +322,7 @@ struct Persistence { impl Persistence { fn new>(path: P, max_file_count: usize) -> Result { let path = path.into(); - let backlog_files = get_file_ids(&path)?; + let backlog_files = get_file_ids(&path, max_file_count)?; info!("List of file ids loaded from disk: {backlog_files:?}"); let bytes_occupied = backlog_files.iter().fold(0, |acc, id| { @@ -462,7 +462,7 @@ mod test { assert_eq!(storage.writer().len(), 1036); // other messages on disk - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); } @@ -475,14 +475,14 @@ mod test { // 11 files created. 10 on disk write_n_publishes(&mut storage, 110); - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); // 11 files created. 10 on disk write_n_publishes(&mut storage, 10); assert_eq!(storage.writer().len(), 0); - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } @@ -535,7 +535,7 @@ mod test { assert_eq!(storage.persistence.as_ref().unwrap().current_read_file_id, None); // Ensure unread files are all present before read - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); // Successfully read 10 files with files still in storage after 10 reads @@ -544,7 +544,7 @@ mod test { let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap(); assert_eq!(file_id, i); // Ensure partially read file is still present in backup dir - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert!(files.contains(&i)); } @@ -553,7 +553,7 @@ mod test { assert_eq!(storage.persistence.as_ref().unwrap().current_read_file_id, None); // Ensure read files are all present before read - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![]); } @@ -574,14 +574,14 @@ mod test { assert_eq!(file_id, 0); // Ensure all persistance files still exist - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); // Write 10 more files onto disk, 10 publishes per file write_n_publishes(&mut storage, 100); // Ensure none of the earlier files exist on disk - let files = get_file_ids(&backup.path()).unwrap(); + let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); } } From 678a61609c26caba55823226133e6ac6a979565a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Oct 2024 23:22:04 +0530 Subject: [PATCH 8/9] test: ensure packets in read_buffer aren't forgotten --- storage/src/lib.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index ecf0f7d30..25a138ab3 100644 --- 
a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -584,4 +584,35 @@ mod test { let files = get_file_ids(&backup.path(), 10).unwrap(); assert_eq!(files, vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); } + + #[test] + fn ensure_current_read_file_is_not_lost() { + let backup = init_backup_folders(); + let mut storage = Storage::new("test", 10 * 1036); + storage.set_persistence(backup.path(), 10).unwrap(); + // partially fill write buffer + write_n_publishes(&mut storage, 1); + + // Nothing written to disk + assert!(storage.persistence.as_ref().unwrap().backlog_files.is_empty()); + + // Trigger swap of read and write buffers, ensure packets in read buffer + storage.reload_on_eof().unwrap(); + assert!(!storage.current_read_file.is_empty()); + assert!(storage.persistence.as_ref().unwrap().current_read_file_id.is_none()); + + // Trigger flush onto disk, and drop storage + storage.flush().unwrap(); + drop(storage); + + // reload storage + let mut storage = Storage::new("test", 10 * 1036); + storage.set_persistence(backup.path(), 10).unwrap(); + + // verify read buffer was persisted by reading a single packet + read_n_publishes(&mut storage, 1); + assert_eq!(storage.file_count(), 1); + let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap(); + assert_eq!(file_id, 0); + } } From ae325b59c2ea2f007247e1eeae34de91168c1cd5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 14 Oct 2024 16:26:33 +0530 Subject: [PATCH 9/9] feat: `Serializer` pushes latest data first if configured to do so (#365) * feat: Serializer pushes latest data first if configured * doc: example config explainig usecase * feat: allow option per-stream * style: don't warn, rm unnecessary handling --- configs/config.toml | 13 ++++++- uplink/src/base/serializer/mod.rs | 55 ++++++++++++++++++++++------ uplink/src/config.rs | 5 +++ uplink/tests/serializer.rs | 60 ++++++++++++++++++++++++++++--- 4 files changed, 118 insertions(+), 15 deletions(-) diff --git a/configs/config.toml b/configs/config.toml index 1e328f759..70d75393f 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -20,6 +20,11 @@ default_buf_size = 1024 # 1KB # Maximum number of data streams that can be accepted by uplink max_stream_count = 10 +# All streams will first push the latest packet before pushing historical data in +# FIFO order, defaults to false. This solves the problem of bad networks leading to +# data being pushed so slow that it is practically impossible to track the device. +default_live_data_first = true + # MQTT client configuration # # Required Parameters @@ -84,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"] # used when there is a network/system failure. # - priority(optional, u8): Higher prioirity streams get to push their data # onto the network first. +# - live_data_first(optional, bool): All streams will first push the latest packet +# before pushing historical data in FIFO order, defaults to false. This solves the +# problem of bad networks leading to data being pushed so slow that it is practically +# impossible to track the device. # # In the following config for the device_shadow stream we set batch_size to 1 and mark -# it as non-persistent. streams are internally constructed as a map of Name -> Config +# it as non-persistent, also setting up live_data_first to enable quick delivery of stats. 
+# Streams are internally constructed as a map of Name -> Config [streams.device_shadow] topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray" flush_period = 5 priority = 75 +live_data_first = true # Example using compression [streams.imu] diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 798bd862c..c4acaa4cf 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -8,7 +8,7 @@ use std::{sync::Arc, time::Duration}; use bytes::Bytes; use flume::{bounded, Receiver, RecvError, Sender, TrySendError}; -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use lz4_flex::frame::FrameEncoder; use rumqttc::*; use thiserror::Error; @@ -133,11 +133,17 @@ impl MqttClient for AsyncClient { pub struct Storage { inner: storage::Storage, + live_data_first: bool, + latest_data: Option, } impl Storage { - pub fn new(name: impl Into, max_file_size: usize) -> Self { - Self { inner: storage::Storage::new(name, max_file_size) } + pub fn new(name: impl Into, max_file_size: usize, live_data_first: bool) -> Self { + Self { + inner: storage::Storage::new(name, max_file_size), + live_data_first, + latest_data: None, + } } pub fn set_persistence( @@ -151,6 +157,13 @@ impl Storage { // Stores the provided publish packet by serializing it into storage, after setting its pkid to 1. // If the write buffer is full, it is flushed/written onto disk based on config. pub fn write(&mut self, mut publish: Publish) -> Result, storage::Error> { + if self.live_data_first { + let Some(previous) = self.latest_data.replace(publish) else { return Ok(None) }; + publish = previous; + } else if self.latest_data.is_some() { + warn!("Latest data should be unoccupied if not using the live data first scheme"); + } + publish.pkid = 1; if let Err(e) = publish.write(self.inner.writer()) { error!("Failed to fill disk buffer. Error = {e}"); @@ -170,6 +183,10 @@ impl Storage { // ## Panic // When any packet other than a publish is deserialized. pub fn read(&mut self, max_packet_size: usize) -> Option { + if let Some(publish) = self.latest_data.take() { + return Some(publish); + } + // TODO(RT): This can fail when packet sizes > max_payload_size in config are written to disk. // This leads to force switching to normal mode. Increasing max_payload_size to bypass this match Packet::read(self.inner.reader(), max_packet_size) { @@ -184,6 +201,15 @@ impl Storage { // Ensures all data is written into persistence, when configured. pub fn flush(&mut self) -> Result, storage::Error> { + // Write live cache to disk when flushing + if let Some(mut publish) = self.latest_data.take() { + publish.pkid = 1; + if let Err(e) = publish.write(self.inner.writer()) { + error!("Failed to fill disk buffer. 
Error = {e}"); + return Ok(None); + } + } + self.inner.flush() } } @@ -202,8 +228,11 @@ impl StorageHandler { // NOTE: persist action_status if not configured otherwise streams.insert("action_status".into(), config.action_status.clone()); for (stream_name, stream_config) in streams { - let mut storage = - Storage::new(&stream_config.topic, stream_config.persistence.max_file_size); + let mut storage = Storage::new( + &stream_config.topic, + stream_config.persistence.max_file_size, + stream_config.live_data_first, + ); if stream_config.persistence.max_file_count > 0 { let mut path = config.persistence_path.clone(); path.push(&stream_name); @@ -229,7 +258,13 @@ impl StorageHandler { match self .map .entry(stream.to_owned()) - .or_insert_with(|| Storage::new(&stream.topic, self.config.default_buf_size)) + .or_insert_with(|| { + Storage::new( + &stream.topic, + self.config.default_buf_size, + self.config.default_live_data_first, + ) + }) .write(publish) { Ok(Some(deleted)) => { @@ -907,7 +942,7 @@ pub mod tests { fn read_write_storage() { let config = Arc::new(default_config()); - let mut storage = Storage::new("hello/world", 1024); + let mut storage = Storage::new("hello/world", 1024, false); let mut publish = Publish::new( "hello/world", QoS::AtLeastOnce, @@ -1024,7 +1059,7 @@ pub mod tests { .storage_handler .map .entry(Arc::new(Default::default())) - .or_insert(Storage::new("hello/world", 1024)); + .or_insert(Storage::new("hello/world", 1024, false)); let (stream_name, stream_config) = ( "hello", @@ -1082,7 +1117,7 @@ pub mod tests { topic: "hello/world".to_string(), ..Default::default() })) - .or_insert(Storage::new("hello/world", 1024)); + .or_insert(Storage::new("hello/world", 1024, false)); let (stream_name, stream_config) = ( "hello", @@ -1194,7 +1229,7 @@ pub mod tests { priority: 0, ..Default::default() })) - .or_insert(Storage::new("topic/default", 1024)); + .or_insert(Storage::new("topic/default", 1024, false)); default.write(publish("topic/default".to_string(), 0)).unwrap(); default.write(publish("topic/default".to_string(), 2)).unwrap(); diff --git a/uplink/src/config.rs b/uplink/src/config.rs index b7d1aff68..0c1921b3b 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -93,6 +93,8 @@ pub struct StreamConfig { pub persistence: Persistence, #[serde(default)] pub priority: u8, + #[serde(default)] + pub live_data_first: bool, } impl Default for StreamConfig { @@ -104,6 +106,7 @@ impl Default for StreamConfig { compression: Compression::Disabled, persistence: Persistence::default(), priority: 0, + live_data_first: false, } } } @@ -536,6 +539,8 @@ pub struct Config { pub logging: Option, pub precondition_checks: Option, pub bus: Option, + #[serde(default)] + pub default_live_data_first: bool, } impl Config { diff --git a/uplink/tests/serializer.rs b/uplink/tests/serializer.rs index 0a171110d..b45999b79 100644 --- a/uplink/tests/serializer.rs +++ b/uplink/tests/serializer.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use flume::bounded; use rumqttc::{Publish, QoS, Request}; use tempdir::TempDir; -use tokio::{runtime::Runtime, spawn}; +use tokio::{runtime::Runtime, spawn, time::sleep}; use uplink::{ base::{bridge::Payload, serializer::Serializer}, @@ -83,18 +83,18 @@ async fn preferential_send_on_network() { }; // write packets for one, two and top onto disk - let mut one = Storage::new("topic/one", 1024 * 1024); + let mut one = Storage::new("topic/one", 1024 * 1024, false); one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap(); 
one.write(publish("topic/one".to_string(), 4)).unwrap(); one.write(publish("topic/one".to_string(), 5)).unwrap(); one.flush().unwrap(); - let mut two = Storage::new("topic/two", 1024 * 1024); + let mut two = Storage::new("topic/two", 1024 * 1024, false); two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap(); two.write(publish("topic/two".to_string(), 3)).unwrap(); two.flush().unwrap(); - let mut top = Storage::new("topic/top", 1024 * 1024); + let mut top = Storage::new("topic/top", 1024 * 1024, false); top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap(); top.write(publish("topic/top".to_string(), 1)).unwrap(); top.write(publish("topic/top".to_string(), 2)).unwrap(); @@ -216,3 +216,55 @@ async fn fifo_data_push() { assert_eq!(topic, "topic/default"); assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); } + +#[tokio::test] +// Ensures that live data is pushed first if configured to do so +async fn prefer_live_data() { + let mut config = Config::default(); + config.default_buf_size = 1024 * 1024; + config.mqtt.max_packet_size = 1024 * 1024; + config.default_live_data_first = true; + let config = Arc::new(config); + let (data_tx, data_rx) = bounded(0); + let (net_tx, req_rx) = bounded(0); + let (metrics_tx, _metrics_rx) = bounded(1); + let client = MockClient { net_tx }; + let serializer = Serializer::new(config, data_rx, client, metrics_tx).unwrap(); + + // start serializer in the background + thread::spawn(|| _ = Runtime::new().unwrap().block_on(serializer.start())); + + spawn(async { + let mut default = MockCollector::new( + "default", + StreamConfig { topic: "topic/default".to_owned(), batch_size: 1, ..Default::default() }, + data_tx, + ); + for i in 0.. { + default.send(i).await.unwrap(); + sleep(Duration::from_millis(250)).await; + } + }); + + sleep(Duration::from_millis(750)).await; + let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":0,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); + + let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":2,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); + + let Request::Publish(Publish { topic, payload, .. }) = req_rx.recv_async().await.unwrap() + else { + unreachable!() + }; + assert_eq!(topic, "topic/default"); + assert_eq!(payload, "[{\"sequence\":1,\"timestamp\":0,\"msg\":\"Hello, World!\"}]"); +}
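
Note on the final patch: the `prefer_live_data` test above expects publishes in 0, 2, 1 order, i.e. the newest packet jumps ahead of older backlog. The sketch below is a minimal, self-contained model of that ordering rule only — `LiveFirstBuffer`, its fields, and the integer payloads are illustrative stand-ins, not the `Storage` wrapper added in `uplink/src/base/serializer/mod.rs`, which holds the displaced packet in the disk-backed `storage::Storage` write buffer rather than an in-memory queue.

```rust
use std::collections::VecDeque;

/// Minimal model of the "live data first" scheme: the most recently written
/// packet sits in a dedicated slot and is served before any backlog, while
/// the packet it displaces falls back into a FIFO queue.
struct LiveFirstBuffer<T> {
    live_data_first: bool,
    latest: Option<T>,
    backlog: VecDeque<T>,
}

impl<T> LiveFirstBuffer<T> {
    fn new(live_data_first: bool) -> Self {
        Self { live_data_first, latest: None, backlog: VecDeque::new() }
    }

    /// Store a packet. With `live_data_first`, the new packet takes the
    /// "latest" slot and whatever it displaces is queued behind it;
    /// otherwise everything goes straight into the FIFO backlog.
    fn write(&mut self, packet: T) {
        if self.live_data_first {
            if let Some(previous) = self.latest.replace(packet) {
                self.backlog.push_back(previous);
            }
        } else {
            self.backlog.push_back(packet);
        }
    }

    /// Next packet to publish: the latest one first, then the backlog
    /// in FIFO order.
    fn read(&mut self) -> Option<T> {
        self.latest.take().or_else(|| self.backlog.pop_front())
    }
}

fn main() {
    let mut buf = LiveFirstBuffer::new(true);

    buf.write(0);
    // Packet 0 is picked up immediately...
    assert_eq!(buf.read(), Some(0));

    // ...but while it was in flight, packets 1 and 2 arrived:
    buf.write(1);
    buf.write(2);

    // The newest packet (2) is published before the older one (1),
    // matching the 0, 2, 1 order asserted by the `prefer_live_data` test.
    assert_eq!(buf.read(), Some(2));
    assert_eq!(buf.read(), Some(1));
    assert_eq!(buf.read(), None);
}
```

The trade-off mirrored here is the one described in the `configs/config.toml` comment: on a slow link, strict FIFO can lag so far behind that the device can no longer be tracked live, so the latest sample is prioritized and historical data drains behind it.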