Skip to content

Commit

Permalink
Merge in shared state pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
  • Loading branch information
mxgrey committed Dec 9, 2024
2 parents 2a7ff60 + 2cff0b7 commit e86707e
Show file tree
Hide file tree
Showing 16 changed files with 163 additions and 88 deletions.
4 changes: 2 additions & 2 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
node: rclrs::Node,
subscription: Mutex<Option<rclrs::Subscription<std_msgs::msg::String>>>,
}

impl MinimalSubscriber {
Expand Down
7 changes: 3 additions & 4 deletions examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{sync::Arc, thread, time::Duration};
use std_msgs::msg::String as StringMsg;

struct SimplePublisherNode {
publisher: Arc<Publisher<StringMsg>>,
publisher: Publisher<StringMsg>,
}

impl SimplePublisherNode {
Expand All @@ -26,12 +26,11 @@ impl SimplePublisherNode {

fn main() -> Result<(), RclrsError> {
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
let publisher = Arc::new(SimplePublisher::new(&executor).unwrap());
let publisher_other_thread = Arc::clone(&publisher);
let node = Arc::new(SimplePublisher::new(&executor).unwrap());
let mut count: i32 = 0;
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(1000));
count = publisher_other_thread.publish_data(count).unwrap();
count = node.publish_data(count).unwrap();
});
executor.spin(SpinOptions::default())
}
5 changes: 2 additions & 3 deletions examples/rust_pubsub/src/simple_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ impl SimpleSubscriptionNode {
}
fn main() -> Result<(), RclrsError> {
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
let subscription = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
let subscription_other_thread = Arc::clone(&subscription);
let node = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(1000));
subscription_other_thread.data_callback().unwrap()
node.data_callback().unwrap()
});
executor.spin(SpinOptions::default())
}
31 changes: 22 additions & 9 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
ffi::CString,
sync::{Arc, Mutex, MutexGuard},
sync::{atomic::AtomicBool, Arc, Mutex, MutexGuard},
};

use rosidl_runtime_rs::Message;
Expand All @@ -23,12 +23,25 @@ pub use client_output::*;

/// Main class responsible for sending requests to a ROS service.
///
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
/// ensure that [`Node`][2]s can track all the clients that have been created.
/// Create a client using [`Node::create_client`][1].
///
/// [1]: crate::Node::create_client
/// [2]: crate::Node
pub struct Client<T>
/// Receiving responses requires the node's executor to [spin][2].
///
/// [1]: crate::NodeState::create_client
/// [2]: crate::spin
pub type Client<T> = Arc<ClientState<T>>;

/// The inner state of a [`Client`].
///
/// This is public so that you can choose to create a [`Weak`][1] reference to it
/// if you want to be able to refer to a [`Client`] in a non-owning way. It is
/// generally recommended to manage the `ClientState` inside of an [`Arc`],
/// and [`Client`] is provided as a convenience alias for that.
///
/// The public API of the [`Client`] type is implemented via `ClientState`.
///
/// [1]: std::sync::Weak
pub struct ClientState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand All @@ -38,7 +51,7 @@ where
lifecycle: WaitableLifecycle,
}

impl<T> Client<T>
impl<T> ClientState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand Down Expand Up @@ -171,7 +184,7 @@ where

/// Creates a new client.
pub(crate) fn create<'a>(
node: &Arc<Node>,
node: &Node,
options: impl Into<ClientOptions<'a>>,
) -> Result<Arc<Self>, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
Expand Down Expand Up @@ -407,7 +420,7 @@ struct ClientHandle {
rcl_client: Mutex<rcl_client_t>,
/// We store the whole node here because we use some of its user-facing API
/// in some of the Client methods.
node: Arc<Node>,
node: Node,
}

