Skip to content

Commit 5526b27

Browse files
author
olme04
authored
Rework internals - preparation for fragmentation (#176)
* improve setup frame handling * simplify requestFlow * rewrite internals to store all data about stream in one place - preparation for fragmentation Co-authored-by: olme04 <olme04>
1 parent 4185675 commit 5526b27

34 files changed

+1173
-572
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,5 @@ internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool
4141

4242
@OptIn(TransportApi::class)
4343
internal suspend fun Connection.sendFrame(frame: Frame) {
44-
val packet = frame.toPacket(pool)
45-
packet.closeOnError { send(packet) }
44+
frame.toPacket(pool).closeOnError { send(it) }
4645
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import io.rsocket.kotlin.frame.io.*
2222
import io.rsocket.kotlin.internal.*
2323
import io.rsocket.kotlin.logging.*
2424
import io.rsocket.kotlin.transport.*
25+
import kotlinx.coroutines.*
2526

2627
@OptIn(TransportApi::class, RSocketLoggingApi::class)
2728
public class RSocketConnector internal constructor(
@@ -43,18 +44,34 @@ public class RSocketConnector internal constructor(
4344

4445
private suspend fun connectOnce(transport: ClientTransport): RSocket {
4546
val connection = transport.connect().wrapConnection()
46-
val connectionConfig = connectionConfigProvider()
47-
48-
return connection.connect(isServer = false, interceptors, connectionConfig, acceptor) {
49-
val setupFrame = SetupFrame(
50-
version = Version.Current,
51-
honorLease = false,
52-
keepAlive = connectionConfig.keepAlive,
53-
resumeToken = null,
54-
payloadMimeType = connectionConfig.payloadMimeType,
55-
payload = connectionConfig.setupPayload
47+
val connectionConfig = try {
48+
connectionConfigProvider()
49+
} catch (cause: Throwable) {
50+
connection.job.cancel("Connection config provider failed", cause)
51+
throw cause
52+
}
53+
val setupFrame = SetupFrame(
54+
version = Version.Current,
55+
honorLease = false,
56+
keepAlive = connectionConfig.keepAlive,
57+
resumeToken = null,
58+
payloadMimeType = connectionConfig.payloadMimeType,
59+
payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor
60+
)
61+
try {
62+
val requester = connection.connect(
63+
isServer = false,
64+
interceptors = interceptors,
65+
connectionConfig = connectionConfig,
66+
acceptor = acceptor
5667
)
5768
connection.sendFrame(setupFrame)
69+
return requester
70+
} catch (cause: Throwable) {
71+
connectionConfig.setupPayload.release()
72+
setupFrame.release()
73+
connection.job.cancel("Connection establishment failed", cause)
74+
throw cause
5875
}
5976
}
6077

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ public class RSocketConnectorBuilder internal constructor() {
103103
)
104104

105105
private companion object {
106-
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { EmptyRSocket() }
106+
private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor {
107+
config.setupPayload.release()
108+
EmptyRSocket()
109+
}
107110

108111
private class EmptyRSocket : RSocket {
109112
override val job: Job = Job()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -33,44 +33,40 @@ public class RSocketServer internal constructor(
3333
public fun <T> bind(
3434
transport: ServerTransport<T>,
3535
acceptor: ConnectionAcceptor,
36-
): T = transport.start {
37-
val connection = it.wrapConnection()
38-
val setupFrame = connection.validateSetup()
39-
connection.start(setupFrame, acceptor)
40-
connection.job.join()
41-
}
42-
43-
private suspend fun Connection.start(setupFrame: SetupFrame, acceptor: ConnectionAcceptor) {
44-
val connectionConfig = ConnectionConfig(
45-
keepAlive = setupFrame.keepAlive,
46-
payloadMimeType = setupFrame.payloadMimeType,
47-
setupPayload = setupFrame.payload
48-
)
49-
try {
50-
connect(isServer = true, interceptors, connectionConfig, acceptor)
51-
} catch (e: Throwable) {
52-
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
53-
}
54-
}
36+
): T = transport.start { it.wrapConnection().bind(acceptor).join() }
5537

56-
private suspend fun Connection.validateSetup(): SetupFrame {
57-
val setupFrame = receiveFrame()
58-
return when {
38+
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame().closeOnError { setupFrame ->
39+
when {
5940
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
6041
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
6142
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
6243
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
63-
else -> setupFrame
44+
else -> try {
45+
connect(
46+
isServer = true,
47+
interceptors = interceptors,
48+
connectionConfig = ConnectionConfig(
49+
keepAlive = setupFrame.keepAlive,
50+
payloadMimeType = setupFrame.payloadMimeType,
51+
setupPayload = setupFrame.payload
52+
),
53+
acceptor = acceptor
54+
)
55+
job
56+
} catch (e: Throwable) {
57+
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
58+
}
6459
}
6560
}
6661

67-
private fun Connection.wrapConnection(): Connection =
68-
interceptors.wrapConnection(this)
69-
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
70-
7162
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
7263
sendFrame(ErrorFrame(0, error))
7364
job.cancel("Connection establishment failed", error)
7465
throw error
7566
}
67+
68+
private fun Connection.wrapConnection(): Connection =
69+
interceptors.wrapConnection(this)
70+
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
71+
7672
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/CloseOperations.kt

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,29 @@
1717
package io.rsocket.kotlin.internal
1818

1919
import io.ktor.utils.io.core.*
20-
import kotlinx.coroutines.*
2120
import kotlinx.coroutines.channels.*
2221
import kotlin.native.concurrent.*
2322

24-
internal inline fun <T> Closeable.closeOnError(block: () -> T): T {
23+
internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
2524
try {
26-
return block()
25+
return block(this)
2726
} catch (e: Throwable) {
2827
close()
2928
throw e
3029
}
3130
}
3231

33-
internal fun ReceiveChannel<*>.cancelConsumed(cause: Throwable?) {
34-
cancel(cause?.let { it as? CancellationException ?: CancellationException("Channel was consumed, consumer had failed", it) })
35-
}
36-
3732
@SharedImmutable
3833
private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close
3934

4035
@Suppress("FunctionName")
4136
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)
4237

43-
internal fun <E : Closeable> SendChannel<E>.safeOffer(element: E) {
44-
trySend(element)
45-
.onFailure { element.close() }
46-
.getOrThrow() //TODO
38+
internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
39+
trySend(element).onFailure { element.close() }
40+
}
41+
42+
internal fun Channel<out Closeable>.fullClose(cause: Throwable?) {
43+
close(cause) // close channel to provide right cause
44+
cancel() // force call of onUndeliveredElement to release buffered elements
4745
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,80 @@
1616

1717
package io.rsocket.kotlin.internal
1818

19+
import io.ktor.utils.io.core.*
1920
import io.rsocket.kotlin.*
2021
import io.rsocket.kotlin.core.*
22+
import io.rsocket.kotlin.frame.*
23+
import kotlinx.coroutines.*
2124

2225
@OptIn(TransportApi::class)
2326
internal suspend inline fun Connection.connect(
2427
isServer: Boolean,
2528
interceptors: Interceptors,
2629
connectionConfig: ConnectionConfig,
27-
acceptor: ConnectionAcceptor,
28-
beforeStart: () -> Unit = {},
30+
acceptor: ConnectionAcceptor
2931
): RSocket {
30-
val state = RSocketState(this, connectionConfig.keepAlive)
31-
val requester = RSocketRequester(state, StreamId(isServer)).let(interceptors::wrapRequester)
32-
val connectionContext = ConnectionAcceptorContext(connectionConfig, requester)
33-
val requestHandler = with(interceptors.wrapAcceptor(acceptor)) { connectionContext.accept() }.let(interceptors::wrapResponder)
34-
beforeStart()
35-
state.start(requestHandler)
32+
val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive)
33+
val prioritizer = Prioritizer()
34+
val streamsStorage = StreamsStorage(isServer)
35+
val requestJob = SupervisorJob(job)
36+
37+
requestJob.invokeOnCompletion {
38+
prioritizer.close(it)
39+
streamsStorage.cleanup(it)
40+
connectionConfig.setupPayload.release()
41+
}
42+
43+
val requestScope = CoroutineScope(requestJob + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> })
44+
val connectionScope = CoroutineScope(job + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> })
45+
46+
val requester = interceptors.wrapRequester(RSocketRequester(job, prioritizer, streamsStorage, requestScope))
47+
val requestHandler = interceptors.wrapResponder(
48+
with(interceptors.wrapAcceptor(acceptor)) {
49+
ConnectionAcceptorContext(connectionConfig, requester).accept()
50+
}
51+
)
52+
53+
// link completing of connection and requestHandler
54+
job.invokeOnCompletion { requestHandler.job.cancel("Connection closed", it) }
55+
requestHandler.job.invokeOnCompletion { if (it != null) job.cancel("Request handler failed", it) }
56+
57+
// start keepalive ticks
58+
connectionScope.launch {
59+
while (isActive) {
60+
keepAliveHandler.tick()
61+
prioritizer.send(KeepAliveFrame(true, 0, ByteReadPacket.Empty))
62+
}
63+
}
64+
65+
// start sending frames to connection
66+
connectionScope.launch {
67+
while (isActive) {
68+
sendFrame(prioritizer.receive())
69+
}
70+
}
71+
72+
// start frame handling
73+
connectionScope.launch {
74+
val rSocketResponder = RSocketResponder(prioritizer, requestHandler, requestScope)
75+
while (isActive) {
76+
receiveFrame().closeOnError { frame ->
77+
when (frame.streamId) {
78+
0 -> when (frame) {
79+
is MetadataPushFrame -> rSocketResponder.handleMetadataPush(frame.metadata)
80+
is ErrorFrame -> job.cancel("Error frame received on 0 stream", frame.throwable)
81+
is KeepAliveFrame -> {
82+
keepAliveHandler.mark()
83+
if (frame.respond) prioritizer.send(KeepAliveFrame(false, 0, frame.data)) else Unit
84+
}
85+
is LeaseFrame -> frame.release().also { error("lease isn't implemented") }
86+
else -> frame.release()
87+
}
88+
else -> streamsStorage.handleFrame(frame, rSocketResponder)
89+
}
90+
}
91+
}
92+
}
93+
3694
return requester
3795
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,23 @@
1616

1717
package io.rsocket.kotlin.internal
1818

19-
import io.ktor.utils.io.core.*
2019
import io.rsocket.kotlin.*
21-
import io.rsocket.kotlin.frame.*
2220
import io.rsocket.kotlin.keepalive.*
2321
import kotlinx.atomicfu.*
2422
import kotlinx.coroutines.*
2523

26-
internal class KeepAliveHandler(
27-
private val keepAlive: KeepAlive,
28-
private val offerFrame: (frame: Frame) -> Unit,
29-
) {
24+
internal class KeepAliveHandler(private val keepAlive: KeepAlive) {
25+
private val lastMark = atomic(currentMillis()) // mark initial timestamp for keepalive
3026

31-
private val lastMark = atomic(currentMillis())
32-
33-
fun receive(frame: KeepAliveFrame) {
27+
fun mark() {
3428
lastMark.value = currentMillis()
35-
if (frame.respond) {
36-
offerFrame(KeepAliveFrame(false, 0, frame.data))
37-
}
3829
}
3930

40-
fun startIn(scope: CoroutineScope) {
41-
scope.launch {
42-
while (isActive) {
43-
delay(keepAlive.intervalMillis.toLong())
44-
if (currentMillis() - lastMark.value >= keepAlive.maxLifetimeMillis) {
45-
//for K/N
46-
scope.cancel("Keep alive failed", RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms"))
47-
break
48-
}
49-
offerFrame(KeepAliveFrame(true, 0, ByteReadPacket.Empty))
50-
}
51-
}
31+
// return boolean because of native
32+
suspend fun tick() {
33+
delay(keepAlive.intervalMillis.toLong())
34+
if (currentMillis() - lastMark.value < keepAlive.maxLifetimeMillis) return
35+
throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms")
5236
}
5337
}
5438

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,25 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.kotlin.internal.flow
17+
package io.rsocket.kotlin.internal
1818

19-
import io.rsocket.kotlin.frame.*
20-
import io.rsocket.kotlin.internal.*
2119
import io.rsocket.kotlin.payload.*
2220
import kotlinx.atomicfu.*
2321
import kotlinx.coroutines.*
2422
import kotlinx.coroutines.flow.*
2523
import kotlin.coroutines.*
2624

27-
internal class LimitingFlowCollector(
28-
private val state: RSocketState,
29-
private val streamId: Int,
30-
initial: Int,
31-
) : FlowCollector<Payload> {
25+
internal suspend inline fun Flow<Payload>.collectLimiting(limiter: Limiter, crossinline action: suspend (value: Payload) -> Unit) {
26+
collect { payload ->
27+
payload.closeOnError {
28+
limiter.useRequest()
29+
action(it)
30+
}
31+
}
32+
}
33+
34+
//TODO revisit 2 atomics
35+
internal class Limiter(initial: Int) {
3236
private val requests = atomic(initial)
3337
private val awaiter = atomic<CancellableContinuation<Unit>?>(null)
3438

@@ -38,12 +42,7 @@ internal class LimitingFlowCollector(
3842
awaiter.getAndSet(null)?.takeIf(CancellableContinuation<Unit>::isActive)?.resume(Unit)
3943
}
4044

41-
override suspend fun emit(value: Payload): Unit = value.closeOnError {
42-
useRequest()
43-
state.send(NextPayloadFrame(streamId, value))
44-
}
45-
46-
private suspend fun useRequest() {
45+
suspend fun useRequest() {
4746
if (requests.getAndDecrement() > 0) {
4847
currentCoroutineContext().ensureActive()
4948
} else {

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ private class LoggingConnection(
3737
}
3838

3939
override suspend fun send(packet: ByteReadPacket) {
40-
logger.debug { "Send: ${packet.dumpFrameToString()}" }
40+
logger.debug { "Send: ${packet.dumpFrameToString()}" }
4141
delegate.send(packet)
4242
}
4343

0 commit comments

Comments
 (0)