diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 3fee5f2db..a2449b2a7 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -233,6 +233,7 @@ public Flux save(Flux entities, IndexCoordinates index, int bulkSize) .subscribe(new Subscriber<>() { @Nullable private Subscription subscription = null; private final AtomicBoolean upstreamComplete = new AtomicBoolean(false); + private final AtomicBoolean onNextHasBeenCalled = new AtomicBoolean(false); @Override public void onSubscribe(Subscription subscription) { @@ -242,6 +243,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(List entityList) { + onNextHasBeenCalled.set(true); saveAll(entityList, index) .map(sink::tryEmitNext) .doOnComplete(() -> { @@ -267,6 +269,10 @@ public void onError(Throwable throwable) { @Override public void onComplete() { upstreamComplete.set(true); + if (!onNextHasBeenCalled.get()) { + // this happens when an empty flux is saved + sink.tryEmitComplete(); + } } }); return sink.asFlux(); diff --git a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java index c7b77e584..17fddb928 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java @@ -105,6 +105,26 @@ private Mono documentWithIdExistsInIndex(String id) { return operations.exists(id, IndexCoordinates.of(indexNameProvider.indexName())); } + @Test // #3093 + @DisplayName("should save all from empty collection") + void shouldSaveAllFromEmptyCollection() { + + repository.saveAll(Collections.emptyList()) + .as(StepVerifier::create) + .expectNextCount(0) + .verifyComplete(); + } + + @Test // #3093 + @DisplayName("should save all from empty flux") + void shouldSaveAllFromEmptyFlux() { + + repository.saveAll(Flux.empty()) + .as(StepVerifier::create) + .expectNextCount(0) + .verifyComplete(); + } + @Test // DATAES-519 void saveShouldComputeMultipleEntities() {