Skip to content

Commit 061dfa8

Browse files
committed
Merge remote-tracking branch 'origin/develop'
2 parents 17856f5 + e4b6f09 commit 061dfa8

File tree

4 files changed

+65
-21
lines changed

4 files changed

+65
-21
lines changed

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt

+35-10
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,11 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.CancellableContinuation
20-
import kotlinx.coroutines.experimental.DisposableHandle
19+
import kotlinx.coroutines.experimental.*
2120
import kotlinx.coroutines.experimental.internal.*
22-
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
23-
import kotlinx.coroutines.experimental.removeOnCancel
24-
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
25-
import kotlinx.coroutines.experimental.selects.SelectClause1
26-
import kotlinx.coroutines.experimental.selects.SelectClause2
27-
import kotlinx.coroutines.experimental.selects.SelectInstance
28-
import kotlinx.coroutines.experimental.suspendAtomicCancellableCoroutine
29-
import kotlin.coroutines.experimental.startCoroutine
21+
import kotlinx.coroutines.experimental.intrinsics.*
22+
import kotlinx.coroutines.experimental.selects.*
23+
import kotlin.coroutines.experimental.*
3024

3125
/**
3226
* Abstract send channel. It is a base class for all send channel implementations.
@@ -374,6 +368,37 @@ public abstract class AbstractSendChannel<E> : SendChannel<E> {
374368
}
375369
}
376370

371+
// ------ debug ------
372+
373+
public override fun toString() =
374+
"$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
375+
376+
private val queueDebugStateString: String
377+
get() {
378+
val head = queue.next
379+
if (head === queue) return "EmptyQueue"
380+
var result = when (head) {
381+
is Closed<*> -> head.toString()
382+
is Receive<*> -> "ReceiveQueued"
383+
is Send -> "SendQueued"
384+
else -> "UNEXPECTED:$head" // should not happen
385+
}
386+
val tail = queue.prev
387+
if (tail !== head) {
388+
result += ",queueSize=${countQueueSize()}"
389+
if (tail is Closed<*>) result += ",closedForSend=$tail"
390+
}
391+
return result
392+
}
393+
394+
private fun countQueueSize(): Int {
395+
var size = 0
396+
queue.forEach<LockFreeLinkedListNode> { size++ }
397+
return size
398+
}
399+
400+
protected open val bufferDebugString: String get() = ""
401+
377402
// ------ private ------
378403

379404
private class SendSelect<E, R>(

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
21-
import java.util.concurrent.CopyOnWriteArrayList
22-
import java.util.concurrent.locks.ReentrantLock
23-
import kotlin.concurrent.withLock
19+
import kotlinx.coroutines.experimental.selects.*
20+
import java.util.concurrent.*
21+
import java.util.concurrent.locks.*
22+
import kotlin.concurrent.*
2423

2524
/**
2625
* Broadcast channel with array buffer of a fixed [capacity].
@@ -353,4 +352,9 @@ class ArrayBroadcastChannel<E>(
353352
return result
354353
}
355354
}
355+
356+
// ------ debug ------
357+
358+
override val bufferDebugString: String
359+
get() = "(buffer:capacity=${buffer.size},size=$size)"
356360
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayChannel.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616

1717
package kotlinx.coroutines.experimental.channels
1818

19-
import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
20-
import kotlinx.coroutines.experimental.selects.SelectInstance
21-
import java.util.concurrent.locks.ReentrantLock
22-
import kotlin.concurrent.withLock
19+
import kotlinx.coroutines.experimental.selects.*
20+
import java.util.concurrent.locks.*
21+
import kotlin.concurrent.*
2322

2423
/**
2524
* Channel with array buffer of a fixed [capacity].
@@ -245,4 +244,9 @@ public open class ArrayChannel<E>(
245244
// then clean all queued senders
246245
super.cleanupSendQueueOnCancel()
247246
}
247+
248+
// ------ debug ------
249+
250+
override val bufferDebugString: String
251+
get() = "(buffer:capacity=${buffer.size},size=$size)"
248252
}

core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -577,8 +577,19 @@ public inline suspend fun <E, C : SendChannel<E>> ReceiveChannel<E>.filterIndexe
577577
* This function [consumes][consume] all elements of the original [ReceiveChannel].
578578
*/
579579
// todo: mark predicate with crossinline modifier when it is supported: https://youtrack.jetbrains.com/issue/KT-19159
580-
public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
581-
filter { !predicate(it) }
580+
public fun <E> ReceiveChannel<E>.filterNot(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
581+
filter(context) { !predicate(it) }
582+
583+
/**
584+
* Returns a channel containing all elements not matching the given [predicate].
585+
*
586+
* The operation is _intermediate_ and _stateless_.
587+
* This function [consumes][consume] all elements of the original [ReceiveChannel].
588+
*
589+
* @suppress **Deprecated**: For binary compatibility only
590+
*/
591+
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
592+
public fun <E> ReceiveChannel<E>.filterNot(predicate: suspend (E) -> Boolean): ReceiveChannel<E> = filterNot(predicate = predicate)
582593

583594
/**
584595
* Returns a channel containing all elements that are not `null`.

0 commit comments

Comments
 (0)