Skip to content

Commit 24ce99d

Browse files
committed
Fix reactorPublish's incompatibility with the RS spec
1 parent b60872b commit 24ce99d

File tree

1 file changed

+22
-4
lines changed
  • reactive/kotlinx-coroutines-reactor/src

1 file changed

+22
-4
lines changed

reactive/kotlinx-coroutines-reactor/src/Flux.kt

+22-4
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,11 @@ private fun <T> reactorPublish(
6060
scope: CoroutineScope,
6161
context: CoroutineContext = EmptyCoroutineContext,
6262
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
63-
): Publisher<T> = Publisher { subscriber ->
64-
// specification requires NPE on null subscriber
65-
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
66-
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
63+
): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
64+
if (subscriber !is CoreSubscriber) {
65+
subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
66+
return@onSubscribe
67+
}
6768
val currentContext = subscriber.currentContext()
6869
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
6970
val newContext = scope.newCoroutineContext(context + reactorContext)
@@ -81,3 +82,20 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { e, ctx ->
8182
}
8283
}
8384
}
85+
86+
/** The proper way to reject the subscriber, according to the
87+
* [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
88+
*/
89+
private fun <T> Subscriber<T>?.reject(t: Throwable) {
90+
if (this == null)
91+
throw NullPointerException("The subscriber can not be null")
92+
onSubscribe(object: Subscription {
93+
override fun request(n: Long) {
94+
// intentionally left blank
95+
}
96+
override fun cancel() {
97+
// intentionally left blank
98+
}
99+
})
100+
onError(t)
101+
}

0 commit comments

Comments
 (0)