Skip to content

JS channels #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental

public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException

public expect open class CancellationException(message: String) : IllegalStateException
public expect open class CancellationException(message: String?) : IllegalStateException

public expect class JobCancellationException(
message: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlin.coroutines.experimental.*
Expand Down Expand Up @@ -934,28 +935,28 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
}

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")

/** @suppress **This is unstable API and it is subject to change.** */
@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED")

/**
* Represents sending waiter in the queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.selects.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.concurrent.*

/**
* Broadcast channel with array buffer of a fixed [capacity].
Expand Down Expand Up @@ -64,7 +63,7 @@ class ArrayBroadcastChannel<E>(
So read/writes to buffer need not be volatile
*/

private val subs = CopyOnWriteArrayList<Subscriber<E>>()
private val subs = subscriberList<Subscriber<E>>()

override val isBufferAlwaysFull: Boolean get() = false
override val isBufferFull: Boolean get() = size >= capacity
Expand Down Expand Up @@ -132,7 +131,6 @@ class ArrayBroadcastChannel<E>(

// updates head if needed and optionally adds / removes subscriber under the same lock
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
assert(addSub == null || removeSub == null) // only one of them can be specified
// update head in a tail rec loop
var send: Send? = null
var token: Any? = null
Expand Down Expand Up @@ -200,7 +198,8 @@ class ArrayBroadcastChannel<E>(
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
private val subLock = ReentrantLock()

@Volatile @JvmField
@Volatile
@JvmField
var subHead: Long = 0 // guarded by subLock

override val isBufferAlwaysEmpty: Boolean get() = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.selects.*
import java.util.concurrent.locks.*
import kotlin.concurrent.*

/**
* Channel with array buffer of a fixed [capacity].
Expand Down Expand Up @@ -249,4 +249,4 @@ public open class ArrayChannel<E>(

override val bufferDebugString: String
get() = "(buffer:capacity=${buffer.size},size=$size)"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import java.io.Closeable
import kotlinx.coroutines.experimental.internal.Closeable

/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,12 @@ internal open class ChannelCoroutine<E>(
val channel: Channel<E>
get() = this

// Workaround for KT-23094
override suspend fun receive(): E = _channel.receive()

override suspend fun send(element: E) = _channel.send(element)

override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()

override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,6 @@ import kotlin.coroutines.experimental.*

internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"

// -------- Operations on SendChannel --------

/**
* Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
*
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
* so this function should not be used from coroutine.
*/
public fun <E> SendChannel<E>.sendBlocking(element: E) {
// fast path
if (offer(element))
return
// slow path
runBlocking {
send(element)
}
}

// -------- Conversions to ReceiveChannel --------

Expand Down Expand Up @@ -120,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
if (exception == null) {
exception = e
} else {
exception.addSuppressed(e)
exception.addSuppressedThrowable(e)
}
}
exception?.let { throw it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlinx.coroutines.experimental.internal.Symbol
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
Expand Down Expand Up @@ -162,8 +161,8 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
check(i >= 0)
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
System.arraycopy(list, 0, update, 0, i)
System.arraycopy(list, i + 1, update, i, n - i - 1)
arraycopy(list, 0, update, 0, i)
arraycopy(list, i + 1, update, i, n - i - 1)
return update as Array<Subscriber<E>>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,3 @@ public open class ConflatedChannel<E> : AbstractChannel<E>() {
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package kotlinx.coroutines.experimental.internal

/**
* Cross-platform array copy. Overlaps of source and destination are not supported
*/
internal expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public abstract class AtomicOp<in T> : OpDescriptor() {
final override fun perform(affected: Any?): Any? {
// make decision on status
var decision = this._consensus.value
if (decision === NO_DECISION)
if (decision === NO_DECISION) {
decision = decide(prepare(affected as T))
}

complete(affected as T, decision)
return decision
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package kotlinx.coroutines.experimental.internal

/**
* Closeable entity.
* @suppress **Deprecated**
*/
@Deprecated("No replacement, see specific use")
public expect interface Closeable {
@Deprecated("No replacement, see specific code")
fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kotlinx.coroutines.experimental.internal

/**
* Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
* On JVM it's CopyOnWriteList and on JS it's MutableList.
*
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
*/
internal typealias SubscribersList<E> = MutableList<E>

internal expect fun <E> subscriberList(): SubscribersList<E>

internal expect class ReentrantLock() {
fun tryLock(): Boolean
fun unlock(): Unit
}

internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package kotlinx.coroutines.experimental.internal

import kotlin.jvm.*

/** @suppress **This is unstable API and it is subject to change.** */
public expect open class LockFreeLinkedListNode() {
public val isRemoved: Boolean
public val next: Any
public val nextNode: LockFreeLinkedListNode
public val prev: Any
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
Expand Down Expand Up @@ -57,11 +57,24 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
val queue: LockFreeLinkedListNode
val node: T
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}

/** @suppress **This is unstable API and it is subject to change.** */
public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
val queue: LockFreeLinkedListNode
public val result: T
protected open fun validatePrepared(node: T): Boolean
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}

/** @suppress **This is unstable API and it is subject to change.** */
public expect abstract class AbstractAtomicDesc : AtomicDesc {
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ package kotlinx.coroutines.experimental.internal
*
* @suppress **This is unstable API and it is subject to change.**
*/
public class Symbol(val symbol: String) {
internal class Symbol(val symbol: String) {
override fun toString(): String = symbol
}
Loading