|
| 1 | +package akka.persistence |
| 2 | + |
| 3 | +import scala.util.control.NonFatal |
| 4 | +import scala.concurrent.duration._ |
| 5 | + |
| 6 | +import org.reactivestreams.api.Producer |
| 7 | +import org.reactivestreams.spi.Subscriber |
| 8 | + |
| 9 | +import akka.actor._ |
| 10 | +import akka.stream._ |
| 11 | +import akka.stream.impl._ |
| 12 | +import akka.stream.impl.Ast.ProducerNode |
| 13 | +import akka.stream.scaladsl.Flow |
| 14 | + |
| 15 | +object PersistentFlow { |
| 16 | + /** |
| 17 | + * Starts a new [[Persistent]] message flow from the given processor, |
| 18 | + * identified by `processorId`. Elements are pulled from the processor's |
| 19 | + * journal (using a [[View]]) in accordance with the demand coming from |
| 20 | + * the downstream transformation steps. |
| 21 | + * |
| 22 | + * Elements pulled from the processor's journal are buffered in memory so that |
| 23 | + * fine-grained demands (requests) from downstream can be served efficiently. |
| 24 | + * Reads from the journal are done in (coarse-grained) batches of configurable |
| 25 | + * size (which correspond to the configurable maximum buffer size). |
| 26 | + * |
| 27 | + * @see [[ProducerSettings]] |
| 28 | + */ |
| 29 | + def fromProcessor(processorId: String, producerSettings: ProducerSettings = ProducerSettings()): Flow[Persistent] = |
| 30 | + FlowImpl(ViewProducerNode(processorId, producerSettings), Nil) |
| 31 | +} |
| 32 | + |
| 33 | +/** |
| 34 | + * Configuration object for a [[Persistent]] stream producer. |
| 35 | + * |
| 36 | + * @param maxBufferSize maximum number of persistent messages to be buffered in memory (per producer). |
| 37 | + */ |
| 38 | +case class ProducerSettings(maxBufferSize: Int = 20) |
| 39 | + |
| 40 | +private[akka] object ViewProducer { |
| 41 | + def props(processorId: String, producerSettings: ProducerSettings, settings: MaterializerSettings): Props = |
| 42 | + Props(classOf[ViewProducerImpl], processorId, producerSettings, settings) |
| 43 | +} |
| 44 | + |
| 45 | +private[akka] case class ViewProducerNode(processorId: String, producerSettings: ProducerSettings) extends ProducerNode[Persistent] { |
| 46 | + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] = |
| 47 | + new ActorProducer(context.actorOf(ViewProducer.props(processorId, producerSettings, settings))) |
| 48 | +} |
| 49 | + |
| 50 | +private[akka] class ViewProducerImpl(processorId: String, producerSettings: ProducerSettings, materializerSettings: MaterializerSettings) |
| 51 | + extends Actor |
| 52 | + with ActorLogging |
| 53 | + with SubscriberManagement[Persistent] |
| 54 | + with SoftShutdown { |
| 55 | + |
| 56 | + import ActorBasedFlowMaterializer._ |
| 57 | + import ViewBuffer._ |
| 58 | + |
| 59 | + type S = ActorSubscription[Persistent] |
| 60 | + |
| 61 | + private val view = context.actorOf(Props(classOf[ViewBuffer], processorId, producerSettings.maxBufferSize, self)) |
| 62 | + |
| 63 | + private var pub: ActorPublisher[Persistent] = _ |
| 64 | + private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason |
| 65 | + |
| 66 | + context.setReceiveTimeout(materializerSettings.downstreamSubscriptionTimeout) |
| 67 | + |
| 68 | + final def receive = { |
| 69 | + case ExposedPublisher(pub) ⇒ |
| 70 | + this.pub = pub.asInstanceOf[ActorPublisher[Persistent]] |
| 71 | + context.become(waitingForSubscribers) |
| 72 | + } |
| 73 | + |
| 74 | + final def waitingForSubscribers: Receive = { |
| 75 | + case SubscribePending ⇒ |
| 76 | + pub.takePendingSubscribers() foreach registerSubscriber |
| 77 | + context.setReceiveTimeout(Duration.Undefined) |
| 78 | + context.become(active) |
| 79 | + } |
| 80 | + |
| 81 | + final def active: Receive = { |
| 82 | + case SubscribePending ⇒ |
| 83 | + pub.takePendingSubscribers() foreach registerSubscriber |
| 84 | + case RequestMore(sub, elements) ⇒ |
| 85 | + moreRequested(sub.asInstanceOf[S], elements) |
| 86 | + case Cancel(sub) ⇒ |
| 87 | + unregisterSubscription(sub.asInstanceOf[S]) |
| 88 | + case r @ Response(ps) ⇒ |
| 89 | + try { |
| 90 | + ps.foreach(p ⇒ pushToDownstream(withCtx(context)(p))) |
| 91 | + } catch { |
| 92 | + case Stop ⇒ { completeDownstream(); shutdownReason = None } |
| 93 | + case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) } |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + override def requestFromUpstream(elements: Int): Unit = |
| 98 | + view ! Request(elements) |
| 99 | + |
| 100 | + override def initialBufferSize = |
| 101 | + materializerSettings.initialFanOutBufferSize |
| 102 | + |
| 103 | + override def maxBufferSize = |
| 104 | + materializerSettings.maxFanOutBufferSize |
| 105 | + |
| 106 | + override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] = |
| 107 | + new ActorSubscription(self, subscriber) |
| 108 | + |
| 109 | + override def cancelUpstream(): Unit = { |
| 110 | + pub.shutdown(shutdownReason) |
| 111 | + softShutdown() |
| 112 | + } |
| 113 | + override def shutdown(completed: Boolean): Unit = { |
| 114 | + pub.shutdown(shutdownReason) |
| 115 | + softShutdown() |
| 116 | + } |
| 117 | + |
| 118 | + override def postStop(): Unit = { |
| 119 | + pub.shutdown(shutdownReason) |
| 120 | + } |
| 121 | +} |
| 122 | + |
| 123 | +private object ViewBuffer { |
| 124 | + case class Request(num: Int) |
| 125 | + case class Response(messages: Vector[Persistent]) |
| 126 | + |
| 127 | + case object Fill |
| 128 | + case object Filled |
| 129 | +} |
| 130 | + |
| 131 | +/** |
| 132 | + * A view that buffers up to `maxBufferSize` persistent messages in memory. Downstream demands |
| 133 | + * (requests) are served if the buffer is non-empty either while filling the buffer or after |
| 134 | + * having filled the buffer. When the buffer becomes empty new persistent messages are loaded |
| 135 | + * from the journal (in batches up to `maxBufferSize`). |
| 136 | + */ |
| 137 | +private[akka] class ViewBuffer(val processorId: String, maxBufferSize: Int, producer: ActorRef) extends View { |
| 138 | + import ViewBuffer._ |
| 139 | + import context.dispatcher |
| 140 | + |
| 141 | + private var replayed = 0 |
| 142 | + private var requested = 0 |
| 143 | + private var buffer: Vector[Persistent] = Vector.empty |
| 144 | + |
| 145 | + val filling: Receive = { |
| 146 | + case p: Persistent ⇒ |
| 147 | + buffer = buffer :+ p |
| 148 | + replayed += 1 |
| 149 | + if (requested > 0) respond(requested) |
| 150 | + case Filled ⇒ |
| 151 | + if (buffer.nonEmpty && requested > 0) respond(requested) |
| 152 | + if (buffer.nonEmpty) pause() |
| 153 | + else if (requested == 0) pause() |
| 154 | + else if (replayed > 0) fill() |
| 155 | + else schedule() |
| 156 | + case Request(num) ⇒ |
| 157 | + requested += num |
| 158 | + if (buffer.nonEmpty) respond(requested) |
| 159 | + } |
| 160 | + |
| 161 | + val pausing: Receive = { |
| 162 | + case Request(num) ⇒ |
| 163 | + requested += num |
| 164 | + respond(requested) |
| 165 | + if (buffer.isEmpty) fill() |
| 166 | + } |
| 167 | + |
| 168 | + val scheduled: Receive = { |
| 169 | + case Fill ⇒ |
| 170 | + fill() |
| 171 | + case Request(num) ⇒ |
| 172 | + requested += num |
| 173 | + } |
| 174 | + |
| 175 | + def receive = filling |
| 176 | + |
| 177 | + def fill(): Unit = { |
| 178 | + replayed = 0 |
| 179 | + context.become(filling) |
| 180 | + self ! Update(false, maxBufferSize) |
| 181 | + } |
| 182 | + |
| 183 | + def pause(): Unit = { |
| 184 | + context.become(pausing) |
| 185 | + } |
| 186 | + |
| 187 | + def schedule(): Unit = { |
| 188 | + context.become(scheduled) |
| 189 | + context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill) |
| 190 | + } |
| 191 | + |
| 192 | + def respond(num: Int): Unit = { |
| 193 | + val (res, buf) = buffer.splitAt(num) |
| 194 | + producer ! Response(res) |
| 195 | + buffer = buf |
| 196 | + requested -= res.size |
| 197 | + } |
| 198 | + |
| 199 | + private[persistence] override def onReplayComplete(await: Boolean): Unit = { |
| 200 | + super.onReplayComplete(await) |
| 201 | + self ! Filled |
| 202 | + } |
| 203 | + |
| 204 | + override def autoUpdateReplayMax: Long = maxBufferSize |
| 205 | + override def autoUpdate: Boolean = false |
| 206 | +} |
0 commit comments