Skip to content

Commit 5cbef7b

Browse files
committed
Fix reactorPublish's incompatibility with the RS spec
1 parent 40de24a commit 5cbef7b

File tree

1 file changed

+22
-4
lines changed
  • reactive/kotlinx-coroutines-reactor/src

1 file changed

+22
-4
lines changed

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+22-4
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ private fun <T> reactorPublish(
4444
scope: CoroutineScope,
4545
context: CoroutineContext = EmptyCoroutineContext,
4646
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
47-
): Publisher<T> = Publisher { subscriber ->
48-
// specification requires NPE on null subscriber
49-
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
50-
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
47+
): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
48+
if (subscriber !is CoreSubscriber) {
49+
subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
50+
return@onSubscribe
51+
}
5152
val currentContext = subscriber.currentContext()
5253
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
5354
val newContext = scope.newCoroutineContext(context + reactorContext)
@@ -67,6 +68,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct
6768
}
6869
}
6970

71+
/** The proper way to reject the subscriber, according to
72+
* [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
73+
*/
74+
private fun <T> Subscriber<T>?.reject(t: Throwable) {
75+
if (this == null)
76+
throw NullPointerException("The subscriber can not be null")
77+
onSubscribe(object: Subscription {
78+
override fun request(n: Long) {
79+
// intentionally left blank
80+
}
81+
override fun cancel() {
82+
// intentionally left blank
83+
}
84+
})
85+
onError(t)
86+
}
87+
7088
@Deprecated(
7189
message = "CoroutineScope.flux is deprecated in favour of top-level flux",
7290
level = DeprecationLevel.HIDDEN,

0 commit comments

Comments
 (0)