Skip to content

Commit 17ce10d

Browse files
authored
Update coroutines version and usage (#126)
* coroutines 1.4.2-native-mt * Use Flow.stateIn operator in reconnectable implementation * use CompletableJob in Cancellable interface for better connection state handling * use CompletableJob.completeExceptionally instead of Job.cancel in places, where error should happen * use new channel API with onUndeliveredElement * introduce SendChannel.safeOffer for better releasing of frames/payloads if channel is closed * better channel handing in TCP and local transports
1 parent 3353ab4 commit 17ce10d

File tree

24 files changed

+141
-100
lines changed

24 files changed

+141
-100
lines changed

examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ fun NodeJsTcpServerTransport(port: Int, onStart: () -> Unit = {}): ServerTranspo
5757
// nodejs TCP transport connection - may not work in all cases, not tested properly
5858
@OptIn(ExperimentalCoroutinesApi::class, TransportApi::class)
5959
class NodeJsTcpConnection(private val socket: Socket) : Connection {
60-
override val job: Job = Job()
60+
override val job: CompletableJob = Job()
6161

6262
private val sendChannel = Channel<ByteReadPacket>(8)
6363
private val receiveChannel = Channel<ByteReadPacket>(8)

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ group=io.rsocket.kotlin
1919
version=0.12.0
2020

2121
#Versions
22-
kotlinVersion=1.4.20
22+
kotlinVersion=1.4.21
2323
ktorVersion=1.4.3
24-
kotlinxCoroutinesVersion=1.3.9-native-mt-2
24+
kotlinxCoroutinesVersion=1.4.2-native-mt
2525
kotlinxAtomicfuVersion=0.14.4
2626
kotlinxSerializationVersion=1.0.1
2727
kotlinxBenchmarkVersion=0.2.0-dev-20

playground/src/commonMain/kotlin/streams.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@ import io.rsocket.kotlin.*
1818
import kotlinx.coroutines.*
1919
import kotlinx.coroutines.channels.*
2020
import kotlinx.coroutines.flow.*
21-
import kotlin.coroutines.*
2221

2322
@ExperimentalStreamsApi
2423
private suspend fun s() {
2524
val flow = flow {
26-
val strategy = coroutineContext[RequestStrategy]!!.provide()
25+
val strategy = currentCoroutineContext()[RequestStrategy]!!.provide()
2726
var i = strategy.firstRequest()
2827
println("INIT: $i")
2928
var r = 0

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Cancelable.kt renamed to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Cancellable.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@ package io.rsocket.kotlin
1818

1919
import kotlinx.coroutines.*
2020

21-
interface Cancelable {
22-
val job: Job
21+
interface Cancellable {
22+
val job: CompletableJob
2323
}
2424

25-
val Cancelable.isActive: Boolean get() = job.isActive
26-
fun Cancelable.cancel(cause: CancellationException? = null): Unit = job.cancel(cause)
27-
fun Cancelable.cancel(message: String, cause: Throwable? = null): Unit = job.cancel(message, cause)
28-
suspend fun Cancelable.join(): Unit = job.join()
29-
suspend fun Cancelable.cancelAndJoin(): Unit = job.cancelAndJoin()
25+
val Cancellable.isActive: Boolean get() = job.isActive
26+
suspend fun Cancellable.join(): Unit = job.join()
27+
suspend fun Cancellable.cancelAndJoin(): Unit = job.cancelAndJoin()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import io.rsocket.kotlin.frame.*
2525
* That interface isn't stable for inheritance.
2626
*/
2727
@TransportApi
28-
interface Connection : Cancelable {
28+
interface Connection : Cancellable {
2929

3030
@DangerousInternalIoApi
3131
val pool: ObjectPool<ChunkBuffer>

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import io.ktor.utils.io.core.*
2020
import io.rsocket.kotlin.payload.*
2121
import kotlinx.coroutines.flow.*
2222

23-
interface RSocket : Cancelable {
23+
interface RSocket : Cancellable {
2424

2525
suspend fun metadataPush(metadata: ByteReadPacket) {
2626
metadata.release()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketRequestHandler.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class RSocketRequestHandlerBuilder internal constructor() {
5353
requestChannel = block
5454
}
5555

56-
internal fun build(job: Job): RSocket =
56+
internal fun build(job: CompletableJob): RSocket =
5757
RSocketRequestHandler(job, metadataPush, fireAndForget, requestResponse, requestStream, requestChannel)
5858
}
5959

@@ -65,7 +65,7 @@ fun RSocketRequestHandler(parentJob: Job? = null, configure: RSocketRequestHandl
6565
}
6666

6767
private class RSocketRequestHandler(
68-
override val job: Job,
68+
override val job: CompletableJob,
6969
private val metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null,
7070
private val fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null,
7171
private val requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null,

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,10 @@ public class RSocketConnectorBuilder internal constructor() {
100100
)
101101

102102
private companion object {
103-
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket }
103+
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket() }
104104

105-
private object EmptyRSocket : RSocket {
106-
override val job: Job = NonCancellable
105+
private class EmptyRSocket : RSocket {
106+
override val job: CompletableJob = Job()
107107
}
108108
}
109109
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class RSocketServer internal constructor(
6868

6969
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
7070
sendFrame(ErrorFrame(0, error))
71-
cancel("Setup failed", error)
71+
job.completeExceptionally(error)
7272
throw error
7373
}
7474
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/ReconnectableRSocket.kt

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,40 +27,40 @@ import kotlinx.coroutines.flow.*
2727
internal typealias ReconnectPredicate = suspend (cause: Throwable, attempt: Long) -> Boolean
2828

2929
@Suppress("FunctionName")
30-
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
30+
@OptIn(FlowPreview::class)
3131
internal suspend fun ReconnectableRSocket(
3232
logger: Logger,
3333
connect: suspend () -> RSocket,
3434
predicate: ReconnectPredicate,
3535
): RSocket {
36-
val state = MutableStateFlow<ReconnectState>(ReconnectState.Connecting)
37-
38-
val job =
36+
val job = Job()
37+
val state =
3938
connect.asFlow()
4039
.map<RSocket, ReconnectState> { ReconnectState.Connected(it) } //if connection established - state = connected
4140
.onStart { emit(ReconnectState.Connecting) } //init - state = connecting
42-
.retryWhen { cause, attempt ->
41+
.retryWhen { cause, attempt -> //reconnection logic
4342
logger.debug(cause) { "Connection establishment failed, attempt: $attempt. Trying to reconnect..." }
4443
predicate(cause, attempt)
45-
} //reconnection logic
46-
.catch {
44+
}
45+
.catch { //reconnection failed - state = failed
4746
logger.debug(it) { "Reconnection failed" }
4847
emit(ReconnectState.Failed(it))
49-
} //reconnection failed - state = failed
50-
.mapNotNull {
51-
state.value = it //set state //TODO replace with Flow.stateIn when coroutines 1.4.0-native-mt will be released
48+
}
49+
.transform { value ->
50+
emit(value) //emit before any action, to pass value directly to state
5251

53-
when (it) {
52+
when (value) {
5453
is ReconnectState.Connected -> {
5554
logger.debug { "Connection established" }
56-
it.rSocket.join() //await for connection completion
55+
value.rSocket.join() //await for connection completion
5756
logger.debug { "Connection closed. Reconnecting..." }
5857
}
59-
is ReconnectState.Failed -> throw it.error //reconnect failed, cancel job
60-
ReconnectState.Connecting -> null //skip, still waiting for new connection
58+
is ReconnectState.Failed -> job.completeExceptionally(value.error) //reconnect failed, fail job
59+
ReconnectState.Connecting -> Unit //skip, still waiting for new connection
6160
}
6261
}
63-
.launchRestarting() //reconnect if old connection completed/failed
62+
.restarting() //reconnect if old connection completed
63+
.stateIn(CoroutineScope(Dispatchers.Unconfined + job))
6464

6565
//await first connection to fail fast if something
6666
state.mapNotNull {
@@ -74,27 +74,16 @@ internal suspend fun ReconnectableRSocket(
7474
return ReconnectableRSocket(job, state)
7575
}
7676

77-
private fun Flow<*>.launchRestarting(): Job = GlobalScope.launch(Dispatchers.Unconfined) {
78-
while (isActive) {
79-
try {
80-
collect()
81-
} catch (e: Throwable) {
82-
// KLUDGE: K/N
83-
cancel("Reconnection failed", e)
84-
break
85-
}
86-
}
87-
}
77+
private fun Flow<ReconnectState>.restarting(): Flow<ReconnectState> = flow { while (true) emitAll(this@restarting) }
8878

8979
private sealed class ReconnectState {
9080
object Connecting : ReconnectState()
9181
data class Failed(val error: Throwable) : ReconnectState()
9282
data class Connected(val rSocket: RSocket) : ReconnectState()
9383
}
9484

95-
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
9685
private class ReconnectableRSocket(
97-
override val job: Job,
86+
override val job: CompletableJob,
9887
private val state: StateFlow<ReconnectState>,
9988
) : RSocket {
10089

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal
1919
import io.ktor.utils.io.core.*
2020
import kotlinx.coroutines.*
2121
import kotlinx.coroutines.channels.*
22+
import kotlin.native.concurrent.*
2223

2324
internal inline fun <T> Closeable.closeOnError(block: () -> T): T {
2425
try {
@@ -33,9 +34,27 @@ internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
3334
cancel(cause?.let { it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) })
3435
}
3536

37+
//TODO Can be removed after fix of https://github.com/Kotlin/kotlinx.coroutines/issues/2435
3638
internal fun ReceiveChannel<Closeable>.closeReceivedElements() {
3739
try {
3840
while (true) poll()?.close() ?: break
3941
} catch (e: Throwable) {
4042
}
4143
}
44+
45+
@SharedImmutable
46+
private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close
47+
48+
@Suppress("FunctionName")
49+
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)
50+
51+
//TODO check after fix of https://github.com/Kotlin/kotlinx.coroutines/issues/2435
52+
// and https://github.com/Kotlin/kotlinx.coroutines/issues/974
53+
internal fun <E : Closeable> SendChannel<E>.safeOffer(element: E) {
54+
try {
55+
if (!offer(element)) element.close()
56+
} catch (cause: Throwable) {
57+
element.close()
58+
throw cause
59+
}
60+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,20 @@
1717
package io.rsocket.kotlin.internal
1818

1919
import io.rsocket.kotlin.frame.*
20+
import kotlinx.coroutines.*
2021
import kotlinx.coroutines.channels.*
2122
import kotlinx.coroutines.selects.*
2223

2324
internal class Prioritizer {
24-
private val priorityChannel = Channel<Frame>(Channel.UNLIMITED)
25-
private val commonChannel = Channel<Frame>(Channel.UNLIMITED)
25+
private val priorityChannel = SafeChannel<Frame>(Channel.UNLIMITED)
26+
private val commonChannel = SafeChannel<Frame>(Channel.UNLIMITED)
2627

2728
fun send(frame: Frame) {
28-
commonChannel.offer(frame)
29+
commonChannel.safeOffer(frame)
2930
}
3031

3132
fun sendPrioritized(frame: Frame) {
32-
priorityChannel.offer(frame)
33+
priorityChannel.safeOffer(frame)
3334
}
3435

3536
suspend fun receive(): Frame {
@@ -41,10 +42,10 @@ internal class Prioritizer {
4142
}
4243
}
4344

44-
fun close(throwable: Throwable?) {
45+
fun cancel(error: CancellationException) {
4546
priorityChannel.closeReceivedElements()
4647
commonChannel.closeReceivedElements()
47-
priorityChannel.close(throwable)
48-
commonChannel.close(throwable)
48+
priorityChannel.cancel(error)
49+
commonChannel.cancel(error)
4950
}
5051
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import kotlinx.coroutines.flow.*
2727
internal class RSocketRequester(
2828
private val state: RSocketState,
2929
private val streamId: StreamId,
30-
) : RSocket, Cancelable by state {
30+
) : RSocket, Cancellable by state {
3131

3232
override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.closeOnError {
3333
checkAvailable()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import kotlinx.coroutines.*
2424
internal class RSocketResponder(
2525
private val state: RSocketState,
2626
private val requestHandler: RSocket,
27-
) : Cancelable by state {
27+
) : Cancellable by state {
2828

2929
fun handleMetadataPush(frame: MetadataPushFrame) {
3030
state.launch {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import kotlinx.coroutines.flow.*
3333
internal class RSocketState(
3434
private val connection: Connection,
3535
keepAlive: KeepAlive,
36-
) : Cancelable by connection {
36+
) : Cancellable by connection {
3737
private val prioritizer = Prioritizer()
3838
private val requestScope = CoroutineScope(SupervisorJob(job))
3939
private val scope = CoroutineScope(job)
@@ -53,7 +53,7 @@ internal class RSocketState(
5353
}
5454

5555
fun createReceiverFor(streamId: Int, initFrame: RequestFrame? = null): ReceiveChannel<RequestFrame> {
56-
val receiver = Channel<RequestFrame>(Channel.UNLIMITED)
56+
val receiver = SafeChannel<RequestFrame>(Channel.UNLIMITED)
5757
initFrame?.let(receiver::offer) //used only in RequestChannel on responder side
5858
receivers[streamId] = receiver
5959
return receiver
@@ -71,7 +71,7 @@ internal class RSocketState(
7171
if (cause != null) send(CancelFrame(streamId))
7272
receivers.remove(streamId)?.apply {
7373
closeReceivedElements()
74-
close(cause)
74+
cancelConsumed(cause)
7575
}
7676
}
7777
}
@@ -120,7 +120,7 @@ internal class RSocketState(
120120
when (val streamId = frame.streamId) {
121121
0 -> when (frame) {
122122
is ErrorFrame -> {
123-
cancel("Zero stream error", frame.throwable)
123+
job.completeExceptionally(frame.throwable)
124124
frame.release() //TODO
125125
}
126126
is KeepAliveFrame -> keepAliveHandler.receive(frame)
@@ -146,7 +146,7 @@ internal class RSocketState(
146146
frame.release()
147147
}
148148
is RequestFrame -> when (frame.type) {
149-
FrameType.Payload -> receivers[streamId]?.offer(frame)
149+
FrameType.Payload -> receivers[streamId]?.safeOffer(frame) ?: frame.release()
150150
FrameType.RequestFnF -> responder.handleFireAndForget(frame)
151151
FrameType.RequestResponse -> responder.handlerRequestResponse(frame)
152152
FrameType.RequestStream -> responder.handleRequestStream(frame)
@@ -164,20 +164,35 @@ internal class RSocketState(
164164
fun start(requestHandler: RSocket) {
165165
val responder = RSocketResponder(this, requestHandler)
166166
keepAliveHandler.startIn(scope)
167-
requestHandler.job.invokeOnCompletion { cancel("Request handled stopped", it) }
167+
requestHandler.job.invokeOnCompletion {
168+
when (it) {
169+
null -> job.complete()
170+
is CancellationException -> job.cancel(it)
171+
else -> job.completeExceptionally(it)
172+
}
173+
}
168174
job.invokeOnCompletion { error ->
169-
requestHandler.cancel("Connection closed", error)
175+
when (error) {
176+
null -> requestHandler.job.complete()
177+
is CancellationException -> requestHandler.job.cancel(error)
178+
else -> requestHandler.job.completeExceptionally(error)
179+
}
180+
val cancelError = error as? CancellationException ?: CancellationException("Connection closed", error)
170181
receivers.values().forEach {
171182
it.closeReceivedElements()
172-
it.close((error as? CancellationException)?.cause ?: error)
183+
it.cancel(cancelError)
173184
}
185+
senders.values().forEach { it.cancel(cancelError) }
174186
receivers.clear()
175187
limits.clear()
176188
senders.clear()
177-
prioritizer.close(error)
189+
prioritizer.cancel(cancelError)
178190
}
179191
scope.launch {
180-
while (connection.isActive) connection.sendFrame(prioritizer.receive())
192+
while (connection.isActive) {
193+
val frame = prioritizer.receive()
194+
frame.closeOnError { connection.sendFrame(frame) }
195+
}
181196
}
182197
scope.launch {
183198
while (connection.isActive) {

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ class SetupRejectionTest : SuspendTest, TestWithLeakCheck {
5656
}
5757
val sender = sendingRSocket.await()
5858
assertFalse(sender.isActive)
59+
val error = expectError()
60+
assertTrue(error is RSocketError.Setup.Rejected)
61+
assertEquals(errorMessage, error.message)
5962
}
6063
}
6164

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck {
8787

8888
@Test
8989
fun noKeepAliveSentAfterRSocketCanceled() = test {
90-
requester().cancel()
90+
requester().job.cancel()
9191
connection.test {
9292
expectNoEventsIn(500)
9393
}

0 commit comments

Comments
 (0)