Skip to content

Commit 8784c39

Browse files
authored
Reworked streams API using CoroutineContext.Element (RequestStrategy) for better control over requestN frames (#118)
1 parent d09999a commit 8784c39

File tree

20 files changed

+351
-188
lines changed

20 files changed

+351
-188
lines changed

README.md

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,9 @@ From [RSocket protocol](https://github.com/rsocket/rsocket/blob/master/Protocol.
208208
This is a credit-based model where the Requester grants the Responder credit for the number of PAYLOADs it can send.
209209
It is sometimes referred to as "request-n" or "request(n)".
210210

211-
`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has `Flow.buffer(n)` operator
212-
which can be used to achieve something similar:
211+
`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has flexible `CoroutineContext`
212+
which can be used to achieve something similar. `rsocket-kotlin` contains `RequestStrategy` coroutine context element, which defines,
213+
strategy for sending of `requestN` frames.
213214

214215
Example:
215216

@@ -220,13 +221,11 @@ val client: RSocket = TODO()
220221
//and stream
221222
val stream: Flow<Payload> = client.requestStream(Payload("data"))
222223

223-
//now we can use buffer to tell underlying transport to request values in chunks
224-
val bufferedStream: Flow<Payload> = stream.buffer(10) //here buffer is 10, if `buffer` operator is not used buffer is by default 64
225-
226-
//now you can collect as any other `Flow`
227-
//just after collection first request for 10 elements will be sent
228-
//after 10 elements collected, 10 more elements will be requested, and so on
229-
bufferedStream.collect { payload: Payload ->
224+
//now we can use `flowOn` to add request strategy to context of flow
225+
//here we use prefetch strategy which will send requestN for 10 elements, when, there is 5 elements left to collect
226+
//so on call `collect`, requestStream frame with requestN will be sent, and then, after 5 elements will be collected
227+
//new requestN with 5 will be sent, so collect will be smooth
228+
stream.flowOn(PrefetchStrategy(requestSize = 10, requestOn = 5)).collect { payload: Payload ->
230229
println(payload.data.readText())
231230
}
232231
```

benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import java.util.concurrent.locks.*
2424

2525
@BenchmarkMode(Mode.Throughput)
2626
@Fork(value = 2)
27-
@Warmup(iterations = 10, time = 10)
28-
@Measurement(iterations = 7, time = 10)
27+
@Warmup(iterations = 5, time = 5)
28+
@Measurement(iterations = 5, time = 5)
2929
@State(Scope.Benchmark)
3030
abstract class RSocketBenchmark<Payload : Any> {
3131

@@ -40,7 +40,7 @@ abstract class RSocketBenchmark<Payload : Any> {
4040

4141
@TearDown(Level.Iteration)
4242
fun awaitToBeConsumed() {
43-
LockSupport.parkNanos(5000)
43+
LockSupport.parkNanos(2000)
4444
}
4545

4646
abstract fun createPayload(size: Int): Payload
@@ -58,10 +58,10 @@ abstract class RSocketBenchmark<Payload : Any> {
5858
fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse)
5959

6060
@Benchmark
61-
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 500, ::requestResponse)
61+
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse)
6262

6363
@Benchmark
64-
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 500, ::requestResponse)
64+
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse)
6565

6666

6767
@Benchmark
@@ -78,10 +78,10 @@ abstract class RSocketBenchmark<Payload : Any> {
7878
fun requestChannelBlocking(bh: Blackhole) = blocking(bh, ::requestChannel)
7979

8080
@Benchmark
81-
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 3, ::requestChannel)
81+
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel)
8282

8383
@Benchmark
84-
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 3, ::requestChannel)
84+
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel)
8585

8686

8787
private suspend fun requestResponse(bh: Blackhole) {

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,37 @@ import kotlinx.coroutines.*
2525
import kotlinx.coroutines.flow.*
2626
import kotlin.random.*
2727

28+
@OptIn(ExperimentalStreamsApi::class, ExperimentalCoroutinesApi::class)
2829
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
30+
private val requestStrategy = PrefetchStrategy(64, 0)
2931

3032
lateinit var client: RSocket
3133
lateinit var server: Job
3234

3335
lateinit var payload: Payload
3436
lateinit var payloadsFlow: Flow<Payload>
3537

38+
fun payloadCopy(): Payload = payload.copy()
39+
3640
override fun setup() {
3741
payload = createPayload(payloadSize)
38-
payloadsFlow = flow { repeat(5000) { emit(payload.copy()) } }
42+
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }
3943

4044
val localServer = LocalServer()
4145
server = RSocketServer().bind(localServer) {
4246
RSocketRequestHandler {
4347
requestResponse {
4448
it.release()
45-
payload
49+
payloadCopy()
4650
}
4751
requestStream {
4852
it.release()
4953
payloadsFlow
5054
}
51-
requestChannel { it }
55+
requestChannel { it.flowOn(requestStrategy) }
5256
}
5357
}
54-
return runBlocking {
58+
client = runBlocking {
5559
RSocketConnector().connect(localServer)
5660
}
5761
}
@@ -72,10 +76,10 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
7276
payload.release()
7377
}
7478

75-
override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.copy())
79+
override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy())
7680

77-
override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payload.copy())
81+
override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payloadCopy()).flowOn(requestStrategy)
7882

79-
override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow)
83+
override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow).flowOn(requestStrategy)
8084

8185
}

build.gradle.kts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ subprojects {
197197
extensions.configure<KotlinMultiplatformExtension> {
198198
val isTestProject = project.name == "rsocket-test"
199199
val isLibProject = project.name.startsWith("rsocket")
200+
val isPlaygroundProject = project.name == "playground"
201+
val isExampleProject = "examples" in project.path
200202

201203
sourceSets.all {
202204
languageSettings.apply {
@@ -206,7 +208,7 @@ subprojects {
206208

207209
useExperimentalAnnotation("kotlin.RequiresOptIn")
208210

209-
if (name.contains("test", ignoreCase = true) || isTestProject) {
211+
if (name.contains("test", ignoreCase = true) || isTestProject || isPlaygroundProject) {
210212
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
211213
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")
212214

@@ -221,6 +223,7 @@ subprojects {
221223

222224
useExperimentalAnnotation("io.rsocket.kotlin.TransportApi")
223225
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi")
226+
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalStreamsApi")
224227
}
225228
}
226229
}
@@ -233,7 +236,7 @@ subprojects {
233236
}
234237

235238
//fix atomicfu for examples and playground
236-
if ("examples" in project.path || project.name == "playground") {
239+
if (isExampleProject || isPlaygroundProject) {
237240
sourceSets["commonMain"].dependencies {
238241
implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion")
239242
}

examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.ktor.utils.io.core.*
1818
import io.rsocket.kotlin.*
1919
import io.rsocket.kotlin.core.*
20-
import io.rsocket.kotlin.payload.*
2120
import io.rsocket.kotlin.transport.*
2221
import io.rsocket.kotlin.transport.local.*
2322
import kotlinx.atomicfu.*
@@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {
6261

6362
//do request
6463
try {
65-
rSocket.requestStream(Payload("Hello", "World")).buffer(3).collect {
64+
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).collect {
6665
val index = it.data.readText().substringAfter("Payload: ").toInt()
6766
println("Client receives index: $index")
6867
}
@@ -72,7 +71,7 @@ fun main(): Unit = runBlocking {
7271

7372
//do request just after it
7473

75-
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
74+
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
7675
val index = it.data.readText().substringAfter("Payload: ").toInt()
7776
println("Client receives index: $index after reconnection")
7877
}

examples/interactions/src/jvmMain/kotlin/ReconnectOnConnectFailExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import io.rsocket.kotlin.*
1818
import io.rsocket.kotlin.core.*
19-
import io.rsocket.kotlin.payload.*
2019
import io.rsocket.kotlin.transport.*
2120
import io.rsocket.kotlin.transport.local.*
2221
import kotlinx.coroutines.*
@@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {
6261
})
6362

6463
//do request
65-
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
64+
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
6665
val index = it.data.readText().substringAfter("Payload: ").toInt()
6766
println("Client receives index: $index")
6867
}

examples/interactions/src/jvmMain/kotlin/RequestChannelExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import io.rsocket.kotlin.*
1818
import io.rsocket.kotlin.core.*
19-
import io.rsocket.kotlin.payload.*
2019
import io.rsocket.kotlin.transport.local.*
2120
import kotlinx.coroutines.*
2221
import kotlinx.coroutines.flow.*
@@ -27,7 +26,7 @@ fun main(): Unit = runBlocking {
2726
RSocketServer().bind(server) {
2827
RSocketRequestHandler {
2928
requestChannel { request ->
30-
request.buffer(3).take(3).flatMapConcat { payload ->
29+
request.flowOn(PrefetchStrategy(3, 0)).take(3).flatMapConcat { payload ->
3130
val data = payload.data.readText()
3231
flow {
3332
repeat(3) {

examples/interactions/src/jvmMain/kotlin/RequestStreamExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
import io.rsocket.kotlin.*
1818
import io.rsocket.kotlin.core.*
19-
import io.rsocket.kotlin.payload.*
2019
import io.rsocket.kotlin.transport.local.*
2120
import kotlinx.coroutines.*
2221
import kotlinx.coroutines.flow.*
@@ -44,7 +43,7 @@ fun main(): Unit = runBlocking {
4443
val response = rSocket.requestStream(Payload("Hello", "World"))
4544

4645
response
47-
.buffer(2) //use buffer as first operator to use RequestN semantic, so request by 2 elements
46+
.flowOn(PrefetchStrategy(2, 0))
4847
.map { it.data.readText().substringAfter("Payload: ").toInt() }
4948
.take(2)
5049
.collect {

playground/src/commonMain/kotlin/Stub.kt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ suspend fun RSocket.doSomething() {
4444
// launch { rSocket.fireAndForget(Payload(byteArrayOf(1, 1, 1), byteArrayOf(2, 2, 2))) }
4545
// launch { rSocket.metadataPush(byteArrayOf(1, 2, 3)) }
4646
var i = 0
47-
requestStream(buildPayload {
48-
data(byteArrayOf(1, 1, 1))
49-
metadata(byteArrayOf(2, 2, 2))
50-
}).buffer(10000).collect {
47+
requestStream(
48+
buildPayload {
49+
data(byteArrayOf(1, 1, 1))
50+
metadata(byteArrayOf(2, 2, 2))
51+
}
52+
).flowOn(PrefetchStrategy(10000, 0)).collect {
5153
println(it.data.readBytes().contentToString())
5254
if (++i == 10000) error("")
5355
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import io.rsocket.kotlin.*
18+
import kotlinx.coroutines.*
19+
import kotlinx.coroutines.channels.*
20+
import kotlinx.coroutines.flow.*
21+
import kotlin.coroutines.*
22+
23+
@ExperimentalStreamsApi
24+
private suspend fun s() {
25+
val flow = flow {
26+
val strategy = coroutineContext[RequestStrategy]!!.provide()
27+
var i = strategy.firstRequest()
28+
println("INIT: $i")
29+
var r = 0
30+
while (i > 0) {
31+
emit(r++)
32+
val n = strategy.nextRequest()
33+
println("")
34+
if (n > 0) i += n
35+
i--
36+
}
37+
}
38+
39+
flow.flowOn(PrefetchStrategy(64, 16)).onEach { println(it) }.launchIn(GlobalScope)
40+
41+
val ch = Channel<Int>()
42+
43+
flow.flowOn(ChannelStrategy(ch)).onEach { println(it) }.launchIn(GlobalScope)
44+
45+
delay(100)
46+
ch.send(5)
47+
delay(100)
48+
ch.send(5)
49+
delay(100)
50+
ch.send(5)
51+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Annotations.kt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@ package io.rsocket.kotlin
2020
@RequiresOptIn(
2121
level = RequiresOptIn.Level.WARNING,
2222
message = "This is an API which is used to implement transport for RSocket, such as WS or TCP. " +
23-
"This API can change in future in non backwards-incompatible manner."
23+
"This API can change in future in non backwards-compatible manner."
2424
)
2525
public annotation class TransportApi
2626

2727
@Retention(value = AnnotationRetention.BINARY)
2828
@RequiresOptIn(
2929
level = RequiresOptIn.Level.WARNING,
30-
message = "This is an API to work with metadata. This API can change in future in non backwards-incompatible manner."
30+
message = "This is an API to work with metadata. This API can change in future in non backwards-compatible manner."
3131
)
3232
public annotation class ExperimentalMetadataApi
33+
34+
@Retention(value = AnnotationRetention.BINARY)
35+
@RequiresOptIn(
36+
level = RequiresOptIn.Level.WARNING,
37+
message = "This is an API to customize request strategy of streams. This API can change in future in non backwards-compatible manner."
38+
)
39+
public annotation class ExperimentalStreamsApi

0 commit comments

Comments
 (0)