Skip to content

Channel fixes #362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions binary-compatibility-validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ sourceSets {
}

test {
dependsOn cleanCompileTestKotlin
dependsOn configurations.testArtifacts

systemProperty 'testCasesClassesDirs', sourceSets.test.output.classesDirs.asPath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,12 @@ public final class kotlinx/coroutines/experimental/YieldKt {
}

public abstract class kotlinx/coroutines/experimental/channels/AbstractChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/Channel {
public fun <init> ()V
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
public fun cancel (Ljava/lang/Throwable;)Z
protected fun cleanupSendQueueOnCancel ()V
protected final fun describeTryPoll ()Lkotlinx/coroutines/experimental/channels/AbstractChannel$TryPollDesc;
protected final fun getHasReceiveOrClosed ()Z
public final fun getJob ()Lkotlinx/coroutines/experimental/Job;
public final fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
public final fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
protected abstract fun isBufferAlwaysEmpty ()Z
Expand Down Expand Up @@ -540,14 +541,19 @@ public abstract interface class kotlinx/coroutines/experimental/channels/ActorSc

public final class kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel : kotlinx/coroutines/experimental/channels/AbstractSendChannel, kotlinx/coroutines/experimental/channels/BroadcastChannel {
public fun <init> (I)V
public fun <init> (ILkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close (Ljava/lang/Throwable;)Z
public final fun getCapacity ()I
public fun getJob ()Lkotlinx/coroutines/experimental/Job;
public fun open ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
public fun openSubscription ()Lkotlinx/coroutines/experimental/channels/SubscriptionReceiveChannel;
}

public class kotlinx/coroutines/experimental/channels/ArrayChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
public fun <init> (I)V
public fun <init> (ILkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (ILkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected fun cleanupSendQueueOnCancel ()V
protected fun getBufferDebugString ()Ljava/lang/String;
public final fun getCapacity ()I
Expand Down Expand Up @@ -657,6 +663,7 @@ public final class kotlinx/coroutines/experimental/channels/ChannelsKt {
public static final fun maxWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public static final fun minWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public static final fun onClose (Lkotlinx/coroutines/experimental/channels/Channel;Lkotlin/jvm/functions/Function1;)V
public static final fun requireNoNulls (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
public static final fun single (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -712,7 +719,10 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh
public static final field UNDEFINED Lkotlinx/coroutines/experimental/internal/Symbol;
public fun <init> ()V
public fun <init> (Ljava/lang/Object;)V
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close (Ljava/lang/Throwable;)Z
public fun getJob ()Lkotlinx/coroutines/experimental/Job;
public fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
public final fun getValue ()Ljava/lang/Object;
public final fun getValueOrNull ()Ljava/lang/Object;
Expand All @@ -726,6 +736,8 @@ public final class kotlinx/coroutines/experimental/channels/ConflatedBroadcastCh

public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
public fun <init> ()V
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun isBufferAlwaysEmpty ()Z
protected final fun isBufferAlwaysFull ()Z
protected final fun isBufferEmpty ()Z
Expand All @@ -737,6 +749,8 @@ public class kotlinx/coroutines/experimental/channels/ConflatedChannel : kotlinx

public class kotlinx/coroutines/experimental/channels/LinkedListChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
public fun <init> ()V
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun isBufferAlwaysEmpty ()Z
protected final fun isBufferAlwaysFull ()Z
protected final fun isBufferEmpty ()Z
Expand Down Expand Up @@ -775,6 +789,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Produce

public abstract interface class kotlinx/coroutines/experimental/channels/ReceiveChannel {
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
public abstract fun getOnReceive ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/experimental/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
Expand All @@ -797,6 +812,8 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Receive

public class kotlinx/coroutines/experimental/channels/RendezvousChannel : kotlinx/coroutines/experimental/channels/AbstractChannel {
public fun <init> ()V
public fun <init> (Lkotlinx/coroutines/experimental/Job;)V
public synthetic fun <init> (Lkotlinx/coroutines/experimental/Job;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected final fun isBufferAlwaysEmpty ()Z
protected final fun isBufferAlwaysFull ()Z
protected final fun isBufferEmpty ()Z
Expand All @@ -812,6 +829,7 @@ public abstract interface class kotlinx/coroutines/experimental/channels/Send {

public abstract interface class kotlinx/coroutines/experimental/channels/SendChannel {
public abstract fun close (Ljava/lang/Throwable;)Z
public abstract fun getJob ()Lkotlinx/coroutines/experimental/Job;
public abstract fun getOnSend ()Lkotlinx/coroutines/experimental/selects/SelectClause2;
public abstract fun isClosedForSend ()Z
public abstract fun isFull ()Z
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ internal expect annotation class JvmName(val name: String)
@Target(AnnotationTarget.FILE)
internal expect annotation class JvmMultifileClass()

internal expect annotation class JvmOverloads()

internal expect annotation class JvmField()

internal expect annotation class Volatile()
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
* Returns non-null closed token if it is last in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val closedForSend: Closed<*>? get() = queue.prevNode as? Closed<*>
protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) }

/**
* Returns non-null closed token if it is first in the queue.
* @suppress **This is unstable API and it is subject to change.**
*/
protected val closedForReceive: Closed<*>? get() = queue.nextNode as? Closed<*>
protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) }

/**
* Retrieves first sending waiter from the queue or returns closed token.
Expand Down Expand Up @@ -181,7 +181,7 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
val result = offerInternal(element)
return when {
result === OFFER_SUCCESS -> true
result === OFFER_FAILED -> false
result === OFFER_FAILED -> throw closedForSend?.sendException ?: return false
result is Closed<*> -> throw result.sendException
else -> error("offerInternal returned $result")
}
Expand Down Expand Up @@ -243,23 +243,53 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {

public override fun close(cause: Throwable?): Boolean {
val closed = Closed<E>(cause)

/*
* Try to commit close by adding a close token to the end of the queue.
* Successful -> we're now responsible for closing receivers
* Not successful -> help closing pending receivers to maintain invariant
* "if (!close()) next send will throw"
*/
val closeAdded = queue.addLastIfPrev(closed, { it !is Closed<*> })
if (!closeAdded) {
helpClose(queue.prevNode as Closed<*>)
return false
}

helpClose(closed)
onClosed(closed)
afterClose(cause)
// Cancel it as the last action so if the channel is closed, then the job is cancelled as well
job.cancel(cause)
return true
}

private fun helpClose(closed: Closed<*>) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discuss: helpClose should probably try to cancel job as well, so it will be linearizable along with channel

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the current implementation doesn't try to cancel the job, could this lead to a deadlock if somehow, a runBlocking { ... } call is waiting for it because it'd be waiting for the job to complete?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation doesn't have a job, this branch is not going to master.

/*
* It's important to traverse list from right to left to avoid races with sender.
* Consider channel state
* head sentinel -> [receiver 1] -> [receiver 2] -> head sentinel
* T1 invokes receive()
* T2 invokes close()
* T3 invokes close() + send(value)
*
* If both will traverse list from left to right, following non-linearizable history is possible:
* [close -> false], [send -> transferred 'value' to receiver]
*/
while (true) {
val receive = takeFirstReceiveOrPeekClosed()
if (receive == null) {
// queue empty or has only senders -- try add last "Closed" item to the queue
if (queue.addLastIfPrev(closed, { prev ->
if (prev is Closed<*>) return false // already closed
prev !is ReceiveOrClosed<*> // only add close if no waiting receive
})) {
onClosed(closed)
afterClose(cause)
return true
}
continue // retry on failure
val previous = closed.prevNode
// Channel is empty or has no receivers
if (previous is LockFreeLinkedListHead || previous !is Receive<*>) {
break
}

if (!previous.remove()) {
continue
}
if (receive is Closed<*>) return false // already marked as closed -- nothing to do
receive as Receive<E> // type assertion
receive.resumeReceiveClosed(closed)

@Suppress("UNCHECKED_CAST")
previous as Receive<E> // type assertion
previous.resumeReceiveClosed(closed)
}
}

Expand Down Expand Up @@ -445,9 +475,13 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
/**
* Abstract send/receive channel. It is a base class for all channel implementations.
*/
public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E> {
public abstract class AbstractChannel<E>(final override val job: Job) : AbstractSendChannel<E>(), Channel<E> {
// ------ extension points for buffered channels ------

init {
registerCancellation(job)
}

/**
* Returns `true` if [isBufferEmpty] is always `true`.
* @suppress **This is unstable API and it is subject to change.**
Expand Down Expand Up @@ -1023,3 +1057,20 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
abstract fun resumeReceiveClosed(closed: Closed<*>)
}

internal fun SendChannel<*>.registerCancellation(job: Job) {
val cancellation = ChannelCancellation(this, job)
job.invokeOnCompletion(cancellation.asHandler)
}

private class ChannelCancellation(
private val channel: SendChannel<*>, job: Job) : JobNode<Job>(job) {

override fun invoke(cause: Throwable?) {
if (job.isCancelled) {
channel.close(cause)
} else {
channel.close()
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.selects.*
Expand All @@ -34,14 +35,20 @@ import kotlinx.coroutines.experimental.selects.*
* The lock at each subscription is also used to manage concurrent attempts to receive from the same subscriber.
* The lists of suspended senders or receivers are lock-free.
*/
class ArrayBroadcastChannel<E>(
class ArrayBroadcastChannel<E> @JvmOverloads constructor(
/**
* Buffer capacity.
*/
val capacity: Int
val capacity: Int,

/**
* Job owning this channel.
*/
override val job: Job = Job()
) : AbstractSendChannel<E>(), BroadcastChannel<E> {
init {
require(capacity >= 1) { "ArrayBroadcastChannel capacity must be at least 1, but $capacity was specified" }
registerCancellation(job)
}

private val bufferLock = ReentrantLock()
Expand Down Expand Up @@ -195,7 +202,7 @@ class ArrayBroadcastChannel<E>(

private class Subscriber<E>(
private val broadcastChannel: ArrayBroadcastChannel<E>
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
) : AbstractChannel<E>(Job()), SubscriptionReceiveChannel<E> {
private val subLock = ReentrantLock()

@Volatile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.selects.*

/**
Expand All @@ -29,12 +30,17 @@ import kotlinx.coroutines.experimental.selects.*
* This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
* The lists of suspended senders or receivers are lock-free.
*/
public open class ArrayChannel<E>(
public open class ArrayChannel<E> @JvmOverloads constructor(
/**
* Buffer capacity.
*/
val capacity: Int
) : AbstractChannel<E>() {
val capacity: Int,

/**
* Job owning this channel.
*/
job: Job = Job()
) : AbstractChannel<E>(job) {
init {
require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,24 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.experimental.yield
import kotlinx.coroutines.experimental.selects.*

/**
* Sender's interface to [Channel].
*/
public interface SendChannel<in E> {

/**
* The job of this channel bounded with channel lifecycle.
* If job is completed with any reason (either normally or exceptionally), channel is [closed][SendChannel.close]
* with a completion [cause][Job.getCancellationException] of the job.
* If the channel is [closed][isClosedForSend], job is cancelled with the same reason as [SendChannel.close] call
*/
public val job: Job

/**
* Returns `true` if this channel was closed by invocation of [close] and thus
* the [send] and [offer] attempts throws exception.
Expand Down Expand Up @@ -105,6 +109,15 @@ public interface SendChannel<in E> {
* Receiver's interface to [Channel].
*/
public interface ReceiveChannel<out E> {

/**
* The job of this channel bounded with channel lifecycle.
* If job is completed with any reason (either normally or exceptionally), channel is [cancelled][ReceiveChannel.cancel]
* with a completion [cause][Job.getCancellationException] of the job.
* If the channel is cancelled or [closed][isClosedForReceive], job is cancelled with the same reason as [ReceiveChannel.cancel] call
*/
public val job: Job

/**
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
* side and all previously sent items were already received, so that the [receive] attempt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
for (e in this) action(e)
}

/**
* Invokes the given block **synchronously** in the context of cancellation when channel or its job is cancelled.
* If channel is already closed, block is invoked immediately.
* [block] is invoked only once even if channel is cancelled or closed more than once
*
* @param block non-blocking callback which should be invoked on cancellation
*/
public inline fun Channel<*>.onClose(crossinline block: (Throwable?) -> Unit) {
job.invokeOnCompletion { cause ->
if (cause !is JobCancellationException || cause.cause != null) block(cause)
else block(null)
}
}

/**
* @suppress: **Deprecated**: binary compatibility with old code
*/
Expand Down
Loading