Skip to content

Commit f2bdf60

Browse files
committed
Merged develop (js-channels work) into await-all
# Conflicts: # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt # common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt
2 parents 189e995 + 11d6b5b commit f2bdf60

File tree

57 files changed

+942
-432
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+942
-432
lines changed

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental
1818

1919
public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException
2020

21-
public expect open class CancellationException(message: String) : IllegalStateException
21+
public expect open class CancellationException(message: String?) : IllegalStateException
2222

2323
public expect class JobCancellationException(
2424
message: String,

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+9-8
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental.channels
1818

1919
import kotlinx.coroutines.experimental.*
2020
import kotlinx.coroutines.experimental.internal.*
21+
import kotlinx.coroutines.experimental.internalAnnotations.*
2122
import kotlinx.coroutines.experimental.intrinsics.*
2223
import kotlinx.coroutines.experimental.selects.*
2324
import kotlin.coroutines.experimental.*
@@ -934,28 +935,28 @@ public abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E>
934935
}
935936

936937
/** @suppress **This is unstable API and it is subject to change.** */
937-
@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
938+
@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS")
938939

939940
/** @suppress **This is unstable API and it is subject to change.** */
940-
@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
941+
@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED")
941942

942943
/** @suppress **This is unstable API and it is subject to change.** */
943-
@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED")
944+
@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED")
944945

945946
/** @suppress **This is unstable API and it is subject to change.** */
946-
@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
947+
@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED")
947948

948949
/** @suppress **This is unstable API and it is subject to change.** */
949-
@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
950+
@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED")
950951

951952
/** @suppress **This is unstable API and it is subject to change.** */
952-
@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE")
953+
@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE")
953954

954955
/** @suppress **This is unstable API and it is subject to change.** */
955-
@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
956+
@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED")
956957

957958
/** @suppress **This is unstable API and it is subject to change.** */
958-
@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED")
959+
@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED")
959960

960961
/**
961962
* Represents sending waiter in the queue.
+5-6
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.internal.*
20+
import kotlinx.coroutines.experimental.internalAnnotations.*
1921
import kotlinx.coroutines.experimental.selects.*
20-
import java.util.concurrent.*
21-
import java.util.concurrent.locks.*
22-
import kotlin.concurrent.*
2322

2423
/**
2524
* Broadcast channel with array buffer of a fixed [capacity].
@@ -64,7 +63,7 @@ class ArrayBroadcastChannel<E>(
6463
So read/writes to buffer need not be volatile
6564
*/
6665

67-
private val subs = CopyOnWriteArrayList<Subscriber<E>>()
66+
private val subs = subscriberList<Subscriber<E>>()
6867

