Skip to content

Commit e27723f

Browse files
committed
Add job property to ReceiveChannel and SendChannel
Fixes #260
1 parent 1aec688 commit e27723f

File tree

17 files changed

+273
-101
lines changed

17 files changed

+273
-101
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+18-3
Original file line numberDiff line numberDiff line change
@@ -456,11 +456,12 @@ public final class kotlinx/coroutines/experimental/YieldKt {
456456
}
457457

458458
public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel {
459-
public fun <init> ()V
459+
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
460460
public fun cancel (Ljava/lang/Throwable;)Z
461461
protected fun cleanupSendQueueOnCancel ()V
462462
protected final fun describeTryPoll ()Lkotlinx/coroutines/experimental/channels/AbstractChannel$TryPollDesc;
463463
protected final fun getHasReceiveOrClosed ()Z
464+
public final fun getJob ()Lkotlinx/coroutines/experimental/Job;
464465
public final fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
465466
public final fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
466467
protected abstract fun isBufferAlwaysEmpty ()Z
@@ -539,15 +540,18 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc
539540
}
540541

541542
public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel {
542-
public fun <init> (I)V
543+
public fun <init> (ILkotlinx/coroutines/experimental/Job;)V
544+
public synthetic fun <init> (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
543545
public fun close (Ljava/lang/Throwable;)Z
544546
public final fun getCapacity ()I
547+
public fun getJob ()Lkotlinx/coroutines/experimental/Job;
545548
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
546549
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
547550
}
548551

549552
public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
550-
public fun <init> (I)V
553+
public fun <init> (ILkotlinx/coroutines/experimental/Job;)V
554+
public synthetic fun <init> (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
551555
protected fun cleanupSendQueueOnCancel ()V
552556
protected fun getBufferDebugString ()Ljava/lang/String;
553557
public final fun getCapacity ()I
@@ -712,7 +716,10 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
712716
public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol;
713717
public fun <init> ()V
714718
public fun <init> (Ljava/lang/Object;)V
719+
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
720+
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
715721
public fun close (Ljava/lang/Throwable;)Z
722+
public fun getJob ()Lkotlinx/coroutines/experimental/Job;
716723
public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
717724
public final fun getValue ()Ljava/lang/Object;
718725
public final fun getValueOrNull ()Ljava/lang/Object;
@@ -726,6 +733,8 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
726733

727734
public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
728735
public fun <init> ()V
736+
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
737+
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
729738
protected final fun isBufferAlwaysEmpty ()Z
730739
protected final fun isBufferAlwaysFull ()Z
731740
protected final fun isBufferEmpty ()Z
@@ -737,6 +746,8 @@ public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx
737746

738747
public class kotlinx/coroutines/experimental/channels/LinkedListChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
739748
public fun <init> ()V
749+
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
750+
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
740751
protected final fun isBufferAlwaysEmpty ()Z
741752
protected final fun isBufferAlwaysFull ()Z
742753
protected final fun isBufferEmpty ()Z
@@ -775,6 +786,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Produce
775786

776787
public abstract interface class kotlinx/coroutines/experimental/channels/ReceiveChannel {
777788
public abstract fun cancel (Ljava/lang/Throwable;)Z
789+
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
778790
public abstract fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
779791
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
780792
public abstract fun isClosedForReceive ()Z
@@ -797,6 +809,8 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Receive
797809

798810
public class kotlinx/coroutines/experimental/channels/RendezvousChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
799811
public fun <init> ()V
812+
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
813+
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
800814
protected final fun isBufferAlwaysEmpty ()Z
801815
protected final fun isBufferAlwaysFull ()Z
802816
protected final fun isBufferEmpty ()Z
@@ -812,6 +826,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Send {
812826

813827
public abstract interface class kotlinx/coroutines/experimental/channels/SendChannel {
814828
public abstract fun close (Ljava/lang/Throwable;)Z
829+
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
815830
public abstract fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
816831
public abstract fun isClosedForSend ()Z
817832
public abstract fun isFull ()Z

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+24-1
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
259259
helpClose(closed)
260260
onClosed(closed)
261261
afterClose(cause)
262+
// Cancel it as the last action so if the channel is closed, then the job is cancelled as well
263+
job.cancel(cause)
262264
return true
263265
}
264266

@@ -473,9 +475,13 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
473475
/**
474476
* Abstract send/receive channel. It is a base class for all channel implementations.
475477
*/
476-
public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
478+
public abstract class AbstractChannel<E>(final override val job: Job) : AbstractSendChannel<E>(), Channel<E> {
477479
// ------ extension points for buffered channels ------
478480

481+
init {
482+
registerCancellation(job)
483+
}
484+
479485
/**
480486
* Returns `true` if [isBufferEmpty] is always `true`.
481487
* @suppress **This is unstable API and it is subject to change.**
@@ -1051,3 +1057,20 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
10511057
abstract fun resumeReceiveClosed(closed: Closed<*>)
10521058
}
10531059

1060+
internal fun SendChannel<*>.registerCancellation(job: Job) {
1061+
val cancellation = ChannelCancellation(this, job)
1062+
job.invokeOnCompletion(cancellation.asHandler)
1063+
}
1064+
1065+
private class ChannelCancellation(
1066+
private val channel: SendChannel<*>, job: Job) : JobNode<Job>(job) {
1067+
1068+
override fun invoke(cause: Throwable?) {
1069+
if (job.isCancelled) {
1070+
channel.close(cause)
1071+
} else {
1072+
channel.close()
1073+
}
1074+
}
1075+
}
1076+

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+9-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.*
1920
import kotlinx.coroutines.experimental.internal.*
2021
import kotlinx.coroutines.experimental.internalAnnotations.*
2122
import kotlinx.coroutines.experimental.selects.*
@@ -38,10 +39,16 @@ class ArrayBroadcastChannel<E>(
3839
/**
3940
* Buffer capacity.
4041
*/
41-
val capacity: Int
42+
val capacity: Int,
43+
44+
/**
45+
* Job owning this channel.
46+
*/
47+
override val job: Job = Job()
4248
) : AbstractSendChannel<E>(), BroadcastChannel<E> {
4349
init {
4450
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
51+
registerCancellation(job)
4552
}
4653

4754
private val bufferLock = ReentrantLock()
@@ -195,7 +202,7 @@ class ArrayBroadcastChannel<E>(
195202

196203
private class Subscriber<E>(
197204
private val broadcastChannel: ArrayBroadcastChannel<E>
198-
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
205+
) : AbstractChannel<E>(Job()), SubscriptionReceiveChannel<E> {
199206
private val subLock = ReentrantLock()
200207

201208
@Volatile

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.*
1920
import kotlinx.coroutines.experimental.internal.*
20-
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
21+
import kotlinx.coroutines.experimental.internalAnnotations.*
2122
import kotlinx.coroutines.experimental.selects.*
2223

2324
/**
@@ -33,8 +34,13 @@ public open class ArrayChannel<E>(
3334
/**
3435
* Buffer capacity.
3536
*/
36-
val capacity: Int
37-
) : AbstractChannel<E>() {
37+
val capacity: Int,
38+
39+
/**
40+
* Job owning this channel.
41+
*/
42+
job: Job = Job()
43+
) : AbstractChannel<E>(job) {
3844
init {
3945
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
4046
}

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt

+20-7
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.CancellationException
20-
import kotlinx.coroutines.experimental.CoroutineScope
21-
import kotlinx.coroutines.experimental.Job
19+
import kotlinx.coroutines.experimental.*
2220
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2321
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
24-
import kotlinx.coroutines.experimental.selects.SelectClause1
25-
import kotlinx.coroutines.experimental.selects.SelectClause2
26-
import kotlinx.coroutines.experimental.selects.select
27-
import kotlinx.coroutines.experimental.yield
22+
import kotlinx.coroutines.experimental.selects.*
2823

2924
/**
3025
* Sender's interface to [Channel].
3126
*/
3227
public interface SendChannel<in E> {
28+
29+
/**
30+
* The job of this channel bounded with channel lifecycle.
31+
* If job is completed with any reason (either normally or exceptionally), channel is [closed][SendChannel.close]
32+
* with a completion [cause][Job.getCancellationException] of the job.
33+
* If the channel is [closed][isClosedForSend], job is cancelled with the same reason as [SendChannel.close] call
34+
*/
35+
public val job: Job
36+
3337
/**
3438
* Returns `true` if this channel was closed by invocation of [close] and thus
3539
* the [send] and [offer] attempts throws exception.
@@ -105,6 +109,15 @@ public interface SendChannel<in E> {
105109
* Receiver's interface to [Channel].
106110
*/
107111
public interface ReceiveChannel<out E> {
112+
113+
/**
114+
* The job of this channel bounded with channel lifecycle.
115+
* If job is completed with any reason (either normally or exceptionally), channel is [cancelled][ReceiveChannel.cancel]
116+
* with a completion [cause][Job.getCancellationException] of the job.
117+
* If the channel is cancelled or [closed][isClosedForReceive], job is cancelled with the same reason as [ReceiveChannel.cancel] call
118+
*/
119+
public val job: Job
120+
108121
/**
109122
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
110123
* side and all previously sent items were already received, so that the [receive] attempt

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.*
2021
import kotlinx.coroutines.experimental.internal.*
2122
import kotlinx.coroutines.experimental.internalAnnotations.*
2223
import kotlinx.coroutines.experimental.intrinsics.*
@@ -37,7 +38,12 @@ import kotlinx.coroutines.experimental.selects.*
3738
* [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the
3839
* number of subscribers.
3940
*/
40-
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
41+
public class ConflatedBroadcastChannel<E>(override val job: Job = Job()) : BroadcastChannel<E> {
42+
43+
init {
44+
registerCancellation(job)
45+
}
46+
4147
/**
4248
* Creates an instance of this class that already holds a value.
4349
*

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.selects.*
2121

2222
/**
2323
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
@@ -30,7 +30,7 @@ import kotlinx.coroutines.experimental.selects.SelectInstance
3030
*
3131
* This implementation is fully lock-free.
3232
*/
33-
public open class ConflatedChannel<E> : AbstractChannel<E>() {
33+
public open class ConflatedChannel<E>(job: Job = Job()) : AbstractChannel<E>(job) {
3434
protected final override val isBufferAlwaysEmpty: Boolean get() = true
3535
protected final override val isBufferEmpty: Boolean get() = true
3636
protected final override val isBufferAlwaysFull: Boolean get() = false

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.coroutines.experimental.*
20+
import kotlinx.coroutines.experimental.selects.*
2121

2222
/**
2323
* Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
@@ -27,7 +27,7 @@ import kotlinx.coroutines.experimental.selects.SelectInstance
2727
*
2828
* This implementation is fully lock-free.
2929
*/
30-
public open class LinkedListChannel<E> : AbstractChannel<E>() {
30+
public open class LinkedListChannel<E>(job: Job = Job()) : AbstractChannel<E>(job) {
3131
protected final override val isBufferAlwaysEmpty: Boolean get() = true
3232
protected final override val isBufferEmpty: Boolean get() = true
3333
protected final override val isBufferAlwaysFull: Boolean get() = false

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.*
20+
1921
/**
2022
* Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender
2123
* to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends
@@ -25,7 +27,7 @@ package kotlinx.coroutines.experimental.channels
2527
*
2628
* This implementation is fully lock-free.
2729
*/
28-
public open class RendezvousChannel<E> : AbstractChannel<E>() {
30+
public open class RendezvousChannel<E>(job: Job = Job()) : AbstractChannel<E>(job) {
2931
protected final override val isBufferAlwaysEmpty: Boolean get() = true
3032
protected final override val isBufferEmpty: Boolean get() = true
3133
protected final override val isBufferAlwaysFull: Boolean get() = true

common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.common.kt

+22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19+
import kotlin.test.*
20+
1921
public expect open class TestBase constructor() {
2022
public val isStressTest: Boolean
2123
public val stressTestMultiplier: Int
@@ -31,3 +33,23 @@ public expect open class TestBase constructor() {
3133
block: suspend CoroutineScope.() -> Unit
3234
)
3335
}
36+
37+
suspend inline fun <reified T : Exception> assertFailsWith(deferred: Deferred<*>) {
38+
try {
39+
deferred.await()
40+
fail("Expected ${T::class} to be thrown, but was completed successfully")
41+
} catch (e: Exception) {
42+
assertTrue(e is T, "Expected exception of type ${T::class}, but has $e}")
43+
}
44+
}
45+
46+
// Clashes with assertFailsWith
47+
suspend inline fun <reified T : Throwable> assertFails(noinline block: suspend () -> Unit): T {
48+
try {
49+
block()
50+
fail("Expected ${T::class} to be thrown, but was completed successfully")
51+
} catch (e: Throwable) {
52+
assertTrue(e is T, "Expected exception of type ${T::class}, but has $e}")
53+
return e as T
54+
}
55+
}

0 commit comments

Comments
 (0)