Skip to content

Commit a07ac3c

Browse files
authored
Fix code not terminating on repository saving an empty flux.
Original Pull Request #3099 Closes: #3039 Signed-off-by: Peter-Josef Meisch <[email protected]>
1 parent 9d025dd commit a07ac3c

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

+6
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)
233233
.subscribe(new Subscriber<>() {
234234
@Nullable private Subscription subscription = null;
235235
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);
236+
private final AtomicBoolean onNextHasBeenCalled = new AtomicBoolean(false);
236237

237238
@Override
238239
public void onSubscribe(Subscription subscription) {
@@ -242,6 +243,7 @@ public void onSubscribe(Subscription subscription) {
242243

243244
@Override
244245
public void onNext(List<T> entityList) {
246+
onNextHasBeenCalled.set(true);
245247
saveAll(entityList, index)
246248
.map(sink::tryEmitNext)
247249
.doOnComplete(() -> {
@@ -267,6 +269,10 @@ public void onError(Throwable throwable) {
267269
@Override
268270
public void onComplete() {
269271
upstreamComplete.set(true);
272+
if (!onNextHasBeenCalled.get()) {
273+
// this happens when an empty flux is saved
274+
sink.tryEmitComplete();
275+
}
270276
}
271277
});
272278
return sink.asFlux();

src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,26 @@ private Mono<Boolean> documentWithIdExistsInIndex(String id) {
105105
return operations.exists(id, IndexCoordinates.of(indexNameProvider.indexName()));
106106
}
107107

108+
@Test // #3093
109+
@DisplayName("should save all from empty collection")
110+
void shouldSaveAllFromEmptyCollection() {
111+
112+
repository.saveAll(Collections.emptyList())
113+
.as(StepVerifier::create)
114+
.expectNextCount(0)
115+
.verifyComplete();
116+
}
117+
118+
@Test // #3093
119+
@DisplayName("should save all from empty flux")
120+
void shouldSaveAllFromEmptyFlux() {
121+
122+
repository.saveAll(Flux.empty())
123+
.as(StepVerifier::create)
124+
.expectNextCount(0)
125+
.verifyComplete();
126+
}
127+
108128
@Test // DATAES-519
109129
void saveShouldComputeMultipleEntities() {
110130

0 commit comments

Comments
 (0)