-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Flow.transformWhile operator #2066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found a small documentation mistake.
internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: suspend (value: T) -> Boolean) { | ||
val collector = object : FlowCollector<T> { | ||
override suspend fun emit(value: T) { | ||
if (!predicate(value)) throw AbortFlowException(this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move throw
to the separate line:
if (!predicate(value)) {
throw AbortFlowException(this)
}
It would make other people debugging experience a bit better (they can put a breakpoint on throw
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Also, all flow-truncating operators are refactored via a common internal collectWhile operator that properly uses AbortFlowException and checks for its ownership, so that we don't have to look for bugs in interactions between all those operators (and zip, too, which is also flow-truncating). Fixes #2065
Co-authored-by: EdwarDDay <[email protected]>
* Restore fast tail-call impl for take * Benchmark for takeWhile * Test flow invariants for takeWhile
b9c352c
to
fdd724d
Compare
(0L..Long.MAX_VALUE).asFlow().takeWhileViaCollectWhile { it < size }.consume() | ||
} | ||
|
||
// Direct implemenatation by checking predicate and throwing AbortFlowException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
benchmarks/src/jmh/kotlin/benchmarks/flow/TakeWhileBenchmark.kt
Outdated
Show resolved
Hide resolved
Co-authored-by: Louis CAD <[email protected]>
Also, most flow-truncating operators are refactored via a common internal collectWhile operator that properly uses AbortFlowException and checks for its ownership, so that we don't have to look for bugs in interactions between all those operators (and zip, too, which is also flow-truncating). But `take` operator still users a custom highly-tuned implementation. Fixes Kotlin#2065 Co-authored-by: EdwarDDay <[email protected]> Co-authored-by: Louis CAD <[email protected]>
Also, all flow-truncating operators are refactored via a common internal collectWhile operator that properly uses AbortFlowException and checks for its ownership, so that we don't have to look for bugs in interactions between all those operators (and zip, too, which is also flow-truncating).
Fixes #2065