Skip to content

Commit 3ed0674

Browse files
author
olme04
authored
improves transport api and lifecycle (#178)
Co-authored-by: olme04 <olme04>
1 parent 899e9b7 commit 3ed0674

File tree

59 files changed

+454
-451
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+454
-451
lines changed

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import kotlinx.coroutines.*
2525
import kotlinx.coroutines.flow.*
2626
import kotlin.random.*
2727

28-
@OptIn(ExperimentalStreamsApi::class)
28+
@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class)
2929
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
3030
private val requestStrategy = PrefetchStrategy(64, 0)
3131

32+
private val benchJob = Job()
3233
lateinit var client: RSocket
33-
lateinit var server: Job
3434

3535
lateinit var payload: Payload
3636
lateinit var payloadsFlow: Flow<Payload>
@@ -40,9 +40,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
4040
override fun setup() {
4141
payload = createPayload(payloadSize)
4242
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }
43-
44-
val localServer = LocalServer()
45-
server = RSocketServer().bind(localServer) {
43+
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
4644
RSocketRequestHandler {
4745
requestResponse {
4846
it.release()
@@ -59,14 +57,14 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
5957
}
6058
}
6159
client = runBlocking {
62-
RSocketConnector().connect(localServer)
60+
RSocketConnector().connect(server)
6361
}
6462
}
6563

6664
override fun cleanup() {
6765
runBlocking {
68-
client.job.runCatching { cancelAndJoin() }
69-
server.runCatching { cancelAndJoin() }
66+
client.coroutineContext.job.cancelAndJoin()
67+
benchJob.cancelAndJoin()
7068
}
7169
}
7270

examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ import kotlinx.coroutines.flow.*
2525

