Skip to content

Commit 8029983

Browse files
committed
DATACMNS-1763 - Expose ReactiveWrapperConverters.doOnSuccess and doOnError callback hooks.
We now provide callback hook operators that allow attaching of a Runnable and Consumer<Throwable> to a reactive wrapper type for participating in terminal signals.
1 parent e6beb5e commit 8029983

File tree

2 files changed

+305
-2
lines changed

2 files changed

+305
-2
lines changed

src/main/java/org/springframework/data/repository/util/ReactiveWrapperConverters.java

+259-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.Flowable;
1919
import io.reactivex.Maybe;
2020
import kotlinx.coroutines.flow.Flow;
21+
import kotlinx.coroutines.flow.FlowKt;
2122
import kotlinx.coroutines.reactive.ReactiveFlowKt;
2223
import reactor.core.publisher.Flux;
2324
import reactor.core.publisher.Mono;
@@ -26,6 +27,7 @@
2627

2728
import java.util.ArrayList;
2829
import java.util.List;
30+
import java.util.function.Consumer;
2931
import java.util.function.Function;
3032

3133
import javax.annotation.Nonnull;
@@ -97,6 +99,10 @@ public abstract class ReactiveWrapperConverters {
9799
REACTIVE_WRAPPERS.add(PublisherWrapper.INSTANCE);
98100
}
99101

102+
if (ReactiveWrappers.isAvailable(ReactiveLibrary.KOTLIN_COROUTINES)) {
103+
REACTIVE_WRAPPERS.add(FlowWrapper.INSTANCE);
104+
}
105+
100106
registerConvertersIn(GENERIC_CONVERSION_SERVICE);
101107
}
102108

@@ -184,6 +190,50 @@ public static <T> T map(Object reactiveObject, Function<Object, Object> converte
184190
.orElseThrow(() -> new IllegalStateException(String.format("Cannot apply converter to %s", reactiveObject)));
185191
}
186192

