Skip to content

Commit a1613d1

Browse files
committed
Merge branch '6.1.x'
2 parents 17f319b + 2ab0101 commit a1613d1

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

spring-webflux/src/main/kotlin/org/springframework/web/reactive/function/client/WebClientExtensions.kt

+10-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.web.reactive.function.client
1818

19+
import kotlinx.coroutines.Job
20+
import kotlinx.coroutines.currentCoroutineContext
1921
import kotlinx.coroutines.Dispatchers
2022
import kotlinx.coroutines.flow.Flow
2123
import kotlinx.coroutines.reactive.asFlow
@@ -99,16 +101,20 @@ suspend fun RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(): Clien
99101
* @author Sebastien Deleuze
100102
* @since 5.3
101103
*/
102-
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T =
103-
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitSingle()
104+
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchange(responseHandler: suspend (ClientResponse) -> T): T {
105+
val context = currentCoroutineContext().minusKey(Job.Key)
106+
return exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingle()
107+
}
104108

105109
/**
106110
* Variant of [WebClient.RequestHeadersSpec.awaitExchange] that allows a nullable return
107111
*
108112
* @since 5.3.8
109113
*/
110-
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchangeOrNull(responseHandler: suspend (ClientResponse) -> T?): T? =
111-
exchangeToMono { mono(Dispatchers.Unconfined) { responseHandler.invoke(it) } }.awaitSingleOrNull()
114+
suspend fun <T: Any> RequestHeadersSpec<out RequestHeadersSpec<*>>.awaitExchangeOrNull(responseHandler: suspend (ClientResponse) -> T?): T? {
115+
val context = currentCoroutineContext().minusKey(Job.Key)
116+
return exchangeToMono { mono(context) { responseHandler.invoke(it) } }.awaitSingleOrNull()
117+
}
112118

113119
/**
114120
* Coroutines variant of [WebClient.RequestHeadersSpec.exchangeToFlux].

spring-webflux/src/test/kotlin/org/springframework/web/reactive/function/client/WebClientExtensionsTests.kt

+32
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package org.springframework.web.reactive.function.client
1818

1919
import io.mockk.every
2020
import io.mockk.mockk
21+
import io.mockk.slot
2122
import io.mockk.verify
23+
import kotlinx.coroutines.currentCoroutineContext
2224
import kotlinx.coroutines.flow.Flow
2325
import kotlinx.coroutines.flow.flow
2426
import kotlinx.coroutines.flow.toList
@@ -32,6 +34,8 @@ import reactor.core.publisher.Flux
3234
import reactor.core.publisher.Mono
3335
import java.util.concurrent.CompletableFuture
3436
import java.util.function.Function
37+
import kotlin.coroutines.AbstractCoroutineContextElement
38+
import kotlin.coroutines.CoroutineContext
3539

3640
/**
3741
* Mock object based tests for [WebClient] Kotlin extensions
@@ -110,6 +114,18 @@ class WebClientExtensionsTests {
110114
}
111115
}
112116

117+
@Test
118+
fun `awaitExchange with coroutines context`() {
119+
val foo = mockk<Foo>()
120+
val slot = slot<Function<ClientResponse, Mono<Foo>>>()
121+
every { requestBodySpec.exchangeToMono(capture(slot)) } answers {
122+
slot.captured.apply(mockk<ClientResponse>())
123+
}
124+
runBlocking(FooContextElement(foo)) {
125+
assertThat(requestBodySpec.awaitExchange { currentCoroutineContext()[FooContextElement]!!.foo }).isEqualTo(foo)
126+
}
127+
}
128+
113129
@Test
114130
fun `awaitExchangeOrNull returning null`() {
115131
val foo = mockk<Foo>()
@@ -128,6 +144,18 @@ class WebClientExtensionsTests {
128144
}
129145
}
130146

147+
@Test
148+
fun `awaitExchangeOrNull with coroutines context`() {
149+
val foo = mockk<Foo>()
150+
val slot = slot<Function<ClientResponse, Mono<Foo>>>()
151+
every { requestBodySpec.exchangeToMono(capture(slot)) } answers {
152+
slot.captured.apply(mockk<ClientResponse>())
153+
}
154+
runBlocking(FooContextElement(foo)) {
155+
assertThat(requestBodySpec.awaitExchangeOrNull { currentCoroutineContext()[FooContextElement]!!.foo }).isEqualTo(foo)
156+
}
157+
}
158+
131159
@Test
132160
fun exchangeToFlow() {
133161
val foo = mockk<Foo>()
@@ -209,4 +237,8 @@ class WebClientExtensionsTests {
209237
}
210238

211239
class Foo
240+
241+
private data class FooContextElement(val foo: Foo) : AbstractCoroutineContextElement(FooContextElement) {
242+
companion object Key : CoroutineContext.Key<FooContextElement>
243+
}
212244
}

0 commit comments

Comments
 (0)