Skip to content

Commit aacf37e

Browse files
hacking
1 parent f88d86a commit aacf37e

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
113113
Streamable<S> source = Streamable.of(entities);
114114

115115
return source.stream().allMatch(entityInformation::isNew) ? //
116-
insert(entities) :
117-
Flux.fromIterable(entities).concatMap(this::save);
116+
insert(entities) : doItSomewhatSequentially(source, this::save);
118117
}
119118

120119
@Override
@@ -127,6 +126,20 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
127126
mongoOperations.save(entity, entityInformation.getCollectionName()));
128127
}
129128

129+
static <T> Flux<T> doItSomewhatSequentially/* how should we actually call this? */(Streamable<T> ts, Function<? super T, ? extends Publisher<? extends T>> mapper) {
130+
131+
List<T> list = ts.stream().toList();
132+
if (list.size() == 1) {
133+
return Flux.just(list.iterator().next()).flatMap(mapper);
134+
} else if (list.size() == 2) {
135+
return Flux.fromIterable(list).concatMap(mapper);
136+
}
137+
138+
Flux<T> first = Flux.just(list.get(0)).flatMap(mapper);
139+
Flux<T> theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper);
140+
return first.concatWith(theRest);
141+
}
142+
130143
@Override
131144
public Mono<T> findById(ID id) {
132145

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java

+16
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.springframework.data.domain.ExampleMatcher.*;
2020

21+
import org.junit.jupiter.api.RepeatedTest;
22+
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
23+
import org.springframework.transaction.TransactionDefinition;
24+
import org.springframework.transaction.reactive.TransactionalOperator;
2125
import reactor.core.publisher.Flux;
2226
import reactor.core.publisher.Mono;
2327
import reactor.test.StepVerifier;
@@ -333,6 +337,18 @@ void savePublisherOfEntitiesShouldInsertEntity() {
333337
assertThat(boyd.getId()).isNotNull();
334338
}
335339

340+
@RepeatedTest(10)
341+
void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() {
342+
343+
ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory());
344+
TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> {
345+
return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia));
346+
})
347+
.as(StepVerifier::create) //
348+
.expectNextCount(7) //
349+
.verifyComplete();
350+
}
351+
336352
@Test // GH-3609
337353
void savePublisherOfImmutableEntitiesShouldInsertEntity() {
338354

0 commit comments

Comments
 (0)