Skip to content

Commit 4e97c83

Browse files
authored
Get rid of workaround for KT-16222 that is long time fixed (#3446)
* The correctness of the change is verified by ConsumeAsFlowLeakTest * Follow-up cleanup here and there
1 parent 4a44fef commit 4e97c83

File tree

14 files changed

+10
-49
lines changed

14 files changed

+10
-49
lines changed

kotlinx-coroutines-core/common/src/CompletionState.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal data class CompletedWithCancellation(
4040
* or artificial [CancellationException] if no cause was provided
4141
*/
4242
internal open class CompletedExceptionally(
43-
@JvmField public val cause: Throwable,
43+
@JvmField val cause: Throwable,
4444
handled: Boolean = false
4545
) {
4646
private val _handled = atomic(handled)

kotlinx-coroutines-core/common/src/Yield.kt

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package kotlinx.coroutines
66

77
import kotlinx.coroutines.internal.*
8-
import kotlin.coroutines.*
98
import kotlin.coroutines.intrinsics.*
109

1110
/**

kotlinx-coroutines-core/common/src/channels/Channel.kt

-3
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,6 @@ public value class ChannelResult<out T>
448448
override fun toString(): String = "Closed($cause)"
449449
}
450450

451-
@Suppress("NOTHING_TO_INLINE")
452451
@InternalCoroutinesApi
453452
public companion object {
454453
private val failed = Failed()
@@ -512,7 +511,6 @@ public inline fun <T> ChannelResult<T>.onFailure(action: (exception: Throwable?)
512511
contract {
513512
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
514513
}
515-
@Suppress("UNCHECKED_CAST")
516514
if (holder is ChannelResult.Failed) action(exceptionOrNull())
517515
return this
518516
}
@@ -531,7 +529,6 @@ public inline fun <T> ChannelResult<T>.onClosed(action: (exception: Throwable?)
531529
contract {
532530
callsInPlace(action, InvocationKind.AT_MOST_ONCE)
533531
}
534-
@Suppress("UNCHECKED_CAST")
535532
if (holder is ChannelResult.Closed) action(exceptionOrNull())
536533
return this
537534
}

kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package kotlinx.coroutines.channels
77
import kotlinx.coroutines.*
88
import kotlin.coroutines.*
99

10-
@Suppress("DEPRECATION")
1110
internal open class ChannelCoroutine<E>(
1211
parentContext: CoroutineContext,
1312
protected val _channel: Channel<E>,
@@ -17,6 +16,7 @@ internal open class ChannelCoroutine<E>(
1716

1817
val channel: Channel<E> get() = this
1918

19+
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
2020
override fun cancel() {
2121
cancelInternal(defaultCancellationException())
2222
}

kotlinx-coroutines-core/common/src/channels/Channels.common.kt

-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ package kotlinx.coroutines.channels
1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.selects.*
1313
import kotlin.contracts.*
14-
import kotlin.coroutines.*
1514
import kotlin.jvm.*
1615

1716
internal const val DEFAULT_CLOSE_MESSAGE = "Channel was closed"
@@ -54,7 +53,6 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.()
5453
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
5554
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
5655
public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
57-
@Suppress("DEPRECATION", "UNCHECKED_CAST")
5856
return (this as ReceiveChannel<E?>).receiveOrNull()
5957
}
6058

@@ -66,7 +64,6 @@ public suspend fun <E : Any> ReceiveChannel<E>.receiveOrNull(): E? {
6664
level = DeprecationLevel.ERROR
6765
) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0
6866
public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
69-
@Suppress("DEPRECATION", "UNCHECKED_CAST")
7067
return (this as ReceiveChannel<E?>).onReceiveOrNull
7168
}
7269

kotlinx-coroutines-core/common/src/flow/Channels.kt

+2-27
Original file line numberDiff line numberDiff line change
@@ -31,35 +31,10 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
3131

3232
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
3333
ensureActive()
34-
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
35-
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
36-
// fix retention of the last emitted value.
37-
// See https://youtrack.jetbrains.com/issue/KT-16222
38-
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1333
3934
var cause: Throwable? = null
4035
try {
41-
while (true) {
42-
// :KLUDGE: This "run" call is resolved to an extension function "run" and forces the size of
43-
// spilled state to increase by an additional slot, so there are 4 object local variables spilled here
44-
// which makes the size of spill state equal to the 4 slots that are spilled around subsequent "emit"
45-
// call, ensuring that the previously emitted value is not retained in the state while receiving
46-
// the next one.
47-
// L$0 <- this
48-
// L$1 <- channel
49-
// L$2 <- cause
50-
// L$3 <- this$run (actually equal to this)
51-
val result = run { channel.receiveCatching() }
52-
if (result.isClosed) {
53-
result.exceptionOrNull()?.let { throw it }
54-
break // returns normally when result.closeCause == null
55-
}
56-
// result is spilled here to the coroutine state and retained after the call, even though
57-
// it is not actually needed in the next loop iteration.
58-
// L$0 <- this
59-
// L$1 <- channel
60-
// L$2 <- cause
61-
// L$3 <- result
62-
emit(result.getOrThrow())
36+
for (element in channel) {
37+
emit(element)
6338
}
6439
} catch (e: Throwable) {
6540
cause = e

kotlinx-coroutines-core/common/src/flow/internal/AbstractSharedFlow.kt

-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ internal abstract class AbstractSharedFlowSlot<F> {
1919
}
2020

2121
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
22-
@Suppress("UNCHECKED_CAST")
2322
protected var slots: Array<S?>? = null // allocated when needed
2423
private set
2524
protected var nCollectors = 0 // number of allocated (!free) slots

kotlinx-coroutines-core/common/src/flow/internal/Combine.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,14 @@
11
/*
22
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
33
*/
4-
@file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
4+
@file:Suppress("UNCHECKED_CAST") // KT-32203
55

66
package kotlinx.coroutines.flow.internal
77

88
import kotlinx.coroutines.*
99
import kotlinx.coroutines.channels.*
1010
import kotlinx.coroutines.flow.*
1111
import kotlinx.coroutines.internal.*
12-
import kotlin.coroutines.*
13-
import kotlin.coroutines.intrinsics.*
14-
1512
private typealias Update = IndexedValue<Any?>
1613

1714
@PublishedApi

kotlinx-coroutines-core/common/src/flow/terminal/Count.kt

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package kotlinx.coroutines.flow
99

10-
import kotlinx.coroutines.*
1110
import kotlin.jvm.*
1211

1312
/**

kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal expect fun <E> subscriberList(): SubscribersList<E>
1818

1919
internal expect class ReentrantLock() {
2020
fun tryLock(): Boolean
21-
fun unlock(): Unit
21+
fun unlock()
2222
}
2323

2424
internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ internal class DispatchedContinuation<in T>(
207207

208208
// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
209209
// It is used only in Continuation<T>.resumeCancellableWith
210-
@Suppress("NOTHING_TO_INLINE")
211210
inline fun resumeCancellableWith(
212211
result: Result<T>,
213212
noinline onCancellation: ((cause: Throwable) -> Unit)?
@@ -235,7 +234,8 @@ internal class DispatchedContinuation<in T>(
235234
}
236235
}
237236

238-
@Suppress("NOTHING_TO_INLINE")
237+
// inline here is to save us an entry on the stack for the sake of better stacktraces
238+
239239
inline fun resumeCancelled(state: Any?): Boolean {
240240
val job = context[Job]
241241
if (job != null && !job.isActive) {
@@ -247,7 +247,6 @@ internal class DispatchedContinuation<in T>(
247247
return false
248248
}
249249

250-
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
251250
inline fun resumeUndispatchedWith(result: Result<T>) {
252251
withContinuationContext(continuation, countOrElement) {
253252
continuation.resumeWith(result)

kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt

-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ internal open class UnbiasedSelectImplementation<R>(context: CoroutineContext) :
5858
return super.doSelect()
5959
}
6060

61-
@Suppress("UNCHECKED_CAST")
6261
private fun shuffleAndRegisterClauses() = try {
6362
clausesToRegister.shuffle()
6463
clausesToRegister.forEach { it.register() }

kotlinx-coroutines-core/common/src/selects/WhileSelect.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ import kotlinx.coroutines.*
2828
*/
2929
@ExperimentalCoroutinesApi
3030
public suspend inline fun whileSelect(crossinline builder: SelectBuilder<Boolean>.() -> Unit) {
31-
while(select<Boolean>(builder)) {}
31+
while(select(builder)) { /* do nothing */ }
3232
}

kotlinx-coroutines-core/common/src/sync/Mutex.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ internal open class MutexImpl(locked: Boolean) : SemaphoreImpl(1, if (locked) 1
165165
lockSuspend(owner)
166166
}
167167

168-
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> { cont ->
168+
private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable { cont ->
169169
val contWithOwner = CancellableContinuationWithOwner(cont, owner)
170170
acquire(contWithOwner)
171171
}

0 commit comments

Comments
 (0)