Skip to content

Commit 52433a0

Browse files
committed
Add the ability to exit the varlink::listen loop.
Factor out the `varlink::listen` parameters into a `varlink::ListenConfig` struct, so we can use ..Default::default(). Add an `stop_listening: Option<Arc<AtomicBool>>` to the `varlink::ListenConfig` struct, which can be set remotely to break the `varlink::listen` loop. Stop the running server after 10 seconds ```rust use std::{thread, time}; use std::sync::atomic::Ordering; let stop_listening = Arc::new(std::sync::atomic::AtomicBool::new(false)); let child = { let stop_running = stop_listening.clone(); thread::spawn(move || { thread::sleep(time::Duration::from_secs(10)); // Stop the running server after 10 seconds stop_listening.store(true, Ordering::Relaxed); }) }; varlink::listen( service, &address, &varlink::ListenConfig { idle_timeout: timeout, stop_listening: Some(stop_listening), ..Default::default() }, )?; child.join().expect("Error joining thread"); ``` Obsoletes varlink/rust#26 Addresses rust-lang#25
1 parent 81463ba commit 52433a0

File tree

8 files changed

+175
-41
lines changed

8 files changed

+175
-41
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,4 @@ members = [
1010
"examples/example",
1111
"examples/more",
1212
"examples/ping",
13-
]
14-
13+
]

examples/example/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,13 @@ fn run_server<S: ?Sized + AsRef<str>>(address: &S, timeout: u64) -> varlink::Res
230230
vec![Box::new(myinterface)],
231231
);
232232

233-
varlink::listen(service, address, 1, 10, timeout)?;
233+
varlink::listen(
234+
service,
235+
address,
236+
&varlink::ListenConfig {
237+
idle_timeout: timeout,
238+
..Default::default()
239+
},
240+
)?;
234241
Ok(())
235242
}

examples/more/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,13 @@ fn run_server(address: &str, timeout: u64, sleep_duration: u64) -> varlink::Resu
216216
"http://varlink.org",
217217
vec![Box::new(myinterface)],
218218
);
219-
varlink::listen(service, &address, 1, 10, timeout)?;
219+
varlink::listen(
220+
service,
221+
&address,
222+
&varlink::ListenConfig {
223+
idle_timeout: timeout,
224+
..Default::default()
225+
},
226+
)?;
220227
Ok(())
221228
}

