Skip to content

Commit 723db60

Browse files
christophstroblmp911de
authored andcommitted
Retain order doing reactive save operations with multiple elements.
Ensure subscription order on multi document operations. Original pull request: #4824 Closes #4804
1 parent eeab992 commit 723db60

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1413,7 +1413,7 @@ protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWrite
14131413
});
14141414

14151415
return Flux.fromIterable(elementsByCollection.keySet())
1416-
.flatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
1416+
.concatMap(collectionName -> doInsertBatch(collectionName, elementsByCollection.get(collectionName), writer));
14171417
}
14181418

14191419
protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends T> batchToSave,

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -113,16 +113,16 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
113113
Streamable<S> source = Streamable.of(entities);
114114

115115
return source.stream().allMatch(entityInformation::isNew) ? //
116-
mongoOperations.insert(source.stream().collect(Collectors.toList()), entityInformation.getCollectionName()) : //
117-
Flux.fromIterable(entities).flatMap(this::save);
116+
insert(entities) :
117+
Flux.fromIterable(entities).concatMap(this::save);
118118
}
119119

120120
@Override
121121
public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
122122

123123
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
124124

125-
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
125+
return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? //
126126
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
127127
mongoOperations.save(entity, entityInformation.getCollectionName()));
128128
}
@@ -296,7 +296,7 @@ public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
296296
Optional<ReadPreference> readPreference = getReadPreference();
297297
return Flux.from(entityStream)//
298298
.map(entityInformation::getRequiredId)//
299-
.flatMap(id -> deleteById(id, readPreference))//
299+
.concatMap(id -> deleteById(id, readPreference))//
300300
.then();
301301
}
302302

@@ -337,17 +337,15 @@ public <S extends T> Flux<S> insert(Iterable<S> entities) {
337337
Assert.notNull(entities, "The given Iterable of entities must not be null");
338338

339339
Collection<S> source = toCollection(entities);
340-
341-
return source.isEmpty() ? Flux.empty() : mongoOperations.insertAll(source);
340+
return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName());
342341
}
343342

344343
@Override
345344
public <S extends T> Flux<S> insert(Publisher<S> entities) {
346345

347346
Assert.notNull(entities, "The given Publisher of entities must not be null");
348347

349-
return Flux.from(entities)
350-
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
348+
return Flux.from(entities).concatMap(this::insert);
351349
}
352350

353351
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)