diff --git a/benchmarks/src/jmh/kotlin/benchmarks/CancellableContinuationBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/CancellableContinuationBenchmark.kt deleted file mode 100644 index 99c0f04902..0000000000 --- a/benchmarks/src/jmh/kotlin/benchmarks/CancellableContinuationBenchmark.kt +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package benchmarks - -import kotlinx.coroutines.* -import org.openjdk.jmh.annotations.* -import java.util.concurrent.* -import kotlin.coroutines.* -import kotlin.coroutines.intrinsics.* - -@Warmup(iterations = 5) -@Measurement(iterations = 10) -@BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -@State(Scope.Benchmark) -@Fork(2) -open class CancellableContinuationBenchmark { - - @Benchmark - fun awaitWithSuspension(): Int { - val deferred = CompletableDeferred() - return run(allowSuspend = true) { deferred.await() } - } - - @Benchmark - fun awaitNoSuspension(): Int { - val deferred = CompletableDeferred(1) - return run { deferred.await() } - } - - private fun run(allowSuspend: Boolean = false, block: suspend () -> Int): Int { - val value = block.startCoroutineUninterceptedOrReturn(EmptyContinuation) - if (value === COROUTINE_SUSPENDED) { - if (!allowSuspend) { - throw IllegalStateException("Unexpected suspend") - } else { - return -1 - } - } - - return value as Int - } - - object EmptyContinuation : Continuation { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result) { - } - } -} diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt index e037069d22..e0bc2fcc48 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt @@ -13,22 +13,6 @@ import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* import java.util.concurrent.* -/* - * Results: - * - * // Throw FlowAborted overhead - * Numbers.primes avgt 7 3039.185 ± 25.598 us/op - * Numbers.primesRx avgt 7 2677.937 ± 17.720 us/op - * - * // On par - * Numbers.transformations avgt 7 16.207 ± 0.133 us/op - * Numbers.transformationsRx avgt 7 19.626 ± 0.135 us/op - * - * // Channels overhead - * Numbers.zip avgt 7 434.160 ± 7.014 us/op - * Numbers.zipRx avgt 7 87.898 ± 5.007 us/op - * - */ @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Fork(value = 1) @@ -39,11 +23,11 @@ open class NumbersBenchmark { companion object { private const val primes = 100 - private const val natural = 1000 + private const val natural = 1000L } - private fun numbers() = flow { - for (i in 2L..Long.MAX_VALUE) emit(i) + private fun numbers(limit: Long = Long.MAX_VALUE) = flow { + for (i in 2L..limit) emit(i) } private fun primesFlow(): Flow = flow { @@ -80,7 +64,7 @@ open class NumbersBenchmark { @Benchmark fun zip() = runBlocking { - val numbers = numbers().take(natural) + val numbers = numbers(natural) val first = numbers .filter { it % 2L != 0L } .map { it * it } @@ -105,8 +89,7 @@ open class NumbersBenchmark { @Benchmark fun transformations(): Int = runBlocking { - numbers() - .take(natural) + numbers(natural) .filter { it % 2L != 0L } .map { it * it } .filter { (it + 1) % 3 == 0L }.count() @@ -120,4 +103,4 @@ open class NumbersBenchmark { .filter { (it + 1) % 3 == 0L }.count() .blockingGet() } -} +} \ No newline at end of file diff --git a/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt new file mode 100644 index 0000000000..ee1ef724cb --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannel.kt @@ -0,0 +1,98 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.tailcall + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* + +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") +public abstract class SimpleChannel { + companion object { + const val NULL_SURROGATE: Int = -1 + } + + @JvmField + protected var producer: Continuation? = null + @JvmField + protected var enqueuedValue: Int = NULL_SURROGATE + @JvmField + protected var consumer: Continuation? = null + + suspend fun send(element: Int) { + require(element != NULL_SURROGATE) + if (offer(element)) { + return + } + + return suspendSend(element) + } + + private fun offer(element: Int): Boolean { + if (consumer == null) { + return false + } + + consumer!!.resume(element) + consumer = null + return true + } + + suspend fun receive(): Int { + // Cached value + if (enqueuedValue != NULL_SURROGATE) { + val result = enqueuedValue + enqueuedValue = NULL_SURROGATE + producer!!.resume(Unit) + return result + } + + return suspendReceive() + } + + abstract suspend fun suspendReceive(): Int + abstract suspend fun suspendSend(element: Int) +} + +class NonCancellableChannel : SimpleChannel() { + override suspend fun suspendReceive(): Int = suspendCoroutineUninterceptedOrReturn { + consumer = it.intercepted() + COROUTINE_SUSPENDED + } + + override suspend fun suspendSend(element: Int) = suspendCoroutineUninterceptedOrReturn { + enqueuedValue = element + producer = it.intercepted() + COROUTINE_SUSPENDED + } +} + +class CancellableChannel : SimpleChannel() { + override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine { + consumer = it.intercepted() + COROUTINE_SUSPENDED + } + + override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine { + enqueuedValue = element + producer = it.intercepted() + COROUTINE_SUSPENDED + } +} + +class CancellableReusableChannel : SimpleChannel() { + @Suppress("INVISIBLE_MEMBER") + override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable { + consumer = it.intercepted() + COROUTINE_SUSPENDED + } + + @Suppress("INVISIBLE_MEMBER") + override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable { + enqueuedValue = element + producer = it.intercepted() + COROUTINE_SUSPENDED + } +} \ No newline at end of file diff --git a/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannelBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannelBenchmark.kt new file mode 100644 index 0000000000..09ff7697f6 --- /dev/null +++ b/benchmarks/src/jmh/kotlin/benchmarks/tailcall/SimpleChannelBenchmark.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package benchmarks.tailcall + +import kotlinx.coroutines.* +import org.openjdk.jmh.annotations.* +import java.util.concurrent.* + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +open class SimpleChannelBenchmark { + + private val iterations = 10_000 + + @Volatile + private var sink: Int = 0 + + @Benchmark + fun cancellable() = runBlocking { + val ch = CancellableChannel() + launch { + repeat(iterations) { ch.send(it) } + } + + launch { + repeat(iterations) { sink = ch.receive() } + } + } + + @Benchmark + fun cancellableReusable() = runBlocking { + val ch = CancellableReusableChannel() + launch { + repeat(iterations) { ch.send(it) } + } + + launch { + repeat(iterations) { sink = ch.receive() } + } + } + + @Benchmark + fun nonCancellable() = runBlocking { + val ch = NonCancellableChannel() + launch { + repeat(iterations) { ch.send(it) } + } + + launch { + repeat(iterations) { + sink = ch.receive() + } + } + } +} diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index 492e367bb0..d1e99529a5 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -223,6 +223,40 @@ public suspend inline fun suspendAtomicCancellableCoroutine( cancellable.getResult() } +/** + * Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible. + */ +internal suspend inline fun suspendAtomicCancellableCoroutineReusable( + crossinline block: (CancellableContinuation) -> Unit +): T = suspendCoroutineUninterceptedOrReturn { uCont -> + val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) + block(cancellable) + cancellable.getResult() + } + +internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { + // If used outside of our dispatcher + if (delegate !is DispatchedContinuation) { + return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT) + } + /* + * Attempt to claim reusable instance. + * + * suspendAtomicCancellableCoroutineReusable { // <- claimed + * // Any asynchronous cancellation is "postponed" while this block + * // is being executed + * } // postponed cancellation is checked here. + * + * Claim can fail for the following reasons: + * 1) Someone tried to make idempotent resume. + * Idempotent resume is internal (used only by us) and is used only in `select`, + * thus leaking CC instance for indefinite time. + * 2) Continuation was cancelled. Then we should prevent any further reuse and bail out. + */ + return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() } + ?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT) +} + /** * @suppress **Deprecated** */ diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 40344c9c17..6cd28f16a9 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -78,10 +78,33 @@ internal open class CancellableContinuationImpl( // This method does nothing. Leftover for binary compatibility with old compiled code } - // It is only invoked from an internal getResult function, so we can be sure it is not invoked twice - private fun installParentCancellationHandler() { - if (isCompleted) return // fast path 1 -- don't need to do anything if already completed - val parent = delegate.context[Job] ?: return // fast path 2 -- don't do anything without parent + private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable + + /** + * Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work. + * Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. + */ + internal fun resetState(): Boolean { + assert { parentHandle !== NonDisposableHandle } + val state = _state.value + assert { state !is NotCompleted } + if (state is CompletedIdempotentResult) { + detachChild() + return false + } + _decision.value = UNDECIDED + _state.value = Active + return true + } + + /** + * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. + * It is only invoked from an internal [getResult] function. + */ + private fun setupCancellation() { + if (checkCompleted()) return + if (parentHandle !== null) return // fast path 2 -- was already initialized + val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent parent.start() // make sure the parent is started val handle = parent.invokeOnCompletion( onCancelling = true, @@ -89,12 +112,25 @@ internal open class CancellableContinuationImpl( ) parentHandle = handle // now check our state _after_ registering (could have completed while we were registering) - if (isCompleted) { + // Also note that we do not dispose parent for reusable continuations, dispatcher will do that for us + if (isCompleted && !isReusable()) { handle.dispose() // it is Ok to call dispose twice -- here and in disposeParentHandle parentHandle = NonDisposableHandle // release it just in case, to aid GC } } + private fun checkCompleted(): Boolean { + val completed = isCompleted + if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations + val dispatched = delegate as? DispatchedContinuation<*> ?: return completed + val cause = dispatched.checkPostponedCancellation(this) ?: return completed + if (!completed) { + // Note: this cancel may fail if one more concurrent cancel is currently being invoked + cancel(cause) + } + return true + } + public override val callerFrame: CoroutineStackFrame? get() = delegate as? CoroutineStackFrame @@ -110,6 +146,15 @@ internal open class CancellableContinuationImpl( } } + /* + * Attempt to postpone cancellation for reusable cancellable continuation + */ + private fun cancelLater(cause: Throwable): Boolean { + if (resumeMode != MODE_ATOMIC_DEFAULT) return false + val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false + return dispatched.postponeCancellation(cause) + } + public override fun cancel(cause: Throwable?): Boolean { _state.loop { state -> if (state !is NotCompleted) return false // false if already complete or cancelling @@ -119,12 +164,19 @@ internal open class CancellableContinuationImpl( // Invoke cancel handler if it was present if (state is CancelHandler) invokeHandlerSafely { state.invoke(cause) } // Complete state update - disposeParentHandle() + detachChildIfNonResuable() dispatchResume(mode = MODE_ATOMIC_DEFAULT) return true } } + internal fun parentCancelled(cause: Throwable) { + if (cancelLater(cause)) return + cancel(cause) + // Even if cancellation has failed, we should detach child to avoid potential leak + detachChildIfNonResuable() + } + private inline fun invokeHandlerSafely(block: () -> Unit) { try { block() @@ -165,7 +217,7 @@ internal open class CancellableContinuationImpl( @PublishedApi internal fun getResult(): Any? { - installParentCancellationHandler() + setupCancellation() if (trySuspend()) return COROUTINE_SUSPENDED // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state val state = this.state @@ -256,7 +308,7 @@ internal open class CancellableContinuationImpl( when (state) { is NotCompleted -> { if (!_state.compareAndSet(state, proposedUpdate)) return@loop // retry on cas failure - disposeParentHandle() + detachChildIfNonResuable() dispatchResume(resumeMode) return null } @@ -278,11 +330,19 @@ internal open class CancellableContinuationImpl( } // Unregister from parent job - private fun disposeParentHandle() { - parentHandle?.let { // volatile read parentHandle (once) - it.dispose() - parentHandle = NonDisposableHandle // release it just in case, to aid GC - } + private fun detachChildIfNonResuable() { + // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end + if (!isReusable()) detachChild() + } + + /** + * Detaches from the parent. + * Invariant: used used from [CoroutineDispatcher.releaseInterceptedContinuation] iff [isReusable] is `true` + */ + internal fun detachChild() { + val handle = parentHandle + handle?.dispose() + parentHandle = NonDisposableHandle } override fun tryResume(value: T, idempotent: Any?): Any? { @@ -292,7 +352,7 @@ internal open class CancellableContinuationImpl( val update: Any? = if (idempotent == null) value else CompletedIdempotentResult(idempotent, value, state) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure - disposeParentHandle() + detachChildIfNonResuable() return state } is CompletedIdempotentResult -> { @@ -314,7 +374,7 @@ internal open class CancellableContinuationImpl( is NotCompleted -> { val update = CompletedExceptionally(exception) if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure - disposeParentHandle() + detachChildIfNonResuable() return state } else -> return null // cannot resume -- not active anymore diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index df7a2daac1..f08f8f782f 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -101,6 +101,11 @@ public abstract class CoroutineDispatcher : public final override fun interceptContinuation(continuation: Continuation): Continuation = DispatchedContinuation(this, continuation) + @InternalCoroutinesApi + public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { + (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() + } + /** * @suppress **Error**: Operator '+' on two CoroutineDispatcher objects is meaningless. * CoroutineDispatcher is a coroutine context element and `+` is a set-sum operator for coroutine contexts. diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index d7ca5f6750..18f5ed6cca 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1415,7 +1415,7 @@ internal class ChildContinuation( @JvmField val child: CancellableContinuationImpl<*> ) : JobCancellingNode(parent) { override fun invoke(cause: Throwable?) { - child.cancel(child.getContinuationCancellationCause(job)) + child.parentCancelled(child.getContinuationCancellationCause(job)) } override fun toString(): String = "ChildContinuation[$child]" diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index be18942d9e..9b887a5bdd 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -178,7 +178,7 @@ internal abstract class AbstractSendChannel : SendChannel { return closed.sendException } - private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont -> + private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutineReusable sc@ { cont -> loop@ while (true) { if (full) { val send = SendElement(element, cont) @@ -555,7 +555,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutine sc@ { cont -> + private suspend fun receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont -> val receive = ReceiveElement(cont as CancellableContinuation, receiveMode) while (true) { if (enqueueReceive(receive)) { @@ -833,7 +833,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel + private suspend fun hasNextSuspend(): Boolean = suspendAtomicCancellableCoroutineReusable sc@ { cont -> val receive = ReceiveHasNext(this, cont) while (true) { if (channel.enqueueReceive(receive)) { diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt new file mode 100644 index 0000000000..bb5e312410 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -0,0 +1,275 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.jvm.* + +@SharedImmutable +private val UNDEFINED = Symbol("UNDEFINED") +@SharedImmutable +@JvmField +internal val REUSABLE_CLAIMED = Symbol("REUSABLE_CLAIMED") + +internal class DispatchedContinuation( + @JvmField val dispatcher: CoroutineDispatcher, + @JvmField val continuation: Continuation +) : DispatchedTask(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation by continuation { + @JvmField + @Suppress("PropertyName") + internal var _state: Any? = UNDEFINED + override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame + override fun getStackTraceElement(): StackTraceElement? = null + @JvmField // pre-cached value to avoid ctx.fold on every resumption + internal val countOrElement = threadContextElements(context) + + /** + * Possible states of reusability: + * + * 1) `null`. Cancellable continuation wasn't yet attempted to be reused or + * was used and then invalidated (e.g. because of the cancellation). + * 2) [CancellableContinuation]. Continuation to be/that is being reused. + * 3) [REUSABLE_CLAIMED]. CC is currently being reused and its owner executes `suspend` block: + * ``` + * // state == null | CC + * suspendAtomicCancellableCoroutineReusable { cont -> + * // state == REUSABLE_CLAIMED + * block(cont) + * } + * // state == CC + * ``` + * 4) [Throwable] continuation was cancelled with this cause while being in [suspendAtomicCancellableCoroutineReusable], + * [CancellableContinuationImpl.getResult] will check for cancellation later. + * + * [REUSABLE_CLAIMED] state is required to prevent the lost resume in the channel. + * AbstractChannel.receive method relies on the fact that the following pattern + * ``` + * suspendAtomicCancellableCoroutineReusable { cont -> + * val result = pollFastPath() + * if (result != null) cont.resume(result) + * } + * ``` + * always succeeds. + * To make it always successful, we actually postpone "reusable" cancellation + * to this phase and set cancellation only at the moment of instantiation. + */ + private val _reusableCancellableContinuation = atomic(null) + + public val reusableCancellableContinuation: CancellableContinuationImpl<*>? + get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> + + public val isReusable: Boolean + get() = _reusableCancellableContinuation.value != null + + /** + * Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block, + * so all cancellations will be postponed. + */ + @Suppress("UNCHECKED_CAST") + fun claimReusableCancellableContinuation(): CancellableContinuationImpl? { + /* + * Transitions: + * 1) `null` -> claimed, caller will instantiate CC instance + * 2) `CC` -> claimed, caller will reuse CC instance + */ + _reusableCancellableContinuation.loop { state -> + when { + state === null -> { + /* + * null -> CC was not yet published -> we do not compete with cancel + * -> can use plain store instead of CAS + */ + _reusableCancellableContinuation.value = REUSABLE_CLAIMED + return null + } + state is CancellableContinuationImpl<*> -> { + if (_reusableCancellableContinuation.compareAndSet(state, REUSABLE_CLAIMED)) { + return state as CancellableContinuationImpl + } + } + else -> error("Inconsistent state $state") + } + } + } + + /** + * Checks whether there were any attempts to cancel reusable CC while it was in [REUSABLE_CLAIMED] state + * and returns cancellation cause if so, `null` otherwise. + * If continuation was cancelled, it becomes non-reusable. + * + * ``` + * suspendAtomicCancellableCoroutineReusable { // <- claimed + * // Any asynchronous cancellation is "postponed" while this block + * // is being executed + * } // postponed cancellation is checked here in `getResult` + * ``` + * + * See [CancellableContinuationImpl.getResult]. + */ + fun checkPostponedCancellation(continuation: CancellableContinuation<*>): Throwable? { + _reusableCancellableContinuation.loop { state -> + // not when(state) to avoid Intrinsics.equals call + when { + state === REUSABLE_CLAIMED -> { + if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, continuation)) return null + } + state === null -> return null + state is Throwable -> { + require(_reusableCancellableContinuation.compareAndSet(state, null)) + return state + } + else -> error("Inconsistent state $state") + } + } + } + + /** + * Tries to postpone cancellation if reusable CC is currently in [REUSABLE_CLAIMED] state. + * Returns `true` if cancellation is (or previously was) postponed, `false` otherwise. + */ + fun postponeCancellation(cause: Throwable): Boolean { + _reusableCancellableContinuation.loop { state -> + when (state) { + REUSABLE_CLAIMED -> { + if (_reusableCancellableContinuation.compareAndSet(REUSABLE_CLAIMED, cause)) + return true + } + is Throwable -> return true + else -> { + // Invalidate + if (_reusableCancellableContinuation.compareAndSet(state, null)) + return false + } + } + } + } + + override fun takeState(): Any? { + val state = _state + assert { state !== UNDEFINED } // fail-fast if repeatedly invoked + _state = UNDEFINED + return state + } + + override val delegate: Continuation + get() = this + + override fun resumeWith(result: Result) { + val context = continuation.context + val state = result.toState() + if (dispatcher.isDispatchNeeded(context)) { + _state = state + resumeMode = MODE_ATOMIC_DEFAULT + dispatcher.dispatch(context, this) + } else { + executeUnconfined(state, MODE_ATOMIC_DEFAULT) { + withCoroutineContext(this.context, countOrElement) { + continuation.resumeWith(result) + } + } + } + } + + @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack + inline fun resumeCancellable(value: T) { + if (dispatcher.isDispatchNeeded(context)) { + _state = value + resumeMode = MODE_CANCELLABLE + dispatcher.dispatch(context, this) + } else { + executeUnconfined(value, MODE_CANCELLABLE) { + if (!resumeCancelled()) { + resumeUndispatched(value) + } + } + } + } + + @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack + inline fun resumeCancellableWithException(exception: Throwable) { + val context = continuation.context + val state = CompletedExceptionally(exception) + if (dispatcher.isDispatchNeeded(context)) { + _state = CompletedExceptionally(exception) + resumeMode = MODE_CANCELLABLE + dispatcher.dispatch(context, this) + } else { + executeUnconfined(state, MODE_CANCELLABLE) { + if (!resumeCancelled()) { + resumeUndispatchedWithException(exception) + } + } + } + } + + @Suppress("NOTHING_TO_INLINE") + inline fun resumeCancelled(): Boolean { + val job = context[Job] + if (job != null && !job.isActive) { + resumeWithException(job.getCancellationException()) + return true + } + + return false + } + + @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack + inline fun resumeUndispatched(value: T) { + withCoroutineContext(context, countOrElement) { + continuation.resume(value) + } + } + + @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack + inline fun resumeUndispatchedWithException(exception: Throwable) { + withCoroutineContext(context, countOrElement) { + continuation.resumeWithStackTrace(exception) + } + } + + // used by "yield" implementation + internal fun dispatchYield(value: T) { + val context = continuation.context + _state = value + resumeMode = MODE_CANCELLABLE + dispatcher.dispatchYield(context, this) + } + + override fun toString(): String = + "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]" +} + +internal fun DispatchedContinuation.yieldUndispatched(): Boolean = + executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) { + run() + } + +/** + * Executes given [block] as part of current event loop, updating current continuation + * mode and state if continuation is not resumed immediately. + * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). + * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. + */ +private inline fun DispatchedContinuation<*>.executeUnconfined( + contState: Any?, mode: Int, doYield: Boolean = false, + block: () -> Unit +): Boolean { + val eventLoop = ThreadLocalEventLoop.eventLoop + // If we are yielding and unconfined queue is empty, we can bail out as part of fast path + if (doYield && eventLoop.isUnconfinedQueueEmpty) return false + return if (eventLoop.isUnconfinedLoopActive) { + // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow + _state = contState + resumeMode = mode + eventLoop.dispatchUnconfined(this) + true // queued into the active loop + } else { + // Was not active -- run event loop until all unconfined tasks are executed + runUnconfinedEventLoop(eventLoop, block = block) + false + } +} diff --git a/kotlinx-coroutines-core/common/src/Dispatched.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt similarity index 58% rename from kotlinx-coroutines-core/common/src/Dispatched.kt rename to kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index a9624bd86e..eb72b5a95a 100644 --- a/kotlinx-coroutines-core/common/src/Dispatched.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -8,198 +8,6 @@ import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.jvm.* -@Suppress("PrivatePropertyName") -@SharedImmutable -private val UNDEFINED = Symbol("UNDEFINED") - -/** - * Executes given [block] as part of current event loop, updating current continuation - * mode and state if continuation is not resumed immediately. - * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). - * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. - */ -private inline fun DispatchedContinuation<*>.executeUnconfined( - contState: Any?, mode: Int, doYield: Boolean = false, - block: () -> Unit -) : Boolean { - val eventLoop = ThreadLocalEventLoop.eventLoop - // If we are yielding and unconfined queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.isUnconfinedQueueEmpty) return false - return if (eventLoop.isUnconfinedLoopActive) { - // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow - _state = contState - resumeMode = mode - eventLoop.dispatchUnconfined(this) - true // queued into the active loop - } else { - // Was not active -- run event loop until all unconfined tasks are executed - runUnconfinedEventLoop(eventLoop, block = block) - false - } -} - -private fun DispatchedTask<*>.resumeUnconfined() { - val eventLoop = ThreadLocalEventLoop.eventLoop - if (eventLoop.isUnconfinedLoopActive) { - // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow - eventLoop.dispatchUnconfined(this) - } else { - // Was not active -- run event loop until all unconfined tasks are executed - runUnconfinedEventLoop(eventLoop) { - resume(delegate, MODE_UNDISPATCHED) - } - } -} - -private inline fun DispatchedTask<*>.runUnconfinedEventLoop( - eventLoop: EventLoop, - block: () -> Unit -) { - eventLoop.incrementUseCount(unconfined = true) - try { - block() - while (true) { - // break when all unconfined continuations where executed - if (!eventLoop.processUnconfinedEvent()) break - } - } catch (e: Throwable) { - /* - * This exception doesn't happen normally, only if we have a bug in implementation. - * Report it as a fatal exception. - */ - handleFatalException(e, null) - } finally { - eventLoop.decrementUseCount(unconfined = true) - } -} - -internal class DispatchedContinuation( - @JvmField val dispatcher: CoroutineDispatcher, - @JvmField val continuation: Continuation -) : DispatchedTask(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation by continuation { - @JvmField - @Suppress("PropertyName") - internal var _state: Any? = UNDEFINED - override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame - override fun getStackTraceElement(): StackTraceElement? = null - @JvmField // pre-cached value to avoid ctx.fold on every resumption - internal val countOrElement = threadContextElements(context) - - override fun takeState(): Any? { - val state = _state - assert { state !== UNDEFINED } // fail-fast if repeatedly invoked - _state = UNDEFINED - return state - } - - override val delegate: Continuation - get() = this - - override fun resumeWith(result: Result) { - val context = continuation.context - val state = result.toState() - if (dispatcher.isDispatchNeeded(context)) { - _state = state - resumeMode = MODE_ATOMIC_DEFAULT - dispatcher.dispatch(context, this) - } else { - executeUnconfined(state, MODE_ATOMIC_DEFAULT) { - withCoroutineContext(this.context, countOrElement) { - continuation.resumeWith(result) - } - } - } - } - - @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack - inline fun resumeCancellable(value: T) { - if (dispatcher.isDispatchNeeded(context)) { - _state = value - resumeMode = MODE_CANCELLABLE - dispatcher.dispatch(context, this) - } else { - executeUnconfined(value, MODE_CANCELLABLE) { - if (!resumeCancelled()) { - resumeUndispatched(value) - } - } - } - } - - @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack - inline fun resumeCancellableWithException(exception: Throwable) { - val context = continuation.context - val state = CompletedExceptionally(exception) - if (dispatcher.isDispatchNeeded(context)) { - _state = CompletedExceptionally(exception) - resumeMode = MODE_CANCELLABLE - dispatcher.dispatch(context, this) - } else { - executeUnconfined(state, MODE_CANCELLABLE) { - if (!resumeCancelled()) { - resumeUndispatchedWithException(exception) - } - } - } - } - - @Suppress("NOTHING_TO_INLINE") - inline fun resumeCancelled(): Boolean { - val job = context[Job] - if (job != null && !job.isActive) { - resumeWithException(job.getCancellationException()) - return true - } - - return false - } - - @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack - inline fun resumeUndispatched(value: T) { - withCoroutineContext(context, countOrElement) { - continuation.resume(value) - } - } - - @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack - inline fun resumeUndispatchedWithException(exception: Throwable) { - withCoroutineContext(context, countOrElement) { - continuation.resumeWithStackTrace(exception) - } - } - - // used by "yield" implementation - internal fun dispatchYield(value: T) { - val context = continuation.context - _state = value - resumeMode = MODE_CANCELLABLE - dispatcher.dispatchYield(context, this) - } - - override fun toString(): String = - "DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]" -} - -internal fun Continuation.resumeCancellable(value: T) = when (this) { - is DispatchedContinuation -> resumeCancellable(value) - else -> resume(value) -} - -internal fun Continuation.resumeCancellableWithException(exception: Throwable) = when (this) { - is DispatchedContinuation -> resumeCancellableWithException(exception) - else -> resumeWithStackTrace(exception) -} - -internal fun Continuation.resumeDirect(value: T) = when (this) { - is DispatchedContinuation -> continuation.resume(value) - else -> resume(value) -} - -internal fun Continuation.resumeDirectWithException(exception: Throwable) = when (this) { - is DispatchedContinuation -> continuation.resumeWithStackTrace(exception) - else -> resumeWithStackTrace(exception) -} - internal abstract class DispatchedTask( @JvmField public var resumeMode: Int ) : SchedulerTask() { @@ -281,11 +89,6 @@ internal abstract class DispatchedTask( } } -internal fun DispatchedContinuation.yieldUndispatched(): Boolean = - executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) { - run() - } - internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) { val delegate = this.delegate if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { @@ -320,6 +123,61 @@ internal fun DispatchedTask.resume(delegate: Continuation, useMode: In } } +private fun DispatchedTask<*>.resumeUnconfined() { + val eventLoop = ThreadLocalEventLoop.eventLoop + if (eventLoop.isUnconfinedLoopActive) { + // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow + eventLoop.dispatchUnconfined(this) + } else { + // Was not active -- run event loop until all unconfined tasks are executed + runUnconfinedEventLoop(eventLoop) { + resume(delegate, MODE_UNDISPATCHED) + } + } +} + +internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( + eventLoop: EventLoop, + block: () -> Unit +) { + eventLoop.incrementUseCount(unconfined = true) + try { + block() + while (true) { + // break when all unconfined continuations where executed + if (!eventLoop.processUnconfinedEvent()) break + } + } catch (e: Throwable) { + /* + * This exception doesn't happen normally, only if we have a bug in implementation. + * Report it as a fatal exception. + */ + handleFatalException(e, null) + } finally { + eventLoop.decrementUseCount(unconfined = true) + } +} + + +internal fun Continuation.resumeCancellable(value: T) = when (this) { + is DispatchedContinuation -> resumeCancellable(value) + else -> resume(value) +} + +internal fun Continuation.resumeCancellableWithException(exception: Throwable) = when (this) { + is DispatchedContinuation -> resumeCancellableWithException(exception) + else -> resumeWithStackTrace(exception) +} + +internal fun Continuation.resumeDirect(value: T) = when (this) { + is DispatchedContinuation -> continuation.resume(value) + else -> resume(value) +} + +internal fun Continuation.resumeDirectWithException(exception: Throwable) = when (this) { + is DispatchedContinuation -> continuation.resumeWithStackTrace(exception) + else -> resumeWithStackTrace(exception) +} @Suppress("NOTHING_TO_INLINE") internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) { diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 3c72915379..ea75e1fe28 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -187,7 +187,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { return lockSuspend(owner) } - private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutine sc@ { cont -> + private suspend fun lockSuspend(owner: Any?) = suspendAtomicCancellableCoroutineReusable sc@ { cont -> val waiter = LockCont(owner, cont) _state.loop { state -> when (state) { diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index a9df15cf49..b6ebc501ff 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -131,7 +131,7 @@ private class SemaphoreImpl( cur + 1 } - private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutine sc@ { cont -> + private suspend fun addToQueueAndSuspend() = suspendAtomicCancellableCoroutineReusable sc@ { cont -> val last = this.tail val enqIdx = enqIdx.getAndIncrement() val segment = getSegment(last, enqIdx / SEGMENT_SIZE) diff --git a/kotlinx-coroutines-core/jvm/test/FieldWalker.kt b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt new file mode 100644 index 0000000000..bb8b855498 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/FieldWalker.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import java.lang.reflect.* +import java.util.* +import java.util.Collections.* +import kotlin.collections.ArrayList + +object FieldWalker { + + /* + * Reflectively starts to walk through object graph and returns identity set of all reachable objects. + */ + public fun walk(root: Any): Set { + val result = newSetFromMap(IdentityHashMap()) + result.add(root) + val stack = ArrayDeque() + stack.addLast(root) + while (stack.isNotEmpty()) { + val element = stack.removeLast() + val type = element.javaClass + type.visit(element, result, stack) + } + return result + } + + private fun Class<*>.visit( + element: Any, + result: MutableSet, + stack: ArrayDeque + ) { + val fields = fields() + fields.forEach { + it.isAccessible = true + val value = it.get(element) ?: return@forEach + if (result.add(value)) { + stack.addLast(value) + } + } + + if (isArray && !componentType.isPrimitive) { + val array = element as Array + array.filterNotNull().forEach { + if (result.add(it)) { + stack.addLast(it) + } + } + } + } + + private fun Class<*>.fields(): List { + val result = ArrayList() + var type = this + while (type != Any::class.java) { + val fields = type.declaredFields.filter { + !it.type.isPrimitive + && !Modifier.isStatic(it.modifiers) + && !(it.type.isArray && it.type.componentType.isPrimitive) + } + result.addAll(fields) + type = type.superclass + } + + return result + } + + // Debugging-only + @Suppress("UNUSED") + fun printPath(from: Any, to: Any) { + val pathNodes = ArrayList() + val visited = newSetFromMap(IdentityHashMap()) + visited.add(from) + if (findPath(from, to, visited, pathNodes)) { + pathNodes.reverse() + println(pathNodes.joinToString(" -> ", from.javaClass.simpleName + " -> ", "-> " + to.javaClass.simpleName)) + } else { + println("Path from $from to $to not found") + } + } + + private fun findPath(from: Any, to: Any, visited: MutableSet, pathNodes: MutableList): Boolean { + if (from === to) { + return true + } + + val type = from.javaClass + if (type.isArray) { + if (type.componentType.isPrimitive) return false + val array = from as Array + array.filterNotNull().forEach { + if (findPath(it, to, visited, pathNodes)) { + return true + } + } + return false + } + + val fields = type.fields() + fields.forEach { + it.isAccessible = true + val value = it.get(from) ?: return@forEach + if (!visited.add(value)) return@forEach + val found = findPath(value, to, visited, pathNodes) + if (found) { + pathNodes += from.javaClass.simpleName + ":" + it.name + return true + } + } + + return false + } +} diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt new file mode 100644 index 0000000000..b324e7ed3a --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -0,0 +1,195 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.selects.* +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class ReusableCancellableContinuationTest : TestBase() { + + @Test + fun testReusable() = runTest { + testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable) + } + + @Test + fun testRegular() = runTest { + testContinuationsCount(10, 10, ::suspendAtomicCancellableCoroutine) + } + + private suspend inline fun CoroutineScope.testContinuationsCount( + iterations: Int, + expectedInstances: Int, + suspender: suspend ((CancellableContinuation) -> Unit) -> Unit + ) { + val result = mutableSetOf>() + val job = coroutineContext[Job]!! + val channel = Channel>(1) + launch { + channel.consumeEach { + val f = FieldWalker.walk(job) + result.addAll(f.filterIsInstance>()) + it.resumeWith(Result.success(Unit)) + } + } + + repeat(iterations) { + suspender { + assertTrue(channel.offer(it)) + } + } + channel.close() + assertEquals(expectedInstances, result.size - 1) + } + + @Test + fun testCancelledOnClaimedCancel() = runTest { + expect(1) + try { + suspendAtomicCancellableCoroutineReusable { + it.cancel() + } + expectUnreached() + } catch (e: CancellationException) { + finish(2) + } + } + + @Test + fun testNotCancelledOnClaimedResume() = runTest({ it is CancellationException }) { + expect(1) + // Bind child at first + var continuation: Continuation<*>? = null + suspendAtomicCancellableCoroutineReusable { + expect(2) + continuation = it + launch { // Attach to the parent, avoid fast path + expect(3) + it.resume(Unit) + } + } + expect(4) + ensureActive() + // Verify child was bound + assertNotNull(FieldWalker.walk(coroutineContext[Job]!!).single { it === continuation }) + suspendAtomicCancellableCoroutineReusable { + expect(5) + coroutineContext[Job]!!.cancel() + it.resume(Unit) + } + assertFalse(isActive) + finish(6) + } + + @Test + fun testResumeReusablePreservesReference() = runTest { + expect(1) + var cont: Continuation? = null + launch { + cont!!.resumeWith(Result.success(Unit)) + } + suspendAtomicCancellableCoroutineReusable { + cont = it + } + ensureActive() + assertTrue { FieldWalker.walk(coroutineContext[Job]!!).contains(cont!!) } + finish(2) + } + + @Test + fun testResumeRegularDoesntPreservesReference() = runTest { + expect(1) + var cont: Continuation? = null + launch { // Attach to the parent, avoid fast path + cont!!.resumeWith(Result.success(Unit)) + } + suspendAtomicCancellableCoroutine { + cont = it + } + ensureActive() + assertFalse { FieldWalker.walk(coroutineContext[Job]!!).contains(cont!!) } + finish(2) + } + + @Test + fun testDetachedOnCancel() = runTest { + expect(1) + var cont: Continuation<*>? = null + try { + suspendAtomicCancellableCoroutineReusable { + cont = it + it.cancel() + } + expectUnreached() + } catch (e: CancellationException) { + assertFalse { FieldWalker.walk(coroutineContext[Job]!!).contains(cont!!) } + finish(2) + } + } + + @Test + fun testPropagatedCancel() = runTest({it is CancellationException}) { + val currentJob = coroutineContext[Job]!! + expect(1) + // Bind child at first + suspendAtomicCancellableCoroutineReusable { + expect(2) + // Attach to the parent, avoid fast path + launch { + expect(3) + it.resume(Unit) + } + } + expect(4) + ensureActive() + // Verify child was bound + assertEquals(1, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> }) + currentJob.cancel() + assertFalse(isActive) + // Child detached + assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> }) + suspendAtomicCancellableCoroutineReusable { it.resume(Unit) } + suspendAtomicCancellableCoroutineReusable { it.resume(Unit) } + assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> }) + + try { + suspendAtomicCancellableCoroutineReusable {} + } catch (e: CancellationException) { + assertEquals(0, FieldWalker.walk(currentJob).count { it is CancellableContinuation<*> }) + finish(5) + } + } + + @Test + fun testChannelMemoryLeak() = runTest { + val iterations = 100 + val channel = Channel() + launch { + repeat(iterations) { + select { + channel.onSend(Unit) {} + } + } + } + + val receiver = launch { + repeat(iterations) { + channel.receive() + } + expect(2) + val job = coroutineContext[Job]!! + // 1 for reusable CC, another one for outer joiner + assertEquals(2, FieldWalker.walk(job).count { it is CancellableContinuation<*> }) + } + expect(1) + receiver.join() + // Reference should be claimed at this point + assertEquals(0, FieldWalker.walk(receiver).count { it is CancellableContinuation<*> }) + finish(3) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelAtomicCancelStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelAtomicCancelStressTest.kt index 6223213d67..5afac37c9c 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelAtomicCancelStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelAtomicCancelStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -10,7 +10,7 @@ import org.junit.* import org.junit.Assert.* import org.junit.runner.* import org.junit.runners.* -import java.util.* +import kotlin.random.Random import java.util.concurrent.atomic.* /** @@ -57,9 +57,6 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas private inline fun cancellable(done: Channel, block: () -> Unit) { try { block() - } catch (e: Throwable) { - if (e !is CancellationException) fail(e) - throw e } finally { if (!done.offer(true)) fail(IllegalStateException("failed to offer to done channel")) @@ -72,9 +69,8 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas val deadline = System.currentTimeMillis() + TEST_DURATION launchSender() launchReceiver() - val rnd = Random() while (System.currentTimeMillis() < deadline && failed.get() == null) { - when (rnd.nextInt(3)) { + when (Random.nextInt(3)) { 0 -> { // cancel & restart sender stopSender() launchSender() @@ -104,12 +100,11 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas private fun launchSender() { sender = scope.launch(start = CoroutineStart.ATOMIC) { - val rnd = Random() cancellable(senderDone) { var counter = 0 while (true) { val trySend = lastSent + 1 - when (rnd.nextInt(2)) { + when (Random.nextInt(2)) { 0 -> channel.send(trySend) 1 -> select { channel.onSend(trySend) {} } else -> error("cannot happen") @@ -134,10 +129,9 @@ class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBas private fun launchReceiver() { receiver = scope.launch(start = CoroutineStart.ATOMIC) { - val rnd = Random() cancellable(receiverDone) { while (true) { - val received = when (rnd.nextInt(2)) { + val received = when (Random.nextInt(2)) { 0 -> channel.receive() 1 -> select { channel.onReceive { it } } else -> error("cannot happen") diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSelectStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSelectStressTest.kt new file mode 100644 index 0000000000..0fa64276df --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSelectStressTest.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.selects.* +import org.junit.* +import org.junit.Assert.* +import java.util.concurrent.atomic.AtomicLongArray + +class ChannelSelectStressTest : TestBase() { + private val pairedCoroutines = 3 + private val dispatcher = newFixedThreadPoolContext(pairedCoroutines * 2, "ChannelSelectStressTest") + private val scope = CoroutineScope(dispatcher) + private val elementsToSend = 20_000 * Long.SIZE_BITS * stressTestMultiplier + private val sent = atomic(0) + private val received = atomic(0) + private val receivedArray = AtomicLongArray(elementsToSend / Long.SIZE_BITS) + private val channel = Channel() + + @After + fun tearDown() { + dispatcher.close() + } + + @Test + fun testAtomicCancelStress() = runTest { + repeat(pairedCoroutines) { launchSender() } + repeat(pairedCoroutines) { launchReceiver() } + val job = scope.coroutineContext[Job] as CompletableJob + job.complete() + job.join() + + for (i in 0 until receivedArray.length()) { + assertEquals("Missing element detected", 0L.inv(), receivedArray[i]) + } + } + + private fun launchSender() { + scope.launch { + while (sent.value < elementsToSend) { + val element = sent.getAndIncrement() + if (element >= elementsToSend) break + select { channel.onSend(element) {} } + } + channel.close(CancellationException()) + } + } + + private fun launchReceiver() { + scope.launch { + while (received.value != elementsToSend) { + val element = select { channel.onReceive { it } } + received.incrementAndGet() + val index = (element / Long.SIZE_BITS) + val mask = 1L shl (element % Long.SIZE_BITS.toLong()).toInt() + while (true) { + val bits = receivedArray.get(index) + if (bits and mask != 0L) { + error("Detected duplicate") + } + if (receivedArray.compareAndSet(index, bits, bits or mask)) break + } + } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt index e7b46cd105..0f87960f24 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryTest.kt @@ -102,8 +102,7 @@ class StackTraceRecoveryTest : TestBase() { "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testReceiveFromChannel\$1.invokeSuspend(StackTraceRecoveryTest.kt:89)", "Caused by: java.lang.IllegalArgumentException\n" + "\tat kotlinx.coroutines.exceptions.StackTraceRecoveryTest\$testReceiveFromChannel\$1\$job\$1.invokeSuspend(StackTraceRecoveryTest.kt:93)\n" + - "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" + - "\tat kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:152)") + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n") expect(3) job.join() finish(4) diff --git a/kotlinx-coroutines-core/jvm/test/flow/ConsumeAsFlowLeakTest.kt b/kotlinx-coroutines-core/jvm/test/flow/ConsumeAsFlowLeakTest.kt new file mode 100644 index 0000000000..3fcceaf1fb --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/ConsumeAsFlowLeakTest.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.junit.Test +import kotlin.test.* + +class ConsumeAsFlowLeakTest : TestBase() { + + private data class Box(val i: Int) + + // In companion to avoid references through runTest + companion object { + private val first = Box(4) + private val second = Box(5) + } + + // @Test //ignored until KT-33986 + fun testReferenceIsNotRetained() = testReferenceNotRetained(true) + + @Test + fun testReferenceIsNotRetainedNoSuspension() = testReferenceNotRetained(false) + + private fun testReferenceNotRetained(shouldSuspendOnSend: Boolean) = runTest { + val channel = BroadcastChannel(1) + val job = launch { + expect(2) + channel.asFlow().collect { + expect(it.i) + } + } + + expect(1) + yield() + expect(3) + channel.send(first) + if (shouldSuspendOnSend) yield() + channel.send(second) + yield() + assertEquals(0, FieldWalker.walk(channel).count { it === second }) + finish(6) + job.cancelAndJoin() + } +} diff --git a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt index 223a33469f..bf6cbdf10c 100644 --- a/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt +++ b/kotlinx-coroutines-debug/test/SanitizedProbesTest.kt @@ -68,7 +68,7 @@ class SanitizedProbesTest : DebugTestBase() { "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest.access\$createActiveDeferred(SanitizedProbesTest.kt:16)\n" + "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$testCoroutinesDump\$1.invokeSuspend(SanitizedProbesTest.kt:57)\n" + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" + - "\tat kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:237)\n" + + "\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:237)\n" + "\tat kotlinx.coroutines.TestBase.runTest\$default(TestBase.kt:141)\n" + "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest.testCoroutinesDump(SanitizedProbesTest.kt:56)" ) @@ -96,7 +96,7 @@ class SanitizedProbesTest : DebugTestBase() { "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest.access\$launchSelector(SanitizedProbesTest.kt:16)\n" + "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest\$testSelectBuilder\$1.invokeSuspend(SanitizedProbesTest.kt:89)\n" + "\tat kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)\n" + - "\tat kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:233)\n" + + "\tat kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:233)\n" + "\tat kotlinx.coroutines.TestBase.runTest\$default(TestBase.kt:154)\n" + "\tat definitely.not.kotlinx.coroutines.SanitizedProbesTest.testSelectBuilder(SanitizedProbesTest.kt:88)") finish(4)