Skip to content

Commit

Permalink
Refining merge
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
  • Loading branch information
mxgrey committed Dec 9, 2024
1 parent e86707e commit 1f1d826
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 88 deletions.
2 changes: 1 addition & 1 deletion rclrs/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
ffi::CString,
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
sync::{Arc, Mutex, MutexGuard},
};

use rosidl_runtime_rs::Message;
Expand Down
25 changes: 14 additions & 11 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl Executor {
/// [`SpinOptions`] can be used to automatically stop the spinning when
/// certain conditions are met. Use `SpinOptions::default()` to allow the
/// Executor to keep spinning indefinitely.
pub fn spin(&mut self, options: SpinOptions) {
pub fn spin(&mut self, options: SpinOptions) -> Result<(), RclrsError> {
let conditions = self.make_spin_conditions(options);
self.runtime.spin(conditions);
self.runtime.spin(conditions)
}

/// Spin the Executor as an async task. This does not block the current thread.
Expand All @@ -60,20 +60,23 @@ impl Executor {
/// The async task will run until the [`SpinConditions`] stop the Executor
/// from spinning. The output of the async task will be the restored Executor,
/// which you can use to resume spinning after the task is finished.
pub async fn spin_async(self, options: SpinOptions) -> Self {
pub async fn spin_async(self, options: SpinOptions) -> (Self, Result<(), RclrsError>) {
let conditions = self.make_spin_conditions(options);
let Self {
context,
commands,
runtime,
} = self;

let runtime = runtime.spin_async(conditions).await;
Self {
context,
commands,
runtime,
}
let (runtime, result) = runtime.spin_async(conditions).await;
(
Self {
context,
commands,
runtime,
},
result,
)
}

/// Creates a new executor using the provided runtime. Users of rclrs should
Expand Down Expand Up @@ -231,7 +234,7 @@ pub trait ExecutorRuntime: Send {

/// Tell the runtime to spin while blocking any further execution until the
/// spinning is complete.
fn spin(&mut self, conditions: SpinConditions);
fn spin(&mut self, conditions: SpinConditions) -> Result<(), RclrsError>;

/// Tell the runtime to spin asynchronously, not blocking the current
/// thread. The runtime instance will be consumed by this function, but it
Expand All @@ -240,7 +243,7 @@ pub trait ExecutorRuntime: Send {
fn spin_async(
self: Box<Self>,
conditions: SpinConditions,
) -> BoxFuture<'static, Box<dyn ExecutorRuntime>>;
) -> BoxFuture<'static, (Box<dyn ExecutorRuntime>, Result<(), RclrsError>)>;
}

/// A bundle of optional conditions that a user may want to impose on how long
Expand Down
23 changes: 12 additions & 11 deletions rclrs/src/executor/basic_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{

use crate::{
executor::{ExecutorChannel, ExecutorRuntime, SpinConditions},
Context, WaitSetRunner, Waitable,
Context, RclrsError, WaitSetRunner, Waitable,
};

/// The implementation of this runtime is based off of the async Rust reference book:
Expand All @@ -40,7 +40,7 @@ pub struct BasicExecutorRuntime {
}

impl ExecutorRuntime for BasicExecutorRuntime {
fn spin(&mut self, mut conditions: SpinConditions) {
fn spin(&mut self, mut conditions: SpinConditions) -> Result<(), RclrsError> {
self.process_spin_conditions(&mut conditions);

let wait_set_runner = self.wait_set_runner.take().expect(
Expand Down Expand Up @@ -92,19 +92,20 @@ impl ExecutorRuntime for BasicExecutorRuntime {
}
}

self.wait_set_runner = Some(
wait_set_receiver.recv().expect(
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
This is a critical bug in rclrs. \
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
)
let (runner, result) = wait_set_receiver.recv().expect(
"Basic executor failed to receive the WaitSetRunner at the end of its spinning. \
This is a critical bug in rclrs. \
Please report this bug to the maintainers of rclrs by providing a minimum reproduction of the problem."
);

self.wait_set_runner = Some(runner);
result
}

fn spin_async(
mut self: Box<Self>,
conditions: SpinConditions,
) -> BoxFuture<'static, Box<dyn ExecutorRuntime>> {
) -> BoxFuture<'static, (Box<dyn ExecutorRuntime>, Result<(), RclrsError>)> {
let (sender, receiver) = oneshot::channel();
// Create a thread to run the executor. We should not run the executor
// as an async task because it blocks its current thread while running.
Expand All @@ -117,8 +118,8 @@ impl ExecutorRuntime for BasicExecutorRuntime {
// executor. But that would probably require us to introduce a new
// dependency such as tokio.
std::thread::spawn(move || {
self.spin(conditions);
sender.send(self as Box<dyn ExecutorRuntime>).ok();
let result = self.spin(conditions);
sender.send((self as Box<dyn ExecutorRuntime>, result)).ok();
});

Box::pin(async move {
Expand Down
8 changes: 4 additions & 4 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use rosidl_runtime_rs::Message;
use crate::{
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
Publisher, PublisherOptions, PublisherState, QoSProfile, RclrsError, Service,
ServiceAsyncCallback, ServiceCallback, ServiceOptions, ServiceState, Subscription,
SubscriptionAsyncCallback, SubscriptionCallback, SubscriptionOptions, SubscriptionState,
TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceAsyncCallback,
ServiceCallback, ServiceOptions, ServiceState, Subscription, SubscriptionAsyncCallback,
SubscriptionCallback, SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams,
ENTITY_LIFECYCLE_MUTEX,
};

/// A processing unit that can communicate with other nodes.
Expand Down
90 changes: 61 additions & 29 deletions rclrs/src/parameter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Sequence;
use super::ParameterMap;
use crate::{
parameter::{DeclaredValue, ParameterKind, ParameterStorage},
rmw_request_id_t, IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service,
IntoPrimitiveOptions, Node, QoSProfile, RclrsError, Service,
};

// The variables only exist to keep a strong reference to the services and are technically unused.
Expand Down Expand Up @@ -437,11 +437,13 @@ mod tests {
!not_finished
});

executor.spin(
SpinOptions::new()
.until_promise_resolved(promise)
.timeout(Duration::from_secs(1)),
);
executor
.spin(
SpinOptions::new()
.until_promise_resolved(promise)
.timeout(Duration::from_secs(1)),
)
.unwrap();

Ok(())
}
Expand All @@ -453,11 +455,13 @@ mod tests {
client_node.create_client::<ListParameters>("/list/node/list_parameters")?;

// return Ok(());
executor.spin(
SpinOptions::default()
.until_promise_resolved(list_client.notify_on_service_ready())
.timeout(Duration::from_secs(2)),
);
executor
.spin(
SpinOptions::default()
.until_promise_resolved(list_client.notify_on_service_ready())
.timeout(Duration::from_secs(2)),
)
.unwrap();

// List all parameters
let callback_ran = Arc::new(AtomicBool::new(false));
Expand All @@ -484,11 +488,13 @@ mod tests {
})
.unwrap();

executor.spin(
SpinOptions::default()
.until_promise_resolved(promise)
.timeout(Duration::from_secs(5)),
);
executor
.spin(
SpinOptions::default()
.until_promise_resolved(promise)
.timeout(Duration::from_secs(5)),
)
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Limit depth, namespaced parameter is not returned
Expand All @@ -508,7 +514,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Filter by prefix, just return the requested one with the right prefix
Expand All @@ -529,7 +537,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// If prefix is equal to names, parameters should be returned
Expand All @@ -550,7 +560,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

Ok(())
Expand Down Expand Up @@ -578,7 +590,9 @@ mod tests {
let clients_ready = client_node
.notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition);

executor.spin(SpinOptions::default().until_promise_resolved(clients_ready));
executor
.spin(SpinOptions::default().until_promise_resolved(clients_ready))
.unwrap();

// Get an existing parameter
let callback_ran = Arc::new(AtomicBool::new(false));
Expand All @@ -596,7 +610,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Getting both existing and non existing parameters, missing one should return
Expand All @@ -617,7 +633,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Set a mix of existing, non existing, dynamic and out of range parameters
Expand Down Expand Up @@ -717,7 +735,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Set the node to use undeclared parameters and try to set one
Expand Down Expand Up @@ -746,7 +766,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// With set_parameters_atomically, if one fails all should fail
Expand All @@ -765,7 +787,9 @@ mod tests {
)
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

Ok(())
Expand All @@ -789,7 +813,9 @@ mod tests {
let promise = client_node
.notify_on_graph_change_with_period(Duration::from_millis(1), clients_ready_condition);

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();

// Describe all parameters
let request = DescribeParameters_Request {
Expand Down Expand Up @@ -836,7 +862,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// If a describe parameters request is sent with a non existing parameter, an empty
Expand All @@ -860,7 +888,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

// Get all parameter types, including a non existing one that will be NOT_SET
Expand Down Expand Up @@ -888,7 +918,9 @@ mod tests {
})
.unwrap();

executor.spin(SpinOptions::default().until_promise_resolved(promise));
executor
.spin(SpinOptions::default().until_promise_resolved(promise))
.unwrap();
assert!(callback_ran.load(Ordering::Acquire));

Ok(())
Expand Down
4 changes: 1 addition & 3 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{
boxed::Box,
ffi::{CStr, CString},
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
sync::{Arc, Mutex, MutexGuard},
};

use rosidl_runtime_rs::Message;

use crate::{
error::ToResult, rcl_bindings::*, ExecutorCommands, IntoPrimitiveOptions, NodeHandle,
QoSProfile, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable,
Expand Down
1 change: 0 additions & 1 deletion rclrs/src/test_helpers/graph_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{Context, IntoNodeOptions, Node, RclrsError};
use std::sync::Arc;

pub(crate) struct TestGraph {
pub node1: Node,
Expand Down
4 changes: 3 additions & 1 deletion rclrs/src/wait_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ mod tests {
let start = std::time::Instant::now();
// This should stop spinning right away because the guard condition was
// already triggered.
executor.spin(SpinOptions::spin_once().timeout(Duration::from_secs(10)));
executor
.spin(SpinOptions::spin_once().timeout(Duration::from_secs(10)))
.unwrap();

// If it took more than a second to finish spinning then something is
// probably wrong.
Expand Down
Loading

0 comments on commit 1f1d826

Please sign in to comment.