Skip to content

Commit 94a4fe7

Browse files
christophstroblmp911de
authored andcommitted
Update reactive SimpleReactiveMongoRepository.saveAll flow.
Closes #4838 Original pull request: #4843
1 parent 315b910 commit 94a4fe7

File tree

2 files changed

+100
-26
lines changed

2 files changed

+100
-26
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

+72-25
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import reactor.core.publisher.Mono;
2222

2323
import java.io.Serializable;
24+
import java.util.ArrayList;
2425
import java.util.Collection;
2526
import java.util.Collections;
2627
import java.util.List;
@@ -47,7 +48,6 @@
4748
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
4849
import org.springframework.data.repository.query.FluentQuery;
4950
import org.springframework.data.util.StreamUtils;
50-
import org.springframework.data.util.Streamable;
5151
import org.springframework.lang.Nullable;
5252
import org.springframework.util.Assert;
5353

@@ -110,21 +110,17 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
110110

111111
Assert.notNull(entities, "The given Iterable of entities must not be null");
112112

113-
Streamable<S> source = Streamable.of(entities);
114-
113+
List<S> source = toList(entities);
115114
return source.stream().allMatch(entityInformation::isNew) ? //
116-
insert(entities) :
117-
Flux.fromIterable(entities).concatMap(this::save);
115+
insert(source) : concatMapSequentially(source, this::save);
118116
}
119117

120118
@Override
121-
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
119+
public <S extends T> Flux<S> saveAll(Publisher<S> publisher) {
122120

123-
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
121+
Assert.notNull(publisher, "The given Publisher of entities must not be null");
124122

125-
return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
126-
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
127-
mongoOperations.save(entity, entityInformation.getCollectionName()));
123+
return concatMapSequentially(publisher, this::save);
128124
}
129125

130126
@Override
@@ -278,14 +274,10 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
278274

279275
Assert.notNull(entities, "The given Iterable of entities must not be null");
280276

281-
Collection<?> idCollection = StreamUtils.createStreamFromIterator(entities.iterator()).map(entityInformation::getId)
282-
.collect(Collectors.toList());
277+
Collection<? extends ID> ids = StreamUtils.createStreamFromIterator(entities.iterator())
278+
.map(entityInformation::getId).collect(Collectors.toList());
283279

284-
Criteria idsInCriteria = where(entityInformation.getIdAttribute()).in(idCollection);
285-
286-
Query query = new Query(idsInCriteria);
287-
getReadPreference().ifPresent(query::withReadPreference);
288-
return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName()).then();
280+
return deleteAllById(ids);
289281
}
290282

291283
@Override
@@ -336,8 +328,11 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
336328

337329
Assert.notNull(entities, "The given Iterable of entities must not be null");
338330

339-
Collection<S> source = toCollection(entities);
340-
return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
331+
return insert(toCollection(entities));
332+
}
333+
334+
private <S extends T> Flux<S> insert(Collection<S> entities) {
335+
return entities.isEmpty() ? Flux.empty() : mongoOperations.insert(entities, entityInformation.getCollectionName());
341336
}
342337

343338
@Override
@@ -440,6 +435,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
440435
this.crudMethodMetadata = crudMethodMetadata;
441436
}
442437

