Graceful degradation of spinlock-like implementations during internal errors or system malfunction #3635
Comments
I would much rather use only
This will generally not be possible to do; supporting a non-spin waiting system requires use of a compare-and-set or get-and-set call which is more expensive than the current blind write. I would much rather have spin-free code than save a tiny number of cycles in this bit of infrastructure. https://abseil.io/docs/cpp/atomic_danger
Good point,
Right. Though the CAS on its own is unlikely to seriously contribute to the system's performance (AFAIR, uncontended CASes are pretty close to regular barriered writes on modern architectures). What I'd like to achieve is to ensure that the rendezvous logic (specifically,
Following up on this - check out slide 56 of this presentation which shows AMD's recommendations, summarized as:
I really think you should not have any unbounded spin sections. The tools that could be used for scheduler-cooperative spinlocking are not available in the JVM, and certainly not on all platforms.
If I understand this correctly, in all the code locations referenced in the bug, an atomic reference is used to communicate state updates between threads. Checks on the state are performed in an active waiting loop; exiting the loop is typically conditioned on some precondition (the active waiting part) – in addition to being able to change the state safely. The first case in the bug description is already using the atomicReference loop() extension (which doesn't seem to do anything other than spinning). So to address the main issue without significant code changes, we could look into the feasibility of providing an atomic reference whose loop instruction falls back to "safe" waiting if it spins for too long, e.g. exceeding a given number of spins. Of course cleaner individual solutions would be preferable, but I wonder if something like this could get us out of the bad spot quickly... Perhaps something along these lines:
/**
 * Sketch of an AtomicReference drop-in with a loop() method that switches from spinning to waiting
* for a value change after a set number of spins.
*/
value class LoopableAtomicReference<T>
private constructor(private val atomicRef: AtomicReference<Any?>) {
val value: T
get() {
val result = atomicRef.value
return (if (result is Waiting<*>) result.value else result) as T
}
fun compareAndSet(expected: T, newValue: T): Boolean {
if (atomicRef.compareAndSet(expected, newValue)) {
return true
}
val value = atomicRef.value
if (value !is Waiting<*> || value.value != expected) {
return false
}
// value is an instance of Waiting and the wrapped value is equal to expected:
// Lock the mutex, try setting and -- if successful -- signal the condition.
val waiting = value as Waiting<*>
pthread_mutex_lock(waiting.mutex.ptr)
if (!atomicRef.compareAndSet(value, newValue)) {
pthread_mutex_unlock(waiting.mutex.ptr)
return false
}
pthread_cond_broadcast(waiting.condition.ptr) // EDIT: was signal()
pthread_mutex_unlock(waiting.mutex.ptr)
return true
}
/** task() is expected to jump out of the loop, e.g. via a non-local return from an enclosing function (which requires loop() to be inlined). */
// TODO: should task return a boolean indicating whether the loop should continue?
fun loop(task: (T) -> Unit) {
for (i in 0..MAX_BUSY_WAIT_SPINS) {
task(value)
}
while (true) {
// Call the task with the current value first: if we weren't successful below because of a
// value change in the meantime, that means it makes sense to try again.
val oldValue = atomicRef.value
task((if (oldValue is Waiting<*>) oldValue.value else oldValue) as T)
// Wrap the value in a Waiting instance
val waiting: Waiting<*>
if (oldValue is Waiting<*>) {
waiting = oldValue
pthread_mutex_lock(waiting.mutex.ptr)
if (atomicRef.value != oldValue) {
// Try again if the value has changed in the meantime
pthread_mutex_unlock(waiting.mutex.ptr)
continue
}
} else {
waiting = Waiting(oldValue)
pthread_mutex_lock(waiting.mutex.ptr)
if (!atomicRef.compareAndSet(oldValue, waiting)) {
// Try again if the value has changed in the meantime
pthread_mutex_unlock(waiting.mutex.ptr)
continue
}
}
// Wait for a change
pthread_cond_wait(waiting.condition.ptr, waiting.mutex.ptr)
pthread_mutex_unlock(waiting.mutex.ptr)
}
}
internal class Waiting<T>(val value: T) {
val mutex: pthread_mutex_t = nativeHeap.alloc()
val condition: pthread_cond_t = nativeHeap.alloc()
init {
pthread_mutex_init(mutex.ptr, DEFAULT_MUTEX_ATTR.ptr)
pthread_cond_init(condition.ptr, DEFAULT_COND_ATTR.ptr)
}
}
companion object {
const val MAX_BUSY_WAIT_SPINS = 1000
private val DEFAULT_COND_ATTR: pthread_condattr_t = nativeHeap.alloc()
private val DEFAULT_MUTEX_ATTR: pthread_mutexattr_t = nativeHeap.alloc()
init {
pthread_mutexattr_init(DEFAULT_MUTEX_ATTR.ptr)
pthread_condattr_init(DEFAULT_COND_ATTR.ptr) // the cond attr needs initialization as well
}
fun <T> LoopableAtomicReference(value: T) =
LoopableAtomicReference<T>(AtomicReference<Any?>(value))
}
}
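For orientation, a hypothetical call site for the sketch above. It assumes loop() is made inline so the lambda can perform a non-local return (as the comment on loop() implies); the "DONE" value and awaitDone name are made up for illustration:

    fun awaitDone(state: LoopableAtomicReference<String>) {
        state.loop { value ->
            // A non-local return exits awaitDone() and thereby the loop; until then the
            // loop spins MAX_BUSY_WAIT_SPINS times and then falls back to pthread_cond_wait.
            if (value == "DONE") return
        }
    }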
That works, but we should have an explicit open signal so that we can have more than one waiting thread if needed. So we'd have either:
internal class Waiting<T>(val value: T) {
val mutex: pthread_mutex_t = nativeHeap.alloc()
val condition: pthread_cond_t = nativeHeap.alloc()
var open = false
init {
pthread_mutex_init(mutex.ptr, DEFAULT_MUTEX_ATTR.ptr)
pthread_cond_init(condition.ptr, DEFAULT_COND_ATTR.ptr)
}
fun notify() {
pthread_mutex_lock(mutex.ptr)
try {
open = true;
pthread_cond_signal(condition.ptr)
} finally {
pthread_mutex_unlock(mutex.ptr)
}
}
fun await() {
pthread_mutex_lock(mutex.ptr)
try {
while (!open) {
pthread_cond_wait(condition.ptr, mutex.ptr)
}
} finally {
pthread_mutex_unlock(mutex.ptr)
}
}
}
First waiter thread creates the
internal class Waiting<T>(val thread: Thread, val value: T) {
var nextThread: Waiting<T>? = null
}
Where we park after pushing onto the stack and unpark all the waiters after doing a CAS to a new value. This would not require platform-specific code, and could be faster even in Kotlin/Native.
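For illustration only, a minimal JVM-side sketch of that waiter-stack idea using java.util.concurrent.locks.LockSupport; the names (ParkingFlag, Waiter, OPEN) and the spin count are made up, and Kotlin/Native would still need its own parking primitive:

    import java.util.concurrent.atomic.AtomicReference
    import java.util.concurrent.locks.LockSupport

    // One-shot flag: threads spin briefly, then push a Waiter node onto a stack
    // stored in `state` and park; open() publishes OPEN and unparks every waiter.
    private class Waiter(val thread: Thread, val next: Waiter?)
    private val OPEN = Any()

    class ParkingFlag {
        private val state = AtomicReference<Any?>(null) // null | Waiter stack | OPEN

        fun await(spins: Int = 1000) {
            repeat(spins) { if (state.get() === OPEN) return } // bounded spin phase
            while (true) {
                val cur = state.get()
                if (cur === OPEN) return
                val me = Waiter(Thread.currentThread(), cur as Waiter?)
                if (state.compareAndSet(cur, me)) {
                    // Park until open() flips the state; unpark-before-park and
                    // spurious wakeups are both handled by re-checking the state.
                    while (state.get() !== OPEN) LockSupport.park(this)
                    return
                }
            }
        }

        fun open() {
            val prev = state.getAndSet(OPEN)
            var w = prev as? Waiter
            while (w != null) { // wake every recorded waiter
                LockSupport.unpark(w.thread)
                w = w.next
            }
        }
    }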
Thanks! I had looked into doing the same with parking initially, but the following problems made me go back to "plain" posix:
Concerning multiple waiting threads, wouldn't it be sufficient to change signal to broadcast (I have changed the code snippet above accordingly)?
Park handles this internally: you can unpark a thread before it's parked, and that just makes its subsequent call to park a no-op.
It's pretty easy to implement on top of posix; you just have a condition variable that shares its lifetime with the thread. I didn't realize that wasn't implemented for Kotlin/Native already.
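To make "a condition variable that shares its lifetime with the thread" concrete, here is a rough Kotlin/Native sketch of such a per-thread parker, written in the same cinterop style as the snippet earlier in the thread; the Parker name and permit flag are assumptions, not an existing API:

    import kotlinx.cinterop.alloc
    import kotlinx.cinterop.nativeHeap
    import kotlinx.cinterop.ptr
    import platform.posix.*

    // One Parker per thread: unpark() before park() just leaves a permit behind,
    // so the later park() returns immediately instead of blocking.
    class Parker {
        private val mutex: pthread_mutex_t = nativeHeap.alloc()
        private val condition: pthread_cond_t = nativeHeap.alloc()
        private var permit = false // guarded by mutex

        init {
            pthread_mutex_init(mutex.ptr, null)
            pthread_cond_init(condition.ptr, null)
        }

        fun park() {
            pthread_mutex_lock(mutex.ptr)
            while (!permit) pthread_cond_wait(condition.ptr, mutex.ptr)
            permit = false // consume the permit
            pthread_mutex_unlock(mutex.ptr)
        }

        fun unpark() {
            pthread_mutex_lock(mutex.ptr)
            permit = true
            pthread_cond_signal(condition.ptr) // only the owning thread ever waits here
            pthread_mutex_unlock(mutex.ptr)
        }
    }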
Why not just pthread_cond_broadcast() though? Concerning parking: doesn't managing my own wait list mean that I am taking away from the OS an opportunity to sort out "some" QoS among the waiting threads?
How about exponential backoff? Pseudocode:
inline fun <T> AtomicRef<T>.loop(task: (T) -> Unit) {
repeat(OPTIMISTIC_LOOPS) {
task(get())
}
var backoff = 1.milliseconds
while (true) {
Thread.sleep(backoff)
task(get())
backoff = backoff * 2
}
}
This is guaranteed to preserve progress if progress is possible at all; when a debugger is attached, it gives a good estimate of how long the loop has been stuck; and it's simple and readable.
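A slightly more concrete (and still only illustrative) JVM variant of that pseudocode, with a spin-hint phase and a cap so the sleep does not grow without bound; loopWithBackoff, OPTIMISTIC_LOOPS and the limits are placeholder names and numbers:

    import java.util.concurrent.atomic.AtomicReference
    import java.util.concurrent.locks.LockSupport

    const val OPTIMISTIC_LOOPS = 1_000
    const val MAX_BACKOFF_NANOS = 1_000_000_000L // cap the sleep at 1 s

    inline fun <T> AtomicReference<T>.loopWithBackoff(task: (T) -> Unit) {
        repeat(OPTIMISTIC_LOOPS) {
            task(get())
            Thread.onSpinWait() // hint to the CPU that we are busy-waiting
        }
        var backoffNanos = 1_000_000L // start at 1 ms
        while (true) {
            LockSupport.parkNanos(backoffNanos)
            task(get())
            backoffNanos = (backoffNanos * 2).coerceAtMost(MAX_BACKOFF_NANOS)
        }
    }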
#3613 uncovered a whole new class of bugs for coroutines: infinite CPU-intensive spin-loops triggered by "unexpected" exceptions at the core of the implementation.
While this particular bug was addressed mechanically (#3634), the possibility of such bugs is still there:
- StackOverflowError in system-level methods (this particular issue was addressed by https://openjdk.org/jeps/270 in Java for Java's primitives)
- OutOfMemoryError from an arbitrary place of code that attempted an innocuous allocation
- LinkageError due to a misaligned dependency (or thread death)
Being an application-level framework, it is close to impossible to ensure that coroutines continue to operate without bugs and preserve all the internal invariants in the face of implicit exceptions being thrown from an arbitrary line of code, so the best we can do is to make the best effort (pun intended).
What we should do is to ensure that, prior to system collapse, it stays responsive (i.e. available for introspection with tools like jps) and graceful (i.e. it eventually deadlocks instead of intensively burning CPU and the user's pocket). In order to do that, all our spin-lock based solutions (which, contrary to many kernel-level APIs, spin in scenarios where "it shouldn't take long" rather than "this one is totally safe, recoverable and interruption-safe") should degrade gracefully into sleep/yield/onSpinWait behaviour first and, as a last resort, into full-blown thread parking later.
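As an illustration of that degradation ladder on the JVM side (a hedged sketch, not a proposal of the actual implementation; the phase lengths are placeholders):

    import java.util.concurrent.locks.LockSupport

    // Degrade from hot spinning to onSpinWait hints, then to yielding, and
    // finally to timed parking, so a stuck invariant burns (almost) no CPU.
    inline fun spinWaitUntil(condition: () -> Boolean) {
        var attempts = 0
        while (!condition()) {
            when {
                attempts < 100 -> { /* hot spin: the common case should resolve here */ }
                attempts < 1_000 -> Thread.onSpinWait()
                attempts < 10_000 -> Thread.yield()
                else -> LockSupport.parkNanos(1_000_000L) // 1 ms; thread stays visible to jps/jstack
            }
            attempts++
        }
    }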
For now, we are aware of three such places:
- DispatchedContinuation.awaitReusability that matches racy scenarios such as "suspend (T1) resume (T2) getResult() (T1)"
- Mutex
- The expandBuffer operation in BufferedChannel
The key here is to ensure that the solution is robust enough (i.e. that when timings are really unlucky, the solution actually works and proceeds) and that it doesn't obstruct the fast path (i.e. "happy path" performance is not affected).