Skip to content

Commit 842e7e5

Browse files
committed
Add RSocketRequester coroutines extensions
See gh-22780
1 parent 089fb57 commit 842e7e5

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

spring-messaging/spring-messaging.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ dependencies {
1919
optional("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
2020
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
2121
optional("javax.xml.bind:jaxb-api:2.3.1")
22+
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
23+
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
2224
testCompile("javax.inject:javax.inject-tck:1")
2325
testCompile("javax.servlet:javax.servlet-api:4.0.1")
2426
testCompile("javax.validation:validation-api:1.1.0.Final")
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.rsocket
18+
19+
import io.rsocket.transport.ClientTransport
20+
import kotlinx.coroutines.FlowPreview
21+
import kotlinx.coroutines.flow.Flow
22+
import kotlinx.coroutines.reactive.awaitFirstOrNull
23+
import kotlinx.coroutines.reactive.awaitSingle
24+
import kotlinx.coroutines.reactive.flow.asFlow
25+
import kotlinx.coroutines.reactive.flow.asPublisher
26+
import org.springframework.core.ParameterizedTypeReference
27+
import java.net.URI
28+
29+
/**
30+
* Coroutines variant of [RSocketRequester.Builder.connect].
31+
*
32+
* @author Sebastien Deleuze
33+
* @since 5.2
34+
*/
35+
suspend fun RSocketRequester.Builder.connectAndAwait(transport: ClientTransport): RSocketRequester =
36+
connect(transport).awaitSingle()
37+
38+
/**
39+
* Coroutines variant of [RSocketRequester.Builder.connectTcp].
40+
*
41+
* @author Sebastien Deleuze
42+
* @since 5.2
43+
*/
44+
suspend fun RSocketRequester.Builder.connectTcpAndAwait(host: String, port: Int): RSocketRequester =
45+
connectTcp(host, port).awaitSingle()
46+
47+
/**
48+
* Coroutines variant of [RSocketRequester.Builder.connectWebSocket].
49+
*
50+
* @author Sebastien Deleuze
51+
* @since 5.2
52+
*/
53+
suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocketRequester =
54+
connectWebSocket(uri).awaitSingle()
55+
56+
57+
/**
58+
* Kotlin [Flow] variant of [RSocketRequester.RequestSpec.data].
59+
*
60+
* @author Sebastien Deleuze
61+
* @since 5.2
62+
*/
63+
@FlowPreview
64+
fun <T : Any> RSocketRequester.RequestSpec.dataFlow(data: Flow<T>): RSocketRequester.ResponseSpec =
65+
data(data.asPublisher(), object : ParameterizedTypeReference<T>() {})
66+
67+
/**
68+
* Coroutines variant of [RSocketRequester.ResponseSpec.send].
69+
*
70+
* @author Sebastien Deleuze
71+
* @since 5.2
72+
*/
73+
suspend fun RSocketRequester.ResponseSpec.sendAndAwait() {
74+
send().awaitFirstOrNull()
75+
}
76+
77+
/**
78+
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveMono].
79+
*
80+
* @author Sebastien Deleuze
81+
* @since 5.2
82+
*/
83+
suspend fun <T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
84+
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingle()
85+
86+
/**
87+
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveFlux].
88+
*
89+
* @author Sebastien Deleuze
90+
* @since 5.2
91+
*/
92+
@FlowPreview
93+
fun <T : Any> RSocketRequester.ResponseSpec.retrieveFlow(batchSize: Int = 1): Flow<T> =
94+
retrieveFlux(object : ParameterizedTypeReference<T>() {}).asFlow(batchSize)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package org.springframework.messaging.rsocket
2+
3+
import io.mockk.every
4+
import io.mockk.mockk
5+
import kotlinx.coroutines.FlowPreview
6+
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.toList
8+
import kotlinx.coroutines.runBlocking
9+
import org.junit.Assert.assertEquals
10+
import org.junit.Test
11+
import org.mockito.ArgumentMatchers.anyInt
12+
import org.reactivestreams.Publisher
13+
import org.springframework.core.ParameterizedTypeReference
14+
import reactor.core.publisher.Flux
15+
import reactor.core.publisher.Mono
16+
17+
/**
18+
* Mock object based tests for [RSocketRequester] Kotlin extensions
19+
*
20+
* @author Sebastien Deleuze
21+
*/
22+
@FlowPreview
23+
class RSocketRequesterExtensionsTests {
24+
25+
@Test
26+
fun connectAndAwait() {
27+
val requester = mockk<RSocketRequester>()
28+
val builder = mockk<RSocketRequester.Builder>()
29+
every { builder.connect(any()) } returns Mono.just(requester)
30+
runBlocking {
31+
assertEquals(requester, builder.connectAndAwait(mockk()))
32+
}
33+
}
34+
35+
@Test
36+
fun connectTcpAndAwait() {
37+
val host = "127.0.0.1"
38+
val requester = mockk<RSocketRequester>()
39+
val builder = mockk<RSocketRequester.Builder>()
40+
every { builder.connectTcp(host, anyInt()) } returns Mono.just(requester)
41+
runBlocking {
42+
assertEquals(requester, builder.connectTcpAndAwait(host, 0))
43+
}
44+
}
45+
46+
@Test
47+
fun connectWebSocketAndAwait() {
48+
val requester = mockk<RSocketRequester>()
49+
val builder = mockk<RSocketRequester.Builder>()
50+
every { builder.connectWebSocket(any()) } returns Mono.just(requester)
51+
runBlocking {
52+
assertEquals(requester, builder.connectWebSocketAndAwait(mockk()))
53+
}
54+
}
55+
56+
@Test
57+
fun dataFlow() {
58+
val requestSpec = mockk<RSocketRequester.RequestSpec>()
59+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
60+
every { requestSpec.data(any<Publisher<String>>(), any<ParameterizedTypeReference<String>>()) } returns responseSpec
61+
assertEquals(responseSpec, requestSpec.dataFlow(mockk<Flow<String>>()))
62+
}
63+
64+
@Test
65+
fun sendAndAwait() {
66+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
67+
every { responseSpec.send() } returns Mono.empty()
68+
runBlocking {
69+
responseSpec.sendAndAwait()
70+
}
71+
}
72+
73+
@Test
74+
fun retrieveAndAwait() {
75+
val response = "foo"
76+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
77+
every { responseSpec.retrieveMono(any<ParameterizedTypeReference<String>>()) } returns Mono.just("foo")
78+
runBlocking {
79+
assertEquals(response, responseSpec.retrieveAndAwait())
80+
}
81+
}
82+
83+
@Test
84+
fun retrieveFlow() {
85+
val responseSpec = mockk<RSocketRequester.ResponseSpec>()
86+
every { responseSpec.retrieveFlux(any<ParameterizedTypeReference<String>>()) } returns Flux.just("foo", "bar")
87+
runBlocking {
88+
assertEquals(listOf("foo", "bar"), responseSpec.retrieveFlow<String>().toList())
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)