-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Don't allocate threads on every dispatch in Native's thread pools #3595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
8330a36
e69842d
14affab
e495627
aa30736
1f83a18
e4a7463
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines | ||
|
||
import kotlinx.atomicfu.* | ||
import kotlin.coroutines.* | ||
import kotlin.test.* | ||
|
||
class MultithreadedDispatcherStressTest { | ||
val shared = atomic(0) | ||
|
||
/** | ||
* Tests that [newFixedThreadPoolContext] will not drop tasks when closed. | ||
*/ | ||
@Test | ||
fun testClosingNotDroppingTasks() { | ||
repeat(7) { | ||
shared.value = 0 | ||
val nThreads = it + 1 | ||
val dispatcher = newFixedThreadPoolContext(nThreads, "testMultiThreadedContext") | ||
repeat(1_000) { | ||
dispatcher.dispatch(EmptyCoroutineContext, Runnable { | ||
shared.incrementAndGet() | ||
}) | ||
} | ||
dispatcher.close() | ||
while (shared.value < 1_000) { | ||
// spin. | ||
// the test will hang here if the dispatcher drops tasks. | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines | ||
|
||
import kotlin.native.concurrent.* | ||
import kotlin.test.* | ||
|
||
class MultithreadedDispatchersTest { | ||
/** | ||
* Test that [newFixedThreadPoolContext] does not allocate more dispatchers than it needs to. | ||
* Incidentally also tests that it will allocate enough workers for its needs. Otherwise, the test will hang. | ||
*/ | ||
@Test | ||
fun testNotAllocatingExtraDispatchers() { | ||
suspend fun spin(set: MutableSet<Worker>) { | ||
repeat(100) { | ||
set.add(Worker.current) | ||
delay(1) | ||
} | ||
} | ||
val dispatcher = newFixedThreadPoolContext(64, "test") | ||
try { | ||
runBlocking { | ||
val encounteredWorkers = mutableSetOf<Worker>() | ||
var canStart1 = false | ||
var canStart2 = false | ||
val coroutine1 = launch(dispatcher) { | ||
while (!canStart1) { | ||
// intentionally empty | ||
} | ||
canStart2 = true | ||
spin(encounteredWorkers) | ||
} | ||
val coroutine2 = launch(dispatcher) { | ||
canStart1 = true | ||
while (!canStart2) { | ||
// intentionally empty | ||
} | ||
spin(encounteredWorkers) | ||
} | ||
listOf(coroutine1, coroutine2).joinAll() | ||
assertEquals(2, encounteredWorkers.size) | ||
} | ||
} finally { | ||
dispatcher.close() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test failed in the CI with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like there are at least two data-races:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Wow, what a ridiculous mistake. Looks like I've exhausted my resource for the understanding of concurrency for this PR on the main code.
I still fail to see the problem, sorry, could you spell it out? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We all have been there :)
Non-atomic variable is being read and written concurrently, I honestly not sure what guarantees K/N (or LLVM) provides in such scenarios. In theory, the compiler is allowed to host memory read in |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't its capacity be limited with
workersCount
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can. Would it improve something? I wanted the code to say "we don't want to ever drop anything here", and "buffer size N" does imply "drop if larger than N."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing really, was rather surprised that it has unlimited, while effectively never having more than
workersCount
elements/waiters.Maybe it's worth adding asserts/postconditions to each
trySend
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some checks. Are they what you meant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's it, thanks