Skip to content

Commit 231a3f1

Browse files
authored
Implement WaitSet (#48)
* Beginning implementation of WaitSet Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Reworked error handling a bit Implemented first working version of WaitSet Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Remove unused code Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Merge new and init Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Merge new and init. Add documentation Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Remove unneeded clear Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Remove unneeded imports Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]> * Run rustfmt Distro A, OPSEC #4584. You may have additional rights; please see https://rosmilitary.org/faq/?category=ros-2-license Signed-off-by: Jacob Hassold <[email protected]>
1 parent e4ab569 commit 231a3f1

File tree

5 files changed

+262
-131
lines changed

5 files changed

+262
-131
lines changed

rclrs/src/error.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::Result;
22

33
use crate::rcl_bindings::*;
4-
pub use rclrs_common::error::RclError;
4+
pub use rclrs_common::error::{RclError, to_rcl_result};
55

66
pub(crate) trait ToResult {
77
fn ok(&self) -> Result<(), RclError>;
@@ -13,10 +13,6 @@ pub(crate) trait ToResult {
1313

1414
impl ToResult for rcl_ret_t {
1515
fn ok(&self) -> Result<(), RclError> {
16-
if *self as u32 == RCL_RET_OK {
17-
Ok(())
18-
} else {
19-
Err(RclError::from(*self))
20-
}
16+
to_rcl_result(*self as i32)
2117
}
2218
}

rclrs/src/lib.rs

Lines changed: 19 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod context;
22
pub mod error;
33
pub mod node;
44
pub mod qos;
5+
pub mod wait;
56

67
mod rcl_bindings;
78

@@ -13,6 +14,8 @@ pub use self::qos::*;
1314
use self::rcl_bindings::*;
1415
use std::ops::{Deref, DerefMut};
1516
use anyhow::Result;
17+
use rclrs_common::error::WaitSetError;
18+
use wait::WaitSet;
1619

1720
pub trait Handle<T> {
1821
type DerefT: Deref<Target = T>;
@@ -27,8 +30,9 @@ pub fn spin(node: &Node) -> Result<(), RclError> {
2730
while unsafe { rcl_context_is_valid(&mut *node.context.get_mut() as *mut _) } {
2831
if let Some(error) = spin_once(node, 500).err() {
2932
match error {
30-
RclError::Timeout => continue,
31-
_ => return Err(error),
33+
WaitSetError::DroppedSubscription => continue,
34+
WaitSetError::RclError(RclError::Timeout) => continue,
35+
WaitSetError::RclError(rclerr) => return Err(rclerr),
3236
}
3337
}
3438
}
@@ -74,10 +78,7 @@ pub fn spin(node: &Node) -> Result<(), RclError> {
7478
/// +--------------------+
7579
///
7680
///
77-
pub fn spin_once(node: &Node, timeout: i64) -> Result<(), RclError> {
78-
// get an rcl_wait_set_t - All NULLs
79-
let mut wait_set_handle = unsafe { rcl_get_zero_initialized_wait_set() };
80-
81+
pub fn spin_once(node: &Node, timeout: i64) -> Result<(), WaitSetError> {
8182
let number_of_subscriptions = node.subscriptions.len();
8283
let number_of_guard_conditions = 0;
8384
let number_of_timers = 0;
@@ -87,80 +88,24 @@ pub fn spin_once(node: &Node, timeout: i64) -> Result<(), RclError> {
8788

8889
let context = &mut *node.context.get_mut();
8990

90-
unsafe {
91-
rcl_wait_set_init(
92-
&mut wait_set_handle as *mut _,
93-
number_of_subscriptions,
94-
number_of_guard_conditions,
95-
number_of_timers,
96-
number_of_clients,
97-
number_of_services,
98-
number_of_events,
99-
context,
100-
rcutils_get_default_allocator(),
101-
).ok()?;
102-
}
103-
104-
unsafe {
105-
match rcl_wait_set_clear(&mut wait_set_handle as *mut _).ok() {
106-
Ok(()) => (),
107-
Err(rcl_err) => {
108-
eprintln!("Unable to clear WaitSet! Error code: {:?}", rcl_err);
109-
match rcl_wait_set_fini(&mut wait_set_handle as *mut _).ok() {
110-
Ok(()) => return Err(rcl_err),
111-
Err(rcl_err2) => {
112-
eprintln!("Trying to clear the WaitSet caused a second error!! Error code: {:?}", rcl_err2);
113-
return Err(rcl_err2);
114-
}
115-
}
116-
}
117-
};
118-
}
91+
let mut wait_set = WaitSet::new(
92+
number_of_subscriptions,
93+
number_of_guard_conditions,
94+
number_of_timers,
95+
number_of_clients,
96+
number_of_services,
97+
number_of_events,
98+
context)?;
11999

120100
for subscription in &node.subscriptions {
121-
if let Some(subscription) = subscription.upgrade() {
122-
let subscription_handle = &*subscription.handle().get();
123-
unsafe {
124-
match rcl_wait_set_add_subscription(
125-
&mut wait_set_handle as *mut _,
126-
subscription_handle as *const _,
127-
std::ptr::null_mut(),
128-
)
129-
.ok() {
130-
Ok(()) => (),
131-
Err(rcl_err) => {
132-
eprintln!("Unable to add subscription to WaitSet! Error code: {:?}", rcl_err);
133-
match rcl_wait_set_fini(&mut wait_set_handle as *mut _).ok() {
134-
Ok(()) => return Err(rcl_err),
135-
Err(rcl_err2) => {
136-
eprintln!("Trying to clear the WaitSet caused a second error!! Error code: {:?}", rcl_err2);
137-
return Err(rcl_err2);
138-
}
139-
}
140-
}
141-
};
142-
}
143-
}
144-
}
145-
146-
unsafe {
147-
match rcl_wait(&mut wait_set_handle as *mut _, timeout).ok() {
101+
match wait_set.add_subscription(subscription) {
148102
Ok(()) => (),
149-
Err(rcl_err) => {
150-
match rcl_wait_set_fini(&mut wait_set_handle as *mut _).ok() {
151-
Ok(()) => return Err(rcl_err),
152-
Err(RclError::Timeout) => return Err(RclError::Timeout),
153-
Err(rcl_err2) => {
154-
eprintln!("Error waiting on WaitSet! Error code: {:?}", rcl_err);
155-
eprintln!("Trying to clear the WaitSet caused a second error!! Error code: {:?}", rcl_err2);
156-
return Err(rcl_err2);
157-
}
158-
}
159-
}
160-
103+
Err(WaitSetError::DroppedSubscription) => (),
104+
err => return err,
161105
};
162106
}
163107

108+
wait_set.wait(timeout)?;
164109
for subscription in &node.subscriptions {
165110
if let Some(subscription) = subscription.upgrade() {
166111
let mut message = subscription.create_message();
@@ -170,15 +115,6 @@ pub fn spin_once(node: &Node, timeout: i64) -> Result<(), RclError> {
170115
}
171116
}
172117
}
173-
unsafe {
174-
match rcl_wait_set_fini(&mut wait_set_handle as *mut _).ok() {
175-
Ok(()) => (),
176-
Err(rcl_err) => {
177-
eprintln!("Error cleaning up WaitSet! Error code: {:?}", rcl_err);
178-
return Err(rcl_err);
179-
}
180-
};
181-
}
182118

183119
Ok(())
184120
}

rclrs/src/node/subscription.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::ffi::CString;
88
use std::marker::PhantomData;
99
use std::rc::Rc;
1010
use anyhow::Result;
11+
use rclrs_common::error::to_rcl_result;
1112

1213
pub struct SubscriptionHandle {
1314
handle: RefCell<rcl_subscription_t>,
@@ -78,13 +79,13 @@ pub trait SubscriptionBase {
7879
)
7980
};
8081

81-
let result = match result.into() {
82-
RclError::Ok => {
82+
let result = match to_rcl_result(result) {
83+
Ok(()) => {
8384
message.read_handle(message_handle);
8485
Ok(true)
8586
}
86-
RclError::SubscriptionTakeFailed => Ok(false),
87-
error => Err(error),
87+
Err(RclError::SubscriptionTakeFailed) => Ok(false),
88+
Err(error) => Err(error),
8889
};
8990

9091
message.destroy_native_message(message_handle);

rclrs/src/wait.rs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright 2020 DCS Corporation, All Rights Reserved.
2+
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// DISTRIBUTION A. Approved for public release; distribution unlimited.
16+
// OPSEC #4584.
17+
18+
use std::borrow::BorrowMut;
19+
use std::rc::Weak;
20+
21+
use crate::Handle;
22+
use crate::{error::*, SubscriptionBase};
23+
24+
use crate::rcl_bindings::*;
25+
26+
use anyhow::Result;
27+
use rclrs_common::error::WaitSetError;
28+
29+
pub struct WaitSet {
30+
pub wait_set: rcl_wait_set_t,
31+
initialized: bool,
32+
}
33+
34+
impl WaitSet {
35+
/// Creates and initializes a new WaitSet object.
36+
///
37+
/// Under the hood, this calls `rcl_get_zero_initialized_wait_set()`, and stores it
38+
/// within the WaitSet struct, while also noting that the returned value is uninitialized.
39+
pub fn new(
40+
number_of_subscriptions: usize,
41+
number_of_guard_conditions: usize,
42+
number_of_timers: usize,
43+
number_of_clients: usize,
44+
number_of_services: usize,
45+
number_of_events: usize,
46+
context: &mut rcl_context_t,
47+
) -> Result<Self, WaitSetError> {
48+
let mut waitset = Self {
49+
wait_set: unsafe { rcl_get_zero_initialized_wait_set() },
50+
initialized: false,
51+
};
52+
unsafe {
53+
match to_rcl_result(rcl_wait_set_init(
54+
waitset.wait_set.borrow_mut() as *mut _,
55+
number_of_subscriptions,
56+
number_of_guard_conditions,
57+
number_of_timers,
58+
number_of_clients,
59+
number_of_services,
60+
number_of_events,
61+
context,
62+
rcutils_get_default_allocator(),
63+
)) {
64+
Ok(()) => {
65+
waitset.initialized = true;
66+
Ok(waitset)
67+
}
68+
Err(err) => {
69+
waitset.initialized = false;
70+
Err(WaitSetError::RclError(err))
71+
}
72+
}
73+
}
74+
}
75+
76+
/// Removes (sets to NULL) all entities in the WaitSet
77+
///
78+
/// # Errors
79+
/// - `RclError::InvalidArgument` if any arguments are invalid.
80+
/// - `RclError::WaitSetInvalid` if the WaitSet is already zero-initialized.
81+
/// - `RclError::Error` for an unspecified error
82+
pub fn clear(&mut self) -> Result<(), WaitSetError> {
83+
if !self.initialized {
84+
return Err(WaitSetError::RclError(RclError::WaitSetInvalid));
85+
}
86+
unsafe {
87+
// Whether or not we successfully clear, this WaitSet will count as uninitialized
88+
self.initialized = false;
89+
to_rcl_result(rcl_wait_set_clear(self.wait_set.borrow_mut() as *mut _))
90+
.map_err(WaitSetError::RclError)
91+
}
92+
}
93+
94+
/// Adds a subscription to the WaitSet
95+
///
96+
/// # Errors
97+
/// - `WaitSetError::DroppedSubscription` if the passed weak pointer refers to a dropped subscription
98+
/// - `WaitSetError::RclError` for any `rcl` errors that occur during the process
99+
pub fn add_subscription(
100+
&mut self,
101+
subscription: &Weak<dyn SubscriptionBase>,
102+
) -> Result<(), WaitSetError> {
103+
if let Some(subscription) = subscription.upgrade() {
104+
let subscription_handle = &*subscription.handle().get();
105+
unsafe {
106+
return to_rcl_result(rcl_wait_set_add_subscription(
107+
self.wait_set.borrow_mut() as *mut _,
108+
subscription_handle as *const _,
109+
std::ptr::null_mut(),
110+
))
111+
.map_err(WaitSetError::RclError);
112+
}
113+
} else {
114+
Err(WaitSetError::DroppedSubscription)
115+
}
116+
}
117+
118+
/// Blocks until the WaitSet is ready, or until the timeout has been exceeded
119+
///
120+
/// This function will collect the items in the rcl_wait_set_t and pass them
121+
/// to the underlying rmw_wait function.
122+
/// The items in the wait set will be either left untouched or set to NULL after
123+
/// this function returns.
124+
/// Items that are not NULL are ready, where ready means different things based
125+
/// on the type of the item.
126+
/// For subscriptions this means there may be messages that can be taken, or
127+
/// perhaps that the state of the subscriptions has changed, in which case
128+
/// rcl_take may succeed but return with taken == false.
129+
/// For guard conditions this means the guard condition was triggered.
130+
///
131+
/// The wait set struct must be allocated, initialized, and should have been
132+
/// cleared and then filled with items, e.g. subscriptions and guard conditions.
133+
/// Passing a wait set with no wait-able items in it will fail.
134+
/// NULL items in the sets are ignored, e.g. it is valid to have as input:
135+
/// subscriptions[0] = valid pointer
136+
/// subscriptions[1] = NULL
137+
/// subscriptions[2] = valid pointer
138+
/// size_of_subscriptions = 3
139+
///
140+
/// Passing an uninitialized (zero initialized) wait set struct will fail.
141+
/// Passing a wait set struct with uninitialized memory is undefined behavior.
142+
/// For this reason, it is advised to use the WaitSet struct to call `wait`, as it
143+
/// cannot be created uninitialized.
144+
///
145+
/// The unit of timeout is nanoseconds.
146+
/// If the timeout is negative then this function will block indefinitely until
147+
/// something in the wait set is valid or it is interrupted.
148+
/// If the timeout is 0 then this function will be non-blocking; checking what's
149+
/// ready now, but not waiting if nothing is ready yet.
150+
/// If the timeout is greater than 0 then this function will return after
151+
/// that period of time has elapsed or the wait set becomes ready, which ever
152+
/// comes first.
153+
/// Passing a timeout struct with uninitialized memory is undefined behavior.
154+
///
155+
/// This function is thread-safe for unique wait sets with unique contents.
156+
/// This function cannot operate on the same wait set in multiple threads, and
157+
/// the wait sets may not share content.
158+
/// For example, calling wait() in two threads on two different wait sets
159+
/// that both contain a single, shared guard condition is undefined behavior.
160+
/// # Errors
161+
/// - `RclError::InvalidArgument` if an argument was invalid
162+
/// - `RclError::WaitSetInvalid` if the wait set is zero initialized
163+
/// - `RclError::WaitSetEmpty` if the wait set contains no items
164+
/// - `RclError::Timeout` if the timeout expired before something was ready
165+
/// - `RclError::Error` for an unspecified error
166+
pub fn wait(&mut self, timeout: i64) -> Result<(), WaitSetError> {
167+
unsafe {
168+
to_rcl_result(rcl_wait(self.wait_set.borrow_mut() as *mut _, timeout))
169+
.map_err(WaitSetError::RclError)
170+
}
171+
}
172+
}
173+
174+
impl Drop for WaitSet {
175+
/// Drops the WaitSet, and clears the memory
176+
///
177+
/// # Panics
178+
/// A panic is raised if `rcl` is unable to release the waitset for any reason.
179+
fn drop(&mut self) {
180+
let handle = &mut *self.wait_set.borrow_mut();
181+
unsafe {
182+
match to_rcl_result(rcl_wait_set_fini(handle as *mut _)) {
183+
Ok(()) => (),
184+
Err(err) => {
185+
panic!("Unable to release WaitSet!! {:?}", err)
186+
}
187+
}
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)