@@ -27,7 +27,7 @@ class ChannelsConsumeTest {
27
27
private val sourceList = (1 .. 10 ).toList()
28
28
29
29
// test source with numbers 1..10
30
- private fun testSource () = produce {
30
+ private fun testSource (context : CoroutineContext ) = produce(context) {
31
31
for (i in sourceList) {
32
32
send(i)
33
33
}
@@ -811,10 +811,10 @@ class ChannelsConsumeTest {
811
811
fun testZip () {
812
812
val expect = sourceList.zip(sourceList) { a, b -> a + 2 * b }
813
813
checkTransform(expect) { ctx ->
814
- zip(testSource(), ctx) { a, b -> a + 2 * b }
814
+ zip(testSource(ctx ), ctx) { a, b -> a + 2 * b }
815
815
}
816
816
checkTransform(expect) { ctx ->
817
- testSource().zip(this , ctx) { a, b -> a + 2 * b }
817
+ testSource(ctx ).zip(this , ctx) { a, b -> a + 2 * b }
818
818
}
819
819
}
820
820
@@ -832,27 +832,28 @@ class ChannelsConsumeTest {
832
832
expected : ((Throwable ? ) -> Unit )? = null,
833
833
terminal : suspend ReceiveChannel <Int >.() -> Unit
834
834
) {
835
- val src = testSource()
836
- runBlocking {
835
+ val src = runBlocking {
836
+ val src = testSource(coroutineContext)
837
837
try {
838
838
// terminal operation
839
839
terminal(src)
840
840
// source must be cancelled at the end of terminal op
841
- assertTrue(src.isClosedForReceive, " Source must be closed" )
842
841
if (expected != null ) error(" Exception was expected" )
843
842
} catch (e: Throwable ) {
844
843
if (expected == null ) throw e
845
844
expected(e)
846
845
}
846
+ src
847
847
}
848
+ assertTrue(src.isClosedForReceive, " Source must be closed" )
848
849
}
849
850
850
851
private fun checkTerminalCancellation (
851
852
expected : ((Throwable ? ) -> Unit )? = null,
852
853
terminal : suspend ReceiveChannel <Int >.() -> Unit
853
854
) {
854
- val src = testSource()
855
- runBlocking {
855
+ val src = runBlocking {
856
+ val src = testSource(coroutineContext)
856
857
// terminal operation in a separate async context started until the first suspension
857
858
val d = async(coroutineContext, start = CoroutineStart .UNDISPATCHED ) {
858
859
terminal(src)
@@ -869,6 +870,7 @@ class ChannelsConsumeTest {
869
870
if (expected == null ) throw e
870
871
expected(e)
871
872
}
873
+ src
872
874
}
873
875
// source must be cancelled at the end of terminal op even if it was cancelled while in process
874
876
assertTrue(src.isClosedForReceive, " Source must be closed" )
@@ -889,8 +891,8 @@ class ChannelsConsumeTest {
889
891
expect : List <R >,
890
892
transform : ReceiveChannel <Int >.(CoroutineContext ) -> ReceiveChannel <R >
891
893
) {
892
- val src = testSource()
893
- runBlocking {
894
+ val src = runBlocking {
895
+ val src = testSource(coroutineContext)
894
896
// transform
895
897
val res = transform(src, coroutineContext)
896
898
// receive nReceive elements from the result
@@ -904,6 +906,7 @@ class ChannelsConsumeTest {
904
906
// then check that result is closed
905
907
assertEquals(null , res.receiveOrNull(), " Result has unexpected values" )
906
908
}
909
+ src
907
910
}
908
911
// source must be cancelled when runBlocking processes all the scheduled stuff
909
912
assertTrue(src.isClosedForReceive, " Source must be closed" )
0 commit comments