Skip to content

Commit 7450810

Browse files
authored
Return Arc from the create_node function to match other create_X functions (#294)
* Fine-grained locks. Made create_subscription, create_service, create_client not take a mutable self * Return an Arc from rclrs::create_node to match other create_X functions * Update spin* to take an Arc * Fix clippy warning
1 parent f9e7263 commit 7450810

File tree

11 files changed

+62
-52
lines changed

11 files changed

+62
-52
lines changed

docs/writing-your-first-rclrs-node.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ struct RepublisherNode {
5454

5555
impl RepublisherNode {
5656
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
57-
let mut node = rclrs::Node::new(context, "republisher")?;
57+
let node = rclrs::Node::new(context, "republisher")?;
5858
let data = None;
5959
let _subscription = node.create_subscription(
6060
"in_topic",
@@ -76,7 +76,7 @@ Next, add a main function to launch it:
7676
fn main() -> Result<(), rclrs::RclrsError> {
7777
let context = rclrs::Context::new(std::env::args())?;
7878
let republisher = RepublisherNode::new(&context)?;
79-
rclrs::spin(&republisher.node)
79+
rclrs::spin(republisher.node)
8080
}
8181
```
8282

@@ -121,7 +121,7 @@ struct RepublisherNode {
121121

122122
impl RepublisherNode {
123123
fn new(context: &rclrs::Context) -> Result<Self, rclrs::RclrsError> {
124-
let mut node = rclrs::Node::new(context, "republisher")?;
124+
let node = rclrs::Node::new(context, "republisher")?;
125125
let data = Arc::new(Mutex::new(None)); // (3)
126126
let data_cb = Arc::clone(&data);
127127
let _subscription = {
@@ -190,7 +190,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
190190
republisher.republish()?;
191191
}
192192
});
193-
rclrs::spin(&republisher.node)
193+
rclrs::spin(republisher.node)
194194
}
195195
```
196196

@@ -212,7 +212,7 @@ fn main() -> Result<(), rclrs::RclrsError> {
212212
republisher_other_thread.republish()?;
213213
}
214214
});
215-
rclrs::spin(&republisher.node)
215+
rclrs::spin(republisher.node)
216216
}
217217
```
218218

examples/message_demo/src/message_demo.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::convert::TryInto;
22
use std::env;
3+
use std::sync::Arc;
34

45
use anyhow::{Error, Result};
56
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
@@ -132,7 +133,7 @@ fn demonstrate_pubsub() -> Result<(), Error> {
132133
println!("================== Interoperability demo ==================");
133134
// Demonstrate interoperability between idiomatic and RMW-native message types
134135
let context = rclrs::Context::new(env::args())?;
135-
let mut node = rclrs::create_node(&context, "message_demo")?;
136+
let node = rclrs::create_node(&context, "message_demo")?;
136137

137138
let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
138139
"topic",
@@ -159,10 +160,10 @@ fn demonstrate_pubsub() -> Result<(), Error> {
159160
)?;
160161
println!("Sending idiomatic message.");
161162
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
162-
rclrs::spin_once(&node, None)?;
163+
rclrs::spin_once(Arc::clone(&node), None)?;
163164
println!("Sending RMW-native message.");
164165
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
165-
rclrs::spin_once(&node, None)?;
166+
rclrs::spin_once(Arc::clone(&node), None)?;
166167

167168
Ok(())
168169
}

examples/minimal_client_service/src/minimal_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::{Error, Result};
55
fn main() -> Result<(), Error> {
66
let context = rclrs::Context::new(env::args())?;
77

8-
let mut node = rclrs::create_node(&context, "minimal_client")?;
8+
let node = rclrs::create_node(&context, "minimal_client")?;
99

1010
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
1111

@@ -28,5 +28,5 @@ fn main() -> Result<(), Error> {
2828
std::thread::sleep(std::time::Duration::from_millis(500));
2929

3030
println!("Waiting for response");
31-
rclrs::spin(&node).map_err(|err| err.into())
31+
rclrs::spin(node).map_err(|err| err.into())
3232
}

examples/minimal_client_service/src/minimal_client_async.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use anyhow::{Error, Result};
66
async fn main() -> Result<(), Error> {
77
let context = rclrs::Context::new(env::args())?;
88

9-
let mut node = rclrs::create_node(&context, "minimal_client")?;
9+
let node = rclrs::create_node(&context, "minimal_client")?;
1010

1111
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
1212

@@ -20,7 +20,7 @@ async fn main() -> Result<(), Error> {
2020

2121
println!("Waiting for response");
2222

23-
let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(&node));
23+
let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));
2424

2525
let response = future.await?;
2626
println!(

examples/minimal_client_service/src/minimal_service.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ fn handle_service(
1515
fn main() -> Result<(), Error> {
1616
let context = rclrs::Context::new(env::args())?;
1717

18-
let mut node = rclrs::create_node(&context, "minimal_service")?;
18+
let node = rclrs::create_node(&context, "minimal_service")?;
1919

2020
let _server = node
2121
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
2222

2323
println!("Starting server");
24-
rclrs::spin(&node).map_err(|err| err.into())
24+
rclrs::spin(node).map_err(|err| err.into())
2525
}

examples/minimal_pub_sub/src/minimal_subscriber.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::{Error, Result};
55
fn main() -> Result<(), Error> {
66
let context = rclrs::Context::new(env::args())?;
77

8-
let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
8+
let node = rclrs::create_node(&context, "minimal_subscriber")?;
99

1010
let mut num_messages: usize = 0;
1111

@@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
1919
},
2020
)?;
2121

22-
rclrs::spin(&node).map_err(|err| err.into())
22+
rclrs::spin(node).map_err(|err| err.into())
2323
}

examples/minimal_pub_sub/src/zero_copy_subscriber.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use anyhow::{Error, Result};
55
fn main() -> Result<(), Error> {
66
let context = rclrs::Context::new(env::args())?;
77

8-
let mut node = rclrs::create_node(&context, "minimal_subscriber")?;
8+
let node = rclrs::create_node(&context, "minimal_subscriber")?;
99

1010
let mut num_messages: usize = 0;
1111

@@ -19,5 +19,5 @@ fn main() -> Result<(), Error> {
1919
},
2020
)?;
2121

22-
rclrs::spin(&node).map_err(|err| err.into())
22+
rclrs::spin(node).map_err(|err| err.into())
2323
}

rclrs/src/lib.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ mod rcl_bindings;
2323
#[cfg(feature = "dyn_msg")]
2424
pub mod dynamic_message;
2525

26+
use std::sync::Arc;
2627
use std::time::Duration;
2728

2829
pub use arguments::*;
@@ -49,8 +50,8 @@ pub use wait::*;
4950
/// This can usually be ignored.
5051
///
5152
/// [1]: crate::RclReturnCode
52-
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
53-
let wait_set = WaitSet::new_for_node(node)?;
53+
pub fn spin_once(node: Arc<Node>, timeout: Option<Duration>) -> Result<(), RclrsError> {
54+
let wait_set = WaitSet::new_for_node(&node)?;
5455
let ready_entities = wait_set.wait(timeout)?;
5556

5657
for ready_subscription in ready_entities.subscriptions {
@@ -71,14 +72,16 @@ pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsErro
7172
/// Convenience function for calling [`spin_once`] in a loop.
7273
///
7374
/// This function additionally checks that the context is still valid.
74-
pub fn spin(node: &Node) -> Result<(), RclrsError> {
75+
pub fn spin(node: Arc<Node>) -> Result<(), RclrsError> {
7576
// The context_is_valid functions exists only to abstract away ROS distro differences
7677
// SAFETY: No preconditions for this function.
77-
let context_is_valid =
78-
|| unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) };
78+
let context_is_valid = {
79+
let node = Arc::clone(&node);
80+
move || unsafe { rcl_context_is_valid(&*node.rcl_context_mtx.lock().unwrap()) }
81+
};
7982

8083
while context_is_valid() {
81-
match spin_once(node, None) {
84+
match spin_once(Arc::clone(&node), None) {
8285
Ok(_)
8386
| Err(RclrsError::RclError {
8487
code: RclReturnCode::Timeout,
@@ -105,8 +108,8 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
105108
/// assert!(node.is_ok());
106109
/// # Ok::<(), RclrsError>(())
107110
/// ```
108-
pub fn create_node(context: &Context, node_name: &str) -> Result<Node, RclrsError> {
109-
Node::builder(context, node_name).build()
111+
pub fn create_node(context: &Context, node_name: &str) -> Result<Arc<Node>, RclrsError> {
112+
Ok(Arc::new(Node::builder(context, node_name).build()?))
110113
}
111114

112115
/// Creates a [`NodeBuilder`][1].

rclrs/src/node.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ unsafe impl Send for rcl_node_t {}
6767
pub struct Node {
6868
pub(crate) rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
6969
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
70-
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
71-
pub(crate) guard_conditions: Vec<Weak<GuardCondition>>,
72-
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
73-
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
70+
pub(crate) clients_mtx: Mutex<Vec<Weak<dyn ClientBase>>>,
71+
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
72+
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
73+
pub(crate) subscriptions_mtx: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
7474
_parameter_map: ParameterOverrideMap,
7575
}
7676

@@ -180,13 +180,12 @@ impl Node {
180180
///
181181
/// [1]: crate::Client
182182
// TODO: make client's lifetime depend on node's lifetime
183-
pub fn create_client<T>(&mut self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
183+
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
184184
where
185185
T: rosidl_runtime_rs::Service,
186186
{
187187
let client = Arc::new(Client::<T>::new(Arc::clone(&self.rcl_node_mtx), topic)?);
188-
self.clients
189-
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
188+
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
190189
Ok(client)
191190
}
192191

@@ -199,12 +198,12 @@ impl Node {
199198
///
200199
/// [1]: crate::GuardCondition
201200
/// [2]: crate::spin_once
202-
pub fn create_guard_condition(&mut self) -> Arc<GuardCondition> {
201+
pub fn create_guard_condition(&self) -> Arc<GuardCondition> {
203202
let guard_condition = Arc::new(GuardCondition::new_with_rcl_context(
204203
&mut self.rcl_context_mtx.lock().unwrap(),
205204
None,
206205
));
207-
self.guard_conditions
206+
{ self.guard_conditions_mtx.lock().unwrap() }
208207
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
209208
guard_condition
210209
}
@@ -226,7 +225,7 @@ impl Node {
226225
&mut self.rcl_context_mtx.lock().unwrap(),
227226
Some(Box::new(callback) as Box<dyn Fn() + Send + Sync>),
228227
));
229-
self.guard_conditions
228+
{ self.guard_conditions_mtx.lock().unwrap() }
230229
.push(Arc::downgrade(&guard_condition) as Weak<GuardCondition>);
231230
guard_condition
232231
}
@@ -251,7 +250,7 @@ impl Node {
251250
/// [1]: crate::Service
252251
// TODO: make service's lifetime depend on node's lifetime
253252
pub fn create_service<T, F>(
254-
&mut self,
253+
&self,
255254
topic: &str,
256255
callback: F,
257256
) -> Result<Arc<Service<T>>, RclrsError>
@@ -264,7 +263,7 @@ impl Node {
264263
topic,
265264
callback,
266265
)?);
267-
self.services
266+
{ self.services_mtx.lock().unwrap() }
268267
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
269268
Ok(service)
270269
}
@@ -274,7 +273,7 @@ impl Node {
274273
/// [1]: crate::Subscription
275274
// TODO: make subscription's lifetime depend on node's lifetime
276275
pub fn create_subscription<T, Args>(
277-
&mut self,
276+
&self,
278277
topic: &str,
279278
qos: QoSProfile,
280279
callback: impl SubscriptionCallback<T, Args>,
@@ -288,32 +287,39 @@ impl Node {
288287
qos,
289288
callback,
290289
)?);
291-
self.subscriptions
290+
{ self.subscriptions_mtx.lock() }
291+
.unwrap()
292292
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
293293
Ok(subscription)
294294
}
295295

296296
/// Returns the subscriptions that have not been dropped yet.
297297
pub(crate) fn live_subscriptions(&self) -> Vec<Arc<dyn SubscriptionBase>> {
298-
self.subscriptions
298+
{ self.subscriptions_mtx.lock().unwrap() }
299299
.iter()
300300
.filter_map(Weak::upgrade)
301301
.collect()
302302
}
303303

304304
pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
305-
self.clients.iter().filter_map(Weak::upgrade).collect()
305+
{ self.clients_mtx.lock().unwrap() }
306+
.iter()
307+
.filter_map(Weak::upgrade)
308+
.collect()
306309
}
307310

308311
pub(crate) fn live_guard_conditions(&self) -> Vec<Arc<GuardCondition>> {
309-
self.guard_conditions
312+
{ self.guard_conditions_mtx.lock().unwrap() }
310313
.iter()
311314
.filter_map(Weak::upgrade)
312315
.collect()
313316
}
314317

315318
pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
316-
self.services.iter().filter_map(Weak::upgrade).collect()
319+
{ self.services_mtx.lock().unwrap() }
320+
.iter()
321+
.filter_map(Weak::upgrade)
322+
.collect()
317323
}
318324

319325
/// Returns the ROS domain ID that the node is using.

rclrs/src/node/builder.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,10 @@ impl NodeBuilder {
275275
Ok(Node {
276276
rcl_node_mtx,
277277
rcl_context_mtx: self.context.clone(),
278-
clients: vec![],
279-
guard_conditions: vec![],
280-
services: vec![],
281-
subscriptions: vec![],
278+
clients_mtx: Mutex::new(vec![]),
279+
guard_conditions_mtx: Mutex::new(vec![]),
280+
services_mtx: Mutex::new(vec![]),
281+
subscriptions_mtx: Mutex::new(vec![]),
282282
_parameter_map,
283283
})
284284
}

rclrs_tests/src/graph_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ fn test_publishers() -> Result<(), RclrsError> {
8585
#[test]
8686
fn test_subscriptions() -> Result<(), RclrsError> {
8787
let namespace = "/test_subscriptions_graph";
88-
let mut graph = construct_test_graph(namespace)?;
88+
let graph = construct_test_graph(namespace)?;
8989

9090
let node_2_empty_subscription = graph.node2.create_subscription::<msg::Empty, _>(
9191
"graph_test_topic_1",
@@ -149,7 +149,7 @@ fn test_subscriptions() -> Result<(), RclrsError> {
149149

150150
#[test]
151151
fn test_topic_names_and_types() -> Result<(), RclrsError> {
152-
let mut graph = construct_test_graph("test_topics_graph")?;
152+
let graph = construct_test_graph("test_topics_graph")?;
153153

154154
let _node_1_defaults_subscription = graph.node1.create_subscription::<msg::Defaults, _>(
155155
"graph_test_topic_3",
@@ -191,7 +191,7 @@ fn test_topic_names_and_types() -> Result<(), RclrsError> {
191191
#[test]
192192
fn test_services() -> Result<(), RclrsError> {
193193
let namespace = "/test_services_graph";
194-
let mut graph = construct_test_graph(namespace)?;
194+
let graph = construct_test_graph(namespace)?;
195195
let check_names_and_types = |names_and_types: TopicNamesAndTypes| {
196196
let types = names_and_types
197197
.get("/test_services_graph/graph_test_topic_4")
@@ -225,7 +225,7 @@ fn test_services() -> Result<(), RclrsError> {
225225
#[test]
226226
fn test_clients() -> Result<(), RclrsError> {
227227
let namespace = "/test_clients_graph";
228-
let mut graph = construct_test_graph(namespace)?;
228+
let graph = construct_test_graph(namespace)?;
229229
let _node_2_empty_client = graph
230230
.node2
231231
.create_client::<srv::Empty>("graph_test_topic_4")?;

0 commit comments

Comments
 (0)