Skip to content

Commit a9e817c

Browse files
authored
Add Arc and Mutex (#51)
* Initial refactor to make multithreadable Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Formatter pass Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]>
1 parent 231a3f1 commit a9e817c

File tree

13 files changed

+145
-139
lines changed

13 files changed

+145
-139
lines changed

rclrs/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ authors = ["Esteve Fernandez <[email protected]>"]
55
edition = "2018"
66

77
[dependencies]
8+
anyhow = {version = "1", features = ["backtrace"]}
89
libc = "0.2.43"
10+
parking_lot = "0.11"
911
thiserror = "1"
10-
anyhow = {version = "1", features = ["backtrace"]}
1112

1213
[build-dependencies]
1314
bindgen = "0.59.1"

rclrs/build.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ fn main() {
88
.header("src/rcl_wrapper.h")
99
.derive_copy(false)
1010
.size_t_is_usize(true)
11-
.default_enum_style(bindgen::EnumVariation::Rust{non_exhaustive: false});
11+
.default_enum_style(bindgen::EnumVariation::Rust {
12+
non_exhaustive: false,
13+
});
1214

1315
let ament_prefix_var_name = "AMENT_PREFIX_PATH";
1416
let ament_prefix_var = env::var(ament_prefix_var_name);

rclrs/src/context.rs

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,49 @@
1-
use crate::error::{RclError, ToResult};
2-
use crate::{Handle, Node};
1+
use crate::error::ToResult;
32
use crate::rcl_bindings::*;
4-
use std::cell::{Ref, RefCell, RefMut};
3+
use crate::Node;
4+
use parking_lot::{Mutex, MutexGuard};
5+
use rclrs_common::error::RclError;
56
use std::env;
67
use std::ffi::CString;
78
use std::os::raw::c_char;
8-
use std::rc::Rc;
9-
use anyhow::Result;
9+
use std::sync::Arc;
1010

11-
pub struct ContextHandle(RefCell<rcl_context_t>);
11+
pub struct ContextHandle(Mutex<rcl_context_t>);
1212

13-
impl<'a> Handle<rcl_context_t> for &'a ContextHandle {
14-
type DerefT = Ref<'a, rcl_context_t>;
15-
type DerefMutT = RefMut<'a, rcl_context_t>;
13+
impl ContextHandle {
14+
pub fn get_mut(&mut self) -> &mut rcl_context_t {
15+
self.0.get_mut()
16+
}
1617

17-
fn get(self) -> Self::DerefT {
18-
self.0.borrow()
18+
pub fn lock(&self) -> MutexGuard<rcl_context_t> {
19+
self.0.lock()
1920
}
2021

21-
fn get_mut(self) -> Self::DerefMutT {
22-
self.0.borrow_mut()
22+
pub fn try_lock(&self) -> Option<MutexGuard<rcl_context_t>> {
23+
self.0.try_lock()
2324
}
2425
}
2526

2627
impl Drop for ContextHandle {
2728
fn drop(&mut self) {
28-
let handle = &mut *self.get_mut();
2929
unsafe {
30-
rcl_shutdown(handle as *mut _);
30+
rcl_shutdown(&mut *self.get_mut() as *mut _);
3131
}
3232
}
3333
}
3434

3535
pub struct Context {
36-
pub handle: Rc<ContextHandle>,
36+
pub handle: Arc<ContextHandle>,
3737
}
3838

3939
impl Context {
40-
fn init(&mut self) -> Result<(), RclError> {
40+
fn init(&self) -> Result<(), RclError> {
4141
let args: Vec<CString> = env::args()
4242
.filter_map(|arg| CString::new(arg).ok())
4343
.collect();
4444

4545
let c_args: Vec<*const c_char> = args.iter().map(|arg| arg.as_ptr()).collect();
46-
let handle = &mut *self.handle.get_mut();
46+
let handle = &mut *self.handle.lock();
4747

4848
unsafe {
4949
let allocator = rcutils_get_default_allocator();
@@ -62,9 +62,9 @@ impl Context {
6262
Ok(())
6363
}
6464

65-
pub fn ok(&self) -> bool {
66-
let handle = &mut *self.handle.get_mut();
67-
unsafe { rcl_context_is_valid(handle as *mut _) }
65+
pub fn ok(&self) -> Result<bool, RclError> {
66+
let handle = &mut *self.handle.lock();
67+
unsafe { Ok(rcl_context_is_valid(handle as *mut _)) }
6868
}
6969

7070
pub fn create_node(&self, node_name: &str) -> Result<Node, RclError> {
@@ -74,8 +74,8 @@ impl Context {
7474

7575
impl Default for Context {
7676
fn default() -> Self {
77-
let mut context = Self {
78-
handle: Rc::new(ContextHandle(RefCell::new(unsafe {
77+
let context = Self {
78+
handle: Arc::new(ContextHandle(Mutex::new(unsafe {
7979
rcl_get_zero_initialized_context()
8080
}))),
8181
};

rclrs/src/error.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
use anyhow::Result;
2-
31
use crate::rcl_bindings::*;
4-
pub use rclrs_common::error::{RclError, to_rcl_result};
2+
pub use rclrs_common::error::{to_rcl_result, RclError};
53

64
pub(crate) trait ToResult {
75
fn ok(&self) -> Result<(), RclError>;

rclrs/src/lib.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ pub use self::node::*;
1212
pub use self::qos::*;
1313

1414
use self::rcl_bindings::*;
15-
use std::ops::{Deref, DerefMut};
16-
use anyhow::Result;
1715
use rclrs_common::error::WaitSetError;
16+
use std::ops::{Deref, DerefMut};
1817
use wait::WaitSet;
1918

2019
pub trait Handle<T> {
@@ -26,21 +25,21 @@ pub trait Handle<T> {
2625
}
2726

2827
/// Wrapper around [`spin_once`]
29-
pub fn spin(node: &Node) -> Result<(), RclError> {
30-
while unsafe { rcl_context_is_valid(&mut *node.context.get_mut() as *mut _) } {
28+
pub fn spin<'node>(node: &'node node::Node) -> Result<(), WaitSetError> {
29+
while unsafe { rcl_context_is_valid(&mut *node.context.lock() as *mut _) } {
3130
if let Some(error) = spin_once(node, 500).err() {
3231
match error {
33-
WaitSetError::DroppedSubscription => continue,
34-
WaitSetError::RclError(RclError::Timeout) => continue,
35-
WaitSetError::RclError(rclerr) => return Err(rclerr),
36-
}
32+
WaitSetError::DroppedSubscription | WaitSetError::RclError(RclError::Timeout) => {
33+
continue
34+
}
35+
error => return Err(error),
36+
};
3737
}
3838
}
3939

4040
Ok(())
4141
}
4242

43-
4443
/// Main function for waiting.
4544
///
4645
/// Following is a schematic representation of the interation of [`spin_once`] with ROS RCL FFI
@@ -78,15 +77,15 @@ pub fn spin(node: &Node) -> Result<(), RclError> {
7877
/// +--------------------+
7978
///
8079
///
81-
pub fn spin_once(node: &Node, timeout: i64) -> Result<(), WaitSetError> {
80+
pub fn spin_once<'node>(node: &'node Node, timeout: i64) -> Result<(), WaitSetError> {
8281
let number_of_subscriptions = node.subscriptions.len();
8382
let number_of_guard_conditions = 0;
8483
let number_of_timers = 0;
8584
let number_of_clients = 0;
8685
let number_of_services = 0;
8786
let number_of_events = 0;
8887

89-
let context = &mut *node.context.get_mut();
88+
let context = &mut *node.context.lock();
9089

9190
let mut wait_set = WaitSet::new(
9291
number_of_subscriptions,
@@ -95,13 +94,14 @@ pub fn spin_once(node: &Node, timeout: i64) -> Result<(), WaitSetError> {
9594
number_of_clients,
9695
number_of_services,
9796
number_of_events,
98-
context)?;
97+
context,
98+
)?;
9999

100100
for subscription in &node.subscriptions {
101101
match wait_set.add_subscription(subscription) {
102102
Ok(()) => (),
103103
Err(WaitSetError::DroppedSubscription) => (),
104-
err => return err,
104+
Err(err) => return Err(err),
105105
};
106106
}
107107

rclrs/src/node/mod.rs

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,54 @@
1-
use crate::{RclError, ToResult};
21
use crate::qos::QoSProfile;
32
use crate::rcl_bindings::*;
4-
use crate::{Context, ContextHandle, Handle};
5-
use std::cell::{Ref, RefCell, RefMut};
3+
use crate::ToResult;
4+
use crate::{Context, ContextHandle};
5+
use rclrs_common::error::RclError;
66
use std::ffi::CString;
7-
use std::rc::{Rc, Weak};
8-
use anyhow::Result;
7+
use std::sync::{Arc, Weak};
8+
9+
use parking_lot::{Mutex, MutexGuard};
910

1011
pub mod publisher;
1112
pub use self::publisher::*;
1213
pub mod subscription;
1314
pub use self::subscription::*;
1415

15-
pub struct NodeHandle(RefCell<rcl_node_t>);
16+
pub struct NodeHandle(Mutex<rcl_node_t>);
1617

17-
impl<'a> Handle<rcl_node_t> for &'a NodeHandle {
18-
type DerefT = Ref<'a, rcl_node_t>;
19-
type DerefMutT = RefMut<'a, rcl_node_t>;
18+
impl NodeHandle {
19+
pub fn get_mut(&mut self) -> &mut rcl_node_t {
20+
self.0.get_mut()
21+
}
2022

21-
fn get(self) -> Self::DerefT {
22-
self.0.borrow()
23+
pub fn lock(&self) -> MutexGuard<rcl_node_t> {
24+
self.0.lock()
2325
}
2426

25-
fn get_mut(self) -> Self::DerefMutT {
26-
self.0.borrow_mut()
27+
pub fn try_lock(&self) -> Option<MutexGuard<rcl_node_t>> {
28+
self.0.try_lock()
2729
}
2830
}
2931

3032
impl Drop for NodeHandle {
3133
fn drop(&mut self) {
3234
let handle = &mut *self.get_mut();
33-
unsafe {
34-
rcl_node_fini(handle as *mut _).unwrap();
35-
}
35+
unsafe { rcl_node_fini(handle as *mut _).unwrap() };
3636
}
3737
}
3838

3939
pub struct Node {
40-
handle: Rc<NodeHandle>,
41-
pub(crate) context: Rc<ContextHandle>,
40+
handle: Arc<NodeHandle>,
41+
pub(crate) context: Arc<ContextHandle>,
4242
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
4343
}
4444

4545
impl Node {
4646
#[allow(clippy::new_ret_no_self)]
47-
pub fn new(node_name: &str, context: &Context) -> Result<Node, RclError> {
47+
pub fn new<'ctxt>(node_name: &str, context: &Context) -> Result<Node, RclError> {
4848
Self::new_with_namespace(node_name, "", context)
4949
}
5050

51-
pub fn new_with_namespace(
51+
pub fn new_with_namespace<'ctxt>(
5252
node_name: &str,
5353
node_ns: &str,
5454
context: &Context,
@@ -57,7 +57,7 @@ impl Node {
5757
let raw_node_ns = CString::new(node_ns).unwrap();
5858

5959
let mut node_handle = unsafe { rcl_get_zero_initialized_node() };
60-
let context_handle = &mut *context.handle.get_mut();
60+
let context_handle = &mut *context.handle.lock();
6161

6262
unsafe {
6363
let node_options = rcl_node_get_default_options();
@@ -71,7 +71,7 @@ impl Node {
7171
.ok()?;
7272
}
7373

74-
let handle = Rc::new(NodeHandle(RefCell::new(node_handle)));
74+
let handle = Arc::new(NodeHandle(Mutex::new(node_handle)));
7575

7676
Ok(Node {
7777
handle,
@@ -81,7 +81,11 @@ impl Node {
8181
}
8282

8383
// TODO: make publisher's lifetime depend on node's lifetime
84-
pub fn create_publisher<T>(&self, topic: &str, qos: QoSProfile) -> Result<Publisher<T>, RclError>
84+
pub fn create_publisher<T>(
85+
&self,
86+
topic: &str,
87+
qos: QoSProfile,
88+
) -> Result<Publisher<T>, RclError>
8589
where
8690
T: rclrs_common::traits::MessageDefinition<T>,
8791
{
@@ -94,14 +98,14 @@ impl Node {
9498
topic: &str,
9599
qos: QoSProfile,
96100
callback: F,
97-
) -> Result<Rc<Subscription<T>>, RclError>
101+
) -> Result<Arc<Subscription<T>>, RclError>
98102
where
99103
T: rclrs_common::traits::MessageDefinition<T> + Default,
100104
F: FnMut(&T) + Sized + 'static,
101105
{
102-
let subscription = Rc::new(Subscription::<T>::new(self, topic, qos, callback)?);
106+
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
103107
self.subscriptions
104-
.push(Rc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
108+
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
105109
Ok(subscription)
106110
}
107111
}

0 commit comments

Comments
 (0)