Skip to content

Commit

Permalink
Big cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Michael X. Grey <grey@openrobotics.org>
  • Loading branch information
mxgrey committed Oct 14, 2024
1 parent d395f5f commit 5c59592
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 169 deletions.
14 changes: 7 additions & 7 deletions rclrs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{
error::ToResult,
rcl_bindings::*,
MessageCow, Node, RclrsError, RclReturnCode, Promise, ENTITY_LIFECYCLE_MUTEX,
Executable, QoSProfile, Waitable, WaitableLifecycle,
ExecutableHandle, ExecutableKind, ServiceInfo,
RclPrimitive, QoSProfile, Waitable, WaitableLifecycle,
RclPrimitiveHandle, RclPrimitiveKind, ServiceInfo,
};

mod client_async_callback;
Expand Down Expand Up @@ -251,20 +251,20 @@ where
board: Arc<Mutex<ClientRequestBoard<T>>>
}

impl<T> Executable for ClientExecutable<T>
impl<T> RclPrimitive for ClientExecutable<T>
where
T: rosidl_runtime_rs::Service,
{
fn execute(&mut self) -> Result<(), RclrsError> {
self.board.lock().unwrap().execute(&self.handle)
}

fn handle(&self) -> ExecutableHandle {
ExecutableHandle::Client(self.handle.lock())
fn handle(&self) -> RclPrimitiveHandle {
RclPrimitiveHandle::Client(self.handle.lock())
}

fn kind(&self) -> ExecutableKind {
ExecutableKind::Client
fn kind(&self) -> RclPrimitiveKind {
RclPrimitiveKind::Client
}
}

Expand Down
3 changes: 3 additions & 0 deletions rclrs/src/client/client_async_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ pub trait ClientAsyncCallback<T, Args>: Send + 'static
where
T: Service,
{
/// This represents the type of task (Future) that will be produced by the callback
type Task: Future<Output = ()> + Send;

/// Trigger the callback to run
fn run_client_async_callback(self, response: T::Response, info: ServiceInfo) -> Self::Task;
}

Expand Down
1 change: 1 addition & 0 deletions rclrs/src/client/client_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub trait ClientCallback<T, Args>: Send + 'static
where
T: Service,
{
/// Trigger the callback to run
fn run_client_callback(self, response: T::Response, info: ServiceInfo);
}

Expand Down
17 changes: 11 additions & 6 deletions rclrs/src/client/client_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use crate::{
///
/// Users never need to use this trait directly.
pub trait ClientOutput<Response>: Sized {
/// Create the appropriate type of channel to send the information that the
/// user asked for.
fn create_channel() -> (AnyClientOutputSender<Response>, Promise<Self>);
}

impl<Response: Message> ClientOutput<Response> for Response {
fn create_channel() -> (AnyClientOutputSender<Response>, Promise<Self>) {
let (sender, receiver) = channel();
(AnyClientOutputSender::RequestOnly(sender), receiver)
(AnyClientOutputSender::ResponseOnly(sender), receiver)
}
}

Expand All @@ -39,8 +41,11 @@ impl<Response: Message> ClientOutput<Response> for (Response, ServiceInfo) {

/// Can send any kind of response for a client call.
pub enum AnyClientOutputSender<Response> {
RequestOnly(Sender<Response>),
/// The user only asked for the response.
ResponseOnly(Sender<Response>),
/// The user also asked for the RequestId
WithId(Sender<(Response, RequestId)>),
/// The user also asked for the ServiceInfo
WithServiceInfo(Sender<(Response, ServiceInfo)>),
}

Expand All @@ -51,17 +56,17 @@ impl<Response: Message> AnyClientOutputSender<Response> {
service_info: rmw_service_info_t,
) {
match self {
Self::RequestOnly(sender) => {
sender.send(response);
Self::ResponseOnly(sender) => {
let _ = sender.send(response);
}
Self::WithId(sender) => {
sender.send((
let _ = sender.send((
response,
RequestId::from_rmw_request_id(&service_info.request_id),
));
}
Self::WithServiceInfo(sender) => {
sender.send((
let _ = sender.send((
response,
ServiceInfo::from_rmw_service_info(&service_info),
));
Expand Down
4 changes: 1 addition & 3 deletions rclrs/src/executor/basic_executor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use futures::{
future::{BoxFuture, FutureExt},
future::BoxFuture,
task::{waker_ref, ArcWake},
channel::{oneshot, mpsc::UnboundedSender},
};
use std::{
future::Future,
sync::{
mpsc::{Sender, Receiver, channel},
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
task::Context as TaskContext,
time::Duration,
};

use crate::{
Expand Down
13 changes: 7 additions & 6 deletions rclrs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
error::ToResult,
rcl_bindings::*,
NodeHandle, RclrsError, Waitable, WaitableLifecycle, QoSProfile,
Executable, ExecutableKind, ExecutableHandle, ENTITY_LIFECYCLE_MUTEX, ExecutorCommands,
RclPrimitive, RclPrimitiveKind, RclPrimitiveHandle, ENTITY_LIFECYCLE_MUTEX, ExecutorCommands,
};

mod any_service_callback;
Expand Down Expand Up @@ -44,6 +44,7 @@ where
callback: Arc<Mutex<AnyServiceCallback<T>>>,
/// Holding onto this keeps the waiter for this service alive in the wait
/// set of the executor.
#[allow(unused)]
lifecycle: WaitableLifecycle,
}

Expand Down Expand Up @@ -162,7 +163,7 @@ struct ServiceExecutable<T: rosidl_runtime_rs::Service> {
commands: Arc<ExecutorCommands>,
}

impl<T> Executable for ServiceExecutable<T>
impl<T> RclPrimitive for ServiceExecutable<T>
where
T: rosidl_runtime_rs::Service,
{
Expand All @@ -174,12 +175,12 @@ where
Ok(())
}

fn kind(&self) -> crate::ExecutableKind {
ExecutableKind::Service
fn kind(&self) -> crate::RclPrimitiveKind {
RclPrimitiveKind::Service
}

fn handle(&self) -> ExecutableHandle {
ExecutableHandle::Service(self.handle.lock())
fn handle(&self) -> RclPrimitiveHandle {
RclPrimitiveHandle::Service(self.handle.lock())
}
}

Expand Down
15 changes: 8 additions & 7 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
error::ToResult,
qos::QoSProfile,
rcl_bindings::*,
ExecutorCommands, NodeHandle, RclrsError, Waitable, Executable, ExecutableHandle,
ExecutableKind, WaitableLifecycle, ENTITY_LIFECYCLE_MUTEX,
ExecutorCommands, NodeHandle, RclrsError, Waitable, RclPrimitive, RclPrimitiveHandle,
RclPrimitiveKind, WaitableLifecycle, ENTITY_LIFECYCLE_MUTEX,
};

mod any_subscription_callback;
Expand Down Expand Up @@ -57,6 +57,7 @@ where
callback: Arc<Mutex<AnySubscriptionCallback<T>>>,
/// Holding onto this keeps the waiter for this subscription alive in the
/// wait set of the executor.
#[allow(unused)]
lifecycle: WaitableLifecycle,
}

Expand Down Expand Up @@ -172,20 +173,20 @@ struct SubscriptionExecutable<T: Message> {
commands: Arc<ExecutorCommands>,
}

impl<T> Executable for SubscriptionExecutable<T>
impl<T> RclPrimitive for SubscriptionExecutable<T>
where
T: Message,
{
fn execute(&mut self) -> Result<(), RclrsError> {
self.callback.lock().unwrap().execute(&self.handle, &self.commands)
}

fn kind(&self) -> crate::ExecutableKind {
ExecutableKind::Subscription
fn kind(&self) -> crate::RclPrimitiveKind {
RclPrimitiveKind::Subscription
}

fn handle(&self) -> ExecutableHandle {
ExecutableHandle::Subscription(self.handle.lock())
fn handle(&self) -> RclPrimitiveHandle {
RclPrimitiveHandle::Subscription(self.handle.lock())
}
}

Expand Down
12 changes: 6 additions & 6 deletions rclrs/src/subscription/any_subscription_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,27 @@ impl<T: Message> AnySubscriptionCallback<T> {
match self {
AnySubscriptionCallback::Regular(cb) => {
let (msg, _) = Self::take(handle)?;
commands.run(cb(msg));
let _ = commands.run(cb(msg));
}
AnySubscriptionCallback::RegularWithMessageInfo(cb) => {
let (msg, msg_info) = Self::take(handle)?;
commands.run(cb(msg, msg_info));
let _ = commands.run(cb(msg, msg_info));
}
AnySubscriptionCallback::Boxed(cb) => {
let (msg, _) = Self::take_boxed(handle)?;
commands.run(cb(msg));
let _ = commands.run(cb(msg));
}
AnySubscriptionCallback::BoxedWithMessageInfo(cb) => {
let (msg, msg_info) = Self::take_boxed(handle)?;
commands.run(cb(msg, msg_info));
let _ = commands.run(cb(msg, msg_info));
}
AnySubscriptionCallback::Loaned(cb) => {
let (msg, _) = Self::take_loaned(handle)?;
commands.run(cb(msg));
let _ = commands.run(cb(msg));
}
AnySubscriptionCallback::LoanedWithMessageInfo(cb) => {
let (msg, msg_info) = Self::take_loaned(handle)?;
commands.run(cb(msg, msg_info));
let _ = commands.run(cb(msg, msg_info));
}
}
Ok(())
Expand Down
11 changes: 7 additions & 4 deletions rclrs/src/wait_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use crate::{
mod guard_condition;
pub use guard_condition::*;

mod rcl_primitive;
pub use rcl_primitive::*;

mod waitable;
pub use waitable::*;

Expand All @@ -34,7 +37,7 @@ pub use wait_set_runner::*;

/// A struct for waiting on subscriptions and other waitable entities to become ready.
pub struct WaitSet {
primitives: HashMap<ExecutableKind, Vec<Waitable>>,
primitives: HashMap<RclPrimitiveKind, Vec<Waitable>>,
handle: WaitSetHandle,
}

Expand Down Expand Up @@ -70,7 +73,7 @@ impl WaitSet {
if entity.in_wait_set() {
return Err(RclrsError::AlreadyAddedToWaitSet);
}
let kind = entity.executable.kind();
let kind = entity.primitive.kind();
self.primitives.entry(kind).or_default().push(entity);
}
self.resize_rcl_containers()?;
Expand Down Expand Up @@ -116,7 +119,7 @@ impl WaitSet {
pub fn wait(
&mut self,
timeout: Option<Duration>,
mut f: impl FnMut(&mut dyn Executable) -> Result<(), RclrsError>,
mut f: impl FnMut(&mut dyn RclPrimitive) -> Result<(), RclrsError>,
) -> Result<(), RclrsError> {
let timeout_ns = match timeout.map(|d| d.as_nanos()) {
None => -1,
Expand Down Expand Up @@ -153,7 +156,7 @@ impl WaitSet {
// the callback for those that were.
for waiter in self.primitives.values_mut().flat_map(|v| v) {
if waiter.is_ready(&self.handle.rcl_wait_set) {
f(&mut *waiter.executable)?;
f(&mut *waiter.primitive)?;
}
}

Expand Down
27 changes: 20 additions & 7 deletions rclrs/src/wait_set/guard_condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{

use crate::{
rcl_bindings::*,
ContextHandle, RclrsError, ToResult, WaitableLifecycle, Executable,
Waitable, ExecutableKind, ExecutableHandle,
ContextHandle, RclrsError, ToResult, WaitableLifecycle, RclPrimitive,
Waitable, RclPrimitiveKind, RclPrimitiveHandle,
};

/// A waitable entity used for waking up a wait set manually.
Expand Down Expand Up @@ -135,28 +135,41 @@ struct GuardConditionHandle {
/// from rcl and either have static lifetimes or lifetimes that depend on
/// something else.
pub enum InnerGuardConditionHandle {
/// This variant means the guard condition was created and owned by rclrs.
/// Its memory is managed by us.
Owned(rcl_guard_condition_t),
/// This variant means the guard condition was created and owned by rcl.
/// The owner object represents something that the lifecycle of the guard
/// condition depends on, such as the rcl_node that created it.
Unowned {
/// This is the unowned guard condition pointer. We must not deallocate
/// it.
handle: *const rcl_guard_condition_t,
/// This somehow holds a shared reference to the owner of the guard
/// condition. We need to hold onto this to ensure the guard condition
/// remains valid.
owner: Box<dyn Any>,
},
}

impl InnerGuardConditionHandle {
/// Get the handle if it is owned by rclrs
pub fn owned(&self) -> Option<&rcl_guard_condition_t> {
match self {
Self::Owned(handle) => Some(handle),
_ => None,
}
}

/// Get the handle if it is owned by rclrs
pub fn owned_mut(&mut self) -> Option<&mut rcl_guard_condition_t> {
match self {
Self::Owned(handle) => Some(handle),
_ => None,
}
}

/// Apply a function to the handle
pub fn use_handle<Out>(&self, f: impl FnOnce(&rcl_guard_condition_t) -> Out) -> Out {
match self {
Self::Owned(handle) => f(handle),
Expand Down Expand Up @@ -191,20 +204,20 @@ struct GuardConditionExecutable {
callback: Option<Box<dyn FnMut() + Send + Sync>>,
}

impl Executable for GuardConditionExecutable {
impl RclPrimitive for GuardConditionExecutable {
fn execute(&mut self) -> Result<(), RclrsError> {
if let Some(callback) = &mut self.callback {
callback();
}
Ok(())
}

fn kind(&self) -> ExecutableKind {
ExecutableKind::GuardCondition
fn kind(&self) -> RclPrimitiveKind {
RclPrimitiveKind::GuardCondition
}

fn handle(&self) -> ExecutableHandle {
ExecutableHandle::GuardCondition(
fn handle(&self) -> RclPrimitiveHandle {
RclPrimitiveHandle::GuardCondition(
self.handle.rcl_guard_condition.lock().unwrap()
)
}
Expand Down
Loading

0 comments on commit 5c59592

Please sign in to comment.