diff --git a/pom.xml b/pom.xml index c3245aad49..83290a2d49 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 58c63dfc97..5373fa0ec5 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 98516a5ba9..681527c04a 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index cfcc5cb88b..a134498f35 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -21,6 +21,7 @@ import reactor.core.publisher.Mono; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -47,7 +48,6 @@ import org.springframework.data.mongodb.repository.query.MongoEntityInformation; import org.springframework.data.repository.query.FluentQuery; import org.springframework.data.util.StreamUtils; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -110,21 +110,17 @@ public Flux saveAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Streamable source = Streamable.of(entities); - + List source = toList(entities); return source.stream().allMatch(entityInformation::isNew) ? // - insert(entities) : - Flux.fromIterable(entities).concatMap(this::save); + insert(source) : concatMapSequentially(source, this::save); } @Override - public Flux saveAll(Publisher entityStream) { + public Flux saveAll(Publisher publisher) { - Assert.notNull(entityStream, "The given Publisher of entities must not be null"); + Assert.notNull(publisher, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? // - mongoOperations.insert(entity, entityInformation.getCollectionName()) : // - mongoOperations.save(entity, entityInformation.getCollectionName())); + return concatMapSequentially(publisher, this::save); } @Override @@ -278,14 +274,10 @@ public Mono deleteAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Collection idCollection = StreamUtils.createStreamFromIterator(entities.iterator()).map(entityInformation::getId) - .collect(Collectors.toList()); + Collection ids = StreamUtils.createStreamFromIterator(entities.iterator()) + .map(entityInformation::getId).collect(Collectors.toList()); - Criteria idsInCriteria = where(entityInformation.getIdAttribute()).in(idCollection); - - Query query = new Query(idsInCriteria); - getReadPreference().ifPresent(query::withReadPreference); - return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName()).then(); + return deleteAllById(ids); } @Override @@ -336,8 +328,11 @@ public Flux insert(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Collection source = toCollection(entities); - return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName()); + return insert(toCollection(entities)); + } + + private Flux insert(Collection entities) { + return entities.isEmpty() ? Flux.empty() : mongoOperations.insert(entities, entityInformation.getCollectionName()); } @Override @@ -440,6 +435,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) { this.crudMethodMetadata = crudMethodMetadata; } + private Flux findAll(Query query) { + + getReadPreference().ifPresent(query::withReadPreference); + return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName()); + } + private Optional getReadPreference() { if (crudMethodMetadata == null) { @@ -461,15 +462,61 @@ private Query getIdQuery(Iterable ids) { return new Query(where(entityInformation.getIdAttribute()).in(toCollection(ids))); } - private static Collection toCollection(Iterable ids) { - return ids instanceof Collection collection ? collection - : StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList()); + /** + * Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single + * Flux. The operation does not allow interleave between performing the map operation for the first and second source + * element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be + * subscribed to eagerly emitting elements in order of their source. + * + *
+	 * Flux.just(first-element).flatMap(...)
+	 *     .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
+	 * 
+ * + * @param source the collection of elements to transform. + * @param mapper the transformation {@link Function}. Must not be {@literal null}. + * @return never {@literal null}. + * @param source type + */ + static Flux concatMapSequentially(List source, + Function> mapper) { + + if (source.isEmpty()) { + return Flux.empty(); + } + if (source.size() == 1) { + return Flux.just(source.iterator().next()).flatMap(mapper); + } + if (source.size() == 2) { + return Flux.fromIterable(source).concatMap(mapper); + } + + Flux first = Flux.just(source.get(0)).flatMap(mapper); + Flux theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper); + return first.concatWith(theRest); } - private Flux findAll(Query query) { + static Flux concatMapSequentially(Publisher publisher, + Function> mapper) { - getReadPreference().ifPresent(query::withReadPreference); - return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName()); + return Flux.from(publisher).switchOnFirst(((signal, source) -> { + + if (!signal.hasValue()) { + return source.concatMap(mapper); + } + + Mono firstCall = Mono.from(mapper.apply(signal.get())); + return firstCall.concatWith(source.skip(1).flatMapSequential(mapper)); + })); + } + + private static List toList(Iterable source) { + return source instanceof List list ? list : new ArrayList<>(toCollection(source)); + } + + private static Collection toCollection(Iterable source) { + return source instanceof Collection collection ? collection + : StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList()); } /** diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java index 88157024ba..4981c3480b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java @@ -467,7 +467,7 @@ public Flux saveWithErrorLogs(Person person) { TransactionalOperator transactionalOperator = TransactionalOperator.create(manager, new DefaultTransactionDefinition()); - return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // + return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // operations.save(new EventLog(new ObjectId(), "afterConvert")), // operations.save(new EventLog(new ObjectId(), "beforeInsert")), // operations.save(person), // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java index f8ab0f1563..c4a8c58e4b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java @@ -24,8 +24,10 @@ import java.util.Arrays; import java.util.Objects; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.BeansException; @@ -40,6 +42,7 @@ import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort.Direction; import org.springframework.data.domain.Sort.Order; +import org.springframework.data.mongodb.ReactiveMongoTransactionManager; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory; import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository; @@ -48,6 +51,8 @@ import org.springframework.lang.Nullable; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.util.ClassUtils; /** @@ -333,6 +338,28 @@ void savePublisherOfEntitiesShouldInsertEntity() { assertThat(boyd.getId()).isNotNull(); } + @RepeatedTest(10) // GH-4838 + void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() { + + ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory()); + TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> { + return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia)); + }).as(StepVerifier::create) // + .expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete(); + } + + @RepeatedTest(10) // GH-4838 + void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() { + + ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory()); + Flux personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia)); + + TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> { + return repository.saveAll(personFlux); + }).as(StepVerifier::create) // + .expectNextCount(7).verifyComplete(); + } + @Test // GH-3609 void savePublisherOfImmutableEntitiesShouldInsertEntity() { @@ -342,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() { .consumeNextWith(actual -> { assertThat(actual.id).isNotNull(); }) // - .verifyComplete(); + .verifyComplete(); } @Test // DATAMONGO-1444