Skip to content

Coroutine context propagation for Reactor to coroutines API migration #1377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,29 @@ public final class kotlinx/coroutines/reactive/ChannelKt {
public static synthetic fun openSubscription$default (Lorg/reactivestreams/Publisher;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
}

public abstract interface class kotlinx/coroutines/reactive/ContextInjector {
public abstract fun injectCoroutineContext (Lorg/reactivestreams/Publisher;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/ConvertKt {
public static final fun asPublisher (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/FlowKt {
public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/FlowSubscription : org/reactivestreams/Subscription {
public final field flow Lkotlinx/coroutines/flow/Flow;
public final field subscriber Lorg/reactivestreams/Subscriber;
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
public fun cancel ()V
public fun request (J)V
}

public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publish (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
public static final fun publish (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
Expand All @@ -44,12 +62,3 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
public static final fun from (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ public final class kotlinx/coroutines/reactor/ConvertKt {
public static final fun asMono (Lkotlinx/coroutines/Job;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Mono;
}

public final class kotlinx/coroutines/reactor/FlowKt {
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
}

public final class kotlinx/coroutines/reactor/FluxKt {
public static final fun flux (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
public static final fun flux (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Flux;
Expand Down
13 changes: 12 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import java.util.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -81,6 +82,16 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

// ------------------------ private ------------------------

// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: Array<ContextInjector> =
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).iterator().asSequence().toList().toTypedArray() // R8 opto

private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
contextInjectors.fold(this) { pub, contextInjector ->
contextInjector.injectCoroutineContext(pub, coroutineContext)
}

private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Expand All @@ -93,7 +104,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Subscriber<T> {
injectCoroutineContext(cont.context).subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
Expand Down
15 changes: 15 additions & 0 deletions reactive/kotlinx-coroutines-reactive/src/ContextInjector.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kotlinx.coroutines.reactive

import kotlinx.coroutines.InternalCoroutinesApi
import org.reactivestreams.Publisher
import kotlin.coroutines.CoroutineContext

/** @suppress */
@InternalCoroutinesApi
public interface ContextInjector {
/**
* Injects `ReactorContext` element from the given context into the `SubscriberContext` of the publisher.
* This API used as an indirection layer between `reactive` and `reactor` modules.
*/
public fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T>
}
112 changes: 112 additions & 0 deletions reactive/kotlinx-coroutines-reactive/src/FlowAsPublisher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.reactive

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.*

/**
* Transforms the given flow to a spec-compliant [Publisher].
*/
@ExperimentalCoroutinesApi
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)

/**
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
* [cancel] invocation cancels the original flow.
*/
@Suppress("PublisherImplementation")
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
override fun subscribe(subscriber: Subscriber<in T>?) {
if (subscriber == null) throw NullPointerException()
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
}
}

/** @suppress */
@InternalCoroutinesApi
public class FlowSubscription<T>(
@JvmField val flow: Flow<T>,
@JvmField val subscriber: Subscriber<in T>
) : Subscription {
@Volatile
private var canceled: Boolean = false
private val requested = atomic(0L)
private val producer = atomic<CancellableContinuation<Unit>?>(null)

// This is actually optimizable
private var job = GlobalScope.launch(Dispatchers.Unconfined, start = CoroutineStart.LAZY) {
try {
consumeFlow()
subscriber.onComplete()
} catch (e: Throwable) {
try {
if (e is CancellationException) {
subscriber.onComplete()
} else {
subscriber.onError(e)
}
} catch (e: Throwable) {
// Last ditch report
handleCoroutineException(coroutineContext, e)
}
}
}

private suspend fun consumeFlow() {
flow.collect { value ->
/*
* Flow is scopeless, thus if it's not active, its subscription was cancelled.
* No intermediate "child failed, but flow coroutine is not" states are allowed.
*/
coroutineContext.ensureActive()
if (requested.value == 0L) {
suspendCancellableCoroutine<Unit> {
producer.value = it
if (requested.value != 0L) it.resumeSafely()
}
}
requested.decrementAndGet()
subscriber.onNext(value)
}
}

override fun cancel() {
canceled = true
job.cancel()
}

override fun request(n: Long) {
if (n <= 0 || canceled) {
return
}
job.start()
var snapshot: Long
var newValue: Long
do {
snapshot = requested.value
newValue = snapshot + n
if (newValue <= 0L) newValue = Long.MAX_VALUE
} while (!requested.compareAndSet(snapshot, newValue))

val prev = producer.value
if (prev == null || !producer.compareAndSet(prev, null)) return
prev.resumeSafely()
}

private fun CancellableContinuation<Unit>.resumeSafely() {
val token = tryResume(Unit)
if (token != null) {
completeResume(token)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.reactive.flow
@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*

/**
Expand All @@ -21,13 +24,11 @@ import kotlin.coroutines.*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flights elements
* are discarded.
*/
@JvmName("from")
@ExperimentalCoroutinesApi
public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this, 1)

@FlowPreview
@JvmName("from")
@Deprecated(
message = "batchSize parameter is deprecated, use .buffer() instead to control the backpressure",
level = DeprecationLevel.ERROR,
Expand All @@ -46,7 +47,9 @@ private class PublisherAsFlow<T : Any>(
// use another channel for conflation (cannot do openSubscription)
if (capacity < 0) return super.produceImpl(scope)
// Open subscription channel directly
val channel = publisher.openSubscription(capacity)
val channel = publisher
.injectCoroutineContext(scope.coroutineContext)
.openSubscription(capacity)
val handle = scope.coroutineContext[Job]?.invokeOnCompletion(onCancelling = true) { cause ->
channel.cancel(cause?.let {
it as? CancellationException ?: CancellationException("Job was cancelled", it)
Expand All @@ -70,7 +73,7 @@ private class PublisherAsFlow<T : Any>(

override suspend fun collect(collector: FlowCollector<T>) {
val subscriber = ReactiveSubscriber<T>(capacity, requestSize)
publisher.subscribe(subscriber)
publisher.injectCoroutineContext(coroutineContext).subscribe(subscriber)
try {
var consumed = 0L
while (true) {
Expand Down Expand Up @@ -127,3 +130,11 @@ private class ReactiveSubscriber<T : Any>(
subscription.cancel()
}
}

// ContextInjector service is implemented in `kotlinx-coroutines-reactor` module only.
// If `kotlinx-coroutines-reactor` module is not included, the list is empty.
private val contextInjectors: List<ContextInjector> =
ServiceLoader.load(ContextInjector::class.java, ContextInjector::class.java.classLoader).toList()

private fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: CoroutineContext) =
contextInjectors.fold(this) { pub, contextInjector -> contextInjector.injectCoroutineContext(pub, coroutineContext) }
103 changes: 0 additions & 103 deletions reactive/kotlinx-coroutines-reactive/src/flow/FlowAsPublisher.kt

This file was deleted.

Loading