|
22 | 22 |
|
23 | 23 | import kotlin.Unit;
|
24 | 24 | import kotlin.jvm.JvmClassMappingKt;
|
| 25 | +import kotlin.reflect.KClass; |
25 | 26 | import kotlin.reflect.KClassifier;
|
26 | 27 | import kotlin.reflect.KFunction;
|
27 | 28 | import kotlin.reflect.full.KCallables;
|
@@ -75,13 +76,23 @@ public static Publisher<?> invokeSuspendingFunction(Method method, Object target
|
75 | 76 | if (method.isAccessible() && !KCallablesJvm.isAccessible(function)) {
|
76 | 77 | KCallablesJvm.setAccessible(function, true);
|
77 | 78 | }
|
78 |
| - KClassifier classifier = function.getReturnType().getClassifier(); |
79 | 79 | Mono<Object> mono = MonoKt.mono(Dispatchers.getUnconfined(), (scope, continuation) ->
|
80 | 80 | KCallables.callSuspend(function, getSuspendedFunctionArgs(target, args), continuation))
|
81 | 81 | .filter(result -> !Objects.equals(result, Unit.INSTANCE))
|
82 | 82 | .onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);
|
83 |
| - if (classifier != null && classifier.equals(JvmClassMappingKt.getKotlinClass(Flow.class))) { |
84 |
| - return mono.flatMapMany(CoroutinesUtils::asFlux); |
| 83 | + |
| 84 | + KClassifier returnType = function.getReturnType().getClassifier(); |
| 85 | + if (returnType != null) { |
| 86 | + if (returnType.equals(JvmClassMappingKt.getKotlinClass(Flow.class))) { |
| 87 | + return mono.flatMapMany(CoroutinesUtils::asFlux); |
| 88 | + } |
| 89 | + else if (returnType.equals(JvmClassMappingKt.getKotlinClass(Mono.class))) { |
| 90 | + return mono.flatMap(o -> ((Mono<?>)o)); |
| 91 | + } |
| 92 | + else if (returnType instanceof KClass<?> kClass && |
| 93 | + Publisher.class.isAssignableFrom(JvmClassMappingKt.getJavaClass(kClass))) { |
| 94 | + return mono.flatMapMany(o -> ((Publisher<?>)o)); |
| 95 | + } |
85 | 96 | }
|
86 | 97 | return mono;
|
87 | 98 | }
|
|
0 commit comments