6968
override val isBufferAlwaysFull: Boolean get() = false
7069
override val isBufferFull: Boolean get() = size >= capacity
@@ -132,7 +131,6 @@ class ArrayBroadcastChannel<E>(
132131

133132
// updates head if needed and optionally adds / removes subscriber under the same lock
134133
private tailrec fun updateHead(addSub: Subscriber<E>? = null, removeSub: Subscriber<E>? = null) {
135-
assert(addSub == null || removeSub == null) // only one of them can be specified
136134
// update head in a tail rec loop
137135
var send: Send? = null
138136
var token: Any? = null
@@ -200,7 +198,8 @@ class ArrayBroadcastChannel<E>(
200198
) : AbstractChannel<E>(), SubscriptionReceiveChannel<E> {
201199
private val subLock = ReentrantLock()
202200

203-
@Volatile @JvmField
201+
@Volatile
202+
@JvmField
204203
var subHead: Long = 0 // guarded by subLock
205204

206205
override val isBufferAlwaysEmpty: Boolean get() = false

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.internal.*
20+
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
1921
import kotlinx.coroutines.experimental.selects.*
20-
import java.util.concurrent.locks.*
21-
import kotlin.concurrent.*
2222

2323
/**
2424
* Channel with array buffer of a fixed [capacity].
@@ -249,4 +249,4 @@ public open class ArrayChannel<E>(
249249

250250
override val bufferDebugString: String
251251
get() = "(buffer:capacity=${buffer.size},size=$size)"
252-
}
252+
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19+
import kotlinx.coroutines.experimental.*
1920
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
2021
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
21-
import java.io.Closeable
22+
import kotlinx.coroutines.experimental.internal.Closeable
2223

2324
/**
2425
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt

+7
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,12 @@ internal open class ChannelCoroutine<E>(
2727
val channel: Channel<E>
2828
get() = this
2929

30+
// Workaround for KT-23094
31+
override suspend fun receive(): E = _channel.receive()
32+
33+
override suspend fun send(element: E) = _channel.send(element)
34+
35+
override suspend fun receiveOrNull(): E? = _channel.receiveOrNull()
36+
3037
override fun cancel(cause: Throwable?): Boolean = super.cancel(cause)
3138
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

+1-19
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,6 @@ import kotlin.coroutines.experimental.*
2121

2222
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
2323

24-
// -------- Operations on SendChannel --------
25-
26-
/**
27-
* Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull],
28-
* or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details).
29-
*
30-
* This is a way to call [Channel.send] method inside a blocking code using [runBlocking],
31-
* so this function should not be used from coroutine.
32-
*/
33-
public fun <E> SendChannel<E>.sendBlocking(element: E) {
34-
// fast path
35-
if (offer(element))
36-
return
37-
// slow path
38-
runBlocking {
39-
send(element)
40-
}
41-
}
4224

4325
// -------- Conversions to ReceiveChannel --------
4426

@@ -120,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler =
120102
if (exception == null) {
121103
exception = e
122104
} else {
123-
exception.addSuppressed(e)
105+
exception.addSuppressedThrowable(e)
124106
}
125107
}
126108
exception?.let { throw it }
+7-8
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.atomicfu.atomic
20-
import kotlinx.atomicfu.loop
21-
import kotlinx.coroutines.experimental.internal.Symbol
22-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
23-
import kotlinx.coroutines.experimental.selects.SelectClause2
24-
import kotlinx.coroutines.experimental.selects.SelectInstance
19+
import kotlinx.atomicfu.*
20+
import kotlinx.coroutines.experimental.internal.*
21+
import kotlinx.coroutines.experimental.internalAnnotations.*
22+
import kotlinx.coroutines.experimental.intrinsics.*
23+
import kotlinx.coroutines.experimental.selects.*
2524

2625
/**
2726
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
@@ -162,8 +161,8 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
162161
check(i >= 0)
163162
if (n == 1) return null
164163
val update = arrayOfNulls<Subscriber<E>>(n - 1)
165-
System.arraycopy(list, 0, update, 0, i)
166-
System.arraycopy(list, i + 1, update, i, n - i - 1)
164+
arraycopy(list, 0, update, 0, i)
165+
arraycopy(list, i + 1, update, i, n - i - 1)
167166
return update as Array<Subscriber<E>>
168167
}
169168

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt renamed to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt

-1
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,3 @@ public open class ConflatedChannel<E> : AbstractChannel<E>() {
8080
}
8181
}
8282
}
83-
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
/**
4+
* Cross-platform array copy. Overlaps of source and destination are not supported
5+
*/
6+
internal expect fun <E> arraycopy(source: Array<E>, srcPos: Int, destination: Array<E?>, destinationStart: Int, length: Int)

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ public abstract class AtomicOp<in T> : OpDescriptor() {
6666
final override fun perform(affected: Any?): Any? {
6767
// make decision on status
6868
var decision = this._consensus.value
69-
if (decision === NO_DECISION)
69+
if (decision === NO_DECISION) {
7070
decision = decide(prepare(affected as T))
71+
}
72+
7173
complete(affected as T, decision)
7274
return decision
7375
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
/**
4+
* Closeable entity.
5+
* @suppress **Deprecated**
6+
*/
7+
@Deprecated("No replacement, see specific use")
8+
public expect interface Closeable {
9+
@Deprecated("No replacement, see specific code")
10+
fun close()
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package kotlinx.coroutines.experimental.internal
2+
3+
/**
4+
* Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel]
5+
* On JVM it's CopyOnWriteList and on JS it's MutableList.
6+
*
7+
* Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel
8+
*/
9+
internal typealias SubscribersList<E> = MutableList<E>
10+
11+
internal expect fun <E> subscriberList(): SubscribersList<E>
12+
13+
internal expect class ReentrantLock() {
14+
fun tryLock(): Boolean
15+
fun unlock(): Unit
16+
}
17+
18+
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt

+16-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package kotlinx.coroutines.experimental.internal
1818

19-
import kotlin.jvm.*
20-
2119
/** @suppress **This is unstable API and it is subject to change.** */
2220
public expect open class LockFreeLinkedListNode() {
2321
public val isRemoved: Boolean
22+
public val next: Any
2423
public val nextNode: LockFreeLinkedListNode
24+
public val prev: Any
2525
public val prevNode: LockFreeLinkedListNode
2626
public fun addLast(node: LockFreeLinkedListNode)
2727
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
@@ -57,11 +57,24 @@ public expect open class AddLastDesc<T : LockFreeLinkedListNode>(
5757
val queue: LockFreeLinkedListNode
5858
val node: T
5959
protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
60+
override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
61+
}
62+
63+
/** @suppress **This is unstable API and it is subject to change.** */
64+
public expect open class RemoveFirstDesc<T>(queue: LockFreeLinkedListNode): AbstractAtomicDesc {
65+
val queue: LockFreeLinkedListNode
66+
public val result: T
67+
protected open fun validatePrepared(node: T): Boolean
68+
protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
69+
final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
6070
}
6171

6272
/** @suppress **This is unstable API and it is subject to change.** */
6373
public expect abstract class AbstractAtomicDesc : AtomicDesc {
64-
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any?
6574
final override fun prepare(op: AtomicOp<*>): Any?
6675
final override fun complete(op: AtomicOp<*>, failure: Any?)
76+
protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
77+
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
78+
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
79+
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
6780
}

common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ package kotlinx.coroutines.experimental.internal
2121
*
2222
* @suppress **This is unstable API and it is subject to change.**
2323
*/
24-
public class Symbol(val symbol: String) {
24+
internal class Symbol(val symbol: String) {
2525
override fun toString(): String = symbol
2626
}

0 commit comments

Comments
 (0)