
Update reactive SimpleReactiveMongoRepository.saveAll flow #4843


Closed · wants to merge 8 commits
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>4.5.0-SNAPSHOT</version>
+ <version>4.5.x-GH-4838-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
@@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>4.5.0-SNAPSHOT</version>
+ <version>4.5.x-GH-4838-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
@@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>4.5.0-SNAPSHOT</version>
+ <version>4.5.x-GH-4838-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java
@@ -21,6 +21,7 @@
import reactor.core.publisher.Mono;

import java.io.Serializable;
+ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -47,7 +48,6 @@
import org.springframework.data.mongodb.repository.query.MongoEntityInformation;
import org.springframework.data.repository.query.FluentQuery;
import org.springframework.data.util.StreamUtils;
- import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

@@ -110,21 +110,17 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {

Assert.notNull(entities, "The given Iterable of entities must not be null");

- Streamable<S> source = Streamable.of(entities);

+ List<S> source = toList(entities);
return source.stream().allMatch(entityInformation::isNew) ? //
- insert(entities) :
- Flux.fromIterable(entities).concatMap(this::save);
+ insert(source) : concatMapSequentially(source, this::save);
}

@Override
- public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
+ public <S extends T> Flux<S> saveAll(Publisher<S> publisher) {

- Assert.notNull(entityStream, "The given Publisher of entities must not be null");
+ Assert.notNull(publisher, "The given Publisher of entities must not be null");

- return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
- mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
- mongoOperations.save(entity, entityInformation.getCollectionName()));
+ return concatMapSequentially(publisher, this::save);
}

@Override
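The Iterable overload now collects the source into a List, takes the batch insert fast path when every entity is new, and otherwise hands off to concatMapSequentially(source, this::save) so results come back in the order the entities were passed in. The relevant Reactor distinction: flatMap emits in completion order, flatMapSequential subscribes eagerly but reorders output to match the source, and concatMap subscribes to one inner at a time. A small standalone sketch (not part of this PR; the class name and delays are invented) illustrating the three operators:

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class OrderingSketch {

	public static void main(String[] args) {

		// inner publisher whose latency decreases with the value, so completion order is reversed
		Function<Integer, Mono<Integer>> mapper = i -> Mono.just(i).delayElement(Duration.ofMillis(40L - i * 10L));

		List<Integer> source = List.of(1, 2, 3);

		// flatMap: emits in completion order -> typically [3, 2, 1] with these delays
		System.out.println(Flux.fromIterable(source).flatMap(mapper).collectList().block());

		// flatMapSequential: subscribes eagerly, but buffers results to preserve source order -> [1, 2, 3]
		System.out.println(Flux.fromIterable(source).flatMapSequential(mapper).collectList().block());

		// concatMap: also ordered, but subscribes to each inner only after the previous one completed -> [1, 2, 3]
		System.out.println(Flux.fromIterable(source).concatMap(mapper).collectList().block());
	}
}
```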
@@ -278,14 +274,10 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {

Assert.notNull(entities, "The given Iterable of entities must not be null");

- Collection<?> idCollection = StreamUtils.createStreamFromIterator(entities.iterator()).map(entityInformation::getId)
- .collect(Collectors.toList());
+ Collection<? extends ID> ids = StreamUtils.createStreamFromIterator(entities.iterator())
+ .map(entityInformation::getId).collect(Collectors.toList());

- Criteria idsInCriteria = where(entityInformation.getIdAttribute()).in(idCollection);
-
- Query query = new Query(idsInCriteria);
- getReadPreference().ifPresent(query::withReadPreference);
- return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName()).then();
+ return deleteAllById(ids);
}

@Override
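deleteAll(Iterable) now only extracts the ids and delegates to deleteAllById instead of building the ids-in query itself. The removed lines show what that query looked like; a minimal MongoTemplate-style sketch of the same $in delete (collection and field names are invented, not taken from this PR) for reference:

```java
import static org.springframework.data.mongodb.core.query.Criteria.where;

import java.util.List;

import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.query.Query;

import reactor.core.publisher.Mono;

class DeleteByIdsSketch {

	// delete all documents whose _id is contained in the given list, mirroring the removed code path
	static Mono<Void> deleteByIds(ReactiveMongoTemplate template, List<Object> ids) {

		Query query = new Query(where("_id").in(ids));
		return template.remove(query, "persons").then();
	}
}
```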
@@ -336,8 +328,11 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {

Assert.notNull(entities, "The given Iterable of entities must not be null");

- Collection<S> source = toCollection(entities);
- return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
+ return insert(toCollection(entities));
}

+ private <S extends T> Flux<S> insert(Collection<S> entities) {
+ return entities.isEmpty() ? Flux.empty() : mongoOperations.insert(entities, entityInformation.getCollectionName());
+ }

@Override
@@ -440,6 +435,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) {
this.crudMethodMetadata = crudMethodMetadata;
}

+ private Flux<T> findAll(Query query) {
+
+ getReadPreference().ifPresent(query::withReadPreference);
+ return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
+ }

private Optional<ReadPreference> getReadPreference() {

if (crudMethodMetadata == null) {
@@ -461,15 +462,61 @@ private Query getIdQuery(Iterable<? extends ID> ids) {
return new Query(where(entityInformation.getIdAttribute()).in(toCollection(ids)));
}

- private static <E> Collection<E> toCollection(Iterable<E> ids) {
- return ids instanceof Collection<E> collection ? collection
- : StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList());
/**
* Transform the elements of the given source into Publishers, then flatten these inner publishers into a single
* Flux. The operation does not allow interleaving between the first and the second source element: the mapping of
* the first element must have completed before any of the following inner publishers is subscribed to. The remaining
* inners are then subscribed to eagerly, emitting elements in the order of their source.
*
* <pre class="code">
* Flux.just(first-element).flatMap(...)
* .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
* </pre>
*
* @param source the collection of elements to transform.
* @param mapper the transformation {@link Function}. Must not be {@literal null}.
* @return never {@literal null}.
* @param <T> source type
*/
static <T> Flux<T> concatMapSequentially(List<T> source,
Function<? super T, ? extends Publisher<? extends T>> mapper) {

if (source.isEmpty()) {
return Flux.empty();
}
if (source.size() == 1) {
return Flux.just(source.iterator().next()).flatMap(mapper);
}
if (source.size() == 2) {
return Flux.fromIterable(source).concatMap(mapper);
}

Flux<T> first = Flux.just(source.get(0)).flatMap(mapper);
Flux<T> theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
return first.concatWith(theRest);
}

- private Flux<T> findAll(Query query) {
-
- getReadPreference().ifPresent(query::withReadPreference);
- return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
+ static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
+ Function<? super T, ? extends Publisher<? extends T>> mapper) {
+
+ return Flux.from(publisher).switchOnFirst(((signal, source) -> {
+
+ if (!signal.hasValue()) {
+ return source.concatMap(mapper);
+ }
+
+ Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
+ return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
+ }));
+ }

private static <E> List<E> toList(Iterable<E> source) {
return source instanceof List<E> list ? list : new ArrayList<>(toCollection(source));
}

private static <E> Collection<E> toCollection(Iterable<E> source) {
return source instanceof Collection<E> collection ? collection
: StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());
}

/**
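The Publisher overload added above peeks at the first element with switchOnFirst, maps it, and only concatenates the remaining inners (eagerly subscribed, order-preserving) once that first call has completed. A self-contained sketch of the same pattern against plain Reactor types (class name, values, and delays are invented for illustration):

```java
import java.time.Duration;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ConcatMapSequentiallySketch {

	// same shape as the helper introduced in this PR, reduced to plain Reactor types
	static <T> Flux<T> concatMapSequentially(Publisher<T> publisher,
			Function<? super T, ? extends Publisher<? extends T>> mapper) {

		return Flux.from(publisher).switchOnFirst((signal, source) -> {

			if (!signal.hasValue()) { // empty or failed source: fall back to plain concatMap
				return source.concatMap(mapper);
			}

			Mono<T> firstCall = Mono.from(mapper.apply(signal.get()));
			return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
		});
	}

	public static void main(String[] args) {

		Function<String, Publisher<String>> save = value -> Mono.just(value)
				.delayElement(Duration.ofMillis("a".equals(value) ? 50 : 10))
				.doOnSubscribe(s -> System.out.println("subscribed to " + value));

		// "a" is mapped and completes before "b" and "c" are subscribed;
		// "b" and "c" are then subscribed eagerly but emitted in source order
		concatMapSequentially(Flux.just("a", "b", "c"), save)
				.doOnNext(v -> System.out.println("emitted " + v))
				.blockLast();
	}
}
```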
@@ -467,7 +467,7 @@ public Flux<Void> saveWithErrorLogs(Person person) {
TransactionalOperator transactionalOperator = TransactionalOperator.create(manager,
new DefaultTransactionDefinition());

return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
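The swap from Flux.merge to Flux.concat above matters because merge subscribes to all sources at once and emits whatever completes first, while concat subscribes to each source only after the previous one completed, keeping the saves strictly sequential. A tiny sketch (values and delays invented) of the difference:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MergeVsConcat {

	public static void main(String[] args) {

		Mono<String> slow = Mono.just("slow").delayElement(Duration.ofMillis(50));
		Mono<String> fast = Mono.just("fast").delayElement(Duration.ofMillis(10));

		// merge: both subscribed immediately, emission order depends on completion -> [fast, slow]
		System.out.println(Flux.merge(slow, fast).collectList().block());

		// concat: second source subscribed only after the first completes -> [slow, fast]
		System.out.println(Flux.concat(slow, fast).collectList().block());
	}
}
```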
@@ -24,8 +24,10 @@

import java.util.Arrays;
import java.util.Objects;
+ import java.util.stream.Stream;

import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.BeansException;
@@ -40,6 +42,7 @@
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.domain.Sort.Order;
+ import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
@@ -48,6 +51,8 @@
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+ import org.springframework.transaction.TransactionDefinition;
+ import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.util.ClassUtils;

/**
@@ -333,6 +338,28 @@ void savePublisherOfEntitiesShouldInsertEntity() {
assertThat(boyd.getId()).isNotNull();
}

@RepeatedTest(10) // GH-4838
void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {

ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia));
}).as(StepVerifier::create) //
.expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete();
}

@RepeatedTest(10) // GH-4838
void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {

ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
Flux<ReactivePerson> personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia));

TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
return repository.saveAll(personFlux);
}).as(StepVerifier::create) //
.expectNextCount(7).verifyComplete();
}

@Test // GH-3609
void savePublisherOfImmutableEntitiesShouldInsertEntity() {

@@ -342,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() {
.consumeNextWith(actual -> {
assertThat(actual.id).isNotNull();
}) //
.verifyComplete();
}

@Test // DATAMONGO-1444