From 05d13aca47acb5f6c7a5805fa3726e7dfa15b5bd Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 20 Apr 2018 18:13:33 +0300 Subject: [PATCH 1/4] Migrate channels and related operators to common, so channels can be used from JS Fixes #201 --- .../experimental/Exceptions.common.kt | 2 +- .../experimental/channels/AbstractChannel.kt | 1 + .../channels/ArrayBroadcastChannel.kt | 11 +- .../experimental/channels/ArrayChannel.kt | 6 +- .../experimental/channels/BroadcastChannel.kt | 3 +- .../experimental/channels/Channel.kt | 0 .../experimental/channels/ChannelCoroutine.kt | 7 + .../experimental/channels/Channels.kt | 20 +- .../channels/ConflatedBroadcastChannel.kt | 15 +- .../experimental/channels/ConflatedChannel.kt | 1 - .../channels/LinkedListChannel.kt | 0 .../experimental/channels/Produce.kt | 0 .../channels/RendezvousChannel.kt | 0 .../experimental/internal/ArrayCopy.common.kt | 6 + .../experimental/internal/Closeable.common.kt | 5 + .../internal/Concurrent.common.kt | 18 ++ .../internal/LockFreeLinkedList.common.kt | 18 +- .../channels/ArrayBroadcastChannelTest.kt | 62 +++--- .../experimental/channels/ArrayChannelTest.kt | 37 +--- .../channels/BroadcastChannelFactoryTest.kt | 26 +-- .../channels/ChannelFactoryTest.kt | 27 +-- .../experimental/channels/ChannelsTest.kt | 194 ++++++++---------- .../channels/ConflatedBroadcastChannelTest.kt | 70 ++++--- .../channels/ConflatedChannelTest.kt | 35 ++-- .../channels/LinkedListChannelTest.kt | 21 +- .../channels/ProduceConsumeTest.kt | 55 +++++ .../experimental/channels/ProduceTest.kt | 5 +- .../channels/RendezvousChannelTest.kt | 53 ++--- .../channels/SendReceiveStressTest.kt | 46 +++++ .../channels/SimpleSendReceiveTest.kt | 35 ++++ .../channels/TestBroadcastChannelKind.kt | 6 +- .../experimental/channels/TestChannelKind.kt | 18 +- .../experimental/channels/ChannelsJvm.kt | 22 ++ .../experimental/internal/ArrayCopy.kt | 5 + .../experimental/internal/Closeable.kt | 3 + .../experimental/internal/Concurrent.kt | 10 + .../internal/LockFreeLinkedList.kt | 8 +- .../channels/ArrayChannelStressTest.kt | 39 ++++ .../experimental/channels/ChannelsJvmTest.kt | 21 ++ .../channels/DoubleChannelCloseStressTest.kt | 2 +- ...onsumeTest.kt => ProduceConsumeJvmTest.kt} | 6 +- .../channels/RandevouzChannelStressTest.kt | 26 +++ .../channels/SendReceiveJvmStressTest.kt | 45 ++++ ...iveTest.kt => SimpleSendReceiveJvmTest.kt} | 4 +- .../coroutines/experimental/Exceptions.kt | 2 +- .../experimental/internal/ArrayCopy.kt | 9 + .../experimental/internal/Closeable.kt | 5 + .../experimental/internal/Concurrent.kt | 12 ++ .../experimental/internal/LinkedList.kt | 24 +++ .../experimental/internal/ArrayCopyKtTest.kt | 14 ++ 50 files changed, 711 insertions(+), 349 deletions(-) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt (99%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt (98%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt (98%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt (97%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt (100%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt (82%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt (98%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt (95%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt (99%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt (100%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt (100%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt (100%) create mode 100644 common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt create mode 100644 common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt create mode 100644 common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt (72%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt (85%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt (58%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt (54%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt (77%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt (58%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt (71%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt (72%) create mode 100644 common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt (98%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt (85%) create mode 100644 common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt create mode 100644 common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt (95%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt (82%) create mode 100644 core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt create mode 100644 core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt create mode 100644 core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt create mode 100644 core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt create mode 100644 core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt create mode 100644 core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt rename core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/{ProduceConsumeTest.kt => ProduceConsumeJvmTest.kt} (96%) create mode 100644 core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt create mode 100644 core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt rename core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/{SimpleSendReceiveTest.kt => SimpleSendReceiveJvmTest.kt} (98%) create mode 100644 js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt create mode 100644 js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt create mode 100644 js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt create mode 100644 js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt index e2e67d38ab..6759e8f845 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.common.kt @@ -18,7 +18,7 @@ package kotlinx.coroutines.experimental public expect class CompletionHandlerException(message: String, cause: Throwable) : RuntimeException -public expect open class CancellationException(message: String) : IllegalStateException +public expect open class CancellationException(message: String?) : IllegalStateException public expect class JobCancellationException( message: String, diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt similarity index 99% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt index 4ce28dad7b..5fcfc87bf2 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt @@ -18,6 +18,7 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.internal.* +import kotlinx.coroutines.experimental.internalAnnotations.* import kotlinx.coroutines.experimental.intrinsics.* import kotlinx.coroutines.experimental.selects.* import kotlin.coroutines.experimental.* diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt index 5c4fa55288..ffdf694dca 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt @@ -16,10 +16,9 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.internal.* +import kotlinx.coroutines.experimental.internalAnnotations.* import kotlinx.coroutines.experimental.selects.* -import java.util.concurrent.* -import java.util.concurrent.locks.* -import kotlin.concurrent.* /** * Broadcast channel with array buffer of a fixed [capacity]. @@ -64,7 +63,7 @@ class ArrayBroadcastChannel( So read/writes to buffer need not be volatile */ - private val subs = CopyOnWriteArrayList>() + private val subs = subscriberList>() override val isBufferAlwaysFull: Boolean get() = false override val isBufferFull: Boolean get() = size >= capacity @@ -132,7 +131,6 @@ class ArrayBroadcastChannel( // updates head if needed and optionally adds / removes subscriber under the same lock private tailrec fun updateHead(addSub: Subscriber? = null, removeSub: Subscriber? = null) { - assert(addSub == null || removeSub == null) // only one of them can be specified // update head in a tail rec loop var send: Send? = null var token: Any? = null @@ -200,7 +198,8 @@ class ArrayBroadcastChannel( ) : AbstractChannel(), SubscriptionReceiveChannel { private val subLock = ReentrantLock() - @Volatile @JvmField + @Volatile + @JvmField var subHead: Long = 0 // guarded by subLock override val isBufferAlwaysEmpty: Boolean get() = false diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt index fd8c79d5d1..b118a896c8 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt @@ -16,9 +16,9 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.internal.* +import kotlinx.coroutines.experimental.internalAnnotations.Volatile import kotlinx.coroutines.experimental.selects.* -import java.util.concurrent.locks.* -import kotlin.concurrent.* /** * Channel with array buffer of a fixed [capacity]. @@ -249,4 +249,4 @@ public open class ArrayChannel( override val bufferDebugString: String get() = "(buffer:capacity=${buffer.size},size=$size)" -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt similarity index 97% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt index e071aa216d..0a9c589bdf 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannel.kt @@ -16,9 +16,10 @@ package kotlinx.coroutines.experimental.channels +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED -import java.io.Closeable +import kotlinx.coroutines.experimental.internal.Closeable /** * Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt similarity index 100% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channel.kt diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt similarity index 82% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt index a527be2387..a23c2dc79d 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelCoroutine.kt @@ -27,5 +27,12 @@ internal open class ChannelCoroutine( val channel: Channel get() = this + // Workaround for KT-23094 + override suspend fun receive(): E = _channel.receive() + + override suspend fun send(element: E) = _channel.send(element) + + override suspend fun receiveOrNull(): E? = _channel.receiveOrNull() + override fun cancel(cause: Throwable?): Boolean = super.cancel(cause) } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt index 87abbf567c..ecb4262622 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt @@ -21,24 +21,6 @@ import kotlin.coroutines.experimental.* internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed" -// -------- Operations on SendChannel -------- - -/** - * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull], - * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). - * - * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], - * so this function should not be used from coroutine. - */ -public fun SendChannel.sendBlocking(element: E) { - // fast path - if (offer(element)) - return - // slow path - runBlocking { - send(element) - } -} // -------- Conversions to ReceiveChannel -------- @@ -120,7 +102,7 @@ public fun consumesAll(vararg channels: ReceiveChannel<*>): CompletionHandler = if (exception == null) { exception = e } else { - exception.addSuppressed(e) + exception.addSuppressedThrowable(e) } } exception?.let { throw it } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt similarity index 95% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt index 95ae09037b..a8ec1ff9fe 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannel.kt @@ -16,12 +16,11 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.atomicfu.atomic -import kotlinx.atomicfu.loop -import kotlinx.coroutines.experimental.internal.Symbol -import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched -import kotlinx.coroutines.experimental.selects.SelectClause2 -import kotlinx.coroutines.experimental.selects.SelectInstance +import kotlinx.atomicfu.* +import kotlinx.coroutines.experimental.internal.* +import kotlinx.coroutines.experimental.internalAnnotations.* +import kotlinx.coroutines.experimental.intrinsics.* +import kotlinx.coroutines.experimental.selects.* /** * Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers. @@ -162,8 +161,8 @@ public class ConflatedBroadcastChannel() : BroadcastChannel { check(i >= 0) if (n == 1) return null val update = arrayOfNulls>(n - 1) - System.arraycopy(list, 0, update, 0, i) - System.arraycopy(list, i + 1, update, i, n - i - 1) + arraycopy(list, 0, update, 0, i) + arraycopy(list, i + 1, update, i, n - i - 1) return update as Array> } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt similarity index 99% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt index 7589179c0a..ae9875670a 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannel.kt @@ -80,4 +80,3 @@ public open class ConflatedChannel : AbstractChannel() { } } } - diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt similarity index 100% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannel.kt diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt similarity index 100% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Produce.kt diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt similarity index 100% rename from core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt rename to common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannel.kt diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt new file mode 100644 index 0000000000..fe20163456 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt @@ -0,0 +1,6 @@ +package kotlinx.coroutines.experimental.internal + +/** + * Cross-platform array copy. Overlaps of source and destination are not supported + */ +expect fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt new file mode 100644 index 0000000000..2b1f504b2b --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt @@ -0,0 +1,5 @@ +package kotlinx.coroutines.experimental.internal + +expect interface Closeable { + fun close() +} diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt new file mode 100644 index 0000000000..df36305a29 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt @@ -0,0 +1,18 @@ +package kotlinx.coroutines.experimental.internal + +/** + * Special kind of list intended to be used as collection of subscribers in [ArrayBroadcastChannel] + * On JVM it's CopyOnWriteList and on JS it's MutableList. + * + * Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel + */ +typealias SubscribersList = MutableList + +expect fun subscriberList(): SubscribersList + +expect class ReentrantLock() { + fun tryLock(): Boolean + fun unlock(): Unit +} + +expect inline fun ReentrantLock.withLock(action: () -> T): T diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt index 9cc3e141c9..e23fd988db 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt @@ -16,12 +16,12 @@ package kotlinx.coroutines.experimental.internal -import kotlin.jvm.* - /** @suppress **This is unstable API and it is subject to change.** */ public expect open class LockFreeLinkedListNode() { public val isRemoved: Boolean + public val next: Any public val nextNode: LockFreeLinkedListNode + public val prev: Any public val prevNode: LockFreeLinkedListNode public fun addLast(node: LockFreeLinkedListNode) public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean @@ -57,11 +57,23 @@ public expect open class AddLastDesc( val queue: LockFreeLinkedListNode val node: T protected override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? + override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) +} + +public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): AbstractAtomicDesc { + val queue: LockFreeLinkedListNode + public val result: T + protected open fun validatePrepared(node: T): Boolean + protected final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? + final override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } /** @suppress **This is unstable API and it is subject to change.** */ public expect abstract class AbstractAtomicDesc : AtomicDesc { - protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? final override fun prepare(op: AtomicOp<*>): Any? final override fun complete(op: AtomicOp<*>, failure: Any?) + protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? + protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean + protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure + protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt similarity index 72% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt index a71e3f8a56..84da40fe43 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt @@ -17,28 +17,27 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.hamcrest.core.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class ArrayBroadcastChannelTest : TestBase() { + @Test - fun testBasic() = runBlocking { + fun testBasic() = runTest { expect(1) val broadcast = ArrayBroadcastChannel(1) - assertThat(broadcast.isClosedForSend, IsEqual(false)) + assertFalse(broadcast.isClosedForSend) val first = broadcast.openSubscription() launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(2) - assertThat(first.receive(), IsEqual(1)) // suspends - assertThat(first.isClosedForReceive, IsEqual(false)) + assertEquals(1, first.receive()) // suspends + assertFalse(first.isClosedForReceive) expect(5) - assertThat(first.receive(), IsEqual(2)) // suspends - assertThat(first.isClosedForReceive, IsEqual(false)) + assertEquals(2, first.receive()) // suspends + assertFalse(first.isClosedForReceive) expect(10) - assertThat(first.receiveOrNull(), IsNull()) // suspends - assertThat(first.isClosedForReceive, IsEqual(true)) + assertNull(first.receiveOrNull()) // suspends + assertTrue(first.isClosedForReceive) expect(14) } expect(3) @@ -46,14 +45,15 @@ class ArrayBroadcastChannelTest : TestBase() { expect(4) yield() // to the first receiver expect(6) + val second = broadcast.openSubscription() launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(7) - assertThat(second.receive(), IsEqual(2)) // suspends - assertThat(second.isClosedForReceive, IsEqual(false)) + assertEquals(2, second.receive()) // suspends + assertFalse(second.isClosedForReceive) expect(11) - assertThat(second.receiveOrNull(), IsNull()) // suspends - assertThat(second.isClosedForReceive, IsEqual(true)) + assertNull(second.receiveOrNull()) // suspends + assertTrue(second.isClosedForReceive) expect(15) } expect(8) @@ -63,21 +63,21 @@ class ArrayBroadcastChannelTest : TestBase() { expect(12) broadcast.close() expect(13) - assertThat(broadcast.isClosedForSend, IsEqual(true)) + assertTrue(broadcast.isClosedForSend) yield() // to first & second receivers finish(16) } @Test - fun testSendSuspend() = runBlocking { + fun testSendSuspend() = runTest { expect(1) val broadcast = ArrayBroadcastChannel(1) val first = broadcast.openSubscription() launch(coroutineContext) { expect(4) - assertThat(first.receive(), IsEqual(1)) + assertEquals(1, first.receive()) expect(5) - assertThat(first.receive(), IsEqual(2)) + assertEquals(2, first.receive()) expect(6) } expect(2) @@ -88,7 +88,7 @@ class ArrayBroadcastChannelTest : TestBase() { } @Test - fun testConcurrentSendCompletion() = runBlocking { + fun testConcurrentSendCompletion() = runTest { expect(1) val broadcast = ArrayBroadcastChannel(1) val sub = broadcast.openSubscription() @@ -104,17 +104,17 @@ class ArrayBroadcastChannelTest : TestBase() { broadcast.close() // now must receive all 3 items expect(6) - assertThat(sub.isClosedForReceive, IsEqual(false)) + assertFalse(sub.isClosedForReceive) for (x in 1..3) - assertThat(sub.receiveOrNull(), IsEqual(x)) + assertEquals(x, sub.receiveOrNull()) // and receive close signal - assertThat(sub.receiveOrNull(), IsNull()) - assertThat(sub.isClosedForReceive, IsEqual(true)) + assertNull(sub.receiveOrNull()) + assertTrue(sub.isClosedForReceive) finish(7) } @Test - fun testForgetUnsubscribed() = runBlocking { + fun testForgetUnsubscribed() = runTest { expect(1) val broadcast = ArrayBroadcastChannel(1) broadcast.send(1) @@ -124,7 +124,7 @@ class ArrayBroadcastChannelTest : TestBase() { val sub = broadcast.openSubscription() launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(3) - assertThat(sub.receive(), IsEqual(4)) // suspends + assertEquals(4, sub.receive()) // suspends expect(5) } expect(4) @@ -134,7 +134,7 @@ class ArrayBroadcastChannelTest : TestBase() { } @Test - fun testReceiveFullAfterClose() = runBlocking { + fun testReceiveFullAfterClose() = runTest { val channel = BroadcastChannel(10) val sub = channel.openSubscription() // generate into buffer & close @@ -148,7 +148,7 @@ class ArrayBroadcastChannelTest : TestBase() { } @Test - fun testCloseSubDuringIteration() = runBlocking { + fun testCloseSubDuringIteration() = runTest { val channel = BroadcastChannel(1) // launch generator (for later) in this context launch(coroutineContext) { @@ -168,9 +168,7 @@ class ArrayBroadcastChannelTest : TestBase() { } @Test - fun testReceiveFromClosedSub() = runTest( - expected = { it is ClosedReceiveChannelException } - ) { + fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) { val channel = BroadcastChannel(1) val sub = channel.openSubscription() assertFalse(sub.isClosedForReceive) @@ -178,4 +176,4 @@ class ArrayBroadcastChannelTest : TestBase() { assertTrue(sub.isClosedForReceive) sub.receive() } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt similarity index 85% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt index 80d3682127..61fdaefd83 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelTest.kt @@ -17,13 +17,13 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class ArrayChannelTest : TestBase() { + @Test - fun testSimple() = runBlocking { + fun testSimple() = runTest { val q = ArrayChannel(1) check(q.isEmpty && !q.isFull) expect(1) @@ -52,25 +52,7 @@ class ArrayChannelTest : TestBase() { } @Test - fun testStress() = runBlocking { - val n = 100_000 - val q = ArrayChannel(1) - val sender = launch(coroutineContext) { - for (i in 1..n) q.send(i) - expect(2) - } - val receiver = launch(coroutineContext) { - for (i in 1..n) check(q.receive() == i) - expect(3) - } - expect(1) - sender.join() - receiver.join() - finish(4) - } - - @Test - fun testClosedBufferedReceiveOrNull() = runBlocking { + fun testClosedBufferedReceiveOrNull() = runTest { val q = ArrayChannel(1) check(q.isEmpty && !q.isFull && !q.isClosedForSend && !q.isClosedForReceive) expect(1) @@ -95,7 +77,7 @@ class ArrayChannelTest : TestBase() { } @Test - fun testClosedExceptions() = runBlocking { + fun testClosedExceptions() = runTest { val q = ArrayChannel(1) expect(1) launch(coroutineContext) { @@ -106,7 +88,8 @@ class ArrayChannelTest : TestBase() { } } expect(2) - q.close() + + require(q.close()) expect(3) yield() expect(6) @@ -117,7 +100,7 @@ class ArrayChannelTest : TestBase() { } @Test - fun testOfferAndPool() = runBlocking { + fun testOfferAndPool() = runTest { val q = ArrayChannel(1) assertTrue(q.offer(1)) expect(1) @@ -147,7 +130,7 @@ class ArrayChannelTest : TestBase() { } @Test - fun testConsumeAll() = runBlocking { + fun testConsumeAll() = runTest { val q = ArrayChannel(5) for (i in 1..10) { if (i <= 5) { @@ -168,4 +151,4 @@ class ArrayChannelTest : TestBase() { check(q.receiveOrNull() == null) finish(12) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt similarity index 58% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt index dd60d2426d..2bbc4a1da3 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelFactoryTest.kt @@ -16,34 +16,34 @@ package kotlinx.coroutines.experimental.channels -import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.core.IsInstanceOf -import org.junit.Test +import kotlin.test.* + class BroadcastChannelFactoryTest { - @Test(expected = IllegalArgumentException::class) + + @Test fun testRendezvousChannelNotSupported() { - BroadcastChannel(0) + assertFailsWith { BroadcastChannel(0) } } - @Test(expected = IllegalArgumentException::class) + @Test fun testLinkedListChannelNotSupported() { - BroadcastChannel(Channel.UNLIMITED) + assertFailsWith { BroadcastChannel(Channel.UNLIMITED) } } @Test fun testConflatedBroadcastChannel() { - assertThat(BroadcastChannel(Channel.CONFLATED), IsInstanceOf(ConflatedBroadcastChannel::class.java)) + assertTrue { BroadcastChannel(Channel.CONFLATED) is ConflatedBroadcastChannel } } @Test fun testArrayBroadcastChannel() { - assertThat(BroadcastChannel(1), IsInstanceOf(ArrayBroadcastChannel::class.java)) - assertThat(BroadcastChannel(10), IsInstanceOf(ArrayBroadcastChannel::class.java)) + assertTrue { BroadcastChannel(1) is ArrayBroadcastChannel } + assertTrue { BroadcastChannel(10) is ArrayBroadcastChannel } } - @Test(expected = IllegalArgumentException::class) + @Test fun testInvalidCapacityNotSupported() { - BroadcastChannel(-2) + assertFailsWith { BroadcastChannel(-2) } } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt similarity index 54% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt index 5ea2bf21e4..d4c5126601 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelFactoryTest.kt @@ -16,35 +16,36 @@ package kotlinx.coroutines.experimental.channels -import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.core.IsInstanceOf -import org.junit.Test +import kotlinx.coroutines.experimental.* +import kotlin.test.* + + +class ChannelFactoryTest : TestBase() { -class ChannelFactoryTest { @Test fun testRendezvousChannel() { - assertThat(Channel(), IsInstanceOf(RendezvousChannel::class.java)) - assertThat(Channel(0), IsInstanceOf(RendezvousChannel::class.java)) + assertTrue(Channel() is RendezvousChannel) + assertTrue(Channel(0) is RendezvousChannel) } @Test fun testLinkedListChannel() { - assertThat(Channel(Channel.UNLIMITED), IsInstanceOf(LinkedListChannel::class.java)) + assertTrue(Channel(Channel.UNLIMITED) is LinkedListChannel) } @Test fun testConflatedChannel() { - assertThat(Channel(Channel.CONFLATED), IsInstanceOf(ConflatedChannel::class.java)) + assertTrue(Channel(Channel.CONFLATED) is ConflatedChannel) } @Test fun testArrayChannel() { - assertThat(Channel(1), IsInstanceOf(ArrayChannel::class.java)) - assertThat(Channel(10), IsInstanceOf(ArrayChannel::class.java)) + assertTrue(Channel(1) is ArrayChannel) + assertTrue(Channel(10) is ArrayChannel) } - @Test(expected = IllegalArgumentException::class) - fun testInvalidCapacityNotSupported() { + @Test + fun testInvalidCapacityNotSupported() = runTest({ it is IllegalArgumentException }) { Channel(-2) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt similarity index 77% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt index cdb4f4537b..0fe1fc9749 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsTest.kt @@ -16,87 +16,72 @@ package kotlinx.coroutines.experimental.channels -import kotlinx.coroutines.experimental.async -import kotlinx.coroutines.experimental.runBlocking -import org.junit.Assert.* -import org.junit.Test +import kotlinx.coroutines.experimental.* +import kotlin.math.* +import kotlin.test.* -class ChannelsTest { +class ChannelsTest: TestBase() { private val testList = listOf(1, 2, 3) @Test - fun testBlocking() { - val ch = Channel() - val sum = async { - ch.sumBy { it } - } - repeat(10) { - ch.sendBlocking(it) - } - ch.close() - assertEquals(45, runBlocking { sum.await() }) - - } - - @Test - fun testIterableAsReceiveChannel() = runBlocking { + fun testIterableAsReceiveChannel() = runTest { assertEquals(testList, testList.asReceiveChannel().toList()) } @Test - fun testSequenceAsReceiveChannel() = runBlocking { + fun testSequenceAsReceiveChannel() = runTest { assertEquals(testList, testList.asSequence().asReceiveChannel().toList()) } @Test - fun testAssociate() = runBlocking { + fun testAssociate() = runTest { assertEquals(testList.associate { it * 2 to it * 3 }, testList.asReceiveChannel().associate { it * 2 to it * 3 }.toMap()) } @Test - fun testAssociateBy() = runBlocking { + fun testAssociateBy() = runTest { assertEquals(testList.associateBy { it % 2 }, testList.asReceiveChannel().associateBy { it % 2 }) } @Test - fun testAssociateBy2() = runBlocking { + fun testAssociateBy2() = runTest { assertEquals(testList.associateBy({ it * 2}, { it * 3 }), testList.asReceiveChannel().associateBy({ it * 2}, { it * 3 }).toMap()) } @Test - fun testDistinct() = runBlocking { + fun testDistinct() = runTest { assertEquals(testList.map { it % 2 }.distinct(), testList.asReceiveChannel().map { it % 2 }.distinct().toList()) } @Test - fun testDistinctBy() = runBlocking { + fun testDistinctBy() = runTest { assertEquals(testList.distinctBy { it % 2 }.toList(), testList.asReceiveChannel().distinctBy { it % 2 }.toList()) } @Test - fun testToCollection() = runBlocking { + fun testToCollection() = runTest { val target = mutableListOf() testList.asReceiveChannel().toCollection(target) assertEquals(testList, target) } @Test - fun testDrop() = runBlocking { + fun testDrop() = runTest { for (i in 0..testList.size) { - assertEquals("Drop $i", testList.drop(i), testList.asReceiveChannel().drop(i).toList()) + assertEquals(testList.drop(i), testList.asReceiveChannel().drop(i).toList(), "Drop $i") } } @Test - fun testElementAtOrElse() = runBlocking { + fun testElementAtOrElse() = runTest { assertEquals(testList.elementAtOrElse(2) { 42 }, testList.asReceiveChannel().elementAtOrElse(2) { 42 }) assertEquals(testList.elementAtOrElse(9) { 42 }, testList.asReceiveChannel().elementAtOrElse(9) { 42 }) } @Test - fun testFirst() = runBlocking { + fun testFirst() = runTest { assertEquals(testList.first(), testList.asReceiveChannel().first()) for (i in testList) { assertEquals(testList.first { it == i }, testList.asReceiveChannel().first { it == i }) @@ -109,56 +94,56 @@ class ChannelsTest { } @Test - fun testFirstOrNull() = runBlocking { + fun testFirstOrNull() = runTest { assertEquals(testList.firstOrNull(), testList.asReceiveChannel().firstOrNull()) assertEquals(testList.firstOrNull { it == 2 }, testList.asReceiveChannel().firstOrNull { it == 2 }) assertEquals(testList.firstOrNull { it == 9 }, testList.asReceiveChannel().firstOrNull { it == 9 }) } @Test - fun testFlatMap() = runBlocking { + fun testFlatMap() = runTest { assertEquals(testList.flatMap { (0..it).toList() }, testList.asReceiveChannel().flatMap { (0..it).asReceiveChannel() }.toList()) } @Test - fun testFold() = runBlocking { + fun testFold() = runTest { assertEquals(testList.fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }, testList.asReceiveChannel().fold(mutableListOf(42)) { acc, e -> acc.apply { add(e) } }.toList()) } @Test - fun testFoldIndexed() = runBlocking { + fun testFoldIndexed() = runTest { assertEquals(testList.foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }, testList.asReceiveChannel().foldIndexed(mutableListOf(42)) { index, acc, e -> acc.apply { add(index + e) } }.toList()) } @Test - fun testGroupBy() = runBlocking { + fun testGroupBy() = runTest { assertEquals(testList.groupBy { it % 2 }, testList.asReceiveChannel().groupBy { it % 2 }) } @Test - fun testGroupBy2() = runBlocking { + fun testGroupBy2() = runTest { assertEquals(testList.groupBy({ -it }, { it + 100 }), testList.asReceiveChannel().groupBy({ -it }, { it + 100 }).toMap()) } @Test - fun testMap() = runBlocking { + fun testMap() = runTest { assertEquals(testList.map { it + 10 }, testList.asReceiveChannel().map { it + 10 }.toList()) } @Test - fun testMapToCollection() = runBlocking { + fun testMapToCollection() = runTest { val c = mutableListOf() testList.asReceiveChannel().mapTo(c) { it + 10 } assertEquals(testList.map { it + 10 }, c) } @Test - fun testMapToSendChannel() = runBlocking { + fun testMapToSendChannel() = runTest { val c = produce { testList.asReceiveChannel().mapTo(channel) { it + 10 } } @@ -166,34 +151,34 @@ class ChannelsTest { } @Test - fun testEmptyList() = runBlocking { + fun testEmptyList() = runTest { assertTrue(emptyList().asReceiveChannel().toList().isEmpty()) } @Test - fun testToList() = runBlocking { + fun testToList() = runTest { assertEquals(testList, testList.asReceiveChannel().toList()) } @Test - fun testEmptySet() = runBlocking { + fun testEmptySet() = runTest { assertTrue(emptyList().asReceiveChannel().toSet().isEmpty()) } @Test - fun testToSet() = runBlocking { + fun testToSet() = runTest { assertEquals(testList.toSet(), testList.asReceiveChannel().toSet()) } @Test - fun testToMutableSet() = runBlocking { + fun testToMutableSet() = runTest { assertEquals(testList.toMutableSet(), testList.asReceiveChannel().toMutableSet()) } @Test - fun testEmptySequence() = runBlocking { + fun testEmptySequence() = runTest { val channel = Channel() channel.close() @@ -201,7 +186,7 @@ class ChannelsTest { } @Test - fun testEmptyMap() = runBlocking { + fun testEmptyMap() = runTest { val channel = Channel>() channel.close() @@ -209,50 +194,50 @@ class ChannelsTest { } @Test - fun testToMap() = runBlocking { + fun testToMap() = runTest { val values = testList.map { it to it.toString() } assertEquals(values.toMap(), values.asReceiveChannel().toMap()) } @Test - fun testReduce() = runBlocking { + fun testReduce() = runTest { assertEquals(testList.reduce { acc, e -> acc * e }, testList.asReceiveChannel().reduce { acc, e -> acc * e }) } @Test - fun testReduceIndexed() = runBlocking { + fun testReduceIndexed() = runTest { assertEquals(testList.reduceIndexed { index, acc, e -> index + acc * e }, testList.asReceiveChannel().reduceIndexed { index, acc, e -> index + acc * e }) } @Test - fun testTake() = runBlocking { + fun testTake() = runTest { for (i in 0..testList.size) { assertEquals(testList.take(i), testList.asReceiveChannel().take(i).toList()) } } @Test - fun testPartition() = runBlocking { + fun testPartition() = runTest { assertEquals(testList.partition { it % 2 == 0 }, testList.asReceiveChannel().partition { it % 2 == 0 }) } @Test - fun testZip() = runBlocking { + fun testZip() = runTest { val other = listOf("a", "b") assertEquals(testList.zip(other), testList.asReceiveChannel().zip(other.asReceiveChannel()).toList()) } @Test - fun testElementAt() = runBlocking { + fun testElementAt() = runTest { testList.indices.forEach { i -> assertEquals(testList[i], testList.asReceiveChannel().elementAt(i)) } } @Test - fun testElementAtOrNull() = runBlocking { + fun testElementAtOrNull() = runTest { testList.indices.forEach { i -> assertEquals(testList[i], testList.asReceiveChannel().elementAtOrNull(i)) } @@ -261,7 +246,7 @@ class ChannelsTest { } @Test - fun testFind() = runBlocking { + fun testFind() = runTest { repeat(3) { mod -> assertEquals(testList.find { it % 2 == mod }, testList.asReceiveChannel().find { it % 2 == mod }) @@ -269,28 +254,28 @@ class ChannelsTest { } @Test - fun testFindLast() = runBlocking { + fun testFindLast() = runTest { repeat(3) { mod -> assertEquals(testList.findLast { it % 2 == mod }, testList.asReceiveChannel().findLast { it % 2 == mod }) } } @Test - fun testIndexOf() = runBlocking { + fun testIndexOf() = runTest { repeat(testList.size + 1) { i -> assertEquals(testList.indexOf(i), testList.asReceiveChannel().indexOf(i)) } } @Test - fun testLastIndexOf() = runBlocking { + fun testLastIndexOf() = runTest { repeat(testList.size + 1) { i -> assertEquals(testList.lastIndexOf(i), testList.asReceiveChannel().lastIndexOf(i)) } } @Test - fun testIndexOfFirst() = runBlocking { + fun testIndexOfFirst() = runTest { repeat(3) { mod -> assertEquals(testList.indexOfFirst { it % 2 == mod }, testList.asReceiveChannel().indexOfFirst { it % 2 == mod }) @@ -298,7 +283,7 @@ class ChannelsTest { } @Test - fun testIndexOfLast() = runBlocking { + fun testIndexOfLast() = runTest { repeat(3) { mod -> assertEquals(testList.indexOfLast { it % 2 != mod }, testList.asReceiveChannel().indexOfLast { it % 2 != mod }) @@ -306,13 +291,13 @@ class ChannelsTest { } @Test - fun testLastOrNull() = runBlocking { + fun testLastOrNull() = runTest { assertEquals(testList.lastOrNull(), testList.asReceiveChannel().lastOrNull()) assertEquals(null, emptyList().asReceiveChannel().lastOrNull()) } @Test - fun testSingleOrNull() = runBlocking { + fun testSingleOrNull() = runTest { assertEquals(1, listOf(1).asReceiveChannel().singleOrNull()) assertEquals(null, listOf(1, 2).asReceiveChannel().singleOrNull()) assertEquals(null, emptyList().asReceiveChannel().singleOrNull()) @@ -327,7 +312,7 @@ class ChannelsTest { } @Test - fun testDropWhile() = runBlocking { + fun testDropWhile() = runTest { repeat(3) { mod -> assertEquals(testList.dropWhile { it % 2 == mod }, testList.asReceiveChannel().dropWhile { it % 2 == mod }.toList()) @@ -335,7 +320,7 @@ class ChannelsTest { } @Test - fun testFilter() = runBlocking { + fun testFilter() = runTest { repeat(3) { mod -> assertEquals(testList.filter { it % 2 == mod }, testList.asReceiveChannel().filter { it % 2 == mod }.toList()) @@ -343,7 +328,7 @@ class ChannelsTest { } @Test - fun testFilterToCollection() = runBlocking { + fun testFilterToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().filterTo(c) { it % 2 == mod } @@ -352,7 +337,7 @@ class ChannelsTest { } @Test - fun testFilterToSendChannel() = runBlocking { + fun testFilterToSendChannel() = runTest { repeat(3) { mod -> val c = produce { testList.asReceiveChannel().filterTo(channel) { it % 2 == mod } @@ -362,7 +347,7 @@ class ChannelsTest { } @Test - fun testFilterNot() = runBlocking { + fun testFilterNot() = runTest { repeat(3) { mod -> assertEquals(testList.filterNot { it % 2 == mod }, testList.asReceiveChannel().filterNot { it % 2 == mod }.toList()) @@ -370,7 +355,7 @@ class ChannelsTest { } @Test - fun testFilterNotToCollection() = runBlocking { + fun testFilterNotToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().filterNotTo(c) { it % 2 == mod } @@ -379,7 +364,7 @@ class ChannelsTest { } @Test - fun testFilterNotToSendChannel() = runBlocking { + fun testFilterNotToSendChannel() = runTest { repeat(3) { mod -> val c = produce { testList.asReceiveChannel().filterNotTo(channel) { it % 2 == mod } @@ -389,7 +374,7 @@ class ChannelsTest { } @Test - fun testFilterNotNull() = runBlocking { + fun testFilterNotNull() = runTest { repeat(3) { mod -> assertEquals(testList.map { it.takeIf { it % 2 == mod } }.filterNotNull(), testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNull().toList()) @@ -397,7 +382,7 @@ class ChannelsTest { } @Test - fun testFilterNotNullToCollection() = runBlocking { + fun testFilterNotNullToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(c) @@ -406,7 +391,7 @@ class ChannelsTest { } @Test - fun testFilterNotNullToSendChannel() = runBlocking { + fun testFilterNotNullToSendChannel() = runTest { repeat(3) { mod -> val c = produce { testList.asReceiveChannel().map { it.takeIf { it % 2 == mod } }.filterNotNullTo(channel) @@ -416,7 +401,7 @@ class ChannelsTest { } @Test - fun testFilterIndexed() = runBlocking { + fun testFilterIndexed() = runTest { repeat(3) { mod -> assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, testList.asReceiveChannel().filterIndexed { index, _ -> index % 2 == mod }.toList()) @@ -424,7 +409,7 @@ class ChannelsTest { } @Test - fun testFilterIndexedToCollection() = runBlocking { + fun testFilterIndexedToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().filterIndexedTo(c) { index, _ -> index % 2 == mod } @@ -433,17 +418,17 @@ class ChannelsTest { } @Test - fun testFilterIndexedToChannel() = runBlocking { + fun testFilterIndexedToChannel() = runTest { repeat(3) { mod -> val c = produce { - testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod } + testList.asReceiveChannel().filterIndexedTo(channel) { index, _ -> index % 2 == mod } } assertEquals(testList.filterIndexed { index, _ -> index % 2 == mod }, c.toList()) } } @Test - fun testTakeWhile() = runBlocking { + fun testTakeWhile() = runTest { repeat(3) { mod -> assertEquals(testList.takeWhile { it % 2 != mod }, testList.asReceiveChannel().takeWhile { it % 2 != mod }.toList()) @@ -451,7 +436,7 @@ class ChannelsTest { } @Test - fun testToChannel() = runBlocking { + fun testToChannel() = runTest { val c = produce { testList.asReceiveChannel().toChannel(channel) } @@ -459,20 +444,20 @@ class ChannelsTest { } @Test - fun testMapIndexed() = runBlocking { + fun testMapIndexed() = runTest { assertEquals(testList.mapIndexed { index, i -> index + i }, testList.asReceiveChannel().mapIndexed { index, i -> index + i }.toList()) } @Test - fun testMapIndexedToCollection() = runBlocking { + fun testMapIndexedToCollection() = runTest { val c = mutableListOf() testList.asReceiveChannel().mapIndexedTo(c) { index, i -> index + i } assertEquals(testList.mapIndexed { index, i -> index + i }, c) } @Test - fun testMapIndexedToSendChannel() = runBlocking { + fun testMapIndexedToSendChannel() = runTest { val c = produce { testList.asReceiveChannel().mapIndexedTo(channel) { index, i -> index + i } } @@ -480,7 +465,7 @@ class ChannelsTest { } @Test - fun testMapNotNull() = runBlocking { + fun testMapNotNull() = runTest { repeat(3) { mod -> assertEquals(testList.mapNotNull { i -> i.takeIf { i % 2 == mod } }, testList.asReceiveChannel().mapNotNull { i -> i.takeIf { i % 2 == mod } }.toList()) @@ -488,7 +473,7 @@ class ChannelsTest { } @Test - fun testMapNotNullToCollection() = runBlocking { + fun testMapNotNullToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().mapNotNullTo(c) { i -> i.takeIf { i % 2 == mod } } @@ -497,7 +482,7 @@ class ChannelsTest { } @Test - fun testMapNotNullToSendChannel() = runBlocking { + fun testMapNotNullToSendChannel() = runTest { repeat(3) { mod -> val c = produce { testList.asReceiveChannel().mapNotNullTo(channel) { i -> i.takeIf { i % 2 == mod } } @@ -507,7 +492,7 @@ class ChannelsTest { } @Test - fun testMapIndexedNotNull() = runBlocking { + fun testMapIndexedNotNull() = runTest { repeat(3) { mod -> assertEquals(testList.mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }, testList.asReceiveChannel().mapIndexedNotNull { index, i -> index.takeIf { i % 2 == mod } }.toList()) @@ -515,7 +500,7 @@ class ChannelsTest { } @Test - fun testMapIndexedNotNullToCollection() = runBlocking { + fun testMapIndexedNotNullToCollection() = runTest { repeat(3) { mod -> val c = mutableListOf() testList.asReceiveChannel().mapIndexedNotNullTo(c) { index, i -> index.takeIf { i % 2 == mod } } @@ -524,7 +509,7 @@ class ChannelsTest { } @Test - fun testMapIndexedNotNullToSendChannel() = runBlocking { + fun testMapIndexedNotNullToSendChannel() = runTest { repeat(3) { mod -> val c = produce { testList.asReceiveChannel().mapIndexedNotNullTo(channel) { index, i -> index.takeIf { i % 2 == mod } } @@ -534,50 +519,51 @@ class ChannelsTest { } @Test - fun testWithIndex() = runBlocking { + fun testWithIndex() = runTest { assertEquals(testList.withIndex().toList(), testList.asReceiveChannel().withIndex().toList()) } @Test - fun testMaxBy() = runBlocking { - assertEquals(testList.maxBy { 10 - Math.abs(it - 2) }, - testList.asReceiveChannel().maxBy { 10 - Math.abs(it - 2) }) + fun testMaxBy() = runTest { + assertEquals(testList.maxBy { 10 - abs(it - 2) }, + testList.asReceiveChannel().maxBy { 10 - abs(it - 2) }) } @Test - fun testMaxWith() = runBlocking { - val cmp = compareBy { 10 - Math.abs(it - 2) } + fun testMaxWith() = runTest { + val cmp = compareBy { 10 - abs(it - 2) } assertEquals(testList.maxWith(cmp), testList.asReceiveChannel().maxWith(cmp)) } @Test - fun testMinBy() = runBlocking { - assertEquals(testList.minBy { Math.abs(it - 2) }, - testList.asReceiveChannel().minBy { Math.abs(it - 2) }) + fun testMinBy() = runTest { + assertEquals(testList.minBy { abs(it - 2) }, + testList.asReceiveChannel().minBy { abs(it - 2) }) } @Test - fun testMinWith() = runBlocking { - val cmp = compareBy { Math.abs(it - 2) } + fun testMinWith() = runTest { + val cmp = compareBy { abs(it - 2) } assertEquals(testList.minWith(cmp), testList.asReceiveChannel().minWith(cmp)) } @Test - fun testSumBy() = runBlocking { + fun testSumBy() = runTest { assertEquals(testList.sumBy { it * 3 }, testList.asReceiveChannel().sumBy { it * 3 }) } @Test - fun testSumByDouble() = runBlocking { - assertEquals(testList.sumByDouble { it * 3.0 }, - testList.asReceiveChannel().sumByDouble { it * 3.0 }, 1e-10) + fun testSumByDouble() = runTest { + val expected = testList.sumByDouble { it * 3.0 } + val actual = testList.asReceiveChannel().sumByDouble { it * 3.0 } + assertEquals(expected, actual) } @Test - fun testRequireNoNulls() = runBlocking { + fun testRequireNoNulls() = runTest { assertEquals(testList.requireNoNulls(), testList.asReceiveChannel().requireNoNulls().toList()) } } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt similarity index 58% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt index 707b780d6e..4c04f8f268 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt @@ -17,85 +17,87 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.hamcrest.core.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class ConflatedBroadcastChannelTest : TestBase() { + @Test - fun testBasicScenario() = runBlocking { + fun testBasicScenario() = runTest { expect(1) val broadcast = ConflatedBroadcastChannel() - assertThat(exceptionFrom { broadcast.value }, IsInstanceOf(IllegalStateException::class.java)) - assertThat(broadcast.valueOrNull, IsNull()) + assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException) + assertNull(broadcast.valueOrNull) + launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(2) val sub = broadcast.openSubscription() - assertThat(sub.poll(), IsNull()) + assertNull(sub.poll()) expect(3) - assertThat(sub.receive(), IsEqual("one")) // suspends + assertEquals("one", sub.receive()) // suspends expect(6) - assertThat(sub.receive(), IsEqual("two")) // suspends + assertEquals("two", sub.receive()) // suspends expect(12) sub.close() expect(13) } + expect(4) broadcast.send("one") // does not suspend - assertThat(broadcast.value, IsEqual("one")) - assertThat(broadcast.valueOrNull, IsEqual("one")) + assertEquals("one", broadcast.value) + assertEquals("one", broadcast.valueOrNull) expect(5) yield() // to receiver expect(7) launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(8) val sub = broadcast.openSubscription() - assertThat(sub.receive(), IsEqual("one")) // does not suspend + assertEquals("one", sub.receive()) // does not suspend expect(9) - assertThat(sub.receive(), IsEqual("two")) // suspends + assertEquals("two", sub.receive()) // suspends expect(14) - assertThat(sub.receive(), IsEqual("three")) // suspends + assertEquals("three", sub.receive()) // suspends expect(17) - assertThat(sub.receiveOrNull(), IsNull()) // suspends until closed + assertNull(sub.receiveOrNull()) // suspends until closed expect(20) sub.close() expect(21) } + expect(10) broadcast.send("two") // does not suspend - assertThat(broadcast.value, IsEqual("two")) - assertThat(broadcast.valueOrNull, IsEqual("two")) + assertEquals("two", broadcast.value) + assertEquals("two", broadcast.valueOrNull) expect(11) yield() // to both receivers expect(15) broadcast.send("three") // does not suspend - assertThat(broadcast.value, IsEqual("three")) - assertThat(broadcast.valueOrNull, IsEqual("three")) + assertEquals("three", broadcast.value) + assertEquals("three", broadcast.valueOrNull) expect(16) yield() // to second receiver expect(18) broadcast.close() - assertThat(exceptionFrom { broadcast.value }, IsInstanceOf(IllegalStateException::class.java)) - assertThat(broadcast.valueOrNull, IsNull()) + assertTrue(exceptionFromNotInline { broadcast.value } is IllegalStateException) + assertNull(broadcast.valueOrNull) expect(19) yield() // to second receiver - assertThat(exceptionFrom { broadcast.send("four") }, IsInstanceOf(ClosedSendChannelException::class.java)) + assertTrue(exceptionFrom { broadcast.send("four") } is ClosedSendChannelException) finish(22) } @Test - fun testInitialValueAndReceiveClosed() = runBlocking { + fun testInitialValueAndReceiveClosed() = runTest { expect(1) - val broadcast = ConflatedBroadcastChannel(1) - assertThat(broadcast.value, IsEqual(1)) - assertThat(broadcast.valueOrNull, IsEqual(1)) + val broadcast = ConflatedBroadcastChannel(1) + assertEquals(1, broadcast.value) + assertEquals(1, broadcast.valueOrNull) launch(coroutineContext, CoroutineStart.UNDISPATCHED) { expect(2) val sub = broadcast.openSubscription() - assertThat(sub.receive(), IsEqual(1)) + assertEquals(1, sub.receive()) expect(3) - assertThat(exceptionFrom { sub.receive() }, IsInstanceOf(ClosedReceiveChannelException::class.java)) // suspends + assertTrue(exceptionFrom { sub.receive() } is ClosedReceiveChannelException) // suspends expect(6) } expect(4) @@ -113,4 +115,14 @@ class ConflatedBroadcastChannelTest : TestBase() { return e } } -} \ No newline at end of file + + // Ugly workaround for bug in JS compiler + fun exceptionFromNotInline(block: () -> Unit): Throwable? { + try { + block() + return null + } catch (e: Throwable) { + return e + } + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt similarity index 71% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt index 6819170f0e..1fd7413495 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelTest.kt @@ -17,50 +17,49 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.hamcrest.core.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class ConflatedChannelTest : TestBase() { + @Test fun testBasicConflationOfferPoll() { val q = ConflatedChannel() - assertThat(q.poll(), IsNull()) - assertThat(q.offer(1), IsEqual(true)) - assertThat(q.offer(2), IsEqual(true)) - assertThat(q.offer(3), IsEqual(true)) - assertThat(q.poll(), IsEqual(3)) - assertThat(q.poll(), IsNull()) + assertNull(q.poll()) + assertTrue(q.offer(1)) + assertTrue(q.offer(2)) + assertTrue(q.offer(3)) + assertEquals(3, q.poll()) + assertNull(q.poll()) } @Test - fun testConflatedSend() = runBlocking { + fun testConflatedSend() = runTest { val q = ConflatedChannel() q.send(1) q.send(2) // shall conflated previously sent - assertThat(q.receiveOrNull(), IsEqual(2)) + assertEquals(2, q.receiveOrNull()) } @Test - fun testConflatedClose() = runBlocking { + fun testConflatedClose() = runTest { val q = ConflatedChannel() q.send(1) q.close() // shall conflate sent item and become closed - assertThat(q.receiveOrNull(), IsNull()) + assertNull(q.receiveOrNull()) } @Test - fun testConflationSendReceive() = runBlocking { + fun testConflationSendReceive() = runTest { val q = ConflatedChannel() expect(1) launch(coroutineContext) { // receiver coroutine expect(4) - assertThat(q.receive(), IsEqual(2)) + assertEquals(2, q.receive()) expect(5) - assertThat(q.receive(), IsEqual(3)) // this receive suspends + assertEquals(3, q.receive()) // this receive suspends expect(8) - assertThat(q.receive(), IsEqual(6)) // last conflated value + assertEquals(6, q.receive()) // last conflated value expect(9) } expect(2) @@ -79,7 +78,7 @@ class ConflatedChannelTest : TestBase() { } @Test - fun testConsumeAll() = runBlocking { + fun testConsumeAll() = runTest { val q = ConflatedChannel() expect(1) for (i in 1..10) { diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt similarity index 72% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt index 95a6c119d9..897801e378 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/LinkedListChannelTest.kt @@ -17,29 +17,26 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.TestBase -import kotlinx.coroutines.experimental.runBlocking -import org.hamcrest.MatcherAssert.assertThat -import org.hamcrest.core.IsEqual -import org.hamcrest.core.IsNull -import org.junit.Test +import kotlin.test.* class LinkedListChannelTest : TestBase() { + @Test - fun testBasic() = runBlocking { + fun testBasic() = runTest { val c = LinkedListChannel() c.send(1) check(c.offer(2)) c.send(3) check(c.close()) check(!c.close()) - assertThat(c.receive(), IsEqual(1)) - assertThat(c.poll(), IsEqual(2)) - assertThat(c.receiveOrNull(), IsEqual(3)) - assertThat(c.receiveOrNull(), IsNull()) + assertEquals(1, c.receive()) + assertEquals(2, c.poll()) + assertEquals(3, c.receiveOrNull()) + assertNull(c.receiveOrNull()) } @Test - fun testConsumeAll() = runBlocking { + fun testConsumeAll() = runTest { val q = LinkedListChannel() for (i in 1..10) { q.send(i) // buffers @@ -49,4 +46,4 @@ class LinkedListChannelTest : TestBase() { check(q.isClosedForReceive) check(q.receiveOrNull() == null) } -} \ No newline at end of file +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt new file mode 100644 index 0000000000..6646aa61e4 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt @@ -0,0 +1,55 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class ProduceConsumeTest : TestBase() { + + @Test + fun testRendezvous() = runTest { + testProducer(1) + } + + @Test + fun testSmallBuffer() = runTest { + testProducer(1) + } + + @Test + fun testMediumBuffer() = runTest { + testProducer(10) + } + + @Test + fun testLargeMediumBuffer() = runTest { + testProducer(1000) + } + + @Test + fun testUnlimited() = runTest { + testProducer(Channel.UNLIMITED) + } + + private suspend fun testProducer(producerCapacity: Int) { + testProducer(1, producerCapacity) + testProducer(10, producerCapacity) + testProducer(100, producerCapacity) + } + + private suspend fun testProducer(messages: Int, producerCapacity: Int) { + var sentAll = false + val producer = produce(coroutineContext, capacity = producerCapacity) { + for (i in 1..messages) { + send(i) + } + sentAll = true + } + var consumed = 0 + for (x in producer) { + consumed++ + } + assertTrue(sentAll) + assertEquals(messages, consumed) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt index 641eff0f62..522f6d6f6b 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceTest.kt @@ -17,10 +17,11 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.junit.* import kotlin.coroutines.experimental.* +import kotlin.test.* class ProduceTest : TestBase() { + @Test fun testBasic() = runTest { val c = produce(coroutineContext) { @@ -62,4 +63,4 @@ class ProduceTest : TestBase() { check(c.receiveOrNull() == null) expect(6) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt similarity index 85% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt index 7edbe42bf6..6e1b2c347a 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/RendezvousChannelTest.kt @@ -17,14 +17,13 @@ package kotlinx.coroutines.experimental.channels import kotlinx.coroutines.experimental.* -import org.hamcrest.core.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class RendezvousChannelTest : TestBase() { + @Test - fun testSimple() = runBlocking { + fun testSimple() = runTest { val q = RendezvousChannel() check(q.isEmpty && q.isFull) expect(1) @@ -51,25 +50,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testStress() = runBlocking { - val n = 100_000 - val q = RendezvousChannel() - val sender = launch(coroutineContext) { - for (i in 1..n) q.send(i) - expect(2) - } - val receiver = launch(coroutineContext) { - for (i in 1..n) check(q.receive() == i) - expect(3) - } - expect(1) - sender.join() - receiver.join() - finish(4) - } - - @Test - fun testClosedReceiveOrNull() = runBlocking { + fun testClosedReceiveOrNull() = runTest { val q = RendezvousChannel() check(q.isEmpty && q.isFull && !q.isClosedForSend && !q.isClosedForReceive) expect(1) @@ -91,7 +72,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testClosedExceptions() = runBlocking { + fun testClosedExceptions() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -113,7 +94,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testOfferAndPool() = runBlocking { + fun testOfferAndPool() = runTest { val q = RendezvousChannel() assertFalse(q.offer(1)) expect(1) @@ -141,7 +122,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testIteratorClosed() = runBlocking { + fun testIteratorClosed() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -157,7 +138,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testIteratorOne() = runBlocking { + fun testIteratorOne() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -176,7 +157,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testIteratorOneWithYield() = runBlocking { + fun testIteratorOneWithYield() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -197,7 +178,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testIteratorTwo() = runBlocking { + fun testIteratorTwo() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -221,7 +202,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testIteratorTwoWithYield() = runBlocking { + fun testIteratorTwoWithYield() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -247,7 +228,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testSuspendSendOnClosedChannel() = runBlocking { + fun testSuspendSendOnClosedChannel() = runTest { val q = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -266,9 +247,9 @@ class RendezvousChannelTest : TestBase() { expect(7) yield() // try to resume sender (it will not resume despite the close!) expect(8) - assertThat(q.receiveOrNull(), IsEqual(42)) + assertEquals(42, q.receiveOrNull()) expect(9) - assertThat(q.receiveOrNull(), IsNull()) + assertNull(q.receiveOrNull()) expect(10) yield() // to sender, it was resumed! finish(12) @@ -281,7 +262,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testProduceBadClass() = runBlocking { + fun testProduceBadClass() = runTest { val bad = BadClass() val c = produce(coroutineContext) { expect(1) @@ -292,7 +273,7 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testConsumeAll() = runBlocking { + fun testConsumeAll() = runTest { val q = RendezvousChannel() for (i in 1..10) { launch(coroutineContext, CoroutineStart.UNDISPATCHED) { @@ -308,4 +289,4 @@ class RendezvousChannelTest : TestBase() { check(q.receiveOrNull() == null) finish(12) } -} \ No newline at end of file +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt new file mode 100644 index 0000000000..c85f541362 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveStressTest.kt @@ -0,0 +1,46 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class SendReceiveStressTest : TestBase() { + + // Emulate parametrized by hand :( + + @Test + fun testArrayChannel() = runTest { + testStress(ArrayChannel(2)) + } + + @Test + fun testLinkedListChannel() = runTest { + testStress(LinkedListChannel()) + } + + @Test + fun testRendezvousChannel() = runTest { + testStress(RendezvousChannel()) + } + + private suspend fun testStress(channel: Channel) { + val n = 1_000 // Do not increase, otherwise node.js will fail with timeout :( + val sender = launch(coroutineContext) { + for (i in 1..n) { + channel.send(i) + } + expect(2) + } + val receiver = launch(coroutineContext) { + for (i in 1..n) { + val next = channel.receive() + check(next == i) + } + expect(3) + } + expect(1) + sender.join() + receiver.join() + finish(4) + } +} diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt new file mode 100644 index 0000000000..69e939f2d9 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt @@ -0,0 +1,35 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +class SimpleSendReceiveTest : TestBase() { + + @Test + fun testSimpleSendReceive() = runTest { + // Parametrized common test :( + TestChannelKind.values().forEach { kind -> testSendReceive(kind, 100) } + } + + private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) { + val channel = kind.create() + + launch(coroutineContext) { + repeat(iterations) { channel.send(it) } + channel.close() + } + var expected = 0 + for (x in channel) { + if (!kind.isConflated) { + assertEquals(expected++, x) + } else { + assertTrue(x >= expected) + expected = x + 1 + } + } + if (!kind.isConflated) { + assertEquals(iterations, expected) + } + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt similarity index 95% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt index e025b195bd..60dbb97a5d 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestBroadcastChannelKind.kt @@ -18,15 +18,15 @@ package kotlinx.coroutines.experimental.channels enum class TestBroadcastChannelKind { ARRAY_1 { - override fun create(): BroadcastChannel = ArrayBroadcastChannel(1) + override fun create(): BroadcastChannel = ArrayBroadcastChannel(1) override fun toString(): String = "ArrayBroadcastChannel(1)" }, ARRAY_10 { - override fun create(): BroadcastChannel = ArrayBroadcastChannel(10) + override fun create(): BroadcastChannel = ArrayBroadcastChannel(10) override fun toString(): String = "ArrayBroadcastChannel(10)" }, CONFLATED { - override fun create(): BroadcastChannel = ConflatedBroadcastChannel() + override fun create(): BroadcastChannel = ConflatedBroadcastChannel() override fun toString(): String = "ConflatedBroadcastChannel" override val isConflated: Boolean get() = true } diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt similarity index 82% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt index 36fa8c39b1..c3ac904cd1 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt @@ -20,23 +20,23 @@ import kotlinx.coroutines.experimental.selects.SelectClause1 enum class TestChannelKind { RENDEZVOUS { - override fun create(): Channel = RendezvousChannel() + override fun create(): Channel = RendezvousChannel() override fun toString(): String = "RendezvousChannel" }, ARRAY_1 { - override fun create(): Channel = ArrayChannel(1) + override fun create(): Channel = ArrayChannel(1) override fun toString(): String = "ArrayChannel(1)" }, ARRAY_10 { - override fun create(): Channel = ArrayChannel(8) + override fun create(): Channel = ArrayChannel(8) override fun toString(): String = "ArrayChannel(8)" }, LINKED_LIST { - override fun create(): Channel = LinkedListChannel() + override fun create(): Channel = LinkedListChannel() override fun toString(): String = "LinkedListChannel" }, CONFLATED { - override fun create(): Channel = ConflatedChannel() + override fun create(): Channel = ConflatedChannel() override fun toString(): String = "ConflatedChannel" override val isConflated: Boolean get() = true }, @@ -66,8 +66,12 @@ private class ChannelViaBroadcast( override val isClosedForReceive: Boolean get() = sub.isClosedForReceive override val isEmpty: Boolean get() = sub.isEmpty - suspend override fun receive(): E = sub.receive() - suspend override fun receiveOrNull(): E? = sub.receiveOrNull() + + // Workaround for KT-23094 + override suspend fun send(element: E) = broadcast.send(element) + + override suspend fun receive(): E = sub.receive() + override suspend fun receiveOrNull(): E? = sub.receiveOrNull() override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator = sub.iterator() override fun cancel(cause: Throwable?): Boolean = sub.cancel(cause) diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt new file mode 100644 index 0000000000..7596e62cb9 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvm.kt @@ -0,0 +1,22 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* + +// -------- Operations on SendChannel -------- + +/** + * Adds [element] into to this channel, **blocking** the caller while this channel [Channel.isFull], + * or throws exception if the channel [Channel.isClosedForSend] (see [Channel.close] for details). + * + * This is a way to call [Channel.send] method inside a blocking code using [runBlocking], + * so this function should not be used from coroutine. + */ +public fun SendChannel.sendBlocking(element: E) { + // fast path + if (offer(element)) + return + // slow path + runBlocking { + send(element) + } +} diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt new file mode 100644 index 0000000000..bb365527d0 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt @@ -0,0 +1,5 @@ +package kotlinx.coroutines.experimental.internal + +actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int){ + System.arraycopy(source, srcPos, destination, destinationStart, length) +} \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt new file mode 100644 index 0000000000..9552fc2706 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt @@ -0,0 +1,3 @@ +package kotlinx.coroutines.experimental.internal + +actual typealias Closeable = java.io.Closeable diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt new file mode 100644 index 0000000000..1eef996481 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt @@ -0,0 +1,10 @@ +package kotlinx.coroutines.experimental.internal + +import java.util.concurrent.* +import kotlin.concurrent.withLock as withLockJvm + +actual fun subscriberList(): MutableList = CopyOnWriteArrayList() + +actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock + +actual inline fun ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action) diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt index 057ce9d43a..b99b3fcb41 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.kt @@ -42,7 +42,7 @@ internal val LIST_EMPTY: Any = Symbol("LIST_EMPTY") private val REMOVE_PREPARED: Any = Symbol("REMOVE_PREPARED") /** @suppress **This is unstable API and it is subject to change.** */ -public typealias RemoveFirstDesc = LockFreeLinkedListNode.RemoveFirstDesc +public actual typealias RemoveFirstDesc = LockFreeLinkedListNode.RemoveFirstDesc /** @suppress **This is unstable API and it is subject to change.** */ public actual typealias AddLastDesc = LockFreeLinkedListNode.AddLastDesc @@ -101,7 +101,7 @@ public actual open class LockFreeLinkedListNode { public actual val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed - public val next: Any get() { + public actual val next: Any get() { _next.loop { next -> if (next !is OpDescriptor) return next next.perform(this) @@ -111,7 +111,7 @@ public actual open class LockFreeLinkedListNode { public actual val nextNode: Node get() = next.unwrap() // LINEARIZABLE. Returns Node | Removed - public val prev: Any get() { + public actual val prev: Any get() { _prev.loop { prev -> if (prev is Removed) return prev prev as Node // otherwise, it can be only node @@ -311,7 +311,7 @@ public actual open class LockFreeLinkedListNode { // ------ multi-word atomic operations helpers ------ - public open class AddLastDesc( + public open class AddLastDesc constructor( @JvmField val queue: Node, @JvmField val node: T ) : AbstractAtomicDesc() { diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt new file mode 100644 index 0000000000..dd77e1efa4 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannelStressTest.kt @@ -0,0 +1,39 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import org.junit.* +import org.junit.runner.* +import org.junit.runners.* + +@RunWith(Parameterized::class) +class ArrayChannelStressTest(private val capacity: Int) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "{0}, nSenders={1}, nReceivers={2}") + @JvmStatic + fun params(): Collection> = listOf(1, 10, 100, 100_000, 1_000_000).map { arrayOf(it) } + } + + @Test + fun testStress() = runTest { + val n = 100_000 * stressTestMultiplier + val q = ArrayChannel(capacity) + val sender = launch(kotlin.coroutines.experimental.coroutineContext) { + for (i in 1..n) { + q.send(i) + } + expect(2) + } + val receiver = launch(kotlin.coroutines.experimental.coroutineContext) { + for (i in 1..n) { + val next = q.receive() + check(next == i) + } + expect(3) + } + expect(1) + sender.join() + receiver.join() + finish(4) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt new file mode 100644 index 0000000000..6a5e509bbe --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ChannelsJvmTest.kt @@ -0,0 +1,21 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import org.junit.Test +import kotlin.test.* + +class ChannelsJvmTest : TestBase() { + + @Test + fun testBlocking() { + val ch = Channel() + val sum = async { + ch.sumBy { it } + } + repeat(10) { + ch.sendBlocking(it) + } + ch.close() + assertEquals(45, runBlocking { sum.await() }) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt index 1e4682f762..90a254a429 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/DoubleChannelCloseStressTest.kt @@ -35,4 +35,4 @@ class DoubleChannelCloseStressTest : TestBase() { actor.close() } } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt similarity index 96% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt index 8aea78f7a6..9fde116b9e 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ProduceConsumeJvmTest.kt @@ -24,7 +24,7 @@ import org.junit.runners.* import kotlin.coroutines.experimental.* @RunWith(Parameterized::class) -class ProduceConsumeTest( +class ProduceConsumeJvmTest( private val capacity: Int, private val number: Int ) : TestBase() { @@ -43,7 +43,7 @@ class ProduceConsumeTest( fun testProducer() = runTest { var sentAll = false val producer = produce(coroutineContext, capacity = capacity) { - for(i in 1..number) { + for (i in 1..number) { send(i) } sentAll = true @@ -72,4 +72,4 @@ class ProduceConsumeTest( actor.close() assertEquals(number, received.await()) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt new file mode 100644 index 0000000000..0faa693042 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/RandevouzChannelStressTest.kt @@ -0,0 +1,26 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import org.junit.* +import kotlin.coroutines.experimental.* + +class RandevouzChannelStressTest : TestBase() { + + @Test + fun testStress() = runTest { + val n = 100_000 * stressTestMultiplier + val q = RendezvousChannel() + val sender = launch(coroutineContext) { + for (i in 1..n) q.send(i) + expect(2) + } + val receiver = launch(coroutineContext) { + for (i in 1..n) check(q.receive() == i) + expect(3) + } + expect(1) + sender.join() + receiver.join() + finish(4) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt new file mode 100644 index 0000000000..4cea191d7e --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SendReceiveJvmStressTest.kt @@ -0,0 +1,45 @@ +package kotlinx.coroutines.experimental.channels + +import kotlinx.coroutines.experimental.* +import org.junit.runner.* +import org.junit.runners.* +import kotlin.coroutines.experimental.* +import kotlin.test.* + +@RunWith(Parameterized::class) +class SendReceiveJvmStressTest(private val channel: Channel) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun params(): Collection> = listOf( + ArrayChannel(1), + ArrayChannel(10), + ArrayChannel(1_000_000), + LinkedListChannel(), + RendezvousChannel() + ).map { arrayOf(it) } + } + + @Test + fun testStress() = runTest { + val n = 100_000 * stressTestMultiplier + val sender = launch(coroutineContext) { + for (i in 1..n) { + channel.send(i) + } + expect(2) + } + val receiver = launch(coroutineContext) { + for (i in 1..n) { + val next = channel.receive() + check(next == i) + } + expect(3) + } + expect(1) + sender.join() + receiver.join() + finish(4) + } +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt similarity index 98% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt rename to core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt index feb06e0438..73639ac3c5 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveTest.kt +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/SimpleSendReceiveJvmTest.kt @@ -25,7 +25,7 @@ import org.junit.runners.* import kotlin.coroutines.experimental.* @RunWith(Parameterized::class) -class SimpleSendReceiveTest( +class SimpleSendReceiveJvmTest( val kind: TestChannelKind, val n: Int, val concurrent: Boolean @@ -64,4 +64,4 @@ class SimpleSendReceiveTest( assertThat(expected, IsEqual(n)) } } -} \ No newline at end of file +} diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt index 37a04377aa..6894ac00a5 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/Exceptions.kt @@ -30,7 +30,7 @@ public actual class CompletionHandlerException public actual constructor( * **It is not printed to console/log by default uncaught exception handler**. * (see [handleCoroutineException]). */ -public actual open class CancellationException actual constructor(message: String) : IllegalStateException(message) +public actual open class CancellationException actual constructor(message: String?) : IllegalStateException(message) /** * Thrown by cancellable suspending functions if the [Job] of the coroutine is cancelled or completed diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt new file mode 100644 index 0000000000..133cb542e1 --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt @@ -0,0 +1,9 @@ +package kotlinx.coroutines.experimental.internal + + +actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) { + var destinationIndex = destinationStart + for (sourceIndex in srcPos until srcPos + length) { + destination[destinationIndex++] = source[sourceIndex] + } +} diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt new file mode 100644 index 0000000000..7888c8e3ba --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt @@ -0,0 +1,5 @@ +package kotlinx.coroutines.experimental.internal + +actual interface Closeable { + actual fun close() +} diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt new file mode 100644 index 0000000000..20dcd6bb4c --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt @@ -0,0 +1,12 @@ +package kotlinx.coroutines.experimental.internal + +actual typealias ReentrantLock = NoOpLock + +actual inline fun ReentrantLock.withLock(action: () -> T) = action() + +public class NoOpLock { + fun tryLock() = true + fun unlock(): Unit {} +} + +actual fun subscriberList(): MutableList = ArrayList() diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt index 453bf84f02..2569249418 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt @@ -16,6 +16,8 @@ package kotlinx.coroutines.experimental.internal +import kotlinx.coroutines.experimental.channels.* + private typealias Node = LinkedListNode /** @suppress **This is unstable API and it is subject to change.** */ @@ -31,6 +33,9 @@ public open class LinkedListNode { @PublishedApi internal var _prev = this @PublishedApi internal var _removed: Boolean = false + public val prev: Any get() = _prev + public val next: Any get() = _next + public inline val nextNode get() = _next public inline val prevNode get() = _prev public inline val isRemoved get() = _removed @@ -107,6 +112,21 @@ public actual open class AddLastDesc actual constructor( protected override val affectedNode: Node get() = queue._prev protected actual override fun onPrepare(affected: Node, next: Node): Any? = null protected override fun onComplete() = queue.addLast(node) + protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit +} + +public actual open class RemoveFirstDesc actual constructor( + actual val queue: LockFreeLinkedListNode +) : AbstractAtomicDesc() { + + @Suppress("UNCHECKED_CAST") + public actual val result: T get() = affectedNode as T + protected override val affectedNode: Node get() = queue._prev + + protected actual open fun validatePrepared(node: T): Boolean = true + protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? = null + protected override fun onComplete(): Unit { queue.removeFirstOrNull() } + protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } /** @suppress **This is unstable API and it is subject to change.** */ @@ -116,6 +136,10 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected abstract fun onComplete() actual final override fun prepare(op: AtomicOp<*>): Any? = onPrepare(affectedNode, affectedNode._next) actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() + + protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails + protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds + protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } /** @suppress **This is unstable API and it is subject to change.** */ diff --git a/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt b/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt new file mode 100644 index 0000000000..564969ac78 --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/test/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopyKtTest.kt @@ -0,0 +1,14 @@ +package kotlinx.coroutines.experimental.internal + +import kotlin.test.* + +class ArrayCopyTest { + + @Test + fun testArrayCopy() { + val source = Array(10, { it }) + val destination = arrayOfNulls(7) + arraycopy(source, 2, destination, 1, 5) + assertEquals(listOf(null, 2, 3, 4, 5, 6, null), destination.toList()) + } +} From e866fb3bd32128787a381ae9034b84533cdb6e5c Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sun, 22 Apr 2018 18:26:14 +0300 Subject: [PATCH 2/4] Properly implement select clause for JS channels --- .../experimental/internal/Atomic.kt | 4 +- .../channels/ConflatedBroadcastChannelTest.kt | 2 +- .../selects/SelectArrayChannelTest.kt | 42 +++++------ .../experimental/selects/SelectBiasTest.kt | 9 ++- .../selects/SelectRendezvousChannelTest.kt | 43 +++++------ .../selects/SelectChannelStressTest.kt | 72 +++++++++++++++++++ .../experimental/internal/LinkedList.kt | 26 ++++--- 7 files changed, 140 insertions(+), 58 deletions(-) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt (86%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt (90%) rename {core/kotlinx-coroutines-core => common/kotlinx-coroutines-core-common}/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt (87%) create mode 100644 core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt index ce19e05c1c..77b4175c3b 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Atomic.kt @@ -66,8 +66,10 @@ public abstract class AtomicOp : OpDescriptor() { final override fun perform(affected: Any?): Any? { // make decision on status var decision = this._consensus.value - if (decision === NO_DECISION) + if (decision === NO_DECISION) { decision = decide(prepare(affected as T)) + } + complete(affected as T, decision) return decision } diff --git a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt index 4c04f8f268..2cd539b5e3 100644 --- a/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedBroadcastChannelTest.kt @@ -116,7 +116,7 @@ class ConflatedBroadcastChannelTest : TestBase() { } } - // Ugly workaround for bug in JS compiler + // Workaround for KT-23921 fun exceptionFromNotInline(block: () -> Unit): Throwable? { try { block() diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt similarity index 86% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt index 54aaae0ca5..6e6c50818b 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectArrayChannelTest.kt @@ -19,13 +19,13 @@ package kotlinx.coroutines.experimental.selects import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.* import kotlinx.coroutines.experimental.intrinsics.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class SelectArrayChannelTest : TestBase() { + @Test - fun testSelectSendSuccess() = runBlocking { + fun testSelectSendSuccess() = runTest { expect(1) val channel = ArrayChannel(1) launch(coroutineContext) { @@ -44,7 +44,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectSendSuccessWithDefault() = runBlocking { + fun testSelectSendSuccessWithDefault() = runTest { expect(1) val channel = ArrayChannel(1) launch(coroutineContext) { @@ -66,7 +66,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectSendReceiveBuf() = runBlocking { + fun testSelectSendReceiveBuf() = runTest { expect(1) val channel = ArrayChannel(1) select { @@ -85,7 +85,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectSendWait() = runBlocking { + fun testSelectSendWait() = runTest { expect(1) val channel = ArrayChannel(1) launch(coroutineContext) { @@ -107,7 +107,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveSuccess() = runBlocking { + fun testSelectReceiveSuccess() = runTest { expect(1) val channel = ArrayChannel(1) channel.send("OK") @@ -122,7 +122,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveSuccessWithDefault() = runBlocking { + fun testSelectReceiveSuccessWithDefault() = runTest { expect(1) val channel = ArrayChannel(1) channel.send("OK") @@ -140,7 +140,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveWaitWithDefault() = runBlocking { + fun testSelectReceiveWaitWithDefault() = runTest { expect(1) val channel = ArrayChannel(1) select { @@ -170,7 +170,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveWait() = runBlocking { + fun testSelectReceiveWait() = runTest { expect(1) val channel = ArrayChannel(1) launch(coroutineContext) { @@ -188,8 +188,8 @@ class SelectArrayChannelTest : TestBase() { finish(6) } - @Test(expected = ClosedReceiveChannelException::class) - fun testSelectReceiveClosed() = runBlocking { + @Test + fun testSelectReceiveClosed() = runTest({it is ClosedReceiveChannelException}) { expect(1) val channel = ArrayChannel(1) channel.close() @@ -202,8 +202,8 @@ class SelectArrayChannelTest : TestBase() { expectUnreached() } - @Test(expected = ClosedReceiveChannelException::class) - fun testSelectReceiveWaitClosed() = runBlocking { + @Test + fun testSelectReceiveWaitClosed() = runTest({it is ClosedReceiveChannelException}) { expect(1) val channel = ArrayChannel(1) launch(coroutineContext) { @@ -221,9 +221,9 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectSendResourceCleanup() = runBlocking { + fun testSelectSendResourceCleanup() = runTest { val channel = ArrayChannel(1) - val n = 10_000_000 * stressTestMultiplier + val n = 1000 expect(1) channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed repeat(n) { i -> @@ -236,9 +236,9 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveResourceCleanup() = runBlocking { + fun testSelectReceiveResourceCleanup() = runTest { val channel = ArrayChannel(1) - val n = 10_000_000 * stressTestMultiplier + val n = 1000 expect(1) repeat(n) { i -> select { @@ -250,7 +250,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveDispatchNonSuspending() = runBlocking { + fun testSelectReceiveDispatchNonSuspending() = runTest { val channel = ArrayChannel(1) expect(1) channel.send(42) @@ -272,7 +272,7 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveDispatchNonSuspending2() = runBlocking { + fun testSelectReceiveDispatchNonSuspending2() = runTest { val channel = ArrayChannel(1) expect(1) channel.send(42) @@ -303,4 +303,4 @@ class SelectArrayChannelTest : TestBase() { if (!trySelect(null)) return block.startCoroutineUndispatched(this) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt similarity index 90% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt index eee7283897..20322cde1f 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectBiasTest.kt @@ -17,15 +17,14 @@ package kotlinx.coroutines.experimental.selects import kotlinx.coroutines.experimental.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* -class SelectBiasTest { +class SelectBiasTest : TestBase() { val n = 10_000 @Test - fun testBiased() = runBlocking { + fun testBiased() = runTest { val d0 = async(coroutineContext) { 0 } val d1 = async(coroutineContext) { 1 } val counter = IntArray(2) @@ -41,7 +40,7 @@ class SelectBiasTest { } @Test - fun testUnbiased() = runBlocking { + fun testUnbiased() = runTest { val d0 = async(coroutineContext) { 0 } val d1 = async(coroutineContext) { 1 } val counter = IntArray(2) diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt similarity index 87% rename from core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt rename to common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt index d71be6ae2a..04cde2a449 100644 --- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt +++ b/common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectRendezvousChannelTest.kt @@ -13,19 +13,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 package kotlinx.coroutines.experimental.selects import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.* import kotlinx.coroutines.experimental.intrinsics.* -import org.junit.* -import org.junit.Assert.* import kotlin.coroutines.experimental.* +import kotlin.test.* class SelectRendezvousChannelTest : TestBase() { + @Test - fun testSelectSendSuccess() = runBlocking { + fun testSelectSendSuccess() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -44,7 +45,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectSendSuccessWithDefault() = runBlocking { + fun testSelectSendSuccessWithDefault() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -66,7 +67,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectSendWaitWithDefault() = runBlocking { + fun testSelectSendWaitWithDefault() = runTest { expect(1) val channel = RendezvousChannel() select { @@ -92,7 +93,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectSendWait() = runBlocking { + fun testSelectSendWait() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -110,7 +111,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveSuccess() = runBlocking { + fun testSelectReceiveSuccess() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -130,7 +131,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveSuccessWithDefault() = runBlocking { + fun testSelectReceiveSuccessWithDefault() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -153,7 +154,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveWaitWithDefault() = runBlocking { + fun testSelectReceiveWaitWithDefault() = runTest { expect(1) val channel = RendezvousChannel() select { @@ -179,7 +180,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveWait() = runBlocking { + fun testSelectReceiveWait() = runTest { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -197,8 +198,8 @@ class SelectRendezvousChannelTest : TestBase() { finish(6) } - @Test(expected = ClosedReceiveChannelException::class) - fun testSelectReceiveClosed() = runBlocking { + @Test + fun testSelectReceiveClosed() = runTest(expected = { it is ClosedReceiveChannelException }) { expect(1) val channel = RendezvousChannel() channel.close() @@ -211,8 +212,8 @@ class SelectRendezvousChannelTest : TestBase() { expectUnreached() } - @Test(expected = ClosedReceiveChannelException::class) - fun testSelectReceiveWaitClosed() = runBlocking { + @Test + fun testSelectReceiveWaitClosed() = runTest(expected = {it is ClosedReceiveChannelException}) { expect(1) val channel = RendezvousChannel() launch(coroutineContext) { @@ -230,9 +231,9 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectSendResourceCleanup() = runBlocking { + fun testSelectSendResourceCleanup() = runTest { val channel = RendezvousChannel() - val n = 10_000_000 * stressTestMultiplier + val n = 1_000 expect(1) repeat(n) { i -> select { @@ -244,9 +245,9 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveResourceCleanup() = runBlocking { + fun testSelectReceiveResourceCleanup() = runTest { val channel = RendezvousChannel() - val n = 10_000_000 * stressTestMultiplier + val n = 1_000 expect(1) repeat(n) { i -> select { @@ -258,7 +259,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectAtomicFailure() = runBlocking { + fun testSelectAtomicFailure() = runTest { val c1 = RendezvousChannel() val c2 = RendezvousChannel() expect(1) @@ -289,7 +290,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectWaitDispatch() = runBlocking { + fun testSelectWaitDispatch() = runTest { val c = RendezvousChannel() expect(1) launch(coroutineContext) { @@ -323,4 +324,4 @@ class SelectRendezvousChannelTest : TestBase() { if (!trySelect(null)) return block.startCoroutineUndispatched(this) } -} \ No newline at end of file +} diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt new file mode 100644 index 0000000000..097fe85a39 --- /dev/null +++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/selects/SelectChannelStressTest.kt @@ -0,0 +1,72 @@ +package kotlinx.coroutines.experimental.selects + +import kotlinx.coroutines.experimental.* +import kotlinx.coroutines.experimental.channels.* +import kotlinx.coroutines.experimental.intrinsics.* +import kotlin.test.* + +class SelectChannelStressTest: TestBase() { + + @Test + fun testSelectSendResourceCleanupArrayChannel() = runTest { + val channel = ArrayChannel(1) + val n = 10_000_000 * stressTestMultiplier + expect(1) + channel.send(-1) // fill the buffer, so all subsequent sends cannot proceed + repeat(n) { i -> + select { + channel.onSend(i) { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + @Test + fun testSelectReceiveResourceCleanupArrayChannel() = runTest { + val channel = ArrayChannel(1) + val n = 10_000_000 * stressTestMultiplier + expect(1) + repeat(n) { i -> + select { + channel.onReceive { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + @Test + fun testSelectSendResourceCleanupRendezvousChannel() = runTest { + val channel = RendezvousChannel() + val n = 1_000_000 * stressTestMultiplier + expect(1) + repeat(n) { i -> + select { + channel.onSend(i) { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + @Test + fun testSelectReceiveResourceRendezvousChannel() = runTest { + val channel = RendezvousChannel() + val n = 1_000_000 * stressTestMultiplier + expect(1) + repeat(n) { i -> + select { + channel.onReceive { expectUnreached() } + default { expect(i + 2) } + } + } + finish(n + 2) + } + + internal fun SelectBuilder.default(block: suspend () -> R) { + this as SelectBuilderImpl // type assertion + if (!trySelect(null)) return + block.startCoroutineUndispatched(this) + } +} diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt index 2569249418..1980ff5a5f 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt @@ -16,8 +16,6 @@ package kotlinx.coroutines.experimental.internal -import kotlinx.coroutines.experimental.channels.* - private typealias Node = LinkedListNode /** @suppress **This is unstable API and it is subject to change.** */ @@ -121,11 +119,14 @@ public actual open class RemoveFirstDesc actual constructor( @Suppress("UNCHECKED_CAST") public actual val result: T get() = affectedNode as T - protected override val affectedNode: Node get() = queue._prev - + protected override val affectedNode: Node = queue.next as Node protected actual open fun validatePrepared(node: T): Boolean = true - protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? = null - protected override fun onComplete(): Unit { queue.removeFirstOrNull() } + protected actual final override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? { + @Suppress("UNCHECKED_CAST") + validatePrepared(affectedNode as T) + return null + } + protected override fun onComplete() { queue.removeFirstOrNull() } protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } @@ -134,10 +135,17 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() { protected abstract val affectedNode: Node protected actual abstract fun onPrepare(affected: Node, next: Node): Any? protected abstract fun onComplete() - actual final override fun prepare(op: AtomicOp<*>): Any? = onPrepare(affectedNode, affectedNode._next) - actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() - protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails + actual final override fun prepare(op: AtomicOp<*>): Any? { + val affected = affectedNode + val next = affected._next + val failure = failure(affected, next) + if (failure != null) return failure + return onPrepare(affected, next) + } + + actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete() + protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } From 5e79739fc2d6d7c5681adbea1e53c0181cccd412 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sun, 22 Apr 2018 18:47:40 +0300 Subject: [PATCH 3/4] Update readme for JS channels --- js/kotlinx-coroutines-core-js/README.md | 50 +++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/js/kotlinx-coroutines-core-js/README.md b/js/kotlinx-coroutines-core-js/README.md index 21ed982ad9..552d4e0502 100644 --- a/js/kotlinx-coroutines-core-js/README.md +++ b/js/kotlinx-coroutines-core-js/README.md @@ -8,6 +8,8 @@ Coroutine builder functions: | ------------- | ------------- | ---------------- | --------------- | [launch] | [Job] | [CoroutineScope] | Launches coroutine that does not have any result | [async] | [Deferred] | [CoroutineScope] | Returns a single value with the future result +| [produce][kotlinx.coroutines.experimental.channels.produce] | [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [ProducerScope][kotlinx.coroutines.experimental.channels.ProducerScope] | Produces a stream of elements + Coroutine dispatchers implementing [CoroutineDispatcher]: @@ -23,6 +25,14 @@ More context elements: | [NonCancellable] | A non-cancelable job that is always active | [CoroutineExceptionHandler] | Handler for uncaught exception +Synchronization primitives for coroutines: + +| **Name** | **Suspending functions** | **Description** +| ---------- | ----------------------------------------------------------- | --------------- +| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | Mutual exclusion +| [Channel][kotlinx.coroutines.experimental.channels.Channel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send], [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | Communication channel (aka queue or exchanger) + + Top-level suspending functions: | **Name** | **Description** @@ -37,6 +47,19 @@ Cancellation support for user-defined suspending functions is available with [su helper function. [NonCancellable] job object is provided to suppress cancellation with `run(NonCancellable) {...}` block of code. +[Select][kotlinx.coroutines.experimental.selects.select] expression waits for the result of multiple suspending functions simultaneously: + +| **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version** +| ---------------- | --------------------------------------------- | ------------------------------------------------ | -------------------------- +| [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted] +| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] +| [SendChannel][kotlinx.coroutines.experimental.channels.SendChannel] | [send][kotlinx.coroutines.experimental.channels.SendChannel.send] | [onSend][kotlinx.coroutines.experimental.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.experimental.channels.SendChannel.offer] +| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receive][kotlinx.coroutines.experimental.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll] +| [ReceiveChannel][kotlinx.coroutines.experimental.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.experimental.channels.ReceiveChannel.poll] +| [Mutex][kotlinx.coroutines.experimental.sync.Mutex] | [lock][kotlinx.coroutines.experimental.sync.Mutex.lock] | [onLock][kotlinx.coroutines.experimental.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.experimental.sync.Mutex.tryLock] +| none | [delay] | [onTimeout][kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout] | none + + # Package kotlinx.coroutines.experimental General-purpose coroutine builders, contexts, and helper functions. @@ -59,4 +82,31 @@ General-purpose coroutine builders, contexts, and helper functions. [withTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout.html [withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/with-timeout-or-null.html [suspendCancellableCoroutine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/suspend-cancellable-coroutine.html +[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/join.html +[Job.onJoin]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/on-join.html +[Job.isCompleted]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-job/is-completed.html +[Deferred.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/await.html +[Deferred.onAwait]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/on-await.html + +[kotlinx.coroutines.experimental.sync.Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/index.html +[kotlinx.coroutines.experimental.sync.Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/lock.html +[kotlinx.coroutines.experimental.sync.Mutex.onLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/on-lock.html +[kotlinx.coroutines.experimental.sync.Mutex.tryLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.sync/-mutex/try-lock.html + +[kotlinx.coroutines.experimental.channels.produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/index.html +[kotlinx.coroutines.experimental.channels.ProducerScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-producer-scope/index.html +[kotlinx.coroutines.experimental.channels.Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/index.html +[kotlinx.coroutines.experimental.channels.SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html +[kotlinx.coroutines.experimental.channels.SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/index.html +[kotlinx.coroutines.experimental.channels.SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/on-send.html +[kotlinx.coroutines.experimental.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/offer.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/poll.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive-or-null.html +[kotlinx.coroutines.experimental.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/on-receive-or-null.html + +[kotlinx.coroutines.experimental.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/select.html +[kotlinx.coroutines.experimental.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.selects/-select-builder/on-timeout.html From d2029b8004a9c51bb87c3b44c261c492b4bdbe95 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 26 Apr 2018 10:11:50 +0300 Subject: [PATCH 4/4] Hide declarations in internal package --- .../experimental/channels/AbstractChannel.kt | 16 ++++++++-------- .../experimental/internal/ArrayCopy.common.kt | 2 +- .../experimental/internal/Closeable.common.kt | 8 +++++++- .../experimental/internal/Concurrent.common.kt | 8 ++++---- .../internal/LockFreeLinkedList.common.kt | 1 + .../coroutines/experimental/internal/Symbol.kt | 2 +- .../experimental/internal/ArrayCopy.kt | 2 +- .../experimental/internal/Closeable.kt | 7 ++++++- .../experimental/internal/Concurrent.kt | 7 ++++--- .../experimental/internal/ArrayCopy.kt | 3 +-- .../experimental/internal/Closeable.kt | 9 +++++++-- .../experimental/internal/Concurrent.kt | 8 ++++---- .../experimental/internal/LinkedList.kt | 1 + 13 files changed, 46 insertions(+), 28 deletions(-) diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt index 5fcfc87bf2..697c6ef5c5 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt @@ -935,28 +935,28 @@ public abstract class AbstractChannel : AbstractSendChannel(), Channel } /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS") +@JvmField internal val OFFER_SUCCESS: Any = Symbol("OFFER_SUCCESS") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val OFFER_FAILED: Any = Symbol("OFFER_FAILED") +@JvmField internal val OFFER_FAILED: Any = Symbol("OFFER_FAILED") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val POLL_FAILED: Any = Symbol("POLL_FAILED") +@JvmField internal val POLL_FAILED: Any = Symbol("POLL_FAILED") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED") +@JvmField internal val ENQUEUE_FAILED: Any = Symbol("ENQUEUE_FAILED") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val SELECT_STARTED: Any = Symbol("SELECT_STARTED") +@JvmField internal val SELECT_STARTED: Any = Symbol("SELECT_STARTED") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val NULL_VALUE: Any = Symbol("NULL_VALUE") +@JvmField internal val NULL_VALUE: Any = Symbol("NULL_VALUE") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED") +@JvmField internal val CLOSE_RESUMED: Any = Symbol("CLOSE_RESUMED") /** @suppress **This is unstable API and it is subject to change.** */ -@JvmField val SEND_RESUMED = Symbol("SEND_RESUMED") +@JvmField internal val SEND_RESUMED = Symbol("SEND_RESUMED") /** * Represents sending waiter in the queue. diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt index fe20163456..eac8bbfbc5 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.common.kt @@ -3,4 +3,4 @@ package kotlinx.coroutines.experimental.internal /** * Cross-platform array copy. Overlaps of source and destination are not supported */ -expect fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) \ No newline at end of file +internal expect fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) \ No newline at end of file diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt index 2b1f504b2b..3004a10782 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.common.kt @@ -1,5 +1,11 @@ package kotlinx.coroutines.experimental.internal -expect interface Closeable { +/** + * Closeable entity. + * @suppress **Deprecated** + */ +@Deprecated("No replacement, see specific use") +public expect interface Closeable { + @Deprecated("No replacement, see specific code") fun close() } diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt index df36305a29..3141212b94 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.common.kt @@ -6,13 +6,13 @@ package kotlinx.coroutines.experimental.internal * * Note that this alias is intentionally not named as CopyOnWriteList to avoid accidental misusage outside of ArrayBroadcastChannel */ -typealias SubscribersList = MutableList +internal typealias SubscribersList = MutableList -expect fun subscriberList(): SubscribersList +internal expect fun subscriberList(): SubscribersList -expect class ReentrantLock() { +internal expect class ReentrantLock() { fun tryLock(): Boolean fun unlock(): Unit } -expect inline fun ReentrantLock.withLock(action: () -> T): T +internal expect inline fun ReentrantLock.withLock(action: () -> T): T diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt index e23fd988db..aab23d1cf0 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeLinkedList.common.kt @@ -60,6 +60,7 @@ public expect open class AddLastDesc( override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) } +/** @suppress **This is unstable API and it is subject to change.** */ public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): AbstractAtomicDesc { val queue: LockFreeLinkedListNode public val result: T diff --git a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt index 530d9d7942..6f391e9fbd 100644 --- a/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt +++ b/common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/internal/Symbol.kt @@ -21,6 +21,6 @@ package kotlinx.coroutines.experimental.internal * * @suppress **This is unstable API and it is subject to change.** */ -public class Symbol(val symbol: String) { +internal class Symbol(val symbol: String) { override fun toString(): String = symbol } diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt index bb365527d0..5d3b74d7d6 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt @@ -1,5 +1,5 @@ package kotlinx.coroutines.experimental.internal -actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int){ +internal actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int){ System.arraycopy(source, srcPos, destination, destinationStart, length) } \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt index 9552fc2706..86a7bbbf59 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt @@ -1,3 +1,8 @@ package kotlinx.coroutines.experimental.internal -actual typealias Closeable = java.io.Closeable +/** + * Closeable entity. + * @suppress **Deprecated** + */ +@Deprecated("No replacement, see specific use") +public actual typealias Closeable = java.io.Closeable diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt index 1eef996481..0422d2b0c9 100644 --- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt +++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt @@ -3,8 +3,9 @@ package kotlinx.coroutines.experimental.internal import java.util.concurrent.* import kotlin.concurrent.withLock as withLockJvm -actual fun subscriberList(): MutableList = CopyOnWriteArrayList() +internal actual fun subscriberList(): MutableList = CopyOnWriteArrayList() -actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock +@Suppress("ACTUAL_WITHOUT_EXPECT") +internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock -actual inline fun ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action) +internal actual inline fun ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action) diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt index 133cb542e1..1cab961929 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/ArrayCopy.kt @@ -1,7 +1,6 @@ package kotlinx.coroutines.experimental.internal - -actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) { +internal actual fun arraycopy(source: Array, srcPos: Int, destination: Array, destinationStart: Int, length: Int) { var destinationIndex = destinationStart for (sourceIndex in srcPos until srcPos + length) { destination[destinationIndex++] = source[sourceIndex] diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt index 7888c8e3ba..938767ca43 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Closeable.kt @@ -1,5 +1,10 @@ package kotlinx.coroutines.experimental.internal -actual interface Closeable { - actual fun close() +/** + * Closeable entity. + * @suppress **Deprecated** + */ +@Deprecated("No replacement, see specific use") +public actual interface Closeable { + public actual fun close() } diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt index 20dcd6bb4c..76c1a33f77 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/Concurrent.kt @@ -1,12 +1,12 @@ package kotlinx.coroutines.experimental.internal -actual typealias ReentrantLock = NoOpLock +internal actual typealias ReentrantLock = NoOpLock -actual inline fun ReentrantLock.withLock(action: () -> T) = action() +internal actual inline fun ReentrantLock.withLock(action: () -> T) = action() -public class NoOpLock { +internal class NoOpLock { fun tryLock() = true fun unlock(): Unit {} } -actual fun subscriberList(): MutableList = ArrayList() +internal actual fun subscriberList(): MutableList = ArrayList() diff --git a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt index 1980ff5a5f..30c3f3c1d0 100644 --- a/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt +++ b/js/kotlinx-coroutines-core-js/src/main/kotlin/kotlinx/coroutines/experimental/internal/LinkedList.kt @@ -113,6 +113,7 @@ public actual open class AddLastDesc actual constructor( protected actual override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) = Unit } +/** @suppress **This is unstable API and it is subject to change.** */ public actual open class RemoveFirstDesc actual constructor( actual val queue: LockFreeLinkedListNode ) : AbstractAtomicDesc() {