Mutex.unlock with Unconfined dispatcher causes StackOverflowError #80

Closed
wetnose opened this issue Jul 6, 2017 · 6 comments

wetnose commented Jul 6, 2017

Just run the following example and look at the result:

fun main(args: Array<String>) {

    val waiters = 1000

    val mutex = Mutex()
    val start = CountDownLatch(1)
    val done  = CountDownLatch(waiters)

    launch(CommonPool) {
        mutex.lock()
        try {
            start.await()
        } finally {
            mutex.unlock() // StackOverflowError
        }
    }

    repeat(waiters) {
        launch(Unconfined) {
            mutex.withLock {
                done.countDown()
            }
        }
    }

    start.countDown()
    println(done.await(1, SECONDS))
}

Could you avoid using the call stack when iterating over the lock waiters?


elizarov commented Jul 6, 2017

I'll need to think about whether this is even conceptually possible to fix for Unconfined coroutines. It is actually a great example of why Unconfined is dangerous and should not be used in general code. The problem is that Unconfined coroutines cannot unwind the stack: to unwind the stack you need some kind of dispatcher to which you can post your continuation for later execution, and the Unconfined dispatcher does not have one.
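To make the stack growth concrete, here is a minimal single-threaded sketch. It is a hypothetical model, not the kotlinx.coroutines `Mutex`: `NaiveLock`, `enqueue`, and `naiveMaxDepth` are names invented for illustration, and each waiter is modelled as a plain callback that runs inline, the way an Unconfined coroutine is resumed in the caller's stack frame. It counts nesting depth instead of actually overflowing the stack.

```kotlin
// Hypothetical single-threaded model; NOT the kotlinx.coroutines Mutex.
class NaiveLock {
    // Waiters are modelled as plain callbacks instead of continuations.
    private val waiters = ArrayDeque<() -> Unit>()
    private var depth = 0

    // Deepest nesting of "resume next waiter" calls observed so far.
    var maxDepth = 0
        private set

    fun enqueue(waiter: () -> Unit) {
        waiters.addLast(waiter)
    }

    fun unlock() {
        val next = waiters.removeFirstOrNull() ?: return
        depth++
        if (depth > maxDepth) maxDepth = depth
        // The waiter runs inline, on top of the current stack frame.
        // If it releases the lock before returning, unlock() is re-entered.
        next()
        depth--
    }
}

// Every waiter has an empty critical section and unlocks immediately,
// so each one resumes the next from inside its own unlock() call.
fun naiveMaxDepth(waiterCount: Int): Int {
    val lock = NaiveLock()
    repeat(waiterCount) { lock.enqueue { lock.unlock() } }
    lock.unlock()
    return lock.maxDepth
}
```

In this model `naiveMaxDepth(1000)` returns 1000: the nesting depth grows linearly with the number of waiters, which is the pattern that ends in a StackOverflowError once the queue is long enough.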

elizarov changed the title from "Mutex.unlock causes StackOverflowError" to "Mutex.unlock with Unconfined dispatcher causes StackOverflowError" on Jul 6, 2017

wetnose commented Jul 6, 2017

I've made a quick alternative that does not use the call stack. But Mutex looks more mature, and I want to use it instead of my temporary solution.

class SourceLock {

    private val queues = ConcurrentHashMap<Any, WaiterQueue>()


    suspend fun <T> withLock(src: Any, block: suspend () -> T): T {
        val lock = lock(src)
        try {
            return block()
        } finally {
            lock.unlock()
        }
    }


    suspend fun lock(src: Any): Lock {
        return suspendCoroutineOrReturn<Lock> { cont ->
            val newQ   = WaiterQueue(src)
            val waiter = newQ.Waiter(cont).apply { newQ.last.set(this) }
            while (true) {
                val oldQ = queues.putIfAbsent(src, newQ)
                if (oldQ == null) {
                    waiter.lock()
                    break
                } else {
                    if (oldQ.add(cont)) break
                }
            }
            COROUTINE_SUSPENDED
        }
    }


    private inner class WaiterQueue(val src: Any) {

        val last = AtomicReference<Waiter?>()

        fun add(cont: Continuation<Lock>): Boolean {
            while (true) {
                val prev = last.get() ?: return false
                synchronized(prev) {
                    val item = Waiter(cont)
                    if (last.compareAndSet(prev, item)) {
                        prev.next = item
                        return true
                    }
                }
            }
        }

        inner class Waiter(val cont: Continuation<Lock>) {

            var locked = AtomicBoolean() // prevent from stack utilization
            var next   : Waiter? = null

            fun next() = synchronized(this) { next!! }

            fun lock(w: Waiter = this) {
                var waiter = w
                while (true) {
                    locked.set(true)
                    waiter.cont.resume(object : Lock { override fun unlock() = unlock(waiter) })
                    if (escape(waiter)) break
                    waiter = waiter.next()
                }
            }

            fun unlock(waiter: Waiter) {
                if (!escape(waiter)) lock(waiter.next())
            }

            fun escape(waiter: Waiter): Boolean {
                return locked.compareAndSet(true, false)
                        || last.compareAndSet(waiter, null) && queues.remove(src, this@WaiterQueue)
            }
        }
    }


    interface Lock {
        fun unlock()
    }
}


elizarov commented Jul 6, 2017

Thanks for the code. This is an interesting approach to solving the problem of StackOverflowError on mutex release. I'll see if something along these lines could be integrated into the kotlinx.coroutines Mutex implementation.
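The core of that approach, as far as it can be read from the `locked` flag in `Waiter.lock`, is to turn the nested resume into a loop: an `unlock()` that arrives while a waiter is being resumed only sets a flag, and the loop that is already running picks up the next waiter. Below is a simplified single-threaded sketch of that idea. `LoopLock`, `draining`, `released`, and `loopMaxDepth` are hypothetical names, the atomics and per-source queues of the original are left out, and this is not the code that was committed to kotlinx.coroutines.

```kotlin
// Hypothetical single-threaded model of flag-based (trampolined) hand-off.
class LoopLock {
    private val waiters = ArrayDeque<() -> Unit>()
    private var draining = false // true while the resume loop below is running
    private var released = false // set when the current holder has released the lock
    private var depth = 0

    // Deepest nesting of "resume next waiter" calls observed so far.
    var maxDepth = 0
        private set

    fun enqueue(waiter: () -> Unit) {
        waiters.addLast(waiter)
    }

    fun unlock() {
        if (draining) {
            // Re-entrant call from a waiter that is being resumed right now:
            // record the release and let the outer loop resume the next waiter.
            released = true
            return
        }
        draining = true
        try {
            released = true
            while (released) {
                val next = waiters.removeFirstOrNull() ?: break
                released = false
                depth++
                if (depth > maxDepth) maxDepth = depth
                next() // runs inline; may call unlock(), which only sets the flag
                depth--
            }
        } finally {
            draining = false
        }
    }
}

// Same workload as a naive recursive hand-off would get:
// every waiter unlocks immediately after being resumed.
fun loopMaxDepth(waiterCount: Int): Int {
    val lock = LoopLock()
    repeat(waiterCount) { lock.enqueue { lock.unlock() } }
    lock.unlock()
    return lock.maxDepth
}
```

In this model `loopMaxDepth(1000)` returns 1, and the depth stays at 1 for any number of waiters, because each waiter returns to the loop before the next one is resumed. If a waiter does not release the lock inline, the loop simply ends and a later `unlock()` starts a new one.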

elizarov commented

I've committed a fix (implemented using your idea) to the develop branch: 11c140a


wetnose commented Jul 22, 2017

Thank you!

elizarov commented

Released in version 0.17