examples/ping/src/main.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ fn main() {
2929
opts.optflag("", "client", "run in client mode");
3030
opts.optopt("", "bridge", "bridge", "<bridge>");
3131
opts.optflag("m", "multiplex", "run in multiplex mode");
32+
opts.optopt("t", "timeout", "server timeout", "<seconds>");
3233
opts.optflag("h", "help", "print this help menu");
3334

3435
let matches = match opts.parse(&args[1..]) {
@@ -62,9 +63,17 @@ fn main() {
6263
};
6364
run_client(&connection).map_err(|e| e.into())
6465
} else if let Some(address) = matches.opt_str("varlink") {
65-
run_server(&address, 1000, matches.opt_present("m"))
66-
// .map_err(mstrerr!("running server with address {}", address))
67-
.map_err(|e| e.into())
66+
let timeout = matches
67+
.opt_str("timeout")
68+
.unwrap_or("0".to_string())
69+
.parse::<u64>()
70+
.map_err(From::from);
71+
72+
timeout.and_then(|timeout| {
73+
run_server(&address, timeout, matches.opt_present("m"))
74+
// .map_err(mstrerr!("running server with address {}", address))
75+
.map_err(From::from)
76+
})
6877
} else {
6978
print_usage(&program, &opts);
7079
eprintln!("Need varlink address in server mode.");
@@ -510,7 +519,30 @@ fn run_server(address: &str, timeout: u64, multiplex: bool) -> varlink::Result<(
510519
// Demonstrate a single process, single-threaded service
511520
multiplex::listen_multiplex(service, &address, timeout)?;
512521
} else {
513-
varlink::listen(service, &address, 1, 10, timeout)?;
522+
/*
523+
use std::sync::atomic::Ordering;
524+
use std::{thread, time};
525+
let stop_listening = Arc::new(std::sync::atomic::AtomicBool::new(false));
526+
527+
let child = {
528+
let stop_running = stop_listening.clone();
529+
thread::spawn(move || {
530+
thread::sleep(time::Duration::from_secs(10));
531+
stop_running.store(true, Ordering::Relaxed);
532+
})
533+
};
534+
*/
535+
varlink::listen(
536+
service,
537+
&address,
538+
&varlink::ListenConfig {
539+
idle_timeout: timeout,
540+
// stop_listening: Some(stop_listening),
541+
..Default::default()
542+
},
543+
)?;
544+
545+
//child.join().expect("Error joining thread");
514546
}
515547
}
516548
Ok(())

varlink-certification/src/main.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,14 @@ pub fn run_server(address: &str, timeout: u64) -> varlink::Result<()> {
784784
vec![Box::new(myinterface)],
785785
);
786786

787-
if let Err(e) = varlink::listen(service, &address, 1, 10, timeout) {
787+
if let Err(e) = varlink::listen(
788+
service,
789+
&address,
790+
&varlink::ListenConfig {
791+
idle_timeout: timeout,
792+
..Default::default()
793+
},
794+
) {
788795
match e.kind() {
789796
::varlink::ErrorKind::Timeout => {}
790797
_ => Err(e)?,

varlink/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,12 @@
170170
//! ],
171171
//! );
172172
//!
173-
//! varlink::listen(service, &args[1], 1, 10, 0);
173+
//! varlink::listen(service, &args[1],
174+
//! &varlink::ListenConfig {
175+
//! idle_timeout: 1,
176+
//! ..Default::default()
177+
//! },
178+
//! );
174179
//! # }
175180
//! # fn main() {}
176181
//! ```
@@ -242,7 +247,7 @@ pub use crate::stream::Stream;
242247
pub type VarlinkStream = Box<dyn Stream>;
243248
pub type ServerStream = Box<dyn Stream>;
244249

245-
pub use crate::server::{listen, Listener};
250+
pub use crate::server::{listen, ListenConfig, Listener};
246251

247252
#[macro_use]
248253
pub mod error;

varlink/src/server.rs

Lines changed: 98 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use std::os::unix::net::{UnixListener, UnixStream};
1414
#[cfg(windows)]
1515
use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
1616
use std::process;
17-
use std::sync::{mpsc, Arc, Mutex, RwLock};
17+
use std::sync::{
18+
atomic::{AtomicBool, Ordering},
19+
mpsc, Arc, Mutex, RwLock,
20+
};
1821

1922
#[cfg(windows)]
2023
use uds_windows::UnixListener;
@@ -166,6 +169,11 @@ impl Listener {
166169
self.as_raw_socket()
167170
.ok_or_else(|| context!(ErrorKind::ConnectionClosed))? as usize;
168171

172+
let mut timeout = timeval {
173+
tv_sec: (timeout / 1000u64) as _,
174+
tv_usec: ((timeout % 1000u64) * 1000u64) as _,
175+
};
176+
169177
unsafe {
170178
let mut readfs: fd_set = mem::MaybeUninit::zeroed().assume_init();
171179
loop {
@@ -174,10 +182,6 @@ impl Listener {
174182

175183
let mut writefds = mem::MaybeUninit::zeroed();
176184
let mut errorfds = mem::MaybeUninit::zeroed();
177-
let mut timeout = timeval {
178-
tv_sec: timeout as i32,
179-
tv_usec: 0,
180-
};
181185

182186
let ret = select(
183187
0,
@@ -212,13 +216,18 @@ impl Listener {
212216

213217
#[cfg(unix)]
214218
pub fn accept(&self, timeout: u64) -> Result<Box<dyn Stream>> {
215-
use libc::{fd_set, select, time_t, timeval, EAGAIN, EINTR, FD_ISSET, FD_SET, FD_ZERO};
219+
use libc::{fd_set, select, timeval, EAGAIN, EINTR, FD_ISSET, FD_SET, FD_ZERO};
216220

217221
if timeout > 0 {
218222
let fd = self
219223
.as_raw_fd()
220224
.ok_or_else(|| context!(ErrorKind::ConnectionClosed))?;
221225

226+
let mut timeout = timeval {
227+
tv_sec: (timeout / 1000u64) as _,
228+
tv_usec: ((timeout % 1000u64) * 1000u64) as _,
229+
};
230+
222231
unsafe {
223232
let mut readfs = mem::MaybeUninit::<fd_set>::uninit();
224233
loop {
@@ -230,10 +239,6 @@ impl Listener {
230239
let mut errorfds = mem::MaybeUninit::<fd_set>::uninit();
231240
FD_ZERO(errorfds.as_mut_ptr());
232241
errorfds.assume_init();
233-
let mut timeout = timeval {
234-
tv_sec: timeout as time_t,
235-
tv_usec: 0,
236-
};
237242

238243
FD_SET(fd, readfs.as_mut_ptr());
239244
let ret = select(
@@ -458,9 +463,44 @@ impl Worker {
458463
}
459464
}
460465

466+
/// `ListenConfig` specifies the configuration parameters for [`varlink::listen`]
467+
///
468+
/// Examples:
469+
///
470+
/// ```rust
471+
/// let l = varlink::ListenConfig::default();
472+
/// assert_eq!(l.initial_worker_threads, 1);
473+
/// assert_eq!(l.max_worker_threads, 100);
474+
/// assert_eq!(l.idle_timeout, 0);
475+
/// assert!(l.stop_listening.is_none());
476+
/// ```
477+
///
478+
/// [`varlink::listen`]: fn.listen.html
479+
pub struct ListenConfig {
480+
/// The amount of initial worker threads
481+
pub initial_worker_threads: usize,
482+
/// The maximum amount of worker threads
483+
pub max_worker_threads: usize,
484+
/// Time in seconds for the server to quit, when it is idle
485+
pub idle_timeout: u64,
486+
/// An optional AtomicBool as a global flag, which lets the server stop accepting new connections, when set to `true`
487+
pub stop_listening: Option<Arc<AtomicBool>>,
488+
}
489+
490+
impl Default for ListenConfig {
491+
fn default() -> Self {
492+
ListenConfig {
493+
initial_worker_threads: 1,
494+
max_worker_threads: 100,
495+
idle_timeout: 0,
496+
stop_listening: None,
497+
}
498+
}
499+
}
500+
461501
/// `listen` creates a server, with `num_worker` threads listening on `varlink_uri`.
462502
///
463-
/// If an `idle_timeout` != 0 is specified, this function returns after the specified
503+
/// If an `listen_config.idle_timeout` != 0 is specified, this function returns after the specified
464504
/// amount of seconds, if no new connection is made in that time frame. It still waits for
465505
/// all pending connections to finish.
466506
///
@@ -477,7 +517,14 @@ impl Worker {
477517
/// vec![/* Your varlink interfaces go here */],
478518
/// );
479519
///
480-
/// if let Err(e) = varlink::listen(service, "unix:test_listen_timeout", 1, 10, 1) {
520+
/// if let Err(e) = varlink::listen(
521+
/// service,
522+
/// "unix:test_listen_timeout",
523+
/// &varlink::ListenConfig {
524+
/// idle_timeout: 1,
525+
/// ..Default::default()
526+
/// },
527+
/// ) {
481528
/// if *e.kind() != varlink::ErrorKind::Timeout {
482529
/// panic!("Error listen: {:?}", e);
483530
/// }
@@ -489,31 +536,55 @@ impl Worker {
489536
pub fn listen<S: ?Sized + AsRef<str>, H: crate::ConnectionHandler + Send + Sync + 'static>(
490537
handler: H,
491538
address: &S,
492-
initial_worker_threads: usize,
493-
max_worker_threads: usize,
494-
idle_timeout: u64,
539+
listen_config: &ListenConfig,
495540
) -> Result<()> {
496541
let handler = Arc::new(handler);
497542
let listener = Listener::new(address)?;
498543

499544
listener.set_nonblocking(false)?;
500545

501-
let mut pool = ThreadPool::new(initial_worker_threads, max_worker_threads);
546+
let mut pool = ThreadPool::new(
547+
listen_config.initial_worker_threads,
548+
listen_config.max_worker_threads,
549+
);
502550

503551
loop {
504-
let mut stream = match listener.accept(idle_timeout) {
505-
Err(e) => match e.kind() {
506-
ErrorKind::Timeout => {
507-
if pool.num_busy() == 0 {
552+
let mut to_wait = listen_config.idle_timeout * 1000;
553+
let wait_time = listen_config
554+
.stop_listening
555+
.as_ref()
556+
.map(|_| 100)
557+
.unwrap_or(to_wait);
558+
let mut stream = loop {
559+
match listener.accept(wait_time) {
560+
Err(e) => match e.kind() {
561+
ErrorKind::Timeout => {
562+
if let Some(stop) = listen_config.stop_listening.as_ref() {
563+
if stop.load(Ordering::SeqCst) {
564+
return Ok(());
565+
}
566+
if listen_config.idle_timeout == 0 {
567+
continue;
568+
}
569+
}
570+
571+
if to_wait <= wait_time {
572+
if pool.num_busy() == 0 {
573+
return Err(e);
574+
}
575+
to_wait = listen_config.idle_timeout * 1000;
576+
} else {
577+
to_wait -= wait_time;
578+
}
579+
580+
continue;
581+
}
582+
_ => {
508583
return Err(e);
509584
}
510-
continue;
511-
}
512-
_ => {
513-
return Err(e);
514-
}
515-
},
516-
r => r?,
585+
},
586+
r => break r?,
587+
}
517588
};
518589
let handler = handler.clone();
519590

varlink/src/test.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,14 @@ fn test_listen() -> Result<()> {
1313
vec![], // Your varlink interfaces go here
1414
);
1515

16-
if let Err(e) = listen(service, &address, 1, 10, timeout) {
16+
if let Err(e) = listen(
17+
service,
18+
&address,
19+
&ListenConfig {
20+
idle_timeout: timeout,
21+
..Default::default()
22+
},
23+
) {
1724
if *e.kind() != ErrorKind::Timeout {
1825
panic!("Error listen: {:#?}", e);
1926
}
@@ -187,8 +194,7 @@ fn test_handle() -> Result<()> {
187194

188195
let reply = from_slice::<Reply>(&w).unwrap();
189196

190-
let si =
191-
from_value::<ServiceInfo>(reply.parameters.unwrap()).map_err(map_context!())?;
197+
let si = from_value::<ServiceInfo>(reply.parameters.unwrap()).map_err(map_context!())?;
192198

193199
assert_eq!(
194200
si,

0 commit comments

Comments
 (0)