Skip to content

Commit f9fbc4f

Browse files
committed
Broker trait: Pop message returns Option
1 parent 65c8334 commit f9fbc4f

File tree

5 files changed

+14
-11
lines changed

5 files changed

+14
-11
lines changed

src/broker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Result;
44
pub trait Broker {
55
fn push_message(&self, message: &Message) -> Result<()>;
66

7-
fn pop_message(&self) -> Result<Message>;
7+
fn pop_message(&self) -> Result<Option<Message>>;
88

99
fn store_result(&self, result_message: ResultMessage) -> Result<()>;
1010
}

src/brokers/redis.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ impl Broker for RedisBroker {
2929
Ok(())
3030
}
3131

32-
fn pop_message(&self) -> Result<crate::messages::Message> {
32+
fn pop_message(&self) -> Result<Option<crate::messages::Message>> {
3333
let mut con = self.redis_client.get_connection()?;
3434
let serialized_message: Option<String> = con.rpop(&self.queue, None)?;
3535
match serialized_message {
36-
Some(v) => Ok(serde_json::from_str(&v)?),
37-
None => Err(anyhow::anyhow!("Task queue is empty")),
36+
Some(v) => Ok(Some(serde_json::from_str(&v)?)),
37+
None => Ok(None),
3838
}
3939
}
4040

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ impl Broker for InMemoryTestBroker {
2929
Ok(())
3030
}
3131

32-
fn pop_message(&self) -> anyhow::Result<Message> {
32+
fn pop_message(&self) -> anyhow::Result<Option<Message>> {
3333
match self
3434
.queue
3535
.write()
3636
.expect("Failed to aquire lock")
3737
.pop_front()
3838
{
39-
Some(message) => Ok(message),
40-
None => Err(anyhow::anyhow!("No messages in queue!")),
39+
Some(message) => Ok(Some(message)),
40+
None => Ok(None),
4141
}
4242
}
4343

src/worker.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ impl<'a, B: Broker + 'static> Worker<'a, B> {
1919

2020
pub fn take_first_task_in_queue(&self) -> Result<(), Error> {
2121
let message = self.app.broker.pop_message()?;
22-
self.app.handle_message(&message)
22+
match message {
23+
Some(m) => self.app.handle_message(&m),
24+
None => Err(anyhow::anyhow!("No messages in queue")),
25+
}
2326
}
2427
}

tests/running_task_from_message_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ impl Broker for InMemoryTestBroker {
3030
Ok(())
3131
}
3232

33-
fn pop_message(&self) -> anyhow::Result<Message> {
33+
fn pop_message(&self) -> anyhow::Result<Option<Message>> {
3434
match self
3535
.queue
3636
.write()
3737
.expect("Failed to aquire lock")
3838
.pop_front()
3939
{
40-
Some(message) => Ok(message),
41-
None => Err(anyhow::anyhow!("No messages in queue!")),
40+
Some(message) => Ok(Some(message)),
41+
None => Ok(None),
4242
}
4343
}
4444

0 commit comments

Comments
 (0)