Graceful degradation of spinlock-like implementations during internal errors or system malfunction #3635


Open
qwwdfsad opened this issue Feb 21, 2023 · 9 comments

Comments

@qwwdfsad
Collaborator

qwwdfsad commented Feb 21, 2023

#3613 uncovered a whole new class of bugs for coroutines: infinite CPU-intensive spin-loops when "unexpected" exceptions are thrown at the core of the implementation.

While this particular bug was addressed mechanically (#3634), the possibility of such bugs is still there:

  • StackOverflowError in system-level methods (for Java's own primitives, this particular issue was addressed by https://openjdk.org/jeps/270)
  • OutOfMemoryError thrown from an arbitrary place in the code that attempted an innocuous allocation
  • An arbitrary programmatic bug in our own implementation
  • Any other "implicit" exception (whether it's an NPE during a non-trivial data race, a LinkageError due to a misaligned dependency, or thread death)

Since coroutines are an application-level framework, it is close to impossible to ensure that they continue to operate without bugs and preserve all of their internal invariants in the face of implicit exceptions thrown from an arbitrary line of code, so the best we can do is to make the best effort (pun intended).
What we should do is ensure that, prior to a total collapse, the system stays responsive (i.e. available for introspection with tools like jps) and graceful (i.e. it eventually deadlocks instead of intensively burning CPU and the user's money).

In order to do that, all our spin-lock-based solutions (which, contrary to many kernel-level APIs, spin in "it shouldn't take long" scenarios rather than "this one is totally safe, recoverable and interruption-safe" ones) should degrade gracefully into sleep/yield/onSpinWait behaviour first and, as a last resort, to the full-blown thread parking later.

For now, we are aware of three such places:

  • Waiting for the reusability token in DispatchedContinuation.awaitReusability, which matches racy scenarios such as `suspend (T1); resume (T2); getResult() (T1)`
  • Waiting for the ownership token of an owner-supplied operation in Mutex
  • Waiting for the logical expandBuffer operation in BufferedChannel

The key here is to ensure that the solution is robust enough (i.e. that when timings are really unlucky, the solution actually works and proceeds) and doesn't obstruct the fast-path (i.e. "happy path" performance is not affected).
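
To make the intent concrete, here is a minimal, JVM-only sketch of such a degradation ladder; the helper name and the spin thresholds below are made up for illustration and are not the actual kotlinx.coroutines code:

import java.util.concurrent.locks.LockSupport

// Hypothetical helper: spin politely first, then progressively back off.
// The thresholds are arbitrary placeholders, not tuned values.
inline fun spinWaitUntil(condition: () -> Boolean) {
  var spins = 0
  while (!condition()) {
    when {
      spins < 128 -> Thread.onSpinWait()          // cheap busy-wait hint to the CPU
      spins < 1024 -> Thread.yield()              // let other runnable threads in
      spins < 4096 -> Thread.sleep(1)             // back off harder
      else -> LockSupport.parkNanos(1_000_000L)   // last resort: park for ~1 ms at a time
    }
    spins++
  }
}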

@charlesmunger

charlesmunger commented Feb 21, 2023

degrade gracefully into sleep/yield/onSpinWait behaviour first and, as a last resort, to the full-blown thread parking later

I would much rather use only onSpinWait, and past a certain spin count use park. Two threads calling yield in a loop can saturate the CPU without actually allowing whatever thread is supposed to be releasing them to run. Sleep is usually implemented by the same code as park, but it is less efficient because the sleeping thread won't get woken up eagerly.

the solution actually works and proceeds) and doesn't obstruct the fast-path (i.e. "happy path" performance is not affected)

This will generally not be possible to do; supporting a non-spin waiting system requires use of a compare-and-set or get-and-set call which is more expensive than the current blind write. I would much rather have spin-free code than save a tiny number of cycles in this bit of infrastructure. https://abseil.io/docs/cpp/atomic_danger

@qwwdfsad
Collaborator Author

Good point, yield may indeed increase CPU consumption in an unpredictable manner.

This will generally not be possible to do; supporting a non-spin waiting system requires use of a compare-and-set or get-and-set call which is more expensive than the current blind write

Right. Though the CAS on its own is unlikely to contribute seriously to overall system performance (AFAIR uncontended CASes are pretty close to regular barrier'd writes on modern architectures). What I'd like to achieve is to ensure that the rendezvous logic (specifically, park/unpark pairs) does not interfere with the regular code paths, so the overall amortized operation throughput/latency is mostly unaffected.

@charlesmunger

Following up on this: check out slide 56 of this presentation, which shows AMD's recommendations, summarized as:

  • Don't spin, use mutexes
  • If you are going to spin anyway:
    • Use the pause instruction (this is Thread.onSpinWait())
    • alignas(64) the lock variable (you don't have much control over this)
    • Test and test-and-set, i.e. do a relaxed read first, then attempt a non-relaxed CAS based on the result (see the sketch below)
    • The OS may be unaware that threads are spinning; scheduling efficiency and battery life may be lost

I really think you should not have any unbounded spin sections. The tools that could be used for scheduler-cooperative spinlocking are not available in the JVM, and certainly not on all platforms.
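
For illustration, a small sketch of the "test and test-and-set" pattern on the JVM; the class name is invented, and a volatile read stands in for the relaxed load a C++ implementation would use:

import java.util.concurrent.atomic.AtomicBoolean

// Illustrative only: read first (cheap, stays in the local cache while the lock is busy elsewhere),
// and attempt the expensive read-modify-write only when the read suggests it can succeed.
class TtasSpinLock {
  private val locked = AtomicBoolean(false)

  fun lock() {
    while (true) {
      // "test": a plain read that doesn't request exclusive ownership of the cache line
      while (locked.get()) Thread.onSpinWait()
      // "test-and-set": only now pay for the CAS
      if (locked.compareAndSet(false, true)) return
    }
  }

  fun unlock() {
    locked.set(false)
  }
}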

@stefanhaustein

stefanhaustein commented Sep 5, 2024

If I understand this correctly, in all the code locations referenced in the bug, an atomic reference is used to communicate state updates between threads. Checks on the state are performed in an active-waiting loop; exiting the loop is typically conditioned on some precondition (the active-waiting part), in addition to being able to change the state safely.

The first case in the bug description is already using the atomic reference's loop() extension (which doesn't seem to do anything other than spin).

So to address the main issue without significant code changes, we could look into the feasibility of providing an atomic reference with a loop() method that falls back to "safe" waiting once it has been spinning for too long, e.g. after exceeding a given number of spins. Of course, cleaner individual solutions would be preferable, but I wonder if something like this could get us out of the bad spot quickly...

Perhaps something along these lines:

// Assumed imports for this Kotlin/Native sketch (not verified against an actual build):
import kotlin.concurrent.AtomicReference
import kotlinx.cinterop.*
import platform.posix.*

/**
 * Sketch of an AtomicReference drop-in with a loop() method that switches from spinning to waiting
 * for a value change after a set number of spins.
 */
value class LoopableAtomicReference<T>
private constructor(private val atomicRef: AtomicReference<Any?>) {

  val value: T
    get() {
      val result = atomicRef.value
      return (if (result is Waiting<*>) result.value else result) as T
    }

  fun compareAndSet(expected: T, newValue: T): Boolean {
    if (atomicRef.compareAndSet(expected, newValue)) {
      return true
    }
    val value = atomicRef.value
    if (value !is Waiting<*> || value.value != expected) {
      return false
    }
    // value is an instance of Waiting and the wrapped value is equal to expected:
    // Lock the mutex, try setting and -- if successful -- signal the condition.
    val waiting = value as Waiting<*>
    pthread_mutex_lock(waiting.mutex.ptr)
    if (!atomicRef.compareAndSet(value, newValue)) {
      pthread_mutex_unlock(waiting.mutex.ptr)
      return false
    }
    pthread_cond_broadcast(waiting.condition.ptr)      // EDIT:  was signal()
    pthread_mutex_unlock(waiting.mutex.ptr)
    return true
  }

  /** task() is expected to jump out of the loop, e.g. via a non-local return (which would require loop to be inline) or an exception. */
  // TODO: should task return a boolean indicating whether the loop should continue?
  fun loop(task: (T) -> Unit) {
    for (i in 0..MAX_BUSY_WAIT_SPINS) {
      task(value)
    }

    while (true) {
      // Call the task with the current value first: if we weren't successful below because of a
      // value change in the meantime, that means it makes sense to try again.
      val oldValue = atomicRef.value
      task((if (oldValue is Waiting<*>) oldValue.value else oldValue) as T)

      // Wrap the value in a Waiting instance
      val waiting: Waiting<*>
      if (oldValue is Waiting<*>) {
        waiting = oldValue
        pthread_mutex_lock(waiting.mutex.ptr)
        if (atomicRef.value != oldValue) {
          // Try again if the value has changed in the meantime
          pthread_mutex_unlock(waiting.mutex.ptr)
          continue
        }
      } else {
        waiting = Waiting(oldValue)
        pthread_mutex_lock(waiting.mutex.ptr)
        if (!atomicRef.compareAndSet(oldValue, waiting)) {
          // Try again if the value has changed in the meantime
          pthread_mutex_unlock(waiting.mutex.ptr)
          continue
        }
      }

      // Wait for a change
      pthread_cond_wait(waiting.condition.ptr, waiting.mutex.ptr)
      pthread_mutex_unlock(waiting.mutex.ptr)
    }
  }

  internal class Waiting<T>(val value: T) {
    val mutex: pthread_mutex_t = nativeHeap.alloc()
    val condition: pthread_cond_t = nativeHeap.alloc()

    init {
      pthread_mutex_init(mutex.ptr, DEFAULT_MUTEX_ATTR.ptr)
      pthread_cond_init(condition.ptr, DEFAULT_COND_ATTR.ptr)
    }
  }

  companion object {
    const val MAX_BUSY_WAIT_SPINS = 1000
    private val DEFAULT_COND_ATTR: pthread_condattr_t = nativeHeap.alloc()
    private val DEFAULT_MUTEX_ATTR: pthread_mutexattr_t = nativeHeap.alloc()

    init {
      pthread_mutexattr_init(DEFAULT_MUTEX_ATTR.ptr)
      pthread_condattr_init(DEFAULT_COND_ATTR.ptr)
    }

    fun <T> LoopableAtomicReference(value: T) =
      LoopableAtomicReference<T>(AtomicReference<Any?>(value))
  }
}

@charlesmunger

That works, but we should have an explicit open signal so that we can have more than one waiting thread if needed. So we'd have either:

  internal class Waiting<T>(val value: T) {
    val mutex: pthread_mutex_t = nativeHeap.alloc()
    val condition: pthread_cond_t = nativeHeap.alloc()
    var open = false

    init {
      pthread_mutex_init(mutex.ptr, DEFAULT_MUTEX_ATTR.ptr)
      pthread_cond_init(condition.ptr, DEFAULT_COND_ATTR.ptr)
    }

    fun notify() {
      pthread_mutex_lock(mutex.ptr)
      try {
        open = true
        pthread_cond_signal(condition.ptr)
      } finally {
        pthread_mutex_unlock(mutex.ptr)
      }
    }

    fun await() {
      pthread_mutex_lock(mutex.ptr)
      try {
        while (!open) {
          pthread_cond_wait(condition.ptr, mutex.ptr)
        }
      } finally {
        pthread_mutex_unlock(mutex.ptr)
      }
    }
  }

The first waiter thread creates the Waiting and CASes the reference to it; subsequent threads call await after seeing it. Or this could be implemented portably, and without needing to dispose of native pointers, using a Treiber stack:

  internal class Waiting<T>(val thread: Thread, val value: T) {
    var nextThread: Waiting<T>? = null
  }

We park after pushing onto the stack and unpark all the waiters after CASing to a new value. This would not require platform-specific code and could be faster even on Kotlin/Native.
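
A hedged JVM sketch of that idea; WaitingStack, awaitChange and signalAll are invented names, and the real integration into the coroutine state machines would look different:

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.LockSupport

// Waiters push themselves onto a Treiber stack and park; whoever installs a new value
// pops the whole stack and unparks every recorded thread.
class WaitingStack {
  private class Node(val thread: Thread, val next: Node?)

  private val top = AtomicReference<Node?>(null)

  fun awaitChange(changed: () -> Boolean) {
    while (!changed()) {
      val head = top.get()
      val node = Node(Thread.currentThread(), head)
      if (!top.compareAndSet(head, node)) continue  // lost the push race, retry
      // Re-check after publishing ourselves to avoid a lost wakeup; a spurious
      // or leftover unpark only causes one extra trip around the loop.
      if (!changed()) LockSupport.park()
    }
  }

  fun signalAll() {
    var node = top.getAndSet(null)  // detach the whole stack atomically
    while (node != null) {
      LockSupport.unpark(node.thread)
      node = node.next
    }
  }
}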

@stefanhaustein

Thanks! I had looked into doing the same with parking initially, but the following problems made me go back to "plain" POSIX:

  • A potential race condition: The thread registers itself for parking, then parks itself. How do we prevent another thread from trying to unpark the thread between these steps, i.e. before it's actually parked? Would "unpark" reliably fail or block in this case?
  • I did not find a platform-independent parking API for Kotlin. The Java equivalents for the POSIX calls used here should be relatively obvious...

Concerning multiple waiting threads, wouldn't it be sufficient to change signal to broadcast (I have changed my code snippet above accordingly)?

@charlesmunger

A potential race condition: The thread registers itself for parking, then parks itself. How do we prevent another thread from trying to unpark the thread between these steps, i.e. before it's actually parked? Would "unpark" reliably fail or block in this case?

Park handles this internally: you can unpark a thread before it's parked, and that just makes its subsequent call to park a no-op.
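
A tiny JVM example of those permit semantics; the sleep is only there to make it likely that unpark runs before park:

import java.util.concurrent.locks.LockSupport
import kotlin.concurrent.thread

fun main() {
  val waiter = thread {
    Thread.sleep(100)      // simulate "registered for parking but not parked yet"
    LockSupport.park()     // returns immediately if the permit has already been granted
    println("woke up")
  }
  LockSupport.unpark(waiter) // typically runs before the waiter reaches park()
  waiter.join()
}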

I did not find a platform-independent parking API for Kotlin. The Java equivalents for the POSIX calls used here should be relatively obvious...

It's pretty easy to implement on top of POSIX; you just have a condition variable that shares its lifetime with the thread. I didn't realize that wasn't implemented for Kotlin/Native already.
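
For reference, a minimal Kotlin/Native-style sketch of such a parker, assuming the usual platform.posix bindings; the names are illustrative and native-memory disposal is ignored:

import kotlinx.cinterop.*
import platform.posix.*

// One instance per thread; a permit flag plus a condition variable gives park/unpark semantics.
class Parker {
  private val mutex: pthread_mutex_t = nativeHeap.alloc()
  private val cond: pthread_cond_t = nativeHeap.alloc()
  private var permit = false

  init {
    pthread_mutex_init(mutex.ptr, null)
    pthread_cond_init(cond.ptr, null)
  }

  fun park() {
    pthread_mutex_lock(mutex.ptr)
    while (!permit) pthread_cond_wait(cond.ptr, mutex.ptr)
    permit = false                   // consume the permit, as LockSupport.park() does
    pthread_mutex_unlock(mutex.ptr)
  }

  fun unpark() {
    pthread_mutex_lock(mutex.ptr)
    permit = true                    // an unpark before park makes the next park a no-op
    pthread_cond_signal(cond.ptr)
    pthread_mutex_unlock(mutex.ptr)
  }
}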

@stefanhaustein

stefanhaustein commented Oct 3, 2024

Why not just pthread_cond_broadcast(), though? Concerning parking: doesn't managing my own wait list mean that I am taking away from the OS an opportunity to sort out "some" QoS among the waiting threads?

@dkhalanskyjb
Collaborator

How about exponential backoff? Pseudocode:

inline fun <T> AtomicRef<T>.loop(task: (T) -> Unit) {
  repeat(OPTIMISTIC_LOOPS) {
    task(value)
  }
  var backoff = 1.milliseconds
  while (true) {
    Thread.sleep(backoff.inWholeMilliseconds)
    task(value)
    backoff *= 2
  }
}

This is guaranteed to preserve progress if progress is possible; when a debugger is attached, it gives a good estimate of how long the loop has been stuck; and it's simple and readable.