2626
@TransportApi
2727
fun main(): Unit = runBlocking {
28-
val server = LocalServer()
29-
RSocketServer().bind(server) {
28+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
3029
RSocketRequestHandler {
3130
requestStream { requestPayload ->
3231
val data = requestPayload.data.readText()

examples/interactions/src/jvmMain/kotlin/ReconnectOnConnectFailExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ import kotlinx.coroutines.flow.*
2323

2424
@TransportApi
2525
fun main(): Unit = runBlocking {
26-
val server = LocalServer()
27-
RSocketServer().bind(server) {
26+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2827
RSocketRequestHandler {
2928
requestStream { requestPayload ->
3029
val data = requestPayload.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestChannelExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import kotlinx.coroutines.flow.*
2222

2323

2424
fun main(): Unit = runBlocking {
25-
val server = LocalServer()
26-
RSocketServer().bind(server) {
25+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2726
RSocketRequestHandler {
2827
requestChannel { init, request ->
2928
println("Init with: ${init.data.readText()}")

examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestStreamExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import kotlinx.coroutines.*
2121
import kotlinx.coroutines.flow.*
2222

2323
fun main(): Unit = runBlocking {
24-
val server = LocalServer()
25-
RSocketServer().bind(server) {
24+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2625
RSocketRequestHandler {
2726
requestStream {
2827
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val clientRequest = it.data.readText()

examples/interactions/src/jvmMain/kotlin/ServerSetupExample.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ import kotlinx.coroutines.flow.*
2323

2424

2525
fun main(): Unit = runBlocking {
26-
27-
val server = LocalServer()
28-
RSocketServer().bind(server) {
26+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2927
val data = config.setupPayload.metadata?.readText() ?: error("Empty metadata")
3028
RSocketRequestHandler {
3129
when (data) {
@@ -43,8 +41,8 @@ fun main(): Unit = runBlocking {
4341

4442
suspend fun client1() {
4543
val rSocketClient = RSocketConnector().connect(server)
46-
rSocketClient.job.join()
47-
println("Client 1 canceled: ${rSocketClient.job.isCancelled}")
44+
rSocketClient.coroutineContext.job.join()
45+
println("Client 1 canceled: ${rSocketClient.coroutineContext.job.isCancelled}")
4846
try {
4947
rSocketClient.requestResponse(Payload.Empty)
5048
} catch (e: Throwable) {

examples/multiplatform-chat/src/clientMain/kotlin/Api.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
import io.ktor.client.*
1818
import io.ktor.client.features.websocket.*
19-
import io.ktor.network.selector.*
20-
import io.ktor.util.*
2119
import io.rsocket.kotlin.*
2220
import io.rsocket.kotlin.core.*
2321
import io.rsocket.kotlin.payload.*
2422
import io.rsocket.kotlin.transport.ktor.*
2523
import io.rsocket.kotlin.transport.ktor.client.*
24+
import kotlinx.coroutines.*
2625

2726
class Api(rSocket: RSocket) {
2827
private val proto = ConfiguredProtoBuf
@@ -42,9 +41,10 @@ suspend fun connectToApiUsingWS(name: String): Api {
4241
return Api(client.rSocket(port = 9000))
4342
}
4443

45-
@OptIn(InternalAPI::class)
4644
suspend fun connectToApiUsingTCP(name: String): Api {
47-
val transport = TcpClientTransport(SelectorManager(), "0.0.0.0", 8000)
45+
val transport = TcpClientTransport("0.0.0.0", 8000, CoroutineExceptionHandler { coroutineContext, throwable ->
46+
println("FAIL: $coroutineContext, $throwable")
47+
})
4848
return Api(connector(name).connect(transport))
4949
}
5050

examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
*/
1616

1717
import io.ktor.application.*
18-
import io.ktor.network.selector.*
1918
import io.ktor.routing.*
2019
import io.ktor.server.cio.*
2120
import io.ktor.server.engine.*
22-
import io.ktor.util.*
2321
import io.ktor.websocket.*
2422
import io.rsocket.kotlin.*
2523
import io.rsocket.kotlin.core.*
@@ -31,7 +29,7 @@ import kotlinx.coroutines.*
3129
import kotlinx.coroutines.flow.*
3230
import kotlinx.serialization.*
3331

34-
@OptIn(ExperimentalSerializationApi::class, ExperimentalMetadataApi::class, InternalAPI::class)
32+
@OptIn(ExperimentalSerializationApi::class, ExperimentalMetadataApi::class, DelicateCoroutinesApi::class)
3533
fun main() {
3634
val proto = ConfiguredProtoBuf
3735
val users = Users()
@@ -97,7 +95,7 @@ fun main() {
9795
}
9896

9997
//start TCP server
100-
rSocketServer.bind(TcpServerTransport(ActorSelectorManager(Dispatchers.IO), port = 9000), acceptor)
98+
rSocketServer.bind(TcpServerTransport(port = 8000), acceptor)
10199

102100
//start WS server
103101
embeddedServer(CIO, port = 9000) {

playground/src/commonMain/kotlin/TCP.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,18 @@
1414
* limitations under the License.
1515
*/
1616

17-
import io.ktor.network.selector.*
1817
import io.rsocket.kotlin.core.*
1918
import io.rsocket.kotlin.payload.*
2019
import io.rsocket.kotlin.transport.ktor.*
21-
import kotlin.coroutines.*
2220

23-
24-
suspend fun runTcpClient(dispatcher: CoroutineContext) {
25-
val transport = TcpClientTransport(SelectorManager(dispatcher), "0.0.0.0", 4444)
21+
suspend fun runTcpClient() {
22+
val transport = TcpClientTransport("0.0.0.0", 4444)
2623
RSocketConnector().connect(transport).doSomething()
2724
}
2825

2926
//to test nodejs tcp server
30-
suspend fun testNodeJsServer(dispatcher: CoroutineContext) {
31-
val transport = TcpClientTransport(SelectorManager(dispatcher), "127.0.0.1", 9000)
27+
suspend fun testNodeJsServer() {
28+
val transport = TcpClientTransport("127.0.0.1", 9000)
3229
val client = RSocketConnector().connect(transport)
3330

3431
val response = client.requestResponse(buildPayload { data("Hello from JVM") })

playground/src/jvmMain/kotlin/TcpClientApp.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
import kotlinx.coroutines.*
17+
suspend fun main(): Unit = runTcpClient()
1818

19-
suspend fun main(): Unit = runTcpClient(Dispatchers.IO)
20-
21-
//suspend fun main(): Unit = testNodeJsServer(Dispatchers.IO)
19+
//suspend fun main(): Unit = testNodeJsServer()

playground/src/jvmMain/kotlin/TcpServerApp.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414
* limitations under the License.
1515
*/
1616

17-
import io.ktor.network.selector.*
1817
import io.rsocket.kotlin.core.*
1918
import io.rsocket.kotlin.transport.ktor.*
2019
import kotlinx.coroutines.*
2120
import kotlin.coroutines.*
2221

2322
suspend fun runTcpServer(dispatcher: CoroutineContext) {
24-
val transport = TcpServerTransport(SelectorManager(dispatcher), "0.0.0.0", 4444)
25-
RSocketServer().bind(transport, rSocketAcceptor).join()
23+
val transport = TcpServerTransport("0.0.0.0", 4444)
24+
RSocketServer().bindIn(CoroutineScope(dispatcher), transport, rSocketAcceptor).handlerJob.join()
2625
}
2726

2827
suspend fun main(): Unit = runTcpServer(Dispatchers.IO)

playground/src/nativeMain/kotlin/TcpApp.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
import io.ktor.util.*
1817
import kotlinx.coroutines.*
19-
import kotlin.coroutines.*
2018

21-
@OptIn(InternalAPI::class)
2219
fun main() {
2320
runBlocking {
24-
runTcpClient(EmptyCoroutineContext)
21+
runTcpClient()
2522
}
2623
}
2724

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,15 @@ import kotlinx.coroutines.*
2727
* That interface isn't stable for inheritance.
2828
*/
2929
@TransportApi
30-
public interface Connection {
31-
public val job: Job
32-
30+
public interface Connection : CoroutineScope {
3331
public val pool: ObjectPool<ChunkBuffer> get() = ChunkBuffer.Pool
3432

3533
public suspend fun send(packet: ByteReadPacket)
3634
public suspend fun receive(): ByteReadPacket
3735
}
3836

3937
@OptIn(TransportApi::class)
40-
internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool)
38+
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T = receive().readFrame(pool).closeOnError(block)
4139

4240
@OptIn(TransportApi::class)
4341
internal suspend fun Connection.sendFrame(frame: Frame) {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import io.rsocket.kotlin.payload.*
2121
import kotlinx.coroutines.*
2222
import kotlinx.coroutines.flow.*
2323

24-
public interface RSocket {
25-
public val job: Job
24+
public interface RSocket : CoroutineScope {
2625

2726
public suspend fun metadataPush(metadata: ByteReadPacket) {
2827
metadata.release()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketRequestHandler.kt

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import io.ktor.utils.io.core.*
2020
import io.rsocket.kotlin.payload.*
2121
import kotlinx.coroutines.*
2222
import kotlinx.coroutines.flow.*
23+
import kotlin.coroutines.*
2324

2425
public class RSocketRequestHandlerBuilder internal constructor() {
2526
private var metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null
@@ -53,19 +54,29 @@ public class RSocketRequestHandlerBuilder internal constructor() {
5354
requestChannel = block
5455
}
5556

56-
internal fun build(job: Job): RSocket =
57-
RSocketRequestHandler(job, metadataPush, fireAndForget, requestResponse, requestStream, requestChannel)
57+
internal fun build(parentContext: CoroutineContext): RSocket =
58+
RSocketRequestHandler(
59+
parentContext + Job(parentContext[Job]),
60+
metadataPush,
61+
fireAndForget,
62+
requestResponse,
63+
requestStream,
64+
requestChannel
65+
)
5866
}
5967

6068
@Suppress("FunctionName")
61-
public fun RSocketRequestHandler(parentJob: Job? = null, configure: RSocketRequestHandlerBuilder.() -> Unit): RSocket {
69+
public fun RSocketRequestHandler(
70+
parentContext: CoroutineContext = EmptyCoroutineContext,
71+
configure: RSocketRequestHandlerBuilder.() -> Unit
72+
): RSocket {
6273
val builder = RSocketRequestHandlerBuilder()
6374
builder.configure()
64-
return builder.build(Job(parentJob))
75+
return builder.build(parentContext)
6576
}
6677

6778
private class RSocketRequestHandler(
68-
override val job: Job,
79+
override val coroutineContext: CoroutineContext,
6980
private val metadataPush: (suspend RSocket.(metadata: ByteReadPacket) -> Unit)? = null,
7081
private val fireAndForget: (suspend RSocket.(payload: Payload) -> Unit)? = null,
7182
private val requestResponse: (suspend RSocket.(payload: Payload) -> Payload)? = null,

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ public class RSocketConnector internal constructor(
3535
) {
3636

3737
public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
38-
null -> connectOnce(transport)
39-
else -> ReconnectableRSocket(
40-
logger = loggerFactory.logger("io.rsocket.kotlin.connection"),
41-
connect = { connectOnce(transport) },
42-
predicate = reconnectPredicate
38+
//TODO current coroutineContext job is overriden by transport coroutineContext jov
39+
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
40+
else -> connectWithReconnect(
41+
transport.coroutineContext,
42+
loggerFactory.logger("io.rsocket.kotlin.connection"),
43+
{ connectOnce(transport) },
44+
reconnectPredicate,
4345
)
4446
}
4547

@@ -48,7 +50,7 @@ public class RSocketConnector internal constructor(
4850
val connectionConfig = try {
4951
connectionConfigProvider()
5052
} catch (cause: Throwable) {
51-
connection.job.cancel("Connection config provider failed", cause)
53+
connection.cancel("Connection config provider failed", cause)
5254
throw cause
5355
}
5456
val setupFrame = SetupFrame(
@@ -60,7 +62,8 @@ public class RSocketConnector internal constructor(
6062
payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor
6163
)
6264
try {
63-
val requester = connection.connect(
65+
val requester = connect(
66+
connection = connection,
6467
isServer = false,
6568
maxFragmentSize = maxFragmentSize,
6669
interceptors = interceptors,
@@ -72,7 +75,7 @@ public class RSocketConnector internal constructor(
7275
} catch (cause: Throwable) {
7376
connectionConfig.setupPayload.release()
7477
setupFrame.release()
75-
connection.job.cancel("Connection establishment failed", cause)
78+
connection.cancel("Connection establishment failed", cause)
7679
throw cause
7780
}
7881
}

0 commit comments

Comments
 (0)