193+
/**
194+
* Apply a {@link Runnable} when the reactive type emits a completion signal.
195+
*
196+
* @param reactiveObject must not be {@literal null}.
197+
* @param onSuccess must not be {@literal null}.
198+
* @return
199+
* @since 2.4
200+
*/
201+
@SuppressWarnings("unchecked")
202+
public static <T> T doOnSuccess(Object reactiveObject, Runnable onSuccess) {
203+
204+
Assert.notNull(reactiveObject, "Reactive source object must not be null!");
205+
Assert.notNull(onSuccess, "onSuccess callback must not be null!");
206+
207+
return REACTIVE_WRAPPERS.stream()//
208+
.filter(it -> ClassUtils.isAssignable(it.getWrapperClass(), reactiveObject.getClass()))//
209+
.findFirst()//
210+
.map(it -> (T) it.doOnSuccess(reactiveObject, onSuccess))//
211+
.orElseThrow(
212+
() -> new IllegalStateException(String.format("Cannot apply onSuccess callback to %s", reactiveObject)));
213+
}
214+
215+
/**
216+
* Apply a {@link Consumer} when the reactive type emits an error signal.
217+
*
218+
* @param reactiveObject must not be {@literal null}.
219+
* @param onError must not be {@literal null}.
220+
* @return
221+
* @since 2.4
222+
*/
223+
@SuppressWarnings("unchecked")
224+
public static <T> T doOnError(Object reactiveObject, Consumer<? super Throwable> onError) {
225+
226+
Assert.notNull(reactiveObject, "Reactive source object must not be null!");
227+
Assert.notNull(onError, "onError callback must not be null!");
228+
229+
return REACTIVE_WRAPPERS.stream()//
230+
.filter(it -> ClassUtils.isAssignable(it.getWrapperClass(), reactiveObject.getClass()))//
231+
.findFirst()//
232+
.map(it -> (T) it.doOnError(reactiveObject, onError))//
233+
.orElseThrow(
234+
() -> new IllegalStateException(String.format("Cannot apply onError callback to %s", reactiveObject)));
235+
}
236+
187237
/**
188238
* Return {@literal true} if objects of {@code sourceType} can be converted to the {@code targetType}.
189239
*
@@ -209,7 +259,7 @@ public static boolean canConvert(Class<?> sourceType, Class<?> targetType) {
209259
* @author Mark Paluch
210260
* @author Christoph Strobl
211261
*/
212-
private static interface ReactiveTypeWrapper<T> {
262+
private interface ReactiveTypeWrapper<T> {
213263

214264
/**
215265
* @return the wrapper class.
@@ -224,6 +274,26 @@ private static interface ReactiveTypeWrapper<T> {
224274
* @return the reactive type applying conversion.
225275
*/
226276
Object map(Object wrapper, Function<Object, Object> function);
277+
278+
/**
279+
* Apply a {@link Runnable} when the reactive type emits a completion signal.
280+
*
281+
* @param wrapper the reactive type, must not be {@literal null}.
282+
* @param onSuccess the signal callback, must not be {@literal null}.
283+
* @return the reactive type with {@code onSuccess} attached.
284+
* @since 2.4
285+
*/
286+
Object doOnSuccess(Object wrapper, Runnable onSuccess);
287+
288+
/**
289+
* Apply a {@link Consumer} when the reactive type emits an error signal.
290+
*
291+
* @param wrapper the reactive type, must not be {@literal null}.
292+
* @param onError the error consumer, must not be {@literal null}.
293+
* @return the reactive type with {@code onError} attached.
294+
* @since 2.4
295+
*/
296+
Object doOnError(Object wrapper, Consumer<? super Throwable> onError);
227297
}
228298

229299
/**
@@ -242,6 +312,16 @@ public Class<? super Mono<?>> getWrapperClass() {
242312
public Mono<?> map(Object wrapper, Function<Object, Object> function) {
243313
return ((Mono<?>) wrapper).map(function::apply);
244314
}
315+
316+
@Override
317+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
318+
return ((Mono<?>) wrapper).doOnSuccess(o -> onSuccess.run());
319+
}
320+
321+
@Override
322+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
323+
return ((Mono<?>) wrapper).doOnError(onError);
324+
}
245325
}
246326

247327
/**
@@ -257,7 +337,56 @@ public Class<? super Flux<?>> getWrapperClass() {
257337
}
258338

259339
public Flux<?> map(Object wrapper, Function<Object, Object> function) {
260-
return ((Flux<?>) wrapper).map(function::apply);
340+
return ((Flux<?>) wrapper).map(function);
341+
}
342+
343+
@Override
344+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
345+
return ((Flux<?>) wrapper).doOnComplete(onSuccess);
346+
}
347+
348+
@Override
349+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
350+
return ((Flux<?>) wrapper).doOnError(onError);
351+
}
352+
}
353+
354+
/**
355+
* Wrapper for Kotlin's {@link Flow}.
356+
*
357+
* @since 2.4
358+
*/
359+
private enum FlowWrapper implements ReactiveTypeWrapper<Flow<?>> {
360+
361+
INSTANCE;
362+
363+
@Override
364+
public Class<? super Flow<?>> getWrapperClass() {
365+
return Flow.class;
366+
}
367+
368+
public Flow<?> map(Object wrapper, Function<Object, Object> function) {
369+
return FlowKt.map((Flow<?>) wrapper, (o, continuation) -> function.apply(o));
370+
}
371+
372+
@Override
373+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
374+
return FlowKt.onCompletion((Flow<?>) wrapper, (collector, ex, continuation) -> {
375+
if (ex == null) {
376+
onSuccess.run();
377+
}
378+
return collector;
379+
});
380+
}
381+
382+
@Override
383+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
384+
return FlowKt.onCompletion((Flow<?>) wrapper, (collector, ex, continuation) -> {
385+
if (ex != null) {
386+
onError.accept(ex);
387+
}
388+
return collector;
389+
});
261390
}
262391
}
263392

