diff --git a/rclrs/Cargo.toml b/rclrs/Cargo.toml index c7451db07..a0ba674ae 100644 --- a/rclrs/Cargo.toml +++ b/rclrs/Cargo.toml @@ -20,6 +20,8 @@ ament_rs = { version = "0.2", optional = true } futures = "0.3" # Needed for dynamic messages libloading = { version = "0.8", optional = true } +# Needed for /clock topic subscription when using simulation time +rosgraph_msgs = "*" # Needed for the Message trait, among others rosidl_runtime_rs = "0.3" diff --git a/rclrs/package.xml b/rclrs/package.xml index 70b7d349d..efe2541c7 100644 --- a/rclrs/package.xml +++ b/rclrs/package.xml @@ -18,6 +18,7 @@ rcl builtin_interfaces rcl_interfaces + rosgraph_msgs ament_cargo diff --git a/rclrs/src/clock.rs b/rclrs/src/clock.rs new file mode 100644 index 000000000..c89705d9d --- /dev/null +++ b/rclrs/src/clock.rs @@ -0,0 +1,223 @@ +use crate::rcl_bindings::*; +use crate::{error::ToResult, time::Time, to_rclrs_result}; +use std::sync::{Arc, Mutex}; + +/// Enum to describe clock type. Redefined for readability and to eliminate the uninitialized case +/// from the `rcl_clock_type_t` enum in the binding. +#[derive(Clone, Debug, Copy)] +pub enum ClockType { + /// Time with behavior dependent on the `set_ros_time(bool)` function. If called with `true` + /// it will be driven by a manual value override, otherwise it will be System Time + RosTime = 1, + /// Wall time depending on the current system + SystemTime = 2, + /// Steady time, monotonically increasing but not necessarily equal to wall time. + SteadyTime = 3, +} + +impl From for rcl_clock_type_t { + fn from(clock_type: ClockType) -> Self { + match clock_type { + ClockType::RosTime => rcl_clock_type_t::RCL_ROS_TIME, + ClockType::SystemTime => rcl_clock_type_t::RCL_SYSTEM_TIME, + ClockType::SteadyTime => rcl_clock_type_t::RCL_STEADY_TIME, + } + } +} + +/// Struct that implements a Clock and wraps `rcl_clock_t`. +#[derive(Clone, Debug)] +pub struct Clock { + _type: ClockType, + _rcl_clock: Arc>, + // TODO(luca) Implement jump callbacks +} + +/// A clock source that can be used to drive the contained clock. Created when a clock of type +/// `ClockType::RosTime` is constructed +pub struct ClockSource { + _rcl_clock: Arc>, +} + +impl Clock { + /// Creates a new Clock with `ClockType::SystemTime` + pub fn system() -> Self { + Self::make(ClockType::SystemTime) + } + + /// Creates a new Clock with `ClockType::SteadyTime` + pub fn steady() -> Self { + Self::make(ClockType::SteadyTime) + } + + /// Creates a new Clock with `ClockType::RosTime` and a matching `ClockSource` that can be used + /// to update it + pub fn with_source() -> (Self, ClockSource) { + let clock = Self::make(ClockType::RosTime); + let clock_source = ClockSource::new(clock._rcl_clock.clone()); + (clock, clock_source) + } + + /// Creates a new clock of the given `ClockType`. + pub fn new(type_: ClockType) -> (Self, Option) { + let clock = Self::make(type_); + let clock_source = + matches!(type_, ClockType::RosTime).then(|| ClockSource::new(clock._rcl_clock.clone())); + (clock, clock_source) + } + + fn make(type_: ClockType) -> Self { + let mut rcl_clock; + unsafe { + // SAFETY: Getting a default value is always safe. + rcl_clock = Self::init_generic_clock(); + let mut allocator = rcutils_get_default_allocator(); + // Function will return Err(_) only if there isn't enough memory to allocate a clock + // object. + rcl_clock_init(type_.into(), &mut rcl_clock, &mut allocator) + .ok() + .unwrap(); + } + Self { + _type: type_, + _rcl_clock: Arc::new(Mutex::new(rcl_clock)), + } + } + + /// Returns the clock's `ClockType`. + pub fn clock_type(&self) -> ClockType { + self._type + } + + /// Returns the current clock's timestamp. + pub fn now(&self) -> Time { + let mut clock = self._rcl_clock.lock().unwrap(); + let mut time_point: i64 = 0; + unsafe { + // SAFETY: No preconditions for this function + rcl_clock_get_now(&mut *clock, &mut time_point); + } + Time { + nsec: time_point, + clock: Arc::downgrade(&self._rcl_clock), + } + } + + /// Helper function to privately initialize a default clock, with the same behavior as + /// `rcl_init_generic_clock`. By defining a private function instead of implementing + /// `Default`, we avoid exposing a public API to create an invalid clock. + // SAFETY: Getting a default value is always safe. + unsafe fn init_generic_clock() -> rcl_clock_t { + let allocator = rcutils_get_default_allocator(); + rcl_clock_t { + type_: rcl_clock_type_t::RCL_CLOCK_UNINITIALIZED, + jump_callbacks: std::ptr::null_mut::(), + num_jump_callbacks: 0, + get_now: None, + data: std::ptr::null_mut::(), + allocator, + } + } +} + +impl Drop for ClockSource { + fn drop(&mut self) { + self.set_ros_time_enable(false); + } +} + +impl PartialEq for ClockSource { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self._rcl_clock, &other._rcl_clock) + } +} + +impl ClockSource { + /// Sets the value of the current ROS time. + pub fn set_ros_time_override(&self, nanoseconds: i64) { + let mut clock = self._rcl_clock.lock().unwrap(); + // SAFETY: Safe if clock jump callbacks are not edited, which is guaranteed + // by the mutex + unsafe { + // Function will only fail if timer was uninitialized or not RosTime, which should + // not happen + rcl_set_ros_time_override(&mut *clock, nanoseconds) + .ok() + .unwrap(); + } + } + + fn new(clock: Arc>) -> Self { + let source = Self { _rcl_clock: clock }; + source.set_ros_time_enable(true); + source + } + + /// Sets the clock to use ROS Time, if enabled the clock will report the last value set through + /// `Clock::set_ros_time_override(nanoseconds: i64)`. + fn set_ros_time_enable(&self, enable: bool) { + let mut clock = self._rcl_clock.lock().unwrap(); + if enable { + // SAFETY: Safe if clock jump callbacks are not edited, which is guaranteed + // by the mutex + unsafe { + // Function will only fail if timer was uninitialized or not RosTime, which should + // not happen + rcl_enable_ros_time_override(&mut *clock).ok().unwrap(); + } + } else { + // SAFETY: Safe if clock jump callbacks are not edited, which is guaranteed + // by the mutex + unsafe { + // Function will only fail if timer was uninitialized or not RosTime, which should + // not happen + rcl_disable_ros_time_override(&mut *clock).ok().unwrap(); + } + } + } +} + +impl Drop for rcl_clock_t { + fn drop(&mut self) { + // SAFETY: No preconditions for this function + let rc = unsafe { rcl_clock_fini(&mut *self) }; + if let Err(e) = to_rclrs_result(rc) { + panic!("Unable to release Clock. {:?}", e) + } + } +} + +// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread +// they are running in. Therefore, this type can be safely sent to another thread. +unsafe impl Send for rcl_clock_t {} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_send() {} + fn assert_sync() {} + + #[test] + fn clock_is_send_and_sync() { + assert_send::(); + assert_sync::(); + } + + #[test] + fn clock_system_time_now() { + let clock = Clock::system(); + assert!(clock.now().nsec > 0); + } + + #[test] + fn clock_ros_time_with_override() { + let (clock, source) = Clock::with_source(); + // No manual time set, it should default to 0 + assert!(clock.now().nsec == 0); + let set_time = 1234i64; + source.set_ros_time_override(set_time); + // Ros time is set, should return the value that was set + assert_eq!(clock.now().nsec, set_time); + } +} diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 9afec58bd..d3a752892 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -7,6 +7,7 @@ mod arguments; mod client; +mod clock; mod context; mod error; mod executor; @@ -16,6 +17,8 @@ mod publisher; mod qos; mod service; mod subscription; +mod time; +mod time_source; mod vendor; mod wait; @@ -29,6 +32,7 @@ use std::time::Duration; pub use arguments::*; pub use client::*; +pub use clock::*; pub use context::*; pub use error::*; pub use executor::*; @@ -39,6 +43,8 @@ pub use qos::*; pub use rcl_bindings::rmw_request_id_t; pub use service::*; pub use subscription::*; +pub use time::*; +use time_source::*; pub use wait::*; /// Polls the node for new messages and executes the corresponding callbacks. @@ -80,7 +86,7 @@ pub fn spin(node: Arc) -> Result<(), RclrsError> { /// # Ok::<(), RclrsError>(()) /// ``` pub fn create_node(context: &Context, node_name: &str) -> Result, RclrsError> { - Ok(Arc::new(Node::builder(context, node_name).build()?)) + Node::new(context, node_name) } /// Creates a [`NodeBuilder`][1]. diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index bf787c956..b91e16276 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -13,9 +13,9 @@ pub use self::builder::*; pub use self::graph::*; use crate::rcl_bindings::*; use crate::{ - Client, ClientBase, Context, GuardCondition, ParameterBuilder, ParameterInterface, + Client, ClientBase, Clock, Context, GuardCondition, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, Publisher, QoSProfile, RclrsError, Service, ServiceBase, - Subscription, SubscriptionBase, SubscriptionCallback, ToResult, + Subscription, SubscriptionBase, SubscriptionCallback, TimeSource, ToResult, }; impl Drop for rcl_node_t { @@ -71,6 +71,7 @@ pub struct Node { pub(crate) guard_conditions_mtx: Mutex>>, pub(crate) services_mtx: Mutex>>, pub(crate) subscriptions_mtx: Mutex>>, + _time_source: TimeSource, _parameter: ParameterInterface, } @@ -95,10 +96,15 @@ impl Node { /// /// See [`NodeBuilder::new()`] for documentation. #[allow(clippy::new_ret_no_self)] - pub fn new(context: &Context, node_name: &str) -> Result { + pub fn new(context: &Context, node_name: &str) -> Result, RclrsError> { Self::builder(context, node_name).build() } + /// Returns the clock associated with this node. + pub fn get_clock(&self) -> Clock { + self._time_source.get_clock() + } + /// Returns the name of the node. /// /// This returns the name after remapping, so it is not necessarily the same as the name that diff --git a/rclrs/src/node/builder.rs b/rclrs/src/node/builder.rs index b6373b7de..543bd50d2 100644 --- a/rclrs/src/node/builder.rs +++ b/rclrs/src/node/builder.rs @@ -2,7 +2,10 @@ use std::ffi::CString; use std::sync::{Arc, Mutex}; use crate::rcl_bindings::*; -use crate::{Context, Node, ParameterInterface, RclrsError, ToResult}; +use crate::{ + ClockType, Context, Node, ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, + QOS_PROFILE_CLOCK, +}; /// A builder for creating a [`Node`][1]. /// @@ -14,6 +17,8 @@ use crate::{Context, Node, ParameterInterface, RclrsError, ToResult}; /// - `use_global_arguments: true` /// - `arguments: []` /// - `enable_rosout: true` +/// - `clock_type: ClockType::RosTime` +/// - `clock_qos: QOS_PROFILE_CLOCK` /// /// # Example /// ``` @@ -43,6 +48,8 @@ pub struct NodeBuilder { use_global_arguments: bool, arguments: Vec, enable_rosout: bool, + clock_type: ClockType, + clock_qos: QoSProfile, } impl NodeBuilder { @@ -89,6 +96,8 @@ impl NodeBuilder { use_global_arguments: true, arguments: vec![], enable_rosout: true, + clock_type: ClockType::RosTime, + clock_qos: QOS_PROFILE_CLOCK, } } @@ -221,6 +230,18 @@ impl NodeBuilder { self } + /// Sets the node's clock type. + pub fn clock_type(mut self, clock_type: ClockType) -> Self { + self.clock_type = clock_type; + self + } + + /// Sets the QoSProfile for the clock subscription. + pub fn clock_qos(mut self, clock_qos: QoSProfile) -> Self { + self.clock_qos = clock_qos; + self + } + /// Builds the node instance. /// /// Node name and namespace validation is performed in this method. @@ -228,7 +249,7 @@ impl NodeBuilder { /// For example usage, see the [`NodeBuilder`][1] docs. /// /// [1]: crate::NodeBuilder - pub fn build(&self) -> Result { + pub fn build(&self) -> Result, RclrsError> { let node_name = CString::new(self.name.as_str()).map_err(|err| RclrsError::StringContainsNul { err, @@ -265,15 +286,20 @@ impl NodeBuilder { &rcl_node_options.arguments, &rcl_context.global_arguments, )?; - Ok(Node { + let node = Arc::new(Node { rcl_node_mtx, rcl_context_mtx: self.context.clone(), clients_mtx: Mutex::new(vec![]), guard_conditions_mtx: Mutex::new(vec![]), services_mtx: Mutex::new(vec![]), subscriptions_mtx: Mutex::new(vec![]), + _time_source: TimeSource::builder(self.clock_type) + .clock_qos(self.clock_qos) + .build(), _parameter, - }) + }); + node._time_source.attach_node(&node); + Ok(node) } /// Creates a rcl_node_options_t struct from this builder. diff --git a/rclrs/src/qos.rs b/rclrs/src/qos.rs index b4904262b..0de35dc16 100644 --- a/rclrs/src/qos.rs +++ b/rclrs/src/qos.rs @@ -296,6 +296,21 @@ pub const QOS_PROFILE_SENSOR_DATA: QoSProfile = QoSProfile { avoid_ros_namespace_conventions: false, }; +/// Equivalent to `ClockQos` from the [`rclcpp` package][1]. +/// Same as sensor data but with a history depth of 1 +/// +/// [1]: https://github.com/ros2/rclcpp/blob/rolling/rclcpp/include/rclcpp/qos.hpp +pub const QOS_PROFILE_CLOCK: QoSProfile = QoSProfile { + history: QoSHistoryPolicy::KeepLast { depth: 1 }, + reliability: QoSReliabilityPolicy::BestEffort, + durability: QoSDurabilityPolicy::Volatile, + deadline: QoSDuration::SystemDefault, + lifespan: QoSDuration::SystemDefault, + liveliness: QoSLivelinessPolicy::SystemDefault, + liveliness_lease_duration: QoSDuration::SystemDefault, + avoid_ros_namespace_conventions: false, +}; + /// Equivalent to `rmw_qos_profile_parameters` from the [`rmw` package][1]. /// /// [1]: https://github.com/ros2/rmw/blob/master/rmw/include/rmw/qos_profiles.h diff --git a/rclrs/src/time.rs b/rclrs/src/time.rs new file mode 100644 index 000000000..848981aa8 --- /dev/null +++ b/rclrs/src/time.rs @@ -0,0 +1,87 @@ +use crate::rcl_bindings::*; +use std::ops::{Add, Sub}; +use std::sync::{Mutex, Weak}; +use std::time::Duration; + +/// Struct that represents time. +#[derive(Clone, Debug)] +pub struct Time { + /// Timestamp in nanoseconds. + pub nsec: i64, + /// Weak reference to the clock that generated this time + pub clock: Weak>, +} + +impl Time { + /// Compares self to rhs, if they can be compared (originated from the same clock) calls f with + /// the values of the timestamps. + pub fn compare_with(&self, rhs: &Time, f: F) -> Option + where + F: FnOnce(i64, i64) -> U, + { + self.clock + .ptr_eq(&rhs.clock) + .then(|| f(self.nsec, rhs.nsec)) + } +} + +impl Add for Time { + type Output = Self; + + fn add(self, other: Duration) -> Self { + let dur_ns = i64::try_from(other.as_nanos()).unwrap(); + Time { + nsec: self.nsec.checked_add(dur_ns).unwrap(), + clock: self.clock.clone(), + } + } +} + +impl Sub for Time { + type Output = Self; + + fn sub(self, other: Duration) -> Self { + let dur_ns = i64::try_from(other.as_nanos()).unwrap(); + Time { + nsec: self.nsec.checked_sub(dur_ns).unwrap(), + clock: self.clock.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Clock; + + #[test] + fn compare_times_from_same_clock() { + let clock = Clock::system(); + let t1 = clock.now(); + std::thread::sleep(std::time::Duration::from_micros(1)); + let t2 = clock.now(); + assert_eq!(t1.compare_with(&t2, |t1, t2| t1 > t2), Some(false)); + assert_eq!(t1.compare_with(&t2, |t1, t2| t2 > t1), Some(true)); + } + + #[test] + fn compare_times_from_different_clocks() { + // Times from different clocks, even if of the same type, can't be compared + let c1 = Clock::system(); + let c2 = Clock::system(); + let t1 = c1.now(); + let t2 = c2.now(); + assert!(t2.compare_with(&t1, |_, _| ()).is_none()); + assert!(t1.compare_with(&t2, |_, _| ()).is_none()); + } + + #[test] + fn add_duration_to_time() { + let (clock, _) = Clock::with_source(); + let t = clock.now(); + let t2 = t.clone() + Duration::from_secs(1); + assert_eq!(t2.nsec - t.nsec, 1_000_000_000i64); + let t3 = t2 - Duration::from_secs(1); + assert_eq!(t3.nsec, t.nsec); + } +} diff --git a/rclrs/src/time_source.rs b/rclrs/src/time_source.rs new file mode 100644 index 000000000..d2aac431f --- /dev/null +++ b/rclrs/src/time_source.rs @@ -0,0 +1,165 @@ +use crate::clock::{Clock, ClockSource, ClockType}; +use crate::{MandatoryParameter, Node, QoSProfile, Subscription, QOS_PROFILE_CLOCK}; +use rosgraph_msgs::msg::Clock as ClockMsg; +use std::sync::{Arc, Mutex, RwLock, Weak}; + +/// Time source for a node that drives the attached clock. +/// If the node's `use_sim_time` parameter is set to `true`, the `TimeSource` will subscribe +/// to the `/clock` topic and drive the attached clock +pub(crate) struct TimeSource { + _node: Mutex>, + _clock: RwLock, + _clock_source: Arc>>, + _requested_clock_type: ClockType, + _clock_qos: QoSProfile, + _clock_subscription: Mutex>>>, + _last_received_time: Arc>>, + _use_sim_time: Mutex>>, +} + +/// A builder for creating a [`TimeSource`][1]. +/// +/// The builder pattern allows selectively setting some fields, and leaving all others at their default values. +/// This struct instance can be created via [`TimeSource::builder()`][2]. +/// +/// The default values for optional fields are: +/// - `clock_qos: QOS_PROFILE_CLOCK`[3] +/// +/// +/// [1]: crate::TimeSource +/// [2]: crate::TimeSource::builder +/// [3]: crate::QOS_PROFILE_CLOCK +pub(crate) struct TimeSourceBuilder { + clock_qos: QoSProfile, + clock_type: ClockType, +} + +impl TimeSourceBuilder { + /// Creates a builder for a time source that drives the given clock. + pub(crate) fn new(clock_type: ClockType) -> Self { + Self { + clock_qos: QOS_PROFILE_CLOCK, + clock_type, + } + } + + /// Sets the QoS for the `/clock` topic. + pub(crate) fn clock_qos(mut self, clock_qos: QoSProfile) -> Self { + self.clock_qos = clock_qos; + self + } + + /// Builds the `TimeSource` and attaches the provided `Node` and `Clock`. + pub(crate) fn build(self) -> TimeSource { + let clock = match self.clock_type { + ClockType::RosTime | ClockType::SystemTime => Clock::system(), + ClockType::SteadyTime => Clock::steady(), + }; + TimeSource { + _node: Mutex::new(Weak::new()), + _clock: RwLock::new(clock), + _clock_source: Arc::new(Mutex::new(None)), + _requested_clock_type: self.clock_type, + _clock_qos: self.clock_qos, + _clock_subscription: Mutex::new(None), + _last_received_time: Arc::new(Mutex::new(None)), + _use_sim_time: Mutex::new(None), + } + } +} + +impl TimeSource { + /// Creates a new `TimeSourceBuilder` with default parameters. + pub(crate) fn builder(clock_type: ClockType) -> TimeSourceBuilder { + TimeSourceBuilder::new(clock_type) + } + + /// Returns the clock that this TimeSource is controlling. + pub(crate) fn get_clock(&self) -> Clock { + self._clock.read().unwrap().clone() + } + + /// Attaches the given node to to the `TimeSource`, using its interface to read the + /// `use_sim_time` parameter and create the clock subscription. + pub(crate) fn attach_node(&self, node: &Arc) { + // TODO(luca) register a parameter callback that calls set_ros_time(bool) once parameter + // callbacks are implemented. + let param = node + .declare_parameter("use_sim_time") + .default(false) + .mandatory() + .unwrap(); + *self._node.lock().unwrap() = Arc::downgrade(node); + self.set_ros_time_enable(param.get()); + *self._use_sim_time.lock().unwrap() = Some(param); + } + + fn set_ros_time_enable(&self, enable: bool) { + if matches!(self._requested_clock_type, ClockType::RosTime) { + let mut clock = self._clock.write().unwrap(); + if enable && matches!(clock.clock_type(), ClockType::SystemTime) { + let (new_clock, mut clock_source) = Clock::with_source(); + if let Some(last_received_time) = *self._last_received_time.lock().unwrap() { + Self::update_clock(&mut clock_source, last_received_time); + } + *clock = new_clock; + *self._clock_source.lock().unwrap() = Some(clock_source); + *self._clock_subscription.lock().unwrap() = Some(self.create_clock_sub()); + } + if !enable && matches!(clock.clock_type(), ClockType::RosTime) { + *clock = Clock::system(); + *self._clock_source.lock().unwrap() = None; + *self._clock_subscription.lock().unwrap() = None; + } + } + } + + fn update_clock(clock: &mut ClockSource, nanoseconds: i64) { + clock.set_ros_time_override(nanoseconds); + } + + fn create_clock_sub(&self) -> Arc> { + let clock = self._clock_source.clone(); + let last_received_time = self._last_received_time.clone(); + // Safe to unwrap since the function will only fail if invalid arguments are provided + self._node + .lock() + .unwrap() + .upgrade() + .unwrap() + .create_subscription::("/clock", self._clock_qos, move |msg: ClockMsg| { + let nanoseconds: i64 = + (msg.clock.sec as i64 * 1_000_000_000) + msg.clock.nanosec as i64; + *last_received_time.lock().unwrap() = Some(nanoseconds); + if let Some(clock) = clock.lock().unwrap().as_mut() { + Self::update_clock(clock, nanoseconds); + } + }) + .unwrap() + } +} + +#[cfg(test)] +mod tests { + use crate::{create_node, Context}; + + #[test] + fn time_source_default_clock() { + let node = create_node(&Context::new([]).unwrap(), "test_node").unwrap(); + // Default clock should be above 0 (use_sim_time is default false) + assert!(node.get_clock().now().nsec > 0); + } + + #[test] + fn time_source_sim_time() { + let ctx = Context::new([ + String::from("--ros-args"), + String::from("-p"), + String::from("use_sim_time:=true"), + ]) + .unwrap(); + let node = create_node(&ctx, "test_node").unwrap(); + // Default sim time value should be 0 (no message received) + assert_eq!(node.get_clock().now().nsec, 0); + } +} diff --git a/rclrs_tests/src/graph_tests.rs b/rclrs_tests/src/graph_tests.rs index b32447ca1..11072e09c 100644 --- a/rclrs_tests/src/graph_tests.rs +++ b/rclrs_tests/src/graph_tests.rs @@ -2,11 +2,12 @@ use rclrs::{ Context, Node, NodeBuilder, RclrsError, TopicEndpointInfo, TopicNamesAndTypes, QOS_PROFILE_SYSTEM_DEFAULT, }; +use std::sync::Arc; use test_msgs::{msg, srv}; struct TestGraph { - node1: Node, - node2: Node, + node1: Arc, + node2: Arc, } fn construct_test_graph(namespace: &str) -> Result {