1
+ /**
2
+ * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
3
+ */
1
4
package akka .persistence
2
5
3
6
import scala .util .control .NonFatal
@@ -33,21 +36,30 @@ object PersistentFlow {
33
36
/**
34
37
* Configuration object for a [[Persistent ]] stream producer.
35
38
*
36
- * @param maxBufferSize maximum number of persistent messages to be buffered in memory (per producer).
39
+ * @param fromSequenceNr Sequence number where the produced stream shall start (inclusive).
40
+ * Default is `1L`.
41
+ * @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per producer).
42
+ * Default is `100`.
43
+ * @param idle Optional duration to wait if no more persistent messages can be pulled from the journal
44
+ * before attempting the next pull. Default is `None` which causes the producer to take
45
+ * the value defined by the `akka.persistence.view.auto-update-interval` configuration
46
+ * key. If defined, the `idle` value is taken directly.
37
47
*/
38
- case class ProducerSettings (maxBufferSize : Int = 20 )
48
+ case class ProducerSettings (fromSequenceNr : Long = 1L , maxBufferSize : Int = 100 , idle : Option [FiniteDuration ] = None ) {
49
+ require(fromSequenceNr > 0L , " fromSequenceNr must be > 0" )
50
+ }
39
51
40
- private [akka] object ViewProducer {
52
+ private object ViewProducer {
41
53
def props (processorId : String , producerSettings : ProducerSettings , settings : MaterializerSettings ): Props =
42
54
Props (classOf [ViewProducerImpl ], processorId, producerSettings, settings)
43
55
}
44
56
45
- private [akka] case class ViewProducerNode (processorId : String , producerSettings : ProducerSettings ) extends ProducerNode [Persistent ] {
57
+ private case class ViewProducerNode (processorId : String , producerSettings : ProducerSettings ) extends ProducerNode [Persistent ] {
46
58
def createProducer (settings : MaterializerSettings , context : ActorRefFactory ): Producer [Persistent ] =
47
59
new ActorProducer (context.actorOf(ViewProducer .props(processorId, producerSettings, settings)))
48
60
}
49
61
50
- private [akka] class ViewProducerImpl (processorId : String , producerSettings : ProducerSettings , materializerSettings : MaterializerSettings )
62
+ private class ViewProducerImpl (processorId : String , producerSettings : ProducerSettings , materializerSettings : MaterializerSettings )
51
63
extends Actor
52
64
with ActorLogging
53
65
with SubscriberManagement [Persistent ]
@@ -58,13 +70,11 @@ private[akka] class ViewProducerImpl(processorId: String, producerSettings: Prod
58
70
59
71
type S = ActorSubscription [Persistent ]
60
72
61
- private val view = context.actorOf(Props (classOf [ViewBuffer ], processorId, producerSettings.maxBufferSize , self))
73
+ private val view = context.actorOf(Props (classOf [ViewBuffer ], processorId, producerSettings, self), " view-buffer " )
62
74
63
75
private var pub : ActorPublisher [Persistent ] = _
64
76
private var shutdownReason : Option [Throwable ] = ActorPublisher .NormalShutdownReason
65
77
66
- context.setReceiveTimeout(materializerSettings.downstreamSubscriptionTimeout)
67
-
68
78
final def receive = {
69
79
case ExposedPublisher (pub) ⇒
70
80
this .pub = pub.asInstanceOf [ActorPublisher [Persistent ]]
@@ -74,7 +84,6 @@ private[akka] class ViewProducerImpl(processorId: String, producerSettings: Prod
74
84
final def waitingForSubscribers : Receive = {
75
85
case SubscribePending ⇒
76
86
pub.takePendingSubscribers() foreach registerSubscriber
77
- context.setReceiveTimeout(Duration .Undefined )
78
87
context.become(active)
79
88
}
80
89
@@ -129,22 +138,22 @@ private object ViewBuffer {
129
138
}
130
139
131
140
/**
132
- * A view that buffers up to `maxBufferSize` persistent messages in memory. Downstream demands
133
- * (requests) are served if the buffer is non-empty either while filling the buffer or after
134
- * having filled the buffer. When the buffer becomes empty new persistent messages are loaded
135
- * from the journal (in batches up to `maxBufferSize`).
141
+ * A view that buffers up to `producerSettings. maxBufferSize` persistent messages in memory.
142
+ * Downstream demands (requests) are served if the buffer is non-empty either while filling
143
+ * the buffer or after having filled the buffer. When the buffer becomes empty new persistent
144
+ * messages are loaded from the journal (in batches up to `producerSettings. maxBufferSize`).
136
145
*/
137
- private [akka] class ViewBuffer (val processorId : String , maxBufferSize : Int , producer : ActorRef ) extends View {
146
+ private class ViewBuffer (val processorId : String , producerSettings : ProducerSettings , producer : ActorRef ) extends View {
138
147
import ViewBuffer ._
139
148
import context .dispatcher
140
149
141
150
private var replayed = 0
142
151
private var requested = 0
143
152
private var buffer : Vector [Persistent ] = Vector .empty
144
153
145
- val filling : Receive = {
154
+ private val filling : Receive = {
146
155
case p : Persistent ⇒
147
- buffer = buffer :+ p
156
+ buffer :+= p
148
157
replayed += 1
149
158
if (requested > 0 ) respond(requested)
150
159
case Filled ⇒
@@ -158,14 +167,14 @@ private[akka] class ViewBuffer(val processorId: String, maxBufferSize: Int, prod
158
167
if (buffer.nonEmpty) respond(requested)
159
168
}
160
169
161
- val pausing : Receive = {
170
+ private val pausing : Receive = {
162
171
case Request (num) ⇒
163
172
requested += num
164
173
respond(requested)
165
174
if (buffer.isEmpty) fill()
166
175
}
167
176
168
- val scheduled : Receive = {
177
+ private val scheduled : Receive = {
169
178
case Fill ⇒
170
179
fill()
171
180
case Request (num) ⇒
@@ -174,33 +183,47 @@ private[akka] class ViewBuffer(val processorId: String, maxBufferSize: Int, prod
174
183
175
184
def receive = filling
176
185
177
- def fill (): Unit = {
186
+ override def onReplaySuccess (receive : Receive , await : Boolean ): Unit = {
187
+ super .onReplaySuccess(receive, await)
188
+ self ! Filled
189
+ }
190
+
191
+ override def onReplayFailure (receive : Receive , await : Boolean , cause : Throwable ): Unit = {
192
+ super .onReplayFailure(receive, await, cause)
193
+ self ! Filled
194
+ }
195
+
196
+ override def lastSequenceNr : Long =
197
+ math.max(producerSettings.fromSequenceNr - 1L , super .lastSequenceNr)
198
+
199
+ override def autoUpdateInterval : FiniteDuration =
200
+ producerSettings.idle.getOrElse(super .autoUpdateInterval)
201
+
202
+ override def autoUpdateReplayMax : Long =
203
+ producerSettings.maxBufferSize
204
+
205
+ override def autoUpdate : Boolean =
206
+ false
207
+
208
+ private def fill (): Unit = {
178
209
replayed = 0
179
210
context.become(filling)
180
- self ! Update (false , maxBufferSize )
211
+ self ! Update (false , autoUpdateReplayMax )
181
212
}
182
213
183
- def pause (): Unit = {
214
+ private def pause (): Unit = {
184
215
context.become(pausing)
185
216
}
186
217
187
- def schedule (): Unit = {
218
+ private def schedule (): Unit = {
188
219
context.become(scheduled)
189
220
context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill )
190
221
}
191
222
192
- def respond (num : Int ): Unit = {
223
+ private def respond (num : Int ): Unit = {
193
224
val (res, buf) = buffer.splitAt(num)
194
225
producer ! Response (res)
195
226
buffer = buf
196
227
requested -= res.size
197
228
}
198
-
199
- private [persistence] override def onReplayComplete (await : Boolean ): Unit = {
200
- super .onReplayComplete(await)
201
- self ! Filled
202
- }
203
-
204
- override def autoUpdateReplayMax : Long = maxBufferSize
205
- override def autoUpdate : Boolean = false
206
229
}
0 commit comments