Skip to content

Commit

Permalink
Migrate primitives to state pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
  • Loading branch information
mxgrey committed Nov 20, 2024
1 parent bf3d01a commit bbb5333
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 65 deletions.
25 changes: 19 additions & 6 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,25 @@ type RequestId = i64;

/// Main class responsible for sending requests to a ROS service.
///
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
/// ensure that [`Node`][2]s can track all the clients that have been created.
/// Create a client using [`Node::create_client`][1].
///
/// Receiving responses requires the node's executor to [spin][2].
///
/// [1]: crate::NodeState::create_client
/// [2]: crate::Node
pub struct Client<T>
/// [2]: crate::spin
pub type Client<T> = Arc<ClientState<T>>;

/// The inner state of a [`Client`].
///
/// This is public so that you can choose to create a [`Weak`][1] reference to it
/// if you want to be able to refer to a [`Client`] in a non-owning way. It is
/// generally recommended to manage the `ClientState` inside of an [`Arc`],
/// and [`Client`] is provided as a convenience alias for that.
///
/// The public API of the [`Client`] type is implemented via `ClientState`.
///
/// [1]: std::sync::Weak
pub struct ClientState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand All @@ -78,7 +91,7 @@ where
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
}

impl<T> Client<T>
impl<T> ClientState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand Down Expand Up @@ -275,7 +288,7 @@ where
}
}

impl<T> ClientBase for Client<T>
impl<T> ClientBase for ClientState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand Down
32 changes: 16 additions & 16 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use rosidl_runtime_rs::Message;

pub use self::{builder::*, graph::*};
use crate::{
rcl_bindings::*, Client, ClientBase, Clock, Context, ContextHandle, GuardCondition,
ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile,
RclrsError, Service, ServiceBase, Subscription, SubscriptionBase, SubscriptionCallback,
TimeSource, ENTITY_LIFECYCLE_MUTEX,
rcl_bindings::*, Client, ClientBase, ClientState, Clock, Context, ContextHandle,
GuardCondition, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher,
PublisherState, QoSProfile, RclrsError, Service, ServiceBase, ServiceState, Subscription,
SubscriptionBase, SubscriptionCallback, SubscriptionState, TimeSource, ENTITY_LIFECYCLE_MUTEX,
};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
Expand Down Expand Up @@ -212,11 +212,11 @@ impl NodeState {
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
pub fn create_client<T>(&self, topic: &str) -> Result<Client<T>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(Client::<T>::new(Arc::clone(&self.handle), topic)?);
let client = Arc::new(ClientState::<T>::new(Arc::clone(&self.handle), topic)?);
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}
Expand Down Expand Up @@ -270,28 +270,28 @@ impl NodeState {
&self,
topic: &str,
qos: QoSProfile,
) -> Result<Arc<Publisher<T>>, RclrsError>
) -> Result<Publisher<T>, RclrsError>
where
T: Message,
{
let publisher = Arc::new(Publisher::<T>::new(Arc::clone(&self.handle), topic, qos)?);
let publisher = Arc::new(PublisherState::<T>::new(
Arc::clone(&self.handle),
topic,
qos,
)?);
Ok(publisher)
}

