@@ -23,18 +23,15 @@ import kotlinx.coroutines.FlowPreview
23
23
import kotlinx.coroutines.GlobalScope
24
24
import kotlinx.coroutines.async
25
25
import kotlinx.coroutines.flow.Flow
26
- import kotlinx.coroutines.flow.collect
27
- import kotlinx.coroutines.flow.flow
28
26
import kotlinx.coroutines.reactive.awaitFirstOrNull
27
+ import kotlinx.coroutines.reactive.flow.asPublisher
29
28
30
29
import kotlinx.coroutines.reactor.mono
31
30
import reactor.core.publisher.Mono
32
31
import reactor.core.publisher.onErrorMap
33
32
import java.lang.reflect.InvocationTargetException
34
33
import java.lang.reflect.Method
35
34
import kotlin.reflect.full.callSuspend
36
- import kotlin.reflect.full.isSubtypeOf
37
- import kotlin.reflect.full.starProjectedType
38
35
import kotlin.reflect.jvm.kotlinFunction
39
36
40
37
/* *
@@ -56,7 +53,8 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) =
56
53
GlobalScope .async(Dispatchers .Unconfined ) { source.awaitFirstOrNull() }
57
54
58
55
/* *
59
- * Invoke an handler method converting suspending method to [Mono] or [Flow] if necessary.
56
+ * Invoke an handler method converting suspending method to [Mono] or
57
+ * [reactor.core.publisher.Flux] if necessary.
60
58
*
61
59
* @author Sebastien Deleuze
62
60
* @since 5.2
@@ -66,18 +64,15 @@ internal fun <T: Any> monoToDeferred(source: Mono<T>) =
66
64
internal fun invokeHandlerMethod (method : Method , bean : Any , vararg args : Any? ): Any? {
67
65
val function = method.kotlinFunction!!
68
66
return if (function.isSuspend) {
69
- if (function.returnType.isSubtypeOf( Flow :: class .starProjectedType) ) {
70
- flow {
71
- (function.callSuspend(bean, * args.sliceArray( 0 .. (args.size - 2 ))) as Flow < * >).collect {
72
- emit(it)
73
- }
74
- }
67
+ val mono = GlobalScope .mono( Dispatchers . Unconfined ) {
68
+ function.callSuspend(bean, * args.sliceArray( 0 .. (args.size - 2 )))
69
+ . let { if (it == Unit ) null else it }
70
+ }.onErrorMap( InvocationTargetException :: class ) { it.targetException }
71
+ if (function.returnType.classifier == Flow :: class ) {
72
+ mono.flatMapMany { (it as Flow < Any >).asPublisher() }
75
73
}
76
74
else {
77
- GlobalScope .mono(Dispatchers .Unconfined ) {
78
- function.callSuspend(bean, * args.sliceArray(0 .. (args.size- 2 )))
79
- .let { if (it == Unit ) null else it}
80
- }.onErrorMap(InvocationTargetException ::class ) { it.targetException }
75
+ mono
81
76
}
82
77
}
83
78
else {
0 commit comments