Skip to content

Commit e7e5cc9

Browse files
committed
Worker listening loop, worker commands
Implement a message listening loop where the worker checks for an executes messages. Also implement commands that can be queued for the workers, like a command to stop it processing. Update the simple example accordingly.
1 parent 117f140 commit e7e5cc9

File tree

8 files changed

+400
-35
lines changed

8 files changed

+400
-35
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,25 @@ Spin up a Redis instance and pass the connection URL as the first argument.
1212
```sh
1313
cargo r --example simple_redis_queue 'redis://localhost:6379/1'
1414
```
15+
16+
This example uses two threads: a "worker thread" and a "controller thread". The
17+
controller queues tasks and commands that the worker picks up and executes. In
18+
the example code the controller queues a single tasks to print "Hello, World!"
19+
and return 42 and then waits for the worker to finish executing it. It then
20+
commands the worker to stop before terminating. The worker thread listens for
21+
messages and commands and executes them until it gets a stop command. It then
22+
terminates. A typical run looks something like this:
23+
24+
```
25+
Controller thread: Queueing task
26+
Controller thread: Polling for task run result
27+
Worker thread: Registered worker with ID 01H3FV14GHSWEB48PPDB4Q506S
28+
Worker thread: Listening for messages...
29+
Hello, World!
30+
Controller thread: Polling for task run result
31+
Controller thread: Got task run result, 42
32+
Controller thread: Looking up all registered workers
33+
Controller thread: Sending command to stop worker 01H3FV14GHSWEB48PPDB4Q506S
34+
Controller thread: Done
35+
Worker thread: Done
36+
```

examples/simple_redis_queue.rs

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
use anyhow;
2-
use parsnip::{self, brokers::redis::RedisBroker, task::Signature, task::Task, worker::Worker};
3-
use std::env;
2+
use parsnip::{
3+
self, brokers::redis::RedisBroker, messages::Command, task::Signature, task::Task,
4+
worker::Worker,
5+
};
6+
use std::{env, thread, time};
47

58
struct HelloWorldTask {
69
called_with_signature: Signature<Self>,
710
}
811

