Skip to content

Make the subscriber in awaitOne less permissive #2586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 7, 2021
Merged

Conversation

dkhalanskyjb
Copy link
Collaborator

Reactive Streams' Subscriber's implementation for await* operations was assuming that the publisher is correct. Now, the implementation detects some instances of problematic behavior for publishers and reports them.

Fixes #2079

@dkhalanskyjb dkhalanskyjb requested a review from qwwdfsad March 16, 2021 13:35
}

override fun onNext(t: T) {
val sub = subscription.checkInitialized("onNext")
subscription.checkInitialized("onNext")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double checkInitialized seems to be redundant

}
if (seenValue)
// TODO: decide if we want to be lenient here: after all, nothing breaks if this isn't true
moreThanOneValueProvidedException(mode)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When writing a comment along with a statement, please wrap it in curly braces, otherwise it's a bit hard to read

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Throwing here breaks #2.13. This is expected, but still worth properly documenting to know why it is here even tho it violates the spec.
Also, removing this condition does not trigger any test failures, because double-resume also throws IllegalStateException. It's worth verifying the exception message at least once

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am now verifying the exception messages. I don't like what I did in order to achieve it, but I like copy-pasting exception messages into tests even less, given how brittle that approach is. Maybe you have some better suggestions?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach we're consistently using is the following:

  • Do not expose exception messages to test in any way (e.g. via internal function)
  • Instead of strictly comparing the error message as a whole, typically it's enough to do something like e.message!!.contains("some_key_word_or_meaningful_part_of_the_message").

It simplifies the maintenance a bit, reduces the number of functions/string consts, but it's still brittle. Yet exception messages are rarely changed and the purpose of the test is rather fix the current behaviour

@@ -169,6 +188,8 @@ private suspend fun <T> Publisher<T>.awaitOne(

@Suppress("UNCHECKED_CAST")
override fun onComplete() {
subscription.checkInitialized("onComplete") // TODO: maybe don't enforce?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like other implementations from reactive libraries do not enforce such rule, maybe we could stick with that as well

@dkhalanskyjb dkhalanskyjb requested a review from qwwdfsad March 18, 2021 12:34
block: Subscriber<in T>.(n: Long) -> Unit,
) =
assertFailsWith<IllegalStateException> { publisher(block).operation() }.let {
message == null || it.message == message
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line doesn't actually test exception messages, the resulting boolean is ignored

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry. Fixed in the commit that changes the whole error-passing mechanism here.

}
if (seenValue)
// TODO: decide if we want to be lenient here: after all, nothing breaks if this isn't true
moreThanOneValueProvidedException(mode)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach we're consistently using is the following:

  • Do not expose exception messages to test in any way (e.g. via internal function)
  • Instead of strictly comparing the error message as a whole, typically it's enough to do something like e.message!!.contains("some_key_word_or_meaningful_part_of_the_message").

It simplifies the maintenance a bit, reduces the number of functions/string consts, but it's still brittle. Yet exception messages are rarely changed and the purpose of the test is rather fix the current behaviour

@dkhalanskyjb dkhalanskyjb requested a review from qwwdfsad March 26, 2021 11:26
The implementation of Reactive Streams' Subscriber used for
`await*` operations was assuming that the publisher is correct.
Now, the implementation detects some instances of problematic
behavior for publishers and reports them.

Fixes #2079
This is a more robust approach, as exceptions are prone to being
lost in the async environments, especially if some components are
(known to be) misbehaving, so it's worth it to report the
fundamental errors to the system.
@dkhalanskyjb
Copy link
Collaborator Author

@qwwdfsad could you please check the latest commit? I spotted several errors during the final review, fixed them, and added a test to verify a new statement in a comment.

@dkhalanskyjb dkhalanskyjb requested a review from qwwdfsad April 6, 2021 14:40
@dkhalanskyjb dkhalanskyjb merged commit 86a0658 into develop Apr 7, 2021
@dkhalanskyjb dkhalanskyjb deleted the fix-2079 branch April 7, 2021 10:34
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this pull request Sep 14, 2022
The implementation of Reactive Streams' Subscriber used for
`await*` operations was assuming that the publisher is correct.
Now, the implementation detects some instances of problematic
behavior for publishers and reports them.

Fixes Kotlin#2079
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants