Skip to content

Get rid of workaround for KT-16222 that is long time fixed #3446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletionState.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal data class CompletedWithCancellation(
* or artificial [CancellationException] if no cause was provided
*/
internal open class CompletedExceptionally(
@JvmField public val cause: Throwable,
@JvmField val cause: Throwable,
handled: Boolean = false
) {
private val _handled = atomic(handled)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ public value class ChannelResult<out T>
override fun toString(): String = "Closed($cause)"
}

@Suppress("NOTHING_TO_INLINE")
@InternalCoroutinesApi
public companion object {
private val failed = Failed()
Expand Down Expand Up @@ -512,7 +511,6 @@ public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?)
contract {
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
@Suppress("UNCHECKED_CAST")
if (holder is ChannelResult.Failed) action(exceptionOrNull())
return this
}
Expand All @@ -531,7 +529,6 @@ public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?)
contract {
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
}
@Suppress("UNCHECKED_CAST")
if (holder is ChannelResult.Closed) action(exceptionOrNull())
return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlin.coroutines.*

@Suppress("DEPRECATION")
internal open class ChannelCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: Channel<E>,
Expand All @@ -17,6 +16,7 @@ internal open class ChannelCoroutine<E>(

val channel: Channel<E> get() = this

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
override fun cancel() {
cancelInternal(defaultCancellationException())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.jvm.*

internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
Expand Down Expand Up @@ -54,7 +53,6 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.()
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).receiveOrNull()
}

Expand All @@ -66,7 +64,6 @@ public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
level = DeprecationLevel.ERROR
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as ReceiveChannel<E?>).onReceiveOrNull
}

Expand Down
29 changes: 2 additions & 27 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,10 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
// See https://youtrack.jetbrains.com/issue/KT-16222
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1333
var cause: Throwable? = null
try {
while (true) {
// :KLUDGE: This "run" call is resolved to an extension function "run" and forces the size of
// spilled state to increase by an additional slot, so there are 4 object local variables spilled here
// which makes the size of spill state equal to the 4 slots that are spilled around subsequent "emit"
// call, ensuring that the previously emitted value is not retained in the state while receiving
// the next one.
// L$0 <- this
// L$1 <- channel
// L$2 <- cause
// L$3 <- this$run (actually equal to this)
val result = run { channel.receiveCatching() }
if (result.isClosed) {
result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
// result is spilled here to the coroutine state and retained after the call, even though
// it is not actually needed in the next loop iteration.
// L$0 <- this
// L$1 <- channel
// L$2 <- cause
// L$3 <- result
emit(result.getOrThrow())
for (element in channel) {
emit(element)
}
} catch (e: Throwable) {
cause = e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ internal abstract class AbstractSharedFlowSlot<F> {
}

internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
@Suppress("UNCHECKED_CAST")
protected var slots: Array<S?>? = null // allocated when needed
private set
protected var nCollectors = 0 // number of allocated (!free) slots
Expand Down
5 changes: 1 addition & 4 deletions kotlinx-coroutines-core/common/src/flow/internal/Combine.kt
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
@file:Suppress("UNCHECKED_CAST") // KT-32203

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

private typealias Update = IndexedValue<Any?>

@PublishedApi
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/terminal/Count.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.jvm.*

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal expect fun <E> subscriberList(): SubscribersList<E>

internal expect class ReentrantLock() {
fun tryLock(): Boolean
fun unlock(): Unit
fun unlock()
}

internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ internal class DispatchedContinuation<in T>(

// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
Expand Down Expand Up @@ -235,7 +234,8 @@ internal class DispatchedContinuation<in T>(
}
}

@Suppress("NOTHING_TO_INLINE")
// inline here is to save us an entry on the stack for the sake of better stacktraces

inline fun resumeCancelled(state: Any?): Boolean {
val job = context[Job]
if (job != null && !job.isActive) {
Expand All @@ -247,7 +247,6 @@ internal class DispatchedContinuation<in T>(
return false
}

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ internal open class UnbiasedSelectImplementation<R>(context: CoroutineContext) :
return super.doSelect()
}

@Suppress("UNCHECKED_CAST")
private fun shuffleAndRegisterClauses() = try {
clausesToRegister.shuffle()
clausesToRegister.forEach { it.register() }
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/selects/WhileSelect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ import kotlinx.coroutines.*
*/
@ExperimentalCoroutinesApi
public suspend inline fun whileSelect(crossinline builder: SelectBuilder<Boolean>.() -> Unit) {
while(select<Boolean>(builder)) {}
while(select(builder)) { /* do nothing */ }
}
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/sync/Mutex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
lockSuspend(owner)
}

private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable { cont ->
val contWithOwner = CancellableContinuationWithOwner(cont, owner)
acquire(contWithOwner)
}
Expand Down