From e6e92932a9811ba26e07d6d45d3399ca0a36cc08 Mon Sep 17 00:00:00 2001 From: olme04 Date: Thu, 15 Jul 2021 17:42:10 +0300 Subject: [PATCH] allow to configure connection buffer for sending frames make it default to 64 for now (instead of current unlimited) --- .../io/rsocket/kotlin/core/RSocketConnector.kt | 2 ++ .../kotlin/core/RSocketConnectorBuilder.kt | 18 ++++++++++++------ .../io/rsocket/kotlin/core/RSocketServer.kt | 2 ++ .../kotlin/core/RSocketServerBuilder.kt | 12 +++++++++++- .../io/rsocket/kotlin/internal/Connect.kt | 3 ++- .../io/rsocket/kotlin/internal/Prioritizer.kt | 6 +++--- .../rsocket/kotlin/internal/FrameSenderTest.kt | 3 ++- .../rsocket/kotlin/internal/PrioritizerTest.kt | 3 ++- .../kotlin/internal/RSocketRequesterTest.kt | 1 + .../rsocket/kotlin/keepalive/KeepAliveTest.kt | 1 + 10 files changed, 38 insertions(+), 13 deletions(-) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index cdc62cf08..d74c8a2dd 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.* @OptIn(TransportApi::class, RSocketLoggingApi::class) public class RSocketConnector internal constructor( private val loggerFactory: LoggerFactory, + private val connectionBufferCapacity: Int, private val maxFragmentSize: Int, private val interceptors: Interceptors, private val connectionConfigProvider: () -> ConnectionConfig, @@ -64,6 +65,7 @@ public class RSocketConnector internal constructor( try { val requester = connect( connection = connection, + connectionBufferCapacity = connectionBufferCapacity, isServer = false, maxFragmentSize = maxFragmentSize, interceptors = interceptors, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt index 38a56c45e..0f8769447 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt @@ -34,6 +34,11 @@ public class RSocketConnectorBuilder internal constructor() { } field = value } + public var connectionBufferCapacity: Int = 64 + set(value) { + require(value >= 0) { "connectionBufferCapacity should be positive or equal to Int.MAX_VALUE" } + field = value + } private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder() private val interceptors: InterceptorsBuilder = InterceptorsBuilder() @@ -103,12 +108,13 @@ public class RSocketConnectorBuilder internal constructor() { @OptIn(RSocketLoggingApi::class) internal fun build(): RSocketConnector = RSocketConnector( - loggerFactory, - maxFragmentSize, - interceptors.build(), - connectionConfig.producer(), - acceptor ?: defaultAcceptor, - reconnectPredicate + loggerFactory = loggerFactory, + connectionBufferCapacity = connectionBufferCapacity, + maxFragmentSize = maxFragmentSize, + interceptors = interceptors.build(), + connectionConfigProvider = connectionConfig.producer(), + acceptor = acceptor ?: defaultAcceptor, + reconnectPredicate = reconnectPredicate ) private companion object { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index f7dc1638b..181e40ce5 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.* @OptIn(TransportApi::class, RSocketLoggingApi::class) public class RSocketServer internal constructor( private val loggerFactory: LoggerFactory, + private val connectionBufferCapacity: Int, private val maxFragmentSize: Int, private val interceptors: Interceptors, ) { @@ -58,6 +59,7 @@ public class RSocketServer internal constructor( connection = this, isServer = true, maxFragmentSize = maxFragmentSize, + connectionBufferCapacity = connectionBufferCapacity, interceptors = interceptors, connectionConfig = ConnectionConfig( keepAlive = setupFrame.keepAlive, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt index 519a8ad9f..917b79b14 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt @@ -29,6 +29,11 @@ public class RSocketServerBuilder internal constructor() { } field = value } + public var connectionBufferCapacity: Int = 64 + set(value) { + require(value >= 0) { "connectionBufferCapacity should be positive or equal to Int.MAX_VALUE" } + field = value + } private val interceptors: InterceptorsBuilder = InterceptorsBuilder() @@ -37,7 +42,12 @@ public class RSocketServerBuilder internal constructor() { } @OptIn(RSocketLoggingApi::class) - internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build()) + internal fun build(): RSocketServer = RSocketServer( + loggerFactory = loggerFactory, + connectionBufferCapacity = connectionBufferCapacity, + maxFragmentSize = maxFragmentSize, + interceptors = interceptors.build() + ) } public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 88bee3c4b..0ddf13a0b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -25,12 +25,13 @@ import kotlinx.coroutines.* internal suspend inline fun connect( connection: Connection, isServer: Boolean, + connectionBufferCapacity: Int, maxFragmentSize: Int, interceptors: Interceptors, connectionConfig: ConnectionConfig, acceptor: ConnectionAcceptor ): RSocket { - val prioritizer = Prioritizer() + val prioritizer = Prioritizer(connectionBufferCapacity) val frameSender = FrameSender(prioritizer, connection.pool, maxFragmentSize) val streamsStorage = StreamsStorage(isServer, connection.pool) val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, frameSender) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt index 63fbd64eb..c08adc955 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt @@ -25,9 +25,9 @@ import kotlin.native.concurrent.* @SharedImmutable private val selectFrame: suspend (Frame) -> Frame = { it } -internal class Prioritizer { - private val priorityChannel = SafeChannel(Channel.UNLIMITED) - private val commonChannel = SafeChannel(Channel.UNLIMITED) +internal class Prioritizer(capacity: Int) { + private val priorityChannel = SafeChannel(capacity) + private val commonChannel = SafeChannel(capacity) suspend fun send(frame: Frame) { currentCoroutineContext().ensureActive() diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt index d9e1eba83..605fbdac9 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt @@ -3,11 +3,12 @@ package io.rsocket.kotlin.internal import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* +import kotlinx.coroutines.channels.* import kotlin.test.* class FrameSenderTest : SuspendTest, TestWithLeakCheck { - private val prioritizer = Prioritizer() + private val prioritizer = Prioritizer(Channel.UNLIMITED) private fun sender(maxFragmentSize: Int) = FrameSender(prioritizer, InUseTrackingPool, maxFragmentSize) @Test diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt index aa1b1a95d..5f5d7bba1 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt @@ -20,10 +20,11 @@ import io.ktor.utils.io.core.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.test.* import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlin.test.* class PrioritizerTest : SuspendTest, TestWithLeakCheck { - private val prioritizer = Prioritizer() + private val prioritizer = Prioritizer(Channel.UNLIMITED) @Test fun testOrdering() = test { diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 7a24b7b06..ad9df7b85 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -37,6 +37,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { requester = connect( connection = connection, isServer = false, + connectionBufferCapacity = 64, maxFragmentSize = 0, interceptors = InterceptorsBuilder().build(), connectionConfig = ConnectionConfig( diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index f918627c3..dfa2a9844 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -34,6 +34,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { ): RSocket = connect( connection = connection, isServer = false, + connectionBufferCapacity = 64, maxFragmentSize = 0, interceptors = InterceptorsBuilder().build(), connectionConfig = ConnectionConfig(keepAlive, DefaultPayloadMimeType, Payload.Empty)