17
17
18
18
import static org .springframework .data .mongodb .core .query .Criteria .*;
19
19
20
+ import reactor .core .CoreSubscriber ;
20
21
import reactor .core .publisher .Flux ;
21
22
import reactor .core .publisher .Mono ;
22
23
@@ -113,7 +114,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
113
114
Streamable <S > source = Streamable .of (entities );
114
115
115
116
return source .stream ().allMatch (entityInformation ::isNew ) ? //
116
- insert (entities ) : doItSomewhatSequentially (source , this ::save );
117
+ insert (entities ) : new AeonFlux <> (source ). combatMap ( this ::save );
117
118
}
118
119
119
120
@ Override
@@ -126,20 +127,6 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
126
127
mongoOperations .save (entity , entityInformation .getCollectionName ()));
127
128
}
128
129
129
- static <T > Flux <T > doItSomewhatSequentially /* how should we actually call this? */ (Streamable <T > ts , Function <? super T , ? extends Publisher <? extends T >> mapper ) {
130
-
131
- List <T > list = ts .toList ();
132
- if (list .size () == 1 ) {
133
- return Flux .just (list .iterator ().next ()).flatMap (mapper );
134
- } else if (list .size () == 2 ) {
135
- return Flux .fromIterable (list ).concatMap (mapper );
136
- }
137
-
138
- Flux <T > first = Flux .just (list .get (0 )).flatMap (mapper );
139
- Flux <T > theRest = Flux .fromIterable (list .subList (1 , list .size ())).flatMapSequential (mapper );
140
- return first .concatWith (theRest );
141
- }
142
-
143
130
@ Override
144
131
public Mono <T > findById (ID id ) {
145
132
@@ -579,4 +566,46 @@ private ReactiveFindOperation.TerminatingFind<T> createQuery(UnaryOperator<Query
579
566
}
580
567
581
568
}
569
+
570
+ static class AeonFlux <T > extends Flux <T > {
571
+
572
+ private final Streamable <T > source ;
573
+ private final Flux <T > delegate ;
574
+
575
+ AeonFlux (Streamable <T > source ) {
576
+ this (source , Flux .fromIterable (source ));
577
+ }
578
+
579
+ private AeonFlux (Streamable <T > source , Flux <T > delegate ) {
580
+ this .source = source ;
581
+ this .delegate = delegate ;
582
+ }
583
+
584
+ @ Override
585
+ public void subscribe (CoreSubscriber <? super T > actual ) {
586
+ delegate .subscribe (actual );
587
+ }
588
+
589
+ Flux <T > combatMap (Function <? super T , ? extends Publisher <? extends T >> mapper ) {
590
+ return new AeonFlux <>(source , combatMapList (source .toList (), mapper ));
591
+ }
592
+
593
+ private static <T > Flux <T > combatMapList (List <T > list ,
594
+ Function <? super T , ? extends Publisher <? extends T >> mapper ) {
595
+
596
+ if (list .isEmpty ()) {
597
+ return Flux .empty ();
598
+ }
599
+ if (list .size () == 1 ) {
600
+ return Flux .just (list .iterator ().next ()).flatMap (mapper );
601
+ }
602
+ if (list .size () == 2 ) {
603
+ return Flux .fromIterable (list ).concatMap (mapper );
604
+ }
605
+
606
+ Flux <T > first = Flux .just (list .get (0 )).flatMap (mapper );
607
+ Flux <T > theRest = Flux .fromIterable (list .subList (1 , list .size ())).flatMapSequential (mapper );
608
+ return first .concatWith (theRest );
609
+ }
610
+ }
582
611
}
0 commit comments