Skip to content

Commit 45ba58e

Browse files
authored
Fix BlockHound false positives (#2331)
Fixes #2302 Fixes #2190 Partially fixes #2303
1 parent ee78090 commit 45ba58e

File tree

2 files changed

+179
-5
lines changed

2 files changed

+179
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,168 @@
11
@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
2+
23
package kotlinx.coroutines.debug
34

4-
import reactor.blockhound.BlockHound
55
import kotlinx.coroutines.scheduling.*
6+
import reactor.blockhound.*
67
import reactor.blockhound.integration.*
78

89
@Suppress("UNUSED")
9-
public class CoroutinesBlockHoundIntegration: BlockHoundIntegration {
10+
public class CoroutinesBlockHoundIntegration : BlockHoundIntegration {
11+
12+
override fun applyTo(builder: BlockHound.Builder): Unit = with(builder) {
13+
allowBlockingCallsInPrimitiveImplementations()
14+
allowBlockingWhenEnqueuingTasks()
15+
allowServiceLoaderInvocationsOnInit()
16+
allowBlockingCallsInReflectionImpl()
17+
/* The predicates that define that BlockHound should only report blocking calls from threads that are part of
18+
the coroutine thread pool and currently execute a CPU-bound coroutine computation. */
19+
addDynamicThreadPredicate { isSchedulerWorker(it) }
20+
nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
21+
}
22+
23+
/**
24+
* Allows blocking calls in various coroutine structures, such as flows and channels.
25+
*
26+
* They use locks in implementations, though only for protecting short pieces of fast and well-understood code, so
27+
* locking in such places doesn't affect the program liveness.
28+
*/
29+
private fun BlockHound.Builder.allowBlockingCallsInPrimitiveImplementations() {
30+
allowBlockingCallsInJobSupport()
31+
allowBlockingCallsInThreadSafeHeap()
32+
allowBlockingCallsInFlow()
33+
allowBlockingCallsInChannels()
34+
}
35+
36+
/**
37+
* Allows blocking inside [kotlinx.coroutines.JobSupport].
38+
*/
39+
private fun BlockHound.Builder.allowBlockingCallsInJobSupport() {
40+
for (method in listOf("finalizeFinishingState", "invokeOnCompletion", "makeCancelling",
41+
"tryMakeCompleting"))
42+
{
43+
allowBlockingCallsInside("kotlinx.coroutines.JobSupport", method)
44+
}
45+
}
46+
47+
/**
48+
* Allows blocking inside [kotlinx.coroutines.internal.ThreadSafeHeap].
49+
*/
50+
private fun BlockHound.Builder.allowBlockingCallsInThreadSafeHeap() {
51+
for (method in listOf("clear", "peek", "removeFirstOrNull", "addLast")) {
52+
allowBlockingCallsInside("kotlinx.coroutines.internal.ThreadSafeHeap", method)
53+
}
54+
// [addLastIf] is only used in [EventLoop.common]. Users of [removeFirstIf]:
55+
allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineDispatcher", "doActionsUntil")
56+
allowBlockingCallsInside("kotlinx.coroutines.test.TestCoroutineContext", "triggerActions")
57+
}
58+
59+
private fun BlockHound.Builder.allowBlockingCallsInFlow() {
60+
allowBlockingCallsInsideStateFlow()
61+
allowBlockingCallsInsideSharedFlow()
62+
}
63+
64+
/**
65+
* Allows blocking inside the implementation of [kotlinx.coroutines.flow.StateFlow].
66+
*/
67+
private fun BlockHound.Builder.allowBlockingCallsInsideStateFlow() {
68+
allowBlockingCallsInside("kotlinx.coroutines.flow.StateFlowImpl", "updateState")
69+
}
70+
71+
/**
72+
* Allows blocking inside the implementation of [kotlinx.coroutines.flow.SharedFlow].
73+
*/
74+
private fun BlockHound.Builder.allowBlockingCallsInsideSharedFlow() {
75+
for (method in listOf("emitSuspend", "awaitValue", "getReplayCache", "tryEmit", "cancelEmitter",
76+
"tryTakeValue", "resetReplayCache"))
77+
{
78+
allowBlockingCallsInside("kotlinx.coroutines.flow.SharedFlowImpl", method)
79+
}
80+
for (method in listOf("getSubscriptionCount", "allocateSlot", "freeSlot")) {
81+
allowBlockingCallsInside("kotlinx.coroutines.flow.internal.AbstractSharedFlow", method)
82+
}
83+
}
84+
85+
private fun BlockHound.Builder.allowBlockingCallsInChannels() {
86+
allowBlockingCallsInArrayChannel()
87+
allowBlockingCallsInBroadcastChannel()
88+
allowBlockingCallsInConflatedChannel()
89+
}
90+
91+
/**
92+
* Allows blocking inside [kotlinx.coroutines.channels.ArrayChannel].
93+
*/
94+
private fun BlockHound.Builder.allowBlockingCallsInArrayChannel() {
95+
for (method in listOf(
96+
"pollInternal", "isEmpty", "isFull", "isClosedForReceive", "offerInternal", "offerSelectInternal",
97+
"enqueueSend", "pollInternal", "pollSelectInternal", "enqueueReceiveInternal", "onCancelIdempotent"))
98+
{
99+
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayChannel", method)
100+
}
101+
}
102+
103+
/**
104+
* Allows blocking inside [kotlinx.coroutines.channels.ArrayBroadcastChannel].
105+
*/
106+
private fun BlockHound.Builder.allowBlockingCallsInBroadcastChannel() {
107+
for (method in listOf("offerInternal", "offerSelectInternal", "updateHead")) {
108+
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel", method)
109+
}
110+
for (method in listOf("checkOffer", "pollInternal", "pollSelectInternal")) {
111+
allowBlockingCallsInside("kotlinx.coroutines.channels.ArrayBroadcastChannel\$Subscriber", method)
112+
}
113+
}
114+
115+
/**
116+
* Allows blocking inside [kotlinx.coroutines.channels.ConflatedChannel].
117+
*/
118+
private fun BlockHound.Builder.allowBlockingCallsInConflatedChannel() {
119+
for (method in listOf("offerInternal", "offerSelectInternal", "pollInternal", "pollSelectInternal",
120+
"onCancelIdempotent"))
121+
{
122+
allowBlockingCallsInside("kotlinx.coroutines.channels.ConflatedChannel", method)
123+
}
124+
}
125+
126+
/**
127+
* Allows blocking when enqueuing tasks into a thread pool.
128+
*
129+
* Without this, the following code breaks:
130+
* ```
131+
* withContext(Dispatchers.Default) {
132+
* withContext(newSingleThreadContext("singleThreadedContext")) {
133+
* }
134+
* }
135+
* ```
136+
*/
137+
private fun BlockHound.Builder.allowBlockingWhenEnqueuingTasks() {
138+
/* This method may block as part of its implementation, but is probably safe. */
139+
allowBlockingCallsInside("java.util.concurrent.ScheduledThreadPoolExecutor", "execute")
140+
}
141+
142+
/**
143+
* Allows instances of [java.util.ServiceLoader] being called.
144+
*
145+
* Each instance is listed separately; another approach could be to generally allow the operations performed by
146+
* service loaders, as they can generally be considered safe. This was not done here because ServiceLoader has a
147+
* large API surface, with some methods being hidden as implementation details (in particular, the implementation of
148+
* its iterator is completely opaque). Relying on particular names being used in ServiceLoader's implementation
149+
* would be brittle, so here we only provide clearance rules for some specific instances.
150+
*/
151+
private fun BlockHound.Builder.allowServiceLoaderInvocationsOnInit() {
152+
allowBlockingCallsInside("kotlinx.coroutines.reactive.ReactiveFlowKt", "<clinit>")
153+
allowBlockingCallsInside("kotlinx.coroutines.CoroutineExceptionHandlerImplKt", "<clinit>")
154+
// not part of the coroutines library, but it would be nice if reflection also wasn't considered blocking
155+
allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.resolve.OverridingUtil", "<clinit>")
156+
}
10157

11-
override fun applyTo(builder: BlockHound.Builder) {
12-
builder.addDynamicThreadPredicate { isSchedulerWorker(it) }
13-
builder.nonBlockingThreadPredicate { p -> p.or { mayNotBlock(it) } }
158+
/**
159+
* Allows some blocking calls from the reflection API.
160+
*
161+
* The API is big, so surely some other blocking calls will show up, but with these rules in place, at least some
162+
* simple examples work without problems.
163+
*/
164+
private fun BlockHound.Builder.allowBlockingCallsInReflectionImpl() {
165+
allowBlockingCallsInside("kotlin.reflect.jvm.internal.impl.builtins.jvm.JvmBuiltInsPackageFragmentProvider", "findPackage")
14166
}
15167

16168
}

kotlinx-coroutines-debug/test/BlockHoundTest.kt

+22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines.debug
22
import kotlinx.coroutines.*
3+
import kotlinx.coroutines.channels.*
34
import org.junit.*
45
import reactor.blockhound.*
56

@@ -52,6 +53,27 @@ class BlockHoundTest : TestBase() {
5253
}
5354
}
5455

56+
@Test
57+
fun testChannelsNotBeingConsideredBlocking() = runTest {
58+
withContext(Dispatchers.Default) {
59+
// Copy of kotlinx.coroutines.channels.ArrayChannelTest.testSimple
60+
val q = Channel<Int>(1)
61+
check(q.isEmpty)
62+
check(!q.isClosedForReceive)
63+
check(!q.isClosedForSend)
64+
val sender = launch {
65+
q.send(1)
66+
q.send(2)
67+
}
68+
val receiver = launch {
69+
q.receive() == 1
70+
q.receive() == 2
71+
}
72+
sender.join()
73+
receiver.join()
74+
}
75+
}
76+
5577
@Test(expected = BlockingOperationError::class)
5678
fun testReusingThreadsFailure() = runTest {
5779
val n = 100

0 commit comments

Comments
 (0)