@@ -286,6 +415,34 @@ public Publisher<?> map(Object wrapper, Function<Object, Object> function) {
286415

287416
return FluxWrapper.INSTANCE.map(Flux.from((Publisher<?>) wrapper), function);
288417
}
418+
419+
@Override
420+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
421+
422+
if (wrapper instanceof Mono) {
423+
return MonoWrapper.INSTANCE.doOnSuccess(wrapper, onSuccess);
424+
}
425+
426+
if (wrapper instanceof Flux) {
427+
return FluxWrapper.INSTANCE.doOnSuccess(wrapper, onSuccess);
428+
}
429+
430+
return FluxWrapper.INSTANCE.doOnSuccess(Flux.from((Publisher<?>) wrapper), onSuccess);
431+
}
432+
433+
@Override
434+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
435+
436+
if (wrapper instanceof Mono) {
437+
return MonoWrapper.INSTANCE.doOnError(wrapper, onError);
438+
}
439+
440+
if (wrapper instanceof Flux) {
441+
return FluxWrapper.INSTANCE.doOnError(wrapper, onError);
442+
}
443+
444+
return FluxWrapper.INSTANCE.doOnError(Flux.from((Publisher<?>) wrapper), onError);
445+
}
289446
}
290447

291448
// -------------------------------------------------------------------------
@@ -308,6 +465,16 @@ public Class<? super Single<?>> getWrapperClass() {
308465
public Single<?> map(Object wrapper, Function<Object, Object> function) {
309466
return ((Single<?>) wrapper).map(function::apply);
310467
}
468+
469+
@Override
470+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
471+
return ((Single<?>) wrapper).doOnSuccess(o -> onSuccess.run());
472+
}
473+
474+
@Override
475+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
476+
return ((Single<?>) wrapper).doOnError(onError::accept);
477+
}
311478
}
312479

