diff --git a/binary-compatibility-validator/build.gradle b/binary-compatibility-validator/build.gradle index 32b7dad38f..a0432874ec 100644 --- a/binary-compatibility-validator/build.gradle +++ b/binary-compatibility-validator/build.gradle @@ -43,6 +43,7 @@ sourceSets { } test { + dependsOn cleanCompileTestKotlin dependsOn configurations.testArtifacts systemProperty 'testCasesClassesDirs', sourceSets.test.output.classesDirs.asPath diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index c73c208f3f..ec341573ea 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -456,11 +456,12 @@ public final class kotlinx/coroutines/experimental/YieldKt { } public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel { - public fun ()V + public fun (Lkotlinx/coroutines/experimental/Job;)V public fun cancel (Ljava/lang/Throwable;)Z protected fun cleanupSendQueueOnCancel ()V protected final fun describeTryPoll ()Lkotlinx/coroutines/experimental/channels/AbstractChannel$TryPollDesc; protected final fun getHasReceiveOrClosed ()Z + public final fun getJob ()Lkotlinx/coroutines/experimental/Job; public final fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1; public final fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1; protected abstract fun isBufferAlwaysEmpty ()Z @@ -540,14 +541,19 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel { public fun (I)V + public fun (ILkotlinx/coroutines/experimental/Job;)V + public synthetic fun (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close (Ljava/lang/Throwable;)Z public final fun getCapacity ()I + public fun getJob ()Lkotlinx/coroutines/experimental/Job; public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel; public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel; } public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/coroutines/experimental/channels/AbstractChannel { public fun (I)V + public fun (ILkotlinx/coroutines/experimental/Job;)V + public synthetic fun (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V protected fun cleanupSendQueueOnCancel ()V protected fun getBufferDebugString ()Ljava/lang/String; public final fun getCapacity ()I @@ -657,6 +663,7 @@ public final class kotlinx/coroutines/experimental/channels/ChannelsKt { public static final fun maxWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; public static final fun minWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; public static final fun none (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; + public static final fun onClose (Lkotlinx/coroutines/experimental/channels/Channel;Lkotlin/jvm/functions/Function1;)V public static final fun requireNoNulls (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel; public static final fun single (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; public static final fun singleOrNull (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object; @@ -712,7 +719,10 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol; public fun ()V public fun (Ljava/lang/Object;)V + public fun (Lkotlinx/coroutines/experimental/Job;)V + public synthetic fun (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun close (Ljava/lang/Throwable;)Z + public fun getJob ()Lkotlinx/coroutines/experimental/Job; public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2; public final fun getValue ()Ljava/lang/Object; public final fun getValueOrNull ()Ljava/lang/Object; @@ -726,6 +736,8 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx/coroutines/experimental/channels/AbstractChannel { public fun ()V + public fun (Lkotlinx/coroutines/experimental/Job;)V + public synthetic fun (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V protected final fun isBufferAlwaysEmpty ()Z protected final fun isBufferAlwaysFull ()Z protected final fun isBufferEmpty ()Z @@ -737,6 +749,8 @@ public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx public class kotlinx/coroutines/experimental/channels/LinkedListChannel : kotlinx/coroutines/experimental/channels/AbstractChannel { public fun ()V + public fun (Lkotlinx/coroutines/experimental/Job;)V + public synthetic fun (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V protected final fun isBufferAlwaysEmpty ()Z protected final fun isBufferAlwaysFull ()Z protected final fun isBufferEmpty ()Z @@ -775,6 +789,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Produce public abstract interface class kotlinx/coroutines/experimental/channels/ReceiveChannel { public abstract fun cancel (Ljava/lang/Throwable;)Z + public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job; public abstract fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1; public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1; public abstract fun isClosedForReceive ()Z @@ -797,6 +812,8 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Receive public class kotlinx/coroutines/experimental/channels/RendezvousChannel : kotlinx/coroutines/experimental/channels/AbstractChannel { public fun ()V + public fun (Lkotlinx/coroutines/experimental/Job;)V + public synthetic fun (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V protected final fun isBufferAlwaysEmpty ()Z protected final fun isBufferAlwaysFull ()Z protected final fun isBufferEmpty ()Z @@ -812,6 +829,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Send { public abstract interface class kotlinx/coroutines/experimental/channels/SendChannel { public abstract fun close (Ljava/lang/Throwable;)Z + public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job; public abstract fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2; public abstract fun isClosedForSend ()Z public abstract fun isFull ()Z diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.common.kt index a0ea9b3d6b..09f1b0e3b6 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.common.kt @@ -24,6 +24,8 @@ internal expect annotation class JvmName(val name: String) @Target(AnnotationTarget.FILE) internal expect annotation class JvmMultifileClass() +internal expect annotation class JvmOverloads() + internal expect annotation class JvmField() internal expect annotation class Volatile() diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt index bb8ce693b7..7d0e452a6f 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt @@ -83,13 +83,13 @@ public abstract class AbstractSendChannel : SendChannel { * Returns non-null closed token if it is last in the queue. * @suppress **This is unstable API and it is subject to change.** */ - protected val closedForSend: Closed<*>? get() = queue.prevNode as? Closed<*> + protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) } /** * Returns non-null closed token if it is first in the queue. * @suppress **This is unstable API and it is subject to change.** */ - protected val closedForReceive: Closed<*>? get() = queue.nextNode as? Closed<*> + protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) } /** * Retrieves first sending waiter from the queue or returns closed token. @@ -181,7 +181,7 @@ public abstract class AbstractSendChannel : SendChannel { val result = offerInternal(element) return when { result === OFFER_SUCCESS -> true - result === OFFER_FAILED -> false + result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false result is Closed<*> -> throw result.sendException else -> error("offerInternal returned $result") } @@ -243,23 +243,53 @@ public abstract class AbstractSendChannel : SendChannel { public override fun close(cause: Throwable?): Boolean { val closed = Closed(cause) + + /* + * Try to commit close by adding a close token to the end of the queue. + * Successful -> we're now responsible for closing receivers + * Not successful -> help closing pending receivers to maintain invariant + * "if (!close()) next send will throw" + */ + val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> }) + if (!closeAdded) { + helpClose(queue.prevNode as Closed<*>) + return false + } + + helpClose(closed) + onClosed(closed) + afterClose(cause) + // Cancel it as the last action so if the channel is closed, then the job is cancelled as well + job.cancel(cause) + return true + } + + private fun helpClose(closed: Closed<*>) { + /* + * It's important to traverse list from right to left to avoid races with sender. + * Consider channel state + * head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel + * T1 invokes receive() + * T2 invokes close() + * T3 invokes close() + send(value) + * + * If both will traverse list from left to right, following non-linearizable history is possible: + * [close -> false], [send -> transferred 'value' to receiver] + */ while (true) { - val receive = takeFirstReceiveOrPeekClosed() - if (receive == null) { - // queue empty or has only senders -- try add last "Closed" item to the queue - if (queue.addLastIfPrev(closed, { prev -> - if (prev is Closed<*>) return false // already closed - prev !is ReceiveOrClosed<*> // only add close if no waiting receive - })) { - onClosed(closed) - afterClose(cause) - return true - } - continue // retry on failure + val previous = closed.prevNode + // Channel is empty or has no receivers + if (previous is LockFreeLinkedListHead || previous !is Receive<*>) { + break + } + + if (!previous.remove()) { + continue } - if (receive is Closed<*>) return false // already marked as closed -- nothing to do - receive as Receive // type assertion - receive.resumeReceiveClosed(closed) + + @Suppress("UNCHECKED_CAST") + previous as Receive // type assertion + previous.resumeReceiveClosed(closed) } } @@ -445,9 +475,13 @@ public abstract class AbstractSendChannel : SendChannel { /** * Abstract send/receive channel. It is a base class for all channel implementations. */ -public abstract class AbstractChannel : AbstractSendChannel(), Channel { +public abstract class AbstractChannel(final override val job: Job) : AbstractSendChannel(), Channel { // ------ extension points for buffered channels ------ + init { + registerCancellation(job) + } + /** * Returns `true` if [isBufferEmpty] is always `true`. * @suppress **This is unstable API and it is subject to change.** @@ -1023,3 +1057,20 @@ private abstract class Receive : LockFreeLinkedListNode(), ReceiveOrClosed abstract fun resumeReceiveClosed(closed: Closed<*>) } +internal fun SendChannel<*>.registerCancellation(job: Job) { + val cancellation = ChannelCancellation(this, job) + job.invokeOnCompletion(cancellation.asHandler) +} + +private class ChannelCancellation( + private val channel: SendChannel<*>, job: Job) : JobNode(job) { + + override fun invoke(cause: Throwable?) { + if (job.isCancelled) { + channel.close(cause) + } else { + channel.close() + } + } +} + diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt index ffdf694dca..9cdda276e4 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt @@ -16,6 +16,7 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* import kotlinx.coroutines.experimental.internalAnnotations.* import kotlinx.coroutines.experimental.selects.* @@ -34,14 +35,20 @@ import kotlinx.coroutines.experimental.selects.* * The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber. * The lists of suspended senders or receivers are lock-free. */ -class ArrayBroadcastChannel( +class ArrayBroadcastChannel @JvmOverloads constructor( /** * Buffer capacity. */ - val capacity: Int + val capacity: Int, + + /** + * Job owning this channel. + */ + override val job: Job = Job() ) : AbstractSendChannel(), BroadcastChannel { init { require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" } + registerCancellation(job) } private val bufferLock = ReentrantLock() @@ -195,7 +202,7 @@ class ArrayBroadcastChannel( private class Subscriber( private val broadcastChannel: ArrayBroadcastChannel - ) : AbstractChannel(), SubscriptionReceiveChannel { + ) : AbstractChannel(Job()), SubscriptionReceiveChannel { private val subLock = ReentrantLock() @Volatile diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt index b118a896c8..e626d21066 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt @@ -16,8 +16,9 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* -import kotlinx.coroutines.experimental.internalAnnotations.Volatile +import kotlinx.coroutines.experimental.internalAnnotations.* import kotlinx.coroutines.experimental.selects.* /** @@ -29,12 +30,17 @@ import kotlinx.coroutines.experimental.selects.* * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations. * The lists of suspended senders or receivers are lock-free. */ -public open class ArrayChannel( +public open class ArrayChannel @JvmOverloads constructor( /** * Buffer capacity. */ - val capacity: Int -) : AbstractChannel() { + val capacity: Int, + + /** + * Job owning this channel. + */ + job: Job = Job() +) : AbstractChannel(job) { init { require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" } } diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt index 912f87ea46..cd143ea2d2 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt @@ -16,20 +16,24 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.CancellationException -import kotlinx.coroutines.experimental.CoroutineScope -import kotlinx.coroutines.experimental.Job +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED -import kotlinx.coroutines.experimental.selects.SelectClause1 -import kotlinx.coroutines.experimental.selects.SelectClause2 -import kotlinx.coroutines.experimental.selects.select -import kotlinx.coroutines.experimental.yield +import kotlinx.coroutines.experimental.selects.* /** * Sender's interface to [Channel]. */ public interface SendChannel { + + /** + * The job of this channel bounded with channel lifecycle. + * If job is completed with any reason (either normally or exceptionally), channel is [closed][SendChannel.close] + * with a completion [cause][Job.getCancellationException] of the job. + * If the channel is [closed][isClosedForSend], job is cancelled with the same reason as [SendChannel.close] call + */ + public val job: Job + /** * Returns `true` if this channel was closed by invocation of [close] and thus * the [send] and [offer] attempts throws exception. @@ -105,6 +109,15 @@ public interface SendChannel { * Receiver's interface to [Channel]. */ public interface ReceiveChannel { + + /** + * The job of this channel bounded with channel lifecycle. + * If job is completed with any reason (either normally or exceptionally), channel is [cancelled][ReceiveChannel.cancel] + * with a completion [cause][Job.getCancellationException] of the job. + * If the channel is cancelled or [closed][isClosedForReceive], job is cancelled with the same reason as [ReceiveChannel.cancel] call + */ + public val job: Job + /** * Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel] * side and all previously sent items were already received, so that the [receive] attempt diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt index ecb4262622..fd03bbfc98 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt @@ -149,6 +149,20 @@ public suspend inline fun ReceiveChannel.consumeEach(action: (E) -> Unit) for (e in this) action(e) } +/** + * Invokes the given block **synchronously** in the context of cancellation when channel or its job is cancelled. + * If channel is already closed, block is invoked immediately. + * [block] is invoked only once even if channel is cancelled or closed more than once + * + * @param block non-blocking callback which should be invoked on cancellation + */ +public inline fun Channel<*>.onClose(crossinline block: (Throwable?) -> Unit) { + job.invokeOnCompletion { cause -> + if (cause !is JobCancellationException || cause.cause != null) block(cause) + else block(null) + } +} + /** * @suppress: **Deprecated**: binary compatibility with old code */ diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt index a8ec1ff9fe..d3d718498c 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt @@ -17,6 +17,7 @@ package kotlinx.coroutines.experimental.channels import kotlinx.atomicfu.* +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* import kotlinx.coroutines.experimental.internalAnnotations.* import kotlinx.coroutines.experimental.intrinsics.* @@ -37,7 +38,12 @@ import kotlinx.coroutines.experimental.selects.* * [opening][openSubscription] and [closing][SubscriptionReceiveChannel.close] subscription takes O(N) time, where N is the * number of subscribers. */ -public class ConflatedBroadcastChannel() : BroadcastChannel { +public class ConflatedBroadcastChannel(override val job: Job = Job()) : BroadcastChannel { + + init { + registerCancellation(job) + } + /** * Creates an instance of this class that already holds a value. * diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt index ae9875670a..6e8dd883c8 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt @@ -16,8 +16,8 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED -import kotlinx.coroutines.experimental.selects.SelectInstance +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.selects.* /** * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations, @@ -30,7 +30,7 @@ import kotlinx.coroutines.experimental.selects.SelectInstance * * This implementation is fully lock-free. */ -public open class ConflatedChannel : AbstractChannel() { +public open class ConflatedChannel(job: Job = Job()) : AbstractChannel(job) { protected final override val isBufferAlwaysEmpty: Boolean get() = true protected final override val isBufferEmpty: Boolean get() = true protected final override val isBufferAlwaysFull: Boolean get() = false diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt index f2962ab073..27b70b9f5f 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt @@ -16,8 +16,8 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED -import kotlinx.coroutines.experimental.selects.SelectInstance +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.selects.* /** * Channel with linked-list buffer of a unlimited capacity (limited only by available memory). @@ -27,7 +27,7 @@ import kotlinx.coroutines.experimental.selects.SelectInstance * * This implementation is fully lock-free. */ -public open class LinkedListChannel : AbstractChannel() { +public open class LinkedListChannel(job: Job = Job()) : AbstractChannel(job) { protected final override val isBufferAlwaysEmpty: Boolean get() = true protected final override val isBufferEmpty: Boolean get() = true protected final override val isBufferAlwaysFull: Boolean get() = false diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt index 689e36fea4..d373866360 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt @@ -16,6 +16,8 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.* + /** * Rendezvous channel. This channel does not have any buffer at all. An element is transferred from sender * to receiver only when [send] and [receive] invocations meet in time (rendezvous), so [send] suspends @@ -25,7 +27,7 @@ package kotlinx.coroutines.experimental.channels * * This implementation is fully lock-free. */ -public open class RendezvousChannel : AbstractChannel() { +public open class RendezvousChannel(job: Job = Job()) : AbstractChannel(job) { protected final override val isBufferAlwaysEmpty: Boolean get() = true protected final override val isBufferEmpty: Boolean get() = true protected final override val isBufferAlwaysFull: Boolean get() = true diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.common.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.common.kt index c2b1dddd44..53443e5243 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.common.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/TestBase.common.kt @@ -16,6 +16,8 @@ package kotlinx.coroutines.experimental +import kotlin.test.* + public expect open class TestBase constructor() { public val isStressTest: Boolean public val stressTestMultiplier: Int @@ -31,3 +33,23 @@ public expect open class TestBase constructor() { block: suspend CoroutineScope.() -> Unit ) } + +suspend inline fun assertFailsWith(deferred: Deferred<*>) { + try { + deferred.await() + fail("Expected ${T::class} to be thrown, but was completed successfully") + } catch (e: Exception) { + assertTrue(e is T, "Expected exception of type ${T::class}, but has $e}") + } +} + +// Clashes with assertFailsWith +suspend inline fun assertFails(noinline block: suspend () -> Unit): T { + try { + block() + fail("Expected ${T::class} to be thrown, but was completed successfully") + } catch (e: Throwable) { + assertTrue(e is T, "Expected exception of type ${T::class}, but has $e}") + return e as T + } +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BasicOperationsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BasicOperationsTest.kt new file mode 100644 index 0000000000..288e112ff3 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BasicOperationsTest.kt @@ -0,0 +1,190 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class BasicOperationsTest : TestBase() { + + @Test + fun testSimpleSendReceive() = testChannel { + // Parametrized common test :( + val channel = it.create() + launch(coroutineContext) { + repeat(100) { channel.send(it) } + channel.close() + } + var expected = 0 + for (x in channel) { + if (!it.isConflated) { + assertEquals(expected++, x, "Type: $it") + } else { + assertTrue(x >= expected, "Type: $it") + expected = x + 1 + } + } + if (!it.isConflated) { + assertEquals(100, expected, "Type: $it") + } + } + + @Test + fun testOfferAfterClose() = testChannel { + val channel = it.create() + val d = async(coroutineContext) { channel.send(42) } + yield() + channel.close() + assertTrue(channel.isClosedForSend) + try { + channel.offer(2) + fail() + } catch (e: ClosedSendChannelException) { + if (!it.isConflated) { + assertEquals(42, channel.receive()) + } + } + d.await() + } + + @Test + fun testReceiveOrNullAfterClose() = testChannel { + val channel = it.create() + val d = async(coroutineContext) { + channel.receive() + } + yield() + channel.close() + assertTrue(channel.isClosedForReceive) + assertNull(channel.receiveOrNull()) + assertNull(channel.poll()) + assertFailsWith(d) + } + + @Test + fun testReceiveOrNullAfterCloseWithException() = testChannel { + testReceiveOrNullException(it) + } + + @Test + fun testJobCompletion() = testChannel() { + var channel: Channel? = null + val producer = async(coroutineContext) { + channel = it.create(coroutineContext[Job]!!) + channel!!.send(1) + } + val consumer = async(coroutineContext) { + for (element in channel!!) { + assertEquals(1, element) + } + } + producer.await() + consumer.await() + } + + @Test + fun testJobCancellationWithoutCause() = testChannel { + testJobCancellation(it) + } + + @Test + fun testJobCancellationWithCause() = testChannel { + testJobCancellation(it, IllegalStateException()) + } + + @Test + fun testOnClose() = testChannel { + val channel = it.create() + var invoked = false + channel.onClose { + assertNull(it) + invoked = true + } + + channel.close() + assertTrue(invoked, "Kind $it") + } + + @Test + fun testOnCloseWithException() = testChannel { + val channel = it.create() + var invoked = false + channel.onClose { + assertTrue(it is IllegalStateException) + invoked = true + } + + channel.close(IllegalStateException()) + assertTrue(invoked, "Kind $it") + } + + @Test + fun testOnCloseWithJob() = testChannel { + val channel = it.create() + var invoked = false + channel.onClose { + assertTrue(it is IllegalArgumentException) + invoked = true + } + + channel.job.cancel(IllegalArgumentException()) + assertTrue(invoked, "Kind $it") + } + + @Test + fun testOnCloseMultipleTimes() = testChannel { + val channel = it.create() + var invoked = false + channel.onClose { + assertFalse(invoked) + invoked = true + } + + assertTrue(channel.job.cancel()) + assertFalse(channel.job.cancel()) + assertTrue(invoked, "Kind $it") + } + + private suspend inline fun testJobCancellation(kind: TestChannelKind, exception: T? = null) { + var channel: Channel? = null + val producer = async(coroutineContext) { + channel = kind.create(coroutineContext[Job]!!) + + while (true) { + channel!!.send(1) + yield() + } + } + + val consumer = async(coroutineContext) { + for (element in channel!!) { + val value = channel!!.receive() + assertEquals(1, value) + } + } + + yield() + producer.cancel(exception) + assertFailsWith(producer) + assertFailsWith(consumer) + } + + private suspend fun testReceiveOrNullException(kind: TestChannelKind) { + val channel = kind.create() + val d = async(coroutineContext) { + channel.receive() + } + + yield() + channel.close(IndexOutOfBoundsException()) + assertTrue(channel.isClosedForReceive) + + assertFails { channel.poll() } + assertFails { channel.receiveOrNull() } + assertFailsWith(d) + } + + // Parametrized common test :( + private fun testChannel(block: suspend CoroutineScope.(TestChannelKind) -> Unit) { + TestChannelKind.values().forEach { kind -> runTest { block(kind) } } + } +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt index 41ba67851d..ce4324ca7a 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt @@ -21,6 +21,7 @@ import kotlin.coroutines.experimental.* import kotlin.test.* class ProduceTest : TestBase() { + @Test fun testBasic() = runTest { val c = produce(coroutineContext) { @@ -93,5 +94,21 @@ class ProduceTest : TestBase() { } } + @Test + fun testAwaitProducerJob() = runTest { + val source = produce(coroutineContext) { + for (i in 1..100) { + send(i) + } + } + + val transformer = source.map { it * it } + val consumer = async(coroutineContext) { transformer.consumeEach { } } + + source.job.join() + assertTrue(transformer.job.isCompleted) + assertTrue(consumer.isCompleted) + } + private class TestException : Exception() } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt deleted file mode 100644 index 69e939f2d9..0000000000 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt +++ /dev/null @@ -1,35 +0,0 @@ -package kotlinx.coroutines.experimental.channels - -import kotlinx.coroutines.experimental.* -import kotlin.coroutines.experimental.* -import kotlin.test.* - -class SimpleSendReceiveTest : TestBase() { - - @Test - fun testSimpleSendReceive() = runTest { - // Parametrized common test :( - TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) } - } - - private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) { - val channel = kind.create() - - launch(coroutineContext) { - repeat(iterations) { channel.send(it) } - channel.close() - } - var expected = 0 - for (x in channel) { - if (!kind.isConflated) { - assertEquals(expected++, x) - } else { - assertTrue(x >= expected) - expected = x + 1 - } - } - if (!kind.isConflated) { - assertEquals(iterations, expected) - } - } -} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt index c3ac904cd1..b1b7b5f90f 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt @@ -16,46 +16,47 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.selects.SelectClause1 +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.selects.* enum class TestChannelKind { RENDEZVOUS { - override fun create(): Channel = RendezvousChannel() + override fun create(job: Job): Channel = RendezvousChannel(job) override fun toString(): String = "RendezvousChannel" }, ARRAY_1 { - override fun create(): Channel = ArrayChannel(1) + override fun create(job: Job): Channel = ArrayChannel(1, job) override fun toString(): String = "ArrayChannel(1)" }, ARRAY_10 { - override fun create(): Channel = ArrayChannel(8) + override fun create(job: Job): Channel = ArrayChannel(8, job) override fun toString(): String = "ArrayChannel(8)" }, LINKED_LIST { - override fun create(): Channel = LinkedListChannel() + override fun create(job: Job): Channel = LinkedListChannel(job) override fun toString(): String = "LinkedListChannel" }, CONFLATED { - override fun create(): Channel = ConflatedChannel() + override fun create(job: Job): Channel = ConflatedChannel(job) override fun toString(): String = "ConflatedChannel" override val isConflated: Boolean get() = true }, ARRAY_BROADCAST_1 { - override fun create(): Channel = ChannelViaBroadcast(ArrayBroadcastChannel(1)) + override fun create(job: Job): Channel = ChannelViaBroadcast(ArrayBroadcastChannel(1, job)) override fun toString(): String = "ArrayBroadcastChannel(1)" }, ARRAY_BROADCAST_10 { - override fun create(): Channel = ChannelViaBroadcast(ArrayBroadcastChannel(10)) + override fun create(job: Job): Channel = ChannelViaBroadcast(ArrayBroadcastChannel(10, job)) override fun toString(): String = "ArrayBroadcastChannel(10)" }, CONFLATED_BROADCAST { - override fun create(): Channel = ChannelViaBroadcast(ConflatedBroadcastChannel()) + override fun create(job: Job): Channel = ChannelViaBroadcast(ConflatedBroadcastChannel(job)) override fun toString(): String = "ConflatedBroadcastChannel" override val isConflated: Boolean get() = true } ; - abstract fun create(): Channel + abstract fun create(job: Job = Job()): Channel open val isConflated: Boolean get() = false } @@ -63,7 +64,7 @@ private class ChannelViaBroadcast( private val broadcast: BroadcastChannel ): Channel, SendChannel by broadcast { val sub = broadcast.openSubscription() - + override val job: Job = sub.job override val isClosedForReceive: Boolean get() = sub.isClosedForReceive override val isEmpty: Boolean get() = sub.isEmpty diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt index 21e4c78f46..ac944fe3c0 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt @@ -22,6 +22,9 @@ internal actual typealias JvmName = kotlin.jvm.JvmName @Suppress("ACTUAL_WITHOUT_EXPECT") internal actual typealias JvmMultifileClass = kotlin.jvm.JvmMultifileClass +@Suppress("ACTUAL_WITHOUT_EXPECT") +internal actual typealias JvmOverloads = kotlin.jvm.JvmOverloads + @Suppress("ACTUAL_WITHOUT_EXPECT") internal actual typealias JvmField = kotlin.jvm.JvmField diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt index 15166a43ab..acfb63ce38 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt @@ -16,8 +16,7 @@ package kotlinx.coroutines.experimental.internal -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.loop +import kotlinx.atomicfu.* private typealias Node = LockFreeLinkedListNode @@ -96,8 +95,6 @@ public actual open class LockFreeLinkedListNode { override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE } - public val isFresh: Boolean get() = _next.value === this - public actual val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-11.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-11.kt new file mode 100644 index 0000000000..108b7e9d95 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-11.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. +package guide.channel.example11 + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* + +fun main(args: Array) = runBlocking { + val channel = Channel() + println("Registering onClose in thread ${Thread.currentThread().name}") + channel.onClose { + println("Invoking onClose in thread ${Thread.currentThread().name}") + } + + val job = launch(newSingleThreadContext("ClosingThread")) { + channel.close() + } + + job.join() +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt index ac598120bf..5b41208ccd 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt @@ -1,7 +1,7 @@ // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. package guide.test -import org.junit.Test +import org.junit.* class GuideTest { @@ -291,6 +291,14 @@ class GuideTest { ) } + @Test + fun testGuideChannelExample11() { + test("GuideChannelExample11") { guide.channel.example11.main(emptyArray()) }.verifyLinesStart( + "Registering onClose in thread main @coroutine#1", + "Invoking onClose in thread ClosingThread @coroutine#2" + ) + } + @Test fun testGuideChannelExample03() { test("GuideChannelExample03") { guide.channel.example03.main(emptyArray()) }.verifyLines( diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt index 5109ba6474..8827dc53de 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ActorTest.kt @@ -146,4 +146,32 @@ class ActorTest(private val capacity: Int) : TestBase() { finish(3) } + + @Test + fun testJoinActor() = runTest { + val actor = actor(coroutineContext, capacity = capacity) { + expect(2) + for (i in channel) { + if (i == 42) { + expect(5) + break + } + + expect(4) + assertEquals(1, i) + } + } + + + val sender = async(coroutineContext) { + expect(3) + actor.send(1) + actor.send(42) + } + + expect(1) + actor.job.join() + sender.await() + finish(6) + } } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelIsClosedLinearizabilityTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelIsClosedLinearizabilityTest.kt new file mode 100644 index 0000000000..d5264ee709 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelIsClosedLinearizabilityTest.kt @@ -0,0 +1,54 @@ +package kotlinx.coroutines.experimental.channels + +import com.devexperts.dxlab.lincheck.* +import com.devexperts.dxlab.lincheck.annotations.* +import com.devexperts.dxlab.lincheck.paramgen.* +import com.devexperts.dxlab.lincheck.stress.* +import kotlinx.coroutines.experimental.* +import org.junit.* +import java.io.* + +@Param(name = "value", gen = IntGen::class, conf = "1:3") +class ChannelIsClosedLinearizabilityTest : TestBase() { + + private val lt = LinTesting() + private lateinit var channel: Channel + + @Reset + fun reset() { + channel = Channel() + } + + @Operation(runOnce = true) + fun send1(@Param(name = "value") value: Int) = lt.run("send1") { channel.send(value) } + + @Operation(runOnce = true) + fun send2(@Param(name = "value") value: Int) = lt.run("send2") { channel.send(value) } + + @Operation(runOnce = true) + fun receive1() = lt.run("receive1") { channel.receive() } + + @Operation(runOnce = true) + fun receive2() = lt.run("receive2") { channel.receive() } + + @Operation(runOnce = true) + fun close1() = lt.run("close1") { channel.close(IOException("close1")) } + + @Operation(runOnce = true) + fun isClosedForReceive() = lt.run("isClosedForReceive") { channel.isClosedForReceive } + + @Operation(runOnce = true) + fun isClosedForSend() = lt.run("isClosedForSend") { channel.isClosedForSend } + + @Test + fun testLinearizability() { + val options = StressOptions() + .iterations(100) + .invocationsPerIteration(1000 * stressTestMultiplier) + .addThread(1, 3) + .addThread(1, 3) + .addThread(1, 3) + .verifier(LinVerifier::class.java) + LinChecker.check(ChannelIsClosedLinearizabilityTest::class.java, options) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelLinearizabilityTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelLinearizabilityTest.kt index ce4e549d9d..9fbc181641 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelLinearizabilityTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelLinearizabilityTest.kt @@ -16,23 +16,24 @@ package kotlinx.coroutines.experimental.channels -import com.devexperts.dxlab.lincheck.LinChecker -import com.devexperts.dxlab.lincheck.annotations.Operation -import com.devexperts.dxlab.lincheck.annotations.Param -import com.devexperts.dxlab.lincheck.annotations.Reset -import com.devexperts.dxlab.lincheck.paramgen.IntGen +import com.devexperts.dxlab.lincheck.* +import com.devexperts.dxlab.lincheck.annotations.* +import com.devexperts.dxlab.lincheck.paramgen.* import com.devexperts.dxlab.lincheck.stress.* import kotlinx.coroutines.experimental.* -import org.junit.Test +import org.junit.* +import java.io.* @Param(name = "value", gen = IntGen::class, conf = "1:3") class ChannelLinearizabilityTest : TestBase() { + private val lt = LinTesting() + private var capacity = 0 private lateinit var channel: Channel @Reset fun reset() { - channel = Channel() + channel = Channel(capacity) } @Operation(runOnce = true) @@ -53,14 +54,32 @@ class ChannelLinearizabilityTest : TestBase() { @Operation(runOnce = true) fun receive3() = lt.run("receive3") { channel.receive() } -// @Operation(runOnce = true) -// fun close1() = lt.run("close1") { channel.close(IOException("close1")) } -// -// @Operation(runOnce = true) -// fun close2() = lt.run("close2") { channel.close(IOException("close2")) } + @Operation(runOnce = true) + fun close1() = lt.run("close1") { channel.close(IOException("close1")) } + + @Operation(runOnce = true) + fun close2() = lt.run("close2") { channel.close(IOException("close2")) } + + @Test + fun testRendezvousChannelLinearizability() { + runTest(0) + } @Test - fun testLinearizability() { + fun testArrayChannelLinearizability() { + for (i in listOf(1, 2, 16)) { + runTest(i) + } + } + + @Test + fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED) + + @Test + fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED) + + private fun runTest(capacity: Int) { + this.capacity = capacity val options = StressOptions() .iterations(100) .invocationsPerIteration(1000 * stressTestMultiplier) @@ -70,4 +89,4 @@ class ChannelLinearizabilityTest : TestBase() { .verifier(LinVerifier::class.java) LinChecker.check(ChannelLinearizabilityTest::class.java, options) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SelectChannelLinearizabilityTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SelectChannelLinearizabilityTest.kt new file mode 100644 index 0000000000..feca24e485 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SelectChannelLinearizabilityTest.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2016-2017 JetBrains s.r.o. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kotlinx.coroutines.experimental.channels + +import com.devexperts.dxlab.lincheck.* +import com.devexperts.dxlab.lincheck.annotations.* +import com.devexperts.dxlab.lincheck.paramgen.* +import com.devexperts.dxlab.lincheck.stress.* +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.selects.* +import org.junit.* +import java.io.* + +@Param(name = "value", gen = IntGen::class, conf = "1:3") +class SelectChannelLinearizabilityTest : TestBase() { + + private val lt = LinTesting() + private var capacity = 0 + private lateinit var channel: Channel + + @Reset + fun reset() { + channel = Channel(capacity) + } + + @Operation(runOnce = true) + fun onSend1(@Param(name = "value") value: Int) = onSend(value, "onSend1") + + @Operation(runOnce = true) + fun onSend2(@Param(name = "value") value: Int) = onSend(value, "onSend2") + + + private fun onSend(value: Int, name: String): List { + return lt.run(name) { + select { + channel.onSend(value) {} + } + } + } + + @Operation(runOnce = true) + fun close1() = lt.run("close1") { channel.close(IOException("close1")) } + + @Operation(runOnce = true) + fun close2() = lt.run("close2") { channel.close(IOException("close2")) } + + @Operation(runOnce = true) + fun onReceive1() = onReceive("onReceive1") + + @Operation(runOnce = true) + fun onReceive2() = onReceive("onReceive2") + + private fun onReceive(name: String): List { + return lt.run(name) { + select { + channel.onReceive { + it + } + } + } + } + + @Operation(runOnce = true) + fun onClose1() = onClose("onClose1") + + @Operation(runOnce = true) + fun onClose2() = onClose("onClose2") + + private fun onClose(name: String): List { + return lt.run(name) { + select { + channel.onClose { + try { + val result = channel.offer(42) + throw AssertionError("Offer in 'onClose' should throw, but instead returned $result") + } catch (e: Exception) { + // Should happen + } + } + } + } + } + + @Test + fun testRendezvousChannelLinearizability() { + runTest(0) + } + + @Test + fun testArrayChannelLinearizability() { + for (i in listOf(1, 2, 16)) { + runTest(i) + } + } + + @Test + fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED) + + @Test + fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED) + + private fun runTest(capacity: Int) { + this.capacity = capacity + val options = StressOptions() + .iterations(100) + .invocationsPerIteration(200 * stressTestMultiplier) + .addThread(1, 3) + .addThread(1, 3) + .addThread(1, 3) + .addThread(1, 3) + .verifier(LinVerifier::class.java) + LinChecker.check(SelectChannelLinearizabilityTest::class.java, options) + } +} diff --git a/coroutines-guide.md b/coroutines-guide.md index 65e9130b97..e9505550b8 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -1340,6 +1340,32 @@ fun main(args: Array) = runBlocking { Done! --> +Additionally, it's possible to invoke a specific listener which is fired when the channel is closed using [onClose]. +Note that listener is invoked synchronously in the context of close: + +```kotlin +fun main(args: Array) = runBlocking { + val channel = Channel() + println("Registering onClose in thread ${Thread.currentThread().name}") + channel.onClose { + println("Invoking onClose in thread ${Thread.currentThread().name}") + } + + val job = launch(newSingleThreadContext("ClosingThread")) { + channel.close() + } + + job.join() +} +``` + +You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-11.kt) + + + ### Building channel producers The pattern where a coroutine is producing a sequence of elements is quite common. @@ -2441,6 +2467,7 @@ Channel was closed [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html [SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html +[onClose]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/on-close.html [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html [Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt index 26c19ae86e..807fe6fce9 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Annotations.kt @@ -22,6 +22,8 @@ internal actual annotation class JvmName(actual val name: String) @Target(AnnotationTarget.FILE) internal actual annotation class JvmMultifileClass +internal actual annotation class JvmOverloads + internal actual annotation class JvmField internal actual annotation class Volatile diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt index ba53496047..f4e80aa4cf 100644 --- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Publish.kt @@ -81,7 +81,7 @@ private class PublisherCoroutine( // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked private val mutex = Mutex(locked = true) - + override val job: Job = channel.job private val _nRequested = atomic(0L) // < 0 when closed (CLOSED or SIGNALLED) override val isClosedForSend: Boolean get() = isCompleted diff --git a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt index f1fd15a407..8a68f9efbf 100644 --- a/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx1/src/main/kotlin/kotlinx/coroutines/experimental/rx1/RxObservable.kt @@ -80,6 +80,8 @@ private class RxObservableCoroutine( ) : AbstractCoroutine(parentContext, true), ProducerScope, Producer, Subscription, SelectClause2> { override val channel: SendChannel get() = this + override val job: Job = channel.job + // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked private val mutex = Mutex(locked = true) diff --git a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt index 55f1e36115..ba3a27ec9e 100644 --- a/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/main/kotlin/kotlinx/coroutines/experimental/rx2/RxObservable.kt @@ -81,6 +81,8 @@ private class RxObservableCoroutine( ) : AbstractCoroutine(parentContext, true), ProducerScope, Cancellable, SelectClause2> { override val channel: SendChannel get() = this + override val job: Job = channel.job + // Mutex is locked when while subscriber.onXXX is being invoked private val mutex = Mutex()