438+
private Flux<T> findAll(Query query) {
439+
440+
getReadPreference().ifPresent(query::withReadPreference);
441+
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
442+
}
443+
443444
private Optional<ReadPreference> getReadPreference() {
444445

445446
if (crudMethodMetadata == null) {
@@ -461,15 +462,61 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
461462
return new Query(where(entityInformation.getIdAttribute()).in(toCollection(ids)));
462463
}
463464

464-
private static <E> Collection<E> toCollection(Iterable<E> ids) {
465-
return ids instanceof Collection<E> collection ? collection
466-
: StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList());
465+
/**
466+
* Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
467+
* Flux. The operation does not allow interleave between performing the map operation for the first and second source
468+
* element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
469+
* subscribed to eagerly emitting elements in order of their source.
470+
*
471+
* <pre class="code">
472+
* Flux.just(first-element).flatMap(...)
473+
* .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
474+
* </pre>
475+
*
476+
* @param source the collection of elements to transform.
477+
* @param mapper the transformation {@link Function}. Must not be {@literal null}.
478+
* @return never {@literal null}.
479+
* @param <T> source type
480+
*/
481+
static <T> Flux<T> concatMapSequentially(List<T> source,
482+
Function<? super T, ? extends Publisher<? extends T>> mapper) {
483+
484+
if (source.isEmpty()) {
485+
return Flux.empty();
486+
}
487+
if (source.size() == 1) {
488+
return Flux.just(source.iterator().next()).flatMap(mapper);
489+
}
490+
if (source.size() == 2) {
491+
return Flux.fromIterable(source).concatMap(mapper);
492+
}
493+
494+
Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
495+
Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
496+
return first.concatWith(theRest);
467497
}
468498

469-
private Flux<T> findAll(Query query) {
499+
static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
500+
Function<? super T, ? extends Publisher<? extends T>> mapper) {
470501

471-
getReadPreference().ifPresent(query::withReadPreference);
472-
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
502+
return Flux.from(publisher).switchOnFirst(((signal, source) -> {
503+
504+
if (!signal.hasValue()) {
505+
return source.concatMap(mapper);
506+
}
507+
508+
Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
509+
return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
510+
}));
511+
}
512+
513+
private static <E> List<E> toList(Iterable<E> source) {
514+
return source instanceof List<E> list ? list : new ArrayList<>(toCollection(source));
515+
}
516+
517+
private static <E> Collection<E> toCollection(Iterable<E> source) {
518+
return source instanceof Collection<E> collection ? collection
519+
: StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());
473520
}
474521

475522
/**

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424

2525
import java.util.Arrays;
2626
import java.util.Objects;
27+
import java.util.stream.Stream;
2728

2829
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.RepeatedTest;
2931
import org.junit.jupiter.api.Test;
3032
import org.junit.jupiter.api.extension.ExtendWith;
3133
import org.springframework.beans.BeansException;
@@ -40,6 +42,7 @@
4042
import org.springframework.data.domain.Sort;
4143
import org.springframework.data.domain.Sort.Direction;
4244
import org.springframework.data.domain.Sort.Order;
45+
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
4346
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
4447
import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
4548
import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
@@ -48,6 +51,8 @@
4851
import org.springframework.lang.Nullable;
4952
import org.springframework.test.context.ContextConfiguration;
5053
import org.springframework.test.context.junit.jupiter.SpringExtension;
54+
import org.springframework.transaction.TransactionDefinition;
55+
import org.springframework.transaction.reactive.TransactionalOperator;
5156
import org.springframework.util.ClassUtils;
5257

5358
/**
@@ -333,6 +338,28 @@ void savePublisherOfEntitiesShouldInsertEntity() {
333338
assertThat(boyd.getId()).isNotNull();
334339
}
335340

341+
@RepeatedTest(10) // GH-4838
342+
void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
343+
344+
ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
345+
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
346+
return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia));
347+
}).as(StepVerifier::create) //
348+
.expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete();
349+
}
350+
351+
@RepeatedTest(10) // GH-4838
352+
void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {
353+
354+
ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
355+
Flux<ReactivePerson> personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia));
356+
357+
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
358+
return repository.saveAll(personFlux);
359+
}).as(StepVerifier::create) //
360+
.expectNextCount(7).verifyComplete();
361+
}
362+
336363
@Test // GH-3609
337364
void savePublisherOfImmutableEntitiesShouldInsertEntity() {
338365

@@ -342,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() {
342369
.consumeNextWith(actual -> {
343370
assertThat(actual.id).isNotNull();
344371
}) //
345-
.verifyComplete();
372+
.verifyComplete();
346373
}
347374

348375
@Test // DATAMONGO-1444

0 commit comments

Comments
 (0)