Merge branch 'main' into events
Devdutt Shenoi authored Oct 14, 2024
2 parents de012c6 + ae325b5 commit a282bee
Showing 10 changed files with 468 additions and 309 deletions.
3 changes: 1 addition & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM rust:alpine as builder
FROM rust:alpine AS builder

RUN apk add build-base openssl-dev
WORKDIR "/usr/share/bytebeam/uplink"
@@ -28,4 +28,3 @@ CMD ["/usr/bin/runsvdir", "/etc/runit"]

COPY paths/ paths
COPY simulator.sh .

13 changes: 12 additions & 1 deletion configs/config.toml
@@ -20,6 +20,11 @@ default_buf_size = 1024 # 1KB
# Maximum number of data streams that can be accepted by uplink
max_stream_count = 10

# All streams will first push their latest packet before pushing historical data in
# FIFO order; defaults to false. This solves the problem of bad networks causing
# data to be pushed so slowly that it is practically impossible to track the device.
default_live_data_first = true

# MQTT client configuration
#
# Required Parameters
@@ -84,13 +89,19 @@ blacklist = ["cancollector_metrics", "candump_metrics", "pinger"]
# used when there is a network/system failure.
# - priority(optional, u8): Higher priority streams get to push their data
# onto the network first.
# - live_data_first(optional, bool): The stream will first push its latest packet
# before pushing historical data in FIFO order; defaults to false. This solves the
# problem of bad networks causing data to be pushed so slowly that it is practically
# impossible to track the device.
#
# In the following config for the device_shadow stream we set batch_size to 1 and mark
# it as non-persistent. streams are internally constructed as a map of Name -> Config
# it as non-persistent, also setting live_data_first to enable quick delivery of stats.
# Streams are internally constructed as a map of Name -> Config
[streams.device_shadow]
topic = "/tenants/{tenant_id}/devices/{device_id}/events/device_shadow/jsonarray"
flush_period = 5
priority = 75
live_data_first = true

# Example using compression
[streams.imu]
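As a side note on the ordering that live_data_first buys, here is a minimal Rust sketch (hypothetical types, not uplink's actual stream implementation): the newest packet is published before the historical backlog drains in FIFO order, so a slow link still surfaces fresh data.

use std::collections::VecDeque;

/// Hypothetical buffer illustrating the `live_data_first` ordering.
struct StreamBuffer<T> {
    latest: Option<T>,    // most recent packet, published first
    backlog: VecDeque<T>, // historical packets, drained FIFO
}

impl<T> StreamBuffer<T> {
    fn push(&mut self, packet: T) {
        // Displace the previous "latest" into the historical backlog
        if let Some(old) = self.latest.replace(packet) {
            self.backlog.push_back(old);
        }
    }

    fn next_to_publish(&mut self) -> Option<T> {
        // Latest packet first, then the backlog in FIFO order
        self.latest.take().or_else(|| self.backlog.pop_front())
    }
}

fn main() {
    let mut buf = StreamBuffer { latest: None, backlog: VecDeque::new() };
    for packet in [1, 2, 3] {
        buf.push(packet);
    }
    assert_eq!(buf.next_to_publish(), Some(3)); // latest jumps the queue
    assert_eq!(buf.next_to_publish(), Some(1)); // then historical data, FIFO
    assert_eq!(buf.next_to_publish(), Some(2));
}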
6 changes: 6 additions & 0 deletions runit/simulator/finish
@@ -0,0 +1,6 @@
#!/bin/sh

cd /usr/share/bytebeam/uplink

./simulator.sh kill_devices
echo "Killed all simulated devices"
File renamed without changes.
125 changes: 84 additions & 41 deletions storage/src/lib.rs
@@ -18,6 +18,8 @@ pub enum Error {
CorruptedFile,
#[error("Empty write buffer")]
NoWrites,
#[error("All backups have been consumed")]
Done,
}

pub struct Storage {
@@ -129,42 +131,50 @@ impl Storage {
/// the file after loading. If all the disk data is caught up,
/// swaps the current write buffer into the current read buffer
/// if there is pending data in the in-memory write buffer.
/// Returns true if all the messages are caught up
pub fn reload_on_eof(&mut self) -> Result<bool, Error> {
/// Returns Error::Done if all the messages are caught up
pub fn reload_on_eof(&mut self) -> Result<(), Error> {
// Don't reload if there is data in current read file
if self.current_read_file.has_remaining() {
return Ok(false);
return Ok(());
}

if let Some(persistence) = &mut self.persistence {
// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
let Some(persistence) = &mut self.persistence else {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
}

// Swap read buffer with write buffer to read data in the in-memory write
// buffer when all the backlog disk files are done
if persistence.backlog_files.is_empty() {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
return Ok(self.current_read_file.is_empty());
}
return Ok(());
};

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
self.current_read_file.clear();
persistence.current_read_file_id.take();
return Err(e);
}
// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

Ok(false)
} else {
// Swap read buffer with write buffer to read data in the in-memory write
// buffer when all the backlog disk files are done
if persistence.backlog_files.is_empty() {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
Ok(self.current_read_file.is_empty())
if self.current_read_file.is_empty() {
return Err(Error::Done);
}

return Ok(());
}

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
self.current_read_file.clear();
persistence.current_read_file_id.take();
return Err(e);
}

Ok(())
}
}
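With this change, "all caught up" moves from Ok(true) into the error channel, so call sites match on Error::Done instead of checking a boolean. A sketch of the resulting caller pattern, assuming the Storage and Error types above (read_publish and Publish are stand-ins for whatever deserialization the caller performs):

// Hypothetical caller under the new reload_on_eof signature
fn read_up_to(storage: &mut Storage, n: usize) -> Result<Vec<Publish>, Error> {
    let mut publishes = Vec::with_capacity(n);
    for _ in 0..n {
        match storage.reload_on_eof() {
            // Read buffer holds data: deserialize the next packet from it
            Ok(()) => publishes.push(read_publish(storage)?),
            // All disk backups and the in-memory buffer are consumed
            Err(Error::Done) => break,
            // Genuine I/O or corruption failure: propagate it
            Err(e) => return Err(e),
        }
    }
    Ok(publishes)
}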

@@ -184,8 +194,8 @@ fn id(path: &Path) -> Result<u64, Error> {

/// Gets list of file ids in the disk. Id of file backup@10 is 10.
/// Storing ids instead of full paths enables efficient indexing
fn get_file_ids(path: &Path) -> Result<VecDeque<u64>, Error> {
let mut file_ids = Vec::new();
fn get_file_ids(path: &Path, max_file_count: usize) -> Result<VecDeque<u64>, Error> {
let mut file_ids = Vec::with_capacity(max_file_count);
let files = fs::read_dir(path)?;
for file in files {
let path = file?.path();
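The id() helper above maps a file named backup@10 to the id 10. A self-contained sketch of that parsing step (simplified to Option instead of the crate's Error type):

use std::path::Path;

/// Sketch: extract the numeric id from a "backup@<id>" file name
fn parse_backup_id(path: &Path) -> Option<u64> {
    let name = path.file_name()?.to_str()?;
    let (prefix, id) = name.split_once('@')?;
    if prefix != "backup" {
        return None; // not a backup file, skip it
    }
    id.parse().ok()
}

fn main() {
    assert_eq!(parse_backup_id(Path::new("/tmp/backup@10")), Some(10));
    assert_eq!(parse_backup_id(Path::new("/tmp/notes.txt")), None);
}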
@@ -312,7 +322,7 @@ struct Persistence {
impl Persistence {
fn new<P: Into<PathBuf>>(path: P, max_file_count: usize) -> Result<Self, Error> {
let path = path.into();
let backlog_files = get_file_ids(&path)?;
let backlog_files = get_file_ids(&path, max_file_count)?;
info!("List of file ids loaded from disk: {backlog_files:?}");

let bytes_occupied = backlog_files.iter().fold(0, |acc, id| {
@@ -379,16 +389,18 @@ impl Persistence {
Ok(NextFile { file: PersistenceFile::new(&self.path, file_name)?, deleted })
}

/// Load the next persistence file to be read into memory
/// Load the next persistence file to be read into memory; returns Error::Done if none are left.
fn load_next_read_file(&mut self, current_read_file: &mut BytesMut) -> Result<(), Error> {
// Len always > 0 because of above if. Doesn't panic
let id = self.backlog_files.pop_front().unwrap();
let Some(id) = self.backlog_files.pop_front() else {
self.current_read_file_id.take();
return Err(Error::Done);
};
let file_name = format!("backup@{id}");
let mut file = PersistenceFile::new(&self.path, file_name)?;

// Load file into memory and store its id for deleting in the future
file.read(current_read_file)?;
self.current_read_file_id = Some(id);
self.current_read_file_id.replace(id);

Ok(())
}
@@ -423,7 +435,7 @@ mod test {
let mut publishes = vec![];
for _ in 0..n {
// Done reading all the pending files
if storage.reload_on_eof().unwrap() {
if let Err(super::Error::Done) = storage.reload_on_eof() {
break;
}

@@ -450,7 +462,7 @@
assert_eq!(storage.writer().len(), 1036);

// other messages on disk
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}

@@ -463,14 +475,14 @@
// 11 files created. 10 on disk
write_n_publishes(&mut storage, 110);

let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 11 files created. 10 on disk
write_n_publishes(&mut storage, 10);

assert_eq!(storage.writer().len(), 0);
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}
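The arithmetic behind these assertions, inferred from the test constants rather than documented behavior: with max_file_size = 10 * 1036 and each publish serializing to roughly 1036 bytes, every 10 publishes fill one backup file. 110 publishes thus create files 0 through 10; with max_file_count = 10 the oldest (id 0) is evicted, leaving ids 1..=10. Ten more publishes create file 11 and evict file 1, matching the final assertion.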

@@ -523,7 +535,7 @@ mod test {
assert_eq!(storage.persistence.as_ref().unwrap().current_read_file_id, None);

// Ensure unread files are all present before read
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

// Successfully read 10 files with files still in storage after 10 reads
@@ -532,7 +544,7 @@
let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap();
assert_eq!(file_id, i);
// Ensure partially read file is still present in backup dir
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert!(files.contains(&i));
}

@@ -541,7 +553,7 @@
assert_eq!(storage.persistence.as_ref().unwrap().current_read_file_id, None);

// Ensure read files are all present before read
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![]);
}

@@ -562,14 +574,45 @@
assert_eq!(file_id, 0);

// Ensure all persistence files still exist
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

// Write 10 more files onto disk, 10 publishes per file
write_n_publishes(&mut storage, 100);

// Ensure none of the earlier files exist on disk
let files = get_file_ids(&backup.path()).unwrap();
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
}

#[test]
fn ensure_current_read_file_is_not_lost() {
let backup = init_backup_folders();
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();
// partially fill write buffer
write_n_publishes(&mut storage, 1);

// Nothing written to disk
assert!(storage.persistence.as_ref().unwrap().backlog_files.is_empty());

// Trigger swap of read and write buffers, ensure packets in read buffer
storage.reload_on_eof().unwrap();
assert!(!storage.current_read_file.is_empty());
assert!(storage.persistence.as_ref().unwrap().current_read_file_id.is_none());

// Trigger flush onto disk, and drop storage
storage.flush().unwrap();
drop(storage);

// reload storage
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();

// verify read buffer was persisted by reading a single packet
read_n_publishes(&mut storage, 1);
assert_eq!(storage.file_count(), 1);
let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap();
assert_eq!(file_id, 0);
}
}
4 changes: 3 additions & 1 deletion uplink/src/base/bridge/actions_lane.rs
@@ -153,7 +153,9 @@ impl ActionsBridge {
pub async fn start(&mut self) -> Result<(), Error> {
let mut metrics_timeout = interval(self.config.stream_metrics.timeout);
let mut end: Pin<Box<Sleep>> = Box::pin(time::sleep(Duration::from_secs(u64::MAX)));
self.load_saved_action()?;
if let Err(e) = self.load_saved_action() {
error!("Couldn't load saved action: {e}");
}

loop {
select! {