Skip to content

Commit 05d13ac

Browse files
qwwdfsadelizarov
authored andcommitted
Migrate channels and related operators to common, so channels can be used from JS
Fixes #201
1 parent f4eb05a commit 05d13ac

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+711
-349
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental
1818

1919
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
2020

21-
public expect open class CancellationException(message: String) : IllegalStateException
21+
public expect open class CancellationException(message: String?) : IllegalStateException
2222

2323
public expect class JobCancellationException(
2424
message: String,

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
2020
import kotlinx.coroutines.experimental.internal.*
21+
import kotlinx.coroutines.experimental.internalAnnotations.*
2122
import kotlinx.coroutines.experimental.intrinsics.*
2223
import kotlinx.coroutines.experimental.selects.*
2324
import kotlin.coroutines.experimental.*
+5-6
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.internal.*
20+
import kotlinx.coroutines.experimental.internalAnnotations.*
1921
import kotlinx.coroutines.experimental.selects.*
20-
import java.util.concurrent.*
21-
import java.util.concurrent.locks.*
22-
import kotlin.concurrent.*
2322

2423
/**
2524
* Broadcast channel with array buffer of a fixed [capacity].
@@ -64,7 +63,7 @@ class ArrayBroadcastChannel<E>(
6463
So read/writes to buffer need not be volatile
6564
*/
6665

67-
private val subs = CopyOnWriteArrayList<Subscriber<E>>()
66+
private val subs = subscriberList<Subscriber<E>>()
6867

6968
override val isBufferAlwaysFull: Boolean get() = false
7069
override val isBufferFull: Boolean get() = size >= capacity
@@ -132,7 +131,6 @@ class ArrayBroadcastChannel<E>(
132131

133132
// updates head if needed and optionally adds / removes subscriber under the same lock
134133
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
135-
assert(addSub == null || removeSub == null) // only one of them can be specified
136134
// update head in a tail rec loop
137135
var send: Send? = null
138136
var token: Any? = null
@@ -200,7 +198,8 @@ class ArrayBroadcastChannel<E>(
200198
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
201199
private val subLock = ReentrantLock()
202200

203-
@Volatile @JvmField
201+
@Volatile
202+
@JvmField
204203
var subHead: Long = 0 // guarded by subLock
205204

206205
override val isBufferAlwaysEmpty: Boolean get() = false

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.internal.*
20+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
1921
import kotlinx.coroutines.experimental.selects.*
20-
import java.util.concurrent.locks.*
21-
import kotlin.concurrent.*
2222

2323
/**
2424
* Channel with array buffer of a fixed [capacity].
@@ -249,4 +249,4 @@ public open class ArrayChannel<E>(
249249

250250
override val bufferDebugString: String
251251
get() = "(buffer:capacity=${buffer.size},size=$size)"
252-
}
252+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.*
1920
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2021
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
21-
import java.io.Closeable
22+
import kotlinx.coroutines.experimental.internal.Closeable
2223

2324
/**
2425
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt

+7
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,12 @@ internal open class ChannelCoroutine<E>(
2727
val channel: Channel<E>
2828
get() = this
2929

30+
// Workaround for KT-23094
31+
override suspend fun receive(): E = _channel.receive()
32+
33+
override suspend fun send(element: E) = _channel.send(element)
34+
35+
override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()
36+
3037
override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
3138
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

+1-19
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,6 @@ import kotlin.coroutines.experimental.*
2121

2222
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
2323

24-
// -------- Operations on SendChannel --------
25-
26-
/**
27-
* Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
28-
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
29-
*
30-
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
31-
* so this function should not be used from coroutine.
32-
*/
33-
public fun <E> SendChannel<E>.sendBlocking(element: E) {
34-
// fast path
35-
if (offer(element))
36-
return
37-
// slow path
38-
runBlocking {
39-
send(element)
40-
}
41-
}
4224

4325
// -------- Conversions to ReceiveChannel --------
4426

@@ -120,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
120102
if (exception == null) {
121103
exception = e
122104
} else {
123-
exception.addSuppressed(e)
105+
exception.addSuppressedThrowable(e)
124106
}
125107
}
126108
exception?.let { throw it }
+7-8
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
21-
import kotlinx.coroutines.experimental.internal.Symbol
22-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
23-
import kotlinx.coroutines.experimental.selects.SelectClause2
24-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.internal.*
21+
import kotlinx.coroutines.experimental.internalAnnotations.*
22+
import kotlinx.coroutines.experimental.intrinsics.*
23+
import kotlinx.coroutines.experimental.selects.*
2524

2625
/**
2726
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -162,8 +161,8 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
162161
check(i >= 0)
163162
if (n == 1) return null
164163
val update = arrayOfNulls<Subscriber<E>>(n - 1)
165-
System.arraycopy(list, 0, update, 0, i)
166-
System.arraycopy(list, i + 1, update, i, n - i - 1)
164+
arraycopy(list, 0, update, 0, i)
165+
arraycopy(list, i + 1, update, i, n - i - 1)
167166
return update as Array<Subscriber<E>>
168167
}
169168

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt

-1
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,3 @@ public open class ConflatedChannel<E> : AbstractChannel<E>() {
8080
}
8181
}
8282
}
83-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
/**
4+
* Cross-platform array copy. Overlaps of source and destination are not supported
5+
*/
6+
expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
expect interface Closeable {
4+
fun close()
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
/**
4+
* Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
5+
* On JVM it's CopyOnWriteList and on JS it's MutableList.
6+
*
7+
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
8+
*/
9+
typealias SubscribersList<E> = MutableList<E>
10+
11+
expect fun <E> subscriberList(): SubscribersList<E>
12+
13+
expect class ReentrantLock() {
14+
fun tryLock(): Boolean
15+
fun unlock(): Unit
16+
}
17+
18+
expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt

+15-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package kotlinx.coroutines.experimental.internal
1818

19-
import kotlin.jvm.*
20-
2119
/** @suppress **This is unstable API and it is subject to change.** */
2220
public expect open class LockFreeLinkedListNode() {
2321
public val isRemoved: Boolean
22+
public val next: Any
2423
public val nextNode: LockFreeLinkedListNode
24+
public val prev: Any
2525
public val prevNode: LockFreeLinkedListNode
2626
public fun addLast(node: LockFreeLinkedListNode)
2727
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
@@ -57,11 +57,23 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
5757
val queue: LockFreeLinkedListNode
5858
val node: T
5959
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
60+
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
61+
}
62+
63+
public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
64+
val queue: LockFreeLinkedListNode
65+
public val result: T
66+
protected open fun validatePrepared(node: T): Boolean
67+
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
68+
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
6069
}
6170

6271
/** @suppress **This is unstable API and it is subject to change.** */
6372
public expect abstract class AbstractAtomicDesc : AtomicDesc {
64-
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
6573
final override fun prepare(op: AtomicOp<*>): Any?
6674
final override fun complete(op: AtomicOp<*>, failure: Any?)
75+
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
76+
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
77+
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
78+
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
6779
}

0 commit comments

Comments
 (0)