912
impl Task for HelloWorldTask {
1013
type ArgumentType = ();
11-
type ReturnType = ();
14+
type ReturnType = usize;
1215

1316
const ID: &'static str = "HelloWorldTask";
1417

@@ -20,27 +23,77 @@ impl Task for HelloWorldTask {
2023

2124
fn run(_: &Self::ArgumentType) -> Self::ReturnType {
2225
println!("Hello, World!");
26+
42
2327
}
2428

2529
fn signature(&self) -> &Signature<Self> {
2630
&self.called_with_signature
2731
}
2832
}
2933

30-
fn main() -> Result<(), anyhow::Error> {
31-
let connect_url: String = env::args()
32-
.nth(1)
33-
.expect("No Redis connect URL passed the first an argument");
34+
fn controller_main(connect_url: String) -> () {
35+
let mut broker = RedisBroker::new(&connect_url).expect("Can not connect to Redis");
36+
let mut app = parsnip::App::new(&mut broker);
37+
app.register_task::<HelloWorldTask>();
38+
39+
println!("Controller thread: Queueing task");
40+
let signature_id = app.queue_task::<HelloWorldTask>(()).unwrap();
41+
42+
println!("Controller thread: Polling for task run result");
43+
let mut result = app.get_task_result(&signature_id).unwrap();
44+
while result.is_none() {
45+
thread::sleep(time::Duration::from_millis(500));
46+
println!("Controller thread: Polling for task run result");
47+
result = app.get_task_result(&signature_id).unwrap();
48+
}
3449

35-
let mut broker = RedisBroker::new(&connect_url)?;
50+
println!(
51+
"Controller thread: Got task run result, {}",
52+
result.unwrap().result
53+
);
3654

55+
println!("Controller thread: Looking up all registered workers");
56+
match app.list_workers().unwrap() {
57+
None => panic!(
58+
"The task was picked up and run by a worker, but none are registered. This is a bug."
59+
),
60+
Some(v) => v.into_iter().for_each(|worker| {
61+
println!(
62+
"Controller thread: Sending command to stop worker {}",
63+
&worker.id
64+
);
65+
app.queue_command(&Command::StopWorker, &worker.id).unwrap();
66+
}),
67+
};
68+
println!("Controller thread: Done");
69+
}
70+
71+
fn worker_main(connect_url: String) -> () {
72+
let mut broker = RedisBroker::new(&connect_url).expect("Can not connect to Redis");
3773
let mut app = parsnip::App::new(&mut broker);
3874
app.register_task::<HelloWorldTask>();
3975

40-
let worker = Worker::new(&app);
76+
let worker = Worker::new(&app).expect("Worker initalization failed");
77+
println!("Worker thread: Registered worker with ID {}", worker.id);
78+
println!("Worker thread: Listening for messages...");
79+
worker
80+
.listen_for_messages()
81+
.expect("Malfunction while handling messages.");
82+
println!("Worker thread: Done")
83+
}
84+
85+
fn main() -> Result<(), anyhow::Error> {
86+
let connect_url: String = env::args()
87+
.nth(1)
88+
.expect("No Redis connect URL passed the first an argument");
89+
90+
let connect_url_controller = connect_url.clone();
91+
let handle_controller_thread = thread::spawn(|| controller_main(connect_url_controller));
4192

42-
app.queue_task::<HelloWorldTask>(())?;
43-
worker.take_first_task_in_queue()?;
93+
let connect_url_worker = connect_url.clone();
94+
let handler_worker_thread = thread::spawn(|| worker_main(connect_url_worker));
4495

96+
handle_controller_thread.join().unwrap();
97+
handler_worker_thread.join().unwrap();
4598
Ok(())
4699
}

src/broker.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,38 @@
1-
use super::messages::{Message, ResultMessage};
1+
use super::messages::{Command, Message, ResultMessage};
22
use anyhow::Result;
3+
use serde::{Deserialize, Serialize};
4+
5+
#[derive(Serialize, Deserialize, Clone)]
6+
pub enum WorkerState {
7+
Pending,
8+
Running,
9+
Stopped,
10+
}
11+
12+
#[derive(Serialize, Deserialize, Clone)]
13+
pub struct WorkerInfo {
14+
pub state: WorkerState,
15+
pub id: String,
16+
}
317

418
pub trait Broker {
519
fn push_message(&self, message: &Message) -> Result<()>;
620

721
fn pop_message(&self) -> Result<Option<Message>>;
822

23+
fn push_command(&self, command: &Command, worker_id: &str) -> Result<()>;
24+
25+
fn pop_command(&self, worker_id: &str) -> Result<Option<Command>>;
26+
927
fn store_result(&self, result_message: ResultMessage) -> Result<()>;
28+
29+
fn get_result(&self, signature_id: &str) -> Result<Option<ResultMessage>>;
30+
31+
fn update_worker_info(&self, info: WorkerInfo) -> Result<()>;
32+
33+
fn remove_worker_info(&self, worker_id: &str) -> Result<()>;
34+
35+
fn get_worker_info(&self, worker_id: &str) -> Result<Option<WorkerInfo>>;
36+
37+
fn all_workers(&self) -> Result<Option<Vec<WorkerInfo>>>;
1038
}

src/brokers/redis.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::broker::Broker;
1+
use crate::broker::{Broker, WorkerInfo};
22

33
use anyhow::Result;
44
use redis::{self, Commands};
@@ -7,7 +7,9 @@ use serde_json;
77
pub struct RedisBroker {
88
redis_client: redis::Client,
99
queue: String,
10+
command_queue_prefix: String,
1011
result_hash_map: String,
12+
worker_register: String,
1113
}
1214

1315
impl RedisBroker {
@@ -17,7 +19,9 @@ impl RedisBroker {
1719
Ok(Self {
1820
redis_client,
1921
queue: "parsnip_queue".to_string(),
22+
command_queue_prefix: "parsnip_command_queue".to_string(),
2023
result_hash_map: "parsnip_task_results".to_string(),
24+
worker_register: "worker_register".to_string(),
2125
})
2226
}
2327
}
@@ -47,4 +51,69 @@ impl Broker for RedisBroker {
4751
)?;
4852
Ok(())
4953
}
54+
55+
fn get_result(&self, signature_id: &str) -> Result<Option<crate::messages::ResultMessage>> {
56+
let mut con = self.redis_client.get_connection()?;
57+
let serialized_result: Option<String> = con.hget(&self.result_hash_map, signature_id)?;
58+
serialized_result.map_or(Ok(None), |v| {
59+
serde_json::from_str(&v).map_err(|e| anyhow::anyhow!("{}", e))
60+
})
61+
}
62+
63+
fn push_command(&self, command: &crate::messages::Command, worker_id: &str) -> Result<()> {
64+
let mut con = self.redis_client.get_connection()?;
65+
con.lpush(
66+
&format!("{}_{}", self.command_queue_prefix, worker_id),
67+
serde_json::to_string(&command)?,
68+
)?;
69+
Ok(())
70+
}
71+
72+
fn pop_command(&self, worker_id: &str) -> Result<Option<crate::messages::Command>> {
73+
let mut con = self.redis_client.get_connection()?;
74+
let serialized_command: Option<String> = con.rpop(
75+
&format!("{}_{}", self.command_queue_prefix, worker_id),
76+
None,
77+
)?;
78+
match serialized_command {
79+
Some(v) => Ok(Some(serde_json::from_str(&v)?)),
80+
None => Ok(None),
81+
}
82+
}
83+
84+
fn update_worker_info(&self, info: WorkerInfo) -> Result<()> {
85+
let mut con = self.redis_client.get_connection()?;
86+
con.hset(
87+
&self.worker_register,
88+
&info.id,
89+
serde_json::to_string(&info)?,
90+
)?;
91+
Ok(())
92+
}
93+
94+
fn get_worker_info(&self, worker_id: &str) -> Result<Option<WorkerInfo>> {
95+
let mut con = self.redis_client.get_connection()?;
96+
let serialized_info: Option<String> = con.hget(&self.worker_register, worker_id)?;
97+
serialized_info.map_or(Ok(None), |v| {
98+
serde_json::from_str(&v).map_err(|e| anyhow::anyhow!("{}", e))
99+
})
100+
}
101+
102+
fn remove_worker_info(&self, worker_id: &str) -> Result<()> {
103+
let mut con = self.redis_client.get_connection()?;
104+
con.hdel(&self.worker_register, worker_id)
105+
.map_err(|e| anyhow::anyhow!("{}", e))
106+
}
107+
108+
fn all_workers(&self) -> Result<Option<Vec<WorkerInfo>>> {
109+
let mut con = self.redis_client.get_connection()?;
110+
111+
let serialized_info: Option<Vec<String>> = con.hvals(&self.worker_register)?;
112+
serialized_info.map_or(Ok(None), |info_vec| {
113+
info_vec
114+
.iter()
115+
.map(|v| serde_json::from_str(v).map_err(|e| anyhow::anyhow!("{}", e)))
116+
.collect()
117+
})
118+
}
50119
}

