collection ? collection
- : StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList());
+ /**
+ * Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single
+ * Flux. The operation does not allow interleave between performing the map operation for the first and second source
+ * element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be
+ * subscribed to eagerly emitting elements in order of their source.
+ *
+ *
+ * Flux.just(first-element).flatMap(...)
+ * .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
+ *
+ *
+ * @param source the collection of elements to transform.
+ * @param mapper the transformation {@link Function}. Must not be {@literal null}.
+ * @return never {@literal null}.
+ * @param source type
+ */
+ static Flux concatMapSequentially(List source,
+ Function super T, ? extends Publisher extends T>> mapper) {
+
+ if (source.isEmpty()) {
+ return Flux.empty();
+ }
+ if (source.size() == 1) {
+ return Flux.just(source.iterator().next()).flatMap(mapper);
+ }
+ if (source.size() == 2) {
+ return Flux.fromIterable(source).concatMap(mapper);
+ }
+
+ Flux first = Flux.just(source.get(0)).flatMap(mapper);
+ Flux theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper);
+ return first.concatWith(theRest);
}
- private Flux findAll(Query query) {
+ static Flux concatMapSequentially(Publisher publisher,
+ Function super T, ? extends Publisher extends T>> mapper) {
- getReadPreference().ifPresent(query::withReadPreference);
- return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
+ return Flux.from(publisher).switchOnFirst(((signal, source) -> {
+
+ if (!signal.hasValue()) {
+ return source.concatMap(mapper);
+ }
+
+ Mono firstCall = Mono.from(mapper.apply(signal.get()));
+ return firstCall.concatWith(source.skip(1).flatMapSequential(mapper));
+ }));
+ }
+
+ private static List toList(Iterable source) {
+ return source instanceof List list ? list : new ArrayList<>(toCollection(source));
+ }
+
+ private static Collection toCollection(Iterable source) {
+ return source instanceof Collection collection ? collection
+ : StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList());
}
/**
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
index 88157024ba..4981c3480b 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
@@ -467,7 +467,7 @@ public Flux saveWithErrorLogs(Person person) {
TransactionalOperator transactionalOperator = TransactionalOperator.create(manager,
new DefaultTransactionDefinition());
- return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
+ return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), //
operations.save(new EventLog(new ObjectId(), "afterConvert")), //
operations.save(new EventLog(new ObjectId(), "beforeInsert")), //
operations.save(person), //
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
index f8ab0f1563..c4a8c58e4b 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java
@@ -24,8 +24,10 @@
import java.util.Arrays;
import java.util.Objects;
+import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.BeansException;
@@ -40,6 +42,7 @@
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.domain.Sort.Order;
+import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory;
import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository;
@@ -48,6 +51,8 @@
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.util.ClassUtils;
/**
@@ -333,6 +338,28 @@ void savePublisherOfEntitiesShouldInsertEntity() {
assertThat(boyd.getId()).isNotNull();
}
+ @RepeatedTest(10) // GH-4838
+ void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
+
+ ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
+ TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
+ return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia));
+ }).as(StepVerifier::create) //
+ .expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete();
+ }
+
+ @RepeatedTest(10) // GH-4838
+ void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() {
+
+ ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
+ Flux personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia));
+
+ TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
+ return repository.saveAll(personFlux);
+ }).as(StepVerifier::create) //
+ .expectNextCount(7).verifyComplete();
+ }
+
@Test // GH-3609
void savePublisherOfImmutableEntitiesShouldInsertEntity() {
@@ -342,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() {
.consumeNextWith(actual -> {
assertThat(actual.id).isNotNull();
}) //
- .verifyComplete();
+ .verifyComplete();
}
@Test // DATAMONGO-1444