From 1f1d826abe9895400e2a2c85c880c221fe84f0be Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Mon, 9 Dec 2024 17:23:51 +0800 Subject: [PATCH] Refining merge Signed-off-by: Michael X. Grey --- rclrs/src/client.rs | 2 +- rclrs/src/executor.rs | 25 ++++--- rclrs/src/executor/basic_executor.rs | 23 ++++--- rclrs/src/node.rs | 8 +-- rclrs/src/parameter/service.rs | 90 +++++++++++++++++-------- rclrs/src/service.rs | 4 +- rclrs/src/test_helpers/graph_helpers.rs | 1 - rclrs/src/wait_set.rs | 4 +- rclrs/src/wait_set/wait_set_runner.rs | 38 +++-------- 9 files changed, 107 insertions(+), 88 deletions(-) diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index 72de8cfe1..15baf57f6 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, ffi::CString, - sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard}, }; use rosidl_runtime_rs::Message; diff --git a/rclrs/src/executor.rs b/rclrs/src/executor.rs index 07b1c90bb..af2cfe632 100644 --- a/rclrs/src/executor.rs +++ b/rclrs/src/executor.rs @@ -46,9 +46,9 @@ impl Executor { /// [`SpinOptions`] can be used to automatically stop the spinning when /// certain conditions are met. Use `SpinOptions::default()` to allow the /// Executor to keep spinning indefinitely. - pub fn spin(&mut self, options: SpinOptions) { + pub fn spin(&mut self, options: SpinOptions) -> Result<(), RclrsError> { let conditions = self.make_spin_conditions(options); - self.runtime.spin(conditions); + self.runtime.spin(conditions) } /// Spin the Executor as an async task. This does not block the current thread. @@ -60,7 +60,7 @@ impl Executor { /// The async task will run until the [`SpinConditions`] stop the Executor /// from spinning. The output of the async task will be the restored Executor, /// which you can use to resume spinning after the task is finished. - pub async fn spin_async(self, options: SpinOptions) -> Self { + pub async fn spin_async(self, options: SpinOptions) -> (Self, Result<(), RclrsError>) { let conditions = self.make_spin_conditions(options); let Self { context, @@ -68,12 +68,15 @@ impl Executor { runtime, } = self; - let runtime = runtime.spin_async(conditions).await; - Self { - context, - commands, - runtime, - } + let (runtime, result) = runtime.spin_async(conditions).await; + ( + Self { + context, + commands, + runtime, + }, + result, + ) } /// Creates a new executor using the provided runtime. Users of rclrs should @@ -231,7 +234,7 @@ pub trait ExecutorRuntime: Send { /// Tell the runtime to spin while blocking any further execution until the /// spinning is complete. - fn spin(&mut self, conditions: SpinConditions); + fn spin(&mut self, conditions: SpinConditions) -> Result<(), RclrsError>; /// Tell the runtime to spin asynchronously, not blocking the current /// thread. The runtime instance will be consumed by this function, but it @@ -240,7 +243,7 @@ pub trait ExecutorRuntime: Send { fn spin_async( self: Box, conditions: SpinConditions, - ) -> BoxFuture<'static, Box>; + ) -> BoxFuture<'static, (Box, Result<(), RclrsError>)>; } /// A bundle of optional conditions that a user may want to impose on how long diff --git a/rclrs/src/executor/basic_executor.rs b/rclrs/src/executor/basic_executor.rs index a314ff804..1ee2c0ad7 100644 --- a/rclrs/src/executor/basic_executor.rs +++ b/rclrs/src/executor/basic_executor.rs @@ -14,7 +14,7 @@ use std::{ use crate::{ executor::{ExecutorChannel, ExecutorRuntime, SpinConditions}, - Context, WaitSetRunner, Waitable, + Context, RclrsError, WaitSetRunner, Waitable, }; /// The implementation of this runtime is based off of the async Rust reference book: @@ -40,7 +40,7 @@ pub struct BasicExecutorRuntime { } impl ExecutorRuntime for BasicExecutorRuntime { - fn spin(&mut self, mut conditions: SpinConditions) { + fn spin(&mut self, mut conditions: SpinConditions) -> Result<(), RclrsError> { self.process_spin_conditions(&mut conditions); let wait_set_runner = self.wait_set_runner.take().expect( @@ -92,19 +92,20 @@ impl ExecutorRuntime for BasicExecutorRuntime { } } - self.wait_set_runner = Some( - wait_set_receiver.recv().expect( - "Basic executor failed to receive the WaitSetRunner at the end of its spinning. \ - This is a critical bug in rclrs. \ - Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem." - ) + let (runner, result) = wait_set_receiver.recv().expect( + "Basic executor failed to receive the WaitSetRunner at the end of its spinning. \ + This is a critical bug in rclrs. \ + Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem." ); + + self.wait_set_runner = Some(runner); + result } fn spin_async( mut self: Box, conditions: SpinConditions, - ) -> BoxFuture<'static, Box> { + ) -> BoxFuture<'static, (Box, Result<(), RclrsError>)> { let (sender, receiver) = oneshot::channel(); // Create a thread to run the executor. We should not run the executor // as an async task because it blocks its current thread while running. @@ -117,8 +118,8 @@ impl ExecutorRuntime for BasicExecutorRuntime { // executor. But that would probably require us to introduce a new // dependency such as tokio. std::thread::spawn(move || { - self.spin(conditions); - sender.send(self as Box).ok(); + let result = self.spin(conditions); + sender.send((self as Box, result)).ok(); }); Box::pin(async move { diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index f544575d6..8be556458 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -31,10 +31,10 @@ use rosidl_runtime_rs::Message; use crate::{ rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands, LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise, - Publisher, PublisherOptions, PublisherState, QoSProfile, RclrsError, Service, - ServiceAsyncCallback, ServiceCallback, ServiceOptions, ServiceState, Subscription, - SubscriptionAsyncCallback, SubscriptionCallback, SubscriptionOptions, SubscriptionState, - TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX, + Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceAsyncCallback, + ServiceCallback, ServiceOptions, ServiceState, Subscription, SubscriptionAsyncCallback, + SubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams, + ENTITY_LIFECYCLE_MUTEX, }; /// A processing unit that can communicate with other nodes. diff --git a/rclrs/src/parameter/service.rs b/rclrs/src/parameter/service.rs index 3bffa5cf7..304f78f98 100644 --- a/rclrs/src/parameter/service.rs +++ b/rclrs/src/parameter/service.rs @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Sequence; use super::ParameterMap; use crate::{ parameter::{DeclaredValue, ParameterKind, ParameterStorage}, - rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service, + IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service, }; // The variables only exist to keep a strong reference to the services and are technically unused. @@ -437,11 +437,13 @@ mod tests { !not_finished }); - executor.spin( - SpinOptions::new() - .until_promise_resolved(promise) - .timeout(Duration::from_secs(1)), - ); + executor + .spin( + SpinOptions::new() + .until_promise_resolved(promise) + .timeout(Duration::from_secs(1)), + ) + .unwrap(); Ok(()) } @@ -453,11 +455,13 @@ mod tests { client_node.create_client::("/list/node/list_parameters")?; // return Ok(()); - executor.spin( - SpinOptions::default() - .until_promise_resolved(list_client.notify_on_service_ready()) - .timeout(Duration::from_secs(2)), - ); + executor + .spin( + SpinOptions::default() + .until_promise_resolved(list_client.notify_on_service_ready()) + .timeout(Duration::from_secs(2)), + ) + .unwrap(); // List all parameters let callback_ran = Arc::new(AtomicBool::new(false)); @@ -484,11 +488,13 @@ mod tests { }) .unwrap(); - executor.spin( - SpinOptions::default() - .until_promise_resolved(promise) - .timeout(Duration::from_secs(5)), - ); + executor + .spin( + SpinOptions::default() + .until_promise_resolved(promise) + .timeout(Duration::from_secs(5)), + ) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Limit depth, namespaced parameter is not returned @@ -508,7 +514,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Filter by prefix, just return the requested one with the right prefix @@ -529,7 +537,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // If prefix is equal to names, parameters should be returned @@ -550,7 +560,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); Ok(()) @@ -578,7 +590,9 @@ mod tests { let clients_ready = client_node .notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition); - executor.spin(SpinOptions::default().until_promise_resolved(clients_ready)); + executor + .spin(SpinOptions::default().until_promise_resolved(clients_ready)) + .unwrap(); // Get an existing parameter let callback_ran = Arc::new(AtomicBool::new(false)); @@ -596,7 +610,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Getting both existing and non existing parameters, missing one should return @@ -617,7 +633,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Set a mix of existing, non existing, dynamic and out of range parameters @@ -717,7 +735,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Set the node to use undeclared parameters and try to set one @@ -746,7 +766,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // With set_parameters_atomically, if one fails all should fail @@ -765,7 +787,9 @@ mod tests { ) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); Ok(()) @@ -789,7 +813,9 @@ mod tests { let promise = client_node .notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); // Describe all parameters let request = DescribeParameters_Request { @@ -836,7 +862,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // If a describe parameters request is sent with a non existing parameter, an empty @@ -860,7 +888,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); // Get all parameter types, including a non existing one that will be NOT_SET @@ -888,7 +918,9 @@ mod tests { }) .unwrap(); - executor.spin(SpinOptions::default().until_promise_resolved(promise)); + executor + .spin(SpinOptions::default().until_promise_resolved(promise)) + .unwrap(); assert!(callback_ran.load(Ordering::Acquire)); Ok(()) diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index 87d5a0fcd..9430c0383 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -1,11 +1,9 @@ use std::{ boxed::Box, ffi::{CStr, CString}, - sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard}, + sync::{Arc, Mutex, MutexGuard}, }; -use rosidl_runtime_rs::Message; - use crate::{ error::ToResult, rcl_bindings::*, ExecutorCommands, IntoPrimitiveOptions, NodeHandle, QoSProfile, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable, diff --git a/rclrs/src/test_helpers/graph_helpers.rs b/rclrs/src/test_helpers/graph_helpers.rs index 9aa93bf0c..f61f5db96 100644 --- a/rclrs/src/test_helpers/graph_helpers.rs +++ b/rclrs/src/test_helpers/graph_helpers.rs @@ -1,5 +1,4 @@ use crate::{Context, IntoNodeOptions, Node, RclrsError}; -use std::sync::Arc; pub(crate) struct TestGraph { pub node1: Node, diff --git a/rclrs/src/wait_set.rs b/rclrs/src/wait_set.rs index 54a335b81..5729aa98c 100644 --- a/rclrs/src/wait_set.rs +++ b/rclrs/src/wait_set.rs @@ -270,7 +270,9 @@ mod tests { let start = std::time::Instant::now(); // This should stop spinning right away because the guard condition was // already triggered. - executor.spin(SpinOptions::spin_once().timeout(Duration::from_secs(10))); + executor + .spin(SpinOptions::spin_once().timeout(Duration::from_secs(10))) + .unwrap(); // If it took more than a second to finish spinning then something is // probably wrong. diff --git a/rclrs/src/wait_set/wait_set_runner.rs b/rclrs/src/wait_set/wait_set_runner.rs index 1c7d633c9..f7ce8c71a 100644 --- a/rclrs/src/wait_set/wait_set_runner.rs +++ b/rclrs/src/wait_set/wait_set_runner.rs @@ -8,7 +8,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::{Context, Promise, RclReturnCode, RclrsError, SpinConditions, WaitSet, Waitable}; +use crate::{Context, Promise, RclrsError, SpinConditions, WaitSet, Waitable}; /// This is a utility class that executors can use to easily run and manage /// their wait set. @@ -45,12 +45,12 @@ impl WaitSetRunner { /// the best practice is for your executor runtime to swap that out with a /// new promise which ensures that the [`SpinConditions::guard_condition`] /// will be triggered after the user-provided promise is resolved. - pub fn run(mut self, conditions: SpinConditions) -> Promise { + pub fn run(mut self, conditions: SpinConditions) -> Promise<(Self, Result<(), RclrsError>)> { let (sender, promise) = channel(); std::thread::spawn(move || { - self.run_blocking(conditions); + let result = self.run_blocking(conditions); // TODO(@mxgrey): Log any error here when logging becomes available - sender.send(self).ok(); + sender.send((self, result)).ok(); }); promise @@ -63,7 +63,7 @@ impl WaitSetRunner { /// the best practice is for your executor runtime to swap that out with a /// new promise which ensures that the [`SpinConditions::guard_condition`] /// will be triggered after the user-provided promise is resolved. - pub fn run_blocking(&mut self, mut conditions: SpinConditions) { + pub fn run_blocking(&mut self, mut conditions: SpinConditions) -> Result<(), RclrsError> { let mut first_spin = true; let t_stop_spinning = conditions.options.timeout.map(|dt| Instant::now() + dt); loop { @@ -81,7 +81,7 @@ impl WaitSetRunner { if conditions.options.only_next_available_work && !first_spin { // We've already completed a spin and were asked to only do one, // so break here - break; + return Ok(()); } first_spin = false; @@ -89,19 +89,19 @@ impl WaitSetRunner { let r = promise.try_recv(); if r.is_ok_and(|r| r.is_some()) || r.is_err() { // The promise has been resolved, so we should stop spinning. - break; + return Ok(()); } } if conditions.halt_spinning.load(Ordering::Acquire) { // The user has manually asked for the spinning to stop - break; + return Ok(()); } if !conditions.context.ok() { // The ROS context has switched to being invalid, so we should // stop spinning. - break; + return Ok(()); } let timeout = t_stop_spinning.map(|t| { @@ -113,24 +113,8 @@ impl WaitSetRunner { } }); - if let Err(err) = self - .wait_set - .wait(timeout, |executable| executable.execute()) - { - match err { - RclrsError::RclError { - code: RclReturnCode::Timeout, - .. - } => { - // We have timed out, so we should stop waiting. - break; - } - err => { - // TODO(@mxgrey): Change this to a log when logging becomes available - eprintln!("Error while processing wait set: {err}"); - } - } - } + self.wait_set + .wait(timeout, |executable| executable.execute())?; } } }