Skip to content

Replace hand-rolled ArrayQueue with ArrayDeque in standard library in … #3438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
* Queue used by [Dispatchers.Unconfined] tasks.
* These tasks are thread-local for performance and take precedence over the rest of the queue.
*/
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null

/**
* Processes next event in this event loop.
Expand All @@ -49,7 +49,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
* **NOTE**: Must be invoked only from the event loop's thread
* (no check for performance reasons, may be added in the future).
*/
public open fun processNextEvent(): Long {
open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return 0
}
Expand All @@ -59,10 +59,10 @@ internal abstract class EventLoop : CoroutineDispatcher() {
protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
return if (queue.isEmpty) Long.MAX_VALUE else 0L
return if (queue.isEmpty()) Long.MAX_VALUE else 0L
}

public fun processUnconfinedEvent(): Boolean {
fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
Expand All @@ -74,27 +74,27 @@ internal abstract class EventLoop : CoroutineDispatcher() {
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
public open fun shouldBeProcessedFromContext(): Boolean = false
open fun shouldBeProcessedFromContext(): Boolean = false

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
public fun dispatchUnconfined(task: DispatchedTask<*>) {
fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}

public val isActive: Boolean
val isActive: Boolean
get() = useCount > 0

public val isUnconfinedLoopActive: Boolean
val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)

// May only be used from the event loop's thread
public val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty ?: true
val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty() ?: true

private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L
Expand Down Expand Up @@ -200,7 +200,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
}
}

protected override val nextTime: Long
override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
Expand All @@ -227,7 +227,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
rescheduleAllDelayed()
}

public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
Expand Down Expand Up @@ -283,7 +283,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
return nextTime
}

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
Expand Down Expand Up @@ -362,7 +362,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {

}

public fun schedule(now: Long, delayedTask: DelayedTask) {
fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
Expand Down Expand Up @@ -481,7 +481,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
final override fun dispose() {
val heap = _heap
if (heap === DISPOSED_TASK) return // already disposed
@Suppress("UNCHECKED_CAST")
(heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
_heap = DISPOSED_TASK // never add again to any heap
}
Expand Down Expand Up @@ -530,7 +529,7 @@ internal expect fun createEventLoop(): EventLoop
internal expect fun nanoTime(): Long

internal expect object DefaultExecutor {
public fun enqueue(task: Runnable)
fun enqueue(task: Runnable)
}

/**
Expand Down
52 changes: 0 additions & 52 deletions kotlinx-coroutines-core/common/src/internal/ArrayQueue.kt

This file was deleted.

6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private class WindowMessageQueue(private val window: Window) : MessageQueue() {
*
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
*/
internal abstract class MessageQueue : ArrayQueue<Runnable>() {
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages
private var scheduled = false

Expand All @@ -138,7 +138,7 @@ internal abstract class MessageQueue : ArrayQueue<Runnable>() {
abstract fun reschedule()

fun enqueue(element: Runnable) {
addLast(element)
add(element)
if (!scheduled) {
scheduled = true
schedule()
Expand All @@ -153,7 +153,7 @@ internal abstract class MessageQueue : ArrayQueue<Runnable>() {
element.run()
}
} finally {
if (isEmpty) {
if (isEmpty()) {
scheduled = false
} else {
reschedule()
Expand Down
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/js/src/Window.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import org.w3c.dom.Window

/**
Expand Down Expand Up @@ -35,8 +34,8 @@ private fun Window.asWindowAnimationQueue(): WindowAnimationQueue =
private class WindowAnimationQueue(private val window: Window) {
private val dispatcher = window.asCoroutineDispatcher()
private var scheduled = false
private var current = ArrayQueue<CancellableContinuation<Double>>()
private var next = ArrayQueue<CancellableContinuation<Double>>()
private var current = ArrayDeque<CancellableContinuation<Double>>()
private var next = ArrayDeque<CancellableContinuation<Double>>()
private var timestamp = 0.0

fun enqueue(cont: CancellableContinuation<Double>) {
Expand Down
22 changes: 11 additions & 11 deletions kotlinx-coroutines-core/js/test/MessageQueueTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,41 +36,41 @@ class MessageQueueTest {

@Test
fun testBasic() {
assertTrue(queue.isEmpty)
assertTrue(queue.isEmpty())
queue.enqueue(Box(1))
assertFalse(queue.isEmpty)
assertFalse(queue.isEmpty())
assertTrue(scheduled)
queue.enqueue(Box(2))
assertFalse(queue.isEmpty)
assertFalse(queue.isEmpty())
scheduled = false
queue.process()
assertEquals(listOf(1, 2), processed)
assertFalse(scheduled)
assertTrue(queue.isEmpty)
assertTrue(queue.isEmpty())
}

@Test fun testRescheduleFromProcess() {
assertTrue(queue.isEmpty)
assertTrue(queue.isEmpty())
queue.enqueue(ReBox(1))
assertFalse(queue.isEmpty)
assertFalse(queue.isEmpty())
assertTrue(scheduled)
queue.enqueue(ReBox(2))
assertFalse(queue.isEmpty)
assertFalse(queue.isEmpty())
scheduled = false
queue.process()
assertEquals(listOf(1, 2, 11, 12), processed)
assertFalse(scheduled)
assertTrue(queue.isEmpty)
assertTrue(queue.isEmpty())
}

@Test
fun testResizeAndWrap() {
repeat(10) { phase ->
val n = 10 * (phase + 1)
assertTrue(queue.isEmpty)
assertTrue(queue.isEmpty())
repeat(n) {
queue.enqueue(Box(it))
assertFalse(queue.isEmpty)
assertFalse(queue.isEmpty())
assertTrue(scheduled)
}
var countYields = 0
Expand All @@ -84,4 +84,4 @@ class MessageQueueTest {
processed.clear()
}
}
}
}