313480
/**
@@ -326,6 +493,16 @@ public Class<? super Observable<?>> getWrapperClass() {
326493
public Observable<?> map(Object wrapper, Function<Object, Object> function) {
327494
return ((Observable<?>) wrapper).map(function::apply);
328495
}
496+
497+
@Override
498+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
499+
return ((Observable<?>) wrapper).doOnCompleted(onSuccess::run);
500+
}
501+
502+
@Override
503+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
504+
return ((Observable<?>) wrapper).doOnError(onError::accept);
505+
}
329506
}
330507

331508
// -------------------------------------------------------------------------
@@ -348,6 +525,16 @@ public Class<? super io.reactivex.Single<?>> getWrapperClass() {
348525
public io.reactivex.Single<?> map(Object wrapper, Function<Object, Object> function) {
349526
return ((io.reactivex.Single<?>) wrapper).map(function::apply);
350527
}
528+
529+
@Override
530+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
531+
return ((io.reactivex.Single<?>) wrapper).doOnSuccess(o -> onSuccess.run());
532+
}
533+
534+
@Override
535+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
536+
return ((io.reactivex.Single<?>) wrapper).doOnError(onError::accept);
537+
}
351538
}
352539

353540
/**
@@ -366,6 +553,16 @@ public Class<? super io.reactivex.Maybe<?>> getWrapperClass() {
366553
public io.reactivex.Maybe<?> map(Object wrapper, Function<Object, Object> function) {
367554
return ((io.reactivex.Maybe<?>) wrapper).map(function::apply);
368555
}
556+
557+
@Override
558+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
559+
return ((io.reactivex.Maybe<?>) wrapper).doOnSuccess(o -> onSuccess.run());
560+
}
561+
562+
@Override
563+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
564+
return ((io.reactivex.Maybe<?>) wrapper).doOnError(onError::accept);
565+
}
369566
}
370567

371568
/**
@@ -384,6 +581,16 @@ public Class<? super io.reactivex.Observable<?>> getWrapperClass() {
384581
public io.reactivex.Observable<?> map(Object wrapper, Function<Object, Object> function) {
385582
return ((io.reactivex.Observable<?>) wrapper).map(function::apply);
386583
}
584+
585+
@Override
586+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
587+
return ((io.reactivex.Observable<?>) wrapper).doOnComplete(onSuccess::run);
588+
}
589+
590+
@Override
591+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
592+
return ((io.reactivex.Observable<?>) wrapper).doOnError(onError::accept);
593+
}
387594
}
388595

389596
/**
@@ -402,6 +609,16 @@ public Class<? super Flowable<?>> getWrapperClass() {
402609
public io.reactivex.Flowable<?> map(Object wrapper, Function<Object, Object> function) {
403610
return ((io.reactivex.Flowable<?>) wrapper).map(function::apply);
404611
}
612+
613+
@Override
614+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
615+
return ((io.reactivex.Flowable<?>) wrapper).doOnComplete(onSuccess::run);
616+
}
617+
618+
@Override
619+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
620+
return ((io.reactivex.Flowable<?>) wrapper).doOnError(onError::accept);
621+
}
405622
}
406623

407624
// -------------------------------------------------------------------------
@@ -424,6 +641,16 @@ public Class<? super io.reactivex.rxjava3.core.Single<?>> getWrapperClass() {
424641
public io.reactivex.rxjava3.core.Single<?> map(Object wrapper, Function<Object, Object> function) {
425642
return ((io.reactivex.rxjava3.core.Single<?>) wrapper).map(function::apply);
426643
}
644+
645+
@Override
646+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
647+
return ((io.reactivex.rxjava3.core.Single<?>) wrapper).doOnSuccess(o -> onSuccess.run());
648+
}
649+
650+
@Override
651+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
652+
return ((io.reactivex.rxjava3.core.Single<?>) wrapper).doOnError(onError::accept);
653+
}
427654
}
428655

429656
/**
@@ -442,6 +669,16 @@ public Class<? super io.reactivex.rxjava3.core.Maybe<?>> getWrapperClass() {
442669
public io.reactivex.rxjava3.core.Maybe<?> map(Object wrapper, Function<Object, Object> function) {
443670
return ((io.reactivex.rxjava3.core.Maybe<?>) wrapper).map(function::apply);
444671
}
672+
673+
@Override
674+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
675+
return ((io.reactivex.rxjava3.core.Maybe<?>) wrapper).doOnSuccess(o -> onSuccess.run());
676+
}
677+
678+
@Override
679+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
680+
return ((io.reactivex.rxjava3.core.Maybe<?>) wrapper).doOnError(onError::accept);
681+
}
445682
}
446683

447684
/**
@@ -460,6 +697,16 @@ public Class<? super io.reactivex.rxjava3.core.Observable<?>> getWrapperClass()
460697
public io.reactivex.rxjava3.core.Observable<?> map(Object wrapper, Function<Object, Object> function) {
461698
return ((io.reactivex.rxjava3.core.Observable<?>) wrapper).map(function::apply);
462699
}
700+
701+
@Override
702+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
703+
return ((io.reactivex.rxjava3.core.Observable<?>) wrapper).doOnComplete(onSuccess::run);
704+
}
705+
706+
@Override
707+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
708+
return ((io.reactivex.rxjava3.core.Observable<?>) wrapper).doOnError(onError::accept);
709+
}
463710
}
464711

465712
/**
@@ -478,6 +725,16 @@ public Class<? super io.reactivex.rxjava3.core.Flowable<?>> getWrapperClass() {
478725
public io.reactivex.rxjava3.core.Flowable<?> map(Object wrapper, Function<Object, Object> function) {
479726
return ((io.reactivex.rxjava3.core.Flowable<?>) wrapper).map(function::apply);
480727
}
728+
729+
@Override
730+
public Object doOnSuccess(Object wrapper, Runnable onSuccess) {
731+
return ((io.reactivex.rxjava3.core.Flowable<?>) wrapper).doOnComplete(onSuccess::run);
732+
}
733+
734+
@Override
735+
public Object doOnError(Object wrapper, Consumer<? super Throwable> onError) {
736+
return ((io.reactivex.rxjava3.core.Flowable<?>) wrapper).doOnError(onError::accept);
737+
}
481738
}
482739

483740
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)