Skip to content

Commit 65c8334

Browse files
committed
Redis broker impl, simple example with Redis
Simple implementation of using Redis as a message broker. Together with a simple example application using the Redis broker.
1 parent 11f3bde commit 65c8334

File tree

7 files changed

+218
-0
lines changed

7 files changed

+218
-0
lines changed

Cargo.lock

Lines changed: 109 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ serde = { version = "1.0", features = ["derive"] }
1616
serde_json = "1.0"
1717
ulid = "1.0"
1818
anyhow = "1.0.71"
19+
redis = "0.23"

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,13 @@
22

33
A Rust library for distributed, asynchronous computing using a task queue.
44
Inspired by [Celery](https://docs.celeryq.dev/en/stable/).
5+
6+
## Examples
7+
8+
### `simple_redis_queue`
9+
10+
Spin up a Redis instance and pass the connection URL as the first argument.
11+
12+
```sh
13+
cargo r --example simple_redis_queue 'redis://localhost:6379/1'
14+
```

examples/simple_redis_queue.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use anyhow;
2+
use parsnip::{self, brokers::redis::RedisBroker, task::Signature, task::Task, worker::Worker};
3+
use std::env;
4+
5+
struct HelloWorldTask {
6+
called_with_signature: Signature<Self>,
7+
}
8+
9+
impl Task for HelloWorldTask {
10+
type ArgumentType = ();
11+
type ReturnType = ();
12+
13+
const ID: &'static str = "HelloWorldTask";
14+
15+
fn from_signature(signature: Signature<Self>) -> Self {
16+
Self {
17+
called_with_signature: signature,
18+
}
19+
}
20+
21+
fn run(_: &Self::ArgumentType) -> Self::ReturnType {
22+
println!("Hello, World!");
23+
}
24+
25+
fn signature(&self) -> &Signature<Self> {
26+
&self.called_with_signature
27+
}
28+
}
29+
30+
fn main() -> Result<(), anyhow::Error> {
31+
let connect_url: String = env::args()
32+
.nth(1)
33+
.expect("No Redis connect URL passed the first an argument");
34+
35+
let mut broker = RedisBroker::new(&connect_url)?;
36+
37+
let mut app = parsnip::App::new(&mut broker);
38+
app.register_task::<HelloWorldTask>();
39+
40+
let worker = Worker::new(&app);
41+
42+
app.queue_task::<HelloWorldTask>(())?;
43+
worker.take_first_task_in_queue()?;
44+
45+
Ok(())
46+
}

src/brokers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod redis;

src/brokers/redis.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use crate::broker::Broker;
2+
3+
use anyhow::Result;
4+
use redis::{self, Commands};
5+
use serde_json;
6+
7+
pub struct RedisBroker {
8+
redis_client: redis::Client,
9+
queue: String,
10+
result_hash_map: String,
11+
}
12+
13+
impl RedisBroker {
14+
pub fn new(connect_url: &str) -> Result<Self> {
15+
let redis_client = redis::Client::open(connect_url)?;
16+
17+
Ok(Self {
18+
redis_client,
19+
queue: "parsnip_queue".to_string(),
20+
result_hash_map: "parsnip_task_results".to_string(),
21+
})
22+
}
23+
}
24+
25+
impl Broker for RedisBroker {
26+
fn push_message(&self, message: &crate::messages::Message) -> Result<()> {
27+
let mut con = self.redis_client.get_connection()?;
28+
con.lpush(&self.queue, serde_json::to_string(&message)?)?;
29+
Ok(())
30+
}
31+
32+
fn pop_message(&self) -> Result<crate::messages::Message> {
33+
let mut con = self.redis_client.get_connection()?;
34+
let serialized_message: Option<String> = con.rpop(&self.queue, None)?;
35+
match serialized_message {
36+
Some(v) => Ok(serde_json::from_str(&v)?),
37+
None => Err(anyhow::anyhow!("Task queue is empty")),
38+
}
39+
}
40+
41+
fn store_result(&self, result_message: crate::messages::ResultMessage) -> Result<()> {
42+
let mut con = self.redis_client.get_connection()?;
43+
con.hset(
44+
&self.result_hash_map,
45+
&result_message.signature_id,
46+
serde_json::to_string(&result_message)?,
47+
)?;
48+
Ok(())
49+
}
50+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod broker;
2+
pub mod brokers;
23
pub mod messages;
34
mod runner;
45
pub mod task;

0 commit comments

Comments
 (0)