impl ClientHandle {
Expand Down
4 changes: 2 additions & 2 deletions rclrs/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Executor {
pub fn create_node<'a>(
&'a self,
options: impl IntoNodeOptions<'a>,
) -> Result<Arc<Node>, RclrsError> {
) -> Result<Node, RclrsError> {
let options = options.into_node_options();
let node = options.build(&self.commands)?;
Ok(node)
Expand Down Expand Up @@ -128,7 +128,7 @@ impl ExecutorCommands {
pub fn create_node<'a>(
self: &Arc<Self>,
options: impl IntoNodeOptions<'a>,
) -> Result<Arc<Node>, RclrsError> {
) -> Result<Node, RclrsError> {
let options = options.into_node_options();
options.build(self)
}
Expand Down
61 changes: 37 additions & 24 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use async_std::future::timeout;
use rosidl_runtime_rs::Message;

use crate::{
rcl_bindings::*, Client, ClientOptions, Clock, ContextHandle, ExecutorCommands, LogParams,
Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise, Publisher,
PublisherOptions, QoSProfile, RclrsError, Service, ServiceAsyncCallback, ServiceCallback,
ServiceOptions, Subscription, SubscriptionAsyncCallback, SubscriptionCallback,
SubscriptionOptions, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
rcl_bindings::*, Client, ClientOptions, ClientState, Clock, ContextHandle, ExecutorCommands,
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Promise,
Publisher, PublisherOptions, PublisherState, QoSProfile, RclrsError, Service,
ServiceAsyncCallback, ServiceCallback, ServiceOptions, ServiceState, Subscription,
SubscriptionAsyncCallback, SubscriptionCallback, SubscriptionOptions, SubscriptionState,
TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
};

/// A processing unit that can communicate with other nodes.
Expand Down Expand Up @@ -78,7 +79,19 @@ use crate::{
/// [5]: crate::NodeOptions::new
/// [6]: crate::NodeOptions::namespace
/// [7]: crate::Executor::create_node
pub struct Node {
pub type Node = Arc<NodeState>;

/// The inner state of a [`Node`].
///
/// This is public so that you can choose to put it inside a [`Weak`] if you
/// want to be able to refer to a [`Node`] in a non-owning way. It is generally
/// recommended to manage the [`NodeState`] inside of an [`Arc`], and [`Node`]
/// recommended to manage the `NodeState` inside of an [`Arc`], and [`Node`]
/// is provided as convenience alias for that.
///
/// The public API of the [`Node`] type is implemented via `NodeState`.
pub struct NodeState {
time_source: TimeSource,
parameter: ParameterInterface,
logger: Logger,
Expand Down Expand Up @@ -132,23 +145,23 @@ impl Drop for NodeHandle {
}
}

impl Eq for Node {}
impl Eq for NodeState {}

impl PartialEq for Node {
impl PartialEq for NodeState {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.handle, &other.handle)
}
}

impl fmt::Debug for Node {
impl fmt::Debug for NodeState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Node")
.field("fully_qualified_name", &self.fully_qualified_name())
.finish()
}
}

impl Node {
impl NodeState {
/// Returns the clock associated with this node.
pub fn get_clock(&self) -> Clock {
self.time_source.get_clock()
Expand Down Expand Up @@ -206,7 +219,7 @@ impl Node {
/// Returns the fully qualified name of the node.
///
/// The fully qualified name of the node is the node namespace combined with the node name.
/// It is subject to the remappings shown in [`Node::name()`] and [`Node::namespace()`].
/// It is subject to the remappings shown in [`NodeState::name()`] and [`NodeState::namespace()`].
///
/// # Example
/// ```
Expand Down Expand Up @@ -261,11 +274,11 @@ impl Node {
pub fn create_client<'a, T>(
self: &Arc<Self>,
options: impl Into<ClientOptions<'a>>,
) -> Result<Arc<Client<T>>, RclrsError>
) -> Result<Client<T>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
Client::<T>::create(self, options)
ClientState::<T>::create(self, options)
}

/// Creates a [`Publisher`][1].
Expand Down Expand Up @@ -305,11 +318,11 @@ impl Node {
pub fn create_publisher<'a, T>(
&self,
options: impl Into<PublisherOptions<'a>>,
) -> Result<Arc<Publisher<T>>, RclrsError>
) -> Result<Publisher<T>, RclrsError>
where
T: Message,
{
Publisher::<T>::create(Arc::clone(&self.handle), options)
PublisherState::<T>::create(Arc::clone(&self.handle), options)
}

/// Creates a [`Service`] with an ordinary callback.
Expand Down Expand Up @@ -373,11 +386,11 @@ impl Node {
&self,
options: impl Into<ServiceOptions<'a>>,
callback: impl ServiceCallback<T, Args>,
) -> Result<Arc<Service<T>>, RclrsError>
) -> Result<Service<T>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
Service::<T>::create(
ServiceState::<T>::create(
options,
callback.into_service_callback(),
&self.handle,
Expand Down Expand Up @@ -408,11 +421,11 @@ impl Node {
&self,
options: impl Into<ServiceOptions<'a>>,
callback: impl ServiceAsyncCallback<T, Args>,
) -> Result<Arc<Service<T>>, RclrsError>
) -> Result<Service<T>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
Service::<T>::create(
ServiceState::<T>::create(
options,
callback.into_service_async_callback(),
&self.handle,
Expand Down Expand Up @@ -480,11 +493,11 @@ impl Node {
&self,
options: impl Into<SubscriptionOptions<'a>>,
callback: impl SubscriptionCallback<T, Args>,
) -> Result<Arc<Subscription<T>>, RclrsError>
) -> Result<Subscription<T>, RclrsError>
where
T: Message,
{
Subscription::<T>::create(
SubscriptionState::<T>::create(
options,
callback.into_subscription_callback(),
&self.handle,
Expand Down Expand Up @@ -515,11 +528,11 @@ impl Node {
&self,
options: impl Into<SubscriptionOptions<'a>>,
callback: impl SubscriptionAsyncCallback<T, Args>,
) -> Result<Arc<Subscription<T>>, RclrsError>
) -> Result<Subscription<T>, RclrsError>
where
T: Message,
{
Subscription::<T>::create(
SubscriptionState::<T>::create(
options,
callback.into_subscription_async_callback(),
&self.handle,
Expand Down Expand Up @@ -689,7 +702,7 @@ impl Node {
}
}

impl<'a> ToLogParams<'a> for &'a Node {
impl<'a> ToLogParams<'a> for &'a NodeState {
fn to_log_params(self) -> LogParams<'a> {
self.logger().to_log_params()
}
Expand Down
4 changes: 2 additions & 2 deletions rclrs/src/node/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
ffi::{CStr, CString},
};

use crate::{rcl_bindings::*, Node, RclrsError, ToResult};
use crate::{rcl_bindings::*, NodeState, RclrsError, ToResult};

impl Drop for rmw_names_and_types_t {
fn drop(&mut self) {
Expand Down Expand Up @@ -57,7 +57,7 @@ pub struct TopicEndpointInfo {
pub topic_type: String,
}

impl Node {
impl NodeState {
/// Returns a list of topic names and types for publishers associated with a node.
pub fn get_publisher_names_and_types_by_node(
&self,
Expand Down
9 changes: 5 additions & 4 deletions rclrs/src/node/node_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use futures::channel::mpsc::unbounded;
use crate::{
node::node_graph_task::{node_graph_task, NodeGraphAction},
rcl_bindings::*,
ClockType, ExecutorCommands, GuardCondition, Logger, Node, NodeHandle, ParameterInterface,
QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK,
ClockType, ExecutorCommands, GuardCondition, Logger, Node, NodeHandle, NodeState,
ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX,
QOS_PROFILE_CLOCK,
};

/// This trait helps to build [`NodeOptions`] which can be passed into
Expand Down Expand Up @@ -289,7 +290,7 @@ impl<'a> NodeOptions<'a> {
///
/// Only used internally. Downstream users should call
/// [`Executor::create_node`].
pub(crate) fn build(self, commands: &Arc<ExecutorCommands>) -> Result<Arc<Node>, RclrsError> {
pub(crate) fn build(self, commands: &Arc<ExecutorCommands>) -> Result<Node, RclrsError> {
let node_name = CString::new(self.name).map_err(|err| RclrsError::StringContainsNul {
err,
s: self.name.to_owned(),
Expand Down Expand Up @@ -388,7 +389,7 @@ impl<'a> NodeOptions<'a> {
graph_change_guard_condition,
));

let node = Arc::new(Node {
let node = Arc::new(NodeState {
time_source: TimeSource::builder(self.clock_type)
.clock_qos(self.clock_qos)
.build(),
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/parameter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ enum DeclaredValue {
}

/// Builder used to declare a parameter. Obtain this by calling
/// [`crate::Node::declare_parameter`].
/// [`crate::NodeState::declare_parameter`].
#[must_use]
pub struct ParameterBuilder<'a, T: ParameterVariant> {
name: Arc<str>,
Expand Down
Loading

0 comments on commit e86707e

Please sign in to comment.