/// Creates a [`Service`][1].
///
/// [1]: crate::Service
// TODO: make service's lifetime depend on node's lifetime
pub fn create_service<T, F>(
&self,
topic: &str,
callback: F,
) -> Result<Arc<Service<T>>, RclrsError>
pub fn create_service<T, F>(&self, topic: &str, callback: F) -> Result<Service<T>, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let service = Arc::new(Service::<T>::new(
let service = Arc::new(ServiceState::<T>::new(
Arc::clone(&self.handle),
topic,
callback,
Expand All @@ -310,11 +310,11 @@ impl NodeState {
topic: &str,
qos: QoSProfile,
callback: impl SubscriptionCallback<T, Args>,
) -> Result<Arc<Subscription<T>>, RclrsError>
) -> Result<Subscription<T>, RclrsError>
where
T: Message,
{
let subscription = Arc::new(Subscription::<T>::new(
let subscription = Arc::new(SubscriptionState::<T>::new(
Arc::clone(&self.handle),
topic,
qos,
Expand Down
12 changes: 6 additions & 6 deletions rclrs/src/parameter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ use crate::{
// What is used is the Weak that is stored in the node, and is upgraded when spinning.
pub struct ParameterService {
#[allow(dead_code)]
describe_parameters_service: Arc<Service<DescribeParameters>>,
describe_parameters_service: Service<DescribeParameters>,
#[allow(dead_code)]
get_parameter_types_service: Arc<Service<GetParameterTypes>>,
get_parameter_types_service: Service<GetParameterTypes>,
#[allow(dead_code)]
get_parameters_service: Arc<Service<GetParameters>>,
get_parameters_service: Service<GetParameters>,
#[allow(dead_code)]
list_parameters_service: Arc<Service<ListParameters>>,
list_parameters_service: Service<ListParameters>,
#[allow(dead_code)]
set_parameters_service: Arc<Service<SetParameters>>,
set_parameters_service: Service<SetParameters>,
#[allow(dead_code)]
set_parameters_atomically_service: Arc<Service<SetParametersAtomically>>,
set_parameters_atomically_service: Service<SetParametersAtomically>,
}

fn describe_parameters(
Expand Down
33 changes: 25 additions & 8 deletions rclrs/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,32 @@ impl Drop for PublisherHandle {

/// Struct for sending messages of type `T`.
///
/// Create a publisher using [`Node::create_publisher`][1].
///
/// Multiple publishers can be created for the same topic, in different nodes or the same node.
/// A clone of a `Publisher` will refer to the same publisher instance as the original.
/// The underlying instance is tied to [`PublisherState`] which implements the [`Publisher`] API.
///
/// The underlying RMW will decide on the concrete delivery mechanism (network stack, shared
/// memory, or intraprocess).
///
/// Sending messages does not require calling [`spin`][1] on the publisher's node.
/// Sending messages does not require calling [`spin`][2] on the publisher's node.
///
/// [1]: crate::NodeState::create_publisher
/// [2]: crate::spin
pub type Publisher<T> = Arc<PublisherState<T>>;

/// The inner state of a [`Publisher`].
///
/// This is public so that you can choose to create a [`Weak`][1] reference to it
/// if you want to be able to refer to a [`Publisher`] in a non-owning way. It is
/// generally recommended to manage the `PublisherState` inside of an [`Arc`],
/// and [`Publisher`] is provided as a convenience alias for that.
///
/// The public API of the [`Publisher`] type is implemented via `PublisherState`.
///
/// [1]: crate::spin
pub struct Publisher<T>
/// [1]: std::sync::Weak
pub struct PublisherState<T>
where
T: Message,
{
Expand All @@ -66,12 +83,12 @@ where

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl<T> Send for Publisher<T> where T: Message {}
unsafe impl<T> Send for PublisherState<T> where T: Message {}
// SAFETY: The type_support_ptr prevents the default Sync impl.
// rosidl_message_type_support_t is a read-only type without interior mutability.
unsafe impl<T> Sync for Publisher<T> where T: Message {}
unsafe impl<T> Sync for PublisherState<T> where T: Message {}

impl<T> Publisher<T>
impl<T> PublisherState<T>
where
T: Message,
{
Expand Down Expand Up @@ -179,7 +196,7 @@ where
}
}

impl<T> Publisher<T>
impl<T> PublisherState<T>
where
T: RmwMessage,
{
Expand Down Expand Up @@ -231,7 +248,7 @@ where
}
}

/// Convenience trait for [`Publisher::publish`].
/// Convenience trait for [`PublisherState::publish`].
pub trait MessageCow<'a, T: Message> {
/// Wrap the owned or borrowed message in a `Cow`.
fn into_cow(self) -> Cow<'a, T>;
Expand Down
6 changes: 3 additions & 3 deletions rclrs/src/publisher/loaned_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::ops::{Deref, DerefMut};

use rosidl_runtime_rs::RmwMessage;

use crate::{rcl_bindings::*, Publisher, RclrsError, ToResult};
use crate::{rcl_bindings::*, PublisherState, RclrsError, ToResult};

/// A message that is owned by the middleware, loaned for publishing.
///
/// It dereferences to a `&mut T`.
///
/// This type is returned by [`Publisher::borrow_loaned_message()`], see the documentation of
/// This type is returned by [`PublisherState::borrow_loaned_message()`], see the documentation of
/// that function for more information.
///
/// The loan is returned by dropping the message or [publishing it][1].
Expand All @@ -19,7 +19,7 @@ where
T: RmwMessage,
{
pub(super) msg_ptr: *mut T,
pub(super) publisher: &'a Publisher<T>,
pub(super) publisher: &'a PublisherState<T>,
}

impl<'a, T> Deref for LoanedMessage<'a, T>
Expand Down
33 changes: 26 additions & 7 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,33 @@ pub trait ServiceBase: Send + Sync {
type ServiceCallback<Request, Response> =
Box<dyn Fn(&rmw_request_id_t, Request) -> Response + 'static + Send>;

/// Main class responsible for responding to requests sent by ROS clients.
/// Provide a service that can respond to requests sent by ROS service clients.
///
/// The only available way to instantiate services is via [`Node::create_service()`][1], this is to
/// ensure that [`Node`][2]s can track all the services that have been created.
/// Create a service using [`Node::create_service`][1].
///
/// ROS only supports having one service provider for any given fully-qualified
/// service name. "Fully-qualified" means the namespace is also taken into account
/// for uniqueness. A clone of a `Service` will refer to the same service provider
/// instance as the original. The underlying instance is tied to [`ServiceState`]
/// which implements the [`Service`] API.
///
/// Responding to requests requires the node's executor to [spin][2].
///
/// [1]: crate::NodeState::create_service
/// [2]: crate::Node
pub struct Service<T>
/// [2]: crate::spin
pub type Service<T> = Arc<ServiceState<T>>;

/// The inner state of a [`Service`].
///
/// This is public so that you can choose to create a [`Weak`][1] reference to it
/// if you want to be able to refer to a [`Service`] in a non-owning way. It is
/// generally recommended to manage the `ServiceState` inside of an [`Arc`],
/// and [`Service`] is provided as a convenience alias for that.
///
/// The public API of the [`Service`] type is implemented via `ServiceState`.
///
/// [1]: std::sync::Weak
pub struct ServiceState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand All @@ -75,7 +94,7 @@ where
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
}

impl<T> Service<T>
impl<T> ServiceState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand Down Expand Up @@ -181,7 +200,7 @@ where
}
}

impl<T> ServiceBase for Service<T>
impl<T> ServiceBase for ServiceState<T>
where
T: rosidl_runtime_rs::Service,
{
Expand Down
39 changes: 25 additions & 14 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,32 @@ pub trait SubscriptionBase: Send + Sync {

/// Struct for receiving messages of type `T`.
///
/// Create a subscription using [`Node::create_subscription`][1].
///
/// There can be multiple subscriptions for the same topic, in different nodes or the same node.
/// A clone of a `Subscription` will refer to the same subscription instance as the original.
/// The underlying instance is tied to [`SubscriptionState`] which implements the [`Subscription`] API.
///
/// Receiving messages requires calling [`spin_once`][1] or [`spin`][2] on the subscription's node.
/// Receiving messages requires the node's executor to [spin][2].
///
/// When a subscription is created, it may take some time to get "matched" with a corresponding
/// publisher.
///
/// The only available way to instantiate subscriptions is via [`Node::create_subscription()`][3], this
/// is to ensure that [`Node`][4]s can track all the subscriptions that have been created.
///
/// [1]: crate::spin_once
/// [1]: crate::NodeState::create_subscription
/// [2]: crate::spin
/// [3]: crate::NodeState::create_subscription
/// [4]: crate::Node
pub struct Subscription<T>
pub type Subscription<T> = Arc<SubscriptionState<T>>;

/// The inner state of a [`Subscription`].
///
/// This is public so that you can choose to create a [`Weak`][1] reference to it
/// if you want to be able to refer to a [`Subscription`] in a non-owning way. It is
/// generally recommended to manage the `SubscriptionState` inside of an [`Arc`],
/// and [`Subscription`] is provided as a convenience alias for that.
///
/// The public API of the [`Subscription`] type is implemented via `SubscriptionState`.
///
/// [1]: std::sync::Weak
pub struct SubscriptionState<T>
where
T: Message,
{
Expand All @@ -88,7 +99,7 @@ where
message: PhantomData<T>,
}

impl<T> Subscription<T>
impl<T> SubscriptionState<T>
where
T: Message,
{
Expand Down Expand Up @@ -233,11 +244,11 @@ where
/// When there is no new message, this will return a
/// [`SubscriptionTakeFailed`][1].
///
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][2]. See its documentation
/// This is the counterpart to [`PublisherState::borrow_loaned_message()`][2]. See its documentation
/// for more information.
///
/// [1]: crate::RclrsError
/// [2]: crate::Publisher::borrow_loaned_message
/// [2]: crate::PublisherState::borrow_loaned_message
pub fn take_loaned(&self) -> Result<(ReadOnlyLoanedMessage<'_, T>, MessageInfo), RclrsError> {
let mut msg_ptr = std::ptr::null_mut();
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
Expand All @@ -263,7 +274,7 @@ where
}
}

impl<T> SubscriptionBase for Subscription<T>
impl<T> SubscriptionBase for SubscriptionState<T>
where
T: Message,
{
Expand Down Expand Up @@ -326,8 +337,8 @@ mod tests {

#[test]
fn traits() {
assert_send::<Subscription<msg::BoundedSequences>>();
assert_sync::<Subscription<msg::BoundedSequences>>();
assert_send::<SubscriptionState<msg::BoundedSequences>>();
assert_sync::<SubscriptionState<msg::BoundedSequences>>();
}

#[test]
Expand Down
Loading

0 comments on commit bbb5333

Please sign in to comment.