diff --git a/src/lib/drivers/tlog/reader.rs b/src/lib/drivers/tlog/reader.rs index 8e91475..d82e5d0 100644 --- a/src/lib/drivers/tlog/reader.rs +++ b/src/lib/drivers/tlog/reader.rs @@ -45,6 +45,12 @@ impl TlogReaderBuilder { impl TlogReader { #[instrument(level = "debug")] pub fn builder(name: &str, path: PathBuf) -> TlogReaderBuilder { + let path = std::fs::canonicalize(&path) + .inspect_err(|_| { + warn!("Failed canonicalizing path: {path:?}, using the non-canonized instead.") + }) + .unwrap_or(path); + let name = Arc::new(name.to_string()); let path_str = path.clone().display().to_string(); @@ -207,12 +213,8 @@ impl DriverInfo for TlogReaderInfo { legacy_entry: crate::drivers::DriverDescriptionLegacy, ) -> Result { let scheme = self.default_scheme().context("No default scheme")?; - let path = legacy_entry.arg1; - std::fs::metadata(&path).context("Failed to get metadata for file")?; - // Get absolute path of file - let path = std::fs::canonicalize(&path).context("Failed to get absolute path for file")?; - - let path_string = path.to_str().context("Failed to convert path to string")?; + let path_string = legacy_entry.arg1; + std::fs::metadata(&path_string).context("Failed to get metadata for file")?; if let Some(arg2) = legacy_entry.arg2 { warn!("Ignoring extra argument: {arg2:?}"); diff --git a/src/lib/drivers/tlog/writer.rs b/src/lib/drivers/tlog/writer.rs index 835a85b..1f62a49 100644 --- a/src/lib/drivers/tlog/writer.rs +++ b/src/lib/drivers/tlog/writer.rs @@ -1,6 +1,6 @@ use std::{path::PathBuf, sync::Arc}; -use anyhow::Result; +use anyhow::{Context, Result}; use tokio::{ io::{AsyncWriteExt, BufWriter}, sync::{broadcast, RwLock}, @@ -20,12 +20,26 @@ use crate::{ #[derive(Debug)] pub struct TlogWriter { pub path: PathBuf, + file_creation_condition: FileCreationCondition, name: arc_swap::ArcSwap, uuid: DriverUuid, on_message_output: Callbacks>, stats: Arc>, } +#[derive(Debug, strum_macros::EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum FileCreationCondition { + OnArm(ExpectedOrigin), + Always, +} + +#[derive(Debug, Default)] +pub struct ExpectedOrigin { + pub system_id: RwLock>, + pub component_id: u8, +} + pub struct TlogWriterBuilder(TlogWriter); impl TlogWriterBuilder { @@ -44,13 +58,24 @@ impl TlogWriterBuilder { impl TlogWriter { #[instrument(level = "debug")] - pub fn builder(name: &str, path: PathBuf) -> TlogWriterBuilder { + pub fn builder( + name: &str, + path: PathBuf, + file_creation_condition: FileCreationCondition, + ) -> TlogWriterBuilder { + let path = std::fs::canonicalize(&path) + .inspect_err(|_| { + warn!("Failed canonicalizing path: {path:?}, using the non-canonized instead.") + }) + .unwrap_or(path); + let name = Arc::new(name.to_string()); let path_str = path.clone().display().to_string(); TlogWriterBuilder(Self { path, name: arc_swap::ArcSwap::new(name.clone()), + file_creation_condition, uuid: Self::generate_uuid(&path_str), on_message_output: Callbacks::default(), stats: Arc::new(RwLock::new(AccumulatedDriverStats::new( @@ -88,6 +113,29 @@ impl TlogWriter { writer.write_all(×tamp.to_be_bytes()).await?; writer.write_all(raw_bytes).await?; writer.flush().await?; + + if let FileCreationCondition::OnArm(ExpectedOrigin { + system_id, + component_id, + }) = &self.file_creation_condition + { + let system_id = system_id.read().await.expect( + "System ID should always be Some at this point because it was replaced when armed, which is the condition to reach this part", + ); + + if *message.system_id() != system_id + || message.component_id() != component_id + { + continue; + } + + if let Some(ArmState::Disarmed) = check_arm_state(&message) { + debug!( + "Vehicle disarmed, finishing tlog file writer until next arm..." + ); + break; + } + } } Err(error) => { error!("Failed to receive message from hub: {error:?}"); @@ -105,11 +153,56 @@ impl TlogWriter { impl Driver for TlogWriter { #[instrument(level = "debug", skip(self, hub_sender))] async fn run(&self, hub_sender: broadcast::Sender>) -> Result<()> { - let file = tokio::fs::File::create(self.path.clone()).await?; - let writer = tokio::io::BufWriter::with_capacity(1024, file); - let hub_receiver = hub_sender.subscribe(); + let mut armed = false; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let mut first = true; + loop { + if first { + first = false; + } else { + interval.tick().await; + } - TlogWriter::handle_client(self, writer, hub_receiver).await + if !armed { + if let FileCreationCondition::OnArm(expected_origin) = &self.file_creation_condition + { + debug!( + "TlogWriter waiting for its arm condition as {:?}", + self.file_creation_condition + ); + let hub_receiver = hub_sender.subscribe(); + if let Err(error) = wait_for_arm(hub_receiver, expected_origin).await { + warn!("Failed waiting for arm: {error:?}"); + continue; + } + debug!( + "TlogWriter has reached its arm condition as {:?}", + self.file_creation_condition + ); + armed = true; + } + } + + let file = match create_tlog_file(self.path.clone()).await { + Ok(file) => file, + Err(error) => { + warn!("Failed creating tlog file: {error:?}"); + continue; + } + }; + + debug!("Writing to tlog file: {file:?}"); + + let writer = tokio::io::BufWriter::with_capacity(1024, file); + let hub_receiver = hub_sender.subscribe(); + + if let Err(error) = TlogWriter::handle_client(self, writer, hub_receiver).await { + debug!("TlogWriter client ended with an error: {error:?}"); + } + + armed = false; + } } #[instrument(level = "debug", skip(self))] @@ -151,7 +244,11 @@ impl DriverInfo for TlogWriterInfo { let first_schema = &self.valid_schemes()[0]; vec![ format!("{first_schema}:"), + format!("{first_schema}:"), format!("{first_schema}:/tmp/potato.tlog"), + format!("{first_schema}:/tmp/tlogs_output_dir/"), + format!("{first_schema}:/?when=always"), + format!("{first_schema}:/?when=while_armed"), ] } @@ -159,15 +256,180 @@ impl DriverInfo for TlogWriterInfo { let first_schema = &self.valid_schemes()[0]; vec![ format!("{first_schema}://").to_string(), + format!("{first_schema}://").to_string(), url::Url::parse(&format!("{first_schema}:///tmp/potato.tlog")) .unwrap() .to_string(), + url::Url::parse(&format!("{first_schema}:///tmp/tlogs_output_dir/")) + .unwrap() + .to_string(), + url::Url::parse(&format!( + "{first_schema}:///tmp/tlogs_output_dir/?when=while_armed" + )) + .unwrap() + .to_string(), ] } fn create_endpoint_from_url(&self, url: &url::Url) -> Option> { + let expected_system_id = url.query_pairs().find_map(|(key, value)| { + if key != "system_id" { + return None; + } + value.parse::().ok() + }); + + const MAV_COMP_ID_AUTOPILOT1: u8 = 1; // From: https://mavlink.io/en/messages/minimal.html#MAV_COMPONENT + let expected_component_id = url + .query_pairs() + .find_map(|(key, value)| { + if key != "component_id" { + return None; + } + value.parse::().ok() + }) + .unwrap_or(MAV_COMP_ID_AUTOPILOT1); + + let file_creation_condition = url + .query_pairs() + .find_map(|(key, value)| { + if key != "when" { + return None; + } + let mut file_creation_condition = + value.parse().map(FileCreationCondition::from).ok(); + + if let Some(FileCreationCondition::OnArm(_)) = file_creation_condition { + file_creation_condition.replace(FileCreationCondition::OnArm(ExpectedOrigin { + system_id: RwLock::new(expected_system_id), + component_id: expected_component_id, + })); + } + + file_creation_condition + }) + .unwrap_or_else(|| FileCreationCondition::Always); + Some(Arc::new( - TlogWriter::builder("TlogWriter", url.path().into()).build(), + TlogWriter::builder("TlogWriter", url.path().into(), file_creation_condition).build(), )) } } + +async fn create_tlog_file(path: PathBuf) -> Result { + let file_path: PathBuf = if path.extension().and_then(|ext| ext.to_str()) == Some("tlog") { + path.clone() + } else { + if !std::path::Path::new(&path).exists() { + tokio::fs::create_dir_all(&path).await?; + } + + let sequence = get_sequence(&path).await.unwrap_or_default(); + let timestamp = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S"); + let file_name = format!("{sequence:05}-{timestamp}.tlog"); + + let mut file_path = path.clone(); + file_path.push(file_name); + file_path + }; + + tokio::fs::File::create(file_path) + .await + .map_err(anyhow::Error::msg) +} + +async fn get_sequence(path: &PathBuf) -> Result { + let re = regex::Regex::new(r"^(\d{5})-\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.tlog$") + .expect("Failed to compile regex"); + + let mut max_seq: u32 = 0; + let mut files_in_dir = tokio::fs::read_dir(&path).await?; + + while let Some(entry) = files_in_dir.next_entry().await? { + let entry_path = entry.path(); + + if let Some(sequence) = entry_path + .file_name() + .and_then(|name| name.to_str()) + .and_then(|file_name| re.captures(file_name)) + .and_then(|captures| captures.get(1)) + .and_then(|seq_match| seq_match.as_str().parse::().ok()) + { + max_seq = max_seq.max(sequence); + } + } + + Ok(max_seq + 1) +} + +async fn wait_for_arm( + mut hub_receiver: broadcast::Receiver>, + ExpectedOrigin { + system_id, + component_id, + }: &ExpectedOrigin, +) -> Result<()> { + loop { + let message = hub_receiver.recv().await?; + + if message.component_id() == component_id + && matches!(check_arm_state(&message), Some(ArmState::Armed)) + { + debug!( + "Received arm from system {:?}. Current: {system_id:?}", + message.system_id() + ); + + let current_system_id = system_id.read().await.clone(); + + let system_id = match current_system_id { + Some(system_id) => system_id, + None => { + system_id + .write() + .await + .replace(*message.system_id()) + .context("This should always be None") + .expect_err("This should never be Ok"); + + debug!("Expected System ID updated to {system_id:?}"); + + *message.system_id() + } + }; + + if *message.system_id() == system_id { + break; + } + } + } + + Ok(()) +} + +enum ArmState { + Disarmed, + Armed, +} + +/// A performant way of checking if the vehicle is armed without parsing the message into a heartbeat message type (from Mavlink crate) +fn check_arm_state(message: &Arc) -> Option { + use mavlink::MessageData; + + if message.message_id() != mavlink::common::HEARTBEAT_DATA::ID { + return None; + } + + const BASE_MODE_BYTE: usize = 6; // From: https://mavlink.io/en/messages/common.html#HEARTBEAT + + let base_mode = message + .payload() + .get(BASE_MODE_BYTE) + .cloned() + .unwrap_or_else(|| mavlink::common::MavModeFlag::empty().bits()); + + match base_mode & mavlink::common::MavModeFlag::MAV_MODE_FLAG_SAFETY_ARMED.bits() { + 0 => Some(ArmState::Disarmed), + _ => Some(ArmState::Armed), + } +}