Skip to content

Commit 797dbb5

Browse files
authored
Improve save(Flux<T>) implementations.
Original Pull Request #2497 Closes #2496 Closes #2492
1 parent a7d6b9d commit 797dbb5

File tree

7 files changed

+176
-11
lines changed

7 files changed

+176
-11
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717

1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
20+
import reactor.core.publisher.Sinks;
2021
import reactor.util.function.Tuple2;
2122

2223
import java.time.Duration;
2324
import java.util.Collection;
2425
import java.util.List;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2527
import java.util.stream.Collectors;
2628

29+
import org.reactivestreams.Subscriber;
30+
import org.reactivestreams.Subscription;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
2933
import org.springframework.beans.BeansException;
@@ -207,6 +211,61 @@ public <T> Mono<T> save(T entity) {
207211
return save(entity, getIndexCoordinatesFor(entity.getClass()));
208212
}
209213

214+
@Override
215+
public <T> Flux<T> save(Flux<T> entities, Class<?> clazz, int bulkSize) {
216+
return save(entities, getIndexCoordinatesFor(clazz), bulkSize);
217+
}
218+
219+
@Override
220+
public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize) {
221+
222+
Assert.notNull(entities, "entities must not be null");
223+
Assert.notNull(index, "index must not be null");
224+
Assert.isTrue(bulkSize > 0, "bulkSize must be greater than 0");
225+
226+
return Flux.defer(() -> {
227+
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
228+
entities //
229+
.bufferTimeout(bulkSize, Duration.ofMillis(200)) //
230+
.subscribe(new Subscriber<List<T>>() {
231+
private Subscription subscription;
232+
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);
233+
234+
@Override
235+
public void onSubscribe(Subscription subscription) {
236+
this.subscription = subscription;
237+
subscription.request(1);
238+
}
239+
240+
@Override
241+
public void onNext(List<T> entityList) {
242+
saveAll(entityList, index) //
243+
.map(sink::tryEmitNext) //
244+
.doOnComplete(() -> {
245+
if (!upstreamComplete.get()) {
246+
subscription.request(1);
247+
} else {
248+
sink.tryEmitComplete();
249+
}
250+
}).subscribe();
251+
}
252+
253+
@Override
254+
public void onError(Throwable throwable) {
255+
subscription.cancel();
256+
sink.tryEmitError(throwable);
257+
}
258+
259+
@Override
260+
public void onComplete() {
261+
upstreamComplete.set(true);
262+
}
263+
});
264+
return sink.asFlux();
265+
});
266+
267+
}
268+
210269
@Override
211270
public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, Class<T> clazz) {
212271
return saveAll(entities, getIndexCoordinatesFor(clazz));

src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
* @since 4.0
4545
*/
4646
public interface ReactiveDocumentOperations {
47+
48+
int FLUX_SAVE_BULK_SIZE = 500;
49+
4750
/**
4851
* Index the given entity, once available, extracting index from entity metadata.
4952
*
@@ -93,6 +96,61 @@ default <T> Mono<T> save(Mono<? extends T> entityPublisher, IndexCoordinates ind
9396
*/
9497
<T> Mono<T> save(T entity, IndexCoordinates index);
9598

99+
/**
100+
* Indexes the entities into the index extracted from entity metadata.
101+
*
102+
* @param entities
103+
* @param clazz the class to get the index name from
104+
* @param <T> entity type
105+
* @return a Flux emitting the saved entities
106+
* @since 5.1
107+
*/
108+
default <T> Flux<T> save(Flux<T> entities, Class<?> clazz) {
109+
return save(entities, clazz, FLUX_SAVE_BULK_SIZE);
110+
}
111+
112+
/**
113+
* Indexes the entities into the index extracted from entity metadata. The entities are collected into batches of
114+
* {bulkSize} with a maximal timeout of 200 ms, see
115+
* {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time .Duration)} and then sent in a bulk operation to
116+
* Elasticsearch.
117+
*
118+
* @param entities
119+
* @param clazz the class to get the index name from
120+
* @param bulkSize number of entities to put in a bulk request
121+
* @param <T> entity type
122+
* @return a Flux emitting the saved entities
123+
* @since 5.1
124+
*/
125+
<T> Flux<T> save(Flux<T> entities, Class<?> clazz, int bulkSize);
126+
127+
/**
128+
* Indexes the entities into the given index.
129+
*
130+
* @param entities the entities to save
131+
* @param index the index to save to
132+
* @param <T> entity type
133+
* @return a Flux emitting the saved entities
134+
* @since 5.1
135+
*/
136+
default <T> Flux<T> save(Flux<T> entities, IndexCoordinates index) {
137+
return save(entities, index, FLUX_SAVE_BULK_SIZE);
138+
}
139+
140+
/**
141+
* Indexes the entities into the given index. The entities are collected into batches of {bulkSize} with a maximal
142+
* timeout of 200 ms, see {@link reactor.core.publisher.Flux#bufferTimeout(int, java.time * .Duration)} and then sent
143+
* in a bulk operation to Elasticsearch.
144+
*
145+
* @param entities the entities to save
146+
* @param index the index to save to
147+
* @param bulkSize number of entities to put in a bulk request
148+
* @param <T> entity type
149+
* @return a Flux emitting the saved entities
150+
* @since 5.1
151+
*/
152+
<T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize);
153+
96154
/**
97155
* Index entities the index extracted from entity metadata.
98156
*

src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package org.springframework.data.elasticsearch.repository.support;
1717

18-
import org.springframework.data.elasticsearch.core.query.BaseQuery;
1918
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
2120

@@ -30,6 +29,7 @@
3029
import org.springframework.data.elasticsearch.core.SearchHit;
3130
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
3231
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
32+
import org.springframework.data.elasticsearch.core.query.BaseQuery;
3333
import org.springframework.data.elasticsearch.core.query.Query;
3434
import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository;
3535
import org.springframework.util.Assert;
@@ -97,7 +97,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
9797

9898
Assert.notNull(entityStream, "EntityStream must not be null!");
9999

100-
return operations.saveAll(Flux.from(entityStream).collectList(), entityInformation.getIndexCoordinates())
100+
return operations.save(Flux.from(entityStream), entityInformation.getIndexCoordinates())
101101
.concatWith(doRefresh().then(Mono.empty()));
102102
}
103103

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import static org.elasticsearch.index.query.QueryBuilders.*;
2121
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
2222

23-
import org.springframework.data.elasticsearch.annotations.IndexedIndexName;
23+
import reactor.core.publisher.Flux;
2424
import reactor.core.publisher.Mono;
2525
import reactor.test.StepVerifier;
2626

@@ -61,6 +61,7 @@
6161
import org.springframework.data.elasticsearch.annotations.Document;
6262
import org.springframework.data.elasticsearch.annotations.Field;
6363
import org.springframework.data.elasticsearch.annotations.FieldType;
64+
import org.springframework.data.elasticsearch.annotations.IndexedIndexName;
6465
import org.springframework.data.elasticsearch.annotations.Mapping;
6566
import org.springframework.data.elasticsearch.annotations.Setting;
6667
import org.springframework.data.elasticsearch.annotations.WriteOnlyProperty;
@@ -170,7 +171,6 @@ void shouldSetIndexedIndexNameProperty() {
170171
assertThat(saved.getIndexedIndexName()).isEqualTo(indexNameProvider.indexName() + "-indexedindexname");
171172
}
172173

173-
174174
private Mono<Boolean> documentWithIdExistsInIndex(String id, String index) {
175175
return operations.exists(id, IndexCoordinates.of(index));
176176
}
@@ -1180,6 +1180,25 @@ void shouldWorkWithReadonlyId() {
11801180
assertThat(readEntity.getPart2()).isEqualTo(entity.getPart2()); //
11811181
}).verifyComplete();
11821182
}
1183+
1184+
@Test // #2496
1185+
@DisplayName("should save data from Flux and return saved data in a flux")
1186+
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
1187+
1188+
var count = 12_345;
1189+
var entityList = IntStream.rangeClosed(1, count)//
1190+
.mapToObj(SampleEntity::of) //
1191+
.collect(Collectors.toList());
1192+
1193+
var entityFlux = Flux.fromIterable(entityList);
1194+
1195+
operations.save(entityFlux, SampleEntity.class).collectList() //
1196+
.as(StepVerifier::create) //
1197+
.consumeNextWith(savedEntities -> {
1198+
assertThat(savedEntities).isEqualTo(entityList);
1199+
}) //
1200+
.verifyComplete();
1201+
}
11831202
// endregion
11841203

11851204
// region Helper functions
@@ -1262,6 +1281,13 @@ static class SampleEntity {
12621281
@Nullable
12631282
@Version private Long version;
12641283

1284+
static SampleEntity of(int id) {
1285+
var entity = new SampleEntity();
1286+
entity.setId("" + id);
1287+
entity.setMessage(" message " + id);
1288+
return entity;
1289+
}
1290+
12651291
@Nullable
12661292
public String getId() {
12671293
return id;
@@ -1543,15 +1569,15 @@ public void setPart2(String part2) {
15431569
this.part2 = part2;
15441570
}
15451571
}
1572+
15461573
@Document(indexName = "#{@indexNameProvider.indexName()}-indexedindexname")
15471574
private static class IndexedIndexNameEntity {
15481575
@Nullable
15491576
@Id private String id;
15501577
@Nullable
15511578
@Field(type = Text) private String someText;
15521579
@Nullable
1553-
@IndexedIndexName
1554-
private String indexedIndexName;
1580+
@IndexedIndexName private String indexedIndexName;
15551581

15561582
@Nullable
15571583
public String getId() {
@@ -1579,5 +1605,6 @@ public String getIndexedIndexName() {
15791605
public void setIndexedIndexName(@Nullable String indexedIndexName) {
15801606
this.indexedIndexName = indexedIndexName;
15811607
}
1582-
} // endregion
1608+
}
1609+
// endregion
15831610
}

src/test/java/org/springframework/data/elasticsearch/core/suggest/CompletionIntegrationTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ private void loadAnnotatedCompletionObjectEntitiesWithWeights() {
112112
operations.bulkIndex(indexQueries, AnnotatedCompletionEntity.class);
113113
}
114114

115-
// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
116115
@Test
117116
public void shouldFindSuggestionsForGivenCriteriaQueryUsingCompletionEntity() {
118117

@@ -144,7 +143,6 @@ void shouldRetrieveEntityWithCompletion() {
144143
operations.get("1", CompletionEntity.class);
145144
}
146145

147-
// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
148146
@Test
149147
public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {
150148

@@ -168,7 +166,6 @@ public void shouldFindSuggestionsForGivenCriteriaQueryUsingAnnotatedCompletionEn
168166
assertThat(options.get(1).getText()).isIn("Marchand", "Mohsin");
169167
}
170168

171-
// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES 1issue 150")
172169
@Test
173170
public void shouldFindSuggestionsWithWeightsForGivenCriteriaQueryUsingAnnotatedCompletionEntity() {
174171

src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ void cleanup() {
6666
operations.indexOps(IndexCoordinates.of(indexNameProvider.getPrefix() + "*")).delete().block();
6767
}
6868

69-
// @DisabledIf(value = "newElasticsearchClient", disabledReason = "todo #2139, ES issue 150")
7069
@Test // #1302
7170
@DisplayName("should find suggestions for given prefix completion")
7271
void shouldFindSuggestionsForGivenPrefixCompletion() {

src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryIntegrationTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,24 @@ void shouldUseSourceExcludesFromParameter() {
739739
.verifyComplete();
740740
}
741741

742+
@Test // #2496
743+
@DisplayName("should save data from Flux and return saved data in a flux")
744+
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
745+
var count = 12_345;
746+
var entityList = IntStream.rangeClosed(1, count)//
747+
.mapToObj(SampleEntity::of) //
748+
.collect(Collectors.toList());
749+
750+
var entityFlux = Flux.fromIterable(entityList);
751+
752+
repository.saveAll(entityFlux).collectList() //
753+
.as(StepVerifier::create) //
754+
.consumeNextWith(savedEntities -> {
755+
assertThat(savedEntities).isEqualTo(entityList);
756+
}) //
757+
.verifyComplete();
758+
}
759+
742760
Mono<Void> bulkIndex(SampleEntity... entities) {
743761
return operations.saveAll(Arrays.asList(entities), IndexCoordinates.of(indexNameProvider.indexName())).then();
744762
}
@@ -829,6 +847,13 @@ static class SampleEntity {
829847
@Field(name = "custom_field_name", type = FieldType.Text)
830848
@Nullable private String customFieldNameMessage;
831849

850+
static SampleEntity of(int id) {
851+
var entity = new SampleEntity();
852+
entity.setId("" + id);
853+
entity.setMessage(" message " + id);
854+
return entity;
855+
}
856+
832857
public SampleEntity() {}
833858

834859
public SampleEntity(@Nullable String id) {

0 commit comments

Comments
 (0)