src/lib.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ mod runner;
55
pub mod task;
66
pub mod worker;
77

8-
use broker::Broker;
9-
use messages::{Message, ResultMessage};
8+
use broker::{Broker, WorkerInfo};
9+
use messages::{Command, Message, ResultMessage};
1010
use runner::TaskRunnerBuilder;
1111
use task::{Signature, Task};
1212

@@ -33,25 +33,46 @@ impl<'a, B: Broker + 'static> App<'a, B> {
3333
.insert(T::ID.into(), Box::new(runner::build_task_runner::<T, B>));
3434
}
3535

36-
pub fn queue_task<T: Task + 'static>(&self, arg: T::ArgumentType) -> Result<(), Error> {
36+
/// Queue a task for pickup by a worker.
37+
///
38+
/// Returns the signature_id for the task invocation, which can be used to
39+
/// lookup the result of running the task.
40+
pub fn queue_task<T: Task + 'static>(&self, arg: T::ArgumentType) -> Result<String, Error> {
3741
if !self.task_runner_builders.contains_key(T::ID.into()) {
3842
anyhow::bail!(
3943
"Can not queue task with ID '{}' as it is not registered.",
4044
T::ID
4145
);
4246
}
4347

48+
let signature_id = Ulid::new().to_string();
4449
let signature = Signature::<T> {
4550
arg,
46-
id: Ulid::new().to_string(),
51+
id: signature_id.clone(),
4752
};
4853
self.broker
4954
.push_message(&Message {
5055
task_id: T::ID.into(),
5156
signature: serde_json::to_string(&signature)?,
5257
})
5358
.context("Failed to put task invocation on the queue.")?;
54-
Ok(())
59+
Ok(signature_id)
60+
}
61+
62+
pub fn get_task_result(&self, signatrue_id: &str) -> Result<Option<ResultMessage>, Error> {
63+
self.broker.get_result(signatrue_id)
64+
}
65+
66+
pub fn queue_command(&self, command: &Command, worker_id: &str) -> Result<(), Error> {
67+
self.broker.push_command(command, worker_id)
68+
}
69+
70+
pub fn get_worker_info(&self, worker_id: &str) -> Result<Option<WorkerInfo>, Error> {
71+
self.broker.get_worker_info(worker_id)
72+
}
73+
74+
pub fn list_workers(&self) -> Result<Option<Vec<WorkerInfo>>, Error> {
75+
self.broker.all_workers()
5576
}
5677

5778
fn handle_message(&self, message: &Message) -> Result<(), Error> {
@@ -69,7 +90,14 @@ impl<'a, B: Broker + 'static> App<'a, B> {
6990
}
7091

7192
fn store_task_result(&self, result: ResultMessage) -> Result<(), Error> {
72-
self.broker.store_result(result)?;
73-
Ok(())
93+
self.broker.store_result(result)
94+
}
95+
96+
fn update_worker_info(&self, info: WorkerInfo) -> Result<(), Error> {
97+
self.broker.update_worker_info(info)
98+
}
99+
100+
fn remove_worker_info(&self, worker_id: &str) -> Result<(), Error> {
101+
self.broker.remove_worker_info(worker_id)
74102
}
75103
}

src/messages.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,8 @@ pub struct ResultMessage {
1111
pub signature_id: String,
1212
pub result: String,
1313
}
14+
15+
#[derive(Serialize, Deserialize, Clone)]
16+
pub enum Command {
17+
StopWorker,
18+
}

0 commit comments

Comments
 (0)