Skip to content

Commit

Permalink
Merge options pattern PR
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <greyxmike@gmail.com>
  • Loading branch information
mxgrey committed Dec 9, 2024
2 parents f1c4716 + 98b8ea2 commit 2a7ff60
Show file tree
Hide file tree
Showing 38 changed files with 1,173 additions and 808 deletions.
14 changes: 4 additions & 10 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,19 @@ fn demonstrate_pubsub() -> Result<(), Error> {
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let direct_publisher = node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let idiomatic_publisher =
node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic")?;
let direct_publisher =
node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>("topic")?;

let _idiomatic_subscription = node
.create_subscription::<rclrs_example_msgs::msg::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"),
)?;
let _direct_subscription = node
.create_subscription::<rclrs_example_msgs::msg::rmw::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| {
println!("Got RMW-native message!")
},
Expand Down
3 changes: 1 addition & 2 deletions examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ fn main() -> Result<(), Error> {

let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;

let mut message = std_msgs::msg::String::default();

Expand Down
1 change: 0 additions & 1 deletion examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ fn main() -> Result<(), Error> {

let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
Expand Down
4 changes: 1 addition & 3 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ impl MinimalSubscriber {
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
Expand Down Expand Up @@ -58,8 +57,7 @@ fn main() -> Result<(), Error> {
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
let mut message = std_msgs::msg::String::default();
Expand Down
3 changes: 1 addition & 2 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ fn main() -> Result<(), Error> {

let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic")?;

let mut publish_count: u32 = 1;

Expand Down
1 change: 0 additions & 1 deletion examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ fn main() -> Result<(), Error> {

let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
num_messages += 1;
println!("I heard: '{}'", msg.data);
Expand Down
6 changes: 3 additions & 3 deletions examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use rclrs::{Context, Executor, Publisher, RclrsError, SpinOptions, QOS_PROFILE_D
use std::{sync::Arc, thread, time::Duration};
use std_msgs::msg::String as StringMsg;

struct SimplePublisher {
struct SimplePublisherNode {
publisher: Arc<Publisher<StringMsg>>,
}

impl SimplePublisher {
impl SimplePublisherNode {
fn new(executor: &Executor) -> Result<Self, RclrsError> {
let node = executor.create_node("simple_publisher").unwrap();
let publisher = node
.create_publisher("publish_hello", QOS_PROFILE_DEFAULT)
.create_publisher("publish_hello")
.unwrap();
Ok(Self { publisher })
}
Expand Down
1 change: 0 additions & 1 deletion examples/rust_pubsub/src/simple_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ impl SimpleSubscriptionNode {
let _subscriber = node
.create_subscription::<StringMsg, _>(
"publish_hello",
QOS_PROFILE_DEFAULT,
move |msg: StringMsg| {
*data_mut.lock().unwrap() = Some(msg);
},
Expand Down
92 changes: 64 additions & 28 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::{
collections::HashMap,
ffi::CString,
sync::{Arc, Mutex, MutexGuard},
collections::HashMap,
};

use rosidl_runtime_rs::Message;

use crate::{
error::ToResult,
rcl_bindings::*,
MessageCow, Node, RclrsError, RclReturnCode, Promise, ENTITY_LIFECYCLE_MUTEX,
RclPrimitive, QoSProfile, Waitable, WaitableLifecycle,
RclPrimitiveHandle, RclPrimitiveKind, ServiceInfo,
error::ToResult, rcl_bindings::*, IntoPrimitiveOptions, MessageCow, Node, Promise, QoSProfile,
RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclReturnCode, RclrsError, ServiceInfo,
Waitable, WaitableLifecycle, ENTITY_LIFECYCLE_MUTEX,
};

mod client_async_callback;
Expand Down Expand Up @@ -59,10 +57,7 @@ where
///
/// [1]: crate::RequestId
/// [2]: crate::ServiceInfo
pub fn call<'a, Req, Out>(
&self,
request: Req,
) -> Result<Promise<Out>, RclrsError>
pub fn call<'a, Req, Out>(&self, request: Req) -> Result<Promise<Out>, RclrsError>
where
Req: MessageCow<'a, T::Request>,
Out: ClientOutput<T::Response>,
Expand All @@ -82,7 +77,10 @@ where
.ok()?;

// TODO(@mxgrey): Log errors here when logging becomes available.
self.board.lock().unwrap().new_request(sequence_number, sender);
self.board
.lock()
.unwrap()
.new_request(sequence_number, sender);

Ok(promise)
}
Expand All @@ -103,8 +101,8 @@ where
where
Req: MessageCow<'a, T::Request>,
{
let callback = move |response, info| {
async { callback.run_client_callback(response, info); }
let callback = move |response, info| async {
callback.run_client_callback(response, info);
};
self.call_then_async(request, callback)
}
Expand Down Expand Up @@ -167,29 +165,30 @@ where
let client = Arc::clone(self);
self.handle.node.notify_on_graph_change(
// TODO(@mxgrey): Log any errors here once logging is available
move || client.service_is_ready().is_ok_and(|r| r)
move || client.service_is_ready().is_ok_and(|r| r),
)
}

/// Creates a new client.
pub(crate) fn create(
topic: &str,
qos: QoSProfile,
pub(crate) fn create<'a>(
node: &Arc<Node>,
options: impl Into<ClientOptions<'a>>,
) -> Result<Arc<Self>, RclrsError>
// This uses pub(crate) visibility to avoid instantiating this struct outside
// [`Node::create_client`], see the struct's documentation for the rationale
where
T: rosidl_runtime_rs::Service,
{
let ClientOptions { service_name, qos } = options.into();
// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_client = unsafe { rcl_get_zero_initialized_client() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
as *const rosidl_service_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;
let topic_c_string =
CString::new(service_name).map_err(|err| RclrsError::StringContainsNul {
err,
s: service_name.into(),
})?;

// SAFETY: No preconditions for this function.
let mut client_options = unsafe { rcl_client_get_default_options() };
Expand Down Expand Up @@ -243,12 +242,44 @@ where
}
}

/// `ClientOptions` are used by [`Node::create_client`][1] to initialize a
/// [`Client`] for a service.
///
/// [1]: crate::Node::create_client
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ClientOptions<'a> {
/// The name of the service that this client will send requests to
pub service_name: &'a str,
/// The quality of the service profile for this client
pub qos: QoSProfile,
}

impl<'a> ClientOptions<'a> {
/// Initialize a new [`ClientOptions`] with default settings.
pub fn new(service_name: &'a str) -> Self {
Self {
service_name,
qos: QoSProfile::services_default(),
}
}
}

impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for ClientOptions<'a> {
fn from(value: T) -> Self {
let primitive = value.into_primitive_options();
let mut options = Self::new(primitive.name);
primitive.apply(&mut options.qos);
options
}
}

struct ClientExecutable<T>
where
T: rosidl_runtime_rs::Service,
{
handle: Arc<ClientHandle>,
board: Arc<Mutex<ClientRequestBoard<T>>>
board: Arc<Mutex<ClientRequestBoard<T>>>,
}

impl<T> RclPrimitive for ClientExecutable<T>
Expand Down Expand Up @@ -326,7 +357,10 @@ where
}
Err(err) => {
match err {
RclrsError::RclError { code: RclReturnCode::ClientTakeFailed, .. } => {
RclrsError::RclError {
code: RclReturnCode::ClientTakeFailed,
..
} => {
// This is okay, it means a spurious wakeup happened
}
err => {
Expand Down Expand Up @@ -355,10 +389,12 @@ where
)
}
.ok()
.map(|_| (
T::Response::from_rmw_message(response_out),
service_info_out,
))
.map(|_| {
(
T::Response::from_rmw_message(response_out),
service_info_out,
)
})
}
}

Expand Down Expand Up @@ -415,7 +451,7 @@ mod tests {
let graph = construct_test_graph(namespace)?;
let _node_2_empty_client = graph
.node2
.create_client::<srv::Empty>("graph_test_topic_4", QoSProfile::services_default())?;
.create_client::<srv::Empty>("graph_test_topic_4")?;

std::thread::sleep(std::time::Duration::from_millis(200));

Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/client/client_async_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rosidl_runtime_rs::Service;

use std::future::Future;

use crate::{ServiceInfo, RequestId};
use crate::{RequestId, ServiceInfo};

/// A trait to deduce async callbacks of service clients.
///
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/client/client_callback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rosidl_runtime_rs::Service;

use crate::{ServiceInfo, RequestId};
use crate::{RequestId, ServiceInfo};

/// A trait to deduce regular callbacks of service clients.
///
Expand Down
18 changes: 4 additions & 14 deletions rclrs/src/client/client_output.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use rosidl_runtime_rs::Message;

use futures::channel::oneshot::{Sender, channel};
use futures::channel::oneshot::{channel, Sender};

use crate::{
rcl_bindings::rmw_service_info_t,
RequestId, ServiceInfo, Promise,
};
use crate::{rcl_bindings::rmw_service_info_t, Promise, RequestId, ServiceInfo};

/// This trait allows us to deduce how much information a user wants to receive
/// from a client call. A user can choose to receive only the response from the
Expand Down Expand Up @@ -50,11 +47,7 @@ pub enum AnyClientOutputSender<Response> {
}

impl<Response: Message> AnyClientOutputSender<Response> {
pub(super) fn send_response(
self,
response: Response,
service_info: rmw_service_info_t,
) {
pub(super) fn send_response(self, response: Response, service_info: rmw_service_info_t) {
match self {
Self::ResponseOnly(sender) => {
let _ = sender.send(response);
Expand All @@ -66,10 +59,7 @@ impl<Response: Message> AnyClientOutputSender<Response> {
));
}
Self::WithServiceInfo(sender) => {
let _ = sender.send((
response,
ServiceInfo::from_rmw_service_info(&service_info),
));
let _ = sender.send((response, ServiceInfo::from_rmw_service_info(&service_info)));
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions rclrs/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::{
vec::Vec,
};

use crate::
{rcl_bindings::*, Executor, ExecutorRuntime, LoggingLifecycle, RclrsError, ToResult,
BasicExecutorRuntime,
use crate::{
rcl_bindings::*, BasicExecutorRuntime, Executor, ExecutorRuntime, LoggingLifecycle, RclrsError,
ToResult,
};

/// This is locked whenever initializing or dropping any middleware entity
Expand Down
Loading

0 comments on commit 2a7ff60

Please sign in to comment.