1
1
package kyo
2
2
3
3
import Batch .internal .*
4
+ import Batch .internal .Pending .Expanded
5
+ import Batch .internal .Pending .ToExpand
4
6
import kyo .Tag
5
7
import kyo .kernel .*
6
8
@@ -102,54 +104,46 @@ object Batch:
102
104
* A sequence of results from executing the batched operations
103
105
*/
104
106
def run [A : Flat , S ](v : A < (Batch & S ))(using Frame ): Chunk [A ] < S =
105
-
106
- type SourceAny = Source [Any , Any , S ]
107
- type ContAny = Any < (Batch & S ) => (ToExpand | Expanded | A ) < (Batch & S )
108
-
109
- abstract class Pending
110
- case class ToExpand (op : Seq [Any ], cont : ContAny ) extends Pending
111
- case class Expanded (value : Any , source : SourceAny , cont : ContAny ) extends Pending
112
-
113
107
// An item can be a final value (`A`),
114
108
// a sequence from `Batch.eval` (`ToExpand`),
115
109
// or a call to a source (`Expanded`).
116
- type Item = A | ToExpand | Expanded
110
+ type Item = A | ToExpand [ A , S ] | Expanded [ A , S ]
117
111
118
112
// Transforms effect suspensions into an item.
119
113
// Captures the continuation in the `Item` objects for `ToExpand` and `Expanded` cases.
120
114
def capture (v : Item < (Batch & S )): Item < S =
121
115
ArrowEffect .handle(Tag [Batch ], v) {
122
116
[C ] =>
123
117
(input, cont) =>
124
- val contAny = cont.asInstanceOf [ContAny ]
118
+ val contAny = cont.asInstanceOf [ContAny [ A , S ] ]
125
119
input match
126
- case Call (v, source) => Expanded (v, source.asInstanceOf [SourceAny ], contAny)
120
+ case Call (v, source) => Expanded (v, source.asInstanceOf [SourceAny [ S ] ], contAny)
127
121
case Eval (v) => ToExpand (v, contAny)
128
122
}
129
123
130
124
// Expands any `Batch.eval` calls, capturing items for each element in the sequence.
131
125
// Returns a `Chunk[Chunk[A]]` to reduce `map` calls.
132
126
def expand (items : Chunk [Item ]): Chunk [Chunk [Item ]] < S =
133
127
Kyo .foreach(items) {
134
- case ToExpand (seq : Seq [Any ], cont) =>
128
+ case ToExpand [ A @ unchecked, S @ unchecked] (seq : Seq [Any ], cont) =>
135
129
Kyo .foreach(seq)(v => capture(cont(v)))
136
130
case item => Chunk (item)
137
131
}
138
132
139
133
// Groups all source calls (`Expanded`), calls their source functions, and reassembles the results.
140
134
// Returns a `Chunk[Chunk[A]]` to reduce `map` calls.
141
135
def flush (items : Chunk [Item ]): Chunk [Chunk [Item ]] < S =
142
- val pending : Map [SourceAny | Unit , Seq [Item ]] =
136
+ val pending : Map [SourceAny [ S ] | Unit , Seq [Item ]] =
143
137
items.groupBy {
144
- case Expanded (_, source, _) => source
145
- case _ => () // Used as a placeholder for items that aren't source calls
138
+ case ( Expanded [ A @ unchecked, S @ unchecked] (_, source, _) ) => source
139
+ case _ => () // Used as a placeholder for items that aren't source calls
146
140
}
147
141
Kyo .foreach(pending.toSeq) { tuple =>
148
142
(tuple : @ unchecked) match
149
143
case (_ : Unit , items) =>
150
144
// No need for flushing
151
145
Chunk .from(items)
152
- case (source : SourceAny , items : Seq [Expanded ] @ unchecked) =>
146
+ case (source : SourceAny [ S ] , items : Seq [Expanded [ A , S ] ] @ unchecked) =>
153
147
// Only request distinct items from the source
154
148
source(items.map(_.value).distinct).map { results =>
155
149
// Reassemble the results by iterating on the original collection
@@ -163,13 +157,13 @@ object Batch:
163
157
164
158
// The main evaluation loop that expands and flushes items until all values are final.
165
159
def loop (items : Chunk [Item ]): Chunk [A ] < S =
166
- if ! items.exists((_ : @ unchecked).isInstanceOf [Pending ]) then
160
+ if ! items.exists((_ : @ unchecked).isInstanceOf [Pending [ A , S ] ]) then
167
161
// All values are final, done
168
162
items.asInstanceOf [Chunk [A ]]
169
163
else
170
164
// The code repetition in the branches is a performance optimization to reduce
171
165
// `map` calls.
172
- if items.exists((_ : @ unchecked).isInstanceOf [ToExpand ]) then
166
+ if items.exists((_ : @ unchecked).isInstanceOf [ToExpand [ A , S ] ]) then
173
167
// Expand `Batch.eval` calls if present
174
168
expand(items).map { expanded =>
175
169
flush(expanded.flattenChunk)
@@ -185,10 +179,16 @@ object Batch:
185
179
186
180
object internal :
187
181
type Source [A , B , S ] = Seq [A ] => (A => B < S ) < S
182
+ type SourceAny [S ] = Source [Any , Any , S ]
188
183
189
184
sealed trait Op [A ]
190
185
case class Eval [A ](seq : Seq [A ]) extends Op [A ]
191
186
case class Call [A , B , S ](v : A , source : Source [A , B , S ]) extends Op [B < (Batch & S )]
192
- end internal
193
187
188
+ enum Pending [A , S ]:
189
+ case ToExpand (op : Seq [Any ], cont : ContAny [A , S ])
190
+ case Expanded (value : Any , source : SourceAny [S ], cont : ContAny [A , S ])
191
+
192
+ type ContAny [A , S ] = Any < (Batch & S ) => (ToExpand [A , S ] | Expanded [A , S ] | A ) < (Batch & S )
193
+ end internal
194
194
end Batch
0 commit comments