Skip to content

Commit d0ee280

Browse files
christophstroblmp911de
authored andcommitted
Retain order doing reactive save operations with multiple elements.
Ensure subscription order on multi document operations. Original pull request: #4824 Closes #4804
1 parent 6200440 commit d0ee280

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1413,7 +1413,7 @@ protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWrite
14131413
});
14141414

14151415
return Flux.fromIterable(elementsByCollection.keySet())
1416-
.flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
1416+
.concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
14171417
}
14181418

14191419
protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,16 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
112112
Streamable<S> source = Streamable.of(entities);
113113

114114
return source.stream().allMatch(entityInformation::isNew) ? //
115-
mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
116-
Flux.fromIterable(entities).flatMap(this::save);
115+
insert(entities) :
116+
Flux.fromIterable(entities).concatMap(this::save);
117117
}
118118

119119
@Override
120120
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
121121

122122
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
123123

124-
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
124+
return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
125125
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
126126
mongoOperations.save(entity, entityInformation.getCollectionName()));
127127
}
@@ -295,7 +295,7 @@ public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
295295
Optional<ReadPreference> readPreference = getReadPreference();
296296
return Flux.from(entityStream)//
297297
.map(entityInformation::getRequiredId)//
298-
.flatMap(id -> deleteById(id, readPreference))//
298+
.concatMap(id -> deleteById(id, readPreference))//
299299
.then();
300300
}
301301

@@ -336,17 +336,15 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
336336
Assert.notNull(entities, "The given Iterable of entities must not be null");
337337

338338
Collection<S> source = toCollection(entities);
339-
340-
return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source);
339+
return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
341340
}
342341

343342
@Override
344343
public <S extends T> Flux<S> insert(Publisher<S> entities) {
345344

346345
Assert.notNull(entities, "The given Publisher of entities must not be null");
347346

348-
return Flux.from(entities)
349-
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
347+
return Flux.from(entities).concatMap(this::insert);
350348
}
351349

352350
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)