Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Execution #421

Closed
wants to merge 52 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
9b9de10
Drafting traits for generic executors
mxgrey Oct 7, 2024
2dba330
Fleshing out interfaces for async execution
mxgrey Oct 7, 2024
f10036d
Beginning to migrate subscriptions to async
mxgrey Oct 8, 2024
b1dda94
Implementing async execution for subscriptions
mxgrey Oct 8, 2024
c31ad7d
Update wait set behavior
mxgrey Oct 9, 2024
cda131d
Reimagining the way wait sets are handled
mxgrey Oct 10, 2024
d28ed70
Finished reworking wait sets -- need to migrate all waitables
mxgrey Oct 10, 2024
abb8367
Finished migrating guard conditions and services
mxgrey Oct 11, 2024
3a851be
Change Waitable from trait to struct
mxgrey Oct 12, 2024
2f91083
Migrate clients to new async implementation
mxgrey Oct 12, 2024
cc5b264
Fleshing out basic executor
mxgrey Oct 13, 2024
06cb0ab
Finished implementing basic executor -- needs testing
mxgrey Oct 13, 2024
40d8746
Add support for waiting on graph events
mxgrey Oct 13, 2024
329cafb
Migrate parameter service tests to new async executor
mxgrey Oct 13, 2024
d15f762
Try async-std timeouts instead of tokio
mxgrey Oct 13, 2024
fe56cc9
Debugging strange client failure to take
mxgrey Oct 13, 2024
8f0f192
Experimenting with only taking from services and clients in the same …
mxgrey Oct 14, 2024
c99f410
Migrate subscription to use shared callback instead of an async task
mxgrey Oct 14, 2024
26b41f8
Remove task modules that are no longer used
mxgrey Oct 14, 2024
500b083
Rename wait module to wait_set
mxgrey Oct 14, 2024
ee15de5
Move the wait_set_runner to the wait_set module
mxgrey Oct 14, 2024
d395f5f
Remove unnecessary debug outputs
mxgrey Oct 14, 2024
5c59592
Big cleanup
mxgrey Oct 14, 2024
0138e33
Update doctests
mxgrey Oct 14, 2024
22d2c84
Remove old comments
mxgrey Oct 14, 2024
29fe10e
Migrate Context, Executor, and Node creation to new API
mxgrey Nov 16, 2024
1b5c187
Update examples
mxgrey Nov 16, 2024
1ec9f10
Fix documentation
mxgrey Nov 19, 2024
0874d8d
Fix formatting
mxgrey Nov 20, 2024
433a348
Migrate to SubscriptionOptions
mxgrey Nov 20, 2024
f12f874
Migrate to PublisherOptions
mxgrey Nov 20, 2024
2c32e20
Migrate to ServiceOptions
mxgrey Nov 20, 2024
6c61c9c
Migrate to ClientOptions
mxgrey Nov 20, 2024
da05361
Enable direct creation of the _Options for all primitive types
mxgrey Nov 20, 2024
bf3d01a
Migrate Node to shared state pattern
mxgrey Nov 20, 2024
bbb5333
Migrate primitives to state pattern
mxgrey Nov 20, 2024
4c2a67b
Fix example formatting
mxgrey Nov 20, 2024
daebaa8
Fix example formatting
mxgrey Nov 20, 2024
2cff0b7
Fix examples
mxgrey Nov 20, 2024
126aaca
Fix docs
mxgrey Nov 20, 2024
012ff2e
Make deadline, liveliness_lease, and lifespan all symmetric
mxgrey Nov 21, 2024
f33e8d5
Add an API to the primitive options builder for avoiding ROS namespac…
mxgrey Nov 21, 2024
98b8ea2
Retrigger CI
mxgrey Nov 21, 2024
6059506
Implicitly cast &str to NodeOptions
mxgrey Nov 23, 2024
259fcb3
Remove debug outputs
mxgrey Nov 23, 2024
e1ceb70
Fix formatting
mxgrey Nov 23, 2024
0918476
Merge with latest main
mxgrey Dec 9, 2024
f1c4716
Merge execution structure PR
mxgrey Dec 9, 2024
2a7ff60
Merge options pattern PR
mxgrey Dec 9, 2024
e86707e
Merge in shared state pattern
mxgrey Dec 9, 2024
1f1d826
Refining merge
mxgrey Dec 9, 2024
6d3f7e4
Update examples
mxgrey Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Migrate Node to shared state pattern
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
  • Loading branch information
mxgrey committed Nov 20, 2024

Verified

This commit was signed with the committer’s verified signature.
adamlui Adam Lui
commit bf3d01acec2a936bdb4168e66b04de6a93d7224f
2 changes: 1 addition & 1 deletion examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
node: rclrs::Node,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
}

2 changes: 1 addition & 1 deletion examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ use rclrs::{create_node, Context, Node, Publisher, RclrsError, QOS_PROFILE_DEFAU
use std::{sync::Arc, thread, time::Duration};
use std_msgs::msg::String as StringMsg;
struct SimplePublisherNode {
node: Arc<Node>,
node: Node,
_publisher: Arc<Publisher<StringMsg>>,
}
impl SimplePublisherNode {
2 changes: 1 addition & 1 deletion examples/rust_pubsub/src/simple_subscriber.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use std::{
};
use std_msgs::msg::String as StringMsg;
pub struct SimpleSubscriptionNode {
node: Arc<Node>,
node: Node,
_subscriber: Arc<Subscription<StringMsg>>,
data: Arc<Mutex<Option<StringMsg>>>,
}
2 changes: 1 addition & 1 deletion rclrs/src/client.rs
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ type RequestId = i64;
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
/// ensure that [`Node`][2]s can track all the clients that have been created.
///
/// [1]: crate::Node::create_client
/// [1]: crate::NodeState::create_client
/// [2]: crate::Node
pub struct Client<T>
where
10 changes: 6 additions & 4 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::{rcl_bindings::rcl_context_is_valid, Node, RclReturnCode, RclrsError, WaitSet};
use crate::{
rcl_bindings::rcl_context_is_valid, Node, NodeState, RclReturnCode, RclrsError, WaitSet,
};
use std::{
sync::{Arc, Mutex, Weak},
time::Duration,
};

/// Single-threaded executor implementation.
pub struct SingleThreadedExecutor {
nodes_mtx: Mutex<Vec<Weak<Node>>>,
nodes_mtx: Mutex<Vec<Weak<NodeState>>>,
}

impl Default for SingleThreadedExecutor {
@@ -24,13 +26,13 @@ impl SingleThreadedExecutor {
}

/// Add a node to the executor.
pub fn add_node(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
pub fn add_node(&self, node: &Node) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }.push(Arc::downgrade(node));
Ok(())
}

/// Remove a node from the executor.
pub fn remove_node(&self, node: Arc<Node>) -> Result<(), RclrsError> {
pub fn remove_node(&self, node: Node) -> Result<(), RclrsError> {
{ self.nodes_mtx.lock().unwrap() }
.retain(|n| !n.upgrade().map(|n| Arc::ptr_eq(&n, &node)).unwrap_or(false));
Ok(())
18 changes: 9 additions & 9 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ mod rcl_bindings;
#[cfg(feature = "dyn_msg")]
pub mod dynamic_message;

use std::{sync::Arc, time::Duration};
use std::time::Duration;

pub use arguments::*;
pub use client::*;
@@ -59,14 +59,14 @@ pub use wait::*;
/// This can usually be ignored.
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
pub fn spin_once(node: Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin_once(timeout)
}

/// Convenience function for calling [`spin_once`] in a loop.
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
pub fn spin(node: Node) -> Result<(), RclrsError> {
let executor = SingleThreadedExecutor::new();
executor.add_node(&node)?;
executor.spin()
@@ -77,7 +77,7 @@ pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
/// Convenience function equivalent to [`Node::new`][1].
/// Please see that function's documentation.
///
/// [1]: crate::Node::new
/// [1]: crate::NodeState::new
///
/// # Example
/// ```
@@ -87,17 +87,17 @@ pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
/// assert!(node.is_ok());
/// # Ok::<(), RclrsError>(())
/// ```
pub fn create_node(context: &Context, node_name: &str) -> Result<Arc<Node>, RclrsError> {
Node::new(context, node_name)
pub fn create_node(context: &Context, node_name: &str) -> Result<Node, RclrsError> {
NodeState::new(context, node_name)
}

/// Creates a [`NodeBuilder`].
///
/// Convenience function equivalent to [`NodeBuilder::new()`][1] and [`Node::builder()`][2].
/// Convenience function equivalent to [`NodeBuilder::new()`][1] and [`NodeState::builder()`][2].
/// Please see that function's documentation.
///
/// [1]: crate::NodeBuilder::new
/// [2]: crate::Node::builder
/// [2]: crate::NodeState::builder
///
/// # Example
/// ```
@@ -109,5 +109,5 @@ pub fn create_node(context: &Context, node_name: &str) -> Result<Arc<Node>, Rclr
/// # Ok::<(), RclrsError>(())
/// ```
pub fn create_node_builder(context: &Context, node_name: &str) -> NodeBuilder {
Node::builder(context, node_name)
NodeState::builder(context, node_name)
}
31 changes: 21 additions & 10 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ unsafe impl Send for rcl_node_t {}
/// In that sense, the parameters to the node creation functions are only the _default_ namespace and
/// name.
/// See also the [official tutorial][1] on the command line arguments for ROS nodes, and the
/// [`Node::namespace()`] and [`Node::name()`] functions for examples.
/// [`NodeState::namespace()`] and [`NodeState::name()`] functions for examples.
///
/// ## Rules for valid names
/// The rules for valid node names and node namespaces are explained in
@@ -58,7 +58,18 @@ unsafe impl Send for rcl_node_t {}
/// [2]: https://docs.ros.org/en/rolling/How-To-Guides/Node-arguments.html
/// [3]: crate::NodeBuilder::new
/// [4]: crate::NodeBuilder::namespace
pub struct Node {
pub type Node = Arc<NodeState>;

/// The inner state of a [`Node`].
///
/// This is public so that you can choose to put it inside a [`Weak`] if you
/// want to be able to refer to a [`Node`] in a non-owning way. It is generally
/// recommended to manage the [`NodeState`] inside of an [`Arc`], and [`Node`]
/// recommended to manage the `NodeState` inside of an [`Arc`], and [`Node`]
/// is provided as convenience alias for that.
///
/// The public API of the [`Node`] type is implemented via `NodeState`.
pub struct NodeState {
pub(crate) clients_mtx: Mutex<Vec<Weak<dyn ClientBase>>>,
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
@@ -89,28 +100,28 @@ impl Drop for NodeHandle {
}
}

impl Eq for Node {}
impl Eq for NodeState {}

impl PartialEq for Node {
impl PartialEq for NodeState {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.handle, &other.handle)
}
}

impl fmt::Debug for Node {
impl fmt::Debug for NodeState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Node")
.field("fully_qualified_name", &self.fully_qualified_name())
.finish()
}
}

impl Node {
impl NodeState {
/// Creates a new node in the empty namespace.
///
/// See [`NodeBuilder::new()`] for documentation.
#[allow(clippy::new_ret_no_self)]
pub fn new(context: &Context, node_name: &str) -> Result<Arc<Node>, RclrsError> {
pub fn new(context: &Context, node_name: &str) -> Result<Node, RclrsError> {
Self::builder(context, node_name).build()
}

@@ -171,7 +182,7 @@ impl Node {
/// Returns the fully qualified name of the node.
///
/// The fully qualified name of the node is the node namespace combined with the node name.
/// It is subject to the remappings shown in [`Node::name()`] and [`Node::namespace()`].
/// It is subject to the remappings shown in [`NodeState::name()`] and [`NodeState::namespace()`].
///
/// # Example
/// ```
@@ -431,9 +442,9 @@ impl Node {
///
/// # Example
/// ```
/// # use rclrs::{Context, Node, RclrsError};
/// # use rclrs::{Context, NodeState, RclrsError};
/// let context = Context::new([])?;
/// let node = Node::builder(&context, "my_node").build()?;
/// let node = NodeState::builder(&context, "my_node").build()?;
/// assert_eq!(node.name(), "my_node");
/// # Ok::<(), RclrsError>(())
/// ```
29 changes: 15 additions & 14 deletions rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
@@ -4,14 +4,15 @@ use std::{
};

use crate::{
rcl_bindings::*, ClockType, Context, ContextHandle, Node, NodeHandle, ParameterInterface,
QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK,
rcl_bindings::*, ClockType, Context, ContextHandle, Node, NodeHandle, NodeState,
ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX,
QOS_PROFILE_CLOCK,
};

/// A builder for creating a [`Node`][1].
///
/// The builder pattern allows selectively setting some fields, and leaving all others at their default values.
/// This struct instance can be created via [`Node::builder()`][2].
/// This struct instance can be created via [`NodeState::builder()`][2].
///
/// The default values for optional fields are:
/// - `namespace: "/"`
@@ -24,25 +25,25 @@ use crate::{
///
/// # Example
/// ```
/// # use rclrs::{Context, NodeBuilder, Node, RclrsError};
/// # use rclrs::{Context, NodeBuilder, NodeState, RclrsError};
/// let context = Context::new([])?;
/// // Building a node in a single expression
/// let node = NodeBuilder::new(&context, "foo_node").namespace("/bar").build()?;
/// assert_eq!(node.name(), "foo_node");
/// assert_eq!(node.namespace(), "/bar");
/// // Building a node via Node::builder()
/// let node = Node::builder(&context, "bar_node").build()?;
/// // Building a node via NodeState::builder()
/// let node = NodeState::builder(&context, "bar_node").build()?;
/// assert_eq!(node.name(), "bar_node");
/// // Building a node step-by-step
/// let mut builder = Node::builder(&context, "goose");
/// let mut builder = NodeState::builder(&context, "goose");
/// builder = builder.namespace("/duck/duck");
/// let node = builder.build()?;
/// assert_eq!(node.fully_qualified_name(), "/duck/duck/goose");
/// # Ok::<(), RclrsError>(())
/// ```
///
/// [1]: crate::Node
/// [2]: crate::Node::builder
/// [2]: crate::NodeState::builder
pub struct NodeBuilder {
context: Arc<ContextHandle>,
name: String,
@@ -126,22 +127,22 @@ impl NodeBuilder {
///
/// # Example
/// ```
/// # use rclrs::{Context, Node, RclrsError, RclReturnCode};
/// # use rclrs::{Context, NodeState, RclrsError, RclReturnCode};
/// let context = Context::new([])?;
/// // This is a valid namespace
/// let builder_ok_ns = Node::builder(&context, "my_node").namespace("/some/nested/namespace");
/// let builder_ok_ns = NodeState::builder(&context, "my_node").namespace("/some/nested/namespace");
/// assert!(builder_ok_ns.build().is_ok());
/// // This is an invalid namespace
/// assert!(matches!(
/// Node::builder(&context, "my_node")
/// NodeState::builder(&context, "my_node")
/// .namespace("/10_percent_luck/20_percent_skill")
/// .build()
/// .unwrap_err(),
/// RclrsError::RclError { code: RclReturnCode::NodeInvalidNamespace, .. }
/// ));
/// // A missing forward slash at the beginning is automatically added
/// assert_eq!(
/// Node::builder(&context, "my_node")
/// NodeState::builder(&context, "my_node")
/// .namespace("foo")
/// .build()?
/// .namespace(),
@@ -262,7 +263,7 @@ impl NodeBuilder {
/// For example usage, see the [`NodeBuilder`][1] docs.
///
/// [1]: crate::NodeBuilder
pub fn build(&self) -> Result<Arc<Node>, RclrsError> {
pub fn build(&self) -> Result<Node, RclrsError> {
let node_name =
CString::new(self.name.as_str()).map_err(|err| RclrsError::StringContainsNul {
err,
@@ -308,7 +309,7 @@ impl NodeBuilder {
&rcl_context.global_arguments,
)?
};
let node = Arc::new(Node {
let node = Arc::new(NodeState {
handle,
clients_mtx: Mutex::new(vec![]),
guard_conditions_mtx: Mutex::new(vec![]),
10 changes: 5 additions & 5 deletions rclrs/src/node/graph.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use std::{
ffi::{CStr, CString},
};

use crate::{rcl_bindings::*, Node, RclrsError, ToResult};
use crate::{rcl_bindings::*, NodeState, RclrsError, ToResult};

impl Drop for rmw_names_and_types_t {
fn drop(&mut self) {
@@ -57,7 +57,7 @@ pub struct TopicEndpointInfo {
pub topic_type: String,
}

impl Node {
impl NodeState {
/// Returns a list of topic names and types for publishers associated with a node.
pub fn get_publisher_names_and_types_by_node(
&self,
@@ -486,7 +486,7 @@ mod tests {
Context::new_with_options([], InitOptions::new().with_domain_id(Some(domain_id)))
.unwrap();
let node_name = "test_publisher_names_and_types";
let node = Node::new(&context, node_name).unwrap();
let node = NodeState::new(&context, node_name).unwrap();
// Test that the graph has no publishers
let names_and_topics = node
.get_publisher_names_and_types_by_node(node_name, "")
@@ -545,7 +545,7 @@ mod tests {
fn test_node_names() {
let context = Context::new([]).unwrap();
let node_name = "test_node_names";
let node = Node::new(&context, node_name).unwrap();
let node = NodeState::new(&context, node_name).unwrap();

let names_and_namespaces = node.get_node_names().unwrap();

@@ -561,7 +561,7 @@ mod tests {
fn test_node_names_with_enclaves() {
let context = Context::new([]).unwrap();
let node_name = "test_node_names_with_enclaves";
let node = Node::new(&context, node_name).unwrap();
let node = NodeState::new(&context, node_name).unwrap();

let names_and_namespaces = node.get_node_names_with_enclaves().unwrap();

2 changes: 1 addition & 1 deletion rclrs/src/parameter.rs
Original file line number Diff line number Diff line change
@@ -82,7 +82,7 @@ enum DeclaredValue {
}

/// Builder used to declare a parameter. Obtain this by calling
/// [`crate::Node::declare_parameter`].
/// [`crate::NodeState::declare_parameter`].
#[must_use]
pub struct ParameterBuilder<'a, T: ParameterVariant> {
name: Arc<str>,
4 changes: 2 additions & 2 deletions rclrs/src/parameter/service.rs
Original file line number Diff line number Diff line change
@@ -319,7 +319,7 @@ mod tests {
use std::sync::{Arc, RwLock};

struct TestNode {
node: Arc<Node>,
node: Node,
bool_param: MandatoryParameter<bool>,
_ns_param: MandatoryParameter<i64>,
_read_only_param: ReadOnlyParameter<f64>,
@@ -341,7 +341,7 @@ mod tests {
Ok(())
}

fn construct_test_nodes(context: &Context, ns: &str) -> (TestNode, Arc<Node>) {
fn construct_test_nodes(context: &Context, ns: &str) -> (TestNode, Node) {
let node = NodeBuilder::new(context, "node")
.namespace(ns)
.build()
2 changes: 1 addition & 1 deletion rclrs/src/service.rs
Original file line number Diff line number Diff line change
@@ -64,7 +64,7 @@ type ServiceCallback<Request, Response> =
/// The only available way to instantiate services is via [`Node::create_service()`][1], this is to
/// ensure that [`Node`][2]s can track all the services that have been created.
///
/// [1]: crate::Node::create_service
/// [1]: crate::NodeState::create_service
/// [2]: crate::Node
pub struct Service<T>
where
Loading