Skip to content

Commit 8a720fa

Browse files
authored
Tagged pubsub (#258)
* Unwrap tagged events in pubsub * ignore metals * move unwrapping of Tagged payloads into PubSub
1 parent 8cc6579 commit 8a720fa

File tree

3 files changed

+24
-11
lines changed

3 files changed

+24
-11
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@ target/
1515
ext-lib-src/
1616
.classpath_nb
1717
.bsp
18+
metals.sbt

core/src/main/scala/akka/persistence/r2dbc/internal/PubSub.scala

+11-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import akka.actor.typed.pubsub.Topic
1717
import akka.annotation.InternalApi
1818
import akka.persistence.Persistence
1919
import akka.persistence.PersistentRepr
20+
import akka.persistence.journal.Tagged
2021
import akka.persistence.query.TimestampOffset
2122
import akka.persistence.query.typed.EventEnvelope
2223
import akka.persistence.typed.PersistenceId
@@ -55,11 +56,20 @@ import akka.persistence.typed.PersistenceId
5556
val slice = persistenceExt.sliceForPersistenceId(pid)
5657

5758
val offset = TimestampOffset(timestamp, timestamp, Map(pid -> pr.sequenceNr))
59+
val payload =
60+
pr.payload match {
61+
case Tagged(payload, _) =>
62+
// eventsByTag not implemented (see issue #82), but events can still be tagged, so we unwrap this tagged event.
63+
payload
64+
65+
case other => other
66+
}
67+
5868
val envelope = new EventEnvelope(
5969
offset,
6070
pid,
6171
pr.sequenceNr,
62-
Option(pr.payload),
72+
Option(payload),
6373
timestamp.toEpochMilli,
6474
pr.metadata,
6575
entityType,

core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala

+12-10
Original file line numberDiff line numberDiff line change
@@ -183,18 +183,20 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends
183183
writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
184184
}
185185

186-
private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp: Future[Instant]): Future[Done] = {
187-
if (pubSub.isDefined) {
188-
dbTimestamp.map { timestamp =>
189-
pubSub.foreach { p =>
190-
messages.iterator.flatMap(_.payload.iterator).foreach(pr => p.publish(pr, timestamp))
186+
private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp: Future[Instant]): Future[Done] =
187+
pubSub match {
188+
case Some(ps) =>
189+
dbTimestamp.map { timestamp =>
190+
messages.iterator
191+
.flatMap(_.payload.iterator)
192+
.foreach(pr => ps.publish(pr, timestamp))
193+
194+
Done
191195
}
192-
Done
193-
}
194-
} else {
195-
dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic)
196+
197+
case None =>
198+
dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic)
196199
}
197-
}
198200

199201
private def logEventsByTagsNotImplemented(): Unit = {
200202
if (!eventsByTagNotImplementedLogged) {

0 commit comments